程序架构: django+redis+celery+mysql
前端有任务暂停、任务重启按钮,需要实现celery任务的暂停和暂停后的继续开始功能
程序架构: django+redis+celery+mysql
前端有任务暂停、任务重启按钮,需要实现celery任务的暂停和暂停后的继续开始功能
您可以将整个操作设计为分为多个chain
任务
一旦有了这样的工作流程,就可以最终定义要在整个工作流程中暂停的点。在上述每一个点上,您都可以检查前端用户是否已请求操作暂停并相应地继续操作
一个复杂且耗时的操作O分为5个chain任务-T1,T2,T3,T4和T5-
这些任务(第一个任务除外)中的每一个都取决于前一个任务的返回值。
假设我们定义了每个任务后要暂停的点,因此工作流看起来像-
from typing import Any, Optional
from celery import shared_task
from celery.canvas import Signature, chain, signature
@shared_task(bind=True)
def pause_or_continue(
self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
):
# 用于确定是否暂停操作链的任务
if signature(clause)(retval):
# 暂停请求,用retval和剩余的链调用给定的回调
# 由于执行顺序从结束到开始,链条应该颠倒过来
signature(callback)(retval, self.request.chain[::-1])
self.request.chain = None
else:
# 继续下一个任务链
return retval
def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
newch = []
for n, sig in enumerate(ch.tasks):
if n != 0 and n % nth == nth - 1:
newch.append(pause_or_continue.s(clause=clause, callback=callback))
newch.append(sig)
ch.tasks = tuple(newch)
return ch