We’ve reset 5-hour and weekly rate limits for all users. Enjoy Fable 5! 5:48 AM · Jun 10, 2026 如图,本来用了30%多,发现重置了,而且A/好像不会改变周额度刷新的重置时间,比奥特曼更拟人。 2 个帖子 - 2 位参与者 阅读完整话题
clash verge自动选择模式 - { name: 自动选择, type: url-test, proxies: [‘ 美国1’, ‘ 美国2’], url: ‘ http://www.gstatic.com/generate_204 ’, interval: 86400 } 这个配置,会频繁的两个来回切,可能是线路不稳定,这种情况访问gpt什么的问题大吗,还是需要固定一下? 1 个帖子 - 1 位参与者 阅读完整话题
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。
GITHUB今天最火的项目,有佬实测过吗? github.com GitHub - chopratejas/headroom: Compress tool outputs, logs, files, and RAG chunks... Compress tool outputs, logs, files, and RAG chunks before they reach the LLM. 60-95% fewer tokens, same answers. Library, proxy, MCP server. Headroom 会在 AI 代理读取的所有内容(包括工具输出、日志、RAG 数据块、文件和对话历史记录)到达 LLM 之前对其进行压缩。同样的答案,只需极少的tokens。 7 个帖子 - 7 位参与者 阅读完整话题
GITHUB今天最火的项目,有佬实测过吗? github.com GitHub - chopratejas/headroom: Compress tool outputs, logs, files, and RAG chunks... Compress tool outputs, logs, files, and RAG chunks before they reach the LLM. 60-95% fewer tokens, same answers. Library, proxy, MCP server. Headroom 会在 AI 代理读取的所有内容(包括工具输出、日志、RAG 数据块、文件和对话历史记录)到达 LLM 之前对其进行压缩。同样的答案,只需极少的tokens。 1 个帖子 - 1 位参与者 阅读完整话题
从 sub2api记得官方codex20x报错"API returned 429: {“detail”:“Rate limit exceeded”}" 继续讨论: 应该是估计限制了请求头,非官方反代估计不行了** 解决方法**:开自动透传、开ws mode(透传)、开仅允许 Codex 官方客户端** 这样就可以了,具体这三个选项哪个起作用不知道,但是都开就行了** 1 个帖子 - 1 位参与者 阅读完整话题
openai速度限制了,又蹦了API returned 429: {“detail”:“Rate limit exceeded”} 无论api和codex端全蹦!!! 9 个帖子 - 8 位参与者 阅读完整话题
可见周限额全部失效,而且GPT-5.3-Codex-Spark突然不能调用了 去官网 codex cloud 看是有周限额的,再结合这几天的连续举报,是不是 codex 要修改协议了? 5 个帖子 - 2 位参与者 阅读完整话题
sub2api记得官方codex20x报错“API returned 429: {“detail”:“Rate limit exceeded”}” 忽然报错了,我看了离5消失和周限还远着呢?这是什么情况? 9 个帖子 - 7 位参与者 阅读完整话题
进入devin, migrate from winsurf, codex 居然不能用了。 1 个帖子 - 1 位参与者 阅读完整话题
https://rate.linux.do/ 想问佬友是不是一样的情况? 应该指定话题给谁呢? 1 个帖子 - 1 位参与者 阅读完整话题