本文目录导读:
我来提供一个完整的Python异步聊天服务器实现,可以同时处理多个客户端:
使用 asyncio 实现(推荐)
import asyncio
import json
from typing import Dict, Set
class ChatServer:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.clients: Dict[str, asyncio.StreamWriter] = {} # 用户名 -> writer
self.nicknames: Dict[str, str] = {} # writer -> 用户名
async def broadcast(self, message: str, exclude=None):
"""向所有客户端广播消息"""
for username, writer in self.clients.items():
if username != exclude:
try:
writer.write(f"{message}\n".encode())
await writer.drain()
except:
# 移除已断开的客户端
await self.remove_client(username)
async def remove_client(self, username: str):
"""移除客户端"""
if username in self.clients:
writer = self.clients[username]
del self.clients[username]
if writer in self.nicknames:
del self.nicknames[writer]
writer.close()
await self.broadcast(f"系统:{username} 离开了聊天室")
async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
"""处理单个客户端连接"""
addr = writer.get_extra_info('peername')
print(f"新客户端连接: {addr}")
# 获取用户名
writer.write("欢迎来到聊天室!请输入你的用户名: ".encode())
await writer.drain()
data = await reader.readline()
username = data.decode().strip()
if not username or username in self.clients:
writer.write("用户名无效或已被使用,连接关闭。\n".encode())
await writer.drain()
writer.close()
return
# 注册用户
self.clients[username] = writer
self.nicknames[writer] = username
writer.write(f"欢迎 {username}!输入 /quit 退出聊天室\n".encode())
await writer.drain()
# 广播用户加入
await self.broadcast(f"系统:{username} 加入了聊天室", exclude=username)
try:
while True:
data = await reader.readline()
if not data:
break
message = data.decode().strip()
if message.lower() == '/quit':
break
elif message.startswith('/private'):
# 私聊功能
parts = message.split(' ', 2)
if len(parts) >= 3:
target = parts[1]
private_msg = parts[2]
if target in self.clients:
target_writer = self.clients[target]
target_writer.write(f"私聊 {username}: {private_msg}\n".encode())
await target_writer.drain()
writer.write(f"私聊 {target}: {private_msg}\n".encode())
await writer.drain()
else:
writer.write(f"用户 {target} 不存在\n".encode())
await writer.drain()
elif message.startswith('/list'):
# 列出在线用户
users = ', '.join(self.clients.keys())
writer.write(f"在线用户: {users}\n".encode())
await writer.drain()
else:
# 广播消息
await self.broadcast(f"{username}: {message}", exclude=username)
except asyncio.CancelledError:
pass
except Exception as e:
print(f"客户端错误: {e}")
finally:
await self.remove_client(username)
print(f"客户端断开: {addr}")
async def start(self):
"""启动服务器"""
server = await asyncio.start_server(
self.handle_client, self.host, self.port
)
addr = server.sockets[0].getsockname()
print(f"服务器启动在 {addr}")
async with server:
await server.serve_forever()
# 运行服务器
if __name__ == "__main__":
server = ChatServer()
try:
asyncio.run(server.start())
except KeyboardInterrupt:
print("服务器已关闭")
客户端实现
import asyncio
import sys
class ChatClient:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.reader = None
self.writer = None
async def receive_messages(self):
"""异步接收服务器消息"""
try:
while True:
data = await self.reader.readline()
if not data:
print("服务器已断开连接")
break
message = data.decode().strip()
sys.stdout.write(f"\r{'':<50}\r{message}\n> ")
sys.stdout.flush()
except asyncio.CancelledError:
pass
except Exception as e:
print(f"接收消息错误: {e}")
async def send_messages(self):
"""异步发送消息"""
try:
while True:
# 使用 asyncio 的非阻塞输入
loop = asyncio.get_event_loop()
message = await loop.run_in_executor(None, sys.stdin.readline)
if not message.strip():
continue
self.writer.write(message.encode())
await self.writer.drain()
if message.strip().lower() == '/quit':
break
except asyncio.CancelledError:
pass
except Exception as e:
print(f"发送消息错误: {e}")
async def start(self):
"""启动客户端"""
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port
)
print(f"连接到 {self.host}:{self.port}")
# 创建接收和发送任务
receive_task = asyncio.create_task(self.receive_messages())
send_task = asyncio.create_task(self.send_messages())
try:
await asyncio.gather(receive_task, send_task)
except:
pass
finally:
receive_task.cancel()
send_task.cancel()
self.writer.close()
# 运行客户端
if __name__ == "__main__":
client = ChatClient()
try:
asyncio.run(client.start())
except KeyboardInterrupt:
print("客户端已断开")
使用 threading 的简化版本
如果不想使用 asyncio,也可以使用 threading:
import socket
import threading
from typing import Dict
class ThreadedChatServer:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.clients: Dict[str, socket.socket] = {}
self.lock = threading.Lock()
def broadcast(self, message: str, exclude=None):
"""广播消息到所有客户端"""
with self.lock:
for username, client_socket in self.clients.items():
if username != exclude:
try:
client_socket.send(f"{message}\n".encode())
except:
self.remove_client(username)
def remove_client(self, username: str):
"""移除客户端"""
with self.lock:
if username in self.clients:
client_socket = self.clients[username]
del self.clients[username]
client_socket.close()
self.broadcast(f"系统:{username} 离开了聊天室")
def handle_client(self, client_socket: socket.socket, addr):
"""处理单个客户端"""
try:
# 获取用户名
client_socket.send("欢迎来到聊天室!请输入你的用户名: ".encode())
username = client_socket.recv(1024).decode().strip()
with self.lock:
if username in self.clients:
client_socket.send("用户名已被使用\n".encode())
client_socket.close()
return
self.clients[username] = client_socket
client_socket.send(f"欢迎 {username}!输入 /quit 退出\n".encode())
self.broadcast(f"系统:{username} 加入了聊天室", exclude=username)
while True:
data = client_socket.recv(1024)
if not data:
break
message = data.decode().strip()
if message.lower() == '/quit':
break
elif message.startswith('/list'):
users = ', '.join(self.clients.keys())
client_socket.send(f"在线用户: {users}\n".encode())
else:
self.broadcast(f"{username}: {message}", exclude=username)
except Exception as e:
print(f"客户端错误: {e}")
finally:
self.remove_client(username)
client_socket.close()
def start(self):
"""启动服务器"""
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"服务器启动在 {self.host}:{self.port}")
try:
while True:
client_socket, addr = self.server_socket.accept()
print(f"新客户端连接: {addr}")
thread = threading.Thread(
target=self.handle_client,
args=(client_socket, addr)
)
thread.daemon = True
thread.start()
except KeyboardInterrupt:
print("服务器已关闭")
finally:
self.server_socket.close()
# 运行
if __name__ == "__main__":
server = ThreadedChatServer()
server.start()
使用说明
-
安装:无需额外安装,使用 Python 标准库
-
运行服务器:
python chat_server.py
-
运行多个客户端(打开多个终端):
python chat_client.py
功能特点
- 多客户端支持:可以同时处理多个客户端连接
- 用户名系统:每个用户需要输入唯一的用户名
- 广播消息:消息会发送给所有在线用户
- 私聊功能:支持
/private username message私聊 - 用户列表:使用
/list查看在线用户 - 退出命令:使用
/quit退出聊天室
第一个 asyncio 版本性能更好,第二个 threading 版本更简单易懂,根据你的需求选择合适的版本。
标签: Socket