怎样用Python的select模块实现非阻塞网络通信

访客 网络编程 1

本文目录导读:

  1. 基础概念
  2. 非阻塞服务器实现
  3. 非阻塞客户端实现
  4. 多客户端聊天服务器
  5. 高级用法:超时和多种IO监控
  6. 性能优化建议
  7. 关键要点

我来详细介绍如何使用Python的select模块实现非阻塞网络通信。

基础概念

select模块允许同时监控多个文件描述符(socket、文件等),当其中任何一个变为可读、可写或发生异常时返回。

非阻塞服务器实现

单线程服务器示例

import select
import socket
import sys
def nonblocking_server(host='localhost', port=8888):
    # 创建服务器socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.setblocking(False)  # 设置为非阻塞
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"服务器启动在 {host}:{port}")
    # 需要监控的socket列表
    inputs = [server_socket]  # 可读事件
    outputs = []  # 可写事件
    # 存储客户端数据
    message_queues = {}
    while inputs:
        # 使用select监控socket
        readable, writable, exceptional = select.select(
            inputs, outputs, inputs
        )
        # 处理可读事件
        for sock in readable:
            if sock is server_socket:
                # 新客户端连接
                client_socket, client_address = sock.accept()
                client_socket.setblocking(False)
                inputs.append(client_socket)
                message_queues[client_socket] = []
                print(f"新客户端连接: {client_address}")
            else:
                # 接收客户端数据
                data = sock.recv(1024)
                if data:
                    print(f"收到数据: {data.decode()}")
                    message_queues[sock].append(data)
                    if sock not in outputs:
                        outputs.append(sock)
                else:
                    # 客户端断开连接
                    print(f"客户端断开: {sock.getpeername()}")
                    if sock in outputs:
                        outputs.remove(sock)
                    inputs.remove(sock)
                    sock.close()
                    del message_queues[sock]
        # 处理可写事件
        for sock in writable:
            if message_queues.get(sock):
                data = message_queues[sock].pop(0)
                # 回声服务:发送回客户端
                sock.send(data)
                if not message_queues[sock]:
                    outputs.remove(sock)
        # 处理异常事件
        for sock in exceptional:
            print(f"异常事件: {sock.getpeername()}")
            inputs.remove(sock)
            if sock in outputs:
                outputs.remove(sock)
            sock.close()
            del message_queues[sock]
if __name__ == '__main__':
    try:
        nonblocking_server()
    except KeyboardInterrupt:
        print("\n服务器关闭")
        sys.exit(0)

非阻塞客户端实现

import socket
import select
import sys
import time
def nonblocking_client(host='localhost', port=8888):
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.setblocking(False)  # 设置为非阻塞
    try:
        # 尝试连接(非阻塞)
        client_socket.connect((host, port))
    except BlockingIOError:
        # 非阻塞连接会立即返回异常
        pass
    # 使用select等待连接完成
    readable, writable, exceptional = select.select(
        [client_socket], [client_socket], [client_socket], 5
    )
    if writable:
        print("连接成功!")
        client_socket.setblocking(False)  # 保持非阻塞
    else:
        print("连接失败")
        client_socket.close()
        return
    # 发送数据
    send_data = b"Hello, non-blocking server!"
    client_socket.send(send_data)
    # 等待响应
    time.sleep(0.1)
    # 接收数据(非阻塞)
    try:
        data = client_socket.recv(4096)
        if data:
            print(f"收到响应: {data.decode()}")
    except BlockingIOError:
        print("暂无数据可读")
    client_socket.close()
if __name__ == '__main__':
    nonblocking_client()

多客户端聊天服务器

import select
import socket
import queue
class ChatServer:
    def __init__(self, host='localhost', port=9999):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.setblocking(False)
        self.server.bind((host, port))
        self.server.listen(10)
        self.inputs = [self.server]
        self.outputs = []
        self.message_queues = {}
        self.clients = {}  # socket -> username
    def run(self):
        print("聊天服务器启动中...")
        while self.inputs:
            readable, writable, exceptional = select.select(
                self.inputs, self.outputs, self.inputs
            )
            for sock in readable:
                if sock is self.server:
                    self._accept_new_client()
                else:
                    self._handle_client_data(sock)
            for sock in writable:
                self._send_queued_messages(sock)
            for sock in exceptional:
                self._handle_exception(sock)
    def _accept_new_client(self):
        client_socket, address = self.server.accept()
        client_socket.setblocking(False)
        self.inputs.append(client_socket)
        self.message_queues[client_socket] = queue.Queue()
        # 发送欢迎消息
        welcome = b"Welcome to chat server! Please enter your name: "
        client_socket.send(welcome)
        print(f"新客户端: {address}")
    def _handle_client_data(self, sock):
        try:
            data = sock.recv(1024)
        except:
            self._remove_client(sock)
            return
        if data:
            # 处理接收到的数据
            message = data.decode().strip()
            if sock not in self.clients:
                # 首次消息作为用户名
                self.clients[sock] = message
                self._broadcast(f"{message} joined the chat!")
                reply = f"Welcome {message}! You can start chatting.\n"
                sock.send(reply.encode())
            else:
                # 正常聊天消息
                username = self.clients[sock]
                self._broadcast(f"{username}: {message}")
        else:
            self._remove_client(sock)
    def _broadcast(self, message):
        print(f"广播: {message}")
        for sock in self.inputs:
            if sock != self.server:
                self.message_queues[sock].put(message.encode())
                if sock not in self.outputs:
                    self.outputs.append(sock)
    def _send_queued_messages(self, sock):
        try:
            while not self.message_queues[sock].empty():
                message = self.message_queues[sock].get_nowait()
                sock.send(message + b'\n')
        except:
            self._remove_client(sock)
        else:
            if self.message_queues[sock].empty():
                self.outputs.remove(sock)
    def _remove_client(self, sock):
        if sock in self.inputs:
            self.inputs.remove(sock)
        if sock in self.outputs:
            self.outputs.remove(sock)
        username = self.clients.get(sock, "Unknown")
        print(f"客户端断开: {username}")
        if sock in self.clients:
            del self.clients[sock]
        if sock in self.message_queues:
            del self.message_queues[sock]
        sock.close()
        if username != "Unknown":
            self._broadcast(f"{username} left the chat!")
    def _handle_exception(self, sock):
        self._remove_client(sock)
if __name__ == '__main__':
    server = ChatServer()
    try:
        server.run()
    except KeyboardInterrupt:
        print("\n聊天服务器关闭")

高级用法:超时和多种IO监控

import select
import socket
import time
def select_with_timeout():
    """演示select的超时功能"""
    # 创建测试socket
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.setblocking(False)
    udp_socket.bind(('localhost', 0))
    tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp_socket.setblocking(False)
    # 使用select监控多种socket
    timeout = 5  # 5秒超时
    print("等待数据...(5秒超时)")
    readable, writable, exceptional = select.select(
        [udp_socket, tcp_socket],
        [tcp_socket],
        [udp_socket, tcp_socket],
        timeout
    )
    if not (readable or writable or exceptional):
        print("超时!没有事件发生")
    else:
        print(f"可读: {len(readable)}, 可写: {len(writable)}, 异常: {len(exceptional)}")
    udp_socket.close()
    tcp_socket.close()
def monitor_network_events():
    """监控网络事件示例"""
    # 创建一个简单的UDP监听器
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setblocking(False)
    sock.bind(('localhost', 5005))
    print("UDP监听器启动在 localhost:5005")
    while True:
        # 每秒检查一次
        readable, _, _ = select.select([sock], [], [], 1)
        if readable:
            data, addr = sock.recvfrom(1024)
            print(f"收到来自 {addr} 的数据: {data.decode()}")
        else:
            print("等待数据...")
if __name__ == '__main__':
    select_with_timeout()

性能优化建议

import select
import socket
import threading
from collections import defaultdict
class OptimizedSelector:
    """优化后的select实现"""
    def __init__(self, max_connections=1000):
        self.max_connections = max_connections
        self.inputs = []
        self.outputs = []
        self.message_buffers = defaultdict(list)
        self.lock = threading.Lock()
    def handle_events(self):
        """批量处理事件,提高性能"""
        with self.lock:
            if len(self.inputs) > self.max_connections:
                print(f"警告: 连接数 {len(self.inputs)} 超过限制 {self.max_connections}")
                return
        try:
            readable, writable, _ = select.select(
                self.inputs, self.outputs, [],
                timeout=0.1  # 使用short timeout避免阻塞
            )
        except select.error as e:
            print(f"select错误: {e}")
            return
        # 批量处理可读事件
        for sock in readable:
            self._handle_read(sock)
        # 批量处理可写事件
        for sock in writable:
            self._handle_write(sock)
    def _handle_read(self, sock):
        """处理读事件"""
        try:
            data = sock.recv(4096)
            if not data:
                self._remove_socket(sock)
            else:
                with self.lock:
                    self.message_buffers[sock].append(data)
        except BlockingIOError:
            pass  # 正常非阻塞行为
        except Exception as e:
            print(f"读错误: {e}")
            self._remove_socket(sock)
    def _handle_write(self, sock):
        """处理写事件"""
        with self.lock:
            buffer = self.message_buffers.get(sock, [])
            if buffer:
                try:
                    sock.sendall(buffer.pop(0))
                except BlockingIOError:
                    pass  # 正常非阻塞行为
                except Exception as e:
                    print(f"写错误: {e}")
                    self._remove_socket(sock)
    def _remove_socket(self, sock):
        """安全移除socket"""
        with self.lock:
            if sock in self.inputs:
                self.inputs.remove(sock)
            if sock in self.outputs:
                self.outputs.remove(sock)
            if sock in self.message_buffers:
                del self.message_buffers[sock]
        try:
            sock.close()
        except:
            pass

关键要点

  1. 设置非阻塞模式:使用 socket.setblocking(False)
  2. 处理BlockingIOError:非阻塞操作可能引发此异常
  3. 使用select返回readable, writable, exceptional = select.select()
  4. 维护socket列表:持续更新需要监控的socket
  5. 处理连接断开:及时从列表中移除断开连接的socket

select适合连接数较少的场景,对于大量并发连接,建议考虑epoll(Linux)或kqueue(macOS/BSD)。

标签: 非阻塞

抱歉,评论功能暂时关闭!