我在学习 Python 异步非阻塞编程时遇到了一些问题:处理数据时,我在进程池中创建了 64 个进程,运行了大约半个多小时后,各进程的 CPU 使用率变为 0,进程全部休眠,只剩下一两个 python3 进程还有 CPU 占用,所有操作似乎都停下了,需要写入的文件末尾也不再新增内容。这是为什么呢?我应该怎么解决?除此之外,每个进程的 CPU 占用率一直比较低,我怎样才能跑满每一个核心?
# python
# -*- encoding: utf-8 -*-
'''
@File :read.py
@Time :2022/02/10 03:30:02
@Author :charles kiko
@Version :1.0
@Contact :charles_kiko@163.com
@Desc :扫描基因组内的重复序列 python read.py lens fasta out_file
'''
import os
import gc
import sys
import csv
import numpy as np
import pandas as pd
from pandas import Series, DataFrame
from Bio import SeqIO
import multiprocessing # Step I : 导入模块
from multiprocessing import cpu_count#读取CPU核心数用于匹配线程数
def count_str(seq):
    """Return the GC fraction of a nucleotide sequence.

    Args:
        seq: Nucleotide sequence. Bases are counted case-insensitively.

    Returns:
        The int ``0`` when the sequence contains no ``G``/``C`` (this includes
        the empty sequence); otherwise the float ``(G + C) / len(seq)``.
    """
    # Counting with str.count tolerates symbols outside A/T/C/G (for example
    # 'N' or lower-case bases); the previous per-character tally dict raised
    # KeyError on them.
    bases = str(seq).upper()
    gc_total = bases.count('G') + bases.count('C')
    if gc_total == 0:
        return 0
    return gc_total / len(bases)
def _occurrences(target, query):
    """Yield the start offset of every (possibly overlapping) match of query in target."""
    hit = target.find(query)
    while hit != -1:
        yield hit
        hit = target.find(query, hit + 1)


def find(seq, name, start, end):
    """Report every other occurrence of ``seq`` in the loaded contigs.

    Reads the module-level ``contig`` (ordered contig names), ``DNA_dict``
    (contig name -> sequence) and ``dup`` (labels already reported), and
    appends to two files: the progress log ``out.out`` and the result table
    at ``sys.argv[3]``.

    Args:
        seq: Query subsequence.
        name: Name of the contig that ``seq`` was cut from.
        start: Start offset of ``seq`` inside ``name``.
        end: End offset of ``seq`` inside ``name``.

    Returns:
        Always ``0``.
    """
    # Progress trace: one line per query window.
    with open('out.out', 'a+') as log:
        log.write(name + '_' + str(start) + '_' + str(end) + '\n')

    query = str(seq)
    width = len(query)
    query_label = name + str(start) + str(end)
    gc_fraction = count_str(seq)

    for contig_ in contig:
        if contig_ not in DNA_dict:
            print(contig_, 'not in keys!')
            # NOTE(review): this aborts the whole search, not just this
            # contig (the original had `continue` commented out in favour of
            # `return 0`) - confirm that is intended.
            return 0
        target = str(DNA_dict[contig_])
        # str.find does the scan in C and also reports a match that ends
        # exactly at the end of the contig; the previous window-by-window
        # loop skipped that last window (`i + len >= len(top)`).
        for hit in _occurrences(target, query):
            if contig_ == name and start == hit:
                # The query window itself is not a duplicate.
                continue
            hit_end = hit + width
            hit_label = contig_ + str(hit) + str(hit_end)

            # NOTE(review): `dup` is a plain module-level list, so when this
            # runs in pool workers each process presumably filters only its
            # own duplicates - verify the output is de-duplicated downstream.
            rows = []
            if query_label not in dup:
                dup.append(query_label)
                # NOTE(review): the first column is the contig where the
                # match was found, not `name`; kept as in the original.
                rows.append([contig_, query_label, start, end,
                             width, gc_fraction, seq])
            hit_is_new = hit_label not in dup
            if hit_is_new:
                dup.append(hit_label)
                rows.append([contig_, hit_label, hit, hit_end,
                             width, gc_fraction, seq])

            if rows:
                # Context managers close every handle; the original reopened
                # the result file per row and leaked the earlier handle.
                with open(sys.argv[3], 'a+') as out:
                    out.writelines(
                        '\t'.join(str(cell) for cell in row) + '\n'
                        for row in rows
                    )
            if hit_is_new:
                # NOTE(review): the original indentation was lost; logging
                # only newly reported hits is the reconstruction chosen here.
                with open('out.out', 'a+') as log:
                    log.write(contig_ + '*' + str(hit) + '*'
                              + str(hit_end) + '\n')
    return 0
if __name__ == '__main__' :#多进程
length = 5
dup=[]
contigs = {}
contigs_0 = {}
for i in open(sys.argv[1],'r'):
if i != '\n':
lt = i.strip('\n').split()
contigs[str(lt[0])] = int(lt[1])
DNA_dict = SeqIO.to_dict(SeqIO.parse(sys.argv[2], "fasta"))# 提取之后直接返回字典
for key in DNA_dict.keys():
DNA_dict[key] = str(DNA_dict[key].seq).upper()
contigs_ = {k: v for k, v in sorted(contigs.items(), key=lambda item: item[1], reverse=True)}
contig = list(contigs_.keys())
# print(contig)
file = open('out.out', 'w')
file.close()
file = open(sys.argv[3], 'w')
file.write(
'\t'.join(['contig_name','name','start', 'end','seq_length','GC','SEQ'])+'\n')
file.close()
pool = multiprocessing.Pool(processes = 8) # Step II : 进程池
for i in contig:
if i not in DNA_dict.keys():
continue
for j in range(5,len(DNA_dict[i])):
for m in range(len(DNA_dict[i]) - j + 1):
contigs_0[str(DNA_dict[i][m:m + j])+str(m)+str(m+j)] = 1
pool.apply_async(find, (DNA_dict[i][m:m + j],i,m,m+j,), ) # Step III : 异步(并行)计算
pool.close() # Step IV : 准备结束
pool.join() # Step IV : 完全结束