本文目录导读:
是的,我可以为你提供一个关于定时任务的Python案例,下面是一个使用schedule库实现简单定时任务的示例:
安装所需库
pip install schedule
基础定时任务案例
import schedule
import time
from datetime import datetime
def job1():
"""任务1:打印当前时间"""
print(f"执行任务1 - 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
def job2(name):
"""任务2:带参数的定时任务"""
print(f"执行任务2 - {name}说: 你好,现在是 {datetime.now().strftime('%H:%M:%S')}")
def job3():
"""任务3:执行需要时间操作的任务"""
print("任务3开始执行...")
time.sleep(5) # 模拟耗时操作
print("任务3执行完成")
# 设置定时任务
# 每隔5秒执行一次
schedule.every(5).seconds.do(job1)
# 每隔1分钟执行一次
schedule.every(1).minutes.do(job1)
# 每隔2小时执行一次
schedule.every(2).hours.do(job1)
# 每天在特定时间执行
schedule.every().day.at("10:30").do(job1)
# 每周一执行
schedule.every().monday.do(job1)
# 带参数的定时任务
schedule.every(10).seconds.do(job2, name="小明")
# 每隔一段时间执行复杂任务
schedule.every(30).seconds.do(job3)
# 使用装饰器方式
@schedule.repeat(schedule.every(15).seconds)
def job4():
print(f"装饰器任务 - {datetime.now()}")
if __name__ == "__main__":
print("定时任务启动...")
print("按 Ctrl+C 停止运行")
try:
while True:
schedule.run_pending()
time.sleep(1) # 每秒检查一次是否有待执行的任务
except KeyboardInterrupt:
print("\n定时任务已停止")
使用APScheduler库(更强大的定时任务框架)
# 安装:pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def tick():
print(f"执行任务 - {datetime.now()}")
def tick_with_param(text):
print(f"参数任务: {text} - {datetime.now()}")
def interval_job():
print("每隔5秒执行一次的任务")
if __name__ == "__main__":
# 创建调度器
scheduler = BlockingScheduler()
# 添加定时任务
# 每隔3秒执行一次
scheduler.add_job(tick, 'interval', seconds=3)
# 每隔10秒执行带参数的任务
scheduler.add_job(tick_with_param, 'interval', seconds=10, args=["定时任务"])
# 在特定时间执行(每天下午3点)
scheduler.add_job(tick, 'cron', hour=15, minute=0)
# 每周一执行
scheduler.add_job(tick, 'cron', day_of_week='mon', hour=0, minute=0)
print(f"任务调度器启动 - {datetime.now()}")
try:
scheduler.start()
except KeyboardInterrupt:
print("调度器停止")
scheduler.shutdown()
使用Python内置的time模块(简单版本)
import time
from datetime import datetime
def simple_task():
"""简单的定时任务"""
print(f"执行任务 - {datetime.now()}")
def run_periodically(func, interval_seconds):
"""周期性执行函数"""
start_time = time.time()
next_run = start_time
try:
while True:
current_time = time.time()
if current_time >= next_run:
func()
next_run += interval_seconds
time.sleep(0.1) # 避免CPU占用过高
except KeyboardInterrupt:
print("任务已停止")
# 设置每5秒执行一次
interval = 5 # 秒
run_periodically(simple_task, interval)
定时任务最佳实践
- 错误处理:添加try-except防止任务异常导致程序崩溃
- 日志记录:记录任务执行情况
- 线程安全:如果多个任务同时执行,考虑使用线程池
- 任务持久化:如果需要持久化任务,使用APScheduler的jobstores
# 带日志和错误处理的完整示例
import schedule
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def safe_task():
try:
logging.info("开始执行任务")
# 你的任务逻辑
result = 10 / 2 # 示例计算
logging.info(f"任务执行成功,结果: {result}")
except Exception as e:
logging.error(f"任务执行失败: {e}")
schedule.every(10).seconds.do(safe_task)
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logging.error(f"调度器异常: {e}")
time.sleep(10) # 避免频繁报错
根据你的需求选择合适的方式:
- 简单任务:使用
schedule库 - 复杂调度:使用
APScheduler - 极简单:使用
time.sleep