python multiprocessing ciascuna con la propria sottoprocesso (Kubuntu, Mac)
-
30-09-2019 - |
Domanda
Ho creato uno script che per default crea un processo di multiprocessing; allora funziona benissimo. Quando si avvia più processi, comincia a bloccarsi, e non sempre nello stesso posto. Il programma è circa 700 linee di codice, quindi cercherò di riassumere quello che sta succedendo. Voglio fare la maggior parte dei miei multi-core, dal Parallelizzazione il compito più lento, che si sta allineando sequenze di DNA. Per l'utilizzo dello modulo sottoprocesso di chiamare un programma a linea di comando: 'hmmsearch', che posso alimentare in sequenza attraverso / dev / stdin, e poi leggere le sequenze allineate attraverso / dev / stdout. Immagino che il blocco si verifica a causa di queste istanze di processo parziali multipli di lettura / scrittura da stdout / stdin, e io davvero non so il modo migliore per andare su questo ... Stavo cercando in os.fdopen (...) & os.tmpfile (), per creare filehandles temporanei o tubi dove posso svuotare i dati attraverso. Tuttavia, non ho mai usato né prima e non riesco a immaginare come fare con il modulo sottoprocesso. Idealmente mi piacerebbe bypass utilizzando il disco rigido del tutto, perché i tubi sono molto meglio con high-throughput di elaborazione dei dati! Qualsiasi aiuto con questo sarebbe super meraviglioso !!
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq( multiprocessing.Process ):
def __init__( self, inPipe, outPipe, semaphore, options ):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
self.sem.acquire()
align_process.stdin.write( seq_record.format('fasta') )
align_process.stdin.close()
for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output
self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
Dopo aver trascorso un po 'di debug questo, ho trovato un problema che era sempre lì e non è ancora del tutto risolto, ma hanno fissato un altro inefficienze nel processo (di debug). Ci sono due funzioni dell'alimentatore iniziali, questa classe align_seq e un parser di file parseHMM () che carica una determinata posizione di matrice di punteggio (PSM) in un dizionario. Il processo genitore principale confronta quindi l'allineamento alla PSM, utilizzando un dizionario (di dizionari) come un puntatore al punteggio rilevante per ciascun residuo. Per calcolare i punteggi che voglio Ho due classi multiprocessing.Process separate, una classe di logScore () che calcola l'odds ratio di registro (con math.exp ()); Ho parallelizzare questo; ed in coda i punteggi calcolati per l'ultimo processo, sumScore () che ha appena riassume questi punteggi (con math.fsum), restituendo la somma e tutte le posizioni punteggi specifici indietro al processo genitore come un dizionario. cioè Queue.put ([sum, {residuo posizione: punteggio specifica posizione, ...}]) Trovo che questo sia estremamente confuso per ottenere la mia testa intorno (di troppi coda!), Quindi spero che i lettori stanno riuscendo a seguire ... Dopo che tutti i calcoli di cui sopra sono fatto, ho poi dare la possibilità di salvare i punteggi cumulativi come tabulazioni uscita delimitata. Questo è dove ora (da ieri sera) a volte si rompe, come mi assicuro che stampa un punteggio per ogni posizione in cui ci dovrebbe essere un punteggio. Penso che a causa di latenza (tempi di computer essendo out-of-sync), a volte ciò che viene inserito nella coda prima di logScore non raggiunge sumScore prima. Affinché sumScore sa quando restituire il conteggio e ricominciare da capo, ho messo 'endSEQ' in coda per l'ultimo processo logScore che ha eseguito un calcolo. Ho pensato che allora dovrebbe raggiungere sumScore scorso troppo, ma non è sempre il caso; solo a volte lo fa rompere. Così ora non ottengo una situazione di stallo più, ma invece un KeyError durante la stampa o il salvataggio dei risultati. Credo che la ragione per ottenere a volte KeyError è perché creo una coda per ogni processo logScore, ma che invece si dovrebbero utilizzare la stessa coda. Ora, dove ho qualcosa di simile: -
class logScore( multiprocessing.Process ):
def __init__( self, inQ, outQ ):
self.inQ = inQ
...
def scoreSequence( processes, HMMPSM, sequenceInPipe ):
process_index = -1
sequence = sequenceInPipe.recv_bytes()
for residue in sequence:
.... ## Get the residue score.
process_index += 1
processes[process_index].inQ.put( residue_score )
## End of sequence
processes[process_index].inQ.put( 'endSEQ' )
logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
mentre dovrei creare una sola coda per condividere tra tutte le istanze logScore. cioè.
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
Soluzione
Questo non è proprio come il pipelining lavori ... ma per mettere la vostra mente a proprio agio, ecco un estratto dal sottoprocesso documentazione :
stdin, stdout e stderr specificare il standard input programmi eseguiti, standard output e standard error handle di file, rispettivamente. Valido valori sono PIPE, un file esistente descrittore (un numero intero positivo), un oggetto file, e None esistente. TUBO indica che un nuovo tubo per il bambino dovrebbe essere creato. Con Nessuno, no si verificherà il reindirizzamento; il bambino è handle di file verranno ereditate da il genitore.
Le zone più probabili in colpa sarebbe in comunicazione con il processo principale o nella vostra gestione del semaforo. Forse transizioni di stato / sincronizzazione non stanno procedendo come previsto a causa di un bug? Suggerisco di debug con l'aggiunta di dichiarazioni di registrazione / stampa prima e dopo ogni blocco delle chiamate -. Dove si sta comunicando con il processo principale e dove si acquista / rilasciare il semaforo di restringere dove le cose hanno sbagliato andata ??p>
Anche io sono curioso -? È il semaforo assolutamente necessario
Altri suggerimenti
Volevo anche parallelizzare compiti semplici e per questo ho creato un piccolo script python. Si può dare un'occhiata a: http://bioinf.comav.upv.es/psubprocess/index.html
È un po 'più generale di quello che si vuole, ma per compiti semplici è abbastanza facile da usare. Potrebbe essere almeno di qualche insparation a voi.
Jose Blanca
Potrebbe essere una situazione di stallo nel sottoprocesso, hai provato ad utilizzare il metodo piuttosto che aspettare comunicare? http://docs.python.org/library/subprocess.html