L20517496
Admin_Poker
采纳率100%
2021-02-27 17:24

celery异步任务控制

20
已采纳

程序架构: django+redis+celery+mysql

前端有任务暂停、任务重启按钮,需要实现celery任务的暂停和暂停后的继续开始功能

 

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享
  • 邀请回答

3条回答

  • bill20100829 歇歇 2月前

    您可以将整个操作设计为分为多个chain任务

    一旦有了这样的工作流程,就可以最终定义要在整个工作流程中暂停的点。在上述每一个点上,您都可以检查前端用户是否已请求操作暂停并相应地继续操作

    一个复杂且耗时的操作O分为5个chain任务-T1,T2,T3,T4和T5-

    这些任务(第一个任务除外)中的每一个都取决于前一个任务的返回值。

    假设我们定义了每个任务后要暂停的点,因此工作流看起来像-

    • T1执行
    • T1完成,请检查用户是否已请求暂停
      • 如果用户未请求暂停-继续
      • 如果用户已经请求暂停,序列化的剩余工作流程链,并存储在某个地方后继续
    from typing import Any, Optional
    
    from celery import shared_task
    from celery.canvas import Signature, chain, signature
    
    @shared_task(bind=True)
    def pause_or_continue(
        self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
    ):
        # 用于确定是否暂停操作链的任务
        if signature(clause)(retval):
            # 暂停请求,用retval和剩余的链调用给定的回调
            # 由于执行顺序从结束到开始,链条应该颠倒过来
            signature(callback)(retval, self.request.chain[::-1])
            self.request.chain = None
        else:
            # 继续下一个任务链
            return retval
    
    
    def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
       
        newch = []
        for n, sig in enumerate(ch.tasks):
            if n != 0 and n % nth == nth - 1:
                newch.append(pause_or_continue.s(clause=clause, callback=callback))
            newch.append(sig)
        ch.tasks = tuple(newch)
        return ch
    点赞 评论 复制链接分享
  • L20517496 Admin_Poker 1月前

    小弟代码跑步起来了,悬赏送大佬了。小弟去解决代码的问题了。。

    点赞 评论 复制链接分享
  • bill20100829 歇歇 2月前

    add_consumer和cancel_consumer远程命令可用于开始/停止使用

    或者你用一个编号代替任务是否执行

    if IsT1on=1

    执行t1

    if IsT2on=1

    执行t2

    https://github.com/ask/celery/commit/03b7a417e86df4a2334382705f17a706163d9704

    点赞 评论 复制链接分享

为你推荐