Celery介绍和使用
思考:
- 消费者取到消息之后,要消费掉(执行任务),需要我们去实现。
- 任务可能出现高并发的情况,需要补充多任务的方式执行。
- 耗时任务很多种,每种耗时任务编写的生产者和消费者代码有重复。
- 取到的消息什么时候执行,以什么样的方式执行。
结论:
- 实际开发中,我们可以借助成熟的工具
Celery来完成。- 有了
Celery,我们在使用生产者消费者模式时,只需要关注任务本身,极大的简化了程序员的开发流程。
1. Celery介绍
Celery介绍:
- 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。
- Celery是一个功能完备即插即用的任务队列
- 单个 Celery 进程每分钟可处理数以百万计的任务。
- 通过消息进行通信,使用
消息队列(broker)在客户端和消费者之间进行协调。
安装Celery:
$ pip install -U Celery
2. 创建Celery实例并加载配置
1.定义Celery包

2.创建Celery实例

celery_tasks.main.py
# celery启动文件
from celery import Celery
# 为celery使用django配置文件进行设置
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "meiduo_mall.settings")
# 创建celery实例
celery_app = Celery('celery_tasks')
3.加载Celery配置

celery_tasks.config.py
broker_url = "redis://127.0.0.1/15"
###########################说明#############################
# 如果使用别的作为中间人, 例如使用 rabbitmq
# 则 rabbitmq 配置如下:
broker_url= 'amqp://用户名:密码@ip地址:5672'
# 例如:
# meihao: 在rabbitq中创建的用户名, 注意: 远端链接时不能使用guest账户.
# 123456: 在rabbitq中用户名对应的密码
# ip部分: 指的是当前rabbitq所在的电脑ip
# 5672: 是规定的端口号
broker_url = 'amqp://meihao:123456@172.16.238.128:5672'
celery_tasks.main.py
# celery启动文件
from celery import Celery
# 创建celery实例
celery_app = Celery('meiduo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
3. 定义发送短信任务

1.注册任务:celery_tasks.main.py
# celery启动文件
from celery import Celery
# 创建celery实例
celery_app = Celery('celery_tasks')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# 自动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms'])
2.定义任务:celery_tasks.sms.tasks.py
from celery_tasks.main import celery_app
from libs.yuntongxun.sms import CCP
import logging
logger = logging.getLogger('django')
# name:异步任务别名
@celery_app.task(name='send_sms_code')
def send_sms_code(mobile, sms_code):
"""
发送短信异步任务
:param mobile: 手机号
:param sms_code: 短信验证码
"""
try:
send_ret = CCP().send_template_sms(mobile, [sms_code, 5], 1)
except Exception as e:
logger.error(e)
4. 启动Celery服务
$ cd ~/meiduo_project/meiduo_mall
$ celery -A celery_tasks.main worker -l info
-A指对应的应用程序, 其参数是项目中 Celery实例的位置。worker指这里要启动的worker。-l指日志等级,比如info等级。

5. 调用发送短信任务
# 发送短信验证码
# CCP().send_template_sms(mobile,[sms_code, 5], 1)
# Celery异步发送短信验证码
send_sms_code.delay(mobile, sms_code)

6. 补充celery worker的工作模式
- 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
- 如何自己指定进程数:
celery worker -A proj --concurrency=4 - 如何改变进程池方式为协程方式:
celery worker -A proj --concurrency=1000 -P eventlet -c 1000
# 安装eventlet模块
$ pip install eventlet
# 启用 Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000
