本文目录导读:
在Python中实现异常重试有多种方式,我为你整理了从简单到复杂的几种实现方法:
基础手动重试(最直接)
import time
import random
def unstable_function():
"""模拟可能失败的操作"""
if random.random() < 0.7: # 70%概率失败
raise ConnectionError("连接失败")
return "成功"
def manual_retry(max_retries=3, delay=1):
"""手动重试实现"""
for attempt in range(max_retries):
try:
result = unstable_function()
print(f"第{attempt+1}次尝试成功")
return result
except ConnectionError as e:
if attempt == max_retries - 1:
raise # 最后一次失败,向上抛出异常
print(f"第{attempt+1}次失败: {e},等待{delay}秒后重试...")
time.sleep(delay)
delay *= 2 # 指数退避
# 使用
try:
result = manual_retry()
print(f"最终结果: {result}")
except Exception as e:
print(f"所有重试都失败: {e}")
使用装饰器封装(推荐方案)
import time
import random
from functools import wraps
def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
"""
重试装饰器
参数:
max_attempts: 最大重试次数
delay: 初始延迟时间(秒)
backoff: 延迟倍数(指数退避)
exceptions: 需要捕获的异常类型
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_attempts - 1:
raise
print(f"第{attempt+1}次尝试失败: {e},{current_delay}秒后重试...")
time.sleep(current_delay)
current_delay *= backoff
return None # 实际不会执行到这里
return wrapper
return decorator
# 使用装饰器
@retry(max_attempts=5, delay=1, backoff=2, exceptions=(ConnectionError, TimeoutError))
def network_request():
"""模拟网络请求"""
if random.random() < 0.8: # 80%概率失败
raise ConnectionError("网络连接失败")
return {"status": "ok", "data": "示例数据"}
# 测试
result = network_request()
print(f"请求结果: {result}")
使用第三方库 tenacity(最强大的方案)
# 安装: pip install tenacity
import random
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
@retry(
stop=stop_after_attempt(3), # 最多重试3次
wait=wait_exponential(multiplier=1, min=1, max=10), # 指数退避
retry=retry_if_exception_type((ConnectionError, TimeoutError)) # 指定异常类型
)
def flaky_function():
"""不稳定函数"""
if random.random() < 0.7:
raise ConnectionError("连接失败")
return "操作成功"
# 高级用法
from tenacity import before_sleep, after_log
import logging
logging.basicConfig(level=logging.INFO)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=1, max=30),
before_sleep=before_sleep(lambda retry_state:
print(f"准备重试第{retry_state.attempt_number}次")),
reraise=True # 重试失败后重新抛出异常
)
def advanced_retry_example():
if random.random() < 0.9:
raise ValueError("操作失败")
return "高级重试成功"
# 使用
try:
result = advanced_retry_example()
print(result)
except Exception as e:
print(f"最终失败: {e}")
完整实用的例子(文件读取+网络请求)
import time
import random
import requests
from functools import wraps
def retry_with_callback(max_attempts=3, delay=1, backoff=2):
"""带回调的重试装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
# 最后一次失败,执行回调
wrapper.last_error = e
print(f"所有{max_attempts}次重试都失败")
raise
print(f"第{attempt+1}次失败: {type(e).__name__}: {e}")
print(f"等待{current_delay}秒后重试...")
time.sleep(current_delay)
current_delay *= backoff
return None
return wrapper
return decorator
# 实际应用场景1:读取配置文件
@retry_with_callback(max_attempts=3, delay=0.5)
def read_config(file_path):
"""读取配置文件,模拟文件可能被占用的场景"""
if random.random() < 0.6:
raise IOError(f"文件被占用: {file_path}")
with open(file_path, 'r') as f:
return f.read()
# 实际应用场景2:API请求
@retry_with_callback(max_attempts=3, delay=2)
def fetch_data(url):
"""获取API数据"""
response = requests.get(url, timeout=5)
response.raise_for_status() # 如果状态码不是200,抛出异常
return response.json()
# 测试文件读取
try:
config = read_config("config.txt")
print("配置读取成功")
except IOError:
print("配置文件读取失败")
# 测试API请求
try:
data = fetch_data("https://api.example.com/data")
print(f"获取数据成功: {data}")
except requests.RequestException:
print("API请求失败")
异步函数的重试(asyncio版本)
import asyncio
import random
async def async_retry(func, max_attempts=3, delay=1):
"""异步重试函数"""
for attempt in range(max_attempts):
try:
return await func()
except Exception as e:
if attempt == max_attempts - 1:
raise
print(f"第{attempt+1}次失败,{delay}秒后重试...")
await asyncio.sleep(delay)
delay *= 2
async def unstable_async_func():
"""模拟不稳定的异步函数"""
await asyncio.sleep(0.1)
if random.random() < 0.7:
raise ValueError("异步操作失败")
return "异步操作成功"
# 使用
async def main():
try:
result = await async_retry(unstable_async_func, max_attempts=5, delay=0.5)
print(f"异步结果: {result}")
except Exception as e:
print(f"异步操作最终失败: {e}")
# 运行
asyncio.run(main())
关键设计要点
- 异常类型:明确指定要捕获的异常类型,避免捕获不应该重试的异常
- 退避策略:使用指数退避避免对服务器造成压力
- 最大重试次数:设置合理上限,避免无限重试
- 日志记录:记录重试过程和失败原因,便于调试
建议在实际项目中使用 tenacity 库,它功能完善且经过了大量生产环境验证,如果项目依赖较少,可以选择自定义装饰器实现。
标签: 重试机制