WWW.YOUINFO.SITE
标签聚合 limit

/tag/limit

LinuxDo 最新话题 · 2026-06-07 22:42:03+08:00 · tech

@Limitee 头像是皮卡丘,所以结合了宝可梦画风 pocket-spark.stellafortuna.dpdns.org 网站名字是:pocket-spark [!quote] 口袋里的金色火花w 象征着那个默默给予我们支持、在需要时为我们充能的温暖存在~ 还带音效~ 只因为他很可爱,且有问必回 用的正是那个开源字体 可以这样用: body { font-family: "JiangChengLvDongSong"; font-weight: normal; } 加载完了才会渲染,非常贴心~ 源码: <!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Adventure Record: Thank You!</title> <!-- 引入超级超级好看的江城律动宋字体~ --> <link href="https://fontsapi.zeoseven.com/147/main/result.css" onload="this.rel='stylesheet'" rel="preload" as="style" crossorigin /> <noscript><link rel="stylesheet" href="https://fontsapi.zeoseven.com/147/main/result.css" /></noscript> <style> * { box-sizing: border-box; margin: 0; padding: 0; } body { font-family: "JiangChengLvDongSong", sans-serif; font-weight: normal; min-height: 100vh; display: flex; justify-content: center; align-items: center; /* 经典精灵球红白背景分割 */ background: linear-gradient(to bottom, #E3350D 50%, #F5F5F5 50%); position: relative; overflow: hidden; } /* 精灵球中间的黑色分割线 */ body::before { content: ''; position: absolute; width: 100%; height: 16px; background-color: #222222; top: 50%; transform: translateY(-50%); z-index: 1; box-shadow: 0 4px 10px rgba(0,0,0,0.15); } /* 主体卡片:悬浮在精灵球中央的按钮装置 */ .card-container { position: relative; z-index: 10; background: #FFFFFF; border: 8px solid #222222; border-radius: 24px; width: 90%; max-width: 480px; padding: 40px 30px; box-shadow: 0 20px 0px rgba(0, 0, 0, 0.15); text-align: center; transition: all 0.3s cubic-bezier(0.175, 0.885, 0.32, 1.275); } /* 顶部装饰:致敬血条/等级框 */ .badge-header { display: inline-flex; align-items: center; gap: 8px; background: #222222; color: #FFCB05; /* 电气黄 */ padding: 6px 16px; border-radius: 30px; font-size: 14px; letter-spacing: 1px; margin-bottom: 24px; } .badge-dot { width: 8px; height: 8px; background-color: #FFCB05; border-radius: 50%; animation: pulse 1.5s infinite; } /* 文本展示区 */ .content-box { margin-bottom: 35px; } .main-title { font-size: 32px; color: #222222; line-height: 1.4; margin-bottom: 12px; } .sub-title { font-size: 16px; color: #777777; letter-spacing: 0.5px; } /* 交互按钮 */ .action-btn { background-color: #FFCB05; color: #222222; border: 4px solid #222222; border-radius: 16px; padding: 16px 36px; font-size: 20px; font-family: "JiangChengLvDongSong", sans-serif; cursor: pointer; outline: none; position: relative; overflow: hidden; box-shadow: 0 6px 0 #222222; transition: all 0.1s ease; } .action-btn:hover { background-color: #FFE054; transform: translateY(-2px); box-shadow: 0 8px 0 #222222; } .action-btn:active { transform: translateY(4px); box-shadow: 0 2px 0 #222222; } /* 触发感谢后的彩蛋状态 */ .card-container.unlocked { border-color: #FFCB05; box-shadow: 0 20px 0px rgba(255, 203, 5, 0.2); } .card-container.unlocked .badge-header { background: #FFCB05; color: #222222; } .card-container.unlocked .badge-dot { background-color: #E3350D; } /* 粒子礼花 */ .particle { position: absolute; pointer-events: none; width: 12px; height: 12px; background: #FFCB05; border-radius: 50%; z-index: 100; animation: popOut 0.8s ease-out forwards; } @keyframes pulse { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.3); opacity: 0.5; } 100% { transform: scale(1); opacity: 1; } } @keyframes popOut { 0% { transform: translate(0, 0) scale(1); opacity: 1; } 100% { transform: translate(var(--tx), var(--ty)) scale(0); opacity: 0; } } </style> </head> <body> <div class="card-container" id="card"> <div class="badge-header"> <span class="badge-dot"></span> <span>SYSTEM OVERVIEW</span> </div> <div class="content-box"> <h1 class="main-title" id="title">野生的贡献者出现了!</h1> <p class="sub-title" id="subtitle">冒险之所以能继续,是因为一路上有你</p> </div> <button class="action-btn" id="btn" onclick="triggerGratitude()"> 表达感谢 ! </button> </div> <script> let hasTriggered = false; function triggerGratitude() { if (hasTriggered) return; hasTriggered = true; const card = document.getElementById('card'); const title = document.getElementById('title'); const subtitle = document.getElementById('subtitle'); const btn = document.getElementById('btn'); // 切换为解锁/感谢状态 card.classList.add('unlocked'); title.innerHTML = "使用了「 谢谢你 」!"; subtitle.innerHTML = "效果超群!你的付出我们都收到了喵~"; btn.innerHTML = "MISSION ACCOMPLISHED"; btn.style.backgroundColor = "#EEEEEE"; btn.style.color = "#888888"; btn.style.cursor = "default"; btn.style.transform = "none"; btn.style.boxShadow = "none"; btn.style.borderColor = "#CCCCCC"; // 播放经典宝可梦中心恢复生命值的 8-bit 音效 playPokemonHealSound(); // 喷射黄色小礼花 const rect = btn.getBoundingClientRect(); const startX = rect.left + rect.width / 2; const startY = rect.top + rect.height / 2; for (let i = 0; i < 40; i++) { createParticle(startX, startY); } } // Web Audio API 合成 8-bit 经典音效 (C5 -> D5 -> E5 -> F5 -> G5 -> A5...) function playPokemonHealSound() { const AudioContext = window.AudioContext || window.webkitAudioContext; if (!AudioContext) return; const ctx = new AudioContext(); const now = ctx.currentTime; // 经典宝可梦中心恢复旋律的音高 const notes = [ 392.00, // G4 440.00, // A4 493.88, // B4 523.25, // C5 587.33, // D5 659.25 // E5 ]; notes.forEach((freq, index) => { const osc = ctx.createOscillator(); const gainNode = ctx.createGain(); // 经典的方波模拟 8-bit 复古感 osc.type = 'square'; osc.frequency.setValueAtTime(freq, now + index * 0.12); gainNode.gain.setValueAtTime(0.08, now + index * 0.12); gainNode.gain.exponentialRampToValueAtTime(0.01, now + index * 0.12 + 0.15); osc.connect(gainNode); gainNode.connect(ctx.destination); osc.start(now + index * 0.12); osc.stop(now + index * 0.12 + 0.18); }); } // 纯 CSS 粒子礼花效果 function createParticle(x, y) { const p = document.createElement('div'); p.classList.add('particle'); // 随机颜色(电气黄、红、白) const colors = ['#FFCB05', '#E3350D', '#FFFFFF', '#222222']; p.style.backgroundColor = colors[Math.floor(Math.random() * colors.length)]; // 随机大小 const size = Math.random() * 8 + 6; p.style.width = `${size}px`; p.style.height = `${size}px`; // 随机星形或圆形 if (Math.random() > 0.5) { p.style.borderRadius = '0px'; // 像素风小方块 } p.style.left = `${x}px`; p.style.top = `${y}px`; // 随机散射方向和距离 const angle = Math.random() * Math.PI * 2; const distance = Math.random() * 180 + 80; const tx = Math.cos(angle) * distance; const ty = Math.sin(angle) * distance - 50; // 稍微向上飘一点 p.style.setProperty('--tx', `${tx}px`); p.style.setProperty('--ty', `${ty}px`); document.body.appendChild(p); // 动画结束后移除 setTimeout(() => { p.remove(); }, 800); } </script> </body> </html> 2 个帖子 - 2 位参与者 阅读完整话题

LinuxDo 最新话题 · 2026-06-06 14:06:43+08:00 · tech

我使用了 sub2api 反代了一个 GPT Pro 的号,然后开启了生图支持;然后使用了佬友开发的 Image-Studio 来画图,结果发现打到了上游 4000/min 的限制。我心想这不可能啊!让 codex 去 sub2api 服务器查一查,结果啥也没查出来,sub2api 的调用次数很正常,对应的连接 ip 和 user-agent 都是认识的,完全不知道为什么会打到 4000/min 的限制啊!! 【不是推广不是推广啊,真是遇到问题了】 14 个帖子 - 4 位参与者 阅读完整话题

V2EX - 技术 · 2026-06-04 18:18:12+08:00 · tech

最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。

V2EX - 技术 · 2026-06-04 16:55:07+08:00 · tech

最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。

V2EX - 技术 · 2026-06-04 16:05:03+08:00 · tech

最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。

V2EX - 技术 · 2026-06-04 15:56:11+08:00 · tech

最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。

V2EX - 技术 · 2026-06-04 15:33:46+08:00 · tech

最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。

V2EX - 技术 · 2026-06-04 14:21:26+08:00 · tech

最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验: 1. 为什么弃用定时器? 早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。 但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。 ——优化点: 改用延迟计算。 不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。 2. 重视“权重( Weight )” 有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。 下单:5 weight 查深度:2 weight 批量撤单:10 weight 所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。 3. 处理网络抖动( Jitter )带来的假限流 理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。 ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。 4. Python 示例 以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算: import time import threading class AllTickLimiter: def __init__(self, capacity: float, rate: float): """ :param capacity: 桶容量(最大允许的突发请求权重) :param rate: 令牌恢复速率(每秒恢复的权重数) """ self.capacity = float(capacity) self.rate = float(rate) self.tokens = float(capacity) self.last_tick = time.monotonic() self._lock = threading.Lock() def allow(self, weight: float = 1.0) -> bool: """ 检查当前令牌是否足够支付本次请求的权重 """ with self._lock: now = time.monotonic() # 1. 延迟计算:计算自上次请求以来生成的令牌 delta = (now - self.last_tick) * self.rate self.tokens = min(self.capacity, self.tokens + delta) self.last_tick = now # 2. 尝试消费 if self.tokens >= weight: self.tokens -= weight return True return False def sync_from_header(self, server_remaining: float): """ 利用响应头中的权威剩余量进行校准 防止本地计算与服务端由于网络延迟导致的偏差 """ with self._lock: # 强制同步服务端返回的剩余额度 self.tokens = min(self.capacity, server_remaining) self.last_tick = time.monotonic() # --- 实战调用示例 — # 假设你的 API 套餐是每秒 10 个 Token limiter = AllTickLimiter(capacity=20, rate=10) def get_market_data(symbol: str): # 假设查询实时报价权重为 1 weight = 1.0 if limiter.allow(weight): # 模拟 AllTick API 请求 # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}") # data = response.json() print(f"[{symbol}] 请求成功") # 进阶操作:从 Header 获取服务端权威数据进行同步 # remaining = float(response.headers.get("X-RateLimit-Remaining", 20)) # limiter.sync_from_header(remaining) else: print(f"[{symbol}] 触发本地限流,请求被拦截") # 模拟快速并发请求 for i in range(15): get_market_data("BTCUSDT") 5. 分布式下的抉择 如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本 。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。