本文目录导读:
我来为你提供一个完整的Flask JWT身份验证实战案例。
项目结构
flask-jwt-auth/
├── app.py
├── config.py
├── models.py
├── auth.py
└── requirements.txt
安装依赖
pip install flask flask-sqlalchemy flask-jwt-extended bcrypt
配置文件
# config.py
import os
class Config:
    """Settings object loaded with ``app.config.from_object(Config)``.

    Secrets and the database URL are read from environment variables first.
    When a secret is not provided, a random key is generated for this process
    instead of falling back to a well-known placeholder string; tokens and
    sessions signed with such a generated key stop validating after a restart
    (and are not shared between worker processes), so set ``SECRET_KEY`` and
    ``JWT_SECRET_KEY`` explicitly for any real deployment.
    """

    # Flask signing key; random per-process fallback (never a published default).
    SECRET_KEY = os.environ.get('SECRET_KEY') or os.urandom(32).hex()
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///users.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    # JWT settings
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or os.urandom(32).hex()
    # Lifetimes are plain integers (seconds); auth.py wraps them in timedelta(seconds=...).
    JWT_ACCESS_TOKEN_EXPIRES = 3600  # 1 hour
    JWT_REFRESH_TOKEN_EXPIRES = 2592000  # 30 days
    # NOTE(review): these two options look like leftovers from flask-jwt-extended 3.x;
    # with the 4.x API used in this project the blocklist check is enabled by
    # registering a token_in_blocklist_loader. Kept for backward compatibility - confirm.
    JWT_BLACKLIST_ENABLED = True
    JWT_BLACKLIST_TOKEN_CHECKS = ['access', 'refresh']
数据库模型
# models.py
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
db = SQLAlchemy()
class User(db.Model):
    """Account row stored in the ``users`` table.

    Only a hash of the password is kept (``password_hash``); the plain-text
    password is never stored and is not part of ``to_dict()``.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(200), nullable=False)
    # Filled in by the database's current timestamp when the row is inserted.
    created_at = db.Column(db.DateTime, default=db.func.current_timestamp())
    is_active = db.Column(db.Boolean, default=True)

    def set_password(self, password):
        """Hash ``password`` with Werkzeug and store the result on the row."""
        hashed = generate_password_hash(password)
        self.password_hash = hashed

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        matches = check_password_hash(self.password_hash, password)
        return matches

    def to_dict(self):
        """Return the user's public fields as a JSON-serialisable dict."""
        created = self.created_at.isoformat() if self.created_at else None
        return dict(
            id=self.id,
            username=self.username,
            email=self.email,
            created_at=created,
            is_active=self.is_active,
        )
class RevokedToken(db.Model):
    """One row per revoked JWT, stored in the ``revoked_tokens`` table."""

    __tablename__ = 'revoked_tokens'

    id = db.Column(db.Integer, primary_key=True)
    # The token's "jti" claim; unique, so a given token is recorded at most once.
    jti = db.Column(db.String(120), unique=True, nullable=False)
    # Defaults to the database's current timestamp at insert time.
    revoked_at = db.Column(db.DateTime, default=db.func.current_timestamp())
认证逻辑
# auth.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import (
create_access_token, create_refresh_token,
jwt_required, get_jwt_identity, get_jwt
)
from datetime import timedelta
from models import db, User, RevokedToken
from config import Config
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/register', methods=['POST'])
def register():
    """Register a new user from a JSON request body.

    Expects non-empty string fields ``username``, ``email`` and ``password``.

    Returns:
        201 with the new user's public fields on success;
        400 when the body is not a JSON object or a field is missing/not a string;
        409 when the username or the email is already taken.
    """
    # silent=True makes a missing or malformed JSON body come back as None, so the
    # client gets this endpoint's own 400 message instead of a framework error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    username = data.get('username')
    email = data.get('email')
    password = data.get('password')
    # Non-string values (numbers, lists, ...) would otherwise fail later while hashing.
    if not all(isinstance(value, str) and value for value in (username, email, password)):
        return jsonify({'message': '请提供用户名、密码和邮箱'}), 400

    # Reject duplicates up front.
    # NOTE(review): two concurrent registrations can still race to the unique
    # constraints; the commit below would then raise instead of returning 409.
    if User.query.filter_by(username=username).first():
        return jsonify({'message': '用户名已存在'}), 409
    if User.query.filter_by(email=email).first():
        return jsonify({'message': '邮箱已被注册'}), 409

    # Create the account; only the password hash is stored.
    user = User(username=username, email=email)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    return jsonify({
        'message': '注册成功',
        'user': user.to_dict()
    }), 201
@auth_bp.route('/login', methods=['POST'])
def login():
    """Authenticate a user and issue an access/refresh token pair.

    Reads ``username`` and ``password`` from the JSON request body.

    Returns:
        200 with ``access_token``, ``refresh_token`` and the user's public fields;
        400 when the body is not a JSON object or a credential is missing/not a string;
        401 when the username/password pair does not match;
        403 when the account is disabled.
    """
    # silent=True: a missing or malformed JSON body becomes None rather than a
    # framework-generated error, so the 400 below is what the client sees.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    username = data.get('username')
    password = data.get('password')
    if not all(isinstance(value, str) and value for value in (username, password)):
        return jsonify({'message': '请提供用户名和密码'}), 400

    user = User.query.filter_by(username=username).first()
    # One message for both "unknown user" and "wrong password".
    if not user or not user.check_password(password):
        return jsonify({'message': '用户名或密码错误'}), 401
    if not user.is_active:
        return jsonify({'message': '账户已被禁用'}), 403

    # The identity becomes the JWT "sub" claim. It is issued as a string because
    # recent PyJWT releases reject tokens whose "sub" is not a string on decode.
    identity = str(user.id)
    access_token = create_access_token(
        identity=identity,
        additional_claims={'username': user.username, 'email': user.email},
        expires_delta=timedelta(seconds=Config.JWT_ACCESS_TOKEN_EXPIRES)
    )
    refresh_token = create_refresh_token(
        identity=identity,
        expires_delta=timedelta(seconds=Config.JWT_REFRESH_TOKEN_EXPIRES)
    )

    return jsonify({
        'message': '登录成功',
        'access_token': access_token,
        'refresh_token': refresh_token,
        'user': user.to_dict()
    }), 200
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Exchange a valid refresh token for a new access token.

    Returns:
        200 with a fresh ``access_token``;
        401 when the user behind the token no longer exists or is disabled.
    """
    # The identity may arrive as a string ("sub" claim); the primary key is an integer.
    current_user_id = int(get_jwt_identity())
    # Session.get is the current replacement for the legacy Query.get API.
    user = db.session.get(User, current_user_id)
    if not user or not user.is_active:
        return jsonify({'message': '用户不存在或已被禁用'}), 401

    # Issue the identity as a string: recent PyJWT releases reject a non-string "sub".
    new_access_token = create_access_token(
        identity=str(user.id),
        additional_claims={'username': user.username, 'email': user.email}
    )
    return jsonify({
        'access_token': new_access_token
    }), 200
@auth_bp.route('/logout', methods=['POST'])
@jwt_required(verify_type=False)
def logout():
    """Log out by revoking the token that authenticated this request.

    ``verify_type=False`` lets the caller present either an access token or a
    refresh token; whichever one is sent has its ``jti`` recorded as revoked.
    """
    claims = get_jwt()
    # Persist the token id so later requests carrying this token can be refused.
    db.session.add(RevokedToken(jti=claims['jti']))
    db.session.commit()
    return jsonify(message='登出成功'), 200
@auth_bp.route('/profile', methods=['GET'])
@jwt_required()
def profile():
    """Return the public fields of the user identified by the access token.

    Returns:
        200 with ``user``; 404 when the account no longer exists.
    """
    # The identity may arrive as a string ("sub" claim); the primary key is an integer.
    current_user_id = int(get_jwt_identity())
    # Session.get is the current replacement for the legacy Query.get API.
    user = db.session.get(User, current_user_id)
    if not user:
        return jsonify({'message': '用户不存在'}), 404
    return jsonify({
        'user': user.to_dict()
    }), 200
@auth_bp.route('/change-password', methods=['PUT'])
@jwt_required()
def change_password():
    """Change the authenticated user's password.

    Reads ``old_password`` and ``new_password`` from the JSON request body.

    Returns:
        200 on success;
        400 when a field is missing/not a string or the new password is shorter
        than 6 characters;
        401 when ``old_password`` does not match;
        404 when the account behind the token no longer exists.
    """
    # The identity may arrive as a string ("sub" claim); the primary key is an integer.
    current_user_id = int(get_jwt_identity())
    user = db.session.get(User, current_user_id)

    # silent=True: a missing or malformed JSON body becomes None, handled below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    old_password = data.get('old_password')
    new_password = data.get('new_password')
    if not all(isinstance(value, str) and value for value in (old_password, new_password)):
        return jsonify({'message': '请提供旧密码和新密码'}), 400

    # A still-valid token can outlive its account; without this guard the
    # check_password call below would fail on None.
    if user is None:
        return jsonify({'message': '用户不存在'}), 404

    if not user.check_password(old_password):
        return jsonify({'message': '旧密码错误'}), 401
    if len(new_password) < 6:
        return jsonify({'message': '新密码长度不能少于6位'}), 400

    user.set_password(new_password)
    db.session.commit()
    return jsonify({'message': '密码修改成功'}), 200
主应用
# app.py
from flask import Flask
from flask_jwt_extended import JWTManager
from datetime import datetime
from config import Config
from models import db, User, RevokedToken
from auth import auth_bp
# Build the Flask application and load every setting from the Config class.
app = Flask(__name__)
app.config.from_object(Config)
# Initialise extensions: bind the shared SQLAlchemy object to this app and create
# the JWT manager whose loader decorators are used by the callbacks below.
db.init_app(app)
jwt = JWTManager(app)
# Register the auth blueprint; all of its routes are served under /api/auth.
app.register_blueprint(auth_bp, url_prefix='/api/auth')
# JWT回调函数
@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
    """Return True when the token's ``jti`` has a row in ``revoked_tokens``."""
    revoked_row = RevokedToken.query.filter_by(jti=jwt_payload['jti']).first()
    if revoked_row is None:
        return False
    return True
@jwt.revoked_token_loader
def revoked_token_callback(jwt_header, jwt_payload):
    """Build the 401 response body used for a revoked token."""
    body = dict(message='Token已被撤销', error='token_revoked')
    return body, 401
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
    """Build the 401 response body used for an expired token."""
    body = dict(message='Token已过期', error='token_expired')
    return body, 401
@jwt.invalid_token_loader
def invalid_token_callback(error):
    """Build the 401 response body used for an invalid token."""
    body = dict(message='无效的Token', error='invalid_token')
    return body, 401
@jwt.unauthorized_loader
def missing_token_callback(error):
    """Build the 401 response body used when the request carries no token."""
    body = dict(message='缺少Token', error='authorization_header_missing')
    return body, 401
# Create the database tables, then seed a demo account (runs when app.py is imported).
with app.app_context():
    db.create_all()
    # Insert the test user only when it is not there yet, so repeated starts
    # do not collide with the unique username/email constraints.
    if User.query.filter_by(username='testuser').first() is None:
        test_user = User(username='testuser', email='test@example.com')
        test_user.set_password('password123')
        db.session.add(test_user)
        db.session.commit()
@app.route('/')
def index():
    """Describe the API: its name, version and the available auth endpoints.

    The view returns a plain dict, which Flask converts to a JSON response.
    ``jsonify`` is deliberately not used here: app.py imports only ``Flask``
    from flask, so calling it would raise ``NameError`` on every request.
    """
    return {
        'message': 'Flask JWT Authentication API',
        'version': '1.0.0',
        'endpoints': {
            'register': 'POST /api/auth/register',
            'login': 'POST /api/auth/login',
            'refresh': 'POST /api/auth/refresh',
            'logout': 'POST /api/auth/logout',
            'profile': 'GET /api/auth/profile',
            'change_password': 'PUT /api/auth/change-password'
        }
    }
if __name__ == '__main__':
    # Development entry point: debug=True enables Flask's debug mode, so do not
    # start a production server this way.
    app.run(debug=True)
依赖文件
# requirements.txt
flask==2.3.0
flask-sqlalchemy==3.0.5
flask-jwt-extended==4.5.2
bcrypt==4.0.1
API测试示例
注册用户
curl -X POST http://localhost:5000/api/auth/register \
-H "Content-Type: application/json" \
-d '{"username": "newuser", "email": "new@example.com", "password": "password123"}'
登录
curl -X POST http://localhost:5000/api/auth/login \
-H "Content-Type: application/json" \
-d '{"username": "testuser", "password": "password123"}'
使用令牌访问受保护资源
curl -X GET http://localhost:5000/api/auth/profile \
-H "Authorization: Bearer your-access-token-here"
刷新令牌
curl -X POST http://localhost:5000/api/auth/refresh \
-H "Authorization: Bearer your-refresh-token-here"
登出
curl -X POST http://localhost:5000/api/auth/logout \
-H "Authorization: Bearer your-access-token-here"
功能特点
- 完整认证流程:注册、登录、刷新、登出
- 令牌管理:访问令牌和刷新令牌
- 令牌撤销:登出时撤销令牌
- 密码加密:使用 Werkzeug 的 generate_password_hash 对密码进行哈希后存储(示例代码并未直接使用 bcrypt)
- 用户信息管理:查看资料、修改密码
- 错误处理:完善的错误响应机制
这个实战案例包含了JWT身份验证的完整实现,可以直接用于项目开发或学习参考。
标签: Flask