Domanda

Ho creato uno script che per default crea un processo di multiprocessing; allora funziona benissimo. Quando si avvia più processi, comincia a bloccarsi, e non sempre nello stesso posto. Il programma è circa 700 linee di codice, quindi cercherò di riassumere quello che sta succedendo. Voglio fare la maggior parte dei miei multi-core, dal Parallelizzazione il compito più lento, che si sta allineando sequenze di DNA. Per l'utilizzo dello modulo sottoprocesso di chiamare un programma a linea di comando: 'hmmsearch', che posso alimentare in sequenza attraverso / dev / stdin, e poi leggere le sequenze allineate attraverso / dev / stdout. Immagino che il blocco si verifica a causa di queste istanze di processo parziali multipli di lettura / scrittura da stdout / stdin, e io davvero non so il modo migliore per andare su questo ... Stavo cercando in os.fdopen (...) & os.tmpfile (), per creare filehandles temporanei o tubi dove posso svuotare i dati attraverso. Tuttavia, non ho mai usato né prima e non riesco a immaginare come fare con il modulo sottoprocesso. Idealmente mi piacerebbe bypass utilizzando il disco rigido del tutto, perché i tubi sono molto meglio con high-throughput di elaborazione dei dati! Qualsiasi aiuto con questo sarebbe super meraviglioso !!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

Dopo aver trascorso un po 'di debug questo, ho trovato un problema che era sempre lì e non è ancora del tutto risolto, ma hanno fissato un altro inefficienze nel processo (di debug). Ci sono due funzioni dell'alimentatore iniziali, questa classe align_seq e un parser di file parseHMM () che carica una determinata posizione di matrice di punteggio (PSM) in un dizionario. Il processo genitore principale confronta quindi l'allineamento alla PSM, utilizzando un dizionario (di dizionari) come un puntatore al punteggio rilevante per ciascun residuo. Per calcolare i punteggi che voglio Ho due classi multiprocessing.Process separate, una classe di logScore () che calcola l'odds ratio di registro (con math.exp ()); Ho parallelizzare questo; ed in coda i punteggi calcolati per l'ultimo processo, sumScore () che ha appena riassume questi punteggi (con math.fsum), restituendo la somma e tutte le posizioni punteggi specifici indietro al processo genitore come un dizionario. cioè     Queue.put ([sum, {residuo posizione: punteggio specifica posizione, ...}]) Trovo che questo sia estremamente confuso per ottenere la mia testa intorno (di troppi coda!), Quindi spero che i lettori stanno riuscendo a seguire ... Dopo che tutti i calcoli di cui sopra sono fatto, ho poi dare la possibilità di salvare i punteggi cumulativi come tabulazioni uscita delimitata. Questo è dove ora (da ieri sera) a volte si rompe, come mi assicuro che stampa un punteggio per ogni posizione in cui ci dovrebbe essere un punteggio. Penso che a causa di latenza (tempi di computer essendo out-of-sync), a volte ciò che viene inserito nella coda prima di logScore non raggiunge sumScore prima. Affinché sumScore sa quando restituire il conteggio e ricominciare da capo, ho messo 'endSEQ' in coda per l'ultimo processo logScore che ha eseguito un calcolo. Ho pensato che allora dovrebbe raggiungere sumScore scorso troppo, ma non è sempre il caso; solo a volte lo fa rompere. Così ora non ottengo una situazione di stallo più, ma invece un KeyError durante la stampa o il salvataggio dei risultati. Credo che la ragione per ottenere a volte KeyError è perché creo una coda per ogni processo logScore, ma che invece si dovrebbero utilizzare la stessa coda. Ora, dove ho qualcosa di simile: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

mentre dovrei creare una sola coda per condividere tra tutte le istanze logScore. cioè.

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
È stato utile?

Soluzione

Questo non è proprio come il pipelining lavori ... ma per mettere la vostra mente a proprio agio, ecco un estratto dal sottoprocesso documentazione :

  

stdin, stdout e stderr specificare il   standard input programmi eseguiti,   standard output e standard error   handle di file, rispettivamente. Valido   valori sono PIPE, un file esistente   descrittore (un numero intero positivo), un   oggetto file, e None esistente. TUBO   indica che un nuovo tubo per il bambino   dovrebbe essere creato. Con Nessuno, no   si verificherà il reindirizzamento; il bambino è   handle di file verranno ereditate da   il genitore.

Le zone più probabili in colpa sarebbe in comunicazione con il processo principale o nella vostra gestione del semaforo. Forse transizioni di stato / sincronizzazione non stanno procedendo come previsto a causa di un bug? Suggerisco di debug con l'aggiunta di dichiarazioni di registrazione / stampa prima e dopo ogni blocco delle chiamate -. Dove si sta comunicando con il processo principale e dove si acquista / rilasciare il semaforo di restringere dove le cose hanno sbagliato andata

Anche io sono curioso -? È il semaforo assolutamente necessario

Altri suggerimenti

Volevo anche parallelizzare compiti semplici e per questo ho creato un piccolo script python. Si può dare un'occhiata a: http://bioinf.comav.upv.es/psubprocess/index.html

È un po 'più generale di quello che si vuole, ma per compiti semplici è abbastanza facile da usare. Potrebbe essere almeno di qualche insparation a voi.

Jose Blanca

Potrebbe essere una situazione di stallo nel sottoprocesso, hai provato ad utilizzare il metodo piuttosto che aspettare comunicare? http://docs.python.org/library/subprocess.html

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top