Question

J'ai créé un script par défaut crée un processus de multitraitement; il fonctionne très bien. Lors du démarrage de plusieurs processus, il commence à se bloquer, et pas toujours au même endroit. Le programme est d'environ 700 lignes de code, donc je vais essayer de résumer ce qui se passe. Je veux tirer le meilleur parti de mes multi-cœurs, par paralléliser la plus lente tâche, qui aligne des séquences d'ADN. Pour que j'utilise le module de sous-processus pour appeler un programme de ligne de commande: « hmmsearch », que je peux alimenter en séquences par / dev / stdin, puis-je lire les séquences alignées à travers / dev / sortie standard. Je suppose que le blocage se produit à cause de ces multiples instances de sous-lecture / écriture de stdout / stdin, et je ne sais vraiment pas la meilleure façon d'aller à ce sujet ... Je regardais dans os.fdopen (...) et os.tmpfile (), pour créer ou temporaires handles de fichiers tuyaux où je peux débusquer les données à travers. Cependant, je ne l'ai jamais utilisé soit avant et je ne peux pas imaginer comment faire avec le module de sous-processus. Idéalement, je voudrais à contourner en utilisant le tout disque dur, parce que les tuyaux sont beaucoup mieux avec à haut débit de traitement de données! Toute aide avec ce serait super merveilleux !!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

Après avoir passé un certain temps de débogage, j'ai trouvé un problème qui a toujours été là et il est pas tout à fait encore résolu, mais ont fixé un autre inefficacités dans le processus (de mise au point). Il y a deux fonctions d'alimentation initiales, cette classe align_seq et un analyseur de fichier parseHMM () qui charge une matrice de notation spécifique de position (PSM) dans un dictionnaire. Le principal processus parent compare alors l'alignement de la PSM, en utilisant un dictionnaire (de dictionnaires) en tant que pointeur sur le score correspondant à chaque résidu. Pour calculer les scores que je veux, j'ai deux classes distinctes, multiprocessing.Process une classe logScore () qui calcule le rapport de cotes de log (avec Math.exp ()); Je paralléliser celui-ci; et Files d'attente les scores calculés au dernier processus, sumScore () qui résume simplement ces scores (avec math.fsum), retournant la somme et les scores spécifiques remettre en place au processus parent comme un dictionnaire. c'est à dire.     Queue.put ([somme, {position reste: la position score spécifique, ...}]) Je trouve cette confusion exceptionnelle pour obtenir ma tête (trop longue! File d'attente), donc j'espère que les lecteurs parviennent à suivre ... Après tous les calculs ci-dessus sont faits, je puis donner la possibilité d'enregistrer les scores cumulés tab- sortie délimité. C'est là maintenant (depuis la nuit dernière), parfois des pauses, comme je l'assure imprime un score pour chaque position où il devrait y avoir un score. Je pense qu'en raison de la latence (horaires d'ordinateur étant hors synchronisation), parfois ce qui sera mis dans la première file d'attente pour logScore ne parvient pas sumScore d'abord. Pour que sumScore sait quand il faut revenir le décompte et recommencer, je mets « ENDSEQ » dans la file d'attente pour le dernier processus de logScore qui a effectué un calcul. Je pensais que il devrait atteindre sumScore dernier aussi, mais ce n'est pas toujours le cas; seulement parfois briser. Alors maintenant, je ne suis pas une impasse plus, mais plutôt un KeyError lors de l'impression ou l'enregistrement des résultats. Je crois que la raison pour obtenir KeyError est parfois parce que je crée une file d'attente pour chaque processus de logScore, mais au contraire, ils doivent tous utiliser la même file d'attente. Maintenant, où j'ai quelque chose comme: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

alors que je devrais créer une seule file d'attente à partager entre toutes les instances de logScore. i.e..

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
Était-ce utile?

La solution

Ce n'est pas tout à fait comment fonctionne ... mais pipelining pour mettre votre esprit à la facilité, voici un extrait de la documentation sous-processus:

  

stdin, stdout et stderr préciser la   entrée standard de programmes exécutés,   sortie standard et l'erreur standard   fichier poignées, respectivement. Valide   Les valeurs sont PIPE, un fichier existant   Descripteur (un entier positif), un   objet fichier existant et Aucun. TUYAU   indique qu'un nouveau tuyau pour l'enfant   devrait être créé. Avec Aucun, pas   redirection aura lieu; l'enfant est   descripteurs de fichiers seront héritées de   le parent.

Les zones les plus probables à la faute serait dans la communication avec le processus principal ou dans votre gestion du sémaphores. Peut-être que les transitions d'état / synchronisation ne sont pas comme prévu en raison de procéder à un bug? Je suggère le débogage en ajoutant des instructions enregistrement / impression avant et après chaque blocage d'appel -. Où vous communiquez avec le processus principal et où vous acquérez / libérer le sémaphores pour restreindre où les choses ont mal tourné

Je suis aussi curieux - est absolument nécessaire sémaphores

Autres conseils

Je voulais aussi paralléliser les tâches simples et que j'ai créé un petit script python. Vous pouvez jeter un oeil à: http://bioinf.comav.upv.es/psubprocess/index.html

est un peu plus général que ce que vous voulez, mais pour des tâches simples est très facile à utiliser. Il pourrait être au moins de certains InSPAration vous.

Jose Blanca

Il pourrait être une impasse dans le sous-processus, vous avez essayé d'utiliser la méthode plutôt que d'attendre communiquer? http://docs.python.org/library/subprocess.html

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top