Coder.Z 2018-05-15 10:25 采纳率: 40%
浏览 1536
已结题

python multiprocessing 问题

最近写一个简单的web后端项目(结构简单,并发不高,但是处理逻辑稍复杂),在某一个url路由的响应函数(Run)中,处理逻辑是每次调用到Run,都启动一个子进程来处理逻辑,子进程完了之后通过multiprocessing.Queue将结果反馈给父进程(父进程收到消息表示子进程完成任务并准备结束了)。

Run函数中的流程:
    1. q = Queue() #创建queue
    2. p = Process(target=run_child,args=(q)) #创建处理业务的子进程,run_child函数耗时较长
    3. parent_listen_q(p,q) #这个函数中以gevent协程的方式不停循环从q中get消息,同时join子进程
parent_listen_q函数流程:
  def _inner():
    1. while True: #循环收消息
        msg = q.get_nowait(q) 
        if msg:
            break
        gevent.sleep(0.01)
    2. handle(msg) #处理消息
    3. p.join() #避免僵尸
  gevent.spawn(_inner) #协程

这个逻辑看起来没有问题,并且很简洁(连queue都是每个子进程分开的互不影响),但是问题就出在每个子进程一个queue上。
一个场景是:
用户接连调用了两次Run,期间间隔很短,子进程的生命周期长于这个间隔。会启动了两个子进程,记为c1,c2,父进程记为p。
第一次调用Run的时候,关系是:父p,子c1,队列q1。
第二次调用Run的时候,关系是:父p,子c2,队列q2,但此时c1还未结束,父进程的parent_listen_q协程也还在运行中,c2会继承这个运行的协程,
导致的问题是有两个进程(p,c2)同时在循环从q1中get数据,当后续c1结束向q1中发消息时,有可能消息被c2抢先取到,而真正关心这个消息的p却收不到消息了,这样就出了问题。
出现这个问题的根本原因有两个:
1. fork子进程会继承父进程的栈空间,导致了协程也被继承下来了。
2. queue底层其实也是基本的ipc对象,看Queue的代码是pipe实现的,在fork的时候也会被继承,并且,如果不继承的话反而实现不了ipc了。
所以,上面这种结构的处理逻辑对这个问题就不是蛮好解决了,看起来不能给queue设置一个“close_on_exec”的标志。
我在这里暂时用了一个很挫的方法曲线解决了这个问题(确实挺搓,因为不是避免,而是修补):

修补后的parent_listen_q函数流程:
    def _inner():
        1. while True:
            msg = q.get_nowait() 
            if p._parent_pid != os.getpid(): #如果是子进程
                if msg:
                    q.put_nowait(msg) #如果发现被子进程不小心收错了,重新塞回去(汗。。)
                return #直接结束这个冒牌货协程
            if msg:
                break
            gevent.sleep(0.01)
        2. handle(msg)
        3. p.join() #如果在非父进程中,这行会报异常
    gevent.spawn(_inner)

在这里向各位大神请赐教,不知有没有更优雅的方法解决这个问题,本人新手,求带。

  • 写回答

2条回答 默认 最新

  • zheshiweihe 2018-05-15 10:38
    关注

    设置标志标识子进程未结束,判断未结束时不启动新进程

    评论

报告相同问题?

悬赏问题

  • ¥20 求数据集和代码#有偿答复
  • ¥15 关于下拉菜单选项关联的问题
  • ¥20 java-OJ-健康体检
  • ¥15 rs485的上拉下拉,不会对a-b<-200mv有影响吗,就是接受时,对判断逻辑0有影响吗
  • ¥15 使用phpstudy在云服务器上搭建个人网站
  • ¥15 应该如何判断含间隙的曲柄摇杆机构,轴与轴承是否发生了碰撞?
  • ¥15 vue3+express部署到nginx
  • ¥20 搭建pt1000三线制高精度测温电路
  • ¥15 使用Jdk8自带的算法,和Jdk11自带的加密结果会一样吗,不一样的话有什么解决方案,Jdk不能升级的情况
  • ¥15 画两个图 python或R