celery基本介绍
为什么选择celery?
- 可以后台运行任务。
- 在请求完成后,执行后续的任务
- 通过异步执行和重试保证任务的完成
- 定时任务
- 。。。等
celery支持的序列化方式有哪些?
包括 JSON、YAML、Pickle和msgpack,在4.0版本之前默认是Pickle,4.0版本之后是JSON。
如果跨语言使用celery,建议不使用Pickle。如果传参是复杂的python对象,使用Pickle。celery需要的broker有哪些?
包括RabbitMQ、redis、SQS和Qpid,redis作为broker会由于强制终止或者断电丢失数据,Rabbitmq是推荐使用的。
我们这里使用的是redis。如果需要获取任务的执行结果,该如何配置?
需要配置backend,用于存储result。包括db、memcached、redis和Rabbitmq
我们这里使用的是redis本项目异步任务使用的架构
- celery运行时序图如下所示:
- 本项目异步任务流程图如下所示:
使用示例介绍
项目目录结构
1 | . |
说明:异步任务使用到的目录就两个,一个是config,另外一个是tasks。config的init.py文件记录celery的配置信息,tasks则是定义一个个异步任务模块,也就是定义消费者。那么生产者在哪里呢?生产者就是在api目录中的views中,后面会细讲。
安装所需环境
1 | #切换到neocu-business目录下,新建虚拟环境并安装第三方包 |
关于配置
本项目配置redis作为broker和result backend。CELERY_INCLUDE 用于指明task中所有的worker的路径,新增模块的任务都需要在这里标明下,否则启动celery的worker时候,没有消费者执行任务。其他的配置项目参考官方文档,例如CELERY_RESULT_SERIALIZER和CELERY_TASK_SERIALIZER用于设置序列化方式,CELERY_TASK_ANNOTATIONS设置任务执行速率等。配置如下
1 | NB_REDIS = 'redis://127.0.0.1:6379' |
celery app
celery的app的在tasks/init.py中,celery启动的入口,并且作为纽带连接tasks和api
1 | from celery import Celery |
celery tasks
以负载均衡为例,我们在tasks/network目录下面新建loadbalancers.py文件,这里以创建负载均衡的异步任务为例:
1 | from tasks import celery |
因为4.0之后版本的celery序列化方式默认为json,所以传参中我们不能用python对象,上面代码中token和data是基本数据类型。
token是keystone中给登录后用户生成的唯一识别码TOKEN_ID,data是用于异步请求底层api的data。如果请求成功了,那么改写数据库中的记录与底层一致,否则删除创建的数据记录
celery producer
同样以负载均衡为例,在api/network/resources/loadbalancers.py中创建负载均衡的api接口如下:
1 | from tasks.network.loadbalancers import create |
在接收到用户输入,存入数据库中后,调用异步任务create.delay(token, data)
执行真正的请求,然后直接返回response给用户。
启动 celery
在neocu-business目录下运行celery -A tasks.celery worker -l info
,启动成功如下:
1 | -------------- celery@localhost.localdomain v4.1.0 (latentcall) |
可以看到启动的celery中有四个类型的tasks等待执行任务,然后利用postman执行api的创建操作,检验异步任务的执行结果。命令中的-l info
作用:如果task中的代码执行错误会在console中有日志。如下所示
1 | [2018-05-31 16:31:35,861: INFO/MainProcess] Received task: tasks.network.loadbalancers.update[a5afef6e-1ef0-4dfb-bf82-555adfe50c06] |
基于Celery官方文档画的时序图
理解的Celery消息任务时序图如下: