python multiprocessing chacune avec sous-processus (Kubuntu, Mac)
-
30-09-2019 - |
Question
J'ai créé un script par défaut crée un processus de multitraitement; il fonctionne très bien. Lors du démarrage de plusieurs processus, il commence à se bloquer, et pas toujours au même endroit. Le programme est d'environ 700 lignes de code, donc je vais essayer de résumer ce qui se passe. Je veux tirer le meilleur parti de mes multi-cœurs, par paralléliser la plus lente tâche, qui aligne des séquences d'ADN. Pour que j'utilise le module de sous-processus pour appeler un programme de ligne de commande: « hmmsearch », que je peux alimenter en séquences par / dev / stdin, puis-je lire les séquences alignées à travers / dev / sortie standard. Je suppose que le blocage se produit à cause de ces multiples instances de sous-lecture / écriture de stdout / stdin, et je ne sais vraiment pas la meilleure façon d'aller à ce sujet ... Je regardais dans os.fdopen (...) et os.tmpfile (), pour créer ou temporaires handles de fichiers tuyaux où je peux débusquer les données à travers. Cependant, je ne l'ai jamais utilisé soit avant et je ne peux pas imaginer comment faire avec le module de sous-processus. Idéalement, je voudrais à contourner en utilisant le tout disque dur, parce que les tuyaux sont beaucoup mieux avec à haut débit de traitement de données! Toute aide avec ce serait super merveilleux !!
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq( multiprocessing.Process ):
def __init__( self, inPipe, outPipe, semaphore, options ):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
self.sem.acquire()
align_process.stdin.write( seq_record.format('fasta') )
align_process.stdin.close()
for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output
self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
Après avoir passé un certain temps de débogage, j'ai trouvé un problème qui a toujours été là et il est pas tout à fait encore résolu, mais ont fixé un autre inefficacités dans le processus (de mise au point). Il y a deux fonctions d'alimentation initiales, cette classe align_seq et un analyseur de fichier parseHMM () qui charge une matrice de notation spécifique de position (PSM) dans un dictionnaire. Le principal processus parent compare alors l'alignement de la PSM, en utilisant un dictionnaire (de dictionnaires) en tant que pointeur sur le score correspondant à chaque résidu. Pour calculer les scores que je veux, j'ai deux classes distinctes, multiprocessing.Process une classe logScore () qui calcule le rapport de cotes de log (avec Math.exp ()); Je paralléliser celui-ci; et Files d'attente les scores calculés au dernier processus, sumScore () qui résume simplement ces scores (avec math.fsum), retournant la somme et les scores spécifiques remettre en place au processus parent comme un dictionnaire. c'est à dire. Queue.put ([somme, {position reste: la position score spécifique, ...}]) Je trouve cette confusion exceptionnelle pour obtenir ma tête (trop longue! File d'attente), donc j'espère que les lecteurs parviennent à suivre ... Après tous les calculs ci-dessus sont faits, je puis donner la possibilité d'enregistrer les scores cumulés tab- sortie délimité. C'est là maintenant (depuis la nuit dernière), parfois des pauses, comme je l'assure imprime un score pour chaque position où il devrait y avoir un score. Je pense qu'en raison de la latence (horaires d'ordinateur étant hors synchronisation), parfois ce qui sera mis dans la première file d'attente pour logScore ne parvient pas sumScore d'abord. Pour que sumScore sait quand il faut revenir le décompte et recommencer, je mets « ENDSEQ » dans la file d'attente pour le dernier processus de logScore qui a effectué un calcul. Je pensais que il devrait atteindre sumScore dernier aussi, mais ce n'est pas toujours le cas; seulement parfois briser. Alors maintenant, je ne suis pas une impasse plus, mais plutôt un KeyError lors de l'impression ou l'enregistrement des résultats. Je crois que la raison pour obtenir KeyError est parfois parce que je crée une file d'attente pour chaque processus de logScore, mais au contraire, ils doivent tous utiliser la même file d'attente. Maintenant, où j'ai quelque chose comme: -
class logScore( multiprocessing.Process ):
def __init__( self, inQ, outQ ):
self.inQ = inQ
...
def scoreSequence( processes, HMMPSM, sequenceInPipe ):
process_index = -1
sequence = sequenceInPipe.recv_bytes()
for residue in sequence:
.... ## Get the residue score.
process_index += 1
processes[process_index].inQ.put( residue_score )
## End of sequence
processes[process_index].inQ.put( 'endSEQ' )
logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
alors que je devrais créer une seule file d'attente à partager entre toutes les instances de logScore. i.e..
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
La solution
Ce n'est pas tout à fait comment fonctionne ... mais pipelining pour mettre votre esprit à la facilité, voici un extrait de la documentation sous-processus:
stdin, stdout et stderr préciser la entrée standard de programmes exécutés, sortie standard et l'erreur standard fichier poignées, respectivement. Valide Les valeurs sont PIPE, un fichier existant Descripteur (un entier positif), un objet fichier existant et Aucun. TUYAU indique qu'un nouveau tuyau pour l'enfant devrait être créé. Avec Aucun, pas redirection aura lieu; l'enfant est descripteurs de fichiers seront héritées de le parent.
Les zones les plus probables à la faute serait dans la communication avec le processus principal ou dans votre gestion du sémaphores. Peut-être que les transitions d'état / synchronisation ne sont pas comme prévu en raison de procéder à un bug? Je suggère le débogage en ajoutant des instructions enregistrement / impression avant et après chaque blocage d'appel -. Où vous communiquez avec le processus principal et où vous acquérez / libérer le sémaphores pour restreindre où les choses ont mal tourné
Je suis aussi curieux - est absolument nécessaire sémaphores
Autres conseils
Je voulais aussi paralléliser les tâches simples et que j'ai créé un petit script python. Vous pouvez jeter un oeil à: http://bioinf.comav.upv.es/psubprocess/index.html
est un peu plus général que ce que vous voulez, mais pour des tâches simples est très facile à utiliser. Il pourrait être au moins de certains InSPAration vous.
Jose Blanca
Il pourrait être une impasse dans le sous-processus, vous avez essayé d'utiliser la méthode plutôt que d'attendre communiquer? http://docs.python.org/library/subprocess.html