你是否在寻找一个用Flask实现JWT身份验证的实战案例

访客 全栈框架 1

本文目录导读:

  1. 项目结构
  2. 安装依赖
  3. 配置文件
  4. 数据库模型
  5. 认证逻辑
  6. 主应用
  7. 测试用例
  8. API测试示例
  9. 功能特点

我来为你提供一个完整的Flask JWT身份验证实战案例。

项目结构

flask-jwt-auth/
├── app.py
├── config.py
├── models.py
├── auth.py
└── requirements.txt

安装依赖

pip install flask flask-sqlalchemy flask-jwt-extended bcrypt

配置文件

# config.py
import os
class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'your-secret-key-here'
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///users.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # JWT配置
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-secret-key'
    JWT_ACCESS_TOKEN_EXPIRES = 3600  # 1小时
    JWT_REFRESH_TOKEN_EXPIRES = 2592000  # 30天
    JWT_BLACKLIST_ENABLED = True
    JWT_BLACKLIST_TOKEN_CHECKS = ['access', 'refresh']

数据库模型

# models.py
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
db = SQLAlchemy()
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(200), nullable=False)
    created_at = db.Column(db.DateTime, default=db.func.current_timestamp())
    is_active = db.Column(db.Boolean, default=True)
    def set_password(self, password):
        self.password_hash = generate_password_hash(password)
    def check_password(self, password):
        return check_password_hash(self.password_hash, password)
    def to_dict(self):
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'is_active': self.is_active
        }
class RevokedToken(db.Model):
    __tablename__ = 'revoked_tokens'
    id = db.Column(db.Integer, primary_key=True)
    jti = db.Column(db.String(120), unique=True, nullable=False)
    revoked_at = db.Column(db.DateTime, default=db.func.current_timestamp())

认证逻辑

# auth.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import (
    create_access_token, create_refresh_token,
    jwt_required, get_jwt_identity, get_jwt
)
from datetime import timedelta
from models import db, User, RevokedToken
from config import Config
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/register', methods=['POST'])
def register():
    """用户注册"""
    data = request.get_json()
    if not data or not data.get('username') or not data.get('password') or not data.get('email'):
        return jsonify({'message': '请提供用户名、密码和邮箱'}), 400
    # 检查用户是否已存在
    if User.query.filter_by(username=data['username']).first():
        return jsonify({'message': '用户名已存在'}), 409
    if User.query.filter_by(email=data['email']).first():
        return jsonify({'message': '邮箱已被注册'}), 409
    # 创建新用户
    user = User(
        username=data['username'],
        email=data['email']
    )
    user.set_password(data['password'])
    db.session.add(user)
    db.session.commit()
    return jsonify({
        'message': '注册成功',
        'user': user.to_dict()
    }), 201
@auth_bp.route('/login', methods=['POST'])
def login():
    """用户登录"""
    data = request.get_json()
    if not data or not data.get('username') or not data.get('password'):
        return jsonify({'message': '请提供用户名和密码'}), 400
    user = User.query.filter_by(username=data['username']).first()
    if not user or not user.check_password(data['password']):
        return jsonify({'message': '用户名或密码错误'}), 401
    if not user.is_active:
        return jsonify({'message': '账户已被禁用'}), 403
    # 生成令牌
    access_token = create_access_token(
        identity=user.id,
        additional_claims={'username': user.username, 'email': user.email},
        expires_delta=timedelta(seconds=Config.JWT_ACCESS_TOKEN_EXPIRES)
    )
    refresh_token = create_refresh_token(
        identity=user.id,
        expires_delta=timedelta(seconds=Config.JWT_REFRESH_TOKEN_EXPIRES)
    )
    return jsonify({
        'message': '登录成功',
        'access_token': access_token,
        'refresh_token': refresh_token,
        'user': user.to_dict()
    }), 200
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """刷新访问令牌"""
    current_user_id = get_jwt_identity()
    user = User.query.get(current_user_id)
    if not user or not user.is_active:
        return jsonify({'message': '用户不存在或已被禁用'}), 401
    new_access_token = create_access_token(
        identity=user.id,
        additional_claims={'username': user.username, 'email': user.email}
    )
    return jsonify({
        'access_token': new_access_token
    }), 200
@auth_bp.route('/logout', methods=['POST'])
@jwt_required(verify_type=False)
def logout():
    """登出 - 撤销当前令牌"""
    jti = get_jwt()['jti']
    # 将令牌加入黑名单
    revoked_token = RevokedToken(jti=jti)
    db.session.add(revoked_token)
    db.session.commit()
    return jsonify({'message': '登出成功'}), 200
@auth_bp.route('/profile', methods=['GET'])
@jwt_required()
def profile():
    """获取当前用户信息"""
    current_user_id = get_jwt_identity()
    user = User.query.get(current_user_id)
    if not user:
        return jsonify({'message': '用户不存在'}), 404
    return jsonify({
        'user': user.to_dict()
    }), 200
@auth_bp.route('/change-password', methods=['PUT'])
@jwt_required()
def change_password():
    """修改密码"""
    current_user_id = get_jwt_identity()
    user = User.query.get(current_user_id)
    data = request.get_json()
    if not data or not data.get('old_password') or not data.get('new_password'):
        return jsonify({'message': '请提供旧密码和新密码'}), 400
    if not user.check_password(data['old_password']):
        return jsonify({'message': '旧密码错误'}), 401
    if len(data['new_password']) < 6:
        return jsonify({'message': '新密码长度不能少于6位'}), 400
    user.set_password(data['new_password'])
    db.session.commit()
    return jsonify({'message': '密码修改成功'}), 200

主应用

# app.py
from flask import Flask
from flask_jwt_extended import JWTManager
from datetime import datetime
from config import Config
from models import db, User, RevokedToken
from auth import auth_bp
app = Flask(__name__)
app.config.from_object(Config)
# 初始化扩展
db.init_app(app)
jwt = JWTManager(app)
# 注册蓝图
app.register_blueprint(auth_bp, url_prefix='/api/auth')
# JWT回调函数
@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
    """检查令牌是否被撤销"""
    jti = jwt_payload['jti']
    token = RevokedToken.query.filter_by(jti=jti).first()
    return token is not None
@jwt.revoked_token_loader
def revoked_token_callback(jwt_header, jwt_payload):
    """撤销令牌的回调"""
    return {
        'message': 'Token已被撤销',
        'error': 'token_revoked'
    }, 401
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
    """过期令牌的回调"""
    return {
        'message': 'Token已过期',
        'error': 'token_expired'
    }, 401
@jwt.invalid_token_loader
def invalid_token_callback(error):
    """无效令牌的回调"""
    return {
        'message': '无效的Token',
        'error': 'invalid_token'
    }, 401
@jwt.unauthorized_loader
def missing_token_callback(error):
    """缺少令牌的回调"""
    return {
        'message': '缺少Token',
        'error': 'authorization_header_missing'
    }, 401
# 创建数据库表
with app.app_context():
    db.create_all()
    # 创建测试用户
    if not User.query.filter_by(username='testuser').first():
        test_user = User(
            username='testuser',
            email='test@example.com'
        )
        test_user.set_password('password123')
        db.session.add(test_user)
        db.session.commit()
@app.route('/')
def index():
    return jsonify({
        'message': 'Flask JWT Authentication API',
        'version': '1.0.0',
        'endpoints': {
            'register': 'POST /api/auth/register',
            'login': 'POST /api/auth/login',
            'refresh': 'POST /api/auth/refresh',
            'logout': 'POST /api/auth/logout',
            'profile': 'GET /api/auth/profile',
            'change_password': 'PUT /api/auth/change-password'
        }
    })
if __name__ == '__main__':
    app.run(debug=True)

测试用例

# requirements.txt
flask==2.3.0
flask-sqlalchemy==3.0.5
flask-jwt-extended==4.5.2
bcrypt==4.0.1

API测试示例

注册用户

curl -X POST http://localhost:5000/api/auth/register \
  -H "Content-Type: application/json" \
  -d '{"username": "newuser", "email": "new@example.com", "password": "password123"}'

登录

curl -X POST http://localhost:5000/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "password": "password123"}'

使用令牌访问受保护资源

curl -X GET http://localhost:5000/api/auth/profile \
  -H "Authorization: Bearer your-access-token-here"

刷新令牌

curl -X POST http://localhost:5000/api/auth/refresh \
  -H "Authorization: Bearer your-refresh-token-here"

登出

curl -X POST http://localhost:5000/api/auth/logout \
  -H "Authorization: Bearer your-access-token-here"

功能特点

  1. 完整认证流程:注册、登录、刷新、登出
  2. 令牌管理:访问令牌和刷新令牌
  3. 令牌撤销:登出时撤销令牌
  4. 密码加密:使用bcrypt安全存储密码
  5. 用户信息管理:查看资料、修改密码
  6. 错误处理:完善的错误响应机制

这个实战案例包含了JWT身份验证的完整实现,可以直接用于项目开发或学习参考。

标签: Flask

抱歉,评论功能暂时关闭!