それぞれが独自のサブプロセス(Kubuntu、Mac)を使用してPythonマルチプロセッシング

StackOverflow https://stackoverflow.com/questions/4620041

質問

デフォルトで1つのマルチプロセッシングプロセスを作成するスクリプトを作成しました。その後、正常に動作します。複数のプロセスを開始すると、それは垂れ下がっており、常に同じ場所にいるわけではありません。プログラムは約700行のコードなので、何が起こっているのかを要約しようとします。 DNAシーケンスを調整する最も遅いタスクを並行することにより、マルチコアを最大限に活用したいと考えています。そのためには、サブプロセスモジュールを使用して、コマンドラインプログラム「HMMSearch」を呼び出します。これは /dev /stdinを介してシーケンスでフィードでき、 /dev /stdoutを介してアライメントされたシーケンスを読み取ります。これらの複数のサブプロセスインスタンスがstdout / stdinから読書 /執筆のために発生すると思いますが、私はこれを行うための最良の方法を本当に知りません...私はos.fdopen(...)&osを検討していました.tmpfile()、一時的なファイルハンドルまたはパイプを作成して、データを洗い流すことができます。しかし、私は以前にも使用したことがなく、サブプロセスモジュールでそれを行う方法を想像することはできません。理想的には、ハードドライブを完全に使用してバイパスしたいと思います。なぜなら、ハイスループットのデータ処理ではパイプがはるかに優れているからです。これに関する助けはとても素晴らしいでしょう!!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

これをデバッグするのに時間を費やしてきたので、私は常にそこにあり、まだ完全には解決されていないが、他のいくつかを修正した問題を発見しました 非効率性 (デバッグの)過程で。このAlign_Seqクラスとファイルパーサーの2つの初期フィーダー関数があります parsehmm() 位置固有のスコアリングマトリックス(PSM)を辞書にロードします。主な親プロセスは、各残基の関連スコアへのポインターとして(辞書の)辞書を使用して、PSMとのアライメントを比較します。スコアを計算するために、私は2つの個別のマルチプロセッシングを持っています。プロセスクラス、1つのクラス logscore() それは、logオッズ比を計算します(math.exp());私はこれを並行しています。そして、計算されたスコアを最後のプロセスに依頼します。 sumscore() これにより、これらのスコア(Math.fsumを使用)を合計するだけで、Sumとすべての位置固有のスコアを辞書として親プロセスに戻します。 IE queue.put([sum、{残留位置:位置特定のスコア、...}])私はこれが非常に混乱していると思います(キューが多すぎます!)。 。上記のすべての計算が完了した後、累積スコアをタブ設計出力として保存するオプションを提供します。これは(昨夜から)時々壊れる場所です。スコアがあるはずのすべてのポジションのスコアを印刷することを確認します。待ち時間(コンピューターのタイミングが整っていない)のため、時には最初にキューに入れられるものがあると思います logscore 到達しません sumScore 最初。 SumScoreがタリーを返す時期を知っているために、計算を実行した最後のログスコアプロセスの「Endseq」をキューに入れました。私はそれも最後にサムスコアに到達するはずだと思ったが、それは必ずしもそうではない。時々それは壊れます。だから今、私はもうデッドロックを得ることができませんが、代わりに結果を印刷または保存するときはキーエラーです。 KeyErrorを時々取得する理由は、各ログスコアプロセスのキューを作成するためだと思いますが、代わりにすべて同じキューを使用する必要があります。さて、私は次のようなものを持っています: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

一方、すべてのログスコアインスタンス間で共有するために1つのキューだけを作成する必要があります。すなわち

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
役に立ちましたか?

解決

それはパイプラインがどのように機能するかではありません...しかし、あなたの心を容易にするために、ここからからの抜粋があります サブプロセスドキュメント:

STDIN、STDOUT、およびSTDERRは、実行されたプログラムの標準入力、標準出力、および標準エラーファイルハンドルをそれぞれ指定します。有効な値は、パイプ、既存のファイル記述子(正の整数)、既存のファイルオブジェクトなどです。パイプはaを示します 新しいパイプ 子供に創造する必要があります。いずれも、リダイレクトは発生しません。子供のファイルハンドルは親から継承されます。

故障した最も可能性の高い領域は、主要なプロセスとの通信またはセマフォの管理にあります。たぶん、状態の移行 /同期は、バグのために予想どおりに進行していませんか?ブロッキングコールの前後にロギング/印刷ステートメントを追加することでデバッグをお勧めします。メインプロセスと通信し、セマフォを取得/リリースして物事がうまくいかなかった場所を絞り込むことをお勧めします。

また、私は興味があります - セマフォは絶対に必要ですか?

他のヒント

また、簡単なタスクに並行したかったので、小さなPythonスクリプトを作成しました。あなたは次のことを見ることができます:http://bioinf.comav.upv.es/psubprocess/index.html

あなたが望むものよりも少し一般的ですが、単純なタスクの場合は非常に使いやすいです。それは少なくともあなたにとってある程度のインスペレーションかもしれません。

ホセ・ブランカ

サブプロセスのデッドロックかもしれませんが、待機するのではなく通信方法を使用してみましたか?http://docs.python.org/library/subprocess.html

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top