Python数值预测案例怎么写?

wen python案例 2

本文目录导读:

  1. 简单线性回归案例(房价预测)
  2. 时间序列预测(ARIMA模型)
  3. 多特征预测(随机森林)
  4. 神经网络预测(使用TensorFlow/Keras)
  5. 完整的预测流程模板
  6. 安装所需库

我来给你一个完整的Python数值预测案例,从简单到复杂逐步展示:

简单线性回归案例(房价预测)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
# 1. 生成示例数据
np.random.seed(42)
area = np.random.uniform(50, 200, 100)  # 房屋面积(平米)
price = 2.5 * area + np.random.normal(0, 30, 100)  # 房价(万元)
# 2. 创建DataFrame
df = pd.DataFrame({'area': area, 'price': price})
# 3. 数据分割
X = df[['area']]
y = df['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 4. 训练模型
model = LinearRegression()
model.fit(X_train, y_train)
# 5. 预测
y_pred = model.predict(X_test)
# 6. 评估
print(f"系数: {model.coef_[0]:.2f}")
print(f"截距: {model.intercept_:.2f}")
print(f"MSE: {mean_squared_error(y_test, y_pred):.2f}")
print(f"R2: {r2_score(y_test, y_pred):.2f}")
# 7. 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_test, y_test, color='blue', label='实际价格')
plt.plot(X_test, y_pred, color='red', linewidth=2, label='预测价格')
plt.xlabel('面积(平米)')
plt.ylabel('价格(万元)')'房价预测 - 线性回归')
plt.legend()
plt.grid(True)
plt.show()

时间序列预测(ARIMA模型)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_absolute_error
# 1. 生成时间序列数据
np.random.seed(42)
dates = pd.date_range('2020-01-01', periods=365, freq='D')
trend = np.linspace(0, 10, 365)  # 趋势
seasonal = 5 * np.sin(2 * np.pi * np.arange(365) / 365)  # 季节性
noise = np.random.normal(0, 1, 365)
data = trend + seasonal + noise
# 2. 创建DataFrame
df = pd.DataFrame({'date': dates, 'value': data})
df.set_index('date', inplace=True)
# 3. 数据分割
train_size = int(len(df) * 0.8)
train, test = df.iloc[:train_size], df.iloc[train_size:]
# 4. 训练ARIMA模型
model = ARIMA(train, order=(5,1,0))
model_fit = model.fit()
# 5. 预测
predictions = model_fit.forecast(steps=len(test))
# 6. 评估
mae = mean_absolute_error(test, predictions)
print(f"MAE: {mae:.2f}")
# 7. 可视化
plt.figure(figsize=(12, 6))
plt.plot(train.index, train, label='训练数据')
plt.plot(test.index, test, label='实际值')
plt.plot(test.index, predictions, label='预测值', color='red')
plt.xlabel('日期')
plt.ylabel('数值')'时间序列预测 - ARIMA')
plt.legend()
plt.grid(True)
plt.show()

多特征预测(随机森林)

import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
# 1. 创建更复杂的数据集
np.random.seed(42)
n_samples = 1000
# 特征
X = pd.DataFrame({
    'feature1': np.random.normal(0, 1, n_samples),
    'feature2': np.random.uniform(0, 10, n_samples),
    'feature3': np.random.exponential(1, n_samples),
    'feature4': np.random.beta(2, 5, n_samples),
    'feature5': np.random.poisson(5, n_samples)
})
# 目标变量(带非线性关系)
y = (2 * X['feature1'] + 
     0.5 * X['feature2']**2 + 
     3 * np.log(X['feature3'] + 1) + 
     1 * X['feature4'] * X['feature5'] + 
     np.random.normal(0, 0.5, n_samples))
# 2. 数据预处理
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 3. 分割数据
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42
)
# 4. 训练随机森林模型
rf_model = RandomForestRegressor(
    n_estimators=100,
    max_depth=10,
    random_state=42
)
rf_model.fit(X_train, y_train)
# 5. 预测
y_pred = rf_model.predict(X_test)
# 6. 评估
mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
print(f"MAE: {mae:.2f}")
print(f"MSE: {mse:.2f}")
print(f"RMSE: {rmse:.2f}")
# 7. 特征重要性
feature_importance = pd.DataFrame({
    'feature': X.columns,
    'importance': rf_model.feature_importances_
}).sort_values('importance', ascending=False)
print("\n特征重要性:")
print(feature_importance)
# 8. 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 预测值 vs 实际值
axes[0].scatter(y_test, y_pred, alpha=0.5)
axes[0].plot([y_test.min(), y_test.max()], 
             [y_test.min(), y_test.max()], 
             'r--', lw=2)
axes[0].set_xlabel('实际值')
axes[0].set_ylabel('预测值')
axes[0].set_title('随机森林预测效果')
axes[0].grid(True)
# 特征重要性
axes[1].barh(feature_importance['feature'], feature_importance['importance'])
axes[1].set_xlabel('重要性')
axes[1].set_title('特征重要性分析')
plt.tight_layout()
plt.show()

神经网络预测(使用TensorFlow/Keras)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# 1. 生成复杂非线性数据
np.random.seed(42)
n_samples = 2000
X = np.random.uniform(-3, 3, (n_samples, 3))
y = (np.sin(X[:, 0]) + 
     0.5 * X[:, 1]**2 + 
     np.exp(-X[:, 2]**2 / 2) + 
     np.random.normal(0, 0.1, n_samples))
# 2. 数据预处理
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()
X_scaled = scaler_X.fit_transform(X)
y_scaled = scaler_y.fit_transform(y.reshape(-1, 1)).flatten()
# 3. 分割数据
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y_scaled, test_size=0.2, random_state=42
)
# 4. 构建神经网络
model = keras.Sequential([
    layers.Dense(64, activation='relu', input_shape=(3,)),
    layers.Dropout(0.2),
    layers.Dense(64, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(32, activation='relu'),
    layers.Dense(1)
])
# 5. 编译模型
model.compile(
    optimizer='adam',
    loss='mse',
    metrics=['mae']
)
# 6. 训练模型
history = model.fit(
    X_train, y_train,
    epochs=100,
    batch_size=32,
    validation_split=0.2,
    verbose=0
)
# 7. 预测
y_pred_scaled = model.predict(X_test).flatten()
y_pred = scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()
y_test_actual = scaler_y.inverse_transform(y_test.reshape(-1, 1)).flatten()
# 8. 评估
from sklearn.metrics import mean_absolute_error, mean_squared_error
mae = mean_absolute_error(y_test_actual, y_pred)
rmse = np.sqrt(mean_squared_error(y_test_actual, y_pred))
print(f"MAE: {mae:.4f}")
print(f"RMSE: {rmse:.4f}")
# 9. 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 训练历史
axes[0].plot(history.history['loss'], label='训练损失')
axes[0].plot(history.history['val_loss'], label='验证损失')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('损失')
axes[0].set_title('模型训练历史')
axes[0].legend()
axes[0].grid(True)
# 预测效果
axes[1].scatter(y_test_actual, y_pred, alpha=0.5)
axes[1].plot([y_test_actual.min(), y_test_actual.max()], 
             [y_test_actual.min(), y_test_actual.max()], 
             'r--', lw=2)
axes[1].set_xlabel('实际值')
axes[1].set_ylabel('预测值')
axes[1].set_title('神经网络预测效果')
axes[1].grid(True)
plt.tight_layout()
plt.show()

完整的预测流程模板

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
import warnings
warnings.filterwarnings('ignore')
class PredictionPipeline:
    def __init__(self, data=None):
        self.data = data
        self.models = {}
        self.best_model = None
        self.scaler = StandardScaler()
    def load_data(self, filepath):
        """加载数据"""
        self.data = pd.read_csv(filepath)
        return self
    def explore_data(self):
        """数据探索"""
        print("=== 数据基本信息 ===")
        print(self.data.info())
        print("\n=== 描述统计 ===")
        print(self.data.describe())
        print("\n=== 缺失值统计 ===")
        print(self.data.isnull().sum())
        # 可视化
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        # 分布图
        if 'price' in self.data.columns:
            self.data['price'].hist(ax=axes[0, 0])
            axes[0, 0].set_title('目标变量分布')
        # 相关性热图
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 1:
            corr = self.data[numeric_cols].corr()
            sns.heatmap(corr, annot=True, fmt='.2f', ax=axes[0, 1])
            axes[0, 1].set_title('特征相关性')
        # 箱线图
        if len(numeric_cols) > 0:
            self.data[numeric_cols].boxplot(ax=axes[1, 0])
            axes[1, 0].set_title('数值特征箱线图')
            axes[1, 0].tick_params(axis='x', rotation=45)
        plt.tight_layout()
        plt.show()
        return self
    def preprocess_data(self, target_col, feature_cols=None):
        """数据预处理"""
        if feature_cols is None:
            feature_cols = [col for col in self.data.columns 
                           if col != target_col and self.data[col].dtype in ['int64', 'float64']]
        # 处理缺失值
        self.data = self.data.dropna(subset=[target_col] + feature_cols)
        # 分离特征和目标
        X = self.data[feature_cols]
        y = self.data[target_col]
        # 标准化
        X_scaled = self.scaler.fit_transform(X)
        # 分割数据
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X_scaled, y, test_size=0.2, random_state=42
        )
        self.feature_names = feature_cols
        return self
    def train_models(self):
        """训练多个模型"""
        models = {
            'Linear Regression': LinearRegression(),
            'Ridge': Ridge(alpha=1.0),
            'Lasso': Lasso(alpha=0.1),
            'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
            'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42)
        }
        for name, model in models.items():
            model.fit(self.X_train, self.y_train)
            y_pred = model.predict(self.X_test)
            mse = mean_squared_error(self.y_test, y_pred)
            mae = mean_absolute_error(self.y_test, y_pred)
            r2 = r2_score(self.y_test, y_pred)
            self.models[name] = {
                'model': model,
                'mse': mse,
                'mae': mae,
                'r2': r2,
                'predictions': y_pred
            }
            print(f"{name}:")
            print(f"  MSE: {mse:.2f}")
            print(f"  MAE: {mae:.2f}")
            print(f"  R2:  {r2:.2f}")
            print()
        return self
    def select_best_model(self, metric='r2'):
        """选择最佳模型"""
        best_score = -np.inf if metric == 'r2' else np.inf
        best_model_name = None
        for name, model_info in self.models.items():
            score = model_info[metric]
            if metric == 'r2':
                if score > best_score:
                    best_score = score
                    best_model_name = name
            else:
                if score < best_score:
                    best_score = score
                    best_model_name = name
        self.best_model = self.models[best_model_name]['model']
        print(f"最佳模型: {best_model_name}")
        print(f"{metric} 分数: {best_score:.2f}")
        return self
    def feature_importance_analysis(self):
        """特征重要性分析"""
        if hasattr(self.best_model, 'feature_importances_'):
            importances = self.best_model.feature_importances_
            indices = np.argsort(importances)[::-1]
            plt.figure(figsize=(10, 6))
            plt.title("特征重要性")
            plt.bar(range(len(importances)), importances[indices])
            plt.xticks(range(len(importances)), 
                      [self.feature_names[i] for i in indices], 
                      rotation=45)
            plt.tight_layout()
            plt.show()
        return self
    def make_predictions(self, new_data):
        """对新数据进行预测"""
        new_data_scaled = self.scaler.transform(new_data)
        predictions = self.best_model.predict(new_data_scaled)
        return predictions
    def save_model(self, filepath):
        """保存模型"""
        import joblib
        joblib.dump({
            'model': self.best_model,
            'scaler': self.scaler,
            'feature_names': self.feature_names
        }, filepath)
        print(f"模型已保存到 {filepath}")
        return self
    def load_model(self, filepath):
        """加载模型"""
        import joblib
        saved_objects = joblib.load(filepath)
        self.best_model = saved_objects['model']
        self.scaler = saved_objects['scaler']
        self.feature_names = saved_objects['feature_names']
        print(f"模型已从 {filepath} 加载")
        return self
# 使用示例
if __name__ == "__main__":
    # 创建示例数据
    np.random.seed(42)
    n_samples = 500
    sample_data = pd.DataFrame({
        'area': np.random.uniform(50, 200, n_samples),
        'bedrooms': np.random.randint(1, 5, n_samples),
        'age': np.random.uniform(0, 50, n_samples),
        'distance_center': np.random.uniform(0, 20, n_samples),
        'price': (2.5 * np.random.uniform(50, 200, n_samples) + 
                 10 * np.random.randint(1, 5, n_samples) + 
                 -1 * np.random.uniform(0, 50, n_samples) + 
                 -5 * np.random.uniform(0, 20, n_samples) + 
                 np.random.normal(0, 30, n_samples))
    })
    # 保存示例数据
    sample_data.to_csv('house_prices.csv', index=False)
    # 使用Pipeline
    pipeline = PredictionPipeline()
    pipeline.load_data('house_prices.csv')
    pipeline.explore_data()
    pipeline.preprocess_data(target_col='price')
    pipeline.train_models()
    pipeline.select_best_model(metric='r2')
    pipeline.feature_importance_analysis()
    pipeline.save_model('best_model.pkl')

安装所需库

pip install pandas numpy matplotlib seaborn scikit-learn statsmodels tensorflow joblib

这些案例涵盖了:

  1. 简单线性回归 - 入门级预测
  2. 时间序列预测 - ARIMA模型
  3. 多特征预测 - 随机森林
  4. 神经网络预测 - 深度学习
  5. 完整预测流程 - 生产级Pipeline

你可以根据实际需求选择适合的案例进行修改和使用!

标签: 数值预测

抱歉,评论功能暂时关闭!