Celery是一个功能完备即插即用的任务队列。

celery的特点是:

  • 简单,易于使用和维护,有丰富的文档。
  • 高效,单个celery进程每分钟可以处理数百万个任务。
  • 灵活,celery中几乎每个部分都可以自定义扩展。

celery非常易于集成到一些web开发框架中.

基础概念

任务队列中包含称作任务的工作单元。
有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.

celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者):

  • clients发出消息到队列中
  • broker将队列中的信息派发给worker来处理
  • worker处理任务

img

一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。

img

  • 任务生产者 :调用Celery提供的API,函数,装饰器而产生任务并交给任务队列的都是任务生产者。
  • 任务调度 Beat:Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
  • 中间人(Broker):Celery 用消息通信,通常使用中间人(Broker)在客户端和 worker 之前传递,这个过程从客户端向队列添加消息开始,之后中间人把消息派送给 worker。
  • 执行单元 worker:worker 是任务执行单元,是属于任务队列的消费者,worker持续地监控任务队列,当队列中有新地任务时,它便取出来执行。worker 可以运行在不同的机器上,只要它指向同一个中间人即可,worker还可以监控一个或多个任务队列, Celery 是分布式任务队列的重要原因就在于 worker 可以分布在多台主机中运行。修改配置文件后不需要重启 worker,它会自动生效。
  • 任务结果存储backend:用来持久存储 Worker 执行任务的结果,Celery支持不同的方式存储任务的结果,包括AMQP,Redis,memcached,MongoDb,SQLAlchemy等。

简单版本:

  • Task:
    任务(Task)就是你要做的事情,例如一个注册流程里面有很多任务,给用户发验证邮件就是一个任务,这种耗时的任务就可以交给Celery去处理,还有一种任务是定时任务,比如每天定时统计网站的注册人数,这个也可以交给Celery周期性的处理。
  • Broker:
    Broker 的中文意思是经纪人,指为市场上买卖双方提供中介服务的人。在Celery中这个角色相当于数据结构中的队列,介于生产者和消费者之间经纪人。例如一个Web系统中,生产者是主程序,它生产任务,将任务发送给 Broker,消费者是 Worker,是专门用于执行任务的后台服务。Celery本身不提供队列服务,一般用Redis或者RabbitMQ来实现队列服务。
  • Worker:
    Worker 就是那个一直在后台执行任务的人,也成为任务的消费者,它会实时地监控队列中有没有任务,如果有就立即取出来执行。
  • Beat:
    Beat 是一个定时任务调度器,它会根据配置定时将任务发送给 Broker,等待 Worker 来消费。
  • Backend:
    Backend 用于保存任务的执行结果,每个任务都有返回值,比如发送邮件的服务会告诉我们有没有发送成功,这个结果就是存在Backend中,当然我们并不总是要关心任务的执行结果。

1563254594586

走个流程

1.创建应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
## tasks.py
import time
from celery import Celery
import celeryconfig

# 我们这里案例使用redis作为broker,demo为app的名字
app = Celery('demo')

# 从单独的配置模块中加载配置
app.config_from_object('celeryconfig')

# 创建任务函数
@app.task
def send_mail(email):
print("send mail to ", email)
time.sleep(5)
return "success"
1
2
3
# celeryconfig.py
result_backend = 'redis://127.0.0.1:6379/2'
broker_url = 'redis://127.0.0.1:6379/1'

my_task函数加上装饰器app.task, 将其注册到broker的队列中

现在我们在创建一个worker, 等待处理队列中的任务.
打开终端,执行命令:

1
celery -A tasks worker --loglevel=info

201907161353191

这里需要注意,如果是使用win10可能会发生错误.
这时候需要安装pip install eventlet

然后执行:

1
celery -A tasks worker -l info -P eventlet

2.调用任务

任务加入到broker队列中,以便刚才我们创建的celery workder服务器能够从队列中取出任务并执行。如何将任务函数加入到队列中,可使用delay()。

1
2
3
4
5
6
7
8
9
10
11
12
13
import time
from tasks import send_mail

def register():
start = time.time()
print("1. 插入记录到数据库")
print("2. celery 帮我发邮件")
send_mail.delay("xx@gmail.com")
print("3. 告诉用户注册成功")
print("耗时:%s 秒 " % (time.time() - start))

if __name__ == '__main__':
register()

1563257165377

201907161405471

  • send_mail.delay()会将此任务加入到broker队列中
  • worker发现broker队列中有任务,取出并执行

通过worker的控制台,可以看到我们的任务被worker处理。调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

3.存储结果

如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。
有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

任务结果存储配置我们通过Celery的backend参数来设定。
backend参数 : 指定redis作为结果存储

1
2
3
4
5
6
7
8
9
10
11
12
13
# tasks.py
from celery import Celery

# 我们这里案例使用redis作为broker
app = Celery('demo',
backend='redis://127.0.0.1:6379/2',
broker='redis://127.0.0.1:6379/1')

# 创建任务函数
@app.task
def my_task(a, b):
print("任务函数正在执行....")
return a + b

执行:

1
celery -A tasks worker -l info -P eventlet

调用:

1
2
3
4
5
from tasks import my_task

res = my_task.delay(10,20)
print(res.failed()) # False
print(res.result) # 30
  • state: 返回任务状态;

  • task_id: 返回任务id;

  • result: 返回任务结果,同get()方法;

  • get(): 获取返回值;

    t.get(propagate=False) : 只获取错误结果,不触发异常

  • ready(): 判断任务是否以及有结果,有结果为True,否则False;

  • info(): 获取任务信息,默认为结果;

  • wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;

  • successfu(): 判断任务是否成功,成功为True,否则为False;

  • traceback() : 里面存着错误信息

事实上,delay 方法封装了 apply_async,如下:

1
2
3
def delay(self, *partial_args, **partial_kwargs):
"""Shortcut to :meth:`apply_async` using star arguments."""
return self.apply_async(partial_args, partial_kwargs)

apply_async 支持更多的参数,它的一般形式如下:

1
apply_async(args=(), kwargs={}, route_name=None, **options)

apply_async 常用的参数如下:

  • countdown:指定多少秒后执行任务
    task1.apply_async(args=(2, 3), countdown=5)5 秒后执行任务

  • eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime

    1
    2
    3
    4
    from datetime import datetime, timedelta

    # 当前 UTC 时间再加 10 秒后执行任务
    task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))
  • expires:任务过期时间,参数类型可以是 int,也可以是 datetime
    task1.multiply.apply_async(args=[3, 7], expires=10)10 秒后过期

Application(应用)

Celery 库在使用之前必须初始化,一个celery实例被称为一个应用(或者缩写 app),一般叫做celery应用,或者更简单直接叫做一个app。

app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。

Celery 应用是线程安全的,所以多个不同配置、不同组件、不同任务的 应用可以在一个进程空间里共存。

1
2
3
4
from celery import Celery
app = Celery()
print(app)
# <Celery __main__ at 0x2747861b5c0>

celery 应用的文本表示:

  • 包含应用类的名称(Celery)
  • 当前主模块的名称(main)
  • 应用对象的内存地址(0x2747861b5c0)

main name(主模块名称)

当你发送一个消息给 Celery,消息中不会包含任何源码,而只有你想要执行的任务的名称。这就好像因特网上的域名映射原理一般:每个执行单元维护着一个任务名称到实际任务函数的映射,这个映射被称为任务注册表。

当你定义一个任务,这个任务就会被添加到本地注册表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# liangbo.py
from celery import Celery


app = Celery()
print(app)
# <Celery __main__ at 0x2747861b5c0>

@app.task
def add(x, y):
return x + y

print(add)
# <@task: __main__.add of __main__ at 0x1609222b668>

print(add.name)
# __main__.add

print(app.tasks['__main__.add'])
# <@task: __main__.add of __main__ at 0x1609222b668>

这里的__main__和python里的模块中的__main__一样:

  • 在当前模块就是__main__
  • 当被导入时就是liangbo

修改celery设置

app.conf.update

1
2
3
4
5
6
7
8
9
10
from celery import Celery

app = Celery()
print(app)
# <Celery __main__ at 0x2747861b5c0>

app.conf.update(
enable_utc=True,
timezone='Europe/London',
)

config_from_object

app.config_from_object() 方法的参数可以是:

  • 一个配置模块
  • 任意包含配置属性的对象
1
2
3
4
5
6
7
8
9
# celery.py
from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

# celeryconfig.py
enable_utc = True
timezone = 'Europe/London'
1
2
3
4
5
6
7
8
9
10
11
from celery import Celery

app = Celery()

class Config:
enable_utc = True
timezone = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
# app.config_from_object('module:Config')

config_from_envvar

app.config_from_envvar() 从环境变量中获取配置模块名称。

1
2
3
4
5
6
7
8
9
# 从环境变量 CELERY_CONFIG_MODULE 所声明的模块加载配置:
import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

设置的优先级:

  1. 运行时的配置修改
  2. 配置模块(如果声明)
  3. 默认配置(celery.app.defaults)

敏感配置

Celery 提供了一些有用的工具函数来展示这些配置信息,其中一个是 humanize() 函数:app.conf.humanize(with_defaults=False, censored=True)

如果一个配置项键名包含以下字符串,它将被看作是敏感的:
API,TOKEN,KEY,SECRET,PASS,SIGNATURE,DATABASE

延迟加载

创建一个 celery 实例是延迟加载的,意味着它只有在实际调用的时候才会被求值。

创建一个celery 实例只会做如下事情:

  1. 创建一个用于事件的逻辑时钟实例
  2. 创建一个任务注册表
  3. 将自己设置为当前应用实例(如果 set_as_current 参数被禁用将不会做此设置)
  4. 调用 app.on_init() 回调函数(默认不做任何事情)

应用的终止有两种情况:

  • 显式调用 app.finalize() 终止,
  • 通过访问 app.tasks 属性隐式终止。