многопроцессорность Python, каждый со своим подпроцессом (Kubuntu, Mac)
-
30-09-2019 - |
Вопрос
Я создал сценарий, который по умолчанию создает один многопроцессорный процесс;тогда все работает нормально.При запуске нескольких процессов начинает подвисать, причем не всегда в одном и том же месте.В программе около 700 строк кода, поэтому я попытаюсь обобщить происходящее.Я хочу максимально эффективно использовать свои многоядерные процессоры, распараллеливая самую медленную задачу — выравнивание последовательностей ДНК.Для этого я использую модуль подпроцесса для вызова программы командной строки:«hmmsearch», который я могу передавать последовательно через /dev/stdin, а затем считывать выровненные последовательности через /dev/stdout.Я предполагаю, что зависание происходит из-за чтения/записи нескольких экземпляров подпроцесса из стандартного вывода/стандартного ввода, и я действительно не знаю, как лучше всего это сделать...Я изучал os.fdopen(...) и os.tmpfile(), чтобы создать временные дескрипторы файлов или каналы, через которые я могу очистить данные.Однако я никогда раньше не использовал ни того, ни другого и не могу представить, как это сделать с модулем подпроцесса.В идеале я бы хотел полностью отказаться от использования жесткого диска, потому что каналы намного лучше справляются с высокопроизводительной обработкой данных!Любая помощь в этом была бы супер замечательной!!
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq( multiprocessing.Process ):
def __init__( self, inPipe, outPipe, semaphore, options ):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
self.sem.acquire()
align_process.stdin.write( seq_record.format('fasta') )
align_process.stdin.close()
for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output
self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
Потратив некоторое время на отладку, я обнаружил проблему, которая всегда существовала и еще не решена полностью, но исправил некоторые другие. неэффективность в процессе (отладки).Есть две начальные функции подачи: класс align_seq и анализатор файлов. синтаксический анализХММ() который загружает матрицу оценок для конкретной позиции (PSM) в словарь.Затем основной родительский процесс сравнивает выравнивание с PSM, используя словарь (словарей) в качестве указателя на соответствующий балл для каждого остатка.Чтобы подсчитать баллы, которые мне нужны, у меня есть два отдельных класса multiprocessing.Process, один класс logScore() который вычисляет отношение шансов журнала (с помощью math.exp() );Я провожу параллель с этим;и он помещает вычисленные баллы в очередь в последний процесс, суммаScore() который просто суммирует эти оценки (с помощью math.fsum), возвращая сумму и все оценки, специфичные для позиции, обратно родительскому процессу в виде словаря.то естьQueue.put( [сумма, { позиция остатка:оценка конкретной позиции, ...}]) Я нахожу это исключительно запутанным, чтобы разобраться с головой (слишком много очередей!), Поэтому я надеюсь, что читатели смогут следовать ...После того, как все приведенные выше расчеты выполнены, я предоставляю возможность сохранить совокупные оценки в виде вывода, разделенного табуляцией.Именно здесь сейчас (со вчерашнего вечера) он иногда ломается, так как я гарантирую, что он распечатает счет для каждой позиции, где должен быть счет.Я думаю, что из-за задержки (рассинхронизация времени компьютера) иногда то, что сначала помещается в очередь, logScore не достигает суммаScore первый.Чтобы sumScore знал, когда следует вернуть подсчет и начать заново, я помещаю «endSEQ» в очередь для последнего процесса logScore, который выполнил расчет.Я думал, что тогда он тоже должен достичь sumScore последним, но это не всегда так;только иногда ломается.Итак, теперь я больше не получаю тупиковой ситуации, а вместо этого получаю ошибку KeyError при печати или сохранении результатов.Я считаю, что причина иногда получать KeyError заключается в том, что я создаю очередь для каждого процесса logScore, но вместо этого все они должны использовать одну и ту же очередь.Теперь, где у меня есть что-то вроде: -
class logScore( multiprocessing.Process ):
def __init__( self, inQ, outQ ):
self.inQ = inQ
...
def scoreSequence( processes, HMMPSM, sequenceInPipe ):
process_index = -1
sequence = sequenceInPipe.recv_bytes()
for residue in sequence:
.... ## Get the residue score.
process_index += 1
processes[process_index].inQ.put( residue_score )
## End of sequence
processes[process_index].inQ.put( 'endSEQ' )
logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
тогда как мне следует создать только одну очередь, которая будет использоваться всеми экземплярами logScore.то есть
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
Решение
Это не совсем так, как работает конвейер...но чтобы успокоить вас, вот отрывок из документация подпроцесса:
STDIN, STDOUT и STDERR указывают стандартный ввод выполненных программ, стандартный выходной сигнал и стандартные ручки файла ошибок, соответственно.Допустимые значения - это труба, существующий дескриптор файла (положительное целое число), существующий объект файла и нет.Труба указывает, что новая труба для ребенка должен быть создан.Без никого нет перенаправления;Обработки детства будут унаследованы от родителя.
Наиболее вероятными областями неисправности могут быть связь с основным процессом или управление семафором.Может быть, переходы между состояниями/синхронизация происходят не так, как ожидалось, из-за ошибки?Я предлагаю отладку, добавляя операторы регистрации/печати до и после каждого блокирующего вызова - где вы общаетесь с основным процессом и где вы получаете/освобождаете семафор, чтобы сузить круг вопросов, где что-то пошло не так.
Еще мне интересно - а семафор абсолютно необходим?
Другие советы
Я также хотел распараллелить простые задачи и для этого создал небольшой скрипт на Python.Вы можете посмотреть:http://bioinf.comav.upv.es/psubprocess/index.html
Это немного более общий вариант, чем вы хотите, но для простых задач его довольно легко использовать.Возможно, это будет для вас хоть каким-то вдохновением.
Хосе Бланка
Это может быть тупик в подпроцессе. Пробовали ли вы использовать метод общения вместо ожидания?http://docs.python.org/library/subprocess.html