本文目录导读:
我来分享几个Python异常值处理的完整案例。
基础统计方法 - IQR(四分位距法)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# Build the demo dataset: 100 draws from N(50, 10) followed by four
# hand-picked extreme values that act as the outliers.
np.random.seed(42)
data = {
    'value': np.concatenate((
        np.random.normal(loc=50, scale=10, size=100),  # regular observations
        [120, 130, 5, 3],  # injected outliers
    )),
}
df = pd.DataFrame(data)
# IQR方法检测异常值
def detect_outliers_iqr(data, column, multiplier=1.5):
    """Return the rows whose ``column`` value lies outside the IQR fences.

    The fences are ``Q1 - multiplier * IQR`` and ``Q3 + multiplier * IQR``,
    where Q1 and Q3 are the 0.25 and 0.75 quantiles of ``data[column]``.

    Args:
        data: DataFrame holding the column to inspect.
        column: Label of the column to inspect.
        multiplier: Fence width in units of IQR. Defaults to 1.5, the value
            that used to be hard-coded.

    Returns:
        A tuple ``(outliers, lower_bound, upper_bound)``; ``outliers`` is the
        subset of ``data`` strictly below the lower fence or strictly above
        the upper fence.
    """
    values = data[column]
    q1 = values.quantile(0.25)
    q3 = values.quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - multiplier * iqr
    upper_bound = q3 + multiplier * iqr
    # Strict comparisons: a value sitting exactly on a fence is kept, and NaN
    # compares False on both sides, so missing values are never flagged.
    is_outlier = (values < lower_bound) | (values > upper_bound)
    return data[is_outlier], lower_bound, upper_bound
# Run the IQR detector on the demo column and report the fences it found.
outliers_iqr, lower, upper = detect_outliers_iqr(df, 'value')
print(f"下界: {lower:.2f}, 上界: {upper:.2f}")
print(f"检测到的异常值数量: {len(outliers_iqr)}")

# Treatment 2 (capping): on a copy, pull every value back inside the fences.
df_capped = df.copy()
df_capped['value'] = df_capped['value'].clip(lower=lower, upper=upper)

# Treatment 1 (dropping): keep only the rows that lie inside the fences.
df_cleaned = df.loc[(df['value'] >= lower) & (df['value'] <= upper)]
# Side-by-side box plots: the raw data next to both cleaned variants.
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
panels = [
    ('原始数据', df),
    ('删除异常值后', df_cleaned),
    ('替换异常值后', df_capped),
]
for ax, (title, frame) in zip(axes, panels):
    ax.boxplot(frame['value'])
    ax.set_title(title)
plt.tight_layout()
plt.show()

# Summary statistics before and after dropping the outliers.
print(f"原始数据统计: 均值={df['value'].mean():.2f}, 标准差={df['value'].std():.2f}")
print(f"清理后统计: 均值={df_cleaned['value'].mean():.2f}, 标准差={df_cleaned['value'].std():.2f}")
Z-Score方法
from scipy import stats
def detect_outliers_zscore(data, column, threshold=3):
    """Return the rows whose absolute z-score in ``column`` exceeds ``threshold``.

    Args:
        data: DataFrame holding the column to inspect.
        column: Label of the numeric column to inspect.
        threshold: Absolute z-score above which a row counts as an outlier.

    Returns:
        A tuple ``(outliers, z_scores)``: the flagged subset of ``data`` and
        the absolute z-score of every row (NaN where the input is missing).
    """
    # nan_policy='omit' computes mean/std from the non-missing values only.
    # With the default ('propagate') a single NaN turns every z-score into
    # NaN, so no outlier would ever be reported.
    z_scores = np.abs(stats.zscore(data[column], nan_policy='omit'))
    # NaN > threshold is False, so rows with missing values are not flagged.
    outliers = data[z_scores > threshold]
    return outliers, z_scores
# Run the z-score detector on the demo column with a threshold of 3.
outliers_zscore, z_scores = detect_outliers_zscore(df, 'value', threshold=3)
print(f"Z-Score方法检测到的异常值数量: {len(outliers_zscore)}")

# Annotate the frame with a per-row flag and the score itself.
df['is_outlier_zscore'] = np.abs(z_scores) > 3
df['z_score'] = z_scores

# Keep an independent copy of the rows that were not flagged.
df_zscore_cleaned = df.loc[~df['is_outlier_zscore']].copy()

print(f"\n异常值详情:")
print(df.loc[df['is_outlier_zscore'], 'value'].values)
多变量异常值检测 - 使用IQR
# Build a three-feature dataset: each feature is 100 normal draws followed by
# two extreme values. The dict preserves insertion order, so the random draws
# happen in the order feature1, feature2, feature3.
np.random.seed(42)
multi_data = pd.DataFrame({
    name: np.concatenate([np.random.normal(center, spread, 100), extremes])
    for name, (center, spread, extremes) in {
        'feature1': (50, 10, [200, 250]),
        'feature2': (30, 5, [100, 120]),
        'feature3': (70, 15, [150, 200]),
    }.items()
})
def detect_outliers_multivariate(data, columns):
    """Return a boolean mask of rows that are IQR outliers in any column.

    A row is flagged when, for at least one of ``columns``, its value lies
    strictly below ``Q1 - 1.5 * IQR`` or strictly above ``Q3 + 1.5 * IQR``
    of that column.

    Args:
        data: DataFrame to inspect.
        columns: Iterable of column labels to test.

    Returns:
        Boolean Series indexed like ``data``; True marks an outlier row.
    """
    # Seed the mask on data's own index. A default RangeIndex would be
    # label-aligned against the per-column masks by ``|`` and give a wrong
    # (and longer) result for any frame whose index is not 0..n-1.
    outlier_mask = pd.Series(False, index=data.index, dtype=bool)
    for col in columns:
        q1 = data[col].quantile(0.25)
        q3 = data[col].quantile(0.75)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        col_outliers = (data[col] < lower) | (data[col] > upper)
        outlier_mask = outlier_mask | col_outliers
    return outlier_mask
# Flag rows that are outliers in at least one feature, then report the count
# and the share of the dataset they represent.
columns = ['feature1', 'feature2', 'feature3']
outlier_mask = detect_outliers_multivariate(multi_data, columns)
total_rows = len(multi_data)
outlier_count = outlier_mask.sum()
print(f"多变量异常值检测:")
print(f"总样本数: {total_rows}")
print(f"异常值数量: {outlier_count}")
print(f"异常值比例: {outlier_count/total_rows*100:.2f}%")
异常值处理函数封装
class OutlierHandler:
    """Fit per-column outlier bounds, then detect or treat values outside them.

    Supported ways of deriving the bounds (``method``):

    * ``'iqr'``: ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]``
    * ``'zscore'``: ``[mean - threshold * std, mean + threshold * std]``

    Supported treatments (``strategy`` of :meth:`transform`):
    ``'clip'``, ``'remove'`` and ``'mean'``.
    """

    _METHODS = ('iqr', 'zscore')
    _STRATEGIES = ('clip', 'remove', 'mean')

    def __init__(self, method='iqr', threshold=3):
        """Store the configuration; no bounds exist until :meth:`fit` runs.

        Args:
            method: ``'iqr'`` or ``'zscore'``.
            threshold: Number of standard deviations used by ``'zscore'``;
                not used by ``'iqr'``.
        """
        self.method = method
        self.threshold = threshold
        # column label -> {'lower': ..., 'upper': ...}; filled in by fit().
        self.bounds_ = {}

    def fit(self, data, columns):
        """Compute and store the lower/upper bound of each column.

        Bounds of columns fitted by earlier calls are kept; columns fitted
        again are overwritten.

        Args:
            data: DataFrame to learn the bounds from.
            columns: Iterable of column labels to fit.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If ``self.method`` is not a supported method
                (previously this silently fitted nothing).
        """
        if self.method not in self._METHODS:
            raise ValueError(
                f"unknown method {self.method!r}; expected one of {self._METHODS}"
            )
        for col in columns:
            values = data[col]
            if self.method == 'iqr':
                q1 = values.quantile(0.25)
                q3 = values.quantile(0.75)
                iqr = q3 - q1
                lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
            else:  # 'zscore'
                mean = values.mean()
                std = values.std()
                lower = mean - self.threshold * std
                upper = mean + self.threshold * std
            self.bounds_[col] = {'lower': lower, 'upper': upper}
        return self

    def transform(self, data, strategy='clip'):
        """Return a treated copy of ``data``; the input is left untouched.

        Args:
            data: DataFrame to treat. Fitted columns that are absent from
                ``data`` are skipped, so a subset of the fitted columns can
                be transformed on its own.
            strategy: ``'clip'`` replaces out-of-bounds values with the
                nearest bound; ``'remove'`` drops rows that are not within
                the bounds; ``'mean'`` replaces out-of-bounds values with
                the column mean of ``data`` (outliers included).

        Returns:
            A new DataFrame with the strategy applied to every fitted column
            present in ``data``.

        Raises:
            ValueError: If ``strategy`` is not a supported strategy
                (previously this silently returned an unmodified copy).
        """
        if strategy not in self._STRATEGIES:
            raise ValueError(
                f"unknown strategy {strategy!r}; expected one of {self._STRATEGIES}"
            )
        result = data.copy()
        for col, bounds in self.bounds_.items():
            if col not in result.columns:
                # The handler may have been fitted on more columns than this
                # frame carries; indexing them would raise KeyError.
                continue
            lower, upper = bounds['lower'], bounds['upper']
            if strategy == 'clip':
                result[col] = result[col].clip(lower=lower, upper=upper)
            elif strategy == 'remove':
                keep = (result[col] >= lower) & (result[col] <= upper)
                result = result[keep]
            else:  # 'mean'
                mean_val = result[col].mean()
                outside = (result[col] < lower) | (result[col] > upper)
                # mask() upcasts integer columns as needed instead of writing
                # a float into an integer column in place.
                result[col] = result[col].mask(outside, mean_val)
        return result

    def detect(self, data, columns):
        """Return the out-of-bounds rows of ``data`` for each column.

        Args:
            data: DataFrame to inspect.
            columns: Iterable of column labels; each must have been fitted.

        Returns:
            Dict mapping each column label to the subset of ``data`` whose
            value in that column is strictly outside the fitted bounds.

        Raises:
            KeyError: If a column has no fitted bounds.
        """
        outliers = {}
        for col in columns:
            bounds = self.bounds_[col]
            mask = (data[col] < bounds['lower']) | (data[col] > bounds['upper'])
            outliers[col] = data[mask]
        return outliers
# Usage example: fit IQR bounds on the demo column (fit() returns the handler).
handler = OutlierHandler(method='iqr').fit(df, ['value'])

# Apply each treatment strategy to the same input frame.
df_clipped = handler.transform(df, strategy='clip')
df_removed = handler.transform(df, strategy='remove')
df_mean_replaced = handler.transform(df, strategy='mean')

# Compare the mean and standard deviation left by each strategy.
print("不同处理策略的统计结果:")
print(f"Clipped - 均值: {df_clipped['value'].mean():.2f}, 标准差: {df_clipped['value'].std():.2f}")
print(f"Removed - 均值: {df_removed['value'].mean():.2f}, 标准差: {df_removed['value'].std():.2f}")
print(f"Mean替换 - 均值: {df_mean_replaced['value'].mean():.2f}, 标准差: {df_mean_replaced['value'].std():.2f}")
实战示例:数据分析中的异常值处理
# Simulated daily business data for 200 days: each metric is 180 normal draws,
# six injected anomalies, then 14 more normal draws (180 + 6 + 14 = 200 rows).
np.random.seed(123)
business_data = pd.DataFrame(
    {
        'date': pd.date_range(start='2024-01-01', periods=200, freq='D'),
        'sales': np.concatenate((
            np.random.normal(loc=1000, scale=200, size=180),  # regular sales
            [3000, 3500, 100, 50, 4000, 200],  # anomalies
            np.random.normal(loc=1000, scale=200, size=14),  # regular sales
        )),
        'customers': np.concatenate((
            np.random.normal(loc=100, scale=20, size=180),
            [300, 350, 10, 5, 400, 20],
            np.random.normal(loc=100, scale=20, size=14),
        )),
    }
)
# 异常值检测与分析
def analyze_outliers(df, column, method='both'):
    """Summarise ``column`` and count its outliers by IQR and/or z-score.

    Args:
        df: DataFrame holding the column.
        column: Label of the column to analyse.
        method: ``'iqr'``, ``'zscore'`` or ``'both'``. Any other value yields
            only the basic statistics.

    Returns:
        Dict with ``total_records``, ``column``, ``mean``, ``median`` and
        ``std``; plus ``iqr_outliers``, ``iqr_percentage`` and ``iqr_bounds``
        when the IQR method is selected; plus ``zscore_outliers`` and
        ``zscore_percentage`` (absolute z-score above 3) when the z-score
        method is selected.
    """
    series = df[column]
    n_rows = len(df)
    use_iqr = method in ('iqr', 'both')
    use_zscore = method in ('zscore', 'both')

    # Basic descriptive statistics are always reported.
    result = {
        'total_records': n_rows,
        'column': column,
        'mean': series.mean(),
        'median': series.median(),
        'std': series.std(),
    }

    if use_iqr:
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        spread = q3 - q1
        fence_low = q1 - 1.5 * spread
        fence_high = q3 + 1.5 * spread
        n_iqr = ((series < fence_low) | (series > fence_high)).sum()
        result['iqr_outliers'] = n_iqr
        result['iqr_percentage'] = n_iqr / n_rows * 100
        result['iqr_bounds'] = (fence_low, fence_high)

    if use_zscore:
        n_zscore = (np.abs(stats.zscore(series)) > 3).sum()
        result['zscore_outliers'] = n_zscore
        result['zscore_percentage'] = n_zscore / n_rows * 100

    return result
# Print the outlier analysis of each business metric under a banner.
for col in ['sales', 'customers']:
    print(f"\n{'='*50}")
    print(f"分析 {col} 字段异常值:")
    print('='*50)
    analysis_result = analyze_outliers(business_data, col)
    for key, value in analysis_result.items():
        # Bounds arrive as a (lower, upper) tuple; floats get two decimals;
        # everything else is printed as-is.
        if isinstance(value, tuple):
            rendered = f"{key}: ({value[0]:.2f}, {value[1]:.2f})"
        elif isinstance(value, float):
            rendered = f"{key}: {value:.2f}"
        else:
            rendered = f"{key}: {value}"
        print(rendered)
可视化异常值分布
def plot_outlier_analysis(data, column, handler):
    """Draw a 2x3 dashboard describing the outliers of ``data[column]``.

    Panels: raw box plot, clipped box plot, histogram comparison, scatter
    with the outliers highlighted, time series (only when ``data`` has a
    ``'date'`` column) and a text summary.

    Args:
        data: DataFrame containing ``column`` and, optionally, ``'date'``.
        column: Label of the numeric column to analyse.
        handler: Fitted handler exposing ``bounds_[column]`` (a dict with
            ``'lower'`` and ``'upper'``) and ``detect(data, columns)``.

    Raises:
        KeyError: If ``handler`` holds no bounds for ``column``.
    """
    # Resolve everything that can fail before a figure is created.
    bounds = handler.bounds_[column]
    outliers = handler.detect(data, [column])[column]
    is_outlier = data.index.isin(outliers.index)
    positions = np.arange(len(data))

    fig, axes = plt.subplots(2, 3, figsize=(15, 10))

    # 1. Box plot of the raw values.
    axes[0, 0].boxplot(data[column])
    axes[0, 0].set_title(f'{column} - 原始数据箱线图')
    axes[0, 0].set_ylabel('值')

    # 2. Box plot after clipping. The column's own bounds are used directly
    #    so the result does not depend on which other columns the handler
    #    was fitted on.
    cleaned_data = data[[column]].clip(lower=bounds['lower'], upper=bounds['upper'])
    axes[0, 1].boxplot(cleaned_data[column])
    axes[0, 1].set_title(f'{column} - 处理后箱线图')
    axes[0, 1].set_ylabel('值')

    # 3. Histogram comparison of raw and clipped values.
    axes[0, 2].hist(data[column], bins=30, alpha=0.5, label='原始', density=True)
    axes[0, 2].hist(cleaned_data[column], bins=30, alpha=0.5, label='处理后', density=True)
    axes[0, 2].set_title(f'{column} - 分布对比')
    axes[0, 2].legend()

    # 4. Scatter with outliers highlighted. Both layers use row positions on
    #    the x axis so the markers line up even when the index is not 0..n-1.
    axes[1, 0].scatter(positions, data[column], alpha=0.6, label='正常值')
    axes[1, 0].scatter(positions[is_outlier], data.loc[is_outlier, column],
                       color='red', s=50, label='异常值', zorder=5)
    axes[1, 0].axhline(y=bounds['lower'], color='green', linestyle='--', label='下界')
    axes[1, 0].axhline(y=bounds['upper'], color='green', linestyle='--', label='上界')
    axes[1, 0].set_title(f'{column} - 异常值标记')
    axes[1, 0].legend()

    # 5. Time series, only when a date column is available.
    if 'date' in data.columns:
        axes[1, 1].plot(data['date'], data[column], marker='o', linewidth=0.5, markersize=3)
        outlier_times = data[is_outlier]
        axes[1, 1].scatter(outlier_times['date'], outlier_times[column],
                           color='red', s=50, zorder=5)
        axes[1, 1].set_title(f'{column} - 时间序列异常值')
        axes[1, 1].tick_params(axis='x', rotation=45)
    else:
        # Hide the panel instead of leaving an empty frame in the grid.
        axes[1, 1].axis('off')

    # 6. Text summary of the outliers.
    axes[1, 2].axis('off')
    stats_text = "\n".join([
        "",
        "异常值统计:",
        f"总数: {len(outliers)}",
        f"占比: {len(outliers)/len(data)*100:.2f}%",
        "异常值范围:",
        f"最小值: {outliers[column].min():.2f}",
        f"最大值: {outliers[column].max():.2f}",
        "",
    ])
    axes[1, 2].text(0.1, 0.5, stats_text, fontsize=12, verticalalignment='center')
    axes[1, 2].set_title('异常值统计信息')

    plt.tight_layout()
    plt.show()
# Fit IQR bounds for both business metrics (fit() returns the handler), then
# draw the dashboard for the sales column.
handler = OutlierHandler(method='iqr').fit(business_data, ['sales', 'customers'])
plot_outlier_analysis(business_data, column='sales', handler=handler)
这些案例覆盖了:
- 基础IQR方法 - 最常用的异常值检测
- Z-Score方法 - 适用于正态分布数据
- 多变量异常值检测 - 跨多个特征检测
- 封装成可复用类 - 面向对象编程
- 实战业务案例 - 模拟真实场景
- 可视化分析 - 直观展示异常值
选择哪种方法取决于你的数据分布、业务场景和处理需求,建议先了解数据特性,再选择合适的异常值处理方法。
标签: 异常值处理 Python案例