Python متعددة المعالجة مع كل معالجة فرعية خاصة (Kubuntu ، Mac)

StackOverflow https://stackoverflow.com/questions/4620041

سؤال

لقد قمت بإنشاء برنامج نصي بشكل افتراضي ينشئ عملية معالجة متعددة ؛ ثم يعمل بشكل جيد. عند بدء عمليات متعددة ، يبدأ في الشنق ، وليس دائمًا في نفس المكان. البرنامج حوالي 700 سطر من التعليمات البرمجية ، لذلك سأحاول تلخيص ما يحدث. أريد الاستفادة القصوى من بلدي المتعددة ، من خلال موازاة الأبطأ المهمة ، والتي هي محاذاة تسلسل الحمض النووي. لذلك ، أستخدم وحدة المعالجة الفرعية لاستدعاء برنامج سطر الأوامر: "HMMSEARCH" ، والذي يمكنني إطعامه في التسلسلات من خلال /dev /stdin ، ثم قرأت التسلسلات المحاذاة من خلال /dev /stdout. أتصور أن التعليق يحدث بسبب هذه الحالات المتعددة للعملية الفرعية التي تقرأ / الكتابة من stdout / stdin ، وأنا حقًا لا أعرف أفضل طريقة للقيام بذلك ... كنت أبحث في OS.FDOpen (...) ونظام التشغيل .tmpfile () ، لإنشاء FileHandles المؤقتة أو الأنابيب حيث يمكنني مسح البيانات من خلال. ومع ذلك ، لم أستخدم أبدًا من قبل ولا يمكنني تصوير كيفية القيام بذلك باستخدام وحدة المعالجة الفرعية. من الناحية المثالية ، أود الالتفاف باستخدام القيادة الصلبة تمامًا ، لأن الأنابيب أفضل بكثير مع معالجة البيانات عالية الإنتاجية! أي مساعدة في هذا سيكون رائعا للغاية !!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

بعد أن أمضيت بعض الوقت في تصحيح هذا ، لقد وجدت مشكلة كانت موجودة دائمًا ولم يتم حلها تمامًا بعد ، لكن قمت بإصلاح بعضها الآخر عدم الكفاءة في عملية (تصحيح الأخطاء). هناك وظيفتان للتغذية الأولية ، فئة ALIGN_SEQ هذه وحجم ملف parsehmm () الذي يقوم بتحميل المصفوفة المحددة للتسجيل (PSM) في القاموس. ثم تقارن عملية الوالدين الرئيسية المحاذاة مع PSM ، باستخدام القاموس (القواميس) كمؤشر إلى النتيجة ذات الصلة لكل بقايا. من أجل حساب الدرجات التي أريدها لدي فئتين منفصلتين متعدد المعالجة. LogScore () التي تحسب نسبة الأرجحية السجل (مع Math.exp ()) ؛ أنا متوارا هذا واحد. ويتصطف على الدرجات المحسوبة في العملية الأخيرة ، SUMSCORE () الذي يلخص هذه الدرجات فقط (مع Math.fsum) ، وإعادة المبلغ وجميع الدرجات المحددة للموضع إلى عملية الوالدين كقاموس. ie queue.put ([sum ، {kingue position: position position procors ، ...}]) أجد هذا مربكًا بشكل استثنائي للحصول على رأسي (عدد كبير جدًا من قائمة الانتظار!) ، لذلك آمل أن يتمكن القراء من متابعة .. بعد كل الحسابات المذكورة أعلاه ، أعطي بعد ذلك خيار حفظ الدرجات التراكمية كإخراج علامة تبويب. هذا هو المكان الذي ينهار فيه الآن (منذ الليلة الماضية) في بعض الأحيان ، حيث أتأكد من قيامه بطباعة درجة لكل موقف يجب أن تكون فيه النتيجة. أعتقد أنه بسبب الكمون (توقيت الكمبيوتر يكون خارج المزامنة) ، في بعض الأحيان ما يتم وضعه في قائمة الانتظار أولاً LogScore لا تصل SUMSCORE أول. من أجل أن يعرف Sumscore متى يعيد العدد والبدء من جديد ، وضعت "Endseq" في قائمة الانتظار لعملية تسجيل الدخول الأخيرة التي أجرت حسابًا. اعتقدت أنه يجب أن تصل إلى Sumscore أيضًا ، لكن هذا ليس هو الحال دائمًا ؛ فقط في بعض الأحيان لا ينكسر. حتى الآن لم أعد أحصل على طريق مسدود ، ولكن بدلاً من ذلك ، فإن keyerror عند طباعة النتائج أو حفظها. أعتقد أن السبب في الحصول على Keyerror في بعض الأحيان هو أنني أقوم بإنشاء قائمة انتظار لكل عملية سجل ، ولكن بدلاً من ذلك يجب أن يستخدموا جميع قائمة الانتظار. الآن ، حيث لدي شيء مثل:-

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

بينما يجب أن أقوم بإنشاء قائمة انتظار واحدة فقط للمشاركة بين جميع مثيلات LogScore. بمعنى آخر

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
هل كانت مفيدة؟

المحلول

هذا ليس كيف تعمل خطوط الأنابيب ... ولكن لتخفيف عقلك ، إليك مقتطف من وثائق العمليات الفرعية:

حدد stdin و stdout و stderr الإدخال القياسي للبرامج المنفذة والإخراج القياسي ومقابض ملف الخطأ القياسية ، على التوالي. القيم الصالحة هي الأنابيب ، واصف ملف موجود (عدد صحيح موجب) ، وكائن ملف موجود ، ولا شيء. يشير الأنابيب إلى أن أ أنبوب جديد يجب إنشاء الطفل. مع لا شيء ، لن يحدث أي إعادة توجيه ؛ سيتم توريث مقابض ملف الطفل من الوالد.

ستكون المناطق الأكثر احتمالا على خطأ في التواصل مع العملية الرئيسية أو في إدارتك لسيارة الإشارة. ربما لا تسير التحولات / التزامن الدولة كما هو متوقع بسبب وجود خطأ؟ أقترح تصحيح الأخطاء من خلال إضافة بيانات التسجيل/الطباعة قبل وبعد كل مكالمة حظر - حيث تتواصل مع العملية الرئيسية وحيث يمكنك الحصول على/إصدار الإشارة لتضييق المكان الذي حدثت فيه الأمور.

أيضا أنا فضولي - هل الإشارة ضرورية للغاية؟

نصائح أخرى

أردت أيضًا موازاة المهام البسيطة ولهذا قمت بإنشاء نص بيثون صغير. يمكنك إلقاء نظرة على:http://bioinf.comav.upv.es/psubprocess/index.html

هو أكثر عمومية قليلاً مما تريد ، ولكن للمهام البسيطة سهل الاستخدام. قد يكون على الأقل بعض التبعية لك.

خوسيه بلانكا

يمكن أن يكون مسدود في العملية الفرعية ، هل حاولت استخدام طريقة التواصل بدلاً من الانتظار؟http://docs.python.org/library/subprocess.html

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top