Admin_Poker 2021-02-27 17:24 采纳率: 75%
浏览 150
已采纳

celery异步任务控制

程序架构: django+redis+celery+mysql

前端有任务暂停、任务重启按钮,需要实现celery任务的暂停和暂停后的继续开始功能

 

  • 写回答

3条回答 默认 最新

  • 歇歇 2021-02-28 00:53
    关注

    您可以将整个操作设计为分为多个chain任务

    一旦有了这样的工作流程,就可以最终定义要在整个工作流程中暂停的点。在上述每一个点上,您都可以检查前端用户是否已请求操作暂停并相应地继续操作

    一个复杂且耗时的操作O分为5个chain任务-T1,T2,T3,T4和T5-

    这些任务(第一个任务除外)中的每一个都取决于前一个任务的返回值。

    假设我们定义了每个任务后要暂停的点,因此工作流看起来像-

    • T1执行
    • T1完成,请检查用户是否已请求暂停
      • 如果用户未请求暂停-继续
      • 如果用户已经请求暂停,序列化的剩余工作流程链,并存储在某个地方后继续
    from typing import Any, Optional
    
    from celery import shared_task
    from celery.canvas import Signature, chain, signature
    
    @shared_task(bind=True)
    def pause_or_continue(
        self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
    ):
        # 用于确定是否暂停操作链的任务
        if signature(clause)(retval):
            # 暂停请求,用retval和剩余的链调用给定的回调
            # 由于执行顺序从结束到开始,链条应该颠倒过来
            signature(callback)(retval, self.request.chain[::-1])
            self.request.chain = None
        else:
            # 继续下一个任务链
            return retval
    
    
    def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
       
        newch = []
        for n, sig in enumerate(ch.tasks):
            if n != 0 and n % nth == nth - 1:
                newch.append(pause_or_continue.s(clause=clause, callback=callback))
            newch.append(sig)
        ch.tasks = tuple(newch)
        return ch
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

悬赏问题

  • ¥30 Matlab打开默认名称带有/的光谱数据
  • ¥50 easyExcel模板 动态单元格合并列
  • ¥15 res.rows如何取值使用
  • ¥15 在odoo17开发环境中,怎么实现库存管理系统,或独立模块设计与AGV小车对接?开发方面应如何设计和开发?请详细解释MES或WMS在与AGV小车对接时需完成的设计和开发
  • ¥15 CSP算法实现EEG特征提取,哪一步错了?
  • ¥15 游戏盾如何溯源服务器真实ip?需要30个字。后面的字是凑数的
  • ¥15 vue3前端取消收藏的不会引用collectId
  • ¥15 delphi7 HMAC_SHA256方式加密
  • ¥15 关于#qt#的问题:我想实现qcustomplot完成坐标轴
  • ¥15 下列c语言代码为何输出了多余的空格