Frage

Ich habe ein Skript erstellt, dass standardmäßig ein Multiprocessing-Verfahren erzeugt; es funktioniert dann gut. Wenn mehrere Prozesse starten, beginnt es zu hängen, und nicht immer an der gleichen Stelle. Das Programm ist etwa 700 Zeilen Code, so dass ich versuchen werde zusammenfassen, was los ist. Ich mag die meisten meinen Multi-Kern machen, durch die langsamste Aufgabe Parallelisierung, die DNA-Sequenzen ausrichtet. Für dass ich die Subprozess Modul verwenden, um ein Befehlszeilenprogramm aufzurufen: ‚hmmsearch‘, die in I-Sequenzen durch / dev / stdin einspeisen kann, und dann ausgelesen I die ausgerichteten Sequenzen über / dev / stdout. Ich stelle mir die hängen, weil dieser mehrere Instanzen subprocess auftritt Lesen / Schreiben von stdout / stdin, und ich weiß es wirklich nicht den besten Weg, um dies zu realisieren ... Ich war auf der Suche in os.fdopen (...) & os.tmpfile (), temporäre Dateihandies oder Rohre zu schaffen, wo ich die Daten durch spülen kann. Allerdings habe ich auch nie vor und ich kann nicht vorstellen, verwendet, wie das mit dem Subprozess-Modul zu tun. Im Idealfall würde ich Bypass mag die Festplatte vollständig zu verwenden, da Rohre sind viel besser mit hohen Durchsatz Datenverarbeitung! Jede mögliche Hilfe bei dieser wäre super wunderbar !!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

Nachdem ein verbrauchtes diese während des Debuggens, habe ich ein Problem gefunden, die immer da war und ist noch nicht ganz gelöst, aber haben einige andere Ineffizienzen in den Prozess festgelegt (Debugging). Es gibt zwei Anfänge Feeder Funktionen, diese align_seq Klasse und einen Datei-Parser parseHMM () , die Lasten eine positionsspezifische Scoring-Matrix (PSM) in ein Wörterbuch. Der Hauptgeordneten Prozess vergleicht dann die Ausrichtung zu dem PSM, ein Wörterbuch unter Verwendung (von Wörterbuch) als Zeiger auf die entsprechende Punktzahl für jeden Rest. Um die Noten zu berechnen Ich möchte Ich habe zwei separate multiprocessing.Process Klassen, eine Klasse logScore () , der das Protokoll Odds Ratio (mit math.exp ()) berechnet; Ich parallelisieren diese ein; und es Warteschlangen, die berechneten Werte auf dem letzten Prozess, sumScore () , die nur diese Ergebnisse summiert (mit math.fsum), Rückkehr der Summe und alle positionsspezifische Scores zu den übergeordneten Prozess als Wörterbuch zurück. d.h.     Queue.put ([Summe, {Restposition: Position bestimmte Punktzahl, ...}]) Ich finde das außerordentlich verwirrend meinen Kopf herum (zu viele Warteschlangen!), So dass ich, dass die Leser hoffen folgen verwalten ... Nachdem alle der obigen Berechnungen fertig sind, habe ich dann die Möglichkeit geben, die kumulative Scores als tab- zu speichern mit Trennzeichen ausgegeben. Dies ist, wo es jetzt (seit gestern Abend) manchmal bricht, wie ich sicher, es für jede Position eine Punktzahl ausdruckt, wo es sollte eine Punktzahl sein. Ich denke, dass aufgrund der Latenz (Computer-Timings sind out-of-sync), manchmal, was in der Warteschlange gestellt wird zuerst für die logScore nicht erreicht sumScore zuerst. Damit sumScore weiß, wann die tally zurückzukehren und wieder von vorn anfangen, habe ich ‚ENDSEQ‘ in die Warteschlange für den letzten logScore Prozess, der eine Berechnung durchgeführt. Ich dachte, dass dann sollte es sumScore erreicht zuletzt auch, aber das ist nicht immer der Fall; nur manchmal tut es zu brechen. So jetzt habe ich ein Deadlock nicht mehr bekommen, sondern einen KeyError beim Drucken oder Speichern der Ergebnisse. Ich glaube, der Grund für die manchmal KeyError bekommen ist, weil ich eine Queue für jeden logScore Prozess erstellen, sondern dass sie alle die gleiche Queue verwenden sollen. Jetzt, wo ich so etwas wie: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

, während ich soll nur eine Warteschlange Aktie zwischen allen logScore Instanzen erstellen. d.

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
War es hilfreich?

Lösung

Das ist nicht ganz ist, wie funktioniert Pipelining ... aber dein Geist zu setzen zu erleichtern, hier ist ein Auszug aus dem subprocess Dokumentation :

  

stdin, stdout und stderr geben die   ausgeführten Programme Standardeingabe,   Standardausgabe und Standardfehler   Datei Griffe sind. Gültig   Werte sind PIPE, eine vorhandene Datei   Deskriptors (eine positive ganze Zahl), ein   bestehende Dateiobjekt, und None. ROHR   zeigt an, dass ein Neurohr für das Kind   sollte erstellt werden. Mit Keine, da kein   Umleitung wird auftreten; das Kind ist   Datei-Handles werden geerbt von   die Eltern.

Die wahrscheinlichste Bereiche Schuld würden mit dem Hauptprozess in der Kommunikation oder in Ihrem Management des Semaphore. Vielleicht schreiten Zustandsübergänge / Synchronisation nicht wie erwartet aufgrund eines Fehlers? Ich schlage vor, das Debuggen durch Hinzufügen Protokollierung / print-Anweisungen vor und nach jedem blockierenden Aufruf -., Wo Sie mit dem Hauptverfahren sind in Verbindung stehen und wo Sie erwerben / lassen Sie die Semaphore zu verengen, wo die Dinge schief gelaufen haben

Auch bin ich neugierig - ist die Semaphore unbedingt erforderlich

Andere Tipps

Ich wollte auch einfache Aufgaben parallelisieren und für die ich erstellt einen kleinen Python-Skript. Sie können auf einen Blick: http://bioinf.comav.upv.es/psubprocess/index.html

Ist ein wenig allgemeiner als das, was Sie wollen, aber für einfache Aufgaben ist ganz einfach zu bedienen. Es könnte zumindest einige insparation für Sie sein.

Jose Blanca

Es könnte ein Deadlock in subprocess sein, haben Sie versucht, das Verfahren kommuniziert mit anstatt zu warten? http://docs.python.org/library/subprocess.html

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top