Python异常值处理案例怎么写?

wen python案例 1

本文目录导读:

  1. 基础统计方法 - IQR(四分位距法)
  2. Z-Score方法
  3. 多变量异常值检测 - 使用IQR
  4. 异常值处理函数封装
  5. 实战示例:数据分析中的异常值处理
  6. 可视化异常值分布

我来分享几个Python异常值处理的完整案例。

基础统计方法 - IQR(四分位距法)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# Build a reproducible sample: 100 draws from N(50, 10) followed by four
# hand-planted extreme values (two high, two low).
np.random.seed(42)
data = dict(
    value=np.concatenate((
        np.random.normal(loc=50, scale=10, size=100),  # regular observations
        [120, 130, 5, 3],  # planted outliers
    )),
)
df = pd.DataFrame(data)
# Outlier detection with the IQR (interquartile range) rule.
def detect_outliers_iqr(data, column, factor=1.5):
    """Return the rows of ``data`` whose ``column`` value lies outside the IQR fences.

    The fences are ``Q1 - factor * IQR`` and ``Q3 + factor * IQR``, where Q1
    and Q3 are the 25th and 75th percentiles of ``data[column]``.

    Args:
        data: DataFrame holding the column to inspect.
        column: Label of the column to inspect.
        factor: Multiplier applied to the IQR when building the fences.
            Defaults to 1.5; a larger value flags fewer rows.

    Returns:
        A tuple ``(outliers, lower_bound, upper_bound)`` where ``outliers`` is
        the subset of ``data`` strictly below ``lower_bound`` or strictly
        above ``upper_bound``.
    """
    values = data[column]
    q1 = values.quantile(0.25)
    q3 = values.quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - factor * iqr
    upper_bound = q3 + factor * iqr
    outliers = data[(values < lower_bound) | (values > upper_bound)]
    return outliers, lower_bound, upper_bound
# Run the IQR detector on the sample and report the fences it found.
outliers_iqr, lower, upper = detect_outliers_iqr(df, 'value')
print(f"下界: {lower:.2f}, 上界: {upper:.2f}")
print(f"检测到的异常值数量: {len(outliers_iqr)}")
# Strategy 1: drop every row that falls outside the fences.
df_cleaned = df[(df['value'] >= lower) & (df['value'] <= upper)]
# Strategy 2: keep every row but clamp its value to the nearest fence.
df_capped = df.assign(value=df['value'].clip(lower=lower, upper=upper))
# Side-by-side box plots: original, rows dropped, values clamped.
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
panels = [
    (df, '原始数据'),
    (df_cleaned, '删除异常值后'),
    (df_capped, '替换异常值后'),
]
for ax, (frame, title) in zip(axes, panels):
    ax.boxplot(frame['value'])
    ax.set_title(title)
plt.tight_layout()
plt.show()
print(f"原始数据统计: 均值={df['value'].mean():.2f}, 标准差={df['value'].std():.2f}")
print(f"清理后统计: 均值={df_cleaned['value'].mean():.2f}, 标准差={df_cleaned['value'].std():.2f}")

Z-Score方法

from scipy import stats
def detect_outliers_zscore(data, column, threshold=3):
    """Return the rows of ``data`` whose ``column`` z-score magnitude exceeds ``threshold``.

    Args:
        data: DataFrame holding the column to inspect.
        column: Label of the column to inspect.
        threshold: Rows with ``|z| > threshold`` are reported. Defaults to 3.

    Returns:
        A tuple ``(outliers, z_scores)``: the flagged subset of ``data`` and
        the absolute z-score of every row, in row order. Rows whose value is
        NaN get a NaN score and are never flagged.
    """
    # nan_policy='omit' computes mean/std from the non-NaN values only. With
    # the default ('propagate') a single NaN makes every score NaN, so no row
    # would ever compare greater than the threshold.
    z_scores = np.abs(stats.zscore(data[column], nan_policy='omit'))
    outliers = data[z_scores > threshold]
    return outliers, z_scores
# Run the z-score detector on the sample frame.
outliers_zscore, z_scores = detect_outliers_zscore(df, 'value', threshold=3)
print(f"Z-Score方法检测到的异常值数量: {len(outliers_zscore)}")
# Annotate every row with its flag and score. detect_outliers_zscore already
# returns absolute values, so the scores can be compared directly.
df['is_outlier_zscore'] = z_scores > 3
df['z_score'] = z_scores
# Keep only the rows that were not flagged.
df_zscore_cleaned = df.loc[~df['is_outlier_zscore']].copy()
print("\n异常值详情:")
print(df.loc[df['is_outlier_zscore'], 'value'].values)

多变量异常值检测 - 使用IQR

# Three-feature sample: each feature is 100 normal draws followed by two
# planted extreme values. Features are generated in listed order so the seeded
# random stream is consumed as feature1, feature2, feature3.
np.random.seed(42)
multi_data = pd.DataFrame({
    name: np.concatenate([np.random.normal(mu, sigma, 100), extremes])
    for name, mu, sigma, extremes in [
        ('feature1', 50, 10, [200, 250]),
        ('feature2', 30, 5, [100, 120]),
        ('feature3', 70, 15, [150, 200]),
    ]
})
def detect_outliers_multivariate(data, columns):
    """Flag rows that are IQR outliers in at least one of ``columns``.

    Each column is checked independently against its own
    ``Q1 - 1.5 * IQR`` / ``Q3 + 1.5 * IQR`` fences and the per-column results
    are OR-ed together.

    Args:
        data: DataFrame holding the columns to inspect.
        columns: Iterable of column labels to check.

    Returns:
        A boolean Series indexed like ``data``; True marks a row that falls
        outside the fences of at least one listed column.
    """
    # Seed the mask on data's own index. A default 0..n-1 index would make the
    # `|` below align by label and return a mask of the wrong length whenever
    # `data` is filtered, re-indexed or date-indexed.
    outlier_mask = pd.Series(False, index=data.index, dtype=bool)
    for col in columns:
        values = data[col]
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        outlier_mask = outlier_mask | (values < lower) | (values > upper)
    return outlier_mask
# Flag rows that are outliers in any of the three features and summarise.
columns = ['feature1', 'feature2', 'feature3']
outlier_mask = detect_outliers_multivariate(multi_data, columns)
total_samples = len(multi_data)
outlier_count = outlier_mask.sum()
print("多变量异常值检测:")
print(f"总样本数: {total_samples}")
print(f"异常值数量: {outlier_count}")
print(f"异常值比例: {outlier_count/total_samples*100:.2f}%")

异常值处理函数封装

class OutlierHandler:
    """Learn per-column outlier bounds from one DataFrame and apply them to others.

    Attributes:
        method: ``'iqr'`` or ``'zscore'``; selects how ``fit`` derives bounds.
        threshold: Number of standard deviations used by the ``'zscore'``
            method. Not used by ``'iqr'``.
        bounds_: Mapping ``column -> {'lower': ..., 'upper': ...}`` filled by
            ``fit``. Repeated ``fit`` calls add to or overwrite entries; they
            do not clear columns fitted earlier.
    """

    _METHODS = ('iqr', 'zscore')
    _STRATEGIES = ('clip', 'remove', 'mean')

    def __init__(self, method='iqr', threshold=3):
        self.method = method
        self.threshold = threshold
        self.bounds_ = {}

    def fit(self, data, columns):
        """Compute and store lower/upper bounds for each of ``columns``.

        Args:
            data: DataFrame to learn the bounds from.
            columns: Iterable of column labels to fit.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If ``self.method`` is not ``'iqr'`` or ``'zscore'``.
        """
        # Fail loudly: an unrecognised method used to leave bounds_ empty and
        # turn every later transform into a silent no-op.
        if self.method not in self._METHODS:
            raise ValueError(
                f"unknown method {self.method!r}; expected one of {self._METHODS}"
            )
        for col in columns:
            series = data[col]
            if self.method == 'iqr':
                q1 = series.quantile(0.25)
                q3 = series.quantile(0.75)
                iqr = q3 - q1
                self.bounds_[col] = {
                    'lower': q1 - 1.5 * iqr,
                    'upper': q3 + 1.5 * iqr,
                }
            else:  # 'zscore'
                mean = series.mean()
                # pandas' default std() is the sample standard deviation (ddof=1).
                std = series.std()
                self.bounds_[col] = {
                    'lower': mean - self.threshold * std,
                    'upper': mean + self.threshold * std,
                }
        return self

    def transform(self, data, strategy='clip'):
        """Return a copy of ``data`` with out-of-bounds values handled.

        Only fitted columns that are present in ``data`` are processed, so a
        handler fitted on several columns can be applied to a subset of them.

        Args:
            data: DataFrame to process; it is not modified.
            strategy: ``'clip'`` clamps values to the bounds, ``'remove'``
                drops rows outside the bounds, ``'mean'`` overwrites
                out-of-bounds values with the column mean.

        Returns:
            The processed copy.

        Raises:
            ValueError: If ``strategy`` is not one of the three names above.
        """
        if strategy not in self._STRATEGIES:
            raise ValueError(
                f"unknown strategy {strategy!r}; expected one of {self._STRATEGIES}"
            )
        result = data.copy()
        for col, bounds in self.bounds_.items():
            # Skip fitted columns the caller did not pass in instead of
            # raising KeyError on the lookup below.
            if col not in result.columns:
                continue
            lower, upper = bounds['lower'], bounds['upper']
            if strategy == 'clip':
                result[col] = result[col].clip(lower=lower, upper=upper)
            elif strategy == 'remove':
                in_bounds = (result[col] >= lower) & (result[col] <= upper)
                result = result[in_bounds]
            else:  # 'mean'
                # The replacement mean is taken over the column as it stands,
                # i.e. it still includes the out-of-bounds values.
                mean_val = result[col].mean()
                out_of_bounds = (result[col] < lower) | (result[col] > upper)
                result.loc[out_of_bounds, col] = mean_val
        return result

    def detect(self, data, columns):
        """Return, per column, the rows of ``data`` outside that column's bounds.

        Args:
            data: DataFrame to inspect.
            columns: Iterable of column labels; each must have been fitted.

        Returns:
            Dict mapping each column label to the subset of ``data`` whose
            value in that column is below the lower or above the upper bound.

        Raises:
            KeyError: If a column has no entry in ``bounds_``.
        """
        outliers = {}
        for col in columns:
            bounds = self.bounds_[col]
            mask = (data[col] < bounds['lower']) | (data[col] > bounds['upper'])
            outliers[col] = data[mask]
        return outliers
# Example: fit IQR bounds on the sample column (fit returns the handler).
handler = OutlierHandler(method='iqr').fit(df, ['value'])
# Apply each of the three strategies to the same data.
df_clipped = handler.transform(df, 'clip')
df_removed = handler.transform(df, 'remove')
df_mean_replaced = handler.transform(df, 'mean')
print("不同处理策略的统计结果:")
for label, frame in [
    ("Clipped", df_clipped),
    ("Removed", df_removed),
    ("Mean替换", df_mean_replaced),
]:
    print(f"{label} - 均值: {frame['value'].mean():.2f}, 标准差: {frame['value'].std():.2f}")

实战示例:数据分析中的异常值处理

# Simulated business data: 200 daily rows. Each metric is 180 normal draws,
# six planted anomalies, then 14 more normal draws. Metrics are generated in
# listed order so the seeded random stream is consumed as sales, customers.
np.random.seed(123)
business_data = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=200, freq='D'),
    **{
        name: np.concatenate([
            np.random.normal(mu, sigma, 180),
            anomalies,
            np.random.normal(mu, sigma, 14),
        ])
        for name, mu, sigma, anomalies in [
            ('sales', 1000, 200, [3000, 3500, 100, 50, 4000, 200]),
            ('customers', 100, 20, [300, 350, 10, 5, 400, 20]),
        ]
    },
})
# Outlier detection and summary statistics for one column.
def analyze_outliers(df, column, method='both'):
    """Summarise ``df[column]`` and count its outliers by IQR and/or z-score.

    Args:
        df: DataFrame holding the column.
        column: Label of the column to analyse.
        method: ``'iqr'``, ``'zscore'`` or ``'both'``. Any other value yields
            only the basic statistics.

    Returns:
        Dict with ``total_records``, ``column``, ``mean``, ``median`` and
        ``std``; plus ``iqr_outliers``, ``iqr_percentage`` and ``iqr_bounds``
        when the IQR check runs (1.5 * IQR fences); plus ``zscore_outliers``
        and ``zscore_percentage`` when the z-score check runs (|z| > 3).
    """
    series = df[column]
    n_rows = len(df)
    use_iqr = method in ('iqr', 'both')
    use_zscore = method in ('zscore', 'both')

    # Basic descriptive statistics are always reported.
    result = {
        'total_records': n_rows,
        'column': column,
        'mean': series.mean(),
        'median': series.median(),
        'std': series.std(),
    }
    if use_iqr:
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        spread = q3 - q1
        iqr_lower = q1 - 1.5 * spread
        iqr_upper = q3 + 1.5 * spread
        iqr_count = ((series < iqr_lower) | (series > iqr_upper)).sum()
        result['iqr_outliers'] = iqr_count
        result['iqr_percentage'] = iqr_count / n_rows * 100
        result['iqr_bounds'] = (iqr_lower, iqr_upper)
    if use_zscore:
        zscore_count = (np.abs(stats.zscore(series)) > 3).sum()
        result['zscore_outliers'] = zscore_count
        result['zscore_percentage'] = zscore_count / n_rows * 100
    return result
# Print the outlier report for the sales and customers columns.
separator = '=' * 50
for col in ('sales', 'customers'):
    print("\n" + separator)
    print(f"分析 {col} 字段异常值:")
    print(separator)
    analysis_result = analyze_outliers(business_data, col)
    for key, value in analysis_result.items():
        # Bounds arrive as a (lower, upper) tuple; floats get two decimals;
        # everything else is printed unchanged.
        if isinstance(value, tuple):
            rendered = f"({value[0]:.2f}, {value[1]:.2f})"
        elif isinstance(value, float):
            rendered = f"{value:.2f}"
        else:
            rendered = f"{value}"
        print(f"{key}: {rendered}")

可视化异常值分布

def plot_outlier_analysis(data, column, handler):
    """Show a 2x3 diagnostic figure for the outliers of one column.

    Panels: box plot of the raw values, box plot after clipping to the
    bounds, overlaid histograms, scatter with outliers highlighted, a
    time-series view (only when ``data`` has a ``'date'`` column) and a text
    summary.

    Args:
        data: DataFrame holding ``column`` (and optionally ``'date'``).
        column: Label of the column to visualise.
        handler: Fitted object whose ``bounds_[column]`` holds ``'lower'`` and
            ``'upper'``. It may have been fitted on further columns as well.

    Raises:
        KeyError: If ``handler.bounds_`` has no entry for ``column``.
    """
    bounds = handler.bounds_[column]
    lower, upper = bounds['lower'], bounds['upper']
    values = data[column]
    # Clip straight from this column's bounds. Passing a one-column slice to
    # handler.transform raised KeyError whenever the handler had also been
    # fitted on columns that the slice does not contain.
    clipped = values.clip(lower=lower, upper=upper)
    outlier_mask = (values < lower) | (values > upper)
    outliers = data[outlier_mask]
    # Positional x-coordinates for both scatter layers, so the highlighted
    # points sit on top of their base points even when the index is not 0..n-1.
    positions = range(len(data))
    outlier_positions = outlier_mask.to_numpy().nonzero()[0]

    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    # 1. Box plot of the raw values.
    axes[0, 0].boxplot(values)
    axes[0, 0].set_title(f'{column} - 原始数据箱线图')
    axes[0, 0].set_ylabel('值')
    # 2. Box plot after clipping to the bounds.
    axes[0, 1].boxplot(clipped)
    axes[0, 1].set_title(f'{column} - 处理后箱线图')
    axes[0, 1].set_ylabel('值')
    # 3. Histogram comparison, raw versus clipped.
    axes[0, 2].hist(values, bins=30, alpha=0.5, label='原始', density=True)
    axes[0, 2].hist(clipped, bins=30, alpha=0.5, label='处理后', density=True)
    axes[0, 2].set_title(f'{column} - 分布对比')
    axes[0, 2].legend()
    # 4. Every point by row position, with outliers and both bounds marked.
    axes[1, 0].scatter(positions, values, alpha=0.6, label='正常值')
    axes[1, 0].scatter(outlier_positions, outliers[column],
                       color='red', s=50, label='异常值', zorder=5)
    axes[1, 0].axhline(y=lower, color='green', linestyle='--', label='下界')
    axes[1, 0].axhline(y=upper, color='green', linestyle='--', label='上界')
    axes[1, 0].set_title(f'{column} - 异常值标记')
    axes[1, 0].legend()
    # 5. Time-series view, only when a date column exists.
    if 'date' in data.columns:
        axes[1, 1].plot(data['date'], values, marker='o', linewidth=0.5, markersize=3)
        axes[1, 1].scatter(outliers['date'], outliers[column],
                           color='red', s=50, zorder=5)
        axes[1, 1].set_title(f'{column} - 时间序列异常值')
        axes[1, 1].tick_params(axis='x', rotation=45)
    else:
        # Hide the unused panel rather than showing an empty frame.
        axes[1, 1].axis('off')
    # 6. Text summary of the outliers.
    axes[1, 2].axis('off')
    stats_text = f"""
    异常值统计:
    总数: {len(outliers)}
    占比: {len(outliers)/len(data)*100:.2f}%
    异常值范围: 
    最小值: {outliers[column].min():.2f}
    最大值: {outliers[column].max():.2f}
    """
    axes[1, 2].text(0.1, 0.5, stats_text, fontsize=12, verticalalignment='center')
    axes[1, 2].set_title('异常值统计信息')
    plt.tight_layout()
    plt.show()
# Fit IQR bounds on both business metrics (fit returns the handler itself),
# then draw the diagnostic figure for the sales column.
handler = OutlierHandler(method='iqr').fit(business_data, ['sales', 'customers'])
plot_outlier_analysis(business_data, 'sales', handler)

这些案例覆盖了:

  1. 基础IQR方法 - 最常用的异常值检测
  2. Z-Score方法 - 适用于正态分布数据
  3. 多变量异常值检测 - 对每个特征分别用IQR检测,任一特征越界即标记该行
  4. 封装成可复用类 - 面向对象编程
  5. 实战业务案例 - 模拟真实场景
  6. 可视化分析 - 直观展示异常值

选择哪种方法取决于你的数据分布、业务场景和处理需求,建议先了解数据特性,再选择合适的异常值处理方法。

标签: 异常值处理 Python案例

抱歉,评论功能暂时关闭!