本文目录导读:
我来给你编写几个Python缓存清理的实用案例:
使用 functools.lru_cache 清理缓存
from functools import lru_cache
import time
@lru_cache(maxsize=128)
def expensive_function(n):
"""模拟一个耗时计算函数"""
print(f"计算 {n}...")
time.sleep(1)
return n * n
# 测试缓存
print("第一次调用:")
print(expensive_function(5)) # 计算
print("\n第二次调用(使用缓存):")
print(expensive_function(5)) # 直接从缓存返回
# 清理所有缓存
expensive_function.cache_clear()
print("\n缓存已清理")
print("\n第三次调用(重新计算):")
print(expensive_function(5)) # 重新计算
# 查看缓存信息
print(f"\n缓存信息: {expensive_function.cache_info()}")
手动实现简单缓存系统
import time
from datetime import datetime, timedelta
class SimpleCache:
def __init__(self, timeout=60):
self.cache = {}
self.timeout = timeout # 缓存过期时间(秒)
def get(self, key):
"""获取缓存值"""
if key in self.cache:
value, timestamp = self.cache[key]
# 检查是否过期
if datetime.now() - timestamp < timedelta(seconds=self.timeout):
print(f"命中缓存: {key}")
return value
else:
print(f"缓存过期: {key}")
self.delete(key)
return None
def set(self, key, value):
"""设置缓存"""
self.cache[key] = (value, datetime.now())
print(f"设置缓存: {key} = {value}")
def delete(self, key):
"""删除单个缓存"""
if key in self.cache:
del self.cache[key]
print(f"删除缓存: {key}")
def clear(self):
"""清理所有缓存"""
self.cache.clear()
print("所有缓存已清理")
def clean_expired(self):
"""清理过期缓存"""
now = datetime.now()
expired_keys = []
for key, (value, timestamp) in self.cache.items():
if now - timestamp > timedelta(seconds=self.timeout):
expired_keys.append(key)
for key in expired_keys:
del self.cache[key]
print(f"清理了 {len(expired_keys)} 个过期缓存")
# 使用示例
cache = SimpleCache(timeout=5)
# 设置缓存
cache.set("user_1", {"name": "Alice", "age": 30})
cache.set("user_2", {"name": "Bob", "age": 25})
# 获取缓存
print(cache.get("user_1")) # 命中
print(cache.get("user_3")) # 未命中
# 等缓存过期
print("\n等待6秒...")
time.sleep(6)
print(cache.get("user_1")) # 过期,返回None
# 手动清理
cache.clear()
使用 cachetools 库(需要安装)
# 首先安装:pip install cachetools
from cachetools import cached, TTLCache, LRUCache
import time
# 基于时间的缓存(TTL)
@cached(cache=TTLCache(maxsize=100, ttl=5))
def get_user_info(user_id):
"""模拟获取用户信息"""
print(f"从数据库获取用户 {user_id} 的信息...")
time.sleep(1)
return {
"id": user_id,
"name": f"User_{user_id}",
"timestamp": time.time()
}
# 基于LRU的缓存
lru_cache = LRUCache(maxsize=3)
@cached(cache=lru_cache)
def expensive_operation(x, y):
print(f"执行耗时操作: {x} + {y}")
time.sleep(0.5)
return x + y
# 测试
print("=== TTL缓存测试 ===")
for i in range(3):
print(get_user_info(1)) # 第一次会重新计算
print("\n等待6秒...")
time.sleep(6)
print(get_user_info(1)) # 缓存过期,重新计算
print("\n=== LRU缓存测试 ===")
for i in range(4):
print(f"expensive_operation({i}, {i+1}): {expensive_operation(i, i+1)}")
文件缓存清理
import os
import json
import pickle
import time
from pathlib import Path
class FileCache:
def __init__(self, cache_dir="cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def _get_cache_path(self, key):
"""获取缓存文件路径"""
safe_key = str(hash(key))
return self.cache_dir / f"{safe_key}.cache"
def set(self, key, value, expiry=3600):
"""设置缓存到文件"""
cache_path = self._get_cache_path(key)
cache_data = {
"value": value,
"expiry": time.time() + expiry
}
with open(cache_path, 'wb') as f:
pickle.dump(cache_data, f)
print(f"缓存保存: {key}")
def get(self, key):
"""从文件读取缓存"""
cache_path = self._get_cache_path(key)
if not cache_path.exists():
return None
try:
with open(cache_path, 'rb') as f:
cache_data = pickle.load(f)
# 检查是否过期
if time.time() > cache_data["expiry"]:
print(f"缓存过期: {key}")
self.delete(key)
return None
print(f"命中缓存: {key}")
return cache_data["value"]
except:
return None
def delete(self, key):
"""删除单个缓存文件"""
cache_path = self._get_cache_path(key)
if cache_path.exists():
cache_path.unlink()
print(f"删除缓存: {key}")
def clear(self):
"""清理所有缓存文件"""
for cache_file in self.cache_dir.glob("*.cache"):
cache_file.unlink()
print(f"清理了所有缓存文件")
def clean_expired(self):
"""清理过期缓存文件"""
count = 0
for cache_file in self.cache_dir.glob("*.cache"):
try:
with open(cache_file, 'rb') as f:
cache_data = pickle.load(f)
if time.time() > cache_data["expiry"]:
cache_file.unlink()
count += 1
except:
cache_file.unlink()
count += 1
print(f"清理了 {count} 个过期缓存文件")
# 使用示例
cache = FileCache()
# 设置缓存(5秒过期)
cache.set("user_data", {"name": "Alice", "score": 95}, expiry=5)
print(cache.get("user_data")) # 命中
time.sleep(6)
print(cache.get("user_data")) # 过期
# 清理所有缓存
cache.clear()
Redis缓存清理(需要安装redis)
# 安装:pip install redis
import redis
import json
class RedisCache:
def __init__(self, host='localhost', port=6379, db=0):
self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
def set(self, key, value, expiry=3600):
"""设置缓存"""
self.client.setex(key, expiry, json.dumps(value))
print(f"Redis缓存设置: {key}")
def get(self, key):
"""获取缓存"""
data = self.client.get(key)
if data:
print(f"命中Redis缓存: {key}")
return json.loads(data)
return None
def delete(self, key):
"""删除单个缓存"""
result = self.client.delete(key)
if result:
print(f"删除Redis缓存: {key}")
def clear_pattern(self, pattern="*"):
"""按模式清理缓存"""
cursor = 0
count = 0
while True:
cursor, keys = self.client.scan(cursor=cursor, match=pattern, count=100)
if keys:
self.client.delete(*keys)
count += len(keys)
if cursor == 0:
break
print(f"清理了 {count} 个Redis缓存")
def clear_all(self):
"""清理所有缓存"""
self.client.flushdb()
print("清理所有Redis缓存")
# 使用示例(需要先启动Redis服务)
try:
cache = RedisCache()
cache.set("user:1", {"name": "Alice", "age": 30})
print(cache.get("user:1"))
# 清理特定模式的缓存
cache.clear_pattern("user:*")
except redis.ConnectionError:
print("无法连接到Redis服务器,请确保Redis服务已启动")
装饰器实现缓存清理功能
import functools
import time
from collections import OrderedDict
def clear_cache_decorator(max_size=100, ttl=60):
"""缓存装饰器,支持自动清理"""
cache = OrderedDict()
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
key = str(args) + str(sorted(kwargs.items()))
# 检查缓存是否存在且未过期
if key in cache:
result, timestamp = cache[key]
if time.time() - timestamp < ttl:
print(f"命中缓存: {func.__name__}{args}")
return result
else:
print(f"缓存过期: {func.__name__}{args}")
del cache[key]
# 计算新结果
result = func(*args, **kwargs)
# 缓存结果
cache[key] = (result, time.time())
# 如果缓存超出最大大小,删除最旧的
if len(cache) > max_size:
cache.popitem(last=False)
print("缓存超出大小,清理最旧缓存")
return result
# 添加清理方法
def clear_cache():
cache.clear()
print(f"清理 {func.__name__} 的所有缓存")
wrapper.clear_cache = clear_cache
return wrapper
return decorator
@clear_cache_decorator(max_size=3, ttl=3)
def compute_square(n):
"""计算平方数"""
print(f"计算 {n} 的平方...")
time.sleep(0.5)
return n * n
# 测试
print("=== 缓存装饰器测试 ===")
for i in range(4):
print(f"compute_square({i}): {compute_square(i)}")
print("\n重复调用(使用缓存):")
print(compute_square(1))
print(compute_square(2))
print("\n等待4秒...")
time.sleep(4)
print(compute_square(1)) # 缓存过期
print("\n手动清理缓存:")
compute_square.clear_cache()
print(compute_square(1)) # 重新计算
这些案例展示了不同场景下的缓存清理方案:
lru_cache: 适合函数级别的简单缓存- 手动实现: 灵活控制,适合学习原理
cachetools: 专业缓存库,功能丰富- 文件缓存: 持久化存储,适合重启后使用
- Redis缓存: 分布式场景,适合生产环境
- 装饰器: 优雅的实现方式,易于复用
选择哪种方案取决于你的具体需求:
- 简单场景:用
lru_cache - 需要过期机制:用
cachetools或手动实现 - 分布式系统:用 Redis
- 需要持久化:用文件缓存
标签: Python案例