掌握Django(十五)–运行后台任务

本文介绍Django如何运行后台任务。


Celery简介

在大多数应用场景里,都有资源密集处理的任务,这些任务处理要花较长时间,比如处理图片与视频、生成报表、发送邮件、运行机器学习模型等。如果我们将这些处理过程也放在我们的应用程序里,由于处理繁忙,很可能使我们不能及时地响应客户的请求。所以我们应该尽量保持应用程序空闲,而一些工作繁忙的任务,最好拿出来由另外的程序来处理,换句话说,就是繁忙的处理任务放在后台工作。

举个实际的场景,比如上传图片和视频,我们不能让用户一直等着直到我们处理完毕,我们应该发个消息给用户告诉我们正在处理,然后将这个任务转到后台处理。等任务处理完毕后,给用户发个消息告知任务已处理完毕。这样才会有好的体验。

如何才能做到这样呢?

Celery! 不是蔬菜哦。

http://celeryproject.org

Celery 在后台启用几个工作进程,同时提供一个队列,工作进程监视队列,当队列里有任务时,后台工作进程就拿过来运行。当后台工作进程超负荷时,也可以很方便地再增加工作进程,利用Celery我们就可以并行地运行任务。最重要的是,后台的工作进程处理不影响前台的应用程序,后台延迟或崩溃也不会影响前台应用程序,它还可以继续为用户服务。

另外,Celery还可以定期安排工作,比如每天早上8点执行或每周一执行等。总之,Celery非常棒!


消息中介

上一节我们说应用通过队列传递给Celery,队列是消息中介(Message Broker)的一部分。而消息中介则保证消息可靠地传递。

目前有多种消息中介,最流行的是Redis和RabbitMQ。从技术上讲,Redis不是真正的消息中介者,它是一种内存数据库作缓存用,但同时也可以充当消息中介。RabbitMQ是真正的企业级消息中介者,它提供了很多Redis没有的能力,但它需要付费也更复杂。

这一节我们使用Redis,后续我们会讲到Cache缓存,也会用到Redis,这样Redis兼作消息中介和缓存。即便后续确实有理由要用到RabbitMQ的时候,也是比较方便切换的。


安装Redis

安装Redis最方便的就是使用Docker:

docker run -d -p 6379:6379 redis

这里的 -d 表示分离,这里就可以在后台运行;-p 表示端口映射,6379是redis的标准端口。

完成后验证一下: docker ps

确认redis 确定在运行。

然后,我们还要安装 redis的python库:

pipenv install redis


Celery 和 windows

比较遗憾的是,从Celere 4 开始,Celery不支持Windows了。

解决方法其实也很简单,可以使用Windows自带的WSL搭建Linux环境。WSL就不细讲了,有兴趣的可以翻看下我之前的公众号文章,有专门介绍使用WSL的。


设置Celery


首先安装celery:

pipenv install celery

然后开始设置。先创建 storefront — celery.py文件:

import os

from celery import Celery

os.environ.setdefault(‘DJANGO_SETTINGS_MODULE’, ‘storefront.settings’)

# 设置环境变量,主要是Celery程序用,指明了Django的设置模块

celery = Celery(‘storefront’)

# 创建一个celery对象,构造函数的参数字符串‘storefront’表示项目名称

celery.config_from_object(‘django.conf:settings’, namespace=’CELERY’)

# 指定celery去哪里找设置,namespace表示设置里的前缀,这里设了CELERY表示以CELERY开头的都是celery的配置

celery.autodiscover_tasks()

# 从所有注册的应用中加载task模块

接着,我们去 storefront — settings.py里设置消息中介。我们这里用了redis,注意后面的1 表示是数据库,redis用1、2、3这样的标识表示数据库,这里用1。

CELERY_BROKER_URL = ‘redis://localhost:6379/1’

然后,我们需要在 storefront — __init__.py中添加导入 celery模块,以使celery代码执行。

from .celery import celery

最后,我们打开一个shell窗口,启动celery工作进程。-A 后跟着项目名称,worker是表示类型,loglevel表示日志级别用于调试排错用。

celery -A storefront worker –loglevel=info

检查启动成功 ,有ready字样提示。并能看启动多个worker,我这里启了4个对应我的机器有4个CPU,以及broker的redis等信息,例如:

– ** ———- .> app:         storefront:0x7ff6634c96d0

– ** ———- .> transport:   redis://localhost:6379/1

– ** ———- .> results:     disabled://

– *** — * — .> concurrency: 4 (prefork)

我们完成了celery的设置。下一节将创建任务来执行。顺便提一句,目前要打开多个shell,一个是运行redis的docker,一个是celery的启动。以及python打开的python manage.py runserver 窗口,实际上我们可以用docker-compose来简化,设置好后只需 docker-compose up即可。当然这是题外话,目前就保持这样。


创建和执行任务


我们创建 playground — task.py 文件,celery会自动搜索各个应用下的task模块。我们定义了一个函数 notify_customers,模拟给一万个顾客发送邮件,显然这是一个长时间的任务,我们用sleep函数模拟了10秒钟。另外还要导入storefront.celery的celery,并用装饰符@celery.task 标识notify_customers函数。

from time import sleep

from storefront.celery import celery

@celery.task

def notify_customers(message):

    print(‘Sending 10k emails…’)

    print(message)

    sleep(10)

    print(‘Emails were successfully sent!’)

这是网上celery常用的使用方法。实际上是有问题的,如果我们想重用playground应用怎么办,因为依赖了 storefront,难道每次重用都要导入storefront吗?我们稍做修改使用shared_task.

from celery import shared_task

@shared_task

def notify_customers(message):

    …

接着,我们来运行这个任务,转到playground — views.py ,导入notify_customers, 并调用它的delay方法。如下:

from .task import notify_customers

def say_hello(request):

    notify_customers.delay(“Hello”)

    …

完成后,访问 /playground/hello,发现页面马上就有回应了,即便我们要花10秒的时间执行任务。实际上这10秒的长时间任务在后台运行了。

但我们查看后台celery,可能有错误提示notify_customers未注册,主要是celery在notify_customers前已在运行了,解决问题很简单,我们按ctrl+c停止,再重新启动celery即可。

celery -A storefront worker –loglevel=info

然后再访问hell页面,就能看到celery在后台执行发送邮件的相关提示信息了。

我们再来看一个场景,假如celery挂掉了,情况会怎么样?

我们用ctrl+c 终止掉celery,然后访问页面 /playground/hello ,发现页面没有任何错误,很迅速地响应了用户的请求。然后我们再启动celery,发现后台已自动执行notify_customers了。这是怎么做到的呢?

原因是:当我们访问页面API时,程序将消息发给了消息中介,这里是redis,redis发现 celery挂掉了,它就暂时不传递给celery,当celery恢复后,redis就继续把消息传给celery执行。可见消息中介(broker)非常重要,它能保证消息传递的可靠性。


调度定时任务


我们有时需要定时执行一些任务,比如定期生成报表,定时发送邮件等。使用celery beat可以做到这点。celery后台执行工作的worker还是一样,beat相当于一个管理器或协调器。

我们来看看如何做?

我们在 settings.py里添加设置:

CELERY_BEAT_SCHEDULE = {

    ‘notify_customers’: {

        ‘task’: ‘playground.task.notify_customers’,

        ‘schedule’: 5,

        # 5表示5秒,如果5分钟的话,就用5*60

        # 也可以用crontab,需要先导入

        # from celery.schedules import crontab

        # 比如我们要定时在每周一7时30分执行,就用下面这句

        # ‘scehdule’:crontab(day_of_week=1,hour=7,minute=30)

        # 或者也可以用crontab表示间隔,比如下面表示每15分钟执行

        # scehdule’: crontab(minute=’*/15′)

        ‘args’: [‘Hello World’],

        # 任务的参数

        ‘kwargs’: {}

        # 如果有键值对参数,可以传递字典参数,这里可以删除

    },

}

接着我们启动celery beat,我们打开另外一个终端窗口,运行:

celery -A storefront beat

之前我们在项目名称 storefront后跟的是 worker, 这里跟的是beat表示启用beat。这样每隔5秒钟就会执行 notify_customers函数。


监视celery任务


监视celery的工具是 flower,注意别读成“花”,而是flow-er。

安装:pipenv install flower

启动:celery -A storefront flower 

启动flower 后,可以访问5555端口监视相关后台任务。

http://127.0.0.1:5555


小结


本文介绍了Django运行后台任务的相关知识。下一篇计划介绍自动化测试相关知识。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注