многопроцессорность Python, каждый со своим подпроцессом (Kubuntu, Mac)

StackOverflow https://stackoverflow.com/questions/4620041

Вопрос

Я создал сценарий, который по умолчанию создает один многопроцессорный процесс;тогда все работает нормально.При запуске нескольких процессов начинает подвисать, причем не всегда в одном и том же месте.В программе около 700 строк кода, поэтому я попытаюсь обобщить происходящее.Я хочу максимально эффективно использовать свои многоядерные процессоры, распараллеливая самую медленную задачу — выравнивание последовательностей ДНК.Для этого я использую модуль подпроцесса для вызова программы командной строки:«hmmsearch», который я могу передавать последовательно через /dev/stdin, а затем считывать выровненные последовательности через /dev/stdout.Я предполагаю, что зависание происходит из-за чтения/записи нескольких экземпляров подпроцесса из стандартного вывода/стандартного ввода, и я действительно не знаю, как лучше всего это сделать...Я изучал os.fdopen(...) и os.tmpfile(), чтобы создать временные дескрипторы файлов или каналы, через которые я могу очистить данные.Однако я никогда раньше не использовал ни того, ни другого и не могу представить, как это сделать с модулем подпроцесса.В идеале я бы хотел полностью отказаться от использования жесткого диска, потому что каналы намного лучше справляются с высокопроизводительной обработкой данных!Любая помощь в этом была бы супер замечательной!!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

Потратив некоторое время на отладку, я обнаружил проблему, которая всегда существовала и еще не решена полностью, но исправил некоторые другие. неэффективность в процессе (отладки).Есть две начальные функции подачи: класс align_seq и анализатор файлов. синтаксический анализХММ() который загружает матрицу оценок для конкретной позиции (PSM) в словарь.Затем основной родительский процесс сравнивает выравнивание с PSM, используя словарь (словарей) в качестве указателя на соответствующий балл для каждого остатка.Чтобы подсчитать баллы, которые мне нужны, у меня есть два отдельных класса multiprocessing.Process, один класс logScore() который вычисляет отношение шансов журнала (с помощью math.exp() );Я провожу параллель с этим;и он помещает вычисленные баллы в очередь в последний процесс, суммаScore() который просто суммирует эти оценки (с помощью math.fsum), возвращая сумму и все оценки, специфичные для позиции, обратно родительскому процессу в виде словаря.то естьQueue.put( [сумма, { позиция остатка:оценка конкретной позиции, ...}]) Я нахожу это исключительно запутанным, чтобы разобраться с головой (слишком много очередей!), Поэтому я надеюсь, что читатели смогут следовать ...После того, как все приведенные выше расчеты выполнены, я предоставляю возможность сохранить совокупные оценки в виде вывода, разделенного табуляцией.Именно здесь сейчас (со вчерашнего вечера) он иногда ломается, так как я гарантирую, что он распечатает счет для каждой позиции, где должен быть счет.Я думаю, что из-за задержки (рассинхронизация времени компьютера) иногда то, что сначала помещается в очередь, logScore не достигает суммаScore первый.Чтобы sumScore знал, когда следует вернуть подсчет и начать заново, я помещаю «endSEQ» в очередь для последнего процесса logScore, который выполнил расчет.Я думал, что тогда он тоже должен достичь sumScore последним, но это не всегда так;только иногда ломается.Итак, теперь я больше не получаю тупиковой ситуации, а вместо этого получаю ошибку KeyError при печати или сохранении результатов.Я считаю, что причина иногда получать KeyError заключается в том, что я создаю очередь для каждого процесса logScore, но вместо этого все они должны использовать одну и ту же очередь.Теперь, где у меня есть что-то вроде: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

тогда как мне следует создать только одну очередь, которая будет использоваться всеми экземплярами logScore.то есть

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
Это было полезно?

Решение

Это не совсем так, как работает конвейер...но чтобы успокоить вас, вот отрывок из документация подпроцесса:

STDIN, STDOUT и STDERR указывают стандартный ввод выполненных программ, стандартный выходной сигнал и стандартные ручки файла ошибок, соответственно.Допустимые значения - это труба, существующий дескриптор файла (положительное целое число), существующий объект файла и нет.Труба указывает, что новая труба для ребенка должен быть создан.Без никого нет перенаправления;Обработки детства будут унаследованы от родителя.

Наиболее вероятными областями неисправности могут быть связь с основным процессом или управление семафором.Может быть, переходы между состояниями/синхронизация происходят не так, как ожидалось, из-за ошибки?Я предлагаю отладку, добавляя операторы регистрации/печати до и после каждого блокирующего вызова - где вы общаетесь с основным процессом и где вы получаете/освобождаете семафор, чтобы сузить круг вопросов, где что-то пошло не так.

Еще мне интересно - а семафор абсолютно необходим?

Другие советы

Я также хотел распараллелить простые задачи и для этого создал небольшой скрипт на Python.Вы можете посмотреть:http://bioinf.comav.upv.es/psubprocess/index.html

Это немного более общий вариант, чем вы хотите, но для простых задач его довольно легко использовать.Возможно, это будет для вас хоть каким-то вдохновением.

Хосе Бланка

Это может быть тупик в подпроцессе. Пробовали ли вы использовать метод общения вместо ожидания?http://docs.python.org/library/subprocess.html

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top