Python请求限流案例怎么编写?

wen python案例 1

Python请求限流案例编写实战:从入门到高并发场景适配

目录导读

  1. 为什么要学习请求限流?——解决高频访问的“隐形墙”
  2. 限流核心算法解析:计数器、令牌桶、滑动窗口与漏桶
  3. Python限流实战案例:使用ratelimit库实现基础限流
  4. 进阶案例:基于装饰器/中间件的动态限流方案
  5. 多线程与异步场景下的限流策略(aiohttp + asyncio)
  6. 面试高频问答:全栈开发者必须掌握的8个限流考察点
  7. 总结与建议:如何选择适合业务的限流方案

为什么要学习请求限流?——解决高频访问的“隐形墙”

在日常爬虫开发、API调用或Web服务构建中,我们常常会遇到这样的场景:

案例场景:你写了一个自动化脚本,每分钟向某个开放API发送200次请求,前100次正常返回数据,第101次突然返回429 Too Many Requests,甚至导致IP被临时封禁,这就是典型的“无限制请求”带来的后果。

核心问题:如何控制请求频率,既不影响正常业务,又能避免被服务端反爬或限流策略拦截?

解决方案:在客户端或服务端实现请求限流,Python作为数据采集和Web开发的主流语言,内置了多种限流实现方式,本文将从基础库到高阶并发场景,手把手带你掌握限流案例编写。


限流核心算法解析:计数器、令牌桶、滑动窗口与漏桶

在编写代码前,必须理解这四种算法的优劣,搜索引擎常提到它们的区别,我们在此做一个精简对比:

算法 核心思想 优点 缺点 适用场景
计数器 固定时间窗口内计数,超过阈值则拒绝 实现简单 窗口边界流量激增(临界突变) 简单限流,如每分钟不超过100次
滑动窗口 将时间窗口切分为小格子,滑动统计 解决临界突变 实现略复杂 对精确度要求较高的API限流
令牌桶 匀速生成令牌,请求消耗令牌,允许突发 支持突发流量,平滑 需要维护令牌生成器 允许短时突发的场景
漏桶 请求先进入桶中,匀速处理,溢出则丢弃 强制平滑输出速率 不能应对突发 严格控制请求速率的场景

关键点:对多数爬虫场景,令牌桶算法最实用(允许你快速抓取一批数据再等待);对Web API服务,滑动窗口更合适(避免某个用户恶意刷接口)。


Python限流实战案例:使用ratelimit库实现基础限流

1 安装库

pip install ratelimit

2 基础用法:装饰器实现每分钟2次调用

from ratelimit import limits, RateLimitException
from time import sleep
import requests
# 限制:每分钟最多调用2次(calls=2, period=60)
@limits(calls=2, period=60)
def fetch_data(url):
    print(f"正在请求: {url}")
    resp = requests.get(url)
    return resp.status_code
# 测试:连续调用3次,第3次会触发异常
try:
    code1 = fetch_data("https://httpbin.org/get")
    print(f"第1次状态码: {code1}")
    code2 = fetch_data("https://httpbin.org/get")
    print(f"第2次状态码: {code2}")
    # 第3次会等待还是报错?默认会触发RateLimitException
    code3 = fetch_data("https://httpbin.org/get")
    print(f"第3次状态码: {code3}")
except RateLimitException as e:
    print(f"限流触发: {e}")

执行结果:前两次正常,第三次抛出RateLimitException,这意味着你需要在调用处捕获异常或让程序睡眠等待。

3 改进:使用sleep_and_retry装饰器自动等待

from ratelimit import limits, sleep_and_retry
@sleep_and_retry  # 当触发限流时,自动睡眠直到下一个时间窗口
@limits(calls=2, period=60)
def fetch_data_safe(url):
    print(f"安全请求: {url}")
    resp = requests.get(url)
    return resp.status_code
# 测试:连续调用3次,自动等待后第3次会成功
for i in range(3):
    code = fetch_data_safe("https://httpbin.org/get")
    print(f"第{i+1}次状态码: {code}")

输出示例

安全请求: https://httpbin.org/get
第1次状态码: 200
安全请求: https://httpbin.org/get
第2次状态码: 200
# 这里会等待约58秒
安全请求: https://httpbin.org/get
第3次状态码: 200

优势:无需手动time.sleep(),代码简洁,适合简单场景。


进阶案例:基于装饰器/中间件的动态限流方案

如果限流逻辑需要根据URL、用户或接口动态变化(比如不同API有不同的频率限制),可以自定义装饰器或使用令牌桶实现。

1 自定义令牌桶限流器

import time
import threading
class TokenBucket:
    def __init__(self, capacity, fill_rate):
        self.capacity = capacity          # 桶容量
        self.fill_rate = fill_rate        # 每秒填充令牌数
        self.tokens = capacity            # 当前令牌数
        self.last_fill = time.time()
        self.lock = threading.Lock()
    def acquire(self, tokens=1):
        with self.lock:
            # 先补充令牌
            now = time.time()
            elapsed = now - self.last_fill
            self.tokens = min(self.capacity, self.tokens + elapsed * self.fill_rate)
            self.last_fill = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            else:
                return False
# 使用示例
bucket = TokenBucket(capacity=5, fill_rate=1)  # 桶容量5,每秒补充1个令牌
for i in range(10):
    if bucket.acquire():
        print(f"请求{i+1}: 成功获取令牌,时间: {time.strftime('%H:%M:%S')}")
    else:
        print(f"请求{i+1}: 令牌不足,等待0.5秒后重试...")
        time.sleep(0.5)
        # 实际场景可重试或记录失败

特点fill_rate=1意味着每秒最多处理1个请求,但capacity=5允许短时间内存下5个突发请求,之后恢复平滑节奏。

2 实现动态限流装饰器

from functools import wraps
import time
def rate_limit(max_calls, period=60):
    def decorator(func):
        # 使用字典存储每个函数的调用时间戳列表
        calls = []
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # 移除过期的时间戳
            while calls and calls[0] <= now - period:
                calls.pop(0)
            if len(calls) >= max_calls:
                # 计算需要等待的时间
                wait_time = period - (now - calls[0])
                print(f"限流触发: 需等待 {wait_time:.2f}s")
                time.sleep(wait_time)
            calls.append(time.time())
            return func(*args, **kwargs)
        return wrapper
    return decorator
# 使用
@rate_limit(max_calls=2, period=10)  # 每10秒最多2次
def limited_request():
    return requests.get("https://httpbin.org/get")

适用场景:不同API有不同的频率限制,可以通过参数动态设定。


多线程与异步场景下的限流策略(aiohttp + asyncio)

当使用asyncio编写异步爬虫时,限流逻辑需要和事件循环配合。

1 使用asyncio.Semaphore控制并发

import asyncio
import aiohttp
sem = asyncio.Semaphore(5)  # 最大并发5个请求
async def fetch_url(session, url):
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()
async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, "https://httpbin.org/get") for _ in range(20)]
        results = await asyncio.gather(*tasks)
asyncio.run(main())

2 使用aiolimiter实现令牌桶限流(推荐)

pip install aiolimiter
import asyncio
import aiohttp
from aiolimiter import AsyncLimiter
# 创建限流器:每秒处理2个请求,突发容量3个
limiter = AsyncLimiter(2, 3)  
async def limited_fetch(session, url):
    async with limiter:
        async with session.get(url) as resp:
            return await resp.text()
async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(session, "https://httpbin.org/get") for _ in range(10)]
        results = await asyncio.gather(*tasks)
asyncio.run(main())

关键区别aiolimiter是异步安全实现,内部使用令牌桶,支持突发流量,比Semaphore更精细。


面试高频问答:全栈开发者必须掌握的8个限流考察点

Q1: 如何防止爬虫被封IP? A: 组合策略:限流(控制请求频率)+ 随机User-Agent + 代理IP池 + 请求重试(带指数退避)。

Q2: 限流算法中,令牌桶和漏桶有什么区别? A: 令牌桶允许突发流量(只要有令牌),漏桶强制平滑输出,令牌桶适合突发短时抓取,漏桶适合视频流等需要恒定速率场景。

Q3: 在分布式系统中,如何实现全局限流? A: 使用Redis + Lua脚本实现原子性计数(如滑动窗口方案),或者使用Redis的INCR+过期时间+TIME_WINDOW实现。

Q4: 如何处理返回的429 Too Many Requests状态码? A: 捕获异常后解析响应头中的Retry-After值,睡眠对应时间后重试,并降低后续请求频率。

Q5: 限流装饰器和中间件哪种方式更好? A: 装饰器适合函数级别的限流(如爬虫函数),中间件适合全局HTTP服务限流(如Flask/Django中间件)。

Q6: 多线程环境下,限流器线程安全吗? A: 必须确保线程安全,可以使用threading.Lock(如上述令牌桶示例)或使用queue.Queue同步访问。

Q7: 异步限流和同步限流性能差异大吗? A: 异步限流不会阻塞事件循环,适合高并发IO密集场景,性能更高,同步限流在多线程下会阻塞其他线程,但实现简单。

Q8: 每秒限流100次和每分钟6000次,效果一样吗? A: 不一样,每秒100次是精细控制,适合平滑请求;每分钟6000次可能在前30秒就消耗完额度,导致后续30秒无法请求,建议根据API特性选择时间窗口。


总结与建议:如何选择适合业务的限流方案

使用场景 推荐方案 核心库/工具
单线程爬虫(入门) ratelimit + sleep_and_retry pip install ratelimit
多线程爬虫 自定义令牌桶 + threading.Lock 内置threading模块
异步爬虫(asyncio) aiolimiter pip install aiolimiter
Web API服务保护 中间件 + 滑动窗口 + Redis flask-limiterdjango-ratelimit
分布式限流 Redis + Lua脚本 redis-py + 原子操作

最后提醒:限流不是万能的,过度限流可能影响正常用户体验,最佳实践是“先测量后限流”:通过监控记录API的实际响应时间和返回状态码,动态调整限流参数,对于爬虫,建议结合随机延迟(1-5秒随机)、请求重试与退避策略,并遵守目标网站的robots.txt规则。


本文参考了官方文档、社区最佳实践及多篇技术博客的核心观点,通过去重提炼形成了这份结构化的实战指南,希望能帮助你从“踩到限流陷阱”进阶为“主动控制流量节奏”的Python开发者。

标签: 代码示例

抱歉,评论功能暂时关闭!