本文介绍Django如何运行后台任务。
Celery简介
在大多数应用场景里,都有资源密集处理的任务,这些任务处理要花较长时间,比如处理图片与视频、生成报表、发送邮件、运行机器学习模型等。如果我们将这些处理过程也放在我们的应用程序里,由于处理繁忙,很可能使我们不能及时地响应客户的请求。所以我们应该尽量保持应用程序空闲,而一些工作繁忙的任务,最好拿出来由另外的程序来处理,换句话说,就是繁忙的处理任务放在后台工作。
举个实际的场景,比如上传图片和视频,我们不能让用户一直等着直到我们处理完毕,我们应该发个消息给用户告诉我们正在处理,然后将这个任务转到后台处理。等任务处理完毕后,给用户发个消息告知任务已处理完毕。这样才会有好的体验。
如何才能做到这样呢?
Celery! 不是蔬菜哦。
http://celeryproject.org
Celery 在后台启用几个工作进程,同时提供一个队列,工作进程监视队列,当队列里有任务时,后台工作进程就拿过来运行。当后台工作进程超负荷时,也可以很方便地再增加工作进程,利用Celery我们就可以并行地运行任务。最重要的是,后台的工作进程处理不影响前台的应用程序,后台延迟或崩溃也不会影响前台应用程序,它还可以继续为用户服务。
另外,Celery还可以定期安排工作,比如每天早上8点执行或每周一执行等。总之,Celery非常棒!
消息中介
上一节我们说应用通过队列传递给Celery,队列是消息中介(Message Broker)的一部分。而消息中介则保证消息可靠地传递。
目前有多种消息中介,最流行的是Redis和RabbitMQ。从技术上讲,Redis不是真正的消息中介者,它是一种内存数据库作缓存用,但同时也可以充当消息中介。RabbitMQ是真正的企业级消息中介者,它提供了很多Redis没有的能力,但它需要付费也更复杂。
这一节我们使用Redis,后续我们会讲到Cache缓存,也会用到Redis,这样Redis兼作消息中介和缓存。即便后续确实有理由要用到RabbitMQ的时候,也是比较方便切换的。
安装Redis
安装Redis最方便的就是使用Docker:
docker run -d -p 6379:6379 redis
这里的 -d 表示分离,这里就可以在后台运行;-p 表示端口映射,6379是redis的标准端口。
完成后验证一下: docker ps
确认redis 确定在运行。
然后,我们还要安装 redis的python库:
pipenv install redis
Celery 和 windows
比较遗憾的是,从Celere 4 开始,Celery不支持Windows了。
解决方法其实也很简单,可以使用Windows自带的WSL搭建Linux环境。WSL就不细讲了,有兴趣的可以翻看下我之前的公众号文章,有专门介绍使用WSL的。
设置Celery
首先安装celery:
pipenv install celery
然后开始设置。先创建 storefront — celery.py文件:
import os
from celery import Celery
os.environ.setdefault(‘DJANGO_SETTINGS_MODULE’, ‘storefront.settings’)
# 设置环境变量,主要是Celery程序用,指明了Django的设置模块
celery = Celery(‘storefront’)
# 创建一个celery对象,构造函数的参数字符串‘storefront’表示项目名称
celery.config_from_object(‘django.conf:settings’, namespace=’CELERY’)
# 指定celery去哪里找设置,namespace表示设置里的前缀,这里设了CELERY表示以CELERY开头的都是celery的配置
celery.autodiscover_tasks()
# 从所有注册的应用中加载task模块
接着,我们去 storefront — settings.py里设置消息中介。我们这里用了redis,注意后面的1 表示是数据库,redis用1、2、3这样的标识表示数据库,这里用1。
…
CELERY_BROKER_URL = ‘redis://localhost:6379/1’
…
然后,我们需要在 storefront — __init__.py中添加导入 celery模块,以使celery代码执行。
from .celery import celery
最后,我们打开一个shell窗口,启动celery工作进程。-A 后跟着项目名称,worker是表示类型,loglevel表示日志级别用于调试排错用。
celery -A storefront worker –loglevel=info
检查启动成功 ,有ready字样提示。并能看启动多个worker,我这里启了4个对应我的机器有4个CPU,以及broker的redis等信息,例如:
– ** ———- .> app: storefront:0x7ff6634c96d0
– ** ———- .> transport: redis://localhost:6379/1
– ** ———- .> results: disabled://
– *** — * — .> concurrency: 4 (prefork)
我们完成了celery的设置。下一节将创建任务来执行。顺便提一句,目前要打开多个shell,一个是运行redis的docker,一个是celery的启动。以及python打开的python manage.py runserver 窗口,实际上我们可以用docker-compose来简化,设置好后只需 docker-compose up即可。当然这是题外话,目前就保持这样。
创建和执行任务
我们创建 playground — task.py 文件,celery会自动搜索各个应用下的task模块。我们定义了一个函数 notify_customers,模拟给一万个顾客发送邮件,显然这是一个长时间的任务,我们用sleep函数模拟了10秒钟。另外还要导入storefront.celery的celery,并用装饰符@celery.task 标识notify_customers函数。
from time import sleep
from storefront.celery import celery
@celery.task
def notify_customers(message):
print(‘Sending 10k emails…’)
print(message)
sleep(10)
print(‘Emails were successfully sent!’)
这是网上celery常用的使用方法。实际上是有问题的,如果我们想重用playground应用怎么办,因为依赖了 storefront,难道每次重用都要导入storefront吗?我们稍做修改使用shared_task.
…
from celery import shared_task
@shared_task
def notify_customers(message):
…
接着,我们来运行这个任务,转到playground — views.py ,导入notify_customers, 并调用它的delay方法。如下:
…
from .task import notify_customers
def say_hello(request):
notify_customers.delay(“Hello”)
…
完成后,访问 /playground/hello,发现页面马上就有回应了,即便我们要花10秒的时间执行任务。实际上这10秒的长时间任务在后台运行了。
但我们查看后台celery,可能有错误提示notify_customers未注册,主要是celery在notify_customers前已在运行了,解决问题很简单,我们按ctrl+c停止,再重新启动celery即可。
celery -A storefront worker –loglevel=info
然后再访问hell页面,就能看到celery在后台执行发送邮件的相关提示信息了。
我们再来看一个场景,假如celery挂掉了,情况会怎么样?
我们用ctrl+c 终止掉celery,然后访问页面 /playground/hello ,发现页面没有任何错误,很迅速地响应了用户的请求。然后我们再启动celery,发现后台已自动执行notify_customers了。这是怎么做到的呢?
原因是:当我们访问页面API时,程序将消息发给了消息中介,这里是redis,redis发现 celery挂掉了,它就暂时不传递给celery,当celery恢复后,redis就继续把消息传给celery执行。可见消息中介(broker)非常重要,它能保证消息传递的可靠性。
调度定时任务
我们有时需要定时执行一些任务,比如定期生成报表,定时发送邮件等。使用celery beat可以做到这点。celery后台执行工作的worker还是一样,beat相当于一个管理器或协调器。
我们来看看如何做?
我们在 settings.py里添加设置:
CELERY_BEAT_SCHEDULE = {
‘notify_customers’: {
‘task’: ‘playground.task.notify_customers’,
‘schedule’: 5,
# 5表示5秒,如果5分钟的话,就用5*60
# 也可以用crontab,需要先导入
# from celery.schedules import crontab
# 比如我们要定时在每周一7时30分执行,就用下面这句
# ‘scehdule’:crontab(day_of_week=1,hour=7,minute=30)
# 或者也可以用crontab表示间隔,比如下面表示每15分钟执行
# scehdule’: crontab(minute=’*/15′)
‘args’: [‘Hello World’],
# 任务的参数
‘kwargs’: {}
# 如果有键值对参数,可以传递字典参数,这里可以删除
},
}
接着我们启动celery beat,我们打开另外一个终端窗口,运行:
celery -A storefront beat
之前我们在项目名称 storefront后跟的是 worker, 这里跟的是beat表示启用beat。这样每隔5秒钟就会执行 notify_customers函数。
监视celery任务
监视celery的工具是 flower,注意别读成“花”,而是flow-er。
安装:pipenv install flower
启动:celery -A storefront flower
启动flower 后,可以访问5555端口监视相关后台任务。
http://127.0.0.1:5555
小结
本文介绍了Django运行后台任务的相关知识。下一篇计划介绍自动化测试相关知识。