本文目录导读:
- 多线程:I/O密集型任务(网络请求/文件读写)
- 多进程:CPU密集型任务(数值计算/图像处理)
- 异步编程:高并发I/O(Web服务/实时数据流)
- 分布式任务队列(Celery + Redis/RabbitMQ)
- MapReduce模式:大数据量处理(数据清洗/统计)
- 流水线(Pipeline)模式:流式数据处理
- 总结:如何选择拆分方式?
Python任务拆分的核心思想是将一个大型任务分解为多个可独立执行的小任务,从而利用多线程、多进程或异步编程来提升效率,以下是几个典型的案例,从简单到复杂,涵盖不同场景。
多线程:I/O密集型任务(网络请求/文件读写)
场景:下载1000张图片,或爬取1000个网页。
问题:如果单线程顺序执行,大部分时间浪费在等待网络响应(I/O阻塞)。
拆分方案:使用concurrent.futures.ThreadPoolExecutor,将每个URL的下载任务作为一个独立线程。
import requests
from concurrent.futures import ThreadPoolExecutor
urls = ["http://example.com/img1.jpg", ..., "http://example.com/img1000.jpg"]
def download_one(url):
resp = requests.get(url)
with open(f"image_{url.split('/')[-1]}", "wb") as f:
f.write(resp.content)
return url
# 创建10个线程的线程池,最多同时下载10个
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(download_one, urls))
print(f"下载完成,共{len(results)}个文件")
优点:显著减少I/O等待时间,代码结构清晰。
多进程:CPU密集型任务(数值计算/图像处理)
场景:对一个包含1000万张图片的文件夹,批量调整尺寸或计算哈希值。
问题:CPU成为瓶颈,多线程受GIL(全局解释器锁)限制无法提升,需用多进程。
拆分方案:使用multiprocessing.Pool,将文件列表切块后分配给多个进程。
import os
from multiprocessing import Pool
from PIL import Image
def resize_image(file_path):
"""单个任务:调整图片到800x600并保存"""
img = Image.open(file_path)
img_resized = img.resize((800, 600))
output_path = os.path.join("processed", os.path.basename(file_path))
img_resized.save(output_path)
return file_path
if __name__ == "__main__":
all_files = [os.path.join("input", f) for f in os.listdir("input") if f.endswith(".jpg")]
# 创建4个进程,充分利用多核CPU
with Pool(processes=4) as pool:
result = list(pool.imap_unordered(resize_image, all_files))
print(f"处理完成 {len(result)} 张图片")
优点:突破GIL限制,CPU密集型任务效率接近线性提升。
异步编程:高并发I/O(Web服务/实时数据流)
场景:一个Web服务器需要同时处理10000个WebSocket连接。
问题:线程或进程数量过多会导致内存和上下文切换开销过大。
拆分方案:使用asyncio + aiohttp,通过事件循环在单线程内并发处理。
import asyncio
import aiohttp
async def fetch_url(session, url):
"""协程:异步请求一个URL"""
async with session.get(url) as response:
return await response.text()
async def main():
urls = ["http://example.com"] * 10000
async with aiohttp.ClientSession() as session:
# 同时发起10000个请求
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"成功获取 {len(results)} 个页面")
asyncio.run(main())
优点:单线程、内存占用低,适合海量连接场景。
分布式任务队列(Celery + Redis/RabbitMQ)
场景:电商网站的“订单过期自动取消”或“视频转码”后台任务。 问题:任务可能需要长时间执行,且Web服务器不能阻塞。 拆分方案:将任务发布到消息队列,由独立的工作进程(Worker)异步消费。
# tasks.py (Celery任务定义)
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0')
@app.task
def process_order(order_id):
"""后台执行:验证库存、扣减、发送邮件"""
# 模拟耗时操作
import time; time.sleep(10)
return f"订单 {order_id} 处理完成"
# 在Web视图中调用
def create_order(request):
# ...验证逻辑
process_order.delay(order_id) # 立即返回,不阻塞
return HttpResponse("订单已提交,后台处理中")
优点:异步、解耦、可水平扩展Worker数量。
MapReduce模式:大数据量处理(数据清洗/统计)
场景:从10TB的日志文件中统计每个IP的访问次数。
问题:单机内存不足,需分片+聚合。
拆分方案(单机简化版):使用functools.reduce 结合多进程。
from functools import reduce
from multiprocessing import Pool
# Map阶段:解析一行日志,返回(IP, 1)元组
def mapper(line):
ip = line.split()[0] if line else None
return (ip, 1) if ip else None
# Reduce阶段:合并相同IP的计数
def reducer(item1, item2):
ip1, count1 = item1
ip2, count2 = item2
if ip1 == ip2:
return (ip1, count1 + count2)
else:
# 简单处理:不合并不同key,实际应用需分组
return [item1, item2]
if __name__ == "__main__":
with open("large_log.txt", "r") as f:
lines = f.readlines()
# Map:多进程并行解析
with Pool(processes=4) as pool:
mapped = pool.map(mapper, lines)
# Reduce:顺序聚合
result = reduce(reducer, [x for x in mapped if x])
print(result)
注意:生产环境建议使用Spark、Hadoop等框架,这里仅为展示拆分思想。
流水线(Pipeline)模式:流式数据处理
场景:实时视频流 → 人脸检测 → 模糊处理 → 输出。
问题:每个阶段的处理速度可能不同,需要缓冲区解耦。
拆分方案:使用queue.Queue连接多个线程/进程。
import threading
import queue
import time
def stage_grabber(data_queue):
"""阶段1:从摄像头读取帧"""
for i in range(100):
frame = f"frame_{i}" # 模拟读取
data_queue.put(frame)
time.sleep(0.03) # 30fps
def stage_processor(in_queue, out_queue):
"""阶段2:处理帧(如人脸检测)"""
while True:
frame = in_queue.get()
processed = f"[processed]{frame}" # 模拟处理
out_queue.put(processed)
in_queue.task_done()
def stage_writer(out_queue):
"""阶段3:输出结果"""
while True:
data = out_queue.get()
print(data)
out_queue.task_done()
# 创建队列
input_q = queue.Queue(maxsize=10)
output_q = queue.Queue(maxsize=10)
# 启动流水线
threading.Thread(target=stage_grabber, args=(input_q,), daemon=True).start()
threading.Thread(target=stage_processor, args=(input_q, output_q), daemon=True).start()
threading.Thread(target=stage_writer, args=(output_q,), daemon=True).start()
time.sleep(5) # 允许处理一段时间
优点:每个阶段可独立优化,提高吞吐量。
如何选择拆分方式?
| 任务类型 | 推荐方案 | 适用场景 |
|---|---|---|
| I/O密集型(网络、磁盘) | 多线程 / 异步 | 爬虫、文件下载、数据库连接 |
| CPU密集型(计算、加密) | 多进程 / C扩展 | 图像处理、科学计算、视频编码 |
| 高并发连接(WebSocket) | 异步 (asyncio) | 聊天室、实时推送、代理服务器 |
| 定时/后台任务 | Celery / RQ | 邮件发送、报表生成、定期清理 |
| 大数据量处理 | MapReduce / Spark | 日志分析、机器学习预处理 |
| 实时流式处理 | 流水线(队列) | 视频流、音频流、传感器数据 |
核心原则:找到瓶颈(是CPU、I/O还是内存?),然后针对性地选择拆分粒度。
标签: 多进程技术