Python工具类封装案例实现?

wen python案例 2

本文目录导读:

  1. 文件操作工具类
  2. 时间工具类
  3. 网络请求工具类
  4. 日志工具类
  5. 数据库工具类
  6. 综合应用:数据处理器
  7. 设计原则与最佳实践

我来提供几个实用的Python工具类封装案例,涵盖常见功能场景。

文件操作工具类

import os
import json
import pickle
from pathlib import Path
from typing import Any, Optional, List
class FileUtils:
    """文件操作工具类"""
    @staticmethod
    def read_file(filepath: str, encoding: str = 'utf-8') -> str:
        """读取文本文件"""
        with open(filepath, 'r', encoding=encoding) as f:
            return f.read()
    @staticmethod
    def write_file(filepath: str, content: str, encoding: str = 'utf-8') -> None:
        """写入文本文件"""
        with open(filepath, 'w', encoding=encoding) as f:
            f.write(content)
    @staticmethod
    def read_json(filepath: str) -> dict:
        """读取JSON文件"""
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    @staticmethod
    def write_json(filepath: str, data: dict, indent: int = 2) -> None:
        """写入JSON文件"""
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=indent)
    @staticmethod
    def get_file_size(filepath: str) -> int:
        """获取文件大小"""
        return os.path.getsize(filepath)
    @staticmethod
    def get_files_by_extension(directory: str, extension: str) -> List[str]:
        """获取指定扩展名的所有文件"""
        return [str(f) for f in Path(directory).glob(f'*.{extension}')]
# 使用示例
if __name__ == "__main__":
    # 写入文件
    FileUtils.write_file('test.txt', 'Hello, World!')
    # 读取文件
    content = FileUtils.read_file('test.txt')
    print(f"文件内容: {content}")
    # 写入JSON
    data = {'name': '张三', 'age': 30}
    FileUtils.write_json('data.json', data)
    # 读取JSON
    loaded_data = FileUtils.read_json('data.json')
    print(f"JSON数据: {loaded_data}")
    # 获取文件大小
    size = FileUtils.get_file_size('test.txt')
    print(f"文件大小: {size} bytes")

时间工具类

from datetime import datetime, timedelta
from typing import Union, Optional
import time
class TimeUtils:
    """时间处理工具类"""
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    DATE_FORMAT = '%Y-%m-%d'
    TIME_FORMAT = '%H:%M:%S'
    @classmethod
    def now(cls, format: str = None) -> Union[str, datetime]:
        """获取当前时间"""
        now = datetime.now()
        if format:
            return now.strftime(format)
        return now
    @classmethod
    def format_time(cls, dt: datetime, format: str = DATETIME_FORMAT) -> str:
        """格式化时间"""
        return dt.strftime(format)
    @classmethod
    def parse_time(cls, time_str: str, format: str = DATETIME_FORMAT) -> datetime:
        """解析时间字符串"""
        return datetime.strptime(time_str, format)
    @classmethod
    def time_difference(cls, start_time: datetime, end_time: datetime) -> dict:
        """计算时间差"""
        diff = end_time - start_time
        return {
            'days': diff.days,
            'seconds': diff.seconds,
            'total_seconds': diff.total_seconds(),
            'hours': diff.total_seconds() // 3600,
            'minutes': diff.total_seconds() // 60
        }
    @classmethod
    def add_days(cls, dt: datetime, days: int) -> datetime:
        """添加天数"""
        return dt + timedelta(days=days)
    @classmethod
    def is_weekend(cls, dt: datetime = None) -> bool:
        """判断是否是周末"""
        if dt is None:
            dt = datetime.now()
        return dt.weekday() >= 5
    @classmethod
    def timestamp_to_datetime(cls, timestamp: float) -> datetime:
        """时间戳转datetime"""
        return datetime.fromtimestamp(timestamp)
    @classmethod
    def datetime_to_timestamp(cls, dt: datetime) -> float:
        """datetime转时间戳"""
        return dt.timestamp()
# 使用示例
if __name__ == "__main__":
    # 获取当前时间
    print(f"当前时间: {TimeUtils.now()}")
    print(f"格式化时间: {TimeUtils.now('%Y-%m-%d %H:%M:%S')}")
    # 格式化时间
    now = datetime.now()
    print(f"格式化: {TimeUtils.format_time(now)}")
    # 解析时间
    parsed = TimeUtils.parse_time('2024-01-15 10:30:00')
    print(f"解析时间: {parsed}")
    # 计算时间差
    start = datetime(2024, 1, 1)
    end = datetime(2024, 1, 15)
    diff = TimeUtils.time_difference(start, end)
    print(f"时间差: {diff['days']}天")
    # 判断周末
    print(f"今天是周末吗: {TimeUtils.is_weekend()}")

网络请求工具类

import requests
from typing import Dict, Any, Optional
import time
from functools import wraps
class NetworkUtils:
    """网络请求工具类"""
    def __init__(self, base_url: str = '', timeout: int = 10, headers: dict = None):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.headers = headers or {}
        self.session = requests.Session()
    def set_headers(self, headers: dict) -> None:
        """设置请求头"""
        self.headers.update(headers)
    def get(self, endpoint: str, params: dict = None) -> dict:
        """发送GET请求"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        try:
            response = self.session.get(
                url,
                params=params,
                headers=self.headers,
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {'error': str(e), 'status_code': getattr(e.response, 'status_code', None)}
    def post(self, endpoint: str, data: dict = None, json: dict = None) -> dict:
        """发送POST请求"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        try:
            response = self.session.post(
                url,
                data=data,
                json=json,
                headers=self.headers,
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {'error': str(e), 'status_code': getattr(e.response, 'status_code', None)}
    def download_file(self, url: str, save_path: str) -> bool:
        """下载文件"""
        try:
            response = self.session.get(url, stream=True, timeout=self.timeout)
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            return True
        except Exception as e:
            print(f"下载失败: {e}")
            return False
    def close(self):
        """关闭会话"""
        self.session.close()
# 使用示例
if __name__ == "__main__":
    # 创建网络工具实例
    net = NetworkUtils(base_url='https://api.github.com')
    # 设置请求头
    net.set_headers({'User-Agent': 'Mozilla/5.0'})
    # 发送GET请求
    result = net.get('/users/octocat')
    print(f"用户信息: {result.get('login', '获取失败')}")
    # 关闭会话
    net.close()

日志工具类

import logging
import os
from datetime import datetime
from typing import Optional
class Logger:
    """日志工具类"""
    def __init__(self, name: str = 'app', 
                 log_level: int = logging.DEBUG,
                 log_file: Optional[str] = None):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(log_level)
        self.logger.handlers.clear()
        # 创建格式化器
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)
        # 文件处理器(可选)
        if log_file:
            log_dir = os.path.dirname(log_file)
            if log_dir and not os.path.exists(log_dir):
                os.makedirs(log_dir)
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)
    def debug(self, message: str):
        self.logger.debug(message)
    def info(self, message: str):
        self.logger.info(message)
    def warning(self, message: str):
        self.logger.warning(message)
    def error(self, message: str, exc_info: bool = False):
        self.logger.error(message, exc_info=exc_info)
    def critical(self, message: str):
        self.logger.critical(message)
# 使用示例
if __name__ == "__main__":
    # 创建日志实例
    logger = Logger('myapp', log_level=logging.INFO, log_file='app.log')
    # 记录日志
    logger.info("应用启动")
    logger.debug("这是一条调试信息")
    logger.warning("这是一条警告")
    try:
        result = 10 / 0
    except Exception as e:
        logger.error(f"发生错误: {e}", exc_info=True)

数据库工具类

import sqlite3
from contextlib import contextmanager
from typing import Any, List, Tuple, Optional
import pymysql
class DatabaseUtils:
    """数据库操作工具类"""
    def __init__(self, db_type: str = 'sqlite', **kwargs):
        self.db_type = db_type
        self.connection_params = kwargs
        self.connection = None
    def connect(self):
        """建立数据库连接"""
        if self.db_type == 'sqlite':
            db_path = self.connection_params.get('path', 'database.db')
            self.connection = sqlite3.connect(db_path)
        elif self.db_type == 'mysql':
            self.connection = pymysql.connect(
                host=self.connection_params.get('host', 'localhost'),
                user=self.connection_params.get('user'),
                password=self.connection_params.get('password'),
                database=self.connection_params.get('database'),
                port=self.connection_params.get('port', 3306)
            )
        return self.connection
    def close(self):
        """关闭数据库连接"""
        if self.connection:
            self.connection.close()
    @contextmanager
    def get_cursor(self):
        """获取数据库游标(上下文管理器)"""
        cursor = self.connection.cursor()
        try:
            yield cursor
            self.connection.commit()
        except Exception as e:
            self.connection.rollback()
            raise e
        finally:
            cursor.close()
    def execute_query(self, query: str, params: Tuple = None) -> List[Tuple]:
        """执行查询"""
        with self.get_cursor() as cursor:
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            return cursor.fetchall()
    def execute_update(self, query: str, params: Tuple = None) -> int:
        """执行更新操作"""
        with self.get_cursor() as cursor:
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            return cursor.rowcount
    def insert(self, table: str, data: dict) -> int:
        """插入数据"""
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['%s'] * len(data))
        query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
        with self.get_cursor() as cursor:
            cursor.execute(query, tuple(data.values()))
            return cursor.lastrowid
    def update(self, table: str, data: dict, condition: str) -> int:
        """更新数据"""
        set_clause = ', '.join([f"{key}=%s" for key in data.keys()])
        query = f"UPDATE {table} SET {set_clause} WHERE {condition}"
        with self.get_cursor() as cursor:
            cursor.execute(query, tuple(data.values()))
            return cursor.rowcount
# 使用示例
if __name__ == "__main__":
    # 创建数据库工具实例(SQLite)
    db = DatabaseUtils('sqlite', path='test.db')
    try:
        # 连接数据库
        db.connect()
        # 创建表
        db.execute_update("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                age INTEGER,
                email TEXT
            )
        """)
        # 插入数据
        user_id = db.insert('users', {
            'name': '李四',
            'age': 28,
            'email': 'lisi@example.com'
        })
        print(f"插入用户ID: {user_id}")
        # 查询数据
        users = db.execute_query("SELECT * FROM users")
        print(f"所有用户: {users}")
        # 更新数据
        affected = db.update('users', 
                            {'age': 29}, 
                            "name='李四'")
        print(f"更新了 {affected} 条记录")
    finally:
        db.close()

综合应用:数据处理器

import re
from typing import List, Dict, Any, Optional
import hashlib
import base64
class DataProcessor:
    """数据处理工具类"""
    @staticmethod
    def validate_email(email: str) -> bool:
        """验证邮箱格式"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.match(pattern, email) is not None
    @staticmethod
    def validate_phone(phone: str) -> bool:
        """验证手机号"""
        pattern = r'^1[3-9]\d{9}$'
        return re.match(pattern, phone) is not None
    @staticmethod
    def mask_sensitive_data(data: str, 
                           mask_char: str = '*', 
                           visible_start: int = 3,
                           visible_end: int = 4) -> str:
        """脱敏处理"""
        if len(data) <= visible_start + visible_end:
            return data[:visible_start] + mask_char * 3
        return (data[:visible_start] + 
                mask_char * (len(data) - visible_start - visible_end) + 
                data[-visible_end:])
    @staticmethod
    def encrypt_md5(text: str) -> str:
        """MD5加密"""
        return hashlib.md5(text.encode()).hexdigest()
    @staticmethod
    def encrypt_sha256(text: str) -> str:
        """SHA256加密"""
        return hashlib.sha256(text.encode()).hexdigest()
    @staticmethod
    def base64_encode(text: str) -> str:
        """Base64编码"""
        return base64.b64encode(text.encode()).decode()
    @staticmethod
    def base64_decode(encoded_text: str) -> str:
        """Base64解码"""
        return base64.b64decode(encoded_text).decode()
    @staticmethod
    def clean_text(text: str) -> str:
        """清洗文本"""
        # 去除多余空白
        text = re.sub(r'\s+', ' ', text)
        # 去除特殊字符
        text = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)
        return text.strip()
    @staticmethod
    def chunk_list(data: List, chunk_size: int) -> List[List]:
        """将列表分块"""
        return [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
# 使用示例
if __name__ == "__main__":
    # 验证邮箱
    print(f"邮箱验证: {DataProcessor.validate_email('test@example.com')}")
    # 手机号脱敏
    phone = '13800138000'
    masked_phone = DataProcessor.mask_sensitive_data(phone)
    print(f"脱敏手机号: {masked_phone}")
    # 加密
    password = 'mypassword123'
    print(f"MD5: {DataProcessor.encrypt_md5(password)}")
    print(f"SHA256: {DataProcessor.encrypt_sha256(password)}")
    # Base64编码解码
    encoded = DataProcessor.base64_encode('Hello, 世界!')
    print(f"Base64编码: {encoded}")
    print(f"Base64解码: {DataProcessor.base64_decode(encoded)}")
    # 分块处理
    data = list(range(10))
    chunks = DataProcessor.chunk_list(data, 3)
    print(f"分块结果: {chunks}")

设计原则与最佳实践

  1. 单一职责:每个工具类只负责特定领域的功能
  2. 静态方法:工具类方法通常使用 @staticmethod@classmethod
  3. 类型提示:使用类型注解提高代码可读性
  4. 错误处理:适当添加异常处理
  5. 文档字符串:为类和方法添加文档说明
  6. 可配置性:使用参数化配置而不是硬编码
  7. 测试:使用 if __name__ == "__main__" 提供测试示例

这些工具类可以在实际项目中直接使用或根据需要修改扩展。

标签: 封装

抱歉,评论功能暂时关闭!