Python批量统计案例怎么编写?

wen python案例 2

Python批量统计案例编写全攻略:从零到实战的高效方案

📖 目录导读

  1. 批量统计的核心场景与需求分析
  2. 环境准备:必备库安装与配置
  3. 基础案例:CSV文件批量统计销售额
  4. 进阶案例:多Excel文件自动汇总与异常检测
  5. 高阶案例:实时日志统计与可视化输出
  6. 常见问答(FAQ)
  7. SEO优化建议与资源推荐

批量统计的核心场景与需求分析

在日常数据分析工作中,我们常面临这样的困境:日生成数十个销售报表、监控日志高频积累、多个实验数据需对比分析……手动复制粘贴不仅效率低下,还容易出错,Python批量统计正是为此而生,其技术本质是自动化遍历文件→读取数据→聚合计算→输出结果

关键难点分解

  • 文件多样性:Excel、CSV、JSON、TXT等格式
  • 统计粒度差异:按天/按地区/按产品类别汇总
  • 异常处理:空值、不完整行、编码不一致

环境准备:必备库安装与配置

建议使用Python 3.8+版本(测试环境为3.10),核心依赖库:

pip install pandas openpyxl xlrd glob2 matplotlib
  • pandas:数据处理核心,支持groupby聚合
  • openpyxl:处理.xlsx文件(比xlrd更兼容)
  • glob2:增强版文件匹配(支持递归搜索)
  • matplotlib:可视化统计结果

开发环境配置建议

使用Jupyter Notebook分步调试,或Pycharm创建项目,若处理海量文件(>1000个),建议启用chunksize进行分块读取。


基础案例:CSV文件批量统计销售额

场景:一个文件夹内有100个CSV文件,每列包含“日期”、“产品”、“数量”、“单价”,需统计每个产品的总销售额。

代码实现

import pandas as pd
import glob
# 1. 批量读取所有CSV文件
all_files = glob.glob("sales_data/*.csv")  # 假设文件在sales_data目录
df_list = []
for file in all_files:
    temp_df = pd.read_csv(file, encoding='utf-8')
    df_list.append(temp_df)
# 2. 合并所有数据
df_all = pd.concat(df_list, ignore_index=True)
# 3. 增加销售额列
df_all['销售额'] = df_all['数量'] * df_all['单价']
# 4. 按产品分组统计
result = df_all.groupby('产品')['销售额'].sum().reset_index()
# 5. 输出结果文件
result.to_csv('产品销售额汇总.csv', index=False, encoding='utf-8-sig')
print("统计完成!共处理", len(all_files), "个文件。")

优势解析

  • 利用glod自动匹配文件,无需手动指定文件名
  • pd.concat高效合并内存数据,避免循环追加时的性能问题
  • groupby.sum()相比逐行累加速度快约10倍

进阶案例:多Excel文件自动汇总与异常检测

场景:某公司每周生成30个Excel周报,每个文件有多个Sheet,需统计“异常订单数”、“按时交付率”,并生成汇总Excel。

代码实现

import pandas as pd
import glob
from datetime import datetime
# 1. 定义统计函数(应对复杂业务)
def process_excel(file_path):
    """解析单个Excel并返回统计指标"""
    # 读取所有Sheet
    sheets = pd.ExcelFile(file_path).sheet_names
    target_sheets = [s for s in sheets if "订单" in s]  # 只处理含“订单”的Sheet
    result_row = {}
    for sheet in target_sheets:
        df = pd.read_excel(file_path, sheet_name=sheet)
        # 异常检测:订单金额为负或数量为0
        abnormal = df[(df['订单金额'] < 0) | (df['数量'] == 0)]
        result_row[f'{sheet}_异常单数'] = len(abnormal)
        # 按时交付率:交付日期<=承诺日期
        df['按时交付'] = df['实际交付日期'] <= df['承诺交付日期']
        result_row[f'{sheet}_按时交付率'] = df['按时交付'].mean()
    # 添加文件名标识
    result_row['来源文件'] = file_path.split('\\')[-1]
    return result_row
# 2. 批量处理
all_files = glob.glob("weekly_reports/*.xlsx")
summary_rows = []
for file in all_files:
    try:
        row = process_excel(file)
        summary_rows.append(row)
    except Exception as e:
        print(f"文件 {file} 处理出错: {e}")
# 3. 汇总并导出
df_summary = pd.DataFrame(summary_rows)
df_summary.to_excel('周报汇总分析.xlsx', index=False, engine='openpyxl')
print("汇总完成!已生成周报汇总分析.xlsx")

关键技巧

  • 异常捕获:单个文件出错不影响整体流程
  • 动态列名:根据Sheet名称自动生成统计字段
  • 性能优化:使用pd.read_excelsheet_name参数避免加载全部Sheet

高阶案例:实时日志统计与可视化输出

场景:监控服务器日志,每5分钟生成一份流量统计报告,包含每分钟请求数、状态码分布、异常时间点识别。

代码实现

import pandas as pd
import matplotlib.pyplot as plt
import glob
import re
# 1. 解析日志文件(假设格式为:2024-01-01 12:33:45 GET /api 200 0.033)
def parse_log(log_line):
    pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+(GET|POST)\s+(\S+)\s+(\d{3})\s+(\d+\.\d+)'
    match = re.match(pattern, log_line)
    if match:
        return {
            '时间': pd.to_datetime(match.group(1)),
            '方法': match.group(2),
            '路径': match.group(3),
            '状态码': int(match.group(4)),
            '响应时间': float(match.group(5))
        }
    return None
# 2. 批量读取日志文件
log_files = glob.glob("logs/*.txt")
records = []
for file in log_files:
    with open(file, 'r', encoding='utf-8') as f:
        for line in f:
            parsed = parse_log(line.strip())
            if parsed:
                records.append(parsed)
# 3. 转化为DataFrame并统计
df = pd.DataFrame(records)
df.set_index('时间', inplace=True)
# 每分钟请求数
minute_reqs = df.resample('1min').size()
# 状态码分布(仅错误码重点显示)
error_codes = df[df['状态码'] >= 500].resample('1min').size()
# 4. 可视化输出
plt.figure(figsize=(12, 8))
plt.subplot(2,1,1)
plt.plot(minute_reqs.index, minute_reqs.values, label='总请求')'每分钟请求数趋势')
plt.legend()
plt.subplot(2,1,2)
plt.bar(error_codes.index, error_codes.values, color='red', alpha=0.7)'每分钟错误数(>=500)')
plt.tight_layout()
plt.savefig('实时统计图.png', dpi=300)
print("实时统计已生成!图表保存为实时统计图.png")

高阶亮点

  • 正则匹配:灵活解析非结构化日志
  • 时间序列分析resample实现不同粒度的聚合
  • 双重输出:数值结果+可视化图表,满足不同汇报场景

常见问答(FAQ)

Q1:处理超大文件(>1GB)时内存不足怎么办?

A:采用分块读取与增量聚合。

chunk_iter = pd.read_csv('large.csv', chunksize=100000)
total = {}
for chunk in chunk_iter:
    grouped = chunk.groupby('产品')['销售额'].sum()
    # 用字典累加各组统计值
    for key, value in grouped.items():
        total[key] = total.get(key, 0) + value

这样全程只保留一个groupby的中间结果,内存占用从GB级降到MB级。

Q2:不同Excel文件列名不一致如何统一?

A:先建立映射字典,读取时重命名:

col_map = {'产品名称': 'product', '销售金额': 'sales'}
df = df.rename(columns=col_map)

如果列名差异极大,建议先用df.columns打印所有列名,再手动匹配。

Q3:统计结果要同时展示均值、中位数、标准差怎么办?

A:使用groupby.agg()

result = df.groupby('产品').agg({
    '销售额': ['sum', 'mean', 'median', 'std'],
    '数量': ['count', 'sum']
})
result.columns = ['_'.join(col) for col in result.columns]

这样一行代码即可输出多维度统计指标。


SEO优化建议与资源推荐

搜索引擎排名要点包含精准关键词**:本文标题“Python批量统计案例编写”直接命中核心搜索词

  • H2/H3层级结构:使用目录导读(包含H1标题)、小标题(H2)和问题式问答(H3)
  • 自然语言长尾词:如“多Excel自动汇总”、“实时日志统计”,覆盖不同搜索意图
  • 代码块高亮:为代码添加语言标识(如```python),提升爬虫提取效率
  • 内部链接建议:文中可自然嵌入相关教程(如“Pandas groupby详解”),但本文不直接添加链接

推荐学习资源

  • 《利用Python进行数据分析》第5章:数据清洗与聚合
  • 官方文档:Pandas groupby详解(https://pandas.pydata.org/docs/reference/groupby.html)
  • GitHub开源项目:python-batch-stats(搜索“Python批量统计”可找到类似工具)

总结与下一步行动

通过以上四个案例,你已掌握从CSV到Excel再到日志文件的批量统计全流程,核心原则是:

  1. 统一接口层:让每个文件的读取逻辑尽量一致
  2. 聚合前置:尽可能在读取时完成基础统计,减少合并后的二次处理
  3. 异常兜底:用try-except保护整个循环,避免“一颗老鼠屎坏了一锅汤”

现在就可以打开你的文件夹,从最简单的CSV批量统计开始实践,如果遇到编码问题,优先尝试encoding='utf-8-sig'encoding='gbk',Python批量统计的价值不在于代码技术有多高,而在于它能让你从重复劳动中解放出来,把精力集中在真正需要洞察的数据模式上。

标签: 批量统计

抱歉,评论功能暂时关闭!