FastAPI使用Celery做队列之二

本篇是FastAPI使用Celery做队列之二,与前篇不同的是,本篇是从实际项目中抽取的代码,并使用Redis做broker。

首先我们新添加个celery_queue.py,定义celery实例和相关配置,以及定时任务的执行,代码如下:

from celery import Celery
from celery.schedules import crontab
from tasks import *

celery_instance = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')

celery_instance.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json']
)

# 注册任务,实测不注册也可以,但是需要执行 from tasks import add_task,也就是说,这里的注册是为了让celery_instance知道这个任务的存在
# celery_instance.task(add_task)

celery_instance.conf.beat_schedule = {
    # 'add-every-30-seconds': {
    #     'task': 'tasks.add_task',
    #     'schedule': 30.0,
    #     'args': (16, 16)
    # },
    'add-every-minute': {
        'task': 'tasks.add_task',
        'schedule': crontab(minute='*'),
        'args': (32, 32)
    },
}

第二步,我们新添加一个task.py,定义具体执行任务的函数,代码如下

from celery import shared_task
from celery import Celery

# 要跟celery_queue.py中的保持一致,共用一个Celery实例
celery_instance = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')

# 注意点:这里的@celery_instance.task不能写成@shared_task,否则在FastAPI中调用会报错
@celery_instance.task
def add_task(x, y):
    return x + y

@celery_instance.task
def minus_task(x, y):
    return x - y

第三步,在main.py中引入task.py并添加路由

from tasks import *

@app.post("/add")
def add(x: int, y: int):
    result = add_task.delay(x, y).get()
    return {"result": result}

@app.post("/minus")
def minus(x: int, y: int):
    result = minus_task.delay(x, y).get()
    return {"result": result}

第四步,安装celery依赖并启动celery实例

pip3 install celery
celery -A celery_queue worker -B -l INFO

最后,启动FastAPI

uvicorn main:app --reload

定义在celery_queue.py中的任务就会定时执行,也可以通过路由/add/minus去手动执行任务。

1 thought on “FastAPI使用Celery做队列之二”

Leave a Comment

豫ICP备19001387号-1