Celery介绍和使用

思考:

  • 消费者取到消息之后,要消费掉(执行任务),需要我们去实现。
  • 任务可能出现高并发的情况,需要补充多任务的方式执行。
  • 耗时任务很多种,每种耗时任务编写的生产者和消费者代码有重复。
  • 取到的消息什么时候执行,以什么样的方式执行。

结论:

  • 实际开发中,我们可以借助成熟的工具Celery来完成。
  • 有了Celery,我们在使用生产者消费者模式时,只需要关注任务本身,极大的简化了程序员的开发流程。

1. Celery介绍

  • Celery介绍:

    • 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。
    • Celery是一个功能完备即插即用的任务队列
    • 单个 Celery 进程每分钟可处理数以百万计的任务。
    • 通过消息进行通信,使用消息队列(broker)客户端消费者之间进行协调。
  • 安装Celery:

    $ pip install -U Celery
    
  • Celery官方文档

2. 创建Celery实例并加载配置

1.定义Celery包

2.创建Celery实例

celery_tasks.main.py

# celery启动文件
from celery import Celery

# 为celery使用django配置文件进行设置
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "meiduo_mall.settings")

# 创建celery实例
celery_app = Celery('celery_tasks')

3.加载Celery配置

celery_tasks.config.py

broker_url = "redis://127.0.0.1/15"

###########################说明#############################

# 如果使用别的作为中间人, 例如使用 rabbitmq
# 则 rabbitmq 配置如下:
broker_url= 'amqp://用户名:密码@ip地址:5672'

# 例如: 
# meihao: 在rabbitq中创建的用户名, 注意: 远端链接时不能使用guest账户.
# 123456: 在rabbitq中用户名对应的密码
# ip部分: 指的是当前rabbitq所在的电脑ip
# 5672: 是规定的端口号
broker_url = 'amqp://meihao:123456@172.16.238.128:5672'

celery_tasks.main.py

# celery启动文件
from celery import Celery


# 创建celery实例
celery_app = Celery('meiduo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')

3. 定义发送短信任务

1.注册任务:celery_tasks.main.py

# celery启动文件
from celery import Celery


# 创建celery实例
celery_app = Celery('celery_tasks')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# 自动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms'])

2.定义任务:celery_tasks.sms.tasks.py

tasks

from celery_tasks.main import celery_app
from libs.yuntongxun.sms import CCP
import logging
logger = logging.getLogger('django')

# name:异步任务别名
@celery_app.task(name='send_sms_code')
def send_sms_code(mobile, sms_code):
    """
    发送短信异步任务
    :param mobile: 手机号
    :param sms_code: 短信验证码
    """
    try:
        send_ret = CCP().send_template_sms(mobile, [sms_code, 5], 1)
    except Exception as e:
        logger.error(e)

4. 启动Celery服务

$ cd ~/meiduo_project/meiduo_mall
$ celery -A celery_tasks.main worker -l info
  • -A指对应的应用程序, 其参数是项目中 Celery实例的位置。
  • worker指这里要启动的worker。
  • -l指日志等级,比如info等级。

5. 调用发送短信任务

# 发送短信验证码
# CCP().send_template_sms(mobile,[sms_code, 5], 1)
# Celery异步发送短信验证码

send_sms_code.delay(mobile, sms_code)

6. 补充celery worker的工作模式

  • 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
  • 如何自己指定进程数: celery worker -A proj --concurrency=4
  • 如何改变进程池方式为协程方式: celery worker -A proj --concurrency=1000 -P eventlet -c 1000
# 安装eventlet模块
$ pip install eventlet

# 启用 Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000