Pregunta

He creado un script que por defecto crea un proceso multiproceso; entonces funciona bien. Al iniciar varios procesos, que empieza a colgar, y no siempre en el mismo lugar. El programa trata de 700 líneas de código, así que voy a tratar de resumir lo que está pasando. Quiero hacer la mayor parte de mis múltiples núcleos, por parallelising la tarea más lenta, lo que se está alineando las secuencias de ADN. Para que utilizo el módulo subproceso para llamar a un programa de línea de comandos: 'hmmsearch', que puedo alimentar en secuencias a través de / dev / stdin, y luego leer las secuencias alineadas a través de / dev / stdout. Me imagino que la caída se produce a causa de estas múltiples instancias de subproceso de lectura / escritura de la salida estándar / entrada estándar, y la verdad es que no sé la mejor manera de ir sobre esto ... Yo estaba buscando en os.fdopen (...) y os.tmpfile (), para crear filehandles temporales o tuberías donde puedo vaciar los datos a través. Sin embargo, nunca he usado antes y no puedo imaginar cómo hacer eso con el módulo de subproceso. Idealmente me gustaría bypass mediante el disco duro por completo, ya que los tubos son mucho mejor con un alto rendimiento de procesamiento de datos! Cualquier ayuda con esto sería muy maravillosa !!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

Después de haber pasado un tiempo de depuración esto, he encontrado un problema que siempre ha estado ahí y no está del todo resuelto todavía, pero han fijado alguna otra ineficiencias en el proceso (de depuración). Hay dos funciones de alimentación inicial, esta clase align_seq y un archivo del analizador parseHMM () que las cargas de una matriz de puntuación posición específica (PSM) en un diccionario. El proceso principal de los padres a continuación, compara la alineación a la PSM, usando un diccionario (de diccionarios) como un puntero a la puntuación correspondiente para cada residuo. Con el fin de calcular las puntuaciones que quiero lo tengo dos clases multiprocessing.Process separadas, una clase logScore () que calcula el log odds ratio (con math.exp ()); Me paralelizar éste; y pone en cola las puntuaciones calculadas para el último proceso, sumScore () que acaba resume estas puntuaciones (con math.fsum), devolviendo la suma y todas las posiciones puntuaciones específicas de nuevo al proceso padre como un diccionario. es decir     Queue.put ([suma, {residuo de la posición: Puntuación posición específica, ...}]) Me parece excepcionalmente confuso para sacarlo de mi cabeza (de demasiados cola!), Así que espero que los lectores están administrando a seguir ... Después se llevan a cabo todos los cálculos anteriores, que le dará la opción de guardar las puntuaciones acumuladas como tabuladores salida delimitado. Aquí es donde ahora (desde la noche anterior) a veces se rompe, como me aseguro de que imprime una puntuación para cada posición donde debería haber una puntuación. Creo que debido a la latencia (tiempos de computación estar fuera de sincronización), a veces lo que se coloca en la cola para la primera logScore no llega a sumScore en primer lugar. Con el fin de que sumScore sabe cuándo debe regresar la cuenta y empezar de nuevo, puse 'endSEQ' en la cola para el último proceso logScore que realiza un cálculo. Pensé que entonces debería llegar a sumScore pasado también, pero eso no es siempre el caso; sólo a veces hace que se rompa. Así que ahora no consigo un punto muerto ya, pero en su lugar un KeyError al imprimir o guardar los resultados. Creo que la razón para conseguir KeyError veces se debe a que cree una cola para cada proceso logScore, pero que en lugar de que todos deben usar la misma cola. Ahora, en el que tengo algo así como: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

mientras que yo debería crear una sola cola para compartir entre todas las instancias logScore. es decir.

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
¿Fue útil?

Solución

Eso no es exactamente la forma en la canalización obras ... pero para poner su mente para aliviar, aquí está un extracto de la subproceso documentación :

  

stdin, stdout y stderr especificar el   entrada estándar programas ejecutados,   salida estándar y el error estándar   identificadores de archivo, respectivamente. Válido   Los valores son PIPE, un archivo existente   descriptor (un entero positivo), una   objeto de archivo y Ninguno existente. TUBO   indica que un nueva tubería para el niño   debe ser creado. Con Ninguno, sin   ocurrirá la redirección; los niños   identificadores de archivo se pueden heredar de   el padre.

Las áreas más probables culpables estaría en la comunicación con el proceso principal o en su gestión del semáforo. Tal vez las transiciones de estado / sincronización no sitúan dentro de lo esperado debido a un error? Sugiero depuración mediante la adición de declaraciones de registro / impresión antes y después de cada bloqueo de llamadas -. En el que se está comunicando con el proceso principal y donde se adquiere / liberar el semáforo de restringir donde las cosas tienen mal ha ido

También tengo curiosidad - es el semáforo absolutamente necesario

Otros consejos

También quería paralelizar tareas simples y por eso he creado un pequeño script en Python. Puede echar un vistazo a: http://bioinf.comav.upv.es/psubprocess/index.html

Es un poco más general que lo que quiere, pero para tareas simples es bastante fácil de usar. Puede ser que sea por lo menos de alguna InSPAration a usted.

José Blanca

Podría ser un punto muerto en el subproceso, han intentado utilizar el método en lugar de esperar se comunican? http://docs.python.org/library/subprocess.html

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top