如何在FastAPI中创建和使用JWT令牌?
时间: 2024-09-09 14:02:48 浏览: 53
在FastAPI中创建和使用JWT(JSON Web Tokens)通常涉及以下步骤:
1. 安装所需的库:首先,需要安装`fastapi`、`pydantic`以及用于处理JWT的`python-jose`和`passlib`库。
2. 创建用户模型:定义一个用户模型类来表示用户的信息,可以使用`pydantic`的`BaseModel`。
3. 创建JWT令牌的工具函数:定义一个函数来生成JWT令牌,这个函数通常需要用户的标识信息(比如ID)和一个密钥(secret key)。
4. 创建登录API端点:创建一个API端点,用户可以通过这个端点提交登录信息。如果登录成功,返回一个JWT令牌。
5. 验证JWT令牌:创建一个中间件或者依赖项,用于在用户访问需要认证的API端点时验证JWT令牌的有效性。
6. 使用JWT令牌保护API端点:在需要保护的API端点上使用上面创建的验证机制,确保只有提供有效JWT令牌的用户才能访问。
下面是一个简单的示例代码:
```python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from pydantic import BaseModel
from datetime import datetime, timedelta
from passlib.context import CryptContext
# 为示例简单起见,这里使用硬编码的密钥
SECRET_KEY = "your_secret_key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
username: str
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users_db = {
"johndoe": {
"username": "johndoe",
"hashed_password": pwd_context.hash("secret"),
}
}
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user or not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
app = FastAPI()
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(oauth2_scheme)):
return current_user
```
在这个示例中,我们定义了一个`/token`端点用于登录并获取JWT令牌,以及一个`/users/me`端点用于返回当前用户的信息,这个端点使用了`oauth2_scheme`依赖项来验证JWT令牌。
阅读全文