Python任务合并案例怎么编写?

wen python案例 1

本文目录导读:

  1. 简单的顺序任务合并
  2. 使用队列的任务合并
  3. 多线程并行任务合并
  4. 带依赖关系的任务合并
  5. 实用的数据合并案例
  6. 异步任务合并(使用asyncio)
  7. 通用的任务合并框架

我来分享几个Python任务合并的常见案例及实现方法。

简单的顺序任务合并

def merge_tasks_sequentially():
    """Run three demo tasks one after another and collect their results.

    Returns:
        list[str]: each task's return value, in execution order.
    """
    def task1():
        return "任务1完成"

    def task2():
        return "任务2完成"

    def task3():
        return "任务3完成"

    # Each step runs to completion before the next one starts, so the
    # result list mirrors the pipeline order.
    pipeline = (task1, task2, task3)
    return [step() for step in pipeline]
# Usage example: run the sequential merge and print the collected results
results = merge_tasks_sequentially()
print(results)  # prints ['任务1完成', '任务2完成', '任务3完成']

使用队列的任务合并

from queue import Queue
import threading
import time
class TaskMerger:
    """Collect callables in a FIFO queue and run them on demand.

    ``task_queue`` holds pending ``(func, args, kwargs)`` entries;
    ``result_queue`` receives every result produced by ``execute_all``.
    """

    def __init__(self):
        self.task_queue = Queue()
        self.result_queue = Queue()

    def add_task(self, task_func, *args, **kwargs):
        """Enqueue ``task_func(*args, **kwargs)`` for later execution."""
        self.task_queue.put((task_func, args, kwargs))

    def execute_all(self):
        """Drain the task queue, running each task in FIFO order.

        Returns:
            list: the task results in the order the tasks were added. Each
            result is also put on ``result_queue``.

        Raises:
            Whatever a task raises; tasks still queued behind it stay queued.
        """
        from queue import Empty

        results = []
        while True:
            # get_nowait() + Empty avoids the check-then-act race of calling
            # empty() and then a blocking get(): if another consumer drained
            # the queue in between, get() would block forever.
            try:
                task_func, args, kwargs = self.task_queue.get_nowait()
            except Empty:
                break
            try:
                result = task_func(*args, **kwargs)
            finally:
                # Mark the item as processed so task_queue.join() can return.
                self.task_queue.task_done()
            results.append(result)
            self.result_queue.put(result)
        return results
# Usage example
def download_file(url, delay=1):
    """Simulate downloading *url* and return a completion message.

    Args:
        url: address to "download"; it only appears in the returned message.
        delay: simulated latency in seconds (default 1, the original
            hard-coded value); pass 0 to skip the wait, e.g. in tests.

    Returns:
        str: a message of the form "下载 <url> 完成".
    """
    time.sleep(delay)
    return f"下载 {url} 完成"
# Queue three simulated downloads, then run them one after another on the
# calling thread (execute_all runs the tasks in a plain loop, no threads).
# Each download_file call sleeps about 1s, so this takes roughly 3s in total.
merger = TaskMerger()
merger.add_task(download_file, "http://example.com/file1")
merger.add_task(download_file, "http://example.com/file2")
merger.add_task(download_file, "http://example.com/file3")
results = merger.execute_all()
print(results)

多线程并行任务合并

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def parallel_task_merger(tasks, max_workers=5):
    """Run zero-argument callables concurrently and merge their results.

    :param tasks: iterable of zero-argument callables
    :param max_workers: maximum number of worker threads
    :return: list with one entry per task, in the same order as ``tasks``;
        a task that raised contributes ``None`` (its error is printed).
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything first so the tasks overlap, then read the futures
        # in submission order so result i always belongs to task i
        # (as_completed would yield them in finish order instead).
        futures = [executor.submit(task) for task in tasks]
        results = []
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                # Boundary handler: one failing task must not lose the others.
                print(f"任务执行失败: {e}")
                results.append(None)
    return results
# Usage example: simulated data sources for parallel_task_merger
def fetch_data_1(delay=2):
    """Simulate a slow fetch and return record 1.

    :param delay: simulated latency in seconds (default 2, the original
        hard-coded value); pass 0 to skip the wait.
    :return: ``{"id": 1, "name": "数据1"}``
    """
    time.sleep(delay)
    return {"id": 1, "name": "数据1"}
def fetch_data_2(delay=1):
    """Simulate a fetch and return record 2.

    :param delay: simulated latency in seconds (default 1, the original
        hard-coded value); pass 0 to skip the wait.
    :return: ``{"id": 2, "name": "数据2"}``
    """
    time.sleep(delay)
    return {"id": 2, "name": "数据2"}
def fetch_data_3(delay=3):
    """Simulate the slowest fetch and return record 3.

    :param delay: simulated latency in seconds (default 3, the original
        hard-coded value); pass 0 to skip the wait.
    :return: ``{"id": 3, "name": "数据3"}``
    """
    time.sleep(delay)
    return {"id": 3, "name": "数据3"}
# Run the three fetchers in a thread pool (up to max_workers=5 at once), so
# the total wait is roughly the longest single sleep rather than the sum.
tasks = [fetch_data_1, fetch_data_2, fetch_data_3]
results = parallel_task_merger(tasks)
print(f"并行执行结果: {results}")

带依赖关系的任务合并

class DependencyTaskMerger:
    """Run registered tasks in an order that respects their dependencies.

    Each task is called with one argument: a dict mapping each of its
    dependency ids to that dependency's result.
    """

    def __init__(self):
        self.tasks = {}  # task_id -> callable(dep_results) -> result
        self.dependencies = {}  # task_id -> list of prerequisite task ids

    def add_task(self, task_id, task_func, dependencies=None):
        """Register *task_func* under *task_id*.

        :param dependencies: ids of tasks that must run first (default: none).
            The sequence is copied, so later changes by the caller have no effect.
        """
        self.tasks[task_id] = task_func
        self.dependencies[task_id] = list(dependencies or [])

    def execute(self):
        """Execute every registered task once, dependencies first.

        :return: dict mapping task_id -> result.
        :raises ValueError: if a dependency was never registered, or if the
            remaining tasks depend on each other in a cycle. Without these
            checks the scheduling loop would never terminate.
        """
        for task_id, deps in self.dependencies.items():
            missing = [dep for dep in deps if dep not in self.tasks]
            if missing:
                raise ValueError(f"任务 '{task_id}' 依赖未注册的任务: {missing}")

        executed = set()
        results = {}
        while len(executed) < len(self.tasks):
            progressed = False
            # Scan in registration order; a task that becomes ready during
            # this pass can still run later in the same pass.
            for task_id, task_func in self.tasks.items():
                if task_id in executed:
                    continue
                deps = self.dependencies[task_id]
                if all(dep in executed for dep in deps):
                    dep_results = {dep: results[dep] for dep in deps}
                    results[task_id] = task_func(dep_results)
                    executed.add(task_id)
                    progressed = True
            if not progressed:
                # Every remaining task waits on another remaining task.
                pending = [t for t in self.tasks if t not in executed]
                raise ValueError(f"检测到循环依赖: {pending}")
        return results
# Usage example
def task_a(deps):
    """Root task: it has no prerequisites, so *deps* is not used."""
    print("执行任务A")
    outcome = "A的结果"
    return outcome
def task_b(deps):
    """Build B's result from the upstream value stored under key 'a'."""
    print("执行任务B,需要A的结果")
    upstream = deps.get('a')
    return "B使用了{}".format(upstream)
def task_c(deps):
    """Build C's result from the upstream values under keys 'a' and 'b'."""
    print("执行任务C,需要A和B的结果")
    first, second = deps.get('a'), deps.get('b')
    return "C使用了{}和{}".format(first, second)
# 'b' needs 'a', and 'c' needs both 'a' and 'b', so the tasks run in the
# order a, b, c; each receives its dependencies' results as a dict.
merger = DependencyTaskMerger()
merger.add_task('a', task_a)
merger.add_task('b', task_b, ['a'])
merger.add_task('c', task_c, ['a', 'b'])
results = merger.execute()
print(f"最终结果: {results}")

实用的数据合并案例

import pandas as pd
from functools import reduce
class DataMergeTasks:
    """Demo data-loading tasks whose results are merged into one DataFrame."""

    @staticmethod
    def load_sales_data():
        """Return simulated daily sales keyed by ``date``."""
        return pd.DataFrame({
            'date': ['2024-01-01', '2024-01-02'],
            'sales': [100, 200]
        })

    @staticmethod
    def load_inventory_data():
        """Return simulated daily inventory keyed by ``date``."""
        return pd.DataFrame({
            'date': ['2024-01-01', '2024-01-02'],
            'inventory': [500, 450]
        })

    @staticmethod
    def load_customer_data():
        """Return simulated per-customer purchases keyed by ``customer_id``."""
        return pd.DataFrame({
            'customer_id': [1, 2],
            'purchase_amount': [300, 400]
        })

    def merge_all_data(self, key='date'):
        """Outer-join every loaded frame that contains the *key* column.

        Frames without *key* cannot be joined on it and are skipped. With the
        default key, that is the customer data. Selecting frames by column,
        rather than by list position, keeps the result correct when loaders
        are reordered or added.

        :param key: column to join on (default ``'date'``).
        :return: the merged DataFrame.
        :raises ValueError: if no loaded frame has a *key* column.
        """
        frames = [
            self.load_sales_data(),
            self.load_inventory_data(),
            self.load_customer_data(),
        ]
        joinable = [df for df in frames if key in df.columns]
        if not joinable:
            raise ValueError(f"没有包含列 '{key}' 的数据可合并")
        return reduce(
            lambda left, right: pd.merge(left, right, on=key, how='outer'),
            joinable,
        )
# Usage example: print the outer join of the date-keyed sales and inventory data
merger = DataMergeTasks()
result = merger.merge_all_data()
print("合并后的数据:")
print(result)

异步任务合并(使用asyncio;示例依赖第三方库 aiohttp,需先执行 pip install aiohttp)

import asyncio
import aiohttp
class AsyncTaskMerger:
    """Fetch several URLs concurrently over a single aiohttp session."""

    async def fetch_url(self, session, url):
        """Return the body text of a GET request to *url* made via *session*."""
        async with session.get(url) as resp:
            body = await resp.text()
        return body

    async def merge_async_tasks(self, urls):
        """Download every URL concurrently; the bodies come back in *urls* order."""
        async with aiohttp.ClientSession() as session:
            pending = [self.fetch_url(session, link) for link in urls]
            return await asyncio.gather(*pending)

    def execute(self, urls):
        """Synchronous entry point: run merge_async_tasks in a new event loop.

        asyncio.run() cannot be used from a thread that already has a running
        event loop; in that case await merge_async_tasks() directly instead.
        """
        coro = self.merge_async_tasks(urls)
        return asyncio.run(coro)
# Usage example
async def main():
    """Fetch the two demo endpoints concurrently and return their bodies."""
    endpoints = [
        'https://api.example.com/data1',
        'https://api.example.com/data2'
    ]
    return await AsyncTaskMerger().merge_async_tasks(endpoints)
# To run it (this performs real HTTP requests):
# results = asyncio.run(main())

通用的任务合并框架

from typing import List, Dict, Any, Callable
import time
import logging
class GenericTaskMerger:
    """Register named zero-argument tasks and run them sequentially or in parallel.

    A task that raises is logged and recorded with a ``None`` result instead
    of aborting the whole run.
    """

    def __init__(self, name="任务合并器"):
        self.name = name
        self.tasks: Dict[str, Callable] = {}
        self.results: Dict[str, Any] = {}
        self.logger = logging.getLogger(name)

    def register_task(self, name: str, func: Callable):
        """Register *func* under *name*, replacing any task with that name."""
        self.tasks[name] = func
        self.logger.info("注册任务: %s", name)

    def execute_all(self, parallel: bool = False) -> Dict[str, Any]:
        """Run every registered task and return ``{task name: result}``.

        :param parallel: run the tasks in a thread pool instead of one by one.
        :return: a snapshot copy of the results. A later run or :meth:`clear`
            therefore does not mutate a dict the caller already holds.
        """
        if parallel:
            self._execute_parallel()
        else:
            self._execute_sequential()
        return dict(self.results)

    def _execute_sequential(self) -> Dict[str, Any]:
        """Run tasks one at a time in registration order, logging durations."""
        for name, func in self.tasks.items():
            # perf_counter is monotonic, so it is the right clock for durations.
            start = time.perf_counter()
            try:
                result = func()
            except Exception as e:
                # Boundary handler: one failing task must not stop the others;
                # logger.exception also records the traceback.
                self.logger.exception("任务 '%s' 失败: %s", name, e)
                self.results[name] = None
            else:
                self.results[name] = result
                elapsed = time.perf_counter() - start
                self.logger.info("任务 '%s' 完成,耗时: %.2f秒", name, elapsed)
        return self.results

    def _execute_parallel(self) -> Dict[str, Any]:
        """Run all tasks in a thread pool; results are stored in registration order."""
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(func): name
                for name, func in self.tasks.items()
            }
            for future, name in futures.items():
                try:
                    self.results[name] = future.result()
                except Exception as e:
                    self.logger.exception("任务 '%s' 失败: %s", name, e)
                    self.results[name] = None
        return self.results

    def clear(self):
        """Remove all registered tasks and stored results."""
        self.tasks.clear()
        self.results.clear()
        self.logger.info("已清除所有任务")
# Usage example (the logger.info messages are only visible once logging is
# configured, e.g. logging.basicConfig(level=logging.INFO))
merger = GenericTaskMerger("数据处理合并器")
# Register three demo tasks; each lambda just returns a placeholder string
merger.register_task("数据清洗", lambda: "清洗后的数据")
merger.register_task("数据转换", lambda: "转换后的数据")
merger.register_task("数据分析", lambda: "分析结果")
# Run the registered tasks sequentially
results = merger.execute_all()
print(f"合并结果: {results}")
# Run the same registered tasks again, this time in a thread pool
results_parallel = merger.execute_all(parallel=True)
print(f"并行执行结果: {results_parallel}")

这些任务合并案例展示了:

  1. 简单顺序合并:适用于依赖简单的任务
  2. 队列合并:适合需要任务管理和顺序控制的场景
  3. 并行合并:提高执行效率,适合I/O密集型任务(受 GIL 限制,CPU 密集型任务应改用 ProcessPoolExecutor 等多进程方案)
  4. 依赖关系合并:处理有复杂依赖的任务流程
  5. 数据合并:处理数据框或数据集
  6. 异步合并:适合网络请求等异步操作
  7. 通用框架:可扩展的任务管理系统

选择哪种方式取决于你的具体需求:

  • 任务间是否有依赖关系
  • 是否需要并行处理
  • 任务的类型和复杂度
  • 性能要求

根据实际场景选择合适的任务合并策略,可以有效提高代码的可维护性和执行效率。

标签: 结果合并

抱歉,评论功能暂时关闭!