سؤال

هل هناك فئة تجمع للعامل الخيوط, ، على غرار وحدة المعالجة المتعددة فئة البلياردو?

أحب على سبيل المثال الطريقة السهلة لموازنة وظيفة الخريطة

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

ومع ذلك ، أود أن أفعل ذلك دون النفقات العامة لإنشاء عمليات جديدة.

أنا أعرف عن جيل. ومع ذلك ، في usecase الخاص بي ، ستكون الوظيفة هي وظيفة C المرتبطة بـ IO التي سيصدر غلاف Python GIL قبل استدعاء الوظيفة الفعلية.

هل يجب أن أكتب تجمع الخيوط الخاص بي؟

هل كانت مفيدة؟

المحلول

لقد اكتشفت للتو أن هناك بالفعل هو واجهة تجمع قائمة على الخيوط في multiprocessing الوحدة ، ومع ذلك فهي مخفية إلى حد ما وليس موثقة بشكل صحيح.

يمكن استيراده عبر

from multiprocessing.pool import ThreadPool

يتم تنفيذه باستخدام فئة عملية وهمية تغليف خيط Python. يمكن العثور على فئة العملية القائمة على الخيوط هذه في multiprocessing.dummy الذي تم ذكره لفترة وجيزة في مستندات. من المفترض أن توفر هذه الوحدة الوهمية واجهة المعالجة المتعددة الكاملة على أساس مؤشرات الترابط.

نصائح أخرى

في Python 3 يمكنك استخدامه concurrent.futures.ThreadPoolExecutor, ، بمعنى آخر:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

انظر مستندات لمزيد من المعلومات والأمثلة.

نعم ، ويبدو أن لديها (أكثر أو أقل) نفس واجهة برمجة التطبيقات.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....

لشيء بسيط للغاية وخفيف الوزن (تم تعديله قليلاً من هنا):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

لدعم عمليات الاسترجاعات عند الانتهاء من المهمة ، يمكنك فقط إضافة رد الاتصال إلى Tuck Tuple.

مرحبًا لاستخدام تجمع الخيوط في Python ، يمكنك استخدام هذه المكتبة:

from multiprocessing.dummy import Pool as ThreadPool

ثم للاستخدام ، هذه المكتبة مثل ذلك:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

المواضيع هي عدد المواضيع التي تريدها والمهام هي قائمة بالمهمة التي تخطط لمعظم الخريطة للخدمة.

ها هي النتيجة التي انتهى بها الأمر أخيرًا. إنها نسخة معدلة من الفئات بواسطة Dgorissen أعلاه.

ملف: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

لاستخدام المسبح

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()

إن النفقات العامة لإنشاء العمليات الجديدة ضئيلة ، خاصة عندما يكون 4 منها فقط. أشك في أن هذه نقطة ساخنة للأداء. اجعل الأمر بسيطًا ، قم بتحسين المكان الذي تضطر إليه وأين تشير نتائج التنميط إلى.

لا يوجد بركة مبنية على ترابط. ومع ذلك ، يمكن أن يكون سريعًا للغاية في تنفيذ قائمة انتظار منتج/مستهلك مع Queue صف دراسي.

من:https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top