Using the Celery Task Queue in Django

Celery can be integrated into a Django project.

Assume we already have a Django project named pyseg.

First, create mycelery.py under pyseg/pyseg:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pyseg.settings')

app = Celery('pyseg')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

Add the following to pyseg/pyseg/settings.py:

CELERY_BROKER_URL = 'redis://172.17.0.2:6379/0'
CELERY_RESULT_BACKEND = 'redis://172.17.0.2:6379/0'
CELERY_TASK_SERIALIZER = 'json'
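
To make the namespace='CELERY' convention concrete, the sketch below uses plain Python (it is not Celery's own code) to mimic how prefixed names in settings.py correspond to Celery's lowercase setting names. The helper name strip_namespace and the extra DEBUG key are hypothetical and only serve the illustration.

```python
def strip_namespace(settings, namespace="CELERY"):
    """Return only the keys that carry the namespace prefix, renamed to
    lowercase without the prefix (an illustrative model, not Celery's code)."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


django_settings = {
    "DEBUG": True,  # hypothetical unrelated Django setting, ignored below
    "CELERY_BROKER_URL": "redis://172.17.0.2:6379/0",
    "CELERY_RESULT_BACKEND": "redis://172.17.0.2:6379/0",
    "CELERY_TASK_SERIALIZER": "json",
}

celery_conf = strip_namespace(django_settings)
print(celery_conf)
```

Under this model a misspelled key would never produce a result_backend entry, which is why the exact spelling of each setting name matters.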

Add the following to pyseg/pyseg/__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that tasks will use this app.
from .mycelery import app as celery_app

__all__ = ['celery_app']

Create tasks.py in any app (the examples below use an app named myapp):

from __future__ import absolute_import, unicode_literals
from pyseg.mycelery import app

@app.task
def add(x, y):
    # Return the sum of the two arguments.
    return x + y

@app.task
def minus(x, y):
    # Return the difference of the two arguments.
    return x - y

Start a Celery worker from the project root:

celery -A pyseg.mycelery worker -l info

The tasks can then be called from a view:

from myapp.tasks import add, minus

# delay() queues the task and returns an AsyncResult immediately.
add.delay(1, 2)

minus.delay(2, 1)
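
Since settings.py sets CELERY_TASK_SERIALIZER = 'json', the arguments given to delay() have to be encodable as JSON before they reach the broker. The sketch below uses the standard-library json module as a stand-in for Celery's serializer (which may accept a few additional types); is_json_safe and Document are hypothetical names introduced for the example.

```python
import json


def is_json_safe(args):
    """Return True if the arguments can be encoded by the stdlib json module."""
    try:
        json.dumps(args)
    except TypeError:
        return False
    return True


class Document:
    """Hypothetical stand-in for an arbitrary object such as a model instance."""

    def __init__(self, pk):
        self.pk = pk


doc = Document(pk=7)

print(is_json_safe((1, 2)))     # plain numbers, as in add.delay(1, 2)
print(is_json_safe((doc,)))     # an arbitrary object cannot be encoded
print(is_json_safe((doc.pk,)))  # passing the primary key instead works
```

A common way around the limitation is to pass an identifier to the task and load the object inside the task body.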

Now start Django and watch the tasks run.

Scheduled tasks

  1. Interval tasks

Add the following to pyseg/pyseg/settings.py:

from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (2, 1)
    },
}
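
As a rough picture of what timedelta(seconds=30) means for the entry above, the sketch below lists the first few due times of a 30-second interval. It is a simplified model that assumes the scheduler starts at a known time and never drifts; it is not how beat is implemented, and interval_run_times is a hypothetical helper.

```python
from datetime import datetime, timedelta


def interval_run_times(start, every, count):
    """List the first `count` due times for a task that repeats every `every`."""
    return [start + every * n for n in range(1, count + 1)]


start = datetime(2024, 1, 1, 12, 0, 0)  # arbitrary example start time
for due in interval_run_times(start, timedelta(seconds=30), 3):
    print(due.time())
```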

  2. Crontab tasks

# crontab task
# call myapp.tasks.add every day at 7:30
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Executes every day at 7:30 a.m.
    'add-every-morning': {
        'task': 'myapp.tasks.add',
        'schedule': crontab(hour=7, minute=30),
        'args': (2, 1)
    },
}
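
Note that crontab(hour=7, minute=30) does not restrict the day of the week, so the entry fires every day at 7:30. The sketch below models that rule with the standard library only; next_daily_run is a hypothetical helper, not part of Celery, and it ignores time zones.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour=7, minute=30):
    """Return the first datetime strictly after `now` that falls on hour:minute."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next run is tomorrow.
        candidate += timedelta(days=1)
    return candidate


# 2024-01-01 is a Monday; after 8:00 the next run is Tuesday, not next Monday.
print(next_daily_run(datetime(2024, 1, 1, 8, 0)))
print(next_daily_run(datetime(2024, 1, 1, 7, 0)))
```

To run on Monday mornings only, the schedule would need a day_of_week argument, for example crontab(hour=7, minute=30, day_of_week=1).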

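Note that the task field must contain the full dotted path of the task function (for example myapp.tasks.add); otherwise Celery reports that the task cannot be found.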
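
By default Celery names a task after the module it is defined in plus the function name, and the scheduler refers to the task by that name. The sketch below is a simplified model of such a name-based lookup, not Celery's implementation; registry, register and run_by_name are hypothetical names.

```python
registry = {}


def register(module_name):
    """Decorator that stores a function under '<module>.<function>'."""
    def decorator(func):
        registry[f"{module_name}.{func.__name__}"] = func
        return func
    return decorator


@register("myapp.tasks")  # pretend this function lives in myapp/tasks.py
def add(x, y):
    return x + y


def run_by_name(name, *args):
    """Look a task up by its registered name and call it."""
    if name not in registry:
        raise KeyError(f"task {name!r} is not registered")
    return registry[name](*args)


print(run_by_name("myapp.tasks.add", 2, 1))
try:
    run_by_name("tasks.add", 2, 1)  # shortened name, not in the registry
except KeyError as exc:
    print("lookup failed:", exc)
```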

Both kinds of schedule require starting an additional beat process:

celery -A pyseg.mycelery beat -l info
