你清楚如何用Flask-Mail扩展实现异步发送邮件功能吗

访客 全栈框架 1

本文目录导读:

  1. 方法一:使用线程(简单方案)
  2. 方法二:使用 Celery(生产环境推荐)
  3. 方法三:使用 Flask-Mail 的异步上下文管理器(简化版)
  4. 最佳实践建议
  5. 附:完整配置示例

是的,我了解如何使用 Flask-Mail 扩展实现异步发送邮件功能,主要思路是将邮件发送任务放到后台线程或任务队列中执行,避免阻塞主请求线程。

使用线程(简单方案)

from flask import Flask
from flask_mail import Mail, Message
from threading import Thread
app = Flask(__name__)
# 邮件配置
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = 'your-email@gmail.com'
app.config['MAIL_PASSWORD'] = 'your-password'
app.config['MAIL_DEFAULT_SENDER'] = 'your-email@gmail.com'
mail = Mail(app)
def send_async_email(app, msg):
    """在后台线程中发送邮件"""
    with app.app_context():
        mail.send(msg)
def send_email(subject, recipients, body):
    """发送邮件的包装函数"""
    msg = Message(subject, recipients=recipients)
    msg.body = body
    # 创建并启动后台线程
    thread = Thread(target=send_async_email, args=(app, msg))
    thread.daemon = True
    thread.start()
    return True
@app.route('/send-mail')
def send_mail_route():
    send_email(
        subject='Test Email',
        recipients=['recipient@example.com'],
        body='This is a test email sent asynchronously.'
    )
    return 'Email will be sent in background'

使用 Celery(生产环境推荐)

安装依赖

pip install celery redis flask-mail

配置 Celery

# tasks.py
from celery import Celery
from flask import Flask
from flask_mail import Mail, Message
# 创建 Flask 应用
app = Flask(__name__)
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = 'your-email@gmail.com'
app.config['MAIL_PASSWORD'] = 'your-password'
app.config['MAIL_DEFAULT_SENDER'] = 'your-email@gmail.com'
# 创建 Celery 实例
celery = Celery(
    app.name,
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)
# 创建邮件实例
mail = Mail(app)
@celery.task
def send_async_email_task(subject, recipients, body, html=None):
    """Celery 异步发送邮件任务"""
    with app.app_context():
        msg = Message(
            subject=subject,
            recipients=recipients,
            body=body,
            html=html
        )
        mail.send(msg)
    return f'Email sent to {recipients}'
def send_email_async(subject, recipients, body, html=None):
    """包装函数,触发 Celery 任务"""
    task = send_async_email_task.delay(subject, recipients, body, html)
    return task.id

在 Flask 路由中使用

# app.py
from flask import Flask, request, jsonify
from tasks import send_email_async
app = Flask(__name__)
@app.route('/send-mail')
def send_mail():
    subject = request.args.get('subject', 'Default Subject')
    recipient = request.args.get('email', 'recipient@example.com')
    body = request.args.get('body', 'Default body content')
    task_id = send_email_async(subject, [recipient], body)
    return jsonify({
        'message': 'Email task submitted',
        'task_id': task_id
    })
if __name__ == '__main__':
    app.run(debug=True)

启动 Celery Worker

# 启动 Celery worker
celery -A tasks.celery worker --loglevel=info

使用 Flask-Mail 的异步上下文管理器(简化版)

from flask import Flask
from flask_mail import Mail, Message
import asyncio
app = Flask(__name__)
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = 'your-email@gmail.com'
app.config['MAIL_PASSWORD'] = 'your-password'
mail = Mail(app)
async def send_async_email(app, msg):
    """异步发送邮件"""
    with app.app_context():
        # 使用线程池执行同步的 mail.send
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, mail.send, msg)
def send_email(subject, recipients, body):
    """发送邮件并异步执行"""
    msg = Message(subject, recipients=recipients)
    msg.body = body
    # 创建异步任务
    asyncio.create_task(send_async_email(app, msg))
    return 'Email will be sent'

最佳实践建议

  1. 生产环境推荐使用 Celery:提供任务队列、重试机制、任务状态追踪
  2. 配置重试机制:邮件发送失败时自动重试
  3. 使用模板:搭配 Jinja2 模板生成 HTML 邮件
  4. 错误处理:记录邮件发送失败的日志
  5. 限流控制:避免短时间内发送过多邮件

附:完整配置示例

# config.py
class MailConfig:
    MAIL_SERVER = 'smtp.gmail.com'
    MAIL_PORT = 587
    MAIL_USE_TLS = True
    MAIL_USE_SSL = False
    MAIL_USERNAME = 'your-email@gmail.com'
    MAIL_PASSWORD = 'your-app-password'
    MAIL_DEFAULT_SENDER = ('Your Name', 'your-email@gmail.com')
    MAIL_MAX_EMAILS = None
    MAIL_SUPPRESS_SEND = False
    MAIL_ASCII_ATTACHMENTS = False

选择哪种方案取决于你的具体需求:简单应用用线程即可,生产系统建议使用 Celery 或类似的任务队列系统。

标签: 异步发送

抱歉,评论功能暂时关闭!