fastapi_jwt_auth
时间: 2025-01-15 12:08:39 浏览: 34
如何在FastAPI中实现JWT身份验证
安装必要依赖项
为了使 FastAPI 支持 JWT 功能,需安装 pydantic
和 python-jose[cryptography]
这两个库。可以通过 pip 工具完成安装。
pip install fastapi pydantic python-jose[cryptography]
配置JWT设置
创建一个配置文件或直接在主程序里定义常量来保存密钥(SECRET_KEY)和加密算法(ALGORITHM)。这些参数对于生成和校验令牌至关重要[^2]。
from datetime import timedelta, datetime
from jose import jwt
from passlib.context import CryptContext
# 密码上下文对象用于哈希密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT相关配置
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
数据模型定义
利用 Pydantic 创建数据类表示用户信息结构体以及Token响应格式。
from typing import Optional
from pydantic import BaseModel
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
辅助函数编写
构建几个帮助性的方法来进行密码散列、比较;还有就是用来创建新的访问令牌与刷新已有令牌的有效期。
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
实现认证逻辑
通过上述准备工作之后,在实际项目里面就可以着手于具体的业务实现了——比如设计登录接口让用户提交用户名/密码换取token; 设计受保护资源接口仅当客户端提供合法有效的Bearer类型的Authorization header时才给予服务响应[^3]。
from fastapi.security import OAuth2PasswordRequestForm
from starlette.status import HTTP_401_UNAUTHORIZED
from fastapi.responses import JSONResponse
from fastapi.exceptions import HTTPException
from fastapi import Depends, FastAPI, Header
app = FastAPI()
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # secret
"disabled": False,
}
}
async def authenticate_user(username: str, password: str):
user_dict = fake_users_db.get(username)
if not user_dict or not verify_password(password, user_dict["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
return UserInDB(**user_dict)
@app.post("/login", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
相关推荐


















