Python队列结构案例实操:从基础到多线程实战指南
目录导读
- 队列数据结构核心概念
- 1 什么是队列(FIFO原则)
- 2 Python内置队列类型对比
- 基础案例:列表与
queue.Queue实现- 1 列表模拟队列的局限
- 2
queue.Queue标准用法
- 多线程队列实战:生产者-消费者模式
- 1 场景设计
- 2 完整代码与运行结果
- 进阶案例:优先级队列与延迟队列
- 1 使用
PriorityQueue实现任务排序 - 2 模拟延迟执行队列
- 1 使用
- 常见问题解答(FAQ)
- 本文总结
队列数据结构核心概念
队列(Queue) 是一种遵循 先进先出(FIFO, First In First Out) 原则的线性数据结构,就像超市排队结账:先排队的人先服务,在编程中,队列常用于任务调度、缓冲数据以及解耦生产者和消费者。
1 Python中三大队列类型
| 队列类型 | 类名 | 特性 | 适用场景 |
|---|---|---|---|
| 普通队列 | queue.Queue |
线程安全,FIFO | 多线程任务管理 |
| 优先级队列 | queue.PriorityQueue |
按优先级出队(最小优先) | 任务调度、A*算法 |
| 后进先出队列 | queue.LifoQueue |
类似栈,LIFO | 深度优先搜索 |
问:Python中
queue.Queue和collections.deque有什么区别?
答:deque是高效的双端队列,但不是线程安全的;queue.Queue内部对锁进行了封装,允许多线程安全访问,单线程场景推荐deque(性能更高),多线程必须用queue.Queue。
基础案例:列表与queue.Queue实现
1 列表模拟队列的局限
# 用列表模拟队列(不推荐)
queue_list = []
queue_list.append("任务1") # 入队
queue_list.append("任务2")
item = queue_list.pop(0) # 出队(O(n)复杂度)
问题:pop(0)会导致所有元素向前移动,时间复杂度为O(n),当队列数据量较大时性能急剧下降。
2 queue.Queue标准用法
from queue import Queue
# 创建队列,最大容量3
q = Queue(maxsize=3)
# 入队(如果满则阻塞)
q.put("任务A")
q.put("任务B")
q.put("任务C")
# 出队(如果空则阻塞)
print(q.get()) # 输出:任务A
print(q.qsize()) # 输出:2(剩余元素数)
print(q.empty()) # 输出:False
关键方法:
put(item, block=True, timeout=None):插入元素,队列满时默认阻塞等待。get(block=True, timeout=None):移除并返回元素,队列空时阻塞。
问:
put_nowait()与put()有什么区别?
答:put_nowait()不会阻塞,如果队列已满会直接抛出queue.Full异常;put()默认阻塞直到有空位。
多线程队列实战:生产者-消费者模式
1 场景设计
假设一个日志处理系统:生产者不断生成日志记录,消费者从队列中取出日志并写入文件,使用队列可以平衡两者的速度差异。
2 完整代码与运行结果
import threading
import queue
import time
import random
# 创建队列
log_queue = queue.Queue(maxsize=5)
def producer(name):
"""生产者线程:生成日志"""
for i in range(10):
log = f"[{name}] 日志#{i} - 时间: {time.time():.2f}"
log_queue.put(log)
print(f"生产者 {name} 入队: {log}")
time.sleep(random.uniform(0.1, 0.5))
# 发送停止信号(约定:None表示结束)
log_queue.put(None)
def consumer(name):
"""消费者线程:处理日志"""
while True:
log = log_queue.get()
if log is None: # 收到停止信号
log_queue.put(None) # 通知其他消费者
break
print(f"消费者 {name} 出队: {log}")
# 模拟写入文件耗时
time.sleep(0.2)
print(f"消费者 {name} 退出")
# 启动1个生产者,2个消费者
t1 = threading.Thread(target=producer, args=("P1",))
t2 = threading.Thread(target=consumer, args=("C1",))
t3 = threading.Thread(target=consumer, args=("C2",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()
print("所有线程结束")
输出示例(部分):
生产者 P1 入队: [P1] 日志#0 - 时间: 170000.01
消费者 C1 出队: [P1] 日志#0 - 时间: 170000.01
消费者 C2 出队: [P1] 日志#1 - 时间: 170000.35
...
关键要点:
- 使用
None作为哨兵值通知消费者结束,如果有多个消费者,必须确保每个消费者都能接收到一个None(通常生产者发出与消费者数量相等的None)。 - 队列的
task_done()和join()可用于更精准的同步(本例未涉及)。
问:如果消费者处理速度比生产者慢会怎样?
答:队列会逐渐积累数据,直到maxsize上限,此时put()会阻塞生产者,形成自然限流(背压),防止内存被撑爆。
进阶案例:优先级队列与延迟队列
1 使用PriorityQueue实现任务排序
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((3, "低优先级任务"))
pq.put((1, "高优先级任务"))
pq.put((2, "中优先级任务"))
while not pq.empty():
priority, task = pq.get()
print(f"优先级{priority}: {task}")
输出:
优先级1: 高优先级任务
优先级2: 中优先级任务
优先级3: 低优先级任务
注意:元组的第一个元素是优先级,数字越小优先级越高,如果优先级相同,则按元组剩余元素排序(不可比较的类型会报错)。
2 模拟延迟执行队列
import time
from queue import PriorityQueue
delayed_queue = PriorityQueue()
# 添加延迟任务(执行时间戳, 任务内容)
delayed_queue.put((time.time() + 1, "发送邮件"))
delayed_queue.put((time.time() + 3, "刷新缓存"))
delayed_queue.put((time.time() + 0.5, "记录日志"))
while not delayed_queue.empty():
exec_time, task = delayed_queue.get()
delay = exec_time - time.time()
if delay > 0:
time.sleep(delay)
print(f"执行任务: {task} 时间: {time.time():.2f}")
输出:
执行任务: 记录日志 时间: 170000.50
执行任务: 发送邮件 时间: 170001.00
执行任务: 刷新缓存 时间: 170003.00
实现原理:利用优先级队列按时间戳排序,取出后检查是否需要等待,这种模式常用于轻量级任务调度系统。
问:Python中队列是线程安全的,那进程安全吗?
答:不同进程需要multiprocessing.Queue,它与queue.Queue接口类似但使用进程间通信(IPC),同一进程内的多线程用queue.Queue,跨进程用multiprocessing.Queue。
常见问题解答(FAQ)
Q1:队列和栈的区别是什么?
A:队列是FIFO(先进先出),栈是LIFO(后进先出),队列用put/get,栈用push/pop。
Q2:queue.Queue的maxsize=0是什么意思?
A:maxsize=0或负数表示队列大小不限(无界队列),理论可以放入任意多元素,但内存可能耗尽。
Q3:如何清空一个队列?
A:没有直接清空方法,可以循环调用get()直到empty()为True,或者重新实例化一个新队列(旧队列会被GC回收)。
Q4:队列中的join()和task_done()如何配合?
A:task_done()表示消费者完成一个任务的处理,join()阻塞直到所有任务都调用了task_done(),典型用法:
# 生产者
for item in items:
q.put(item)
q.join() # 等待所有任务完成
# 消费者
while True:
item = q.get()
process(item) # 处理任务
q.task_done() # 标记完成
Q5:队列可以用于协程吗?
A:可以,但需要异步队列:asyncio.Queue,它的语法与queue.Queue类似,但使用await put()和await get(),专为异步编程设计。
本文总结
| 要点 | 核心收获 |
|---|---|
| 队列基础 | 理解FIFO原则与线程安全特性,对比list与queue.Queue性能差异 |
| 多线程实战 | 掌握生产者-消费者模式,学会使用哨兵控制线程生命周期 |
| 进阶队列 | 掌握优先级队列实现任务排序,延迟队列实现定时执行 |
| 最佳实践 | 单线程用deque,多线程用queue.Queue,跨进程用multiprocessing.Queue |
队列是Python并发编程的基石,从简单的数据缓冲到复杂的任务调度系统都离不开它,建议读者实际动手运行上述代码,调整maxsize、线程数量等参数,观察队列阻塞与任务调度的行为变化,理解队列的“解耦”思想,能让你的多线程/异步代码更加健壮。
参考资料(综合自Python官方文档、Real Python、GeeksforGeeks等权威来源,经去重与重组):
- Python 3.12
queue模块官方文档 - 《Python并行编程实战》
- 常见博客与Stack Overflow问答精炼
标签: 实操