本文目录导读:
我来提供几个实用的Python工具类封装案例,涵盖常见功能场景。
文件操作工具类
import os
import json
import pickle
from pathlib import Path
from typing import Any, Optional, List
class FileUtils:
"""文件操作工具类"""
@staticmethod
def read_file(filepath: str, encoding: str = 'utf-8') -> str:
"""读取文本文件"""
with open(filepath, 'r', encoding=encoding) as f:
return f.read()
@staticmethod
def write_file(filepath: str, content: str, encoding: str = 'utf-8') -> None:
"""写入文本文件"""
with open(filepath, 'w', encoding=encoding) as f:
f.write(content)
@staticmethod
def read_json(filepath: str) -> dict:
"""读取JSON文件"""
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
@staticmethod
def write_json(filepath: str, data: dict, indent: int = 2) -> None:
"""写入JSON文件"""
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
@staticmethod
def get_file_size(filepath: str) -> int:
"""获取文件大小"""
return os.path.getsize(filepath)
@staticmethod
def get_files_by_extension(directory: str, extension: str) -> List[str]:
"""获取指定扩展名的所有文件"""
return [str(f) for f in Path(directory).glob(f'*.{extension}')]
# 使用示例
if __name__ == "__main__":
# 写入文件
FileUtils.write_file('test.txt', 'Hello, World!')
# 读取文件
content = FileUtils.read_file('test.txt')
print(f"文件内容: {content}")
# 写入JSON
data = {'name': '张三', 'age': 30}
FileUtils.write_json('data.json', data)
# 读取JSON
loaded_data = FileUtils.read_json('data.json')
print(f"JSON数据: {loaded_data}")
# 获取文件大小
size = FileUtils.get_file_size('test.txt')
print(f"文件大小: {size} bytes")
时间工具类
from datetime import datetime, timedelta
from typing import Union, Optional
import time
class TimeUtils:
"""时间处理工具类"""
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_FORMAT = '%Y-%m-%d'
TIME_FORMAT = '%H:%M:%S'
@classmethod
def now(cls, format: str = None) -> Union[str, datetime]:
"""获取当前时间"""
now = datetime.now()
if format:
return now.strftime(format)
return now
@classmethod
def format_time(cls, dt: datetime, format: str = DATETIME_FORMAT) -> str:
"""格式化时间"""
return dt.strftime(format)
@classmethod
def parse_time(cls, time_str: str, format: str = DATETIME_FORMAT) -> datetime:
"""解析时间字符串"""
return datetime.strptime(time_str, format)
@classmethod
def time_difference(cls, start_time: datetime, end_time: datetime) -> dict:
"""计算时间差"""
diff = end_time - start_time
return {
'days': diff.days,
'seconds': diff.seconds,
'total_seconds': diff.total_seconds(),
'hours': diff.total_seconds() // 3600,
'minutes': diff.total_seconds() // 60
}
@classmethod
def add_days(cls, dt: datetime, days: int) -> datetime:
"""添加天数"""
return dt + timedelta(days=days)
@classmethod
def is_weekend(cls, dt: datetime = None) -> bool:
"""判断是否是周末"""
if dt is None:
dt = datetime.now()
return dt.weekday() >= 5
@classmethod
def timestamp_to_datetime(cls, timestamp: float) -> datetime:
"""时间戳转datetime"""
return datetime.fromtimestamp(timestamp)
@classmethod
def datetime_to_timestamp(cls, dt: datetime) -> float:
"""datetime转时间戳"""
return dt.timestamp()
# 使用示例
if __name__ == "__main__":
# 获取当前时间
print(f"当前时间: {TimeUtils.now()}")
print(f"格式化时间: {TimeUtils.now('%Y-%m-%d %H:%M:%S')}")
# 格式化时间
now = datetime.now()
print(f"格式化: {TimeUtils.format_time(now)}")
# 解析时间
parsed = TimeUtils.parse_time('2024-01-15 10:30:00')
print(f"解析时间: {parsed}")
# 计算时间差
start = datetime(2024, 1, 1)
end = datetime(2024, 1, 15)
diff = TimeUtils.time_difference(start, end)
print(f"时间差: {diff['days']}天")
# 判断周末
print(f"今天是周末吗: {TimeUtils.is_weekend()}")
网络请求工具类
import requests
from typing import Dict, Any, Optional
import time
from functools import wraps
class NetworkUtils:
"""网络请求工具类"""
def __init__(self, base_url: str = '', timeout: int = 10, headers: dict = None):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.headers = headers or {}
self.session = requests.Session()
def set_headers(self, headers: dict) -> None:
"""设置请求头"""
self.headers.update(headers)
def get(self, endpoint: str, params: dict = None) -> dict:
"""发送GET请求"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
response = self.session.get(
url,
params=params,
headers=self.headers,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {'error': str(e), 'status_code': getattr(e.response, 'status_code', None)}
def post(self, endpoint: str, data: dict = None, json: dict = None) -> dict:
"""发送POST请求"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
response = self.session.post(
url,
data=data,
json=json,
headers=self.headers,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {'error': str(e), 'status_code': getattr(e.response, 'status_code', None)}
def download_file(self, url: str, save_path: str) -> bool:
"""下载文件"""
try:
response = self.session.get(url, stream=True, timeout=self.timeout)
response.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return True
except Exception as e:
print(f"下载失败: {e}")
return False
def close(self):
"""关闭会话"""
self.session.close()
# 使用示例
if __name__ == "__main__":
# 创建网络工具实例
net = NetworkUtils(base_url='https://api.github.com')
# 设置请求头
net.set_headers({'User-Agent': 'Mozilla/5.0'})
# 发送GET请求
result = net.get('/users/octocat')
print(f"用户信息: {result.get('login', '获取失败')}")
# 关闭会话
net.close()
日志工具类
import logging
import os
from datetime import datetime
from typing import Optional
class Logger:
"""日志工具类"""
def __init__(self, name: str = 'app',
log_level: int = logging.DEBUG,
log_file: Optional[str] = None):
self.logger = logging.getLogger(name)
self.logger.setLevel(log_level)
self.logger.handlers.clear()
# 创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# 文件处理器(可选)
if log_file:
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def debug(self, message: str):
self.logger.debug(message)
def info(self, message: str):
self.logger.info(message)
def warning(self, message: str):
self.logger.warning(message)
def error(self, message: str, exc_info: bool = False):
self.logger.error(message, exc_info=exc_info)
def critical(self, message: str):
self.logger.critical(message)
# 使用示例
if __name__ == "__main__":
# 创建日志实例
logger = Logger('myapp', log_level=logging.INFO, log_file='app.log')
# 记录日志
logger.info("应用启动")
logger.debug("这是一条调试信息")
logger.warning("这是一条警告")
try:
result = 10 / 0
except Exception as e:
logger.error(f"发生错误: {e}", exc_info=True)
数据库工具类
import sqlite3
from contextlib import contextmanager
from typing import Any, List, Tuple, Optional
import pymysql
class DatabaseUtils:
"""数据库操作工具类"""
def __init__(self, db_type: str = 'sqlite', **kwargs):
self.db_type = db_type
self.connection_params = kwargs
self.connection = None
def connect(self):
"""建立数据库连接"""
if self.db_type == 'sqlite':
db_path = self.connection_params.get('path', 'database.db')
self.connection = sqlite3.connect(db_path)
elif self.db_type == 'mysql':
self.connection = pymysql.connect(
host=self.connection_params.get('host', 'localhost'),
user=self.connection_params.get('user'),
password=self.connection_params.get('password'),
database=self.connection_params.get('database'),
port=self.connection_params.get('port', 3306)
)
return self.connection
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
@contextmanager
def get_cursor(self):
"""获取数据库游标(上下文管理器)"""
cursor = self.connection.cursor()
try:
yield cursor
self.connection.commit()
except Exception as e:
self.connection.rollback()
raise e
finally:
cursor.close()
def execute_query(self, query: str, params: Tuple = None) -> List[Tuple]:
"""执行查询"""
with self.get_cursor() as cursor:
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
return cursor.fetchall()
def execute_update(self, query: str, params: Tuple = None) -> int:
"""执行更新操作"""
with self.get_cursor() as cursor:
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
return cursor.rowcount
def insert(self, table: str, data: dict) -> int:
"""插入数据"""
columns = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
with self.get_cursor() as cursor:
cursor.execute(query, tuple(data.values()))
return cursor.lastrowid
def update(self, table: str, data: dict, condition: str) -> int:
"""更新数据"""
set_clause = ', '.join([f"{key}=%s" for key in data.keys()])
query = f"UPDATE {table} SET {set_clause} WHERE {condition}"
with self.get_cursor() as cursor:
cursor.execute(query, tuple(data.values()))
return cursor.rowcount
# 使用示例
if __name__ == "__main__":
# 创建数据库工具实例(SQLite)
db = DatabaseUtils('sqlite', path='test.db')
try:
# 连接数据库
db.connect()
# 创建表
db.execute_update("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
email TEXT
)
""")
# 插入数据
user_id = db.insert('users', {
'name': '李四',
'age': 28,
'email': 'lisi@example.com'
})
print(f"插入用户ID: {user_id}")
# 查询数据
users = db.execute_query("SELECT * FROM users")
print(f"所有用户: {users}")
# 更新数据
affected = db.update('users',
{'age': 29},
"name='李四'")
print(f"更新了 {affected} 条记录")
finally:
db.close()
综合应用:数据处理器
import re
from typing import List, Dict, Any, Optional
import hashlib
import base64
class DataProcessor:
"""数据处理工具类"""
@staticmethod
def validate_email(email: str) -> bool:
"""验证邮箱格式"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
@staticmethod
def validate_phone(phone: str) -> bool:
"""验证手机号"""
pattern = r'^1[3-9]\d{9}$'
return re.match(pattern, phone) is not None
@staticmethod
def mask_sensitive_data(data: str,
mask_char: str = '*',
visible_start: int = 3,
visible_end: int = 4) -> str:
"""脱敏处理"""
if len(data) <= visible_start + visible_end:
return data[:visible_start] + mask_char * 3
return (data[:visible_start] +
mask_char * (len(data) - visible_start - visible_end) +
data[-visible_end:])
@staticmethod
def encrypt_md5(text: str) -> str:
"""MD5加密"""
return hashlib.md5(text.encode()).hexdigest()
@staticmethod
def encrypt_sha256(text: str) -> str:
"""SHA256加密"""
return hashlib.sha256(text.encode()).hexdigest()
@staticmethod
def base64_encode(text: str) -> str:
"""Base64编码"""
return base64.b64encode(text.encode()).decode()
@staticmethod
def base64_decode(encoded_text: str) -> str:
"""Base64解码"""
return base64.b64decode(encoded_text).decode()
@staticmethod
def clean_text(text: str) -> str:
"""清洗文本"""
# 去除多余空白
text = re.sub(r'\s+', ' ', text)
# 去除特殊字符
text = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)
return text.strip()
@staticmethod
def chunk_list(data: List, chunk_size: int) -> List[List]:
"""将列表分块"""
return [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
# 使用示例
if __name__ == "__main__":
# 验证邮箱
print(f"邮箱验证: {DataProcessor.validate_email('test@example.com')}")
# 手机号脱敏
phone = '13800138000'
masked_phone = DataProcessor.mask_sensitive_data(phone)
print(f"脱敏手机号: {masked_phone}")
# 加密
password = 'mypassword123'
print(f"MD5: {DataProcessor.encrypt_md5(password)}")
print(f"SHA256: {DataProcessor.encrypt_sha256(password)}")
# Base64编码解码
encoded = DataProcessor.base64_encode('Hello, 世界!')
print(f"Base64编码: {encoded}")
print(f"Base64解码: {DataProcessor.base64_decode(encoded)}")
# 分块处理
data = list(range(10))
chunks = DataProcessor.chunk_list(data, 3)
print(f"分块结果: {chunks}")
设计原则与最佳实践
- 单一职责:每个工具类只负责特定领域的功能
- 静态方法:工具类方法通常使用
@staticmethod或@classmethod - 类型提示:使用类型注解提高代码可读性
- 错误处理:适当添加异常处理
- 文档字符串:为类和方法添加文档说明
- 可配置性:使用参数化配置而不是硬编码
- 测试:使用
if __name__ == "__main__"提供测试示例
这些工具类可以在实际项目中直接使用或根据需要修改扩展。
标签: 封装