我创建了一个脚本,默认情况下会创建一个多处理过程。然后它可以正常工作。启动多个过程时,它开始悬挂,并且并非总是在同一位置。该程序约为700行代码,因此我将尝试总结发生的事情。我想通过平行最慢的任务(即对齐DNA序列)来充分利用我的多核。为此,我使用子进程模块调用命令行程序:“ HMMSearch”,我可以通过 /dev /stdin以序列为序列,然后我通过 /dev /stdout读取了对齐的序列。我想之所以发生悬挂,是因为这些多个子过程实例从Stdout / stdin读取 /写作,我真的不知道最好的方法...我正在研究OS.fdopen(...)&OS。 .tmpfile(),创建可以刷新数据的临时文件手机或管道。但是,我以前从未使用过,也无法用子过程模块来想象该如何做到这一点。理想情况下,我想完全使用硬盘驱动器绕过,因为高通量数据处理的管道要好得多!任何帮助都将是超级美好的!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

花了一段时间调试此事,我发现一个问题一直存在,尚未解决,但已解决了其他问题 效率低下 在(调试)的过程中。有两个初始馈线功能:此align_seq类和一个文件解析器 parsehmm() 它将特定位置的评分矩阵(PSM)加载到字典中。然后,主父进程使用字典(词典)作为指向每个残基的相关分数的指针进行比较与PSM的比对。为了计算我想要的分数,我有两个单独的多处理。进程类,一个类 logScore() 这计算对数的比值比(使用Math.exp());我平行于这个;它将计算得分排队到最后一个过程, sumscore() 这只是总结了这些分数(使用Math.fsum),将总和和所有特定位置的分数返回到父过程中。 ie queue.put([sum,{残留位置:特定位置分数,...}])我发现这个异常混乱以使我的头(太多的排队!),所以我希望读者能够遵循。 。在完成上述所有计算之后,然后给出将累积分数保存为选项卡的输出的选项。这是现在(从昨晚起)有时会破裂的地方,因为我确保它在应该有分数的每个位置打印出一个分数。我认为由于延迟(计算机时间安排不同步),有时会首先放在队列中 logScore 没有到达 sumscore 第一的。为了使sumscore知道何时返回计数并重新开始,我将“ endSeq”放入了执行计算的最后一个日志进程的队列中。我以为那也应该持续到sumscore,但并非总是如此。只有有时会破裂。因此,现在我不再发生僵局,而是在打印或保存结果时是钥匙扣。我相信有时获得KeyError的原因是因为我为每个日志过程创建一个队列,但是他们都应该使用相同的队列。现在,我有类似的东西: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

而我应该仅创建一个队列以在所有日志实例之间共享。 IE

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
有帮助吗?

解决方案

这不是管道的运作方式...而是为了放心,这是 子过程文档:

STDIN,STDOUT和STDERR分别指定执行程序的标准输入,标准输出和标准错误文件处理。有效值是管道,现有文件描述符(正整数),现有文件对象,无。管道表明 新管道 应该为孩子创建。没有,不会发生重定向;孩子的文件把手将从父母那里继承。

故障最有可能的区域是与主要过程或信号量管理的管理。也许由于错误而无法按预期进行状态过渡 /同步?我建议通过在每次封锁调用之前和之后添加日志/打印语句进行调试 - 您要与主过程进行通信,以及您在哪里获取/释放信号量以缩小问题出现问题的位置。

我也很好奇 - 信号量是绝对必要的吗?

其他提示

我还想并行化简单的任务,为此我创建了一个Python脚本。您可以看一下:http://bioinf.comav.upv.es/psubprocess/index.html

比您想要的要多一些,但是对于简单的任务来说很容易使用。至少对您来说可能是一定程度的。

何塞·布兰卡(Jose Blanca)

这可能是子过程中的僵局,您是否尝试过使用通信方法而不是等待?http://docs.python.org/library/subprocess.html

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top