如何用Python实现一个能同时处理多个客户端的聊天服务器

访客 网络编程 1

本文目录导读:

  1. 使用 asyncio 实现(推荐)
  2. 客户端实现
  3. 使用 threading 的简化版本
  4. 使用说明
  5. 功能特点

我来提供一个完整的Python异步聊天服务器实现,可以同时处理多个客户端:

使用 asyncio 实现(推荐)

import asyncio
import json
from typing import Dict, Set
class ChatServer:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.clients: Dict[str, asyncio.StreamWriter] = {}  # 用户名 -> writer
        self.nicknames: Dict[str, str] = {}  # writer -> 用户名
    async def broadcast(self, message: str, exclude=None):
        """向所有客户端广播消息"""
        for username, writer in self.clients.items():
            if username != exclude:
                try:
                    writer.write(f"{message}\n".encode())
                    await writer.drain()
                except:
                    # 移除已断开的客户端
                    await self.remove_client(username)
    async def remove_client(self, username: str):
        """移除客户端"""
        if username in self.clients:
            writer = self.clients[username]
            del self.clients[username]
            if writer in self.nicknames:
                del self.nicknames[writer]
            writer.close()
            await self.broadcast(f"系统:{username} 离开了聊天室")
    async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """处理单个客户端连接"""
        addr = writer.get_extra_info('peername')
        print(f"新客户端连接: {addr}")
        # 获取用户名
        writer.write("欢迎来到聊天室!请输入你的用户名: ".encode())
        await writer.drain()
        data = await reader.readline()
        username = data.decode().strip()
        if not username or username in self.clients:
            writer.write("用户名无效或已被使用,连接关闭。\n".encode())
            await writer.drain()
            writer.close()
            return
        # 注册用户
        self.clients[username] = writer
        self.nicknames[writer] = username
        writer.write(f"欢迎 {username}!输入 /quit 退出聊天室\n".encode())
        await writer.drain()
        # 广播用户加入
        await self.broadcast(f"系统:{username} 加入了聊天室", exclude=username)
        try:
            while True:
                data = await reader.readline()
                if not data:
                    break
                message = data.decode().strip()
                if message.lower() == '/quit':
                    break
                elif message.startswith('/private'):
                    # 私聊功能
                    parts = message.split(' ', 2)
                    if len(parts) >= 3:
                        target = parts[1]
                        private_msg = parts[2]
                        if target in self.clients:
                            target_writer = self.clients[target]
                            target_writer.write(f"私聊 {username}: {private_msg}\n".encode())
                            await target_writer.drain()
                            writer.write(f"私聊 {target}: {private_msg}\n".encode())
                            await writer.drain()
                        else:
                            writer.write(f"用户 {target} 不存在\n".encode())
                            await writer.drain()
                elif message.startswith('/list'):
                    # 列出在线用户
                    users = ', '.join(self.clients.keys())
                    writer.write(f"在线用户: {users}\n".encode())
                    await writer.drain()
                else:
                    # 广播消息
                    await self.broadcast(f"{username}: {message}", exclude=username)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"客户端错误: {e}")
        finally:
            await self.remove_client(username)
            print(f"客户端断开: {addr}")
    async def start(self):
        """启动服务器"""
        server = await asyncio.start_server(
            self.handle_client, self.host, self.port
        )
        addr = server.sockets[0].getsockname()
        print(f"服务器启动在 {addr}")
        async with server:
            await server.serve_forever()
# 运行服务器
if __name__ == "__main__":
    server = ChatServer()
    try:
        asyncio.run(server.start())
    except KeyboardInterrupt:
        print("服务器已关闭")

客户端实现

import asyncio
import sys
class ChatClient:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.reader = None
        self.writer = None
    async def receive_messages(self):
        """异步接收服务器消息"""
        try:
            while True:
                data = await self.reader.readline()
                if not data:
                    print("服务器已断开连接")
                    break
                message = data.decode().strip()
                sys.stdout.write(f"\r{'':<50}\r{message}\n> ")
                sys.stdout.flush()
        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"接收消息错误: {e}")
    async def send_messages(self):
        """异步发送消息"""
        try:
            while True:
                # 使用 asyncio 的非阻塞输入
                loop = asyncio.get_event_loop()
                message = await loop.run_in_executor(None, sys.stdin.readline)
                if not message.strip():
                    continue
                self.writer.write(message.encode())
                await self.writer.drain()
                if message.strip().lower() == '/quit':
                    break
        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"发送消息错误: {e}")
    async def start(self):
        """启动客户端"""
        self.reader, self.writer = await asyncio.open_connection(
            self.host, self.port
        )
        print(f"连接到 {self.host}:{self.port}")
        # 创建接收和发送任务
        receive_task = asyncio.create_task(self.receive_messages())
        send_task = asyncio.create_task(self.send_messages())
        try:
            await asyncio.gather(receive_task, send_task)
        except:
            pass
        finally:
            receive_task.cancel()
            send_task.cancel()
            self.writer.close()
# 运行客户端
if __name__ == "__main__":
    client = ChatClient()
    try:
        asyncio.run(client.start())
    except KeyboardInterrupt:
        print("客户端已断开")

使用 threading 的简化版本

如果不想使用 asyncio,也可以使用 threading:

import socket
import threading
from typing import Dict
class ThreadedChatServer:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.clients: Dict[str, socket.socket] = {}
        self.lock = threading.Lock()
    def broadcast(self, message: str, exclude=None):
        """广播消息到所有客户端"""
        with self.lock:
            for username, client_socket in self.clients.items():
                if username != exclude:
                    try:
                        client_socket.send(f"{message}\n".encode())
                    except:
                        self.remove_client(username)
    def remove_client(self, username: str):
        """移除客户端"""
        with self.lock:
            if username in self.clients:
                client_socket = self.clients[username]
                del self.clients[username]
                client_socket.close()
                self.broadcast(f"系统:{username} 离开了聊天室")
    def handle_client(self, client_socket: socket.socket, addr):
        """处理单个客户端"""
        try:
            # 获取用户名
            client_socket.send("欢迎来到聊天室!请输入你的用户名: ".encode())
            username = client_socket.recv(1024).decode().strip()
            with self.lock:
                if username in self.clients:
                    client_socket.send("用户名已被使用\n".encode())
                    client_socket.close()
                    return
                self.clients[username] = client_socket
            client_socket.send(f"欢迎 {username}!输入 /quit 退出\n".encode())
            self.broadcast(f"系统:{username} 加入了聊天室", exclude=username)
            while True:
                data = client_socket.recv(1024)
                if not data:
                    break
                message = data.decode().strip()
                if message.lower() == '/quit':
                    break
                elif message.startswith('/list'):
                    users = ', '.join(self.clients.keys())
                    client_socket.send(f"在线用户: {users}\n".encode())
                else:
                    self.broadcast(f"{username}: {message}", exclude=username)
        except Exception as e:
            print(f"客户端错误: {e}")
        finally:
            self.remove_client(username)
            client_socket.close()
    def start(self):
        """启动服务器"""
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        print(f"服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                client_socket, addr = self.server_socket.accept()
                print(f"新客户端连接: {addr}")
                thread = threading.Thread(
                    target=self.handle_client, 
                    args=(client_socket, addr)
                )
                thread.daemon = True
                thread.start()
        except KeyboardInterrupt:
            print("服务器已关闭")
        finally:
            self.server_socket.close()
# 运行
if __name__ == "__main__":
    server = ThreadedChatServer()
    server.start()

使用说明

  1. 安装:无需额外安装,使用 Python 标准库

  2. 运行服务器

    python chat_server.py
  3. 运行多个客户端(打开多个终端):

    python chat_client.py

功能特点

  • 多客户端支持:可以同时处理多个客户端连接
  • 用户名系统:每个用户需要输入唯一的用户名
  • 广播消息:消息会发送给所有在线用户
  • 私聊功能:支持 /private username message 私聊
  • 用户列表:使用 /list 查看在线用户
  • 退出命令:使用 /quit 退出聊天室

第一个 asyncio 版本性能更好,第二个 threading 版本更简单易懂,根据你的需求选择合适的版本。

标签: Socket

抱歉,评论功能暂时关闭!