Python中使用Celery任务队列

Python中使用Celery队列。

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB, Amazon SQS,CouchDB, SQLAlchemy ,Django ORM, IronMQ。推荐使用RabbitMQ、Redis作为消息队列。

任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache。建议使用与消息中间件一样的服务。

这里我们使用Redis来作为消息中间件和任务结果存储。

假设Redis服务已正常安装且处理启动状态。

首先需要安装Python的redis、celery依赖

pip install redis celery

创建一个任务 task.py

from celery import Celery

app = Celery('tasks', broker='redis://172.17.0.2:6379/0', backend='redis://172.17.0.2:6379/0')

@app.task
def add(x, y):
    return x + y

启动这个任务

celery -A task worker --loglevel=info

另建一个脚本来执行任务 pub.py

from task import add

for i in range(1000):
    add.delay(i, i)

# result = add.delay(2,1)
#
# print(result.ready())
#
# print(result.state)
#
# print(result.get())

另开一个终端执行该脚本,然后就可以在两个终端中看到任务的执行情况了。

Leave a Comment

豫ICP备19001387号-1