Celery是一个功能完备即插即用的任务队列。
celery的特点是:
- 简单,易于使用和维护,有丰富的文档。
- 高效,单个celery进程每分钟可以处理数百万个任务。
- 灵活,celery中几乎每个部分都可以自定义扩展。
celery非常易于集成到一些web开发框架中.
基础概念
任务队列中包含称作任务
的工作单元。
有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.
celery通过
消息
进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者):
- clients发出消息到队列中
- broker将队列中的信息派发给worker来处理
- worker处理任务
一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。
- 任务生产者 :调用Celery提供的API,函数,装饰器而产生任务并交给任务队列的都是任务生产者。
- 任务调度 Beat:Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列。
- 中间人(Broker):Celery 用消息通信,通常使用中间人(Broker)在客户端和 worker 之前传递,这个过程从客户端向队列添加消息开始,之后中间人把消息派送给 worker。
- 执行单元 worker:worker 是任务执行单元,是属于任务队列的消费者,worker持续地监控任务队列,当队列中有新地任务时,它便取出来执行。worker 可以运行在不同的机器上,只要它指向同一个中间人即可,worker还可以监控一个或多个任务队列, Celery 是分布式任务队列的重要原因就在于 worker 可以分布在多台主机中运行。修改配置文件后不需要重启 worker,它会自动生效。
- 任务结果存储backend:用来持久存储 Worker 执行任务的结果,Celery支持不同的方式存储任务的结果,包括AMQP,Redis,memcached,MongoDb,SQLAlchemy等。
简单版本:
- Task:
任务(Task)就是你要做的事情,例如一个注册流程里面有很多任务,给用户发验证邮件就是一个任务,这种耗时的任务就可以交给Celery去处理,还有一种任务是定时任务,比如每天定时统计网站的注册人数,这个也可以交给Celery周期性的处理。 - Broker:
Broker 的中文意思是经纪人,指为市场上买卖双方提供中介服务的人。在Celery中这个角色相当于数据结构中的队列,介于生产者和消费者之间经纪人。例如一个Web系统中,生产者是主程序,它生产任务,将任务发送给 Broker,消费者是 Worker,是专门用于执行任务的后台服务。Celery本身不提供队列服务,一般用Redis或者RabbitMQ来实现队列服务。 - Worker:
Worker 就是那个一直在后台执行任务的人,也成为任务的消费者,它会实时地监控队列中有没有任务,如果有就立即取出来执行。 - Beat:
Beat 是一个定时任务调度器,它会根据配置定时将任务发送给 Broker,等待 Worker 来消费。 - Backend:
Backend 用于保存任务的执行结果,每个任务都有返回值,比如发送邮件的服务会告诉我们有没有发送成功,这个结果就是存在Backend中,当然我们并不总是要关心任务的执行结果。
走个流程
1.创建应用
1 | ## tasks.py |
1 | # celeryconfig.py |
my_task函数加上装饰器app.task, 将其注册到broker的队列中。
现在我们在创建一个worker, 等待处理队列中的任务.
打开终端,执行命令:
1 | celery -A tasks worker --loglevel=info |
这里需要注意,如果是使用win10可能会发生错误.
这时候需要安装pip install eventlet
然后执行:
1 | celery -A tasks worker -l info -P eventlet |
2.调用任务
任务加入到broker队列中,以便刚才我们创建的celery workder服务器能够从队列中取出任务并执行。如何将任务函数加入到队列中,可使用delay()。
1 | import time |
- send_mail.delay()会将此任务加入到broker队列中
- worker发现broker队列中有任务,取出并执行
通过worker的控制台,可以看到我们的任务被worker处理。调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。
3.存储结果
如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。
有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。
任务结果存储配置我们通过Celery的backend参数来设定。
backend参数 : 指定redis作为结果存储
1 | # tasks.py |
执行:
1 | celery -A tasks worker -l info -P eventlet |
调用:
1 | from tasks import my_task |
state: 返回任务状态;
task_id: 返回任务id;
result: 返回任务结果,同get()方法;
get(): 获取返回值;
t.get(propagate=False)
: 只获取错误结果,不触发异常ready(): 判断任务是否以及有结果,有结果为True,否则False;
info(): 获取任务信息,默认为结果;
wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
successfu(): 判断任务是否成功,成功为True,否则为False;
traceback() : 里面存着错误信息
事实上,delay 方法封装了 apply_async,如下:
1
2
3 def delay(self, *partial_args, **partial_kwargs):
"""Shortcut to :meth:`apply_async` using star arguments."""
return self.apply_async(partial_args, partial_kwargs)apply_async 支持更多的参数,它的一般形式如下:
1 apply_async(args=(), kwargs={}, route_name=None, **options)apply_async 常用的参数如下:
countdown:指定多少秒后执行任务
task1.apply_async(args=(2, 3), countdown=5)
5 秒后执行任务eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime
1
2
3
4 from datetime import datetime, timedelta
# 当前 UTC 时间再加 10 秒后执行任务
task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))expires:任务过期时间,参数类型可以是 int,也可以是 datetime
task1.multiply.apply_async(args=[3, 7], expires=10)
10 秒后过期
Application(应用)
Celery 库在使用之前必须初始化,一个celery实例被称为一个应用(或者缩写 app),一般叫做celery应用,或者更简单直接叫做一个app。
app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。
Celery 应用是线程安全的,所以多个不同配置、不同组件、不同任务的 应用可以在一个进程空间里共存。
1 | from celery import Celery |
celery 应用的文本表示:
- 包含应用类的名称(Celery)
- 当前主模块的名称(main)
- 应用对象的内存地址(0x2747861b5c0)
main name(主模块名称)
当你发送一个消息给 Celery,消息中不会包含任何源码,而只有你想要执行的任务的名称。这就好像因特网上的域名映射原理一般:每个执行单元维护着一个任务名称到实际任务函数的映射,这个映射被称为任务注册表。
当你定义一个任务,这个任务就会被添加到本地注册表
1 | # liangbo.py |
这里的
__main__
和python里的模块中的__main__
一样:
- 在当前模块就是
__main__
- 当被导入时就是
liangbo
修改celery设置
app.conf.update
1 | from celery import Celery |
config_from_object
app.config_from_object() 方法的参数可以是:
- 一个配置模块
- 任意包含配置属性的对象
1 | # celery.py |
1 | from celery import Celery |
config_from_envvar
app.config_from_envvar() 从环境变量中获取配置模块名称。
1 | # 从环境变量 CELERY_CONFIG_MODULE 所声明的模块加载配置: |
设置的优先级:
- 运行时的配置修改
- 配置模块(如果声明)
- 默认配置(celery.app.defaults)
敏感配置
Celery 提供了一些有用的工具函数来展示这些配置信息,其中一个是 humanize() 函数:app.conf.humanize(with_defaults=False, censored=True)
如果一个配置项键名包含以下字符串,它将被看作是敏感的:API,TOKEN,KEY,SECRET,PASS,SIGNATURE,DATABASE
延迟加载
创建一个 celery 实例是延迟加载的,意味着它只有在实际调用的时候才会被求值。
创建一个celery 实例只会做如下事情:
- 创建一个用于事件的逻辑时钟实例
- 创建一个任务注册表
- 将自己设置为当前应用实例(如果 set_as_current 参数被禁用将不会做此设置)
- 调用 app.on_init() 回调函数(默认不做任何事情)
应用的终止有两种情况:
- 显式调用 app.finalize() 终止,
- 通过访问 app.tasks 属性隐式终止。