如何用Python实现一个多协议的网络服务端

wen 网络编程 7

本文目录导读:

  1. 使用 selectors 模块实现多协议支持
  2. 使用 asyncio 实现异步多协议服务器
  3. 使用 socketserver 模块实现多协议服务器
  4. 客户端测试代码
  5. 选择建议

我来介绍一下如何使用Python实现多协议网络服务端,这里有几种常用方案:

使用 selectors 模块实现多协议支持

import selectors
import socket
import json
import threading
class MultiProtocolServer:
    def __init__(self, host='0.0.0.0', port_tcp=8888, port_udp=8889, port_http=8080):
        self.host = host
        self.port_tcp = port_tcp
        self.port_udp = port_udp
        self.port_http = port_http
        self.selector = selectors.DefaultSelector()
    def setup_servers(self):
        # TCP 服务器
        tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_socket.bind((self.host, self.port_tcp))
        tcp_socket.listen(5)
        tcp_socket.setblocking(False)
        self.selector.register(tcp_socket, selectors.EVENT_READ, self.handle_tcp_accept)
        print(f"TCP Server listening on {self.host}:{self.port_tcp}")
        # UDP 服务器
        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udp_socket.bind((self.host, self.port_udp))
        udp_socket.setblocking(False)
        self.selector.register(udp_socket, selectors.EVENT_READ, self.handle_udp)
        print(f"UDP Server listening on {self.host}:{self.port_udp}")
        # HTTP 服务器(简化版)
        http_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        http_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        http_socket.bind((self.host, self.port_http))
        http_socket.listen(5)
        http_socket.setblocking(False)
        self.selector.register(http_socket, selectors.EVENT_READ, self.handle_http_accept)
        print(f"HTTP Server listening on {self.host}:{self.port_http}")
    def handle_tcp_accept(self, tcp_socket, mask):
        conn, addr = tcp_socket.accept()
        conn.setblocking(False)
        self.selector.register(conn, selectors.EVENT_READ, 
                              lambda conn, mask: self.handle_tcp(conn, mask, addr))
        print(f"TCP connection from {addr}")
    def handle_tcp(self, conn, mask, addr):
        try:
            data = conn.recv(1024)
            if data:
                print(f"TCP received from {addr}: {data.decode()}")
                response = f"TCP Server received: {data.decode()}"
                conn.sendall(response.encode())
            else:
                self.selector.unregister(conn)
                conn.close()
                print(f"TCP connection closed from {addr}")
        except Exception as e:
            self.selector.unregister(conn)
            conn.close()
            print(f"TCP error from {addr}: {e}")
    def handle_udp(self, udp_socket, mask):
        try:
            data, addr = udp_socket.recvfrom(1024)
            print(f"UDP received from {addr}: {data.decode()}")
            response = f"UDP Server received: {data.decode()}"
            udp_socket.sendto(response.encode(), addr)
        except Exception as e:
            print(f"UDP error: {e}")
    def handle_http_accept(self, http_socket, mask):
        conn, addr = http_socket.accept()
        conn.setblocking(False)
        self.selector.register(conn, selectors.EVENT_READ,
                              lambda conn, mask: self.handle_http(conn, mask, addr))
        print(f"HTTP connection from {addr}")
    def handle_http(self, conn, mask, addr):
        try:
            data = conn.recv(4096)
            if data:
                request = data.decode()
                print(f"HTTP request from {addr}")
                # 解析简单的HTTP请求
                lines = request.split('\r\n')
                if lines:
                    first_line = lines[0].split()
                    if len(first_line) >= 2:
                        path = first_line[1]
                        # 构建HTTP响应
                        response = f"""HTTP/1.1 200 OK
Content-Type: text/html
Content-Length: {len(self.generate_html(path))}
{self.generate_html(path)}"""
                        conn.sendall(response.encode())
            else:
                self.selector.unregister(conn)
                conn.close()
        except Exception as e:
            self.selector.unregister(conn)
            conn.close()
            print(f"HTTP error from {addr}: {e}")
    def generate_html(self, path):
        return f"""<html>
<head><title>Multi-Protocol Server</title></head>
<body>
<h1>Multi-Protocol Server</h1>
<p>Path: {path}</p>
<p>Available protocols: TCP, UDP, HTTP</p>
</body>
</html>"""
    def run(self):
        self.setup_servers()
        print("Server started. Press Ctrl+C to stop.")
        try:
            while True:
                events = self.selector.select()
                for key, mask in events:
                    callback = key.data
                    callback(key.fileobj, mask)
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.selector.close()
# 使用示例
if __name__ == "__main__":
    server = MultiProtocolServer()
    server.run()

使用 asyncio 实现异步多协议服务器

import asyncio
import json
class AsyncMultiProtocolServer:
    def __init__(self, host='0.0.0.0', port_tcp=8888, port_udp=8889, port_http=8080):
        self.host = host
        self.port_tcp = port_tcp
        self.port_udp = port_udp
        self.port_http = port_http
    async def handle_tcp_client(self, reader, writer):
        addr = writer.get_extra_info('peername')
        print(f"TCP connection from {addr}")
        try:
            while True:
                data = await reader.read(1024)
                if not data:
                    break
                message = data.decode()
                print(f"TCP received from {addr}: {message}")
                # 处理不同的消息类型
                if message.startswith('json:'):
                    response = self.process_json_message(message[5:])
                else:
                    response = f"TCP Echo: {message}"
                writer.write(response.encode())
                await writer.drain()
        except asyncio.CancelledError:
            pass
        finally:
            writer.close()
            await writer.wait_closed()
            print(f"TCP connection closed from {addr}")
    def process_json_message(self, json_str):
        try:
            data = json.loads(json_str)
            # 处理JSON消息
            return json.dumps({"status": "OK", "received": data})
        except json.JSONDecodeError:
            return json.dumps({"status": "Error", "message": "Invalid JSON"})
    async def handle_udp(self):
        transport, protocol = await self.create_udp_endpoint()
        print(f"UDP Server started on {self.host}:{self.port_udp}")
        # UDP服务将持续运行
        while True:
            await asyncio.sleep(3600)  # 保持运行
    async def create_udp_endpoint(self):
        loop = asyncio.get_running_loop()
        class UDPProtocol:
            def connection_made(self, transport):
                self.transport = transport
            def datagram_received(self, data, addr):
                message = data.decode()
                print(f"UDP received from {addr}: {message}")
                response = f"UDP Echo: {message}"
                self.transport.sendto(response.encode(), addr)
        return await loop.create_datagram_endpoint(
            UDPProtocol,
            local_addr=(self.host, self.port_udp)
        )
    async def handle_http_client(self, reader, writer):
        addr = writer.get_extra_info('peername')
        print(f"HTTP connection from {addr}")
        try:
            request_data = await reader.read(4096)
            if request_data:
                request = request_data.decode()
                # 解析HTTP请求
                lines = request.split('\r\n')
                if lines:
                    first_line = lines[0].split()
                    if len(first_line) >= 2:
                        method = first_line[0]
                        path = first_line[1]
                        # 处理HTTP请求
                        if path == '/api/data':
                            response = self.handle_api_request(method, lines)
                        else:
                            response = self.generate_http_response(path)
                        writer.write(response.encode())
                        await writer.drain()
        except Exception as e:
            print(f"HTTP error: {e}")
        finally:
            writer.close()
            await writer.wait_closed()
    def handle_api_request(self, method, headers):
        if method == 'GET':
            return f"""HTTP/1.1 200 OK
Content-Type: application/json
Access-Control-Allow-Origin: *
{{"status": "success", "data": "API endpoint"}}"""
        else:
            return """HTTP/1.1 405 Method Not Allowed
Content-Type: text/plain
Method not allowed"""
    def generate_http_response(self, path):
        body = f"""<html>
<head><title>Async Multi-Protocol Server</title></head>
<body>
<h1>Async Multi-Protocol Server</h1>
<p>Path: {path}</p>
<p>Available protocols: TCP, UDP, HTTP</p>
</body>
</html>"""
        return f"""HTTP/1.1 200 OK
Content-Type: text/html
Content-Length: {len(body)}
{body}"""
    async def start_servers(self):
        # 启动TCP服务器
        tcp_server = await asyncio.start_server(
            self.handle_tcp_client,
            self.host, self.port_tcp
        )
        print(f"TCP Server started on {self.host}:{self.port_tcp}")
        # 启动HTTP服务器
        http_server = await asyncio.start_server(
            self.handle_http_client,
            self.host, self.port_http
        )
        print(f"HTTP Server started on {self.host}:{self.port_http}")
        # 启动UDP服务器
        udp_task = asyncio.create_task(self.handle_udp())
        # 创建服务器任务列表
        servers = [
            asyncio.create_task(self.serve_forever(tcp_server)),
            asyncio.create_task(self.serve_forever(http_server)),
            udp_task
        ]
        print("All servers started. Press Ctrl+C to stop.")
        try:
            await asyncio.gather(*servers)
        except asyncio.CancelledError:
            pass
        finally:
            # 清理
            for task in servers:
                task.cancel()
    async def serve_forever(self, server):
        async with server:
            await server.serve_forever()
    def run(self):
        asyncio.run(self.start_servers())
# 使用示例
if __name__ == "__main__":
    server = AsyncMultiProtocolServer()
    server.run()

使用 socketserver 模块实现多协议服务器

import socketserver
import threading
import json
import http.server
class MultiProtocolServer:
    def __init__(self, host='0.0.0.0', port_tcp=8888, port_udp=8889, port_http=8080):
        self.host = host
        self.port_tcp = port_tcp
        self.port_udp = port_udp
        self.port_http = port_http
        self.servers = []
    class TCPHandler(socketserver.StreamRequestHandler):
        def handle(self):
            addr = self.client_address
            print(f"TCP connection from {addr}")
            try:
                while True:
                    data = self.rfile.readline().strip()
                    if not data:
                        break
                    message = data.decode()
                    print(f"TCP received: {message}")
                    response = f"TCP Server received: {message}\n"
                    self.wfile.write(response.encode())
                    self.wfile.flush()
            except Exception as e:
                print(f"TCP error: {e}")
            print(f"TCP connection closed from {addr}")
    class UDPHandler(socketserver.DatagramRequestHandler):
        def handle(self):
            data = self.rfile.read(1024)
            addr = self.client_address
            if data:
                message = data.decode()
                print(f"UDP received from {addr}: {message}")
                response = f"UDP Server received: {message}"
                self.wfile.write(response.encode())
    class HTTPHandler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path == '/api':
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                response = json.dumps({"status": "OK", "protocol": "HTTP"})
                self.wfile.write(response.encode())
            else:
                self.send_response(200)
                self.send_header('Content-Type', 'text/html')
                self.end_headers()
                html = f"""<html>
<body>
<h1>Multi-Protocol Server</h1>
<p>Path: {self.path}</p>
</body>
</html>"""
                self.wfile.write(html.encode())
        def log_message(self, format, *args):
            print(f"HTTP: {args[0]} {args[1]} {args[2]}")
    def start_server(self, server_type, port, handler_class):
        if server_type == 'TCP':
            server = socketserver.ThreadingTCPServer((self.host, port), handler_class)
        elif server_type == 'UDP':
            server = socketserver.ThreadingUDPServer((self.host, port), handler_class)
        elif server_type == 'HTTP':
            server = http.server.HTTPServer((self.host, port), handler_class)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()
        self.servers.append(server)
        print(f"{server_type} Server started on {self.host}:{port}")
        return server
    def run(self):
        # 启动所有服务器
        self.start_server('TCP', self.port_tcp, self.TCPHandler)
        self.start_server('UDP', self.port_udp, self.UDPHandler)
        self.start_server('HTTP', self.port_http, self.HTTPHandler)
        print("All servers running. Press Ctrl+C to stop.")
        try:
            # 保持主线程运行
            while True:
                threading.Event().wait(1)
        except KeyboardInterrupt:
            print("\nShutting down all servers...")
            for server in self.servers:
                server.shutdown()
# 使用示例
if __name__ == "__main__":
    server = MultiProtocolServer()
    server.run()

客户端测试代码

import socket
import threading
def test_tcp_client():
    """测试TCP客户端"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', 8888))
    sock.sendall(b"Hello TCP Server!")
    response = sock.recv(1024)
    print(f"TCP Response: {response.decode()}")
    sock.close()
def test_udp_client():
    """测试UDP客户端"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(b"Hello UDP Server!", ('localhost', 8889))
    data, addr = sock.recvfrom(1024)
    print(f"UDP Response: {data.decode()}")
    sock.close()
def test_http_client():
    """测试HTTP客户端"""
    import http.client
    conn = http.client.HTTPConnection('localhost', 8080)
    conn.request('GET', '/test')
    response = conn.getresponse()
    print(f"HTTP Response: {response.status}")
    print(response.read().decode())
    conn.close()
if __name__ == "__main__":
    # 并行测试所有协议
    threads = [
        threading.Thread(target=test_tcp_client),
        threading.Thread(target=test_udp_client),
        threading.Thread(target=test_http_client)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

选择建议

  1. selectors 模块: 适合需要精确控制IO事件的场景,性能较好
  2. asyncio: 现代Python异步编程的最佳实践,适合高并发场景
  3. socketserver: 使用简单,适合快速开发原型

选择哪种方案取决于你的具体需求:

  • 需要高性能?使用 asyncio
  • 需要简单易用?使用 socketserver
  • 需要精细控制?使用 selectors

上一篇当前分类已是最后一篇

下一篇你清楚如何用Python编写一个能解析网络包头的程序吗

抱歉,评论功能暂时关闭!