写一套基于fastapi和sqlite数据库的后端代码
时间: 2024-04-23 21:07:41 浏览: 279
好的,以下是一个基于FastAPI和SQLite数据库的后端代码示例,包含用户注册、登录、文件上传、文件下载和文件列表展示等功能。注意:示例中的密码没有加密处理,请在实际项目中进行加密处理。
```python
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
from typing import Optional
import jwt
import os
import sqlite3
# 初始化FastAPI应用
app = FastAPI()
# 允许跨域请求
origins = [
"http://localhost",
"http://localhost:8080",
"https://example.com",
"https://www.example.com"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 初始化OAuth2密码验证
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 连接SQLite数据库
conn = sqlite3.connect('file_db.sqlite')
c = conn.cursor()
# 用户数据模型
class User:
def __init__(self, id, username, password):
self.id = id
self.username = username
self.password = password
# 文件数据模型
class File:
def __init__(self, id, name, path, user_id):
self.id = id
self.name = name
self.path = path
self.user_id = user_id
# 用户注册
@app.post("/register")
async def register(username: str, password: str):
# 检查用户名是否已存在
c.execute("SELECT * FROM users WHERE username=?", (username,))
if c.fetchone():
raise HTTPException(status_code=409, detail="Username already exists")
# 创建新用户
c.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password))
conn.commit()
user_id = c.lastrowid
user = User(user_id, username, password)
# 生成JWT令牌
access_token = generate_token(user)
return { "access_token": access_token, "token_type": "bearer" }
# 用户登录
@app.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 检查用户名和密码是否匹配
c.execute("SELECT * FROM users WHERE username=? AND password=?", (form_data.username, form_data.password))
user = c.fetchone()
if not user:
raise HTTPException(status_code=401, detail="Incorrect username or password")
# 生成JWT令牌
user = User(user[0], user[1], user[2])
access_token = generate_token(user)
return { "access_token": access_token, "token_type": "bearer" }
# 生成JWT令牌
def generate_token(user: User):
access_token_expires = timedelta(minutes=30)
access_token_payload = {
"sub": str(user.id),
"username": user.username,
"exp": datetime.utcnow() + access_token_expires
}
access_token = jwt.encode(access_token_payload, "secret", algorithm="HS256")
return access_token
# 获取当前用户信息
@app.get("/me")
async def me(token: str = Depends(oauth2_scheme)):
try:
# 解析JWT令牌
token_payload = jwt.decode(token, "secret", algorithms=["HS256"])
user_id = int(token_payload["sub"])
username = token_payload["username"]
user = User(user_id, username, None)
return user
except:
raise HTTPException(status_code=401, detail="Invalid token")
# 上传文件
@app.post("/upload")
async def upload_file(file: UploadFile = File(...), token: str = Depends(oauth2_scheme)):
try:
# 解析JWT令牌
token_payload = jwt.decode(token, "secret", algorithms=["HS256"])
user_id = int(token_payload["sub"])
# 保存文件
file_path = os.path.join("uploads", file.filename)
with open(file_path, "wb") as f:
f.write(file.file.read())
# 保存文件记录
c.execute("INSERT INTO files (name, path, user_id) VALUES (?, ?, ?)", (file.filename, file_path, user_id))
conn.commit()
file_id = c.lastrowid
file = File(file_id, file.filename, file_path, user_id)
return file
except:
raise HTTPException(status_code=401, detail="Invalid token")
# 获取文件列表
@app.get("/files")
async def get_files(token: str = Depends(oauth2_scheme)):
try:
# 解析JWT令牌
token_payload = jwt.decode(token, "secret", algorithms=["HS256"])
user_id = int(token_payload["sub"])
# 获取该用户上传的文件列表
c.execute("SELECT * FROM files WHERE user_id=?", (user_id,))
files = []
for row in c.fetchall():
file = File(row[0], row[1], row[2], row[3])
files.append(file)
return files
except:
raise HTTPException(status_code=401, detail="Invalid token")
# 下载文件
@app.get("/download/{file_id}")
async def download_file(file_id: int, token: str = Depends(oauth2_scheme)):
try:
# 解析JWT令牌
token_payload = jwt.decode(token, "secret", algorithms=["HS256"])
user_id = int(token_payload["sub"])
# 获取文件信息
c.execute("SELECT * FROM files WHERE id=?", (file_id,))
file = c.fetchone()
if not file:
raise HTTPException(status_code=404, detail="File not found")
if file[3] != user_id:
raise HTTPException(status_code=403, detail="Access denied")
# 返回文件下载响应
return FileResponse(file[2], media_type="application/octet-stream", filename=file[1])
except:
raise HTTPException(status_code=401, detail="Invalid token")
# 关闭数据库连接
@app.on_event("shutdown")
def shutdown_event():
conn.close()
```
这个示例包含了用户注册、登录、文件上传、文件下载和文件列表展示等功能。你可以根据自己的需求进行修改和定制化,例如替换SQLite数据库为其他数据库。请注意,这个示例还没有进行性能优化和安全性处理,需要在实际项目中进行优化和处理。
阅读全文