0
点赞
收藏
分享

微信扫一扫

分布式任务队列 Celery — Overview


#目录


文章目录


  • ​​前言​​
  • ​​简介​​

  • ​​Celery 的应用场景​​
  • ​​架构组成​​
  • ​​Celery 应用基础​​



前言

分布式任务队列 Celery,Python 开发者必备技能,结合之前的 RabbitMQ 系列,深入梳理一下 Celery 的知识点。当然,这也将会是一个系列的文章。

简介

Celery 是一个简单、灵活且可靠的分布式任务队列(Distributed Task Queue)。队列是一种常见的数据结构,具有 FIFO(First in First Out)的特性,阻塞队列能够有效的将应用程序解耦为生产者和消费者,从而实现异步调用。任务的主体是被 celery.task 装饰器包装过的函数,而在队列中传递的 “任务” 其实是执行这些函数所需要的全部参数。

首先需要明确的是,Celery 虽然称之为分布式任务队列,但其本身并不提供队列服务,它更像是队列的 Controller,而非队列本身。所以应用 Celery 还需要集成第三方的消息队列中间件(e.g. RabbitMQ or Redis)。

那 Celery 和 RabbitMQ 有什么区别呢?简单来说 RabbitMQ 专注于消息队列的管理,而 Celery 专注于实时的任务处理。

Celery 的应用场景


  • 即时响应需求:网页的响应时间是用户体验的关键,Amazon 曾指出响应时间每提高 100ms,他们的收入便会增加 1%。对于一些需要长时间执行的任务,大多会采用异步调用的方式来释放用户操作。Celery 的异步调用特性,和前端使用 Ajax 异步加载类似,能够有效缩短响应时间。
  • 周期性任务需求(Periodic Task):对于心跳测试、日志归档、运维巡检这类指定时间周期执行的任务,可以应用 Celery 任务调度模块 Celery Beat,支持 crontab 定时模式,简单方便。
  • 高并发及可扩展性需求:解耦应用程序最直接的好处就是可扩展性和并发性能的提高。Celery 支持以多线程、多进程、协程的方式来并发执行任务,同时支持自动动态扩展。

架构组成

分布式任务队列 Celery — Overview_应用



Broker
消息代理,作为临时储存任务的中间媒介,为 Celery 提供了队列服务。生产者将任务发送到 Broker,消费者再从 Broker 获取任务。



Beat
任务调度器,负责调度并触发 Celery 定时周期任务。Beat 进程读取 CeleryConfig 中自定义的定时周期任务列表,将到期需要执行的定时任务发送到任务队列中。



Worker
任务执行单元,实际负责执行任务的服务进程,每一个 Worker 都有一个并发池(Prefork/Eventlet/Gevent/Thread)来支持多并发。Worker 会监听订阅的任务队列,当队列中有任务时,就会获取任务并执行。



Result Backend/Store
任务执行状态和结果存储,Celery 支持任务实时处理,也就是说 Celery 可以把任务执行的实时状态和最终结果回传生产者。这种回传也需要通过中间存储媒介。



Celery 应用基础

先通过一个简单的例子来认识 Celery,这里使用 RabbitMQ 充当 Broker,使用 Redis 充当 Result Backend。

  • 安装 Celery & RabbitMQ & Redis
pip install celery
pip install redis
sudo apt-get install -yq rabbitmq-server
sudo apt-get install -yq redis-server
  • 初始化 Celery Proj 项目
jmilkfan@aju-dev:/workspace$ tree proj/
proj/
├── app_factory.py # Celery application 工厂模块
├── celeryconfig.py # Celery 常规配置文件模块
├── celery.py # Celery 启动模块
├── __init__.py
└── task # 任务包
├── api.py # 任务 API 模块
├── tasks.py # 任务实现模块
└── __init__.py
  • 配置 Celery Config
    vim proj/celeryconfig.py
# 直接使用 RabbitMQ 的缺省用户 guest 以及缺省虚拟主机 '/'
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
# Redis 同样使用默认数据库 '0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# 设定导入任务模块路径
CELERY_IMPORTS = ['proj.task.tasks']
  • 实现 App Factory
    vim proj/app_factory.py
    这里使用 ​​​__future__.absolute_import​​ 是因为 proj 包中的 celery 模块与 celery 库重名了,所以需要指明 celery 从绝对路径中导入。
from __future__ import absolute_import
from celery import Celery

def make_app():
app = Celery('proj')
# 指定常规配置模块路径
app.config_from_object('proj.celeryconfig')
return app
  • 实现 Celery Runner
    vim proj/celery.py
    该模块能够自动被 celery cmd 识别,并自动加载模块中的 app 对象,以此来启动 worker service。
from proj.app_factory import make_app

app = make_app()
  • 实现 Celery Tasks
    vim proj/task/tasks.py
from proj.celery import app

# 使用 celery.task 装饰器包装的函数就是任务的主体
@app.task
def add(x, y):
return x + y
  • 启动 Celery Worker
/workspace$ celery worker -A proj -l info

选项 ​​-A/--app​​,指定 Celery 的项目,如果参数是一个 Package,那么 celery cmd 会自动搜索 celery.py 模块。当然了,你也可以使用 app 所在的 module_name 作为参数。celery cmd 最终要加载的实际上是 app 对象。

分布式任务队列 Celery — Overview_应用_02

  • 执行任务
    执行任务的代码逻辑一般在 “生产者” 实现。
>>> from proj.task import tasks
>>> tasks.add
<@task: proj.task.tasks.add of proj at 0x7f9149e6e250>
>>> result = tasks.add.delay(1, 2)
>>> result.status
u'SUCCESS'
>>> result.failed()
False
>>> result.info
3
>>> result.result
3

分布式任务队列 Celery — Overview_分布式_03

NOTE:想要获取任务执行结果必须启用 Result Backend,否则会触发异常

>>> result.status
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 436, in state
return self._get_task_meta()['status']
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 375, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/usr/local/lib/python2.7/dist-packages/celery/backends/base.py", line 352, in get_task_meta
meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute ‘_get_task_meta_for'
  • 使用自定义 Exchange 和 Queue
    vim proj/app_factory.py
    自定义 Exchange 和 Queue 需要依赖 kombu 库,kombu 是一个 Messaging library for Python,Python 写的消息库。目标是为 AMQP 协议提供高层接口,让 Python 的消息传递尽可能变得简单,并且也提供一些常见的消息传递问题解决方案。
from __future__ import absolute_import
from celery import Celery
from kombu import Queue, Exchange


def make_app():
app = Celery('proj')
app.config_from_object('proj.celeryconfig')

# 除了 config_from_object,也支持 celery.conf 直接设置 Celery Config

# 定义 Exchange
default_exchange = Exchange('default', type='direct')
web_exchange = Exchange('task', type='direct')

# 定义默认 Exchange 和 Queue,所有没有指定队列的任务,都会放置在这里
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

# 定义 Queues
app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('high_queue', web_exchange, routing_key='hign_task'),
Queue('low_queue', web_exchange, routing_key='low_task'),
)
return app

分布式任务队列 Celery — Overview_架构_04

  • 指定任务队列
    task.apply_async 和 task.delay 一样用于触发任务,区别在于前者的功能更多。
    例如:指定任务队列
>>> from proj.task import tasks
>>> result = tasks.add.apply_async(args=(1, 2), queue='low_queue')
>>> result.status
u'SUCCESS'
>>> result.result
3

  • task.apply_async
    使用 celery.task 装饰器包装函数之后,实际上得到的是一个类对象,其含有 delay、apply_async 等属性方法,当我们调用这两个方法时,才实际触发了一个任务。当我们直接调用这个函数时,其实跟调用一个普通函数没有区别。
    apply_async 的参数列表(加粗为常用参数
  • args (Tuple): The positional arguments to pass on to the task.
  • kwargs (Dict): The keyword arguments to pass on to the task.
  • countdown (float): Number of seconds into the future that the task should execute. Defaults to immediate execution. 指定 Worker 接收到任务后,执行任务之前等待的时间(s)。
  • eta (~datetime.datetime): Absolute time and date of when the task should be executed. May not be specified if ​​countdown​​ is also supplied. 使用 datetime 指定 Worker 接收到任务后,开始执行任务的精确时间
  • expires (float, ~datetime.datetime): Datetime or seconds in the future for the task should expire. The task won’t be executed after the expiration time. 使用 datetime 指定 Worker 接收到任务后的任务过期时间,过期后则不再执行。
  • shadow (str): Override task name used in logs/monitoring. Default is retrieved from :meth:​​shadow_name​​.
  • connection (kombu.Connection): Re-use existing broker connection instead of acquiring one from the connection pool. 重新指定消息代理的 connection。
  • retry (bool): If enabled sending of the task message will be retried in the event of connection loss or failure. Default is taken from the :setting:​​task_publish_retry​​ setting. Note that you need to handle the producer/connection manually for this to work. 在消息代理 connection 失败或断开的情况下,重试任务发送。
  • retry_policy (Mapping): Override the retry policy used. See the :setting:​​task_publish_retry_policy​​ setting. 任务重试发送策略。
  • queue (str, kombu.Queue): The queue to route the task to. This must be a key present in :setting:​​task_queues​​, or :setting:​​task_create_missing_queues​​ must be enabled. See :ref:​​guide-routing​​ for more information. 指定任务储存队列。
  • exchange (str, kombu.Exchange): Named custom exchange to send the task to. Usually not used in combination with the ​​queue​​ argument. 指定任务传递交换机
  • routing_key (str): Custom routing key used to route the task to a worker server. If in combination with a ​​queue​​ argument only used to specify custom routing keys to topic exchanges. 指定队列与交换机的 Routing Key。
  • priority (int): The task priority, a number between 0 and 9. Defaults to the :attr:​​priority​​ attribute. 设定队列优先级 0~9。
  • serializer (str): Serialization method to use. Can be ​​pickle​​, ​​json​​, ​​yaml​​, ​​msgpack​​ or any custom serialization method that’s been registered with :mod:​​kombu.serialization.registry​​. Defaults to the :attr:​​serializer​​ attribute. 指定任务数据的序列化方式,默认为 json。
  • compression (str): Optional compression method to use. Can be one of ​​zlib​​, ​​bzip2​​, or any custom compression methods registered with :func:​​kombu.compression.register​​. Defaults to the :setting:​​task_compression​​ setting.
  • link (~@Signature): A single, or a list of tasks signatures to apply if the task returns successfully. 如果任务执行成功,则紧接着执行一系列任务签名。
  • link_error (~@Signature): A single, or a list of task signatures to apply if an error occurs while executing the task. 如果任务执行发生错误,则紧接着执行一系列任务签名。
  • producer (kombu.Producer): custom producer to use when publishing the task.
  • add_to_parent (bool): If set to True (default) and the task is applied while executing another task, then the result will be appended to the parent tasks ​​request.children​​ attribute. Trailing can also be disabled by default using the :attr:​​trail​​ attribute
  • publisher (kombu.Producer): Deprecated alias to ​​producer​​.
  • headers (Dict): Message headers to be included in the message.



举报

相关推荐

0 条评论