Celery(分布式任务队列)
Celery是一个简单、灵活、高可用、高性能的开源(BSD许可)分布式任务处理系统,专注于实时处理的任务队列管理,同时也支持任务调度。
Celery基于Python实现(但是该协议可以用任何语言实现),跟包括Django、Pyramid、Pylons、Flask、Tornado等Web框架都无缝集成,有庞大的用户与贡献者社区。Celery需要消息传输才能发送和接收消息(如:RabbitMQ、Redis、Amazon SQS、Zookeeper)。Celery可以单机运行,也可以在多台机器上运行,甚至可以跨越数据中心运行。
官方文档
简单理解原理就是,把它看成是一个自己实现的python程序,里面监听者rabbitmq的消息队列,收到某种消息就执行对应的方法(看成是worker),如果需要存储值可以存在redis里。然后另一个程序(比如:flask)充当生产者一样,当需要哪些异步完成的时候,就通过消息队列发出去就行了,如果需要获取值就去redis取。不过呢celery实现的更复杂也有更多新功能和玩法。
安装
RabbitMQ是默认代理,如果要使用celery,最后事先安装好RabbitMQ
1 | pip install celery |
2 | # 也可以同时安装一些捆绑的软件 |
3 | pip install "celery[librabbitmq,redis,auth,msgpack]" |
4 | |
5 | # 当前版本 |
6 | celery --version |
7 | ''' |
8 | 4.4.1 (cliffs) |
9 | ''' |
10 | |
11 | # 如果要使用rabbbitmq做消息传输,可以安装librabbitmq这是用C实现的AMQP客户端,性能更好 |
12 | pip install librabbitmq |
消息传输方式比较
Zookeeper现在虽然可以运行,但没有专门的维护者。
缺少监控支持意味着无法执行事件,因此Flower,celery事件,celerymon 和其他基于事件的监视工具将无法工作。
名称 | 状态 | 监控方式 | 远程控制 |
---|---|---|---|
RabbitMQ | 稳定 | 有 | 有 |
Redis | 稳定 | 有 | 有 |
Amazon SQS | 稳定 | 没有 | 没有 |
Zookeeper | 实验性 | 没有 | 没有 |
配置
一般默认使用RabbitMQ的话,其实不怎么需要配置,除非使用其他代理的时候就需要额外的一些配置设置。
对于较大的项目,建议使用专用的配置模块。通过调用app.config_from_object()方法来告诉Celery实例使用配置模块
1 | app.config_from_object('celeryconfig') |
- celeryconfig.py
1
broker_url = 'pyamqp://'
2
result_backend = 'rpc://'
3
4
task_serializer = 'json'
5
result_serializer = 'json'
6
accept_content = ['json']
7
timezone = 'Europe/Oslo'
8
enable_utc = True
URL设置
1 | '''rabbitmq''' |
2 | app.conf.broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost' |
3 | |
4 | '''redis''' |
5 | app.conf.broker_url = 'redis://:password@hostname:port/db_number' |
6 | |
7 | # 如果应使用Unix套接字连接,则URL必须采用以下格式 |
8 | app.conf.broker_url = 'redis+socket:///path/to/redis.sock' |
9 | |
10 | # 通过virtual_host在URL中添加参数,可以在使用Unix套接字时指定其他数据库号 |
11 | app.conf.broker_url = 'redis+socket:///path/to/redis.sock?virtual_host=db_number' |
12 | |
13 | # 直接连接到Redis Sentinel列表 |
14 | app.conf.broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380;sentinel://localhost:26381' |
15 | app.conf.broker_transport_options = { 'master_name': "cluster1" } |
序列化器设置
更改task_serializer设置来配置用于序列化任务负载的默认序列化器
1 | app.conf.update( |
2 | task_serializer='json', |
3 | accept_content=['json'], # 忽略其他内容 |
4 | result_serializer='json', |
5 | timezone='Europe/Oslo', # 设置时区,内部和消息中的所有时间和日期都默认使用UTC时区 |
6 | enable_utc=True, |
7 | result_expires=3600, # 结果过期时间 |
8 | # 设置,路由规则 |
9 | task_routes = { |
10 | 'tasks.add': {'queue': 'test_q'}, # 这里设置了的话,apply_async方法可以省略写队列名称了 |
11 | }, |
12 | ) |
可见性超时设置
可见性超时定义了在将消息重新传递给另一个工作人员之前等待工作人员确认任务的秒数。
redis默认可见性超时为1小时
如果未在“ 可见性超时 ”中确认任务,则该任务将重新交付给其他工作人员并执行。这会导致ETA /倒数/重试任务出现问题,其中执行时间超过了可见性超时;实际上,如果发生这种情况,它将再次循环执行。因此,必须增加可见性超时,以匹配计划使用的最长ETA的时间。
定期任务不会受到可见性超时的影响,因为这是与ETA /倒计时不同的概念
1 | # 可见性超时设置,值为秒数 |
2 | app.conf.broker_transport_options = {'visibility_timeout': 3600} |
存储任务的状态和返回值
1 | app.conf.result_backend = 'redis://localhost:6379/0' |
2 | # 如果使用的是Sentinel,则应该这样设置返回值存储方式 |
3 | app.conf.result_backend_transport_options = {'master_name': "mymaster"} |
注意事项
Fanout prefix
默认情况下,所有虚拟主机都将看到广播消息。必须设置传输选项以为消息加上前缀,以便仅活动虚拟主机才能接收它们。
1
# 注意,无法与运行旧版本的workers或未启用此设置的workers进行通信
2
app.conf.broker_transport_options = {'fanout_prefix': True}
Fanout patterns
工作者将默认接收所有与任务相关的事件。为避免这种情况,必须设置fanout_patterns扇出选项,以便workers只能订阅与workers相关的事件
1
# 注意,此更改是向后不兼容的,因此群集中的所有工作程序都必须启用此选项,否则他们将无法通信。以后版本(4.4.1后)默认情况下将启用此选项
2
app.conf.broker_transport_options = {'fanout_patterns': True}
工作结构
Celery分为3个部分
worker部分
负责任务的处理,即工作进程
broker部分
负责任务消息的分发以及任务结果的存储,这部分任务主要由中间数据存储系统完成,比如消息队列服务器RabbitMQ、redis、Amazon SQS、MongoDB、IronMQ等或者关系型数据库。
- 注意
1
# 如果要配合redis使用的话,建议使用3.3.11,因为3.4和4.0发现unhashable type: 'Redis'的报错问题
2
pip install redis==3.3.11 # 3.4.1好像也解决了这个问题
- 注意
Celery主类
进行任务最开始的指派与执行控制,他可以是单独的python脚本,也可以和其他程序结合,应用到django或者flask等web框架里面以及你能想到的任何应用。
任务调度
默认情况下不启用结果。为了执行远程过程调用或跟踪数据库中的任务结果,您将需要配置Celery以使用结果后端。
简单使用
- day_01.py
1
from celery import Celery
2
from celery import platforms
3
4
platforms.C_FORCE_ROOT = True
5
6
'''
7
day_01为当前代码的文件名称
8
broker利用的是redis的db0作为消息队列,本地端口为16379
9
'''
10
celery = Celery('day_01', broker='redis://localhost:16379/0')
11
12
13
def add(x,y):
14
return x + y
15
16
17
if __name__ == '__main__':
18
'''
19
注意:
20
1.调用前必须先启动celery
21
比如这次的work启动方式:celery -A day_01 worker --loglevel=info
22
2.调用的时候不能直接调用add函数,不然就不是作用于celery了
23
24
'''
25
result = add.delay(7,4)
26
'''
27
调用任务将返回一个AsyncResult实例。
28
这可用于检查任务的状态,等待任务完成或获取其返回值(或者如果任务失败,则获取异常和回溯)
29
'''
30
print(type(result))
启动
1
# 启动,注意名称后面不要加文件名后缀,celery只检测名称(也就是application)
2
celery -A day_01 worker -l info # --loglevel=info
3
# 输出如下
4
-------------- celery@Pocket-MacBookPro.local v4.4.1 (cliffs)
5
--- ***** -----
6
-- ******* ---- Darwin-18.5.0-x86_64-i386-64bit 2020-03-06 18:39:30
7
- *** --- * ---
8
- ** ---------- [config]
9
- ** ---------- .> app: day_01:0x1100f25c0
10
- ** ---------- .> transport: redis://localhost:16379/0
11
- ** ---------- .> results: disabled://
12
- *** --- * --- .> concurrency: 8 (prefork)
13
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
14
--- ***** -----
15
-------------- [queues]
16
.> celery exchange=celery(direct) key=celery
17
18
19
[tasks]
20
. day_01.add
21
22
[2020-03-06 18:39:30,745: INFO/MainProcess] Connected to redis://localhost:16379/0
23
[2020-03-06 18:39:30,757: INFO/MainProcess] mingle: searching for neighbors
24
[2020-03-06 18:39:31,791: INFO/MainProcess] mingle: all alone
25
[2020-03-06 18:39:31,823: INFO/MainProcess] celery@Pocket-MacBookPro.local ready.
26
27
# 可以看到这里显示已经接收到任务
28
[2020-03-06 18:39:31,961: INFO/MainProcess] Received task: day_01.add[d88807c6-abd7-4aac-a476-5e11bf059530]
29
# 这里显示任务执行成功,并打印了结果
30
[2020-03-06 18:39:31,963: INFO/ForkPoolWorker-8] Task day_01.add[d88807c6-abd7-4aac-a476-5e11bf059530] succeeded in 0.0005231109999999095s: 11
- day_01.py
指定队列(其他broker引擎也支持)
启动了队列接收的话,代码执行调用也得指明队列队列才可以发送到指定队列,否则会发送到默认的地方(默认的队列名称为celery)。
1
# 启动celery的时候需要指定队列名称
2
celery -A day_01 worker --loglevel=info -Q 'test_q'
3
4
# 多个队列的话以逗号隔开,队列的顺序无关紧要,因为worker将给予队列同等的权重
5
celery -A proj worker -Q hipri,celery
6
7
# 代码调用的时候也需要指定队列名
8
add.apply_async(args = (7,9),queue = 'test_q')
9
'''
10
注意,有些博文说可以如下调用指定队列,但是发现报错,无奈只好使用apply_async,这两个是等同的方法。也不排除是老版本API废弃,暂未研究原因,也可能是别人博文未经过验证。。。
11
'''
12
# add.delay(3,3,queue = 'test_q')
速率限制
如果使用RabbitMQ或Redis作为代理,可以指示工作程序在运行时为任务设置新的速率限制
1
celery -A day_01 control rate_limit day_01.add 10/m
结果存储
(若非必要,能不返回结果就不返回结果,只注重任务执行就好)
如果要跟踪任务的状态,Celery需要在某些地方存储或发送状态。有几种内置的结果后端可供选择:SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP),也可以定义自己的后端。
1 | app = Celery('tasks', backend='rpc://', broker='pyamqp://') |
2 | app = Celery('tasks', backend='redis://localhost', broker='pyamqp://') |
3 | |
4 | # 需要注意的是,当同时以redis作为传输和存储的时候,因为redis是单线程的,所以get的时候很容易就发生错误(提示任务还在执行) |
5 | celery = Celery('day_01',backend='redis://localhost:16379', broker='redis://localhost:16379/0') |
6 | |
7 | # AMQP结果后端计划在版本4.0中弃用,在版本5.0中删除,所以我们不要用rabbitmq作为后端存储服务 |
8 | celery = Celery('day_01',backend='amqp://', broker='amqp://') |
redis同时作为传输(transport)和存储结果(results)时需要注意
redis是单线程的,在立马get获取结果的时候很容易触发任务还在执行的想象,所以不推荐单个redis同时作为传输和存储
- day_01.py
1
from celery import Celery
2
from celery import platforms
3
4
platforms.C_FORCE_ROOT = True
5
6
# 这里使用redis作为broker,所以运行前得确保redis服务已经启动
7
# 也可以使用rabbitmq,broker = 'amqp://localhost'
8
9
# 不推荐broker是redis的时候,backend也是redis,除非redis集群了,这里只是故意这样使用
10
celery = Celery('day_01',backend='redis://localhost:16379', broker='redis://localhost:16379/0')
11
12
13
def add(x,y):
14
return x + y
- test_worker.py
1
from day_01 import add
2
import time
3
4
result = add.apply_async(args = (7,90),queue = 'test_q')
5
print(type(result))
6
7
# 稍微睡眠0.1,给redis一些缓冲时间,否则立马get容易报timeout问题
8
time.sleep(0.1)
9
10
# True:任务已执行,False:任务仍在运行,暂挂或正在等待重试
11
print(result.ready())
12
13
'''
14
后端使用资源来存储和传输结果。为确保释放资源,必须最终调用 get()或forget()在AsyncResult调用任务后返回的每个实例上
15
'''
16
# 将异步调用转换为同步调用,基本很少这样使用,在任务中等待任务可能会导致死锁,容易引发异常
17
print(result.get(timeout=1)) # timeout(float)–操作超时之前要等待的时间(以秒为单位)
18
19
# 通过指定propagate参数来覆盖此异常
20
print(result.get(propagate=False))
21
22
# 任务引发异常,可以访问原始回溯
23
print(result.traceback)
- day_01.py