TenSuns147 2022-11-11 11:37 采纳率: 100%
浏览 66
已结题

通过Python multiprocessing库的Pool进程池实现多进程并等待所有进程执行完毕的问题

问题描述:我在正在通过Python执行shell命令来调用本专业软件进行大量分析,每条命令执行完(即软件分析完成)后会自动生成一些文件,接着要读取这些文件,因为需要反复调用本专业软件的次数太多,而且本专业软件单独分析一次需要十几秒,所以此想要了并行的概念。

目的:我需要同时执行上百条shell命令,并在命令完全执行完毕后,接着读取本专业软件生成的结果文件,我先后使用了两种办法去解决问题,具体如下

方案一:仅使用subprocess库,代码如下,

import subprocess as sbp

processes = []
for ii in range(10):
    newfolder = 'test' + str(ii + 1)
    cc = sbp.Popen(cmd, cwd=newfolder, shell=True)
    processes.append(cc)

for jj in range(10):
    processes[jj].wait()

这段代码运行完毕会生成几个文件,后续代码的功能是读取这些文件,如果只是两三条命令同时执行,不会出问题,但当我试图同时执行10条命令时,运行后极少数情况会正常,大多时候会报错,报错的提示就是没有发现某某文件,打开失败,由此判断应该是某些文件夹下的cmd命令并没有执行完毕。请问朋友们该怎么处理?

方案二:使用subprocess库和multiprocessing库,代码如下,

import subprocess as sbp
import multiprocessing as mlp

def run_shell(number):
    sbp.Popen(cmd, cwd='test' + str(number), shell=True)

numlist = [ii + 1 for ii in range(100)]
pool = mlp.Pool(100)
pool.map(run_shell, numlist)
pool.close()
pool.join()

类似的,执行过程也会因为没有生成某些文件打开失败而中断,经过学习,我了解到受计算机CPU数的限制,例如CPU数为4,则程序会创建4个进程,待其中之一执行完毕后才会执行第5条命令,请问大家如何保证这100个进程全部运行结束再执行后续代码?

此外,将这两种方案导入time模块进行计时,发现方案一输出的时间包含了本专业软件运行消耗的时间,而方案二输出的时间似乎只是创建进程消耗的时间,如何使方案二把调用本专业软件进行分析的用时也涵盖上呢?

  • 写回答

5条回答 默认 最新

  • chuifengde 2022-11-11 15:07
    关注
    看例子,用队列来实现,监测每个子进程的执行完成状态,子进程执行完成后入队:
    import subprocess as sp
    import multiprocessing as mp
    import time
    
    def run_exe(q, code):
        s1 = time.time()
        p = sp.Popen('test.py ' + str(code),shell = True)
        while 1:
            if p.poll() == 0:
                q.put((p.pid, code, time.time() - s1))
                break
    
    if __name__ == "__main__":    
        jobs = []
        qq = mp.Queue()
        for i in range(20):
            pp = mp.Process(target = run_exe, args = (qq,i ))
            jobs.append(pp)
            pp.start()
            
        for s in jobs:
            s.join()
    
        for _ in jobs:
            cur = qq.get()
            print(cur)
            #这里能获得子进程id及对应的传入参数及运行时间
    --result
    (6784, 2, 6.593695163726807)
    (14944, 0, 8.390507698059082)
    (2144, 1, 9.3748459815979)
    (11824, 5, 10.656054019927979)
    (4360, 11, 9.8748300075531)
    (13596, 4, 11.034750699996948)
    (15308, 6, 12.576527833938599)
    (13056, 8, 12.155471563339233)
    (13684, 3, 11.481432437896729)
    (15328, 14, 12.865367650985718)
    (15092, 7, 13.527697086334229)
    (5640, 9, 13.480824947357178)
    (6776, 12, 13.168334484100342)
    (15508, 17, 13.965184450149536)
    (6420, 15, 13.152710437774658)
    (14260, 10, 13.605820655822754)
    (15896, 16, 12.652727127075195)
    (3400, 13, 12.980840921401978)
    (8112, 18, 12.730849266052246)
    (13944, 19, 13.215219497680664)
    
    
    
    #我的test.py程序是被调用程序
    dirname = r'C:\Users\Administrator\Desktop\stss'
    
    import sys 
    
    args = sys.argv
    code=int(sys.argv[1])
    a = []
    for i in range(100):
        for j in range(code*1000):
            a.append(i*j)
    with open(dirname+'/tt'+str(code)+'.txt',mode='w',encoding='utf-8')    as f:    
        print(a,file =f )
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(4条)

报告相同问题?

问题事件

  • 系统已结题 11月24日
  • 已采纳回答 11月16日
  • 创建了问题 11月11日

悬赏问题

  • ¥15 下图接收小电路,谁知道原理
  • ¥15 装 pytorch 的时候出了好多问题,遇到这种情况怎么处理?
  • ¥20 IOS游览器某宝手机网页版自动立即购买JavaScript脚本
  • ¥15 手机接入宽带网线,如何释放宽带全部速度
  • ¥30 关于#r语言#的问题:如何对R语言中mfgarch包中构建的garch-midas模型进行样本内长期波动率预测和样本外长期波动率预测
  • ¥15 ETLCloud 处理json多层级问题
  • ¥15 matlab中使用gurobi时报错
  • ¥15 这个主板怎么能扩出一两个sata口
  • ¥15 不是,这到底错哪儿了😭
  • ¥15 2020长安杯与连接网探