本文目录导读:
我来分享几个Python任务合并的常见案例及实现方法。
简单的顺序任务合并
def merge_tasks_sequentially():
    """Run three demo tasks one after another and collect their results.

    Returns:
        list[str]: each task's return value, in execution order.
    """
    def task1():
        return "任务1完成"

    def task2():
        return "任务2完成"

    def task3():
        return "任务3完成"

    # "Merging" here simply means calling each step in turn and gathering
    # the outputs; tuple order is execution order.
    pipeline = (task1, task2, task3)
    return [step() for step in pipeline]
# Usage example: run the sequential merger and show what it collected.
results = merge_tasks_sequentially()
print(results)  # expected: ['任务1完成', '任务2完成', '任务3完成']
使用队列的任务合并（任务按入队顺序在当前线程中依次执行）
from queue import Queue
import threading
import time
class TaskMerger:
    """Collect callables in a FIFO queue and run them one by one.

    Tasks run sequentially in the calling thread; ``task_queue`` only stores
    them until :meth:`execute_all` drains it. Every result is also pushed onto
    ``result_queue`` so another consumer can read them later.
    """

    def __init__(self):
        # Pending (func, args, kwargs) tuples, consumed in FIFO order.
        self.task_queue = Queue()
        # Results of every executed task, in execution order (never cleared here).
        self.result_queue = Queue()

    def add_task(self, task_func, *args, **kwargs):
        """Enqueue ``task_func(*args, **kwargs)`` for later execution."""
        self.task_queue.put((task_func, args, kwargs))

    def execute_all(self):
        """Run every queued task in FIFO order and return their results.

        Returns:
            list: one result per task, in the order the tasks were added.

        Raises:
            Any exception raised by a task propagates unchanged; tasks queued
            after the failing one remain in ``task_queue``.
        """
        results = []
        while not self.task_queue.empty():
            task_func, args, kwargs = self.task_queue.get()
            try:
                result = task_func(*args, **kwargs)
            finally:
                # Mark the item as processed even if the task raised; without
                # this, ``task_queue.join()`` would block forever.
                self.task_queue.task_done()
            results.append(result)
            self.result_queue.put(result)
        return results
# Usage example
def download_file(url, delay=1):
    """Simulate downloading ``url`` and return a completion message.

    Args:
        url: address to "download"; it is only echoed back in the message.
        delay: simulated latency in seconds (default 1, as before). Pass 0
            to skip the wait, e.g. in tests.

    Returns:
        str: a message naming the URL.
    """
    time.sleep(delay)
    return f"下载 {url} 完成"
merger = TaskMerger()
# Queue three simulated downloads; they run in this order when executed.
for file_no in range(1, 4):
    merger.add_task(download_file, f"http://example.com/file{file_no}")
results = merger.execute_all()
print(results)
多线程并行任务合并
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def parallel_task_merger(tasks, max_workers=5):
    """
    Run zero-argument callables concurrently and merge their results.

    Results are returned in the same order as ``tasks`` (not completion
    order), so ``results[i]`` always belongs to ``tasks[i]``.

    :param tasks: iterable of zero-argument callables
    :param max_workers: maximum number of worker threads
    :return: list of results aligned with ``tasks``; a task that raised
        contributes ``None`` in its own position
    """
    tasks = list(tasks)  # accept any iterable; we need its length and order
    results = [None] * len(tasks)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the index of the task that produced it.
        future_to_index = {
            executor.submit(task): index for index, task in enumerate(tasks)
        }
        # Consume futures as they finish, but store each result in its own slot
        # so the output order does not depend on scheduling.
        for future in as_completed(future_to_index):
            index = future_to_index[future]
            try:
                results[index] = future.result()
            except Exception as e:
                # Best-effort merge: report the failure and leave None in the slot.
                print(f"任务执行失败: {e}")
    return results
# Usage example: simulated data sources with different latencies
def fetch_data_1(delay=2):
    """Simulate a slow data source and return record 1.

    :param delay: simulated latency in seconds (default 2, as before)
    """
    time.sleep(delay)
    return {"id": 1, "name": "数据1"}
def fetch_data_2(delay=1):
    """Simulate a slow data source and return record 2.

    :param delay: simulated latency in seconds (default 1, as before)
    """
    time.sleep(delay)
    return {"id": 2, "name": "数据2"}
def fetch_data_3(delay=3):
    """Simulate a slow data source and return record 3.

    :param delay: simulated latency in seconds (default 3, as before)
    """
    time.sleep(delay)
    return {"id": 3, "name": "数据3"}
# Fan out the three simulated fetches concurrently and print the merged list.
tasks = [
    fetch_data_1,
    fetch_data_2,
    fetch_data_3,
]
results = parallel_task_merger(tasks=tasks)
print(f"并行执行结果: {results}")
带依赖关系的任务合并
class DependencyTaskMerger:
    """Run tasks in an order that respects their declared dependencies.

    Each task is a callable that receives one argument: a dict mapping each
    of its dependency ids to that dependency's result.
    """

    def __init__(self):
        # task_id -> callable taking {dep_id: dep_result}
        self.tasks = {}
        # task_id -> list of task_ids that must run before it
        self.dependencies = {}

    def add_task(self, task_id, task_func, dependencies=None):
        """Register ``task_func`` under ``task_id``.

        :param task_id: unique key for the task (re-registering replaces it)
        :param task_func: callable taking the dict of dependency results
        :param dependencies: ids of tasks whose results this task needs;
            ``None`` means no dependencies
        """
        self.tasks[task_id] = task_func
        self.dependencies[task_id] = dependencies or []

    def execute(self):
        """Execute every task exactly once, each after all of its dependencies.

        :return: dict mapping task_id -> that task's result
        :raises ValueError: if a dependency was never registered, or the
            dependencies form a cycle (these cases used to loop forever)
        """
        executed = set()
        results = {}
        while len(executed) < len(self.tasks):
            progressed = False
            for task_id in self.tasks:
                if task_id in executed:
                    continue
                deps = self.dependencies[task_id]
                # Run only once every dependency has already produced a result.
                if all(dep in executed for dep in deps):
                    dep_results = {dep: results[dep] for dep in deps}
                    results[task_id] = self.tasks[task_id](dep_results)
                    executed.add(task_id)
                    progressed = True
            if not progressed:
                # A full pass ran nothing, so the remaining tasks can never
                # become ready: report why instead of spinning forever.
                pending = [t for t in self.tasks if t not in executed]
                missing = sorted(
                    {
                        dep
                        for t in pending
                        for dep in self.dependencies[t]
                        if dep not in self.tasks
                    },
                    key=str,
                )
                if missing:
                    raise ValueError(f"未注册的依赖任务: {missing}")
                raise ValueError(f"检测到循环依赖: {pending}")
        return results
# Usage example
def task_a(deps):
    """Root task: ignores ``deps`` and returns its own fixed result string."""
    print("执行任务A")
    outcome = "A的结果"
    return outcome
def task_b(deps):
    """Depends on task 'a'; embeds its result (``None`` if absent) in the output."""
    print("执行任务B,需要A的结果")
    upstream = deps.get('a')
    return f"B使用了{upstream}"
def task_c(deps):
    """Depends on tasks 'a' and 'b'; embeds both results (``None`` if absent)."""
    print("执行任务C,需要A和B的结果")
    first, second = deps.get('a'), deps.get('b')
    return f"C使用了{first}和{second}"
merger = DependencyTaskMerger()
merger.add_task('a', task_a)                             # no prerequisites
merger.add_task('b', task_b, dependencies=['a'])         # needs a
merger.add_task('c', task_c, dependencies=['a', 'b'])    # needs a and b
results = merger.execute()
print(f"最终结果: {results}")
实用的数据合并案例
import pandas as pd
from functools import reduce
class DataMergeTasks:
    """Example data-merge task: mock table loaders plus a merge step."""

    @staticmethod
    def load_sales_data():
        """Return a small mock sales table with a ``date`` column."""
        return pd.DataFrame({
            'date': ['2024-01-01', '2024-01-02'],
            'sales': [100, 200]
        })

    @staticmethod
    def load_inventory_data():
        """Return a small mock inventory table with a ``date`` column."""
        return pd.DataFrame({
            'date': ['2024-01-01', '2024-01-02'],
            'inventory': [500, 450]
        })

    @staticmethod
    def load_customer_data():
        """Return a small mock customer table (``customer_id`` column, no ``date``)."""
        return pd.DataFrame({
            'customer_id': [1, 2],
            'purchase_amount': [300, 400]
        })

    def merge_all_data(self, on='date', how='outer'):
        """Merge every loaded table that contains the join column ``on``.

        Tables without ``on`` cannot be joined on that key and are skipped.
        With the default key, that is the customer table. This replaces the
        previous hard-coded ``[:2]`` slice, which silently depended on
        loader order.

        :param on: column name to join on (default ``'date'``)
        :param how: join type passed to ``pandas.merge`` (default ``'outer'``)
        :return: the merged DataFrame
        :raises ValueError: if no loaded table contains the column ``on``
        """
        loaders = (
            self.load_sales_data,
            self.load_inventory_data,
            self.load_customer_data,
        )
        frames = [load() for load in loaders]
        # Only tables that share the join key can be merged together.
        mergeable = [df for df in frames if on in df.columns]
        if not mergeable:
            raise ValueError(f"no loaded table has column {on!r}")
        # Fold the tables pairwise: ((t1 ⋈ t2) ⋈ t3) ...
        return reduce(
            lambda left, right: pd.merge(left, right, on=on, how=how),
            mergeable,
        )
# Usage example: merge the mock tables and display the result.
merger = DataMergeTasks()
result = merger.merge_all_data()
print("合并后的数据:", result, sep="\n")
异步任务合并（使用asyncio，需安装第三方库aiohttp）
import asyncio
import aiohttp
class AsyncTaskMerger:
    """Fetch several URLs concurrently with aiohttp and merge the bodies."""

    async def fetch_url(self, session, url):
        """GET ``url`` using ``session`` and return the response body as text.

        The HTTP status is not checked here, so an error response's body is
        returned like any other.
        """
        async with session.get(url) as response:
            return await response.text()

    async def merge_async_tasks(self, urls, return_exceptions=False):
        """Fetch all ``urls`` concurrently over one shared session.

        :param urls: iterable of URLs to fetch
        :param return_exceptions: if True, a failed fetch yields its exception
            object in that URL's position instead of aborting the whole merge
            (default False, as before: the first exception propagates)
        :return: list of response texts (or exceptions), in ``urls`` order
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)
        return results

    def execute(self, urls, return_exceptions=False):
        """Synchronous entry point: run :meth:`merge_async_tasks` via ``asyncio.run``.

        ``asyncio.run`` cannot be called while an event loop is already
        running; from async code, await ``merge_async_tasks`` directly.
        """
        return asyncio.run(
            self.merge_async_tasks(urls, return_exceptions=return_exceptions)
        )
# Usage example
async def main():
    """Demo coroutine: fetch two sample endpoints concurrently and return their bodies."""
    demo_urls = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
    ]
    return await AsyncTaskMerger().merge_async_tasks(demo_urls)
# To run the demo:
# results = asyncio.run(main())
通用的任务合并框架
from typing import List, Dict, Any, Callable
import time
import logging
class GenericTaskMerger:
    """Generic task-merging framework: register named callables, run them, collect results."""

    def __init__(self, name="任务合并器"):
        self.name = name
        # Registered zero-argument callables keyed by task name.
        self.tasks: Dict[str, Callable] = {}
        # Latest result per task name; a task that raised maps to None.
        self.results: Dict[str, Any] = {}
        self.logger = logging.getLogger(name)

    def register_task(self, name: str, func: Callable):
        """Register ``func`` under ``name`` (re-registering replaces it)."""
        self.tasks[name] = func
        self.logger.info("注册任务: %s", name)

    def execute_all(self, parallel: bool = False) -> Dict[str, Any]:
        """Run all registered tasks and return a snapshot of their results.

        :param parallel: run the tasks concurrently in a thread pool if True
        :return: a new dict ``{task name: result}``. Previously every call
            returned the same live ``self.results`` object, so a later run
            silently overwrote results the caller was still holding.
        """
        if parallel:
            return dict(self._execute_parallel())
        return dict(self._execute_sequential())

    def _execute_sequential(self) -> Dict[str, Any]:
        """Run tasks one at a time in registration order, logging each duration."""
        for name, func in self.tasks.items():
            start_time = time.time()
            try:
                result = func()
            except Exception as e:
                # Best-effort: record the failure and keep running the other tasks.
                self.logger.error("任务 '%s' 失败: %s", name, e)
                self.results[name] = None
                continue
            self.results[name] = result
            elapsed = time.time() - start_time
            self.logger.info("任务 '%s' 完成,耗时: %.2f秒", name, elapsed)
        return self.results

    def _execute_parallel(self) -> Dict[str, Any]:
        """Run all tasks concurrently in a thread pool (simplified: no timing logs)."""
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(func): name
                for name, func in self.tasks.items()
            }
            # Collect in submission order; result() blocks until each task ends.
            for future, name in futures.items():
                try:
                    self.results[name] = future.result()
                except Exception as e:
                    self.logger.error("任务 '%s' 失败: %s", name, e)
                    self.results[name] = None
        return self.results

    def clear(self):
        """Remove all registered tasks and stored results."""
        self.tasks.clear()
        self.results.clear()
        self.logger.info("已清除所有任务")
# Usage example
merger = GenericTaskMerger("数据处理合并器")

# Register three stages; each lambda just returns a placeholder string.
merger.register_task("数据清洗", lambda: "清洗后的数据")
merger.register_task("数据转换", lambda: "转换后的数据")
merger.register_task("数据分析", lambda: "分析结果")

# Sequential run
results = merger.execute_all(parallel=False)
print(f"合并结果: {results}")

# Same tasks, run in a thread pool
results_parallel = merger.execute_all(parallel=True)
print(f"并行执行结果: {results_parallel}")
这些任务合并案例展示了：
- 简单顺序合并：适用于依赖关系简单、按固定顺序执行的任务
- 队列合并：适合需要任务管理和顺序控制的场景（任务按先进先出顺序执行）
- 并行合并：提高执行效率，适合I/O密集型任务（受GIL限制，CPU密集型任务应改用多进程）
- 依赖关系合并：处理有复杂依赖的任务流程（依赖关系不能形成环）
- 数据合并：处理数据框或数据集
- 异步合并：适合网络请求等异步操作
- 通用框架：可扩展的任务管理系统
选择哪种方式取决于你的具体需求：
- 任务间是否有依赖关系
- 是否需要并行处理
- 任务的类型（I/O密集型还是CPU密集型）和复杂度
- 性能要求
根据实际场景选择合适的任务合并策略,可以有效提高代码的可维护性和执行效率。
标签: 结果合并