本文目录导读:
这个Python全栈案例确实能帮助你理解异步非阻塞IO的核心原理,但需要设计得足够精巧,我会为你构建一个从“阻塞”到“非阻塞”再到“异步”的渐进式案例,并解释每个阶段的原理。
案例设计:模拟一个简单的Web服务器
我们将用Python实现一个迷你Web服务器,逐步从阻塞模型演进到异步非阻塞模型。
阶段1:阻塞IO模型(BIO)
import socket
import time
def handle_request(client_socket):
"""处理请求,模拟IO阻塞操作"""
# 模拟读取请求数据(阻塞IO操作)
request = client_socket.recv(1024) # 阻塞在这里等待数据
print(f"收到请求: {request}")
# 模拟数据库查询(阻塞操作)
time.sleep(2) # 模拟IO等待
# 返回响应
response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World"
client_socket.send(response) # 发送也可能阻塞
client_socket.close()
def blocking_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 8888))
server.listen(5)
print("阻塞服务器启动,监听端口8888...")
while True:
client_socket, addr = server.accept() # 阻塞等待连接
print(f"新连接来自: {addr}")
handle_request(client_socket) # 同步处理,阻塞直到完成
原理说明:每个请求必须等待前一个请求完全处理完毕,包括IO等待,当time.sleep(2)时,整个服务器被阻塞,无法处理其他请求,这就是典型的阻塞IO模型。
阶段2:多线程非阻塞模型
import threading
import socket
import time
def threaded_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 8889))
server.listen(5)
print("多线程服务器启动,监听端口8889...")
while True:
client_socket, addr = server.accept()
# 为每个连接创建新线程,主线程不阻塞
t = threading.Thread(target=handle_request, args=(client_socket,))
t.start()
原理说明:虽然每个线程内部仍然是阻塞的,但主线程可以立即接受新连接,这解决了并发问题,但线程开销大,CPU上下文切换频繁。
阶段3:异步非阻塞IO模型(使用select)
import socket
import select
import time
def async_nonblocking_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False) # 设置为非阻塞模式
server.bind(('localhost', 8890))
server.listen(5)
print("异步非阻塞服务器启动,监听端口8890...")
inputs = [server] # 需要监听的可读socket列表
outputs = [] # 需要监听的可写socket列表
message_queues = {} # 存储每个连接的数据
while inputs:
# select会阻塞,但可以同时监控多个socket
readable, writable, exceptional = select.select(inputs, outputs, inputs)
for s in readable:
if s is server: # 新连接
client_socket, addr = server.accept()
print(f"新连接来自: {addr}")
client_socket.setblocking(False)
inputs.append(client_socket)
message_queues[client_socket] = []
else: # 已有连接的数据
try:
data = s.recv(1024)
if data:
print(f"收到数据: {data}")
message_queues[s].append(data)
if s not in outputs:
outputs.append(s) # 标记为可写
else:
# 连接关闭
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
del message_queues[s]
except BlockingIOError:
continue # 非阻塞模式下的正常行为
for s in writable:
if s in message_queues and message_queues[s]:
# 模拟处理延迟
time.sleep(2) # 注意:这里仍然会阻塞事件循环
response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World"
try:
s.send(response)
except BlockingIOError:
continue
outputs.remove(s)
原理说明:
setblocking(False)使socket变为非阻塞select允许同时监控多个socket,当某个socket可读或可写时才处理- 单线程处理多个连接,避免了线程切换开销
- 缺陷:
time.sleep(2)仍然会阻塞整个事件循环
阶段4:真正的异步非阻塞(使用asyncio)
import asyncio
async def handle_client(reader, writer):
"""异步处理客户端请求"""
# 读取请求(非阻塞,让出控制权)
request = await reader.read(1024)
print(f"收到请求: {request}")
# 模拟异步IO操作(不阻塞事件循环)
await asyncio.sleep(2) # 模拟IO等待,但事件循环继续运行
# 发送响应
response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World"
writer.write(response)
await writer.drain() # 确保数据发送完毕
writer.close()
await writer.wait_closed()
async def async_server():
server = await asyncio.start_server(handle_client, 'localhost', 8891)
print("asyncio服务器启动,监听端口8891...")
async with server:
await server.serve_forever()
# 运行
asyncio.run(async_server())
核心原理:
- 事件循环(Event Loop):
asyncio.run()创建事件循环,不断检查就绪的协程 - 协程(Coroutine):
async def定义的函数,可以暂停和恢复 - await:遇到
await时,协程暂停执行,将控制权交还给事件循环 - 非阻塞:
await asyncio.sleep(2)不会阻塞事件循环,事件循环可以去处理其他连接
关键原理图示
传统阻塞模型:
请求1 → |等待IO| → 处理 → 请求2 → |等待IO| → 处理
↑ 阻塞 ↑ 阻塞
多线程模型:
请求1 → |等待IO| → 处理
请求2 → |等待IO| → 处理 (并行但线程开销大)
异步非阻塞(asyncio):
请求1 → |等待IO| → 处理
请求2 → |等待IO| ✓ 请求3 → |等待IO| (事件循环不停切换)
↑ 暂停协程 ↑ 恢复其他协程
你能从中学到什么?
| 模型 | 并发处理 | 资源消耗 | 适用场景 | 原理点 |
|---|---|---|---|---|
| 阻塞 | 串行 | 低 | 开发调试 | 理解IO阻塞的本质 |
| 多线程 | 并发 | 高 | CPU密集型 | 知道线程切换代价 |
| select | 并发 | 中 | 少量连接 | 理解非阻塞+事件通知 |
| asyncio | 高并发 | 极低 | IO密集型 | 掌握真正的异步非阻塞 |
实战建议
- 先运行每个阶段的代码,观察其行为差异(用多个浏览器标签页同时访问)
- 使用
ab(Apache Bench) 或wrk进行压力测试,对比性能 - 用
strace或dtrace跟踪系统调用,观察阻塞与非阻塞的区别 - 深入研究
asyncio的源码,理解Future、Task、Event Loop的实现
这个案例之所以有效,是因为它从最基础的socket编程开始,逐步引入抽象层次,让你看到异步非阻塞IO的“肌肉”是如何生长的,当你运行阶段4的asyncio版本,并发处理数千连接而CPU占用率极低时,你才能真正理解异步非阻塞的威力。
标签: 异步非阻塞IO