Python队列结构案例实操?

wen python案例 1

Python队列结构案例实操:从基础到多线程实战指南

目录导读


队列数据结构核心概念

队列(Queue) 是一种遵循 先进先出(FIFO, First In First Out) 原则的线性数据结构,就像超市排队结账:先排队的人先服务,在编程中,队列常用于任务调度、缓冲数据以及解耦生产者和消费者。

1 Python中三大队列类型

队列类型 类名 特性 适用场景
普通队列 queue.Queue 线程安全,FIFO 多线程任务管理
优先级队列 queue.PriorityQueue 按优先级出队(最小优先) 任务调度、A*算法
后进先出队列 queue.LifoQueue 类似栈,LIFO 深度优先搜索

:Python中queue.Queuecollections.deque有什么区别?
deque是高效的双端队列,但不是线程安全的;queue.Queue内部对锁进行了封装,允许多线程安全访问,单线程场景推荐deque(性能更高),多线程必须用queue.Queue


基础案例:列表与queue.Queue实现

1 列表模拟队列的局限

# 用列表模拟队列(不推荐)
queue_list = []
queue_list.append("任务1")  # 入队
queue_list.append("任务2")
item = queue_list.pop(0)    # 出队(O(n)复杂度)

问题pop(0)会导致所有元素向前移动,时间复杂度为O(n),当队列数据量较大时性能急剧下降。

2 queue.Queue标准用法

from queue import Queue
# 创建队列,最大容量3
q = Queue(maxsize=3)
# 入队(如果满则阻塞)
q.put("任务A")
q.put("任务B")
q.put("任务C")
# 出队(如果空则阻塞)
print(q.get())  # 输出:任务A
print(q.qsize()) # 输出:2(剩余元素数)
print(q.empty()) # 输出:False

关键方法

  • put(item, block=True, timeout=None):插入元素,队列满时默认阻塞等待。
  • get(block=True, timeout=None):移除并返回元素,队列空时阻塞。

put_nowait()put()有什么区别?
put_nowait()不会阻塞,如果队列已满会直接抛出queue.Full异常;put()默认阻塞直到有空位。


多线程队列实战:生产者-消费者模式

1 场景设计

假设一个日志处理系统:生产者不断生成日志记录,消费者从队列中取出日志并写入文件,使用队列可以平衡两者的速度差异。

2 完整代码与运行结果

import threading
import queue
import time
import random
# 创建队列
log_queue = queue.Queue(maxsize=5)
def producer(name):
    """生产者线程:生成日志"""
    for i in range(10):
        log = f"[{name}] 日志#{i} - 时间: {time.time():.2f}"
        log_queue.put(log)
        print(f"生产者 {name} 入队: {log}")
        time.sleep(random.uniform(0.1, 0.5))
    # 发送停止信号(约定:None表示结束)
    log_queue.put(None)
def consumer(name):
    """消费者线程:处理日志"""
    while True:
        log = log_queue.get()
        if log is None:  # 收到停止信号
            log_queue.put(None)  # 通知其他消费者
            break
        print(f"消费者 {name} 出队: {log}")
        # 模拟写入文件耗时
        time.sleep(0.2)
    print(f"消费者 {name} 退出")
# 启动1个生产者,2个消费者
t1 = threading.Thread(target=producer, args=("P1",))
t2 = threading.Thread(target=consumer, args=("C1",))
t3 = threading.Thread(target=consumer, args=("C2",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()
print("所有线程结束")

输出示例(部分)

生产者 P1 入队: [P1] 日志#0 - 时间: 170000.01
消费者 C1 出队: [P1] 日志#0 - 时间: 170000.01
消费者 C2 出队: [P1] 日志#1 - 时间: 170000.35
...

关键要点

  • 使用None作为哨兵值通知消费者结束,如果有多个消费者,必须确保每个消费者都能接收到一个None(通常生产者发出与消费者数量相等的None)。
  • 队列的task_done()join()可用于更精准的同步(本例未涉及)。

:如果消费者处理速度比生产者慢会怎样?
:队列会逐渐积累数据,直到maxsize上限,此时put()会阻塞生产者,形成自然限流(背压),防止内存被撑爆。


进阶案例:优先级队列与延迟队列

1 使用PriorityQueue实现任务排序

from queue import PriorityQueue
pq = PriorityQueue()
pq.put((3, "低优先级任务"))
pq.put((1, "高优先级任务"))
pq.put((2, "中优先级任务"))
while not pq.empty():
    priority, task = pq.get()
    print(f"优先级{priority}: {task}")

输出

优先级1: 高优先级任务
优先级2: 中优先级任务
优先级3: 低优先级任务

注意:元组的第一个元素是优先级,数字越小优先级越高,如果优先级相同,则按元组剩余元素排序(不可比较的类型会报错)。

2 模拟延迟执行队列

import time
from queue import PriorityQueue
delayed_queue = PriorityQueue()
# 添加延迟任务(执行时间戳, 任务内容)
delayed_queue.put((time.time() + 1, "发送邮件"))
delayed_queue.put((time.time() + 3, "刷新缓存"))
delayed_queue.put((time.time() + 0.5, "记录日志"))
while not delayed_queue.empty():
    exec_time, task = delayed_queue.get()
    delay = exec_time - time.time()
    if delay > 0:
        time.sleep(delay)
    print(f"执行任务: {task} 时间: {time.time():.2f}")

输出

执行任务: 记录日志 时间: 170000.50
执行任务: 发送邮件 时间: 170001.00
执行任务: 刷新缓存 时间: 170003.00

实现原理:利用优先级队列按时间戳排序,取出后检查是否需要等待,这种模式常用于轻量级任务调度系统。

:Python中队列是线程安全的,那进程安全吗?
:不同进程需要multiprocessing.Queue,它与queue.Queue接口类似但使用进程间通信(IPC),同一进程内的多线程用queue.Queue,跨进程用multiprocessing.Queue


常见问题解答(FAQ)

Q1:队列和栈的区别是什么?
A:队列是FIFO(先进先出),栈是LIFO(后进先出),队列用put/get,栈用push/pop

Q2:queue.Queuemaxsize=0是什么意思?
A:maxsize=0或负数表示队列大小不限(无界队列),理论可以放入任意多元素,但内存可能耗尽。

Q3:如何清空一个队列?
A:没有直接清空方法,可以循环调用get()直到empty()为True,或者重新实例化一个新队列(旧队列会被GC回收)。

Q4:队列中的join()task_done()如何配合?
A:task_done()表示消费者完成一个任务的处理,join()阻塞直到所有任务都调用了task_done(),典型用法:

# 生产者
for item in items:
    q.put(item)
q.join()  # 等待所有任务完成
# 消费者
while True:
    item = q.get()
    process(item)  # 处理任务
    q.task_done()  # 标记完成

Q5:队列可以用于协程吗?
A:可以,但需要异步队列:asyncio.Queue,它的语法与queue.Queue类似,但使用await put()await get(),专为异步编程设计。


本文总结

要点 核心收获
队列基础 理解FIFO原则与线程安全特性,对比listqueue.Queue性能差异
多线程实战 掌握生产者-消费者模式,学会使用哨兵控制线程生命周期
进阶队列 掌握优先级队列实现任务排序,延迟队列实现定时执行
最佳实践 单线程用deque,多线程用queue.Queue,跨进程用multiprocessing.Queue

队列是Python并发编程的基石,从简单的数据缓冲到复杂的任务调度系统都离不开它,建议读者实际动手运行上述代码,调整maxsize、线程数量等参数,观察队列阻塞与任务调度的行为变化,理解队列的“解耦”思想,能让你的多线程/异步代码更加健壮。


参考资料(综合自Python官方文档、Real Python、GeeksforGeeks等权威来源,经去重与重组):

  • Python 3.12 queue模块官方文档
  • 《Python并行编程实战》
  • 常见博客与Stack Overflow问答精炼

标签: 实操

抱歉,评论功能暂时关闭!