Python缓存过期案例实操?

wen python案例 1

我来用一个实际的电商库存管理案例,演示Python缓存过期的各种场景和解决方案。

场景:电商商品库存缓存系统

基础缓存过期实现

import time
import threading
from datetime import datetime, timedelta
from typing import Any, Optional, Dict
class SimpleCache:
    """简单的内存缓存,支持过期时间"""
    def __init__(self):
        self._cache: Dict[str, tuple[Any, Optional[float]]] = {}
        self._lock = threading.Lock()
    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """
        设置缓存
        :param key: 缓存键
        :param value: 缓存值
        :param ttl: 过期时间(秒),None表示永不过期
        """
        expire_time = time.time() + ttl if ttl else None
        with self._lock:
            self._cache[key] = (value, expire_time)
            print(f"[缓存] 设置 {key} = {value}, 过期时间: "
                  f"{datetime.fromtimestamp(expire_time) if ttl else '永久'}")
    def get(self, key: str) -> Optional[Any]:
        """获取缓存"""
        with self._lock:
            if key not in self._cache:
                return None
            value, expire_time = self._cache[key]
            # 检查是否过期
            if expire_time and time.time() > expire_time:
                del self._cache[key]
                print(f"[缓存] {key} 已过期,自动删除")
                return None
            return value
    def clear_expired(self):
        """清理过期缓存"""
        with self._lock:
            now = time.time()
            expired_keys = [
                key for key, (_, expire_time) in self._cache.items()
                if expire_time and now > expire_time
            ]
            for key in expired_keys:
                del self._cache[key]
            if expired_keys:
                print(f"[缓存] 清理了 {len(expired_keys)} 个过期项")
# 使用示例
cache = SimpleCache()
# 模拟商品库存数据
def get_product_stock(product_id: str) -> int:
    """模拟从数据库获取库存(通常很慢)"""
    print(f"[数据库] 查询商品 {product_id} 库存...")
    time.sleep(1)  # 模拟数据库查询延迟
    return 100  # 模拟返回库存数量
def get_product_stock_with_cache(product_id: str) -> int:
    """带缓存的库存查询"""
    cache_key = f"stock:{product_id}"
    # 尝试从缓存获取
    stock = cache.get(cache_key)
    if stock is not None:
        print(f"[缓存] 命中!商品 {product_id} 库存: {stock}")
        return stock
    # 缓存未命中,从数据库获取
    print(f"[缓存] 未命中,查询数据库...")
    stock = get_product_stock(product_id)
    # 设置缓存,过期时间30秒
    cache.set(cache_key, stock, ttl=30)
    return stock
# 运行测试
print("=== 缓存过期测试 ===")
# 第一次查询 - 缓存未命中
print("\n--- 第一次查询 ---")
stock1 = get_product_stock_with_cache("P001")
# 第二次查询 - 缓存命中
print("\n--- 第二次查询(立即) ---")
stock2 = get_product_stock_with_cache("P001")
# 等待缓存过期
print(f"\n--- 等待40秒后查询 ---")
time.sleep(40)
stock3 = get_product_stock_with_cache("P001")

基于TTL的主动过期策略

import redis
from functools import wraps
import hashlib
import json
class RedisCacheExample:
    """使用Redis实现缓存过期(实际生产环境)"""
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            db=db,
            decode_responses=True
        )
    def set_cache(self, key: str, value: Any, ttl: int = 300):
        """设置缓存,支持各种数据类型"""
        # 将Python对象序列化为JSON
        if not isinstance(value, (str, bytes)):
            value = json.dumps(value, ensure_ascii=False)
        self.redis_client.set(key, value, ex=ttl)
        print(f"[Redis缓存] 设置 {key}, TTL={ttl}秒")
    def get_cache(self, key: str) -> Optional[Any]:
        """获取缓存"""
        value = self.redis_client.get(key)
        if value:
            try:
                # 尝试解析JSON
                return json.loads(value)
            except (json.JSONDecodeError, TypeError):
                return value
        return None
    def get_or_set(self, key: str, func, ttl: int = 300):
        """先尝试获取缓存,如果没有则调用函数并缓存结果"""
        # 检查缓存
        cached_value = self.get_cache(key)
        if cached_value is not None:
            print(f"[Redis缓存] 命中: {key}")
            return cached_value
        # 执行函数获取数据
        result = func()
        # 缓存结果
        self.set_cache(key, result, ttl)
        return result
# 使用装饰器实现自动缓存
def cache_with_ttl(ttl: int = 300):
    """缓存装饰器"""
    redis_cache = RedisCacheExample()
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 生成缓存键
            key = f"{func.__name__}:{hashlib.md5(
                str(args).encode() + str(kwargs).encode()
            ).hexdigest()}"
            return redis_cache.get_or_set(key, lambda: func(*args, **kwargs), ttl)
        return wrapper
    return decorator
# 使用示例
@cache_with_ttl(ttl=60)  # 缓存60秒
def get_product_price(product_id: str) -> dict:
    """获取商品价格信息"""
    print(f"[数据库] 查询商品 {product_id} 价格...")
    time.sleep(0.5)
    return {
        "product_id": product_id,
        "price": 99.99,
        "currency": "CNY",
        "last_updated": datetime.now().isoformat()
    }
# 测试
print("\n=== Redis缓存测试 ===")
result1 = get_product_price("P001")
print(f"结果1: {result1}")
result2 = get_product_price("P001")
print(f"结果2: {result2}")
print("\n等待缓存过期...")
time.sleep(65)
result3 = get_product_price("P001")
print(f"结果3: {result3}")

多级缓存策略(本地+远端)

from functools import partial
import weakref
class MultiLevelCache:
    """多级缓存:内存 -> Redis -> 数据库"""
    def __init__(self, memory_ttl=60, redis_ttl=300):
        self.memory_cache = SimpleCache()
        self.redis_cache = RedisCacheExample()
        self.memory_ttl = memory_ttl
        self.redis_ttl = redis_ttl
    def get(self, key: str, db_func):
        """
        多级缓存获取
        Level 1: 内存缓存(最快,但容量小)
        Level 2: Redis缓存(快速,容量大)
        Level 3: 数据库(最慢)
        """
        # Level 1: 尝试内存缓存
        value = self.memory_cache.get(key)
        if value is not None:
            print(f"[L1] 内存缓存命中: {key}")
            return value
        # Level 2: 尝试Redis缓存
        value = self.redis_cache.get_cache(key)
        if value is not None:
            print(f"[L2] Redis缓存命中: {key}")
            # 将数据回填到内存缓存(但TTL设短一些,防止不一致)
            self.memory_cache.set(key, value, ttl=self.memory_ttl)
            return value
        # Level 3: 查询数据库
        print(f"[L3] 查询数据库: {key}")
        value = db_func()
        # 写入Redis和内存缓存
        self.redis_cache.set_cache(key, value, ttl=self.redis_ttl)
        self.memory_cache.set(key, value, ttl=self.memory_ttl)
        return value
    def invalidate(self, key: str):
        """使缓存失效(数据更新时调用)"""
        self.memory_cache.get(key)  # 触发检查并删除
        self.redis_cache.redis_client.delete(key)
        print(f"[缓存] 使 {key} 失效")
# 模拟商品更新
class ProductService:
    def __init__(self):
        self.cache = MultiLevelCache()
    def get_product(self, product_id: str):
        """获取商品信息"""
        key = f"product:{product_id}"
        def fetch_from_db():
            """模拟数据库查询"""
            print(f"[数据库] 查询商品 {product_id}...")
            time.sleep(0.5)
            return {
                "id": product_id,
                "name": f"商品{product_id}",
                "stock": 100,
                "updated_at": datetime.now().isoformat()
            }
        return self.cache.get(key, fetch_from_db)
    def update_product_stock(self, product_id: str, stock: int):
        """更新商品库存"""
        # 更新数据库(模拟)
        print(f"[数据库] 更新商品 {product_id} 库存为 {stock}")
        # 使缓存失效
        key = f"product:{product_id}"
        self.cache.invalidate(key)
# 测试多级缓存
print("\n=== 多级缓存测试 ===")
service = ProductService()
# 第一次查询
print("\n--- 第一次查询 ---")
start = time.time()
product = service.get_product("P001")
print(f"结果: {product}, 耗时: {time.time()-start:.2f}秒")
# 第二次查询(应该从内存缓存)
print("\n--- 第二次查询(立即) ---")
start = time.time()
product = service.get_product("P001")
print(f"结果: {product}, 耗时: {time.time()-start:.2f}秒")
# 更新库存
print("\n--- 更新库存 ---")
service.update_product_stock("P001", 80)
# 再次查询(缓存失效,从数据库重新加载)
print("\n--- 更新后查询 ---")
start = time.time()
product = service.get_product("P001")
print(f"结果: {product}, 耗时: {time.time()-start:.2f}秒")

高级缓存策略:防缓存击穿

import asyncio
from functools import lru_cache
import random
class CachePrevention:
    """防止缓存击穿、雪崩和穿透的高级缓存"""
    def __init__(self):
        self.cache = SimpleCache()
    def get_with_fallback(self, key: str, db_func, ttl: int = 300):
        """
        带降级策略的缓存获取
        防止缓存击穿:当缓存失效时,保护数据库不被高并发击穿
        """
        # 尝试获取缓存
        value = self.cache.get(key)
        if value is not None:
            return value
        # 使用互斥锁防止缓存击穿(实际生产用分布式锁)
        # 这里简化处理,用随机延迟模拟
        delay = random.uniform(0, 0.1)
        time.sleep(delay)
        # 再次检查缓存(可能其他线程已经加载了)
        value = self.cache.get(key)
        if value is not None:
            return value
        # 查询数据库
        print(f"[数据库] 查询数据: {key}")
        value = db_func()
        # 设置缓存(添加随机过期时间防止雪崩)
        actual_ttl = ttl + random.randint(-60, 60)  # 随机偏移
        self.cache.set(key, value, ttl=max(1, actual_ttl))
        return value
    def get_with_stale(self, key: str, db_func, ttl: int = 300, stale_ttl: int = 600):
        """
        使用陈旧数据策略
        当缓存过期但数据库不可用时,可以返回旧数据
        """
        # 尝试获取缓存(包括过期但未删除的)
        with self.cache._lock:
            if key in self.cache._cache:
                value, expire_time = self.cache._cache[key]
                if expire_time and time.time() <= expire_time:
                    return value  # 未过期,直接返回
                # 缓存已过期,但我们可以保留旧值
                if expire_time and time.time() <= expire_time + stale_ttl:
                    # 异步刷新缓存(不阻塞当前请求)
                    print(f"[缓存] 返回陈旧数据: {key}")
                    self._async_refresh(key, db_func, ttl)
                    return value  # 返回旧数据
        # 缓存不存在,同步加载
        value = db_func()
        self.cache.set(key, value, ttl=ttl)
        return value
    def _async_refresh(self, key: str, db_func, ttl: int):
        """异步刷新缓存"""
        try:
            value = db_func()
            self.cache.set(key, value, ttl=ttl)
            print(f"[缓存] 异步刷新成功: {key}")
        except Exception as e:
            print(f"[缓存] 异步刷新失败: {e}")
# 测试缓存击穿防护
print("\n=== 缓存击穿防护测试 ===")
prevention = CachePrevention()
def simulate_high_concurrency():
    """模拟高并发场景"""
    def query_db():
        """模拟数据库查询(可能很慢或失败)"""
        time.sleep(0.5)
        return f"数据_{random.randint(1, 100)}"
    # 模拟多个并发请求
    results = []
    for i in range(10):
        result = prevention.get_with_stale(
            "hot_data", 
            query_db, 
            ttl=10,  # 短过期时间
            stale_ttl=30  # 允许使用过期数据的窗口
        )
        results.append(result)
        print(f"请求 {i+1}: 获取到 {result}")
    return results
# 运行并发测试
results = simulate_high_concurrency()

实际应用:商品详情页面缓存

class ProductDetailCache:
    """商品详情页缓存管理系统"""
    def __init__(self):
        self.cache = SimpleCache()
        self.stale_cache = SimpleCache()  # 存储陈旧的缓存
        self.redis = RedisCacheExample()
    def build_product_detail(self, product_id: str) -> dict:
        """
        构建完整的商品详情
        包含多个数据源的组合和缓存
        """
        cached = self.cache.get(f"detail:{product_id}")
        if cached:
            return cached
        # 复杂的数据聚合
        detail = {
            "basic_info": self._get_basic_info(product_id),
            "inventory": self._get_inventory(product_id),
            "price": self._get_price(product_id),
            "reviews": self._get_reviews(product_id),
            "recommendations": self._get_recommendations(product_id),
            "generated_at": datetime.now().isoformat()
        }
        # 按不同数据源的更新频率设置不同的TTL
        # 库存数据更新最快,设为30秒
        # 基础信息最稳定,设为5分钟
        self.cache.set(f"detail:{product_id}", detail, ttl=120)
        return detail
    def _get_basic_info(self, product_id: str) -> dict:
        """商品基本信息(缓存时间较长)"""
        cache_key = f"basic:{product_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        # 模拟数据库查询
        info = {
            "name": f"商品{product_id}",
            "description": "这是一个很棒的商品",
            "category": "电子产品",
            "brand": "知名品牌"
        }
        self.cache.set(cache_key, info, ttl=300)  # 5分钟
        return info
    def _get_inventory(self, product_id: str) -> dict:
        """库存信息(缓存时间较短)"""
        cache_key = f"inventory:{product_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        # 模拟实时库存查询
        import random
        inventory = {
            "total": random.randint(10, 1000),
            "available": random.randint(5, 500),
            "warehouse": "北京1号库"
        }
        self.cache.set(cache_key, inventory, ttl=30)  # 30秒
        return inventory
    def _get_price(self, product_id: str) -> dict:
        """价格信息(使用Redis缓存,支持分布式)"""
        cache_key = f"price:{product_id}"
        cached = self.redis.get_cache(cache_key)
        if cached:
            return cached
        price = {
            "current": 99.99,
            "original": 129.99,
            "promotion": "夏季特惠",
            "valid_until": (datetime.now() + timedelta(days=7)).isoformat()
        }
        self.redis.set_cache(cache_key, price, ttl=60)  # 1分钟
        return price
    def _get_reviews(self, product_id: str) -> list:
        """评价信息"""
        cache_key = f"reviews:{product_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        reviews = [
            {"user": "用户A", "rating": 5, "content": "非常好用"},
            {"user": "用户B", "rating": 4, "content": "性价比高"},
        ]
        self.cache.set(cache_key, reviews, ttl=180)  # 3分钟
        return reviews
    def _get_recommendations(self, product_id: str) -> list:
        """推荐商品(缓存时间长,因为计算复杂)"""
        # 使用lru_cache装饰器(Python内置)
        return self._compute_recommendations(product_id)
    @lru_cache(maxsize=100)
    def _compute_recommendations(self, product_id: str) -> list:
        """计算推荐商品(使用LRU缓存)"""
        # 模拟复杂的推荐算法
        time.sleep(0.2)
        return [
            {"id": "P002", "name": "配套商品"},
            {"id": "P003", "name": "热门商品"},
        ]
# 完整测试
print("\n=== 商品详情缓存系统测试 ===")
product_cache = ProductDetailCache()
# 第一次请求
print("\n--- 第一次请求 ---")
start = time.time()
detail = product_cache.build_product_detail("P001")
print(f"详情页加载完成,耗时: {time.time()-start:.2f}秒")
print(f"商品名称: {detail['basic_info']['name']}")
print(f"可用库存: {detail['inventory']['available']}")
# 第二次请求(全缓存命中)
print("\n--- 第二次请求 ---")
start = time.time()
detail = product_cache.build_product_detail("P001")
print(f"详情页加载完成,耗时: {time.time()-start:.2f}秒")
# 等待库存缓存过期
print("\n--- 等待35秒(库存缓存过期) ---")
time.sleep(35)
start = time.time()
detail = product_cache.build_product_detail("P001")
print(f"详情页加载完成,耗时: {time.time()-start:.2f}秒")
print(f"更新后的库存: {detail['inventory']['available']}")
  1. TTL(Time To Live):设置缓存过期时间,避免使用过时数据
  2. 主动过期:在访问时检查是否过期,过期则删除
  3. 被动过期:后台线程定期清理过期数据
  4. 缓存击穿:高并发下同一缓存失效导致大量查询数据库
  5. 缓存雪崩:大量缓存同时过期导致数据库压力暴增
  6. 缓存穿透:查询不存在的数据,每次都会查数据库

这个实战案例涵盖了Python中缓存过期的各种场景和解决方案,适合直接应用到实际项目中。

标签: 缓存过期

抱歉,评论功能暂时关闭!