协程asyncio怎么用?Python异步编程从入门到实战
目录导读
- 为什么需要协程asyncio?——解决高并发与I/O瓶颈的核心逻辑
- 基础概念解析——事件循环、协程函数、任务与Future
- 快速上手:第一个asyncio程序——从同步到异步的代码演变
- 核心操作详解——await、gather、create_task与超时控制
- 常见陷阱与误区——阻塞调用、回调地狱与调试技巧
- 实战案例:异步爬虫与并发下载——让代码效率提升10倍
- 性能调优与最佳实践——资源管理、并发限制与日志调试
- 问答精选——关于asyncio最常被问到的5个问题
为什么需要协程asyncio?
传统同步编程中,当程序遇到网络请求、文件读取或数据库查询等I/O操作时,CPU会闲置等待,这种“阻塞等待”导致资源浪费,尤其在处理数千个并发连接时,线程或进程切换成本极高。协程asyncio的核心思想是:在单个线程内,通过事件循环实现“非阻塞等待”,让CPU在等待期间切换到其他任务,从而大幅提升并发能力。
与多线程的对比:
- 多线程存在GIL锁、线程安全与上下文切换开销,而协程完全在用户态调度,开销极低。
- 一个线程可运行数十万个协程,而线程数受限于系统资源。
基础概念解析
- 事件循环(Event Loop):asyncio的调度器,负责监听、执行与恢复协程,它是核心驱动,相当于一个同步管理器。
- 协程函数(Coroutine Function):使用
async def定义的函数,调用后返回一个协程对象。 - 可等待对象(Awaitable):协程、Task、Future等,可通过
await关键字等待其完成。 - 任务(Task):将协程包装为可独立调度和管理的对象,常用
asyncio.create_task()创建。
快速上手:第一个asyncio程序
同步版本(耗时3秒):
import time
def sync_task():
time.sleep(1)
return "done"
start = time.time()
results = [sync_task() for _ in range(3)]
print(f"同步耗时:{time.time()-start:.2f}s") # 输出约3秒
异步版本(耗时1秒):
import asyncio
async def async_task():
await asyncio.sleep(1) # 模拟I/O等待
return "done"
async def main():
tasks = [async_task() for _ in range(3)]
results = await asyncio.gather(*tasks)
return results
start = time.time()
asyncio.run(main())
print(f"异步耗时:{time.time()-start:.2f}s") # 输出约1秒
关键代码解析:
asyncio.run(main()):启动事件循环并运行main协程。await asyncio.sleep(1):真正的非阻塞等待,所有任务同时进行。gather:并发执行多个协程,并等待全部完成。
核心操作详解
await——交出控制权
当遇到await时,当前协程会暂停,事件循环自动切换到其他就绪的协程。注意:await只能在async def函数内使用。
asyncio.gather——并发聚合
results = await asyncio.gather(task1(), task2(), return_exceptions=True)
- 默认遇到异常会直接抛出,
return_exceptions=True可收集异常而不中断其他任务。
create_task——后台任务调度
async def worker():
while True:
await asyncio.sleep(1)
print("running...")
async def main():
task = asyncio.create_task(worker()) # 立即开始
await asyncio.sleep(3)
task.cancel() # 停止后台任务
超时控制
try:
result = await asyncio.wait_for(long_task(), timeout=2.0)
except asyncio.TimeoutError:
print("任务超时")
常见陷阱与误区
- 在协程内使用time.sleep():会阻塞整个事件循环!应始终使用
await asyncio.sleep()。 - 忘记await:协程对象不会被调度执行,导致程序表现怪异,可用
asyncio.run()内检查警告。 - CPU密集型任务阻塞:计算密集型任务应使用
asyncio.to_thread()切换到线程池运行。result = await asyncio.to_thread(cpu_intensive_func, arg)
- 事件循环多重嵌套:避免在异步函数内再次调用
asyncio.run(),否则会创建新循环。
实战案例:异步爬虫与并发下载
假设需要下载100个网页,同步版本需逐个请求,而异步版本可一次性批量发送。
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def download_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理失败的情形
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"失败:{url} - {result}")
return results
# 使用示例
urls = ["https://example.com/api" + str(i) for i in range(100)]
asyncio.run(download_all(urls))
为什么快?上百个HTTP请求被并发发送,I/O等待时间全部被利用,总耗时接近最慢的一个请求时间,而非所有请求之和。
性能调优与最佳实践
- 限制并发数:使用信号量
asyncio.Semaphore控制同时运行的协程数量,防止资源耗尽。sem = asyncio.Semaphore(10) async def limited_fetch(url): async with sem: return await fetch(url) - 复用连接:
aiohttp.ClientSession建议作为长连接复用,避免频繁创建销毁。 - 日志与调试:设置
asyncio.get_event_loop().set_debug(True),可检测协程泄漏和阻塞。 - 优雅关闭:使用
loop.shutdown_asyncgens()确保异步生成器正确清理。
问答精选
Q1:asyncio能替代多线程吗?
A:场景不同,对于I/O密集型任务(网络请求、数据库读写),asyncio性能远优于线程,但对于CPU密集型任务(图像处理、加密计算),多进程更合适,两者可结合:asyncio管理I/O,通过concurrent.futures.ThreadPoolExecutor分发CPU任务。
Q2:为什么我的asyncio程序反而比同步慢?
A:常见原因是错误使用阻塞操作(如requests库而非aiohttp),或并发度设置过高导致上下文切换开销,请检查代码中是否混入同步I/O调用。
Q3:如何获取协程的返回值?
A:await直接获取,或通过gather的返回列表,若使用create_task,可通过task.result()获取(需确认已完成)。
Q4:Python 3.10及以上版本对asyncio有什么改进?
A:引入了更稳定的asyncio.run(),TaskGroup(用于更安全的管理任务组),以及改进的timeout上下文管理器。
Q5:有哪些推荐的第三方库配合asyncio?
A:HTTP客户端首选aiohttp,数据库异步驱动如asyncpg(PostgreSQL)、aiomysql,异步Web框架推荐FastAPI,消息队列用aio-pika,核心原则:选择原生支持asyncio的库,避免使用run_in_executor包装同步库。
您已掌握asyncio从理论到实战的核心要点,关键在于理解“非阻塞”的本质,并始终使用正确的异步库,建议从一个小型爬虫或Web服务器开始实践,逐步扩展至复杂系统,异步编程的思维方式需要时间适应,但一旦掌握,您将开启Python高效并发的大门。
标签: asyncio