本文目录导读:
我来介绍一下如何使用Python实现多协议网络服务端,这里有几种常用方案:
使用 selectors 模块实现多协议支持
import selectors
import socket
import json
import threading
class MultiProtocolServer:
    """Serve TCP echo, UDP echo and a minimal HTTP page from a single thread.

    Every socket is non-blocking and registered with one selector.  The
    ``data`` slot of each registration holds a callable taking
    ``(fileobj, mask)``; :meth:`run` calls it whenever the socket becomes
    readable.
    """

    def __init__(self, host='0.0.0.0', port_tcp=8888, port_udp=8889, port_http=8080):
        """Remember the bind address and ports and create the selector.

        Args:
            host: Address all three sockets bind to.
            port_tcp: Port of the TCP echo listener.
            port_udp: Port of the UDP echo socket.
            port_http: Port of the HTTP listener.
        """
        self.host = host
        self.port_tcp = port_tcp
        self.port_udp = port_udp
        self.port_http = port_http
        self.selector = selectors.DefaultSelector()

    def _listen_stream(self, port, on_accept):
        """Open a non-blocking TCP listener on *port* and register *on_accept*."""
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind((self.host, port))
            listener.listen(5)
            listener.setblocking(False)
            self.selector.register(listener, selectors.EVENT_READ, on_accept)
        except Exception:
            # Do not leak the descriptor when bind/listen/register fails.
            listener.close()
            raise

    def _accept(self, listener, handler, label):
        """Accept one connection and register *handler* for its read events."""
        try:
            conn, addr = listener.accept()
        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
            # The pending connection vanished between select() and accept().
            return
        conn.setblocking(False)
        self.selector.register(
            conn,
            selectors.EVENT_READ,
            lambda sock, mask: handler(sock, mask, addr),
        )
        print(f"{label} connection from {addr}")

    def _drop(self, conn):
        """Unregister *conn* from the selector and close it."""
        try:
            self.selector.unregister(conn)
        except (KeyError, ValueError):
            pass  # not registered (any more); closing is all that is left
        conn.close()

    def setup_servers(self):
        """Create, bind and register the TCP, UDP and HTTP sockets.

        Raises:
            OSError: If a socket cannot be created or bound.
        """
        # TCP server
        self._listen_stream(self.port_tcp, self.handle_tcp_accept)
        print(f"TCP Server listening on {self.host}:{self.port_tcp}")

        # UDP server
        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            udp_socket.bind((self.host, self.port_udp))
            udp_socket.setblocking(False)
            self.selector.register(udp_socket, selectors.EVENT_READ, self.handle_udp)
        except Exception:
            udp_socket.close()
            raise
        print(f"UDP Server listening on {self.host}:{self.port_udp}")

        # HTTP server (simplified)
        self._listen_stream(self.port_http, self.handle_http_accept)
        print(f"HTTP Server listening on {self.host}:{self.port_http}")

    def handle_tcp_accept(self, tcp_socket, mask):
        """Selector callback: accept a client on the TCP listener."""
        self._accept(tcp_socket, self.handle_tcp, "TCP")

    def handle_tcp(self, conn, mask, addr):
        """Selector callback: echo one chunk back, or close on EOF/error."""
        try:
            data = conn.recv(1024)
            if data:
                text = data.decode()
                print(f"TCP received from {addr}: {text}")
                conn.sendall(f"TCP Server received: {text}".encode())
            else:
                # An empty read means the peer closed its side.
                self._drop(conn)
                print(f"TCP connection closed from {addr}")
        except Exception as e:
            self._drop(conn)
            print(f"TCP error from {addr}: {e}")

    def handle_udp(self, udp_socket, mask):
        """Selector callback: echo one datagram back to its sender."""
        try:
            data, addr = udp_socket.recvfrom(1024)
            text = data.decode()
            print(f"UDP received from {addr}: {text}")
            udp_socket.sendto(f"UDP Server received: {text}".encode(), addr)
        except Exception as e:
            print(f"UDP error: {e}")

    def handle_http_accept(self, http_socket, mask):
        """Selector callback: accept a client on the HTTP listener."""
        self._accept(http_socket, self.handle_http, "HTTP")

    def handle_http(self, conn, mask, addr):
        """Selector callback: answer one HTTP request read from *conn*.

        Only the request line is inspected.  A request line with fewer than
        two tokens is answered with ``400 Bad Request``.  The connection is
        left open after a response and is closed on EOF or on any error.
        """
        try:
            data = conn.recv(4096)
            if not data:
                self._drop(conn)
                return
            print(f"HTTP request from {addr}")
            # Parse the simple HTTP request: "<method> <path> ..."
            request_line = data.decode().split('\r\n', 1)[0].split()
            if len(request_line) >= 2:
                status = "200 OK"
                body = self.generate_html(request_line[1]).encode()
            else:
                status = "400 Bad Request"
                body = b"<html><body><h1>400 Bad Request</h1></body></html>"
            # Header lines end with CRLF and an empty line separates them
            # from the body; Content-Length counts bytes, not characters.
            head = (
                f"HTTP/1.1 {status}\r\n"
                "Content-Type: text/html\r\n"
                f"Content-Length: {len(body)}\r\n"
                "\r\n"
            )
            conn.sendall(head.encode() + body)
        except Exception as e:
            self._drop(conn)
            print(f"HTTP error from {addr}: {e}")

    def generate_html(self, path):
        """Return the HTML page shown for *path*.

        *path* comes straight from the client's request line, so it is
        HTML-escaped before being embedded in the page.
        """
        from html import escape

        return "\n".join([
            "<html>",
            "<head><title>Multi-Protocol Server</title></head>",
            "<body>",
            "<h1>Multi-Protocol Server</h1>",
            f"<p>Path: {escape(path)}</p>",
            "<p>Available protocols: TCP, UDP, HTTP</p>",
            "</body>",
            "</html>",
        ])

    def run(self):
        """Set up the sockets and dispatch events until Ctrl+C.

        On exit, every socket still registered is closed before the selector
        itself is closed.
        """
        try:
            self.setup_servers()
            print("Server started. Press Ctrl+C to stop.")
            while True:
                for key, mask in self.selector.select():
                    callback = key.data
                    callback(key.fileobj, mask)
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            # Snapshot the registrations: unregistering mutates the map.
            for key in list(self.selector.get_map().values()):
                self.selector.unregister(key.fileobj)
                key.fileobj.close()
            self.selector.close()
# Usage example: start the selectors-based server with its default ports.
if __name__ == "__main__":
    server = MultiProtocolServer()
    server.run()
使用 asyncio 实现异步多协议服务器
import asyncio
import json
class AsyncMultiProtocolServer:
    """Run a TCP echo server, a UDP echo server and a small HTTP server on asyncio.

    The TCP and HTTP servers use ``asyncio.start_server``; the UDP server is
    a datagram endpoint.  :meth:`run` starts all three and blocks until the
    event loop is stopped.
    """

    def __init__(self, host='0.0.0.0', port_tcp=8888, port_udp=8889, port_http=8080):
        """Store the bind address and the per-protocol ports."""
        self.host = host
        self.port_tcp = port_tcp
        self.port_udp = port_udp
        self.port_http = port_http

    async def handle_tcp_client(self, reader, writer):
        """Serve one TCP client until it disconnects.

        Messages starting with ``json:`` are passed to
        :meth:`process_json_message`; everything else is echoed back.
        """
        addr = writer.get_extra_info('peername')
        print(f"TCP connection from {addr}")
        try:
            while True:
                data = await reader.read(1024)
                if not data:
                    break
                message = data.decode()
                print(f"TCP received from {addr}: {message}")
                # Dispatch on the message type
                if message.startswith('json:'):
                    response = self.process_json_message(message[5:])
                else:
                    response = f"TCP Echo: {message}"
                writer.write(response.encode())
                await writer.drain()
        except asyncio.CancelledError:
            pass
        except (ConnectionError, UnicodeDecodeError) as e:
            # A reset peer or undecodable bytes ends this client only.
            print(f"TCP error from {addr}: {e}")
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionError:
                pass  # the peer is already gone; nothing left to flush
            print(f"TCP connection closed from {addr}")

    def process_json_message(self, json_str):
        """Parse *json_str* and return a JSON status document as a string.

        Returns ``{"status": "OK", "received": <parsed>}`` on success and
        ``{"status": "Error", "message": "Invalid JSON"}`` when parsing fails.
        """
        try:
            data = json.loads(json_str)
        except json.JSONDecodeError:
            return json.dumps({"status": "Error", "message": "Invalid JSON"})
        return json.dumps({"status": "OK", "received": data})

    async def handle_udp(self):
        """Open the UDP endpoint and keep it alive until cancelled."""
        transport, _protocol = await self.create_udp_endpoint()
        print(f"UDP Server started on {self.host}:{self.port_udp}")
        try:
            # The endpoint works through callbacks; this task only has to
            # stay alive so it can be cancelled at shutdown.
            while True:
                await asyncio.sleep(3600)
        finally:
            transport.close()

    async def create_udp_endpoint(self):
        """Create the UDP echo endpoint and return ``(transport, protocol)``."""

        # Subclassing DatagramProtocol supplies the default error_received()
        # and connection_lost() callbacks that the transport invokes.
        class UDPProtocol(asyncio.DatagramProtocol):
            def connection_made(self, transport):
                self.transport = transport

            def datagram_received(self, data, addr):
                message = data.decode()
                print(f"UDP received from {addr}: {message}")
                response = f"UDP Echo: {message}"
                self.transport.sendto(response.encode(), addr)

        loop = asyncio.get_running_loop()
        return await loop.create_datagram_endpoint(
            UDPProtocol,
            local_addr=(self.host, self.port_udp),
        )

    async def handle_http_client(self, reader, writer):
        """Answer a single HTTP request and close the connection.

        ``/api/data`` is routed to :meth:`handle_api_request`, any other path
        to :meth:`generate_http_response`.  A request line with fewer than
        two tokens is answered with ``400 Bad Request``.
        """
        addr = writer.get_extra_info('peername')
        print(f"HTTP connection from {addr}")
        try:
            request_data = await reader.read(4096)
            if request_data:
                # Parse the HTTP request: the first line is "<method> <path> ..."
                lines = request_data.decode().split('\r\n')
                request_line = lines[0].split()
                if len(request_line) < 2:
                    response = self._http_response(
                        "400 Bad Request", "text/plain", "Bad request")
                elif request_line[1] == '/api/data':
                    response = self.handle_api_request(request_line[0], lines)
                else:
                    response = self.generate_http_response(request_line[1])
                writer.write(response.encode())
                await writer.drain()
        except Exception as e:
            print(f"HTTP error: {e}")
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionError:
                pass  # the peer is already gone

    @staticmethod
    def _http_response(status, content_type, body, extra_headers=()):
        """Assemble an HTTP/1.1 response string with CRLF-terminated headers."""
        header_lines = [
            f"HTTP/1.1 {status}",
            f"Content-Type: {content_type}",
            *extra_headers,
            # Content-Length counts bytes of the encoded body.
            f"Content-Length: {len(body.encode())}",
            # handle_http_client closes the connection after one response.
            "Connection: close",
        ]
        return "\r\n".join(header_lines) + "\r\n\r\n" + body

    def handle_api_request(self, method, headers):
        """Return the full HTTP response for a request to ``/api/data``.

        Args:
            method: HTTP method taken from the request line.
            headers: The request split into lines (currently unused).

        Returns:
            A 200 JSON response for ``GET``, otherwise a 405 response.
        """
        if method == 'GET':
            return self._http_response(
                "200 OK",
                "application/json",
                '{"status": "success", "data": "API endpoint"}',
                extra_headers=("Access-Control-Allow-Origin: *",),
            )
        return self._http_response(
            "405 Method Not Allowed", "text/plain", "Method not allowed")

    def generate_http_response(self, path):
        """Return the full HTTP response carrying the HTML page for *path*.

        *path* comes from the client's request line, so it is HTML-escaped
        before being embedded in the page.
        """
        from html import escape

        body = "\n".join([
            "<html>",
            "<head><title>Async Multi-Protocol Server</title></head>",
            "<body>",
            "<h1>Async Multi-Protocol Server</h1>",
            f"<p>Path: {escape(path)}</p>",
            "<p>Available protocols: TCP, UDP, HTTP</p>",
            "</body>",
            "</html>",
        ])
        return self._http_response("200 OK", "text/html", body)

    async def start_servers(self):
        """Start the TCP, HTTP and UDP servers and wait on them."""
        # Start the TCP server
        tcp_server = await asyncio.start_server(
            self.handle_tcp_client,
            self.host, self.port_tcp,
        )
        print(f"TCP Server started on {self.host}:{self.port_tcp}")
        # Start the HTTP server
        http_server = await asyncio.start_server(
            self.handle_http_client,
            self.host, self.port_http,
        )
        print(f"HTTP Server started on {self.host}:{self.port_http}")
        # Start the UDP server
        udp_task = asyncio.create_task(self.handle_udp())
        # One task per server
        servers = [
            asyncio.create_task(self.serve_forever(tcp_server)),
            asyncio.create_task(self.serve_forever(http_server)),
            udp_task,
        ]
        print("All servers started. Press Ctrl+C to stop.")
        try:
            await asyncio.gather(*servers)
        except asyncio.CancelledError:
            pass
        finally:
            # Cleanup: cancel the tasks and wait for them, so their own
            # cleanup (closing servers and the UDP transport) has finished.
            for task in servers:
                task.cancel()
            await asyncio.gather(*servers, return_exceptions=True)

    async def serve_forever(self, server):
        """Serve *server* until cancelled, closing it on exit."""
        async with server:
            await server.serve_forever()

    def run(self):
        """Run all servers in a new event loop; blocks until it stops."""
        asyncio.run(self.start_servers())
# Usage example: start the asyncio-based server with its default ports.
if __name__ == "__main__":
    server = AsyncMultiProtocolServer()
    server.run()
使用 socketserver 模块实现多协议服务器
import socketserver
import threading
import json
import http.server
class MultiProtocolServer:
    """Run TCP, UDP and HTTP servers from ``socketserver``, one thread each.

    Each server runs ``serve_forever`` in its own daemon thread; :meth:`run`
    keeps the main thread alive until Ctrl+C and then shuts them all down.
    """

    def __init__(self, host='0.0.0.0', port_tcp=8888, port_udp=8889, port_http=8080):
        """Store the bind address and ports; no socket is opened yet."""
        self.host = host
        self.port_tcp = port_tcp
        self.port_udp = port_udp
        self.port_http = port_http
        self.servers = []

    class TCPHandler(socketserver.StreamRequestHandler):
        """Line-oriented echo handler: one reply line per received line."""

        def handle(self):
            addr = self.client_address
            print(f"TCP connection from {addr}")
            try:
                while True:
                    line = self.rfile.readline()
                    # Test for EOF before stripping: an empty line sent by
                    # the client is a message, not the end of the stream.
                    if not line:
                        break
                    message = line.strip().decode()
                    print(f"TCP received: {message}")
                    response = f"TCP Server received: {message}\n"
                    self.wfile.write(response.encode())
                    self.wfile.flush()
            except Exception as e:
                print(f"TCP error: {e}")
            print(f"TCP connection closed from {addr}")

    class UDPHandler(socketserver.DatagramRequestHandler):
        """Echo handler for single datagrams."""

        def handle(self):
            data = self.rfile.read(1024)
            addr = self.client_address
            if data:
                message = data.decode()
                print(f"UDP received from {addr}: {message}")
                response = f"UDP Server received: {message}"
                self.wfile.write(response.encode())

    class HTTPHandler(http.server.BaseHTTPRequestHandler):
        """Serve a JSON status document on ``/api`` and an HTML page elsewhere."""

        def do_GET(self):
            # The request path is client-controlled; escape it before
            # embedding it in HTML.
            from html import escape

            if self.path == '/api':
                content_type = 'application/json'
                payload = json.dumps({"status": "OK", "protocol": "HTTP"}).encode()
            else:
                content_type = 'text/html'
                page = "\n".join([
                    "<html>",
                    "<body>",
                    "<h1>Multi-Protocol Server</h1>",
                    f"<p>Path: {escape(self.path)}</p>",
                    "</body>",
                    "</html>",
                ])
                payload = page.encode()
            self.send_response(200)
            self.send_header('Content-Type', content_type)
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)

        def log_message(self, format, *args):
            # The base class calls this with a varying number of arguments
            # (three for a request log, fewer for error logs), so join
            # whatever was passed instead of indexing fixed positions.
            print("HTTP: " + " ".join(str(arg) for arg in args))

    def start_server(self, server_type, port, handler_class):
        """Create a server, start it in a daemon thread and return it.

        Args:
            server_type: ``'TCP'``, ``'UDP'`` or ``'HTTP'``.
            port: Port to bind on ``self.host``.
            handler_class: Request handler class for the server.

        Returns:
            The started server object (also appended to ``self.servers``).

        Raises:
            ValueError: If *server_type* is not one of the supported names.
            OSError: If the address cannot be bound.
        """
        server_classes = {
            'TCP': socketserver.ThreadingTCPServer,
            'UDP': socketserver.ThreadingUDPServer,
            # Threaded like its TCP/UDP siblings, so one slow client does
            # not block other HTTP requests.
            'HTTP': http.server.ThreadingHTTPServer,
        }
        try:
            server_cls = server_classes[server_type]
        except KeyError:
            raise ValueError(f"Unsupported server type: {server_type!r}") from None
        server = server_cls((self.host, port), handler_class)
        thread = threading.Thread(target=server.serve_forever, daemon=True)
        thread.start()
        self.servers.append(server)
        print(f"{server_type} Server started on {self.host}:{port}")
        return server

    def run(self):
        """Start all three servers and block until Ctrl+C.

        Every server that was started is shut down and its socket closed on
        the way out, including when a later server fails to start.
        """
        try:
            # Start all servers
            self.start_server('TCP', self.port_tcp, self.TCPHandler)
            self.start_server('UDP', self.port_udp, self.UDPHandler)
            self.start_server('HTTP', self.port_http, self.HTTPHandler)
            print("All servers running. Press Ctrl+C to stop.")
            # Keep the main thread alive; the servers run in daemon threads.
            idle = threading.Event()
            while True:
                idle.wait(1)
        except KeyboardInterrupt:
            print("\nShutting down all servers...")
        finally:
            for server in self.servers:
                server.shutdown()
                server.server_close()
# Usage example: start the socketserver-based servers with default ports.
if __name__ == "__main__":
    server = MultiProtocolServer()
    server.run()
客户端测试代码(运行前需先启动上面任意一种服务端)
import socket
import threading
def test_tcp_client():
    """Send one line to the TCP server on localhost:8888 and print the reply."""
    # The context manager closes the socket even if connect/recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Fail with a timeout instead of blocking forever on a silent server.
        sock.settimeout(5)
        sock.connect(('localhost', 8888))
        # Newline-terminated: the socketserver handler reads with readline()
        # and would otherwise wait for more data while this side waits for
        # the reply.
        sock.sendall(b"Hello TCP Server!\n")
        response = sock.recv(1024)
        print(f"TCP Response: {response.decode()}")
def test_udp_client():
    """Send one datagram to the UDP server on localhost:8889 and print the reply."""
    # The context manager closes the socket even if recvfrom raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # UDP gives no delivery guarantee: without a timeout a lost datagram
        # (or a server that is not running) would block this call forever.
        sock.settimeout(5)
        sock.sendto(b"Hello UDP Server!", ('localhost', 8889))
        data, _addr = sock.recvfrom(1024)
        print(f"UDP Response: {data.decode()}")
def test_http_client():
    """Request ``/test`` from the HTTP server on localhost:8080 and print the result."""
    import http.client

    # The timeout keeps a silent server from blocking the caller forever.
    conn = http.client.HTTPConnection('localhost', 8080, timeout=5)
    try:
        conn.request('GET', '/test')
        response = conn.getresponse()
        print(f"HTTP Response: {response.status}")
        print(response.read().decode())
    finally:
        # Release the connection even when the request or the read fails.
        conn.close()
if __name__ == "__main__":
    # Exercise all three protocols in parallel, one thread per client.
    threads = [
        threading.Thread(target=client)
        for client in (test_tcp_client, test_udp_client, test_http_client)
    ]
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()
选择建议
- selectors 模块:适合需要精确控制 IO 事件的场景,性能较好
- asyncio:现代 Python 异步编程的最佳实践,适合高并发场景
- socketserver:使用简单,适合快速开发原型
选择哪种方案取决于你的具体需求:
- 需要高性能?使用 asyncio
- 需要简单易用?使用 socketserver
- 需要精细控制?使用 selectors