本文目录导读:
我来详细介绍如何使用Python的select模块实现非阻塞网络通信。
基础概念
select模块允许同时监控多个文件描述符(socket、文件等),当其中任何一个变为可读、可写或发生异常时返回。
非阻塞服务器实现
单线程服务器示例
import select
import socket
import sys
def nonblocking_server(host='localhost', port=8888):
# 创建服务器socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.setblocking(False) # 设置为非阻塞
server_socket.bind((host, port))
server_socket.listen(5)
print(f"服务器启动在 {host}:{port}")
# 需要监控的socket列表
inputs = [server_socket] # 可读事件
outputs = [] # 可写事件
# 存储客户端数据
message_queues = {}
while inputs:
# 使用select监控socket
readable, writable, exceptional = select.select(
inputs, outputs, inputs
)
# 处理可读事件
for sock in readable:
if sock is server_socket:
# 新客户端连接
client_socket, client_address = sock.accept()
client_socket.setblocking(False)
inputs.append(client_socket)
message_queues[client_socket] = []
print(f"新客户端连接: {client_address}")
else:
# 接收客户端数据
data = sock.recv(1024)
if data:
print(f"收到数据: {data.decode()}")
message_queues[sock].append(data)
if sock not in outputs:
outputs.append(sock)
else:
# 客户端断开连接
print(f"客户端断开: {sock.getpeername()}")
if sock in outputs:
outputs.remove(sock)
inputs.remove(sock)
sock.close()
del message_queues[sock]
# 处理可写事件
for sock in writable:
if message_queues.get(sock):
data = message_queues[sock].pop(0)
# 回声服务:发送回客户端
sock.send(data)
if not message_queues[sock]:
outputs.remove(sock)
# 处理异常事件
for sock in exceptional:
print(f"异常事件: {sock.getpeername()}")
inputs.remove(sock)
if sock in outputs:
outputs.remove(sock)
sock.close()
del message_queues[sock]
if __name__ == '__main__':
try:
nonblocking_server()
except KeyboardInterrupt:
print("\n服务器关闭")
sys.exit(0)
非阻塞客户端实现
import socket
import select
import sys
import time
def nonblocking_client(host='localhost', port=8888):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.setblocking(False) # 设置为非阻塞
try:
# 尝试连接(非阻塞)
client_socket.connect((host, port))
except BlockingIOError:
# 非阻塞连接会立即返回异常
pass
# 使用select等待连接完成
readable, writable, exceptional = select.select(
[client_socket], [client_socket], [client_socket], 5
)
if writable:
print("连接成功!")
client_socket.setblocking(False) # 保持非阻塞
else:
print("连接失败")
client_socket.close()
return
# 发送数据
send_data = b"Hello, non-blocking server!"
client_socket.send(send_data)
# 等待响应
time.sleep(0.1)
# 接收数据(非阻塞)
try:
data = client_socket.recv(4096)
if data:
print(f"收到响应: {data.decode()}")
except BlockingIOError:
print("暂无数据可读")
client_socket.close()
if __name__ == '__main__':
nonblocking_client()
多客户端聊天服务器
import select
import socket
import queue
class ChatServer:
def __init__(self, host='localhost', port=9999):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.setblocking(False)
self.server.bind((host, port))
self.server.listen(10)
self.inputs = [self.server]
self.outputs = []
self.message_queues = {}
self.clients = {} # socket -> username
def run(self):
print("聊天服务器启动中...")
while self.inputs:
readable, writable, exceptional = select.select(
self.inputs, self.outputs, self.inputs
)
for sock in readable:
if sock is self.server:
self._accept_new_client()
else:
self._handle_client_data(sock)
for sock in writable:
self._send_queued_messages(sock)
for sock in exceptional:
self._handle_exception(sock)
def _accept_new_client(self):
client_socket, address = self.server.accept()
client_socket.setblocking(False)
self.inputs.append(client_socket)
self.message_queues[client_socket] = queue.Queue()
# 发送欢迎消息
welcome = b"Welcome to chat server! Please enter your name: "
client_socket.send(welcome)
print(f"新客户端: {address}")
def _handle_client_data(self, sock):
try:
data = sock.recv(1024)
except:
self._remove_client(sock)
return
if data:
# 处理接收到的数据
message = data.decode().strip()
if sock not in self.clients:
# 首次消息作为用户名
self.clients[sock] = message
self._broadcast(f"{message} joined the chat!")
reply = f"Welcome {message}! You can start chatting.\n"
sock.send(reply.encode())
else:
# 正常聊天消息
username = self.clients[sock]
self._broadcast(f"{username}: {message}")
else:
self._remove_client(sock)
def _broadcast(self, message):
print(f"广播: {message}")
for sock in self.inputs:
if sock != self.server:
self.message_queues[sock].put(message.encode())
if sock not in self.outputs:
self.outputs.append(sock)
def _send_queued_messages(self, sock):
try:
while not self.message_queues[sock].empty():
message = self.message_queues[sock].get_nowait()
sock.send(message + b'\n')
except:
self._remove_client(sock)
else:
if self.message_queues[sock].empty():
self.outputs.remove(sock)
def _remove_client(self, sock):
if sock in self.inputs:
self.inputs.remove(sock)
if sock in self.outputs:
self.outputs.remove(sock)
username = self.clients.get(sock, "Unknown")
print(f"客户端断开: {username}")
if sock in self.clients:
del self.clients[sock]
if sock in self.message_queues:
del self.message_queues[sock]
sock.close()
if username != "Unknown":
self._broadcast(f"{username} left the chat!")
def _handle_exception(self, sock):
self._remove_client(sock)
if __name__ == '__main__':
server = ChatServer()
try:
server.run()
except KeyboardInterrupt:
print("\n聊天服务器关闭")
高级用法:超时和多种IO监控
import select
import socket
import time
def select_with_timeout():
"""演示select的超时功能"""
# 创建测试socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.setblocking(False)
udp_socket.bind(('localhost', 0))
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_socket.setblocking(False)
# 使用select监控多种socket
timeout = 5 # 5秒超时
print("等待数据...(5秒超时)")
readable, writable, exceptional = select.select(
[udp_socket, tcp_socket],
[tcp_socket],
[udp_socket, tcp_socket],
timeout
)
if not (readable or writable or exceptional):
print("超时!没有事件发生")
else:
print(f"可读: {len(readable)}, 可写: {len(writable)}, 异常: {len(exceptional)}")
udp_socket.close()
tcp_socket.close()
def monitor_network_events():
"""监控网络事件示例"""
# 创建一个简单的UDP监听器
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(('localhost', 5005))
print("UDP监听器启动在 localhost:5005")
while True:
# 每秒检查一次
readable, _, _ = select.select([sock], [], [], 1)
if readable:
data, addr = sock.recvfrom(1024)
print(f"收到来自 {addr} 的数据: {data.decode()}")
else:
print("等待数据...")
if __name__ == '__main__':
select_with_timeout()
性能优化建议
import select
import socket
import threading
from collections import defaultdict
class OptimizedSelector:
"""优化后的select实现"""
def __init__(self, max_connections=1000):
self.max_connections = max_connections
self.inputs = []
self.outputs = []
self.message_buffers = defaultdict(list)
self.lock = threading.Lock()
def handle_events(self):
"""批量处理事件,提高性能"""
with self.lock:
if len(self.inputs) > self.max_connections:
print(f"警告: 连接数 {len(self.inputs)} 超过限制 {self.max_connections}")
return
try:
readable, writable, _ = select.select(
self.inputs, self.outputs, [],
timeout=0.1 # 使用short timeout避免阻塞
)
except select.error as e:
print(f"select错误: {e}")
return
# 批量处理可读事件
for sock in readable:
self._handle_read(sock)
# 批量处理可写事件
for sock in writable:
self._handle_write(sock)
def _handle_read(self, sock):
"""处理读事件"""
try:
data = sock.recv(4096)
if not data:
self._remove_socket(sock)
else:
with self.lock:
self.message_buffers[sock].append(data)
except BlockingIOError:
pass # 正常非阻塞行为
except Exception as e:
print(f"读错误: {e}")
self._remove_socket(sock)
def _handle_write(self, sock):
"""处理写事件"""
with self.lock:
buffer = self.message_buffers.get(sock, [])
if buffer:
try:
sock.sendall(buffer.pop(0))
except BlockingIOError:
pass # 正常非阻塞行为
except Exception as e:
print(f"写错误: {e}")
self._remove_socket(sock)
def _remove_socket(self, sock):
"""安全移除socket"""
with self.lock:
if sock in self.inputs:
self.inputs.remove(sock)
if sock in self.outputs:
self.outputs.remove(sock)
if sock in self.message_buffers:
del self.message_buffers[sock]
try:
sock.close()
except:
pass
关键要点
- 设置非阻塞模式:使用
socket.setblocking(False) - 处理BlockingIOError:非阻塞操作可能引发此异常
- 使用select返回:
readable, writable, exceptional = select.select() - 维护socket列表:持续更新需要监控的socket
- 处理连接断开:及时从列表中移除断开连接的socket
select适合连接数较少的场景,对于大量并发连接,建议考虑epoll(Linux)或kqueue(macOS/BSD)。
标签: 非阻塞