fastapi 中 使用 queue
时间: 2023-09-13 21:05:56 浏览: 165
fastadmin queue 插件
在 FastAPI 中使用队列可以使用 Python 内置的 `queue` 模块,或者第三方的异步队列库 `aiorq`。下面是两个示例:
使用 Python 内置的 `queue` 模块:
```python
import asyncio
from fastapi import FastAPI
from queue import Queue
app = FastAPI()
q = Queue()
async def worker():
while True:
item = await asyncio.get_event_loop().run_in_executor(None, q.get)
# 处理队列中的任务,这里只是简单地打印出来
print(f"Processing {item}")
q.task_done()
@app.post("/task")
async def add_task(item: str):
q.put(item)
return {"message": "Task added to queue"}
if __name__ == "__main__":
asyncio.create_task(worker())
uvicorn.run("module_name:app", host="0.0.0.0", port=8000, log_level="info")
```
在上面的示例中,`worker` 协程不断地从队列中获取任务,并进行处理。`add_task` 函数将任务添加到队列中。在 Uvicorn 的 `run` 函数中,将 `module_name` 替换为你的应用程序模块名,例如 `myapp.main`。
使用 `aiorq` 库:
```python
import asyncio
from fastapi import FastAPI
from aiorq import Queue
app = FastAPI()
q = Queue()
async def worker():
while True:
item = await q.get()
# 处理队列中的任务,这里只是简单地打印出来
print(f"Processing {item}")
await q.task_done()
@app.post("/task")
async def add_task(item: str):
await q.put(item)
return {"message": "Task added to queue"}
if __name__ == "__main__":
asyncio.create_task(worker())
uvicorn.run("module_name:app", host="0.0.0.0", port=8000, log_level="info")
```
在上面的示例中,`worker` 协程不断地从队列中获取任务,并进行处理。`add_task` 函数将任务添加到队列中。请确保在安装并导入 `aiorq` 库。在 Uvicorn 的 `run` 函数中,将 `module_name` 替换为你的应用程序模块名,例如 `myapp.main`。
阅读全文