Вопрос

У меня есть вопрос, касающийся многопроцессорности Python.Я пытаюсь взять набор данных, разбить на фрагменты и передать эти фрагменты одновременно запущенным процессам.Мне нужно преобразовать большие таблицы данных с помощью простых вычислений (например.электрическое сопротивление -> температура для термистора).

Приведенный ниже код почти работает так, как хотелось бы, но, похоже, он не запускает никаких новых процессов (или, если это так, только по одному за раз).Я новичок в Python, так что, вероятно, есть довольно простое решение этой проблемы.

Заранее спасибо!

from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = map(self.process, self.data)
        super(Worker, self).__init__()

if __name__ == '__main__':
    start = datetime.datetime.now()
    dataset = range(10000) # null dataset
    processes = 3

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))

        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        tmp = dataset[i * chunk : (i + 1) * chunk + remainder]
        exec('worker'+str(i)+' = Worker(tmp)')
        exec('worker'+str(i)+'.start()')

    for i in range(processes):
        exec('worker'+str(i)+'.join()')
        # just a placeholder to make sure the initial values of the set are as expected
        exec('print worker'+str(i)+'.result[0]')
Это было полезно?

Решение

Нет необходимости отправлять количество блоков каждому процессу, просто используйте get_nowait() и обработайте конечную очередь.Пустое исключение.Каждый процесс будет получать разное количество процессорного времени, и это должно держать их всех занятыми.

import multiprocessing, Queue

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(Worker, self).__init__()

    def run(self):
        try:
            while True:
                self.output.put(self.process(self.input.get_nowait()))
        except Queue.Empty:
            pass


if name == 'main':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)
    for i in range(processes):
        Worker(input, output).start()

    for i in range(len(dataset)):
        print output.get()

Другие советы

Вы не переопределили run способ.С процессами (или потоками) есть два способа заставить его выполнять код:

  1. Создайте процесс, указывающий цель
  2. Подкласс процесса, переопределяющий run способ.

Переопределяющий __init__ это просто означает, что ваш процесс полностью разукрашен, и идти вам некуда.Его следует использовать, чтобы присвоить ему атрибуты, необходимые для выполнения того, что ему необходимо выполнить, но он не должен указывать задачу, которая должна быть выполнена.

В вашем коде вся тяжелая работа выполняется в этой строке:

exec('worker'+str(i)+' = Worker(tmp)')

и здесь ничего не делается:

exec('worker'+str(i)+'.start()')

Итак, проверяя результаты с помощью exec('print worker'+str(i)+'.result[0]') должен дать вам что-то значимое, но только потому, что код, который вы хотите выполнить имеет было выполнено, но при построении процесса, а не при запуске процесса.

Попробуй это:

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = []
        super(Worker, self).__init__()

    def run(self):
        self.result = map(self.process, self.data)

Редактировать:

Ладно...так что я просто летел, основываясь на своих здешних инстинктах ориентирования, и все они были ошибочными.Чего мы оба не понимали в процессах, так это того, что вы не можете напрямую обмениваться переменными.Все, что вы передаете новому процессу для запуска, считывается, копируется и исчезает навсегда.Если только вы не используете один из двух стандартных способов обмена данными: очереди и трубы.Я немного поиграл, пытаясь заставить ваш код работать, но пока безуспешно.Я думаю, это направит вас на правильный путь.

Итак, похоже, что список не был потокобезопасным, и я перешел к использованию очереди (хотя это выглядит намного медленнее).Этот код, по сути, выполняет то, что я пытался сделать:

import math, multiprocessing

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output, chunksize):
        self.input = input
        self.output = output
        self.chunksize = chunksize
        super(Worker, self).__init__()

    def run(self):
        for x in range(self.chunksize):
            self.output.put(self.process(self.input.get()))


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))
        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        Worker(input, output, chunk + remainder).start()

    for i in range(len(dataset)):
        print output.get()
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top