python multiprocessing 问题 5C

最近写一个简单的web后端项目(结构简单,并发不高,但是处理逻辑稍复杂),在某一个url路由的响应函数(Run)中,处理逻辑是每次调用到Run,都启动一个子进程来处理逻辑,子进程完了之后通过multiprocessing.Queue将结果反馈给父进程(父进程收到消息表示子进程完成任务并准备结束了)。

Run函数中的流程:
    1. q = Queue() #创建queue
    2. p = Process(target=run_child,args=(q)) #创建处理业务的子进程,run_child函数耗时较长
    3. parent_listen_q(p,q) #这个函数中以gevent协程的方式不停循环从q中get消息,同时join子进程
parent_listen_q函数流程:
  def _inner():
    1. while True: #循环收消息
        msg = q.get_nowait(q) 
        if msg:
            break
        gevent.sleep(0.01)
    2. handle(msg) #处理消息
    3. p.join() #避免僵尸
  gevent.spawn(_inner) #协程

这个逻辑看起来没有问题,并且很简洁(连queue都是每个子进程分开的互不影响),但是问题就出在每个子进程一个queue上。
一个场景是:
用户接连调用了两次Run,期间间隔很短,子进程的生命周期长于这个间隔。会启动了两个子进程,记为c1,c2,父进程记为p。
第一次调用Run的时候,关系是:父p,子c1,队列q1。
第二次调用Run的时候,关系是:父p,子c2,队列q2,但此时c1还未结束,父进程的parent_listen_q协程也还在运行中,c2会继承这个运行的协程,
导致的问题是有两个进程(p,c2)同时在循环从q1中get数据,当后续c1结束向q1中发消息时,有可能消息被c2抢先取到,而真正关心这个消息的p却收不到消息了,这样就出了问题。
出现这个问题的根本原因有两个:
1. fork子进程会继承父进程的栈空间,导致了协程也被继承下来了。
2. queue底层其实也是基本的ipc对象,看Queue的代码是pipe实现的,在fork的时候也会被继承,并且,如果不继承的话反而实现不了ipc了。
所以,上面这种结构的处理逻辑对这个问题就不是蛮好解决了,看起来不能给queue设置一个“close_on_exec”的标志。
我在这里暂时用了一个很挫的方法曲线解决了这个问题(确实挺搓,因为不是避免,而是修补):

修补后的parent_listen_q函数流程:
    def _inner():
        1. while True:
            msg = q.get_nowait() 
            if p._parent_pid != os.getpid(): #如果是子进程
                if msg:
                    q.put_nowait(msg) #如果发现被子进程不小心收错了,重新塞回去(汗。。)
                return #直接结束这个冒牌货协程
            if msg:
                break
            gevent.sleep(0.01)
        2. handle(msg)
        3. p.join() #如果在非父进程中,这行会报异常
    gevent.spawn(_inner)

在这里向各位大神请赐教,不知有没有更优雅的方法解决这个问题,本人新手,求带。

2个回答

设置标志标识子进程未结束,判断未结束时不启动新进程

u010300403
Coder.Z 那样搞和只用单进程没什么区别了啊,用多进程就是为了并发啊
一年多之前 回复

这个直接创建一个queue好了,然后每个消息中加一个子进程的标示来区分。这样fork子进程也不会继承queue。每个子进程都往一个queue里塞数据。父进程循环读取一个queue处理

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问