python fastapi 使用jwt
时间: 2024-05-21 08:09:25 浏览: 286
Python FastAPI 是一个快速构建 Web API 的现代化框架,而JWT(Json Web Token)则是一种用于身份认证的开放标准。在 FastAPI 中使用 JWT 可以实现用户认证和授权,保护你的 API 免受非法访问。
使用 JWT 认证需要以下几个步骤:
1. 生成 JWT:在用户登录成功之后,需要使用用户信息生成 JWT。通常 JWT 包含一些用户信息(如 ID、用户名等)和有效期等信息。
2. 发送 JWT:在用户访问需要认证的 API 时,需要在请求头中发送 JWT。
3. 验证 JWT:API 在接收到请求时,需要验证请求头中的 JWT 是否有效。如果 JWT 有效,则允许用户访问 API,否则返回错误信息。
4. 刷新 JWT:为了保证安全性,JWT 通常都有有效期限制。在有效期结束之前,需要刷新 JWT,防止用户被强制退出。
FastAPI 本身并不提供 JWT 认证的功能,但可以使用第三方库来实现。其中较为流行的有 PyJWT 和 FastAPI-JWT。使用这些库可以很方便地实现 JWT 认证。
如果你想了解更多关于 FastAPI 和 JWT 认证的内容,可以参考以下文档:
FastAPI 文档:https://fastapi.tiangolo.com/tutorial/security/jwt/
PyJWT 文档:https://pyjwt.readthedocs.io/en/stable/
FastAPI-JWT 文档:https://github.com/authlib/example-fastapi-jwt
相关问题
写一个fastapi的jwt登录认证
### 回答1:
我可以给你一些提示来帮助你实现fastapi的jwt登录认证。首先,你需要安装fastapi和pyjwt库。然后,构建一个JWT认证函数,并将它添加到登录路由上。接下来,创建一个签名密钥,并将其添加到认证函数中。最后,将签名密钥添加到环境变量中,以便可以在登录路由中使用。
### 回答2:
FastAPI是一个基于Python的现代,高性能的Web框架,它提供了快速构建API和Web应用程序的能力。JWT(JSON Web Token)是一种用于在网络应用间传递声明的安全的、基于JSON的令牌。
要实现FastAPI的JWT登录认证,需要使用以下步骤:
1. 导入所需的库:
```python
from fastapi import FastAPI
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
```
2. 配置密码哈希和jwt密钥:
```python
SECRET_KEY = "your_secret_key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
```
3. 创建用户模型和密码校验函数:
```python
class User(BaseModel):
username: str
password: str
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
```
4. 创建用户认证函数和生成JWT的函数:
```python
async def authenticate_user(username: str, password: str):
user = get_user_from_db(username)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
```
5. 创建登录路由和保护需要认证的路由:
```python
app = FastAPI()
@app.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)
if not user:
return {"error": "Invalid username or password"}
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/protected")
async def protected_route(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user_from_db(username)
if user is None:
raise credentials_exception
return {"user": user}
```
以上代码示例了一个快速实现的FastAPI JWT登录认证。在登录路由中,用户提供用户名和密码,然后校验用户名和密码是否正确,如果正确则生成一个JWT,并返回给用户。在保护的路由中,用户需要提供JWT作为认证凭证,服务器会验证JWT的有效性,并根据JWT中的信息返回相应的内容。
### 回答3:
FastAPI是一个Python的Web框架,它提供了快速构建API的能力。JWT(JSON Web Token)是一种用于身份验证和授权的开放标准。下面是一个使用FastAPI实现JWT登录认证的示例。
首先,需要安装FastAPI和PyJWT等必要的库。可以使用pip命令进行安装。
```shell
pip install fastapi
pip install python-jose[cryptography]
```
接下来,创建一个Python文件,例如`main.py`,并使用以下代码编写JWT登录认证的示例。
```python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from datetime import datetime, timedelta
from passlib.context import CryptContext
app = FastAPI()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
'username': {
'username': 'username',
'password': '$2b$12$Jl7haqJbg.fBx5AZvK7Hj.57A6FYRXTFmL/NyU.JvMhLDwwBUUz/e' # hashed_password: password
}
}
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(username: str):
if username in fake_users_db:
user_dict = fake_users_db[username]
return user_dict
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not verify_password(password, user['password']):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({'exp': expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=401, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": user["username"]}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/me")
async def read_users_me(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
except JWTError:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
user = get_user(username=username)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
```
上面的代码中,首先定义了一个用于存储用户信息的`fake_users_db`字典。然后使用`CryptContext`来进行密码的哈希操作。`verify_password`用于验证密码是否匹配,`get_password_hash`用于生成密码的哈希值。`get_user`用于返回指定用户名的用户信息。`authenticate_user`用于验证用户名和密码,并返回用户信息。`create_access_token`用于生成JWT令牌。
接下来,使用`@app.post`装饰器定义了一个`/token`的登录路由,接收用户名和密码,验证后生成并返回JWT令牌。
最后,使用`@app.get`装饰器定义了一个`/me`的路由,该路由需要进行JWT登录认证。在函数中解码JWT令牌,获取到用户信息并返回。
通过上述代码示例,我们实现了一个使用FastAPI实现JWT登录认证的API。可以使用`uvicorn`命令运行该API。
```shell
uvicorn main:app
```
接下来,可以通过向`http://localhost:8000/token`发起POST请求进行登录认证,获取到JWT令牌。然后使用该令牌向`http://localhost:8000/me`发起GET请求获取当前用户信息。
写一个fastapi的jwt登录认证代码
### 回答1:
答:下面是一个示例fastapi jwt登录认证代码:@app.post("/login")
def login(username: str, password: str):
user = authenticate_user(username, password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
### 回答2:
下面是一个基本的FastAPI JWT登录认证代码示例:
```python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
app = FastAPI()
security = HTTPBearer()
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("username")
# 在这里可以添加自定义的验证逻辑
if username is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
return username
except jwt.exceptions.DecodeError:
raise HTTPException(status_code=401, detail="Invalid token or expired token")
@app.get("/")
def home(token: HTTPAuthorizationCredentials = Depends(security)):
username = verify_token(token.credentials)
return {"message": f"Hello, {username}!"}
@app.post("/login")
def login(username: str, password: str):
# 在这里可以添加验证username和password的逻辑
# 验证通过后,生成token并返回给客户端
payload = {"username": username}
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
return {"token": token}
```
在上述例子中,通过`/login`接口进行用户名密码验证,验证通过后生成一个JWT token返回给客户端。客户端在访问需要认证的接口时,在请求头添加`Authorization: Bearer <token>`,服务器端会验证token的有效性,并提取出用户名。
需要注意的是,这只是一个简单的示例,实际项目中可能还需要处理更多的安全性和错误处理逻辑。另外,`SECRET_KEY`需要保持安全,最好不要直接写明在代码中。最佳实践是将其存储在环境变量中,并通过`.env`文件加载。
### 回答3:
以下是一个使用FastAPI和JWT进行身份验证的示例代码。
```python
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
# 密码哈希生成器
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 用于生成访问令牌的加密算法和密钥
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 用户模型
class User:
def __init__(self, username: str, password: str):
self.username = username
self.password = self.get_hashed_password(password)
def verify_password(self, password: str):
return pwd_context.verify(password, self.password)
def get_hashed_password(self, password: str):
return pwd_context.hash(password)
# 用户数据库
users_db = {
"admin": User("admin", "password")
}
# 获取用户的方式,通过用户名查找用户
def get_user(username: str):
if username in users_db:
return users_db[username]
return None
# 用于验证用户身份的依赖项
async def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not user.verify_password(password):
return False
return user
# 创建访问令牌的函数
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
app = FastAPI()
# 用于提供身份验证和获取访问令牌的OAuth2密码流对象
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
# 创建获取令牌的路由
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="登录失败")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 创建受保护的路由,需要身份验证
@app.get("/protected")
async def protected_route(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="无效的认证凭证")
except JWTError:
raise HTTPException(status_code=401, detail="无效的认证凭证")
return {"message": "受保护的路由"}
```
阅读全文