你能否用Python的asyncore模块实现异步网络服务

访客 网络编程 1

本文目录导读:

  1. 基础异步TCP服务器
  2. 更完整的聊天服务器
  3. 异步HTTP服务器
  4. 高级功能示例:带超时的客户端管理
  5. 主要特点说明:
  6. 运行说明:

我来为你展示如何使用 Python 的 asyncore 模块实现异步网络服务。

基础异步TCP服务器

import asyncore
import socket
class EchoHandler(asyncore.dispatcher_with_send):
    """处理客户端连接的处理器"""
    def handle_read(self):
        """当有数据可读时调用"""
        data = self.recv(1024)
        if data:
            print(f"收到数据: {data.decode('utf-8')}")
            # 将数据回显给客户端
            self.send(data)
        else:
            self.close()
class EchoServer(asyncore.dispatcher):
    """异步TCP服务器"""
    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)
        print(f"服务器启动在 {host}:{port}")
    def handle_accept(self):
        """当有新的客户端连接时调用"""
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            print(f"新连接来自: {addr}")
            handler = EchoHandler(sock)
# 使用示例
if __name__ == "__main__":
    server = EchoServer('localhost', 8888)
    asyncore.loop()

更完整的聊天服务器

import asyncore
import socket
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO, 
                   format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
class ChatClient(asyncore.dispatcher_with_send):
    """聊天客户端处理器"""
    def __init__(self, sock, server):
        asyncore.dispatcher_with_send.__init__(self, sock)
        self.server = server
        self.nickname = f"用户{id(self)}"
        self.buffer = ""
    def handle_read(self):
        """处理接收到的数据"""
        data = self.recv(1024)
        if data:
            message = data.decode('utf-8').strip()
            logger.info(f"从 {self.nickname} 收到: {message}")
            # 处理特殊命令
            if message.startswith('/'):
                self.handle_command(message)
            else:
                # 广播消息给所有客户端
                self.server.broadcast(f"{self.nickname}: {message}", self)
        else:
            self.handle_close()
    def handle_command(self, command):
        """处理特殊命令"""
        cmd_parts = command.split()
        cmd = cmd_parts[0].lower()
        if cmd == '/nick' and len(cmd_parts) > 1:
            old_nick = self.nickname
            self.nickname = cmd_parts[1]
            self.server.broadcast(f"{old_nick} 改名为 {self.nickname}", self)
        elif cmd == '/list':
            users = self.server.get_user_list()
            self.send(f"在线用户: {', '.join(users)}".encode())
        elif cmd == '/quit':
            self.handle_close()
        elif cmd == '/help':
            help_text = """
可用命令:
/nick <名称> - 修改昵称
/list - 查看在线用户
/quit - 退出聊天室
/help - 显示帮助
            """
            self.send(help_text.encode())
    def handle_close(self):
        """处理客户端断开连接"""
        logger.info(f"{self.nickname} 断开连接")
        self.server.remove_client(self)
        self.server.broadcast(f"{self.nickname} 离开了聊天室", self)
        self.close()
class ChatServer(asyncore.dispatcher):
    """聊天服务器"""
    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)
        self.clients = []
        logger.info(f"聊天服务器启动在 {host}:{port}")
    def handle_accept(self):
        """处理新的客户端连接"""
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            logger.info(f"新客户端连接: {addr}")
            client = ChatClient(sock, self)
            self.clients.append(client)
            self.broadcast(f"欢迎 {client.nickname} 加入聊天室!", client)
    def broadcast(self, message, sender=None):
        """广播消息给所有客户端"""
        full_message = message.encode('utf-8')
        for client in self.clients:
            if client is not sender:  # 不发送给发送者自己
                try:
                    client.send(full_message)
                except:
                    self.remove_client(client)
    def remove_client(self, client):
        """移除客户端"""
        if client in self.clients:
            self.clients.remove(client)
    def get_user_list(self):
        """获取在线用户列表"""
        return [client.nickname for client in self.clients]
if __name__ == "__main__":
    server = ChatServer('localhost', 8889)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        logger.info("服务器关闭")
# 简单客户端测试
import socket
import threading
def test_client():
    """测试用的客户端"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', 8889))
    # 发送消息
    sock.send(f"Hello from client {id(sock)}".encode())
    # 接收响应
    response = sock.recv(4096)
    print(f"收到: {response.decode()}")
    sock.close()
# 启动测试客户端
if __name__ == "__main__":
    import threading
    # 在实际测试中,先启动服务器,然后运行这个
    print("请先运行服务器,然后取消以下注释来测试")
    # threading.Thread(target=test_client).start()

异步HTTP服务器

import asyncore
import socket
import os
from datetime import datetime
class HTTPHandler(asyncore.dispatcher_with_send):
    """HTTP请求处理器"""
    def __init__(self, sock, server):
        asyncore.dispatcher_with_send.__init__(self, sock)
        self.server = server
        self.request_data = b""
    def handle_read(self):
        """处理HTTP请求"""
        chunk = self.recv(8192)
        if chunk:
            self.request_data += chunk
            # 检查是否接收完整请求
            if b"\r\n\r\n" in self.request_data:
                self.process_request()
        else:
            self.handle_close()
    def process_request(self):
        """处理HTTP请求"""
        request_text = self.request_data.decode('utf-8')
        lines = request_text.split('\r\n')
        if lines:
            # 解析请求行
            request_line = lines[0]
            parts = request_line.split()
            if len(parts) >= 2:
                method = parts[0]
                path = parts[1]
                # 处理不同的路由
                if path == '/':
                    self.send_response(200, '<h1>Hello, World!</h1>')
                elif path == '/time':
                    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    self.send_response(200, f'<h1>当前时间: {current_time}</h1>')
                elif path == '/info':
                    info = f"""
                    <h2>服务器信息</h2>
                    <ul>
                        <li>主机: {socket.gethostname()}</li>
                        <li>时间: {datetime.now()}</li>
                        <li>协议: HTTP/1.1</li>
                    </ul>
                    """
                    self.send_response(200, info)
                else:
                    self.send_response(404, '<h1>404 - 页面未找到</h1>')
            else:
                self.send_response(400, '<h1>400 - 错误请求</h1>')
    def send_response(self, status_code, body):
        """发送HTTP响应"""
        status_messages = {
            200: "OK",
            400: "Bad Request",
            404: "Not Found",
            500: "Internal Server Error"
        }
        status_message = status_messages.get(status_code, "Unknown")
        response = f"HTTP/1.1 {status_code} {status_message}\r\n"
        response += "Content-Type: text/html; charset=utf-8\r\n"
        response += f"Content-Length: {len(body.encode())}\r\n"
        response += "Connection: close\r\n"
        response += "\r\n"
        response += body
        self.send(response.encode())
        self.handle_close()
    def handle_close(self):
        """关闭连接"""
        self.close()
class HTTPServer(asyncore.dispatcher):
    """异步HTTP服务器"""
    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)
        print(f"HTTP服务器启动在 http://{host}:{port}")
    def handle_accept(self):
        """处理新的HTTP连接"""
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            print(f"新HTTP请求来自: {addr}")
            handler = HTTPHandler(sock, self)
if __name__ == "__main__":
    server = HTTPServer('localhost', 8080)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        print("服务器关闭")

高级功能示例:带超时的客户端管理

import asyncore
import socket
import time
class TimedClientHandler(asyncore.dispatcher_with_send):
    """带超时管理的客户端"""
    def __init__(self, sock, server, timeout=30):
        asyncore.dispatcher_with_send.__init__(self, sock)
        self.server = server
        self.timeout = timeout
        self.last_activity = time.time()
        self.buffer = ""
    def handle_read(self):
        """处理读取,同时更新活动时间"""
        data = self.recv(1024)
        if data:
            self.last_activity = time.time()
            message = data.decode('utf-8').strip()
            print(f"收到消息: {message}")
            # 回显消息
            self.send(f"服务器回复: {message}".encode())
        else:
            self.handle_close()
    def check_timeout(self):
        """检查是否超时"""
        if time.time() - self.last_activity > self.timeout:
            print(f"客户端 {id(self)} 超时,关闭连接")
            self.send("连接超时,服务器关闭连接".encode())
            self.handle_close()
            return True
        return False
class TimeoutServer(asyncore.dispatcher):
    """带超时检查的服务器"""
    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)
        self.clients = []
        self.last_check = time.time()
        print(f"超时检查服务器启动在 {host}:{port}")
    def handle_accept(self):
        """处理新连接"""
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            print(f"新连接: {addr}")
            client = TimedClientHandler(sock, self)
            self.clients.append(client)
    def writable(self):
        """定期检查超时连接"""
        current_time = time.time()
        if current_time - self.last_check > 5:  # 每5秒检查一次
            self.last_check = current_time
            # 检查所有客户端的超时
            self.clients = [c for c in self.clients if not c.check_timeout()]
        return False  # 不需要写入
if __name__ == "__main__":
    server = TimeoutServer('localhost', 8890)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        print("服务器关闭")

主要特点说明:

  1. 基础架构: 使用 asyncore.dispatcher 作为服务器,asyncore.dispatcher_with_send 处理客户端

  2. 事件驱动: 通过 handle_accept(), handle_read(), handle_close() 等方法处理事件

  3. 非阻塞I/O: 所有操作都是非阻塞的,支持并发处理多个客户端

  4. 易于扩展: 可以根据需要添加自定义处理方法

运行说明:

  1. 运行服务器:

    python asyncore_server.py
  2. 测试连接(使用 telnet):

    telnet localhost 8889
  3. 或使用 Python 的 socket 客户端测试

asyncore 模块虽然功能相对简单,但对于学习异步编程概念和理解事件驱动架构很有帮助,在实际生产环境中,建议使用 asyncio 等更现代的异步框架。

标签: 异步网络服务

抱歉,评论功能暂时关闭!