本文目录导读:
我来为你展示如何使用 Python 的 asyncore 模块实现异步网络服务。
基础异步TCP服务器
import asyncore
import socket
class EchoHandler(asyncore.dispatcher_with_send):
"""处理客户端连接的处理器"""
def handle_read(self):
"""当有数据可读时调用"""
data = self.recv(1024)
if data:
print(f"收到数据: {data.decode('utf-8')}")
# 将数据回显给客户端
self.send(data)
else:
self.close()
class EchoServer(asyncore.dispatcher):
"""异步TCP服务器"""
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
print(f"服务器启动在 {host}:{port}")
def handle_accept(self):
"""当有新的客户端连接时调用"""
pair = self.accept()
if pair is not None:
sock, addr = pair
print(f"新连接来自: {addr}")
handler = EchoHandler(sock)
# 使用示例
if __name__ == "__main__":
server = EchoServer('localhost', 8888)
asyncore.loop()
更完整的聊天服务器
import asyncore
import socket
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
class ChatClient(asyncore.dispatcher_with_send):
"""聊天客户端处理器"""
def __init__(self, sock, server):
asyncore.dispatcher_with_send.__init__(self, sock)
self.server = server
self.nickname = f"用户{id(self)}"
self.buffer = ""
def handle_read(self):
"""处理接收到的数据"""
data = self.recv(1024)
if data:
message = data.decode('utf-8').strip()
logger.info(f"从 {self.nickname} 收到: {message}")
# 处理特殊命令
if message.startswith('/'):
self.handle_command(message)
else:
# 广播消息给所有客户端
self.server.broadcast(f"{self.nickname}: {message}", self)
else:
self.handle_close()
def handle_command(self, command):
"""处理特殊命令"""
cmd_parts = command.split()
cmd = cmd_parts[0].lower()
if cmd == '/nick' and len(cmd_parts) > 1:
old_nick = self.nickname
self.nickname = cmd_parts[1]
self.server.broadcast(f"{old_nick} 改名为 {self.nickname}", self)
elif cmd == '/list':
users = self.server.get_user_list()
self.send(f"在线用户: {', '.join(users)}".encode())
elif cmd == '/quit':
self.handle_close()
elif cmd == '/help':
help_text = """
可用命令:
/nick <名称> - 修改昵称
/list - 查看在线用户
/quit - 退出聊天室
/help - 显示帮助
"""
self.send(help_text.encode())
def handle_close(self):
"""处理客户端断开连接"""
logger.info(f"{self.nickname} 断开连接")
self.server.remove_client(self)
self.server.broadcast(f"{self.nickname} 离开了聊天室", self)
self.close()
class ChatServer(asyncore.dispatcher):
"""聊天服务器"""
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
self.clients = []
logger.info(f"聊天服务器启动在 {host}:{port}")
def handle_accept(self):
"""处理新的客户端连接"""
pair = self.accept()
if pair is not None:
sock, addr = pair
logger.info(f"新客户端连接: {addr}")
client = ChatClient(sock, self)
self.clients.append(client)
self.broadcast(f"欢迎 {client.nickname} 加入聊天室!", client)
def broadcast(self, message, sender=None):
"""广播消息给所有客户端"""
full_message = message.encode('utf-8')
for client in self.clients:
if client is not sender: # 不发送给发送者自己
try:
client.send(full_message)
except:
self.remove_client(client)
def remove_client(self, client):
"""移除客户端"""
if client in self.clients:
self.clients.remove(client)
def get_user_list(self):
"""获取在线用户列表"""
return [client.nickname for client in self.clients]
if __name__ == "__main__":
server = ChatServer('localhost', 8889)
try:
asyncore.loop()
except KeyboardInterrupt:
logger.info("服务器关闭")
# 简单客户端测试
import socket
import threading
def test_client():
"""测试用的客户端"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 8889))
# 发送消息
sock.send(f"Hello from client {id(sock)}".encode())
# 接收响应
response = sock.recv(4096)
print(f"收到: {response.decode()}")
sock.close()
# 启动测试客户端
if __name__ == "__main__":
import threading
# 在实际测试中,先启动服务器,然后运行这个
print("请先运行服务器,然后取消以下注释来测试")
# threading.Thread(target=test_client).start()
异步HTTP服务器
import asyncore
import socket
import os
from datetime import datetime
class HTTPHandler(asyncore.dispatcher_with_send):
"""HTTP请求处理器"""
def __init__(self, sock, server):
asyncore.dispatcher_with_send.__init__(self, sock)
self.server = server
self.request_data = b""
def handle_read(self):
"""处理HTTP请求"""
chunk = self.recv(8192)
if chunk:
self.request_data += chunk
# 检查是否接收完整请求
if b"\r\n\r\n" in self.request_data:
self.process_request()
else:
self.handle_close()
def process_request(self):
"""处理HTTP请求"""
request_text = self.request_data.decode('utf-8')
lines = request_text.split('\r\n')
if lines:
# 解析请求行
request_line = lines[0]
parts = request_line.split()
if len(parts) >= 2:
method = parts[0]
path = parts[1]
# 处理不同的路由
if path == '/':
self.send_response(200, '<h1>Hello, World!</h1>')
elif path == '/time':
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.send_response(200, f'<h1>当前时间: {current_time}</h1>')
elif path == '/info':
info = f"""
<h2>服务器信息</h2>
<ul>
<li>主机: {socket.gethostname()}</li>
<li>时间: {datetime.now()}</li>
<li>协议: HTTP/1.1</li>
</ul>
"""
self.send_response(200, info)
else:
self.send_response(404, '<h1>404 - 页面未找到</h1>')
else:
self.send_response(400, '<h1>400 - 错误请求</h1>')
def send_response(self, status_code, body):
"""发送HTTP响应"""
status_messages = {
200: "OK",
400: "Bad Request",
404: "Not Found",
500: "Internal Server Error"
}
status_message = status_messages.get(status_code, "Unknown")
response = f"HTTP/1.1 {status_code} {status_message}\r\n"
response += "Content-Type: text/html; charset=utf-8\r\n"
response += f"Content-Length: {len(body.encode())}\r\n"
response += "Connection: close\r\n"
response += "\r\n"
response += body
self.send(response.encode())
self.handle_close()
def handle_close(self):
"""关闭连接"""
self.close()
class HTTPServer(asyncore.dispatcher):
"""异步HTTP服务器"""
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
print(f"HTTP服务器启动在 http://{host}:{port}")
def handle_accept(self):
"""处理新的HTTP连接"""
pair = self.accept()
if pair is not None:
sock, addr = pair
print(f"新HTTP请求来自: {addr}")
handler = HTTPHandler(sock, self)
if __name__ == "__main__":
server = HTTPServer('localhost', 8080)
try:
asyncore.loop()
except KeyboardInterrupt:
print("服务器关闭")
高级功能示例:带超时的客户端管理
import asyncore
import socket
import time
class TimedClientHandler(asyncore.dispatcher_with_send):
"""带超时管理的客户端"""
def __init__(self, sock, server, timeout=30):
asyncore.dispatcher_with_send.__init__(self, sock)
self.server = server
self.timeout = timeout
self.last_activity = time.time()
self.buffer = ""
def handle_read(self):
"""处理读取,同时更新活动时间"""
data = self.recv(1024)
if data:
self.last_activity = time.time()
message = data.decode('utf-8').strip()
print(f"收到消息: {message}")
# 回显消息
self.send(f"服务器回复: {message}".encode())
else:
self.handle_close()
def check_timeout(self):
"""检查是否超时"""
if time.time() - self.last_activity > self.timeout:
print(f"客户端 {id(self)} 超时,关闭连接")
self.send("连接超时,服务器关闭连接".encode())
self.handle_close()
return True
return False
class TimeoutServer(asyncore.dispatcher):
"""带超时检查的服务器"""
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
self.clients = []
self.last_check = time.time()
print(f"超时检查服务器启动在 {host}:{port}")
def handle_accept(self):
"""处理新连接"""
pair = self.accept()
if pair is not None:
sock, addr = pair
print(f"新连接: {addr}")
client = TimedClientHandler(sock, self)
self.clients.append(client)
def writable(self):
"""定期检查超时连接"""
current_time = time.time()
if current_time - self.last_check > 5: # 每5秒检查一次
self.last_check = current_time
# 检查所有客户端的超时
self.clients = [c for c in self.clients if not c.check_timeout()]
return False # 不需要写入
if __name__ == "__main__":
server = TimeoutServer('localhost', 8890)
try:
asyncore.loop()
except KeyboardInterrupt:
print("服务器关闭")
主要特点说明:
-
基础架构: 使用
asyncore.dispatcher作为服务器,asyncore.dispatcher_with_send处理客户端 -
事件驱动: 通过
handle_accept(),handle_read(),handle_close()等方法处理事件 -
非阻塞I/O: 所有操作都是非阻塞的,支持并发处理多个客户端
-
易于扩展: 可以根据需要添加自定义处理方法
运行说明:
-
运行服务器:
python asyncore_server.py
-
测试连接(使用 telnet):
telnet localhost 8889
-
或使用 Python 的 socket 客户端测试
asyncore 模块虽然功能相对简单,但对于学习异步编程概念和理解事件驱动架构很有帮助,在实际生产环境中,建议使用 asyncio 等更现代的异步框架。
标签: 异步网络服务