Building a Django front/back-end separated project from scratch, Series 3 (async task execution in practice)

Published: 2019-11-06

With the project environment set up in the previous parts, we now move on to practice. This part walks through a few of the important functional modules in the project.

Executing and managing asynchronous tasks with Celery

Celery is a distributed task queue focused on real-time processing and task scheduling. Because the data analysis in this project takes a long time, tasks are executed asynchronously. The project uses Redis as the broker and Django's database as the result backend. Part of the configuration in settings.py is shown below (see the project code for the full configuration):

import djcelery
djcelery.setup_loader()

BROKER_URL = "redis://127.0.0.1:6379/5"
BROKER_POOL_LIMIT = 0
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"  # periodic task scheduler
CELERY_RESULT_BACKEND = "djcelery.backends.database:DatabaseBackend"
# CELERY_RESULT_BACKEND = "redis://10.39.211.198:6379/6"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 10  # recycle each worker after 10 tasks, to guard against memory leaks
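For orientation, here is a minimal sketch of what a task module looks like under this configuration. The app name, module path, and the analyze task are illustrative placeholders, not the project's actual code:

# myapp/tasks.py -- illustrative sketch; djcelery.setup_loader() auto-discovers
# a tasks.py module in each installed app
from celery import shared_task

@shared_task
def analyze(dataset_id):
    # the project's long-running data analysis would go here
    return dataset_id

# Caller side: enqueue asynchronously instead of blocking the request,
# and keep the task id so the front end can track progress later.
# result = analyze.delay(42)
# result.task_id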

The project hooks into the Celery events and signals for task start, success, failure, completion, and revocation, as follows:

from datetime import datetime
import logging

from celery import Task
from celery.signals import task_prerun, task_revoked

from .models import Job          # project model tracking task runs (import path assumed)
from .utils import RedisHelper   # project helper publishing to a Redis channel (import path assumed)

logger = logging.getLogger(__name__)


@task_prerun.connect
def pre_task_run(task_id, task, sender, *args, **kwargs):
    logger.info("task [{task_id}] started, taskname: {task.name}".format(task_id=task_id, task=task))


@task_revoked.connect
def on_task_revoked(request, terminated, sender, expired, signal, signum):
    # named on_task_revoked so it does not shadow the task_revoked signal imported above
    now = datetime.now()
    task_id = request.id
    logger.warning("task [{0}] was revoked.".format(task_id))
    job = Job.objects.filter(task_id=task_id).first()
    if job:
        job.runtime = (now - job.create_date).seconds
        job.save()


class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            print("channel:", channel)
            redis_helper = RedisHelper(channel)
            redis_helper.public("task [{0}] success.".format(task_id))
        logger.info("task [{0}] succeeded".format(task_id))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            print("channel:", channel)
            redis_helper = RedisHelper(channel)
            redis_helper.public("failed")
        logger.error("task [{0}] failed, reason: {1}, einfo: {2}".format(task_id, exc, einfo))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        now = datetime.now()
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            job.runtime = (now - job.create_date).seconds
            job.save()
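To run a concrete task through these hooks, bind it to MyTask when declaring it. A minimal sketch, assuming a hypothetical run_analysis task (the project's real tasks live in task.py):

from celery import shared_task

@shared_task(bind=True, base=MyTask)
def run_analysis(self, job_id):
    # Everything here runs inside the worker; on_success / on_failure /
    # after_return on MyTask fire automatically around this body, so the
    # Job bookkeeping and Redis notifications need no extra code here.
    do_analysis(job_id)  # placeholder for the project's analysis logic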

Fetching a task's result:

from celery.result import AsyncResult
res = AsyncResult(taskid).get()
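Note that .get() blocks the caller until the task finishes, so it is only appropriate when the result is known to be ready or the caller can afford to wait. For an API endpoint it is usually better to check the state first; a small sketch:

from celery.result import AsyncResult

res = AsyncResult(task_id)
if res.ready():                # True once the task has finished (success or failure)
    data = res.get(timeout=5)  # safe now; the result is already in the backend
else:
    status = res.state         # e.g. PENDING / STARTED / SUCCESS / FAILURE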

Revoking (terminating) a task:

from celery.task.control import broadcast, revoke, rate_limit, inspect
revoke(task_id, terminate=True)
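In a front/back-end separated project, revocation is naturally exposed as an API endpoint. A rough sketch of such a view (stop_task and its URL wiring are illustrative, not the project's actual view):

from django.http import JsonResponse
from celery.task.control import revoke

def stop_task(request, task_id):
    # terminate=True kills the worker process currently running the task;
    # the task_revoked handler above then records the runtime on the Job.
    revoke(task_id, terminate=True)
    return JsonResponse({"task_id": task_id, "status": "revoked"})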

Starting the Celery workers:

Start a worker with event dispatch enabled (-E broadcasts task events; --autoscale=6,3 scales between a maximum of 6 and a minimum of 3 worker processes):

python manage.py celery -A myproject worker -l info -E --autoscale=6,3

Start the snapshot camera (celerycam), which records those events into Django's database:

python manage.py celerycam -F 10 -l info

During development we found that importing the sklearn package inside an asynchronous task raised:

AttributeError: 'Worker' object has no attribute '_config'

The following workaround therefore has to be added to the project's task.py:

from celery.signals import worker_process_init

@worker_process_init.connect
def fix_multiprocessing(**_):
    from multiprocessing import current_process
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {"semprefix": "/mp"}

In addition, the sklearn imports have to be moved from module level into the function body, as sketched below; see the project code for details.
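Concretely, that means deferring the import to the task body, along these lines (run_analysis and KMeans are illustrative):

@shared_task(base=MyTask)
def run_analysis(job_id):
    # Imported here, after the worker process has initialized, instead of
    # at module level where it triggers the _config AttributeError on fork.
    from sklearn.cluster import KMeans
    ...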

Result screenshot: (figure omitted)
