Admin_Poker 2021-02-27 17:24 采纳率: 75%
浏览 150
已采纳

celery异步任务控制

程序架构: django+redis+celery+mysql

前端有任务暂停、任务重启按钮,需要实现celery任务的暂停和暂停后的继续开始功能

 

  • 写回答

3条回答 默认 最新

  • 歇歇 2021-02-28 00:53
    关注

    您可以将整个操作设计为分为多个chain任务

    一旦有了这样的工作流程,就可以最终定义要在整个工作流程中暂停的点。在上述每一个点上,您都可以检查前端用户是否已请求操作暂停并相应地继续操作

    一个复杂且耗时的操作O分为5个chain任务-T1,T2,T3,T4和T5-

    这些任务(第一个任务除外)中的每一个都取决于前一个任务的返回值。

    假设我们定义了每个任务后要暂停的点,因此工作流看起来像-

    • T1执行
    • T1完成,请检查用户是否已请求暂停
      • 如果用户未请求暂停-继续
      • 如果用户已经请求暂停,序列化的剩余工作流程链,并存储在某个地方后继续
    from typing import Any, Optional
    
    from celery import shared_task
    from celery.canvas import Signature, chain, signature
    
    @shared_task(bind=True)
    def pause_or_continue(
        self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
    ):
        # 用于确定是否暂停操作链的任务
        if signature(clause)(retval):
            # 暂停请求,用retval和剩余的链调用给定的回调
            # 由于执行顺序从结束到开始,链条应该颠倒过来
            signature(callback)(retval, self.request.chain[::-1])
            self.request.chain = None
        else:
            # 继续下一个任务链
            return retval
    
    
    def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
       
        newch = []
        for n, sig in enumerate(ch.tasks):
            if n != 0 and n % nth == nth - 1:
                newch.append(pause_or_continue.s(clause=clause, callback=callback))
            newch.append(sig)
        ch.tasks = tuple(newch)
        return ch
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

悬赏问题

  • ¥15 linux驱动,linux应用,多线程
  • ¥20 我要一个分身加定位两个功能的安卓app
  • ¥15 基于FOC驱动器,如何实现卡丁车下坡无阻力的遛坡的效果
  • ¥15 IAR程序莫名变量多重定义
  • ¥15 (标签-UDP|关键词-client)
  • ¥15 关于库卡officelite无法与虚拟机通讯的问题
  • ¥15 目标检测项目无法读取视频
  • ¥15 GEO datasets中基因芯片数据仅仅提供了normalized signal如何进行差异分析
  • ¥100 求采集电商背景音乐的方法
  • ¥15 数学建模竞赛求指导帮助