Admin_Poker 2021-02-27 17:24 采纳率: 75%
浏览 150
已采纳

celery异步任务控制

程序架构: django+redis+celery+mysql

前端有任务暂停、任务重启按钮,需要实现celery任务的暂停和暂停后的继续开始功能

 

  • 写回答

3条回答 默认 最新

  • 歇歇 2021-02-28 00:53
    关注

    您可以将整个操作设计为分为多个chain任务

    一旦有了这样的工作流程,就可以最终定义要在整个工作流程中暂停的点。在上述每一个点上,您都可以检查前端用户是否已请求操作暂停并相应地继续操作

    一个复杂且耗时的操作O分为5个chain任务-T1,T2,T3,T4和T5-

    这些任务(第一个任务除外)中的每一个都取决于前一个任务的返回值。

    假设我们定义了每个任务后要暂停的点,因此工作流看起来像-

    • T1执行
    • T1完成,请检查用户是否已请求暂停
      • 如果用户未请求暂停-继续
      • 如果用户已经请求暂停,序列化的剩余工作流程链,并存储在某个地方后继续
    from typing import Any, Optional
    
    from celery import shared_task
    from celery.canvas import Signature, chain, signature
    
    @shared_task(bind=True)
    def pause_or_continue(
        self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
    ):
        # 用于确定是否暂停操作链的任务
        if signature(clause)(retval):
            # 暂停请求,用retval和剩余的链调用给定的回调
            # 由于执行顺序从结束到开始,链条应该颠倒过来
            signature(callback)(retval, self.request.chain[::-1])
            self.request.chain = None
        else:
            # 继续下一个任务链
            return retval
    
    
    def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
       
        newch = []
        for n, sig in enumerate(ch.tasks):
            if n != 0 and n % nth == nth - 1:
                newch.append(pause_or_continue.s(clause=clause, callback=callback))
            newch.append(sig)
        ch.tasks = tuple(newch)
        return ch
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

悬赏问题

  • ¥15 MATLAB yalmip gurobi求解器
  • ¥15 关于#stm32#的问题:第一张图是发送数据代码,第二第三张图是接收数据并进行DAC转换的函数,若用串口一接收数据并进行DAC转换,主函数代码该怎么写
  • ¥20 hc130怎么读写内部flash存储信息
  • ¥15 Axure rp9注册与登录交互
  • ¥15 我下载图形界面重启完就变成这样了,打字也打不了,动也动不了,该怎么解决(操作系统-centos)
  • ¥15 VBA中在窗体中遍历所有checkbox控件,提取出被选中的checkbox的caption值
  • ¥15 在Ubuntu上有什么命令,或者是系统文件能告诉我链接nvme ssd的pcie槽位是不是支持热插拔功能?
  • ¥15 ansys license许可证问题
  • ¥20 QQ号和密码都能正常登录微信 QQ号和密码登录微信显示密码错误
  • ¥15 单片机RTOS Kernel与应用分离开发,Kernel如何调起应用?