实战
项目目录结构
1 | proj |
2 | ├── __init__.py |
3 | ├── celery_app.py # celery app |
4 | └── tasks.py # 需要执行的任务 |
celery_app.py
1
from celery import Celery
2
'''
3
AMQP结果后端计划在版本4.0中弃用,在版本5.0中删除,所以我们不要用rabbitmq作为后端存储服务
4
这里使用rpc服务作为后端存储
5
'''
6
# 注意必须指明include需要加载的task任务,否则在celery启动的时候,将不会去加载,除非命令参数里主动添加加载任务
7
app = Celery('celery_app',backend='rpc://', broker='amqp://',include = ["tasks"])
8
app.conf.update(
9
result_expires=3600,
10
)
11
12
if __name__ == '__main__':
13
app.start()
tasks.py
1
from celery_app import app
2
3
4
def add(x, y):
5
return x + y
6
7
8
9
def mul(x, y):
10
return x * y
11
12
13
14
def xsum(numbers):
15
return sum(numbers)
16
17
if __name__ == '__main__':
18
'''
19
注意不能在当前文件下测试,否在celery将会报错。
20
报错信息如下:
21
KeyError: 'celery_app.mul'
22
KeyError: 'celery_app.add'
23
KeyError: 'celery_app.xsum'
24
意思就是并未发现celery_app任务中有这些方法。
25
配合启动celery的时候的日志可以发现tasks的全部注册方法:
26
[tasks]
27
. celery.accumulate
28
. celery.backend_cleanup
29
. celery.chain
30
. celery.chord
31
. celery.chord_unlock
32
. celery.chunks
33
. celery.group
34
. celery.map
35
. celery.starmap
36
. tasks.add
37
. tasks.mul
38
. tasks.xsum
39
'''
40
41
'''
42
我也暂时不清楚为什么当前文件测试它调用的是celery_app而不是tasks文件的注册方法。
43
不过如果在celery_app.py文件中注册task方法的话就不会出错
44
'''
45
# mul.delay(5,6)
46
# add.delay(5,6)
47
# xsum.delay([1,2,3,4,5])
test.py
注意要在另外的文件专门调用
- 启动celery
1
# debug模式可以看到更多调试的信息
2
celery -A celery_app worker -l debug
- 运行测试文件
1
from tasks import add,mul,xsum
2
3
mul.delay(5, 6)
4
add.delay(5, 6)
5
xsum.delay([1, 2, 3, 4, 5])
6
# 任务将最早在消息发送后10秒钟执行
7
result = add.apply_async((2, 90), countdown=10)
8
# 设置最大超时获取时间
9
print (result.get(timeout = 11))
10
'''
11
测试打印结果(可以看到调用的是tasks任务的add/mul/xsum方法,这些都是在celery启动时注册过的)延迟10s执行的任务打印有些不一样,有个ETA标志啥时候开始执行:
12
[2020-03-14 18:47:30,255: INFO/MainProcess] Received task: tasks.mul[1b8c4d00-695c-42ec-a189-0cfdc510dfa5]
13
[2020-03-14 18:47:30,256: INFO/MainProcess] Received task: tasks.add[a5a75529-fc70-4748-bb94-0278ff5064d4]
14
[2020-03-14 18:47:30,257: INFO/MainProcess] Received task: tasks.xsum[da065079-ecac-4559-b33f-1d6fe09f1ffb]
15
[2020-03-14 18:47:30,259: INFO/MainProcess] Received task: tasks.add[d12376e6-e67d-4064-be4a-3ddd6b7ab72d] ETA:[2020-03-14 10:47:40.257376+00:00]
16
[2020-03-14 18:47:30,278: INFO/ForkPoolWorker-8] Task tasks.mul[1b8c4d00-695c-42ec-a189-0cfdc510dfa5] succeeded in 0.021057061000000488s: 30
17
[2020-03-14 18:47:30,278: INFO/ForkPoolWorker-1] Task tasks.add[a5a75529-fc70-4748-bb94-0278ff5064d4] succeeded in 0.020012574999999977s: 11
18
[2020-03-14 18:47:30,279: INFO/ForkPoolWorker-2] Task tasks.xsum[da065079-ecac-4559-b33f-1d6fe09f1ffb] succeeded in 0.019595174999999188s: 15
19
[2020-03-14 18:47:42,028: INFO/ForkPoolWorker-8] Task tasks.add[d12376e6-e67d-4064-be4a-3ddd6b7ab72d] succeeded in 0.0005548989999972775s: 92
20
'''
- 启动celery
常用的方法
执行
1
# 延迟多久执行
2
add.apply_async((2, 90), countdown=10)
3
4
# 指定队列名执行
5
add.apply_async(args = (7,9),queue = 'test_q')
6
7
# 通常执行方式
8
add.delay(5, 6)
状态和值获取
1
# 获取任务id,可以方便我们寻找任务
2
res = add.delay(5, 6)
3
print(res.id)
4
5
# get方法获取值有时候是会出现错误的
6
print(res.get(timeout=1))
7
8
# 不希望错误传播,可以通过传递propagate参数来禁用错误
9
print(res.get(propagate=False))
10
11
# 检查任务成功还是失败
12
print(res.failed())
13
print(res.successful())
14
15
'''
16
一个任务只能处于单个状态,但是可以通过多个状态进行。典型任务的阶段可以是:
17
PENDING -> STARTED -> SUCCESS
18
'''
19
# 查看任务状态
20
print(res.state)
签名
1
# 签名:将其传递给函数,甚至进行序列化并通过网络发送。签名实例还支持调用API。
2
# 完整签名
3
# 等同于:add.signature((2, 2))
4
s1 = add.s(2, 2) # 快捷缩写形式
5
result1 = s1.delay()
6
print(result1.get())
7
8
# 部分签名
9
s2 = add.s(12)
10
result2 = s2.delay(13)
11
print(result2.get())
12
13
s3 = add.s()
14
result3 = s3.delay(13,11)
15
print(result3.get())
扩展
group
并行调用任务列表,它返回一个特殊的结果实例,该实例可以将结果作为一个组进行检查,并按顺序检索返回值
1 | # group,统一将任务调用,返回一个特殊的结果实例 |
2 | g = group(add.s(i,i) for i in range(10))().get() |
3 | print(g) |
chain
可以将任务链接在一起,以便在一个任务返回后又调用另一个任务
1 | # chain,可以将任务链接在一起,以便在一个任务返回后又调用另一个任务。将add.s的任务调用返回后继续作为mul的参数 |
2 | ch = chain(add.s(4,4) | mul.s(4))().get() # 等同于 (add.s(4, 4) | mul.s(8))().get() |
3 | print(ch) |
常用命令和监控
以下命令要在celery 的worker启动后的前提下
1 | # 查看worker当前正在执行的任务 |
2 | celery -A celery_app inspect active |
3 | |
4 | # 强制工作程序启用事件消息(用于监视任务和工作程序) |
5 | celery -A celery_app control enable_events |
6 | |
7 | # 启用事件后,可以启动事件转储程序以查看工作程序在做什么 |
8 | celery -A celery_app events --dump |
9 | |
10 | # 或者启动curses界面(不过发现信息并没有dump的全面) |
11 | celery -A celery_app events |
12 | |
13 | # 完成监视后,可以再次禁用事件 |
14 | celery -A celery_app control disable_events |
15 | |
16 | # 查看在线节点的状态 |
17 | celery -A celery_app status |
18 | |
19 | # 查看任务结果 |
20 | celery -A celery_manage result <task_id> |
21 | |
22 | # 从所有配置的任务队列中清除消息 |
23 | celery -A celery_manage purge # 此操作无法撤消,消息将被永久删除 |
24 | # 使用-X选项排除队列被清除 |
25 | celery -A celery_manage purge -X celery |
26 | # 使用-Q选项指定要清除的队列 |
27 | celery -A celery_manage purge -Q celery,foo,bar |
28 | |
29 | # 列出计划的 ETA 任务 |
30 | celery -A celery_manage inspect scheduled |
31 | |
32 | # 列出保留的任务 |
33 | celery -A celery_manage inspect reserved # 这将列出所有已被工作程序预取的任务,并且当前正在等待执行(不包括具有 ETA 值集的任务) |
34 | |
35 | # 列出注册的任务 |
36 | celery -A celery_manage inspect registered |
37 | |
38 | # 检查query_task:按 id 显示有关任务的信息 |
39 | celery -A celery_manage inspect query_task <task_id> |
也可以安装flower插件进行监控
redis作为broker的flower监控我记得新版本后来才加的,老的4.3.0是没有的
1
# 安装
2
pip install flower
3
# 命令启动,启动后默认访问5555端口
4
celery -A celery_manage flower
5
# 指定端口
6
celery -A celery_manage flower --port=6666
7
# 指定Broker URL(项目里配置了的话可以不指定按配置文件broker)
8
celery -A celery_manage flower --broker=redis://guest:guest@localhost:6379/0
补充
celery6.0版本设置有些变动
1
# 需要执行一个设置命令,conf.py是celery的配置文件
2
celery upgrade settings conf.py
3
# 执行完后在启动
4
celery -A celery_manage worker -l debug
celery_manage.py
1
from celery import Celery
2
3
'''
4
AMQP结果后端计划在版本4.0中弃用,在版本5.0中删除,所以我们不要用rabbitmq作为后端存储服务
5
这里使用rpc服务作为后端存储
6
启动:celery -A celery_manage worker -l debug
7
'''
8
# 注意必须指明include需要加载的task任务,否则在celery启动的时候,将不会去加载,除非命令参数里主动添加加载任务
9
app = Celery('celery_app', include = ["tasks"]) # celery_app只是名称,不是worker名称
10
app.config_from_object('conf')
11
12
if __name__ == '__main__':
13
app.start()
conf.py
1
# -*- coding: utf-8 -*-
2
from celery.schedules import crontab
3
# broker_url = 'amqp://localhost'
4
broker_url = 'redis://localhost:16379/0'
5
6
result_backend = 'redis://localhost:16379'
7
8
# 设置时区,默认UTC
9
timezone = 'Asia/Shanghai'
10
11
task_serializer = 'json'
12
result_serializer = 'json'
13
CELERY_enable_utc = True
14
15
16
result_expires = 60 * 60 * 24 # 存储结果过期时间(默认1天)
17
18
# 导入指定的任务模块
19
imports = (
20
"tasks"
21
)
22
23
# 设置定时任务
24
beat_schedule = {
25
"task1": {
26
"task": "tasks.tasks.mul",
27
"schedule": crontab(minute='*/1'),
28
"args": (2, 8)
29
}
30
}
分布式计算
一般分布式计算最终都会来一个汇总合并计算结果,所以这里我们需要利用chord
tasks.py
1
from celery_manage import app
2
import pandas as pd
3
import pickle
4
from temp_pickle import TempPickle
5
"""
6
celery的参数都是只接受可序列化的参数,所以对于object对象需要先序列化整个对象
7
"""
8
# 注意这里需要显示的声明不能忽略结果,并且celery也需要设置后端接收结果的配置
9
10
def sub_task(obj, task_name):
11
print(task_name, obj)
12
df = pd.DataFrame(data={'A':[1,2,3,4,5], 'B':[6,7,8,9,0], 'C':[12,54,67,89, 56]})
13
name=f"test_{task_name}.pkl"
14
df.to_pickle(name)
15
return name # 此处的子任务返回值会统一在汇总回调里包装成数组sub_list
16
17
18
def reduce_task(sub_list,obj_file_name):
19
obj = TempPickle.load(obj_file_name)
20
print(f"obj:{obj.flag}")
21
result = None
22
for name in sub_list:
23
if name is None:
24
return None
25
df = pd.read_pickle(name)
26
if result is None:
27
result = df
28
else:
29
result = pd.concat([result, df])
30
name = f"result.pkl"
31
result.to_pickle(name)
32
return name
33
34
35
def chord_on_error(*args, **kwargs):
36
print(f"回调失败:{kwargs}")
temp_pickle.py
1
# -*- coding: utf-8 -*-
2
import os
3
import pickle
4
import tempfile
5
# 设置的临时存储位置,项目中最好放在配置文件里,然后这里只做引用
6
tempfile.tempdir = '/Users/Pocket/pocket_study/Celerys/temp'
7
8
class TempPickle:
9
def __init__(self):
10
self._file_path = []
11
12
def remove(self):
13
for f in self._file_path:
14
if os.path.exists(f):
15
os.remove(f)
16
17
def dump(self, obj):
18
with tempfile.NamedTemporaryFile('w+b', delete=False) as f:
19
self._file_path.append(f.name)
20
pickle.dump(obj, f)
21
return f.name
22
23
24
def load(path: str):
25
with open(path, 'rb') as f:
26
return pickle.load(f)
27
28
def __enter__(self):
29
return self
30
31
def __exit__(self, exc_type, exc_val, exc_tb):
32
self.remove()
33
34
# 随便写的一个测试类
35
class TestA:
36
def __init__(self):
37
self.flag = True
celery_test.py
用法
1
from tasks import sub_task, reduce_task, chord_on_error
2
from celery import chord
3
import pandas as pd
4
from datetime import datetime
5
from temp_pickle import TempPickle, TestA
6
7
if __name__ == '__main__':
8
"""
9
celery==4.4.1 版本有问题, 在get等待结果的时候回报错celery _iter_meta() got an unexpected keyword argument 'timeout'
10
回退到celery==4.3.0或者升级最新的版本即可解决问题
11
分布式计算
12
监控需要安装:pip install flower
13
监控目前BROKER_URL只能用amqp
14
"""
15
# celery的一切参数都需要JSON serializable,json序列化
16
a_obj = TestA() # 对类进行序列化
17
with TempPickle() as f:
18
obj_file_name = f.dump(a_obj)
19
header = [sub_task.s(datetime.now().date(), i) for i in range(3)]
20
# subtask函数是为了给汇总回调增加参数传递,on_error是添加错误回调
21
callback = reduce_task.subtask(kwargs={'obj_file_name': obj_file_name}).on_error(chord_on_error.s(**{'info': obj_file_name}))
22
obj = chord(header)(callback, interval=8)
23
result = obj.get(disable_sync_subtasks=False, interval=8) # 等待子任务结束后获取汇总数据
24
print(result)
25
if result is not None:
26
df = pd.read_pickle(result)
27
print(df)