Динамические процессы в Python
-
09-09-2019 - |
Вопрос
У меня есть вопрос, касающийся многопроцессорности Python.Я пытаюсь взять набор данных, разбить на фрагменты и передать эти фрагменты одновременно запущенным процессам.Мне нужно преобразовать большие таблицы данных с помощью простых вычислений (например.электрическое сопротивление -> температура для термистора).
Приведенный ниже код почти работает так, как хотелось бы, но, похоже, он не запускает никаких новых процессов (или, если это так, только по одному за раз).Я новичок в Python, так что, вероятно, есть довольно простое решение этой проблемы.
Заранее спасибо!
from multiprocessing import Process class Worker(Process): # example data transform def process(self, x): return (x * 2) / 3 def __init__(self, list): self.data = list self.result = map(self.process, self.data) super(Worker, self).__init__() if __name__ == '__main__': start = datetime.datetime.now() dataset = range(10000) # null dataset processes = 3 for i in range(processes): chunk = int(math.floor(len(dataset) / float(processes))) if i + 1 == processes: remainder = len(dataset) % processes else: remainder = 0 tmp = dataset[i * chunk : (i + 1) * chunk + remainder] exec('worker'+str(i)+' = Worker(tmp)') exec('worker'+str(i)+'.start()') for i in range(processes): exec('worker'+str(i)+'.join()') # just a placeholder to make sure the initial values of the set are as expected exec('print worker'+str(i)+'.result[0]')
Решение
Нет необходимости отправлять количество блоков каждому процессу, просто используйте get_nowait() и обработайте конечную очередь.Пустое исключение.Каждый процесс будет получать разное количество процессорного времени, и это должно держать их всех занятыми.
import multiprocessing, Queue
class Worker(multiprocessing.Process):
def process(self, x):
for i in range(15):
x += (float(i) / 2.6)
return x
def __init__(self, input, output):
self.input = input
self.output = output
super(Worker, self).__init__()
def run(self):
try:
while True:
self.output.put(self.process(self.input.get_nowait()))
except Queue.Empty:
pass
if name == 'main':
dataset = range(10)
processes = multiprocessing.cpu_count()
input = multiprocessing.Queue()
output = multiprocessing.Queue()
for obj in dataset:
input.put(obj)
for i in range(processes):
Worker(input, output).start()
for i in range(len(dataset)):
print output.get()
Другие советы
Вы не переопределили run
способ.С процессами (или потоками) есть два способа заставить его выполнять код:
- Создайте процесс, указывающий цель
- Подкласс процесса, переопределяющий
run
способ.
Переопределяющий __init__
это просто означает, что ваш процесс полностью разукрашен, и идти вам некуда.Его следует использовать, чтобы присвоить ему атрибуты, необходимые для выполнения того, что ему необходимо выполнить, но он не должен указывать задачу, которая должна быть выполнена.
В вашем коде вся тяжелая работа выполняется в этой строке:
exec('worker'+str(i)+' = Worker(tmp)')
и здесь ничего не делается:
exec('worker'+str(i)+'.start()')
Итак, проверяя результаты с помощью exec('print worker'+str(i)+'.result[0]')
должен дать вам что-то значимое, но только потому, что код, который вы хотите выполнить имеет было выполнено, но при построении процесса, а не при запуске процесса.
Попробуй это:
class Worker(Process):
# example data transform
def process(self, x): return (x * 2) / 3
def __init__(self, list):
self.data = list
self.result = []
super(Worker, self).__init__()
def run(self):
self.result = map(self.process, self.data)
Редактировать:
Ладно...так что я просто летел, основываясь на своих здешних инстинктах ориентирования, и все они были ошибочными.Чего мы оба не понимали в процессах, так это того, что вы не можете напрямую обмениваться переменными.Все, что вы передаете новому процессу для запуска, считывается, копируется и исчезает навсегда.Если только вы не используете один из двух стандартных способов обмена данными: очереди и трубы.Я немного поиграл, пытаясь заставить ваш код работать, но пока безуспешно.Я думаю, это направит вас на правильный путь.
Итак, похоже, что список не был потокобезопасным, и я перешел к использованию очереди (хотя это выглядит намного медленнее).Этот код, по сути, выполняет то, что я пытался сделать:
import math, multiprocessing
class Worker(multiprocessing.Process):
def process(self, x):
for i in range(15):
x += (float(i) / 2.6)
return x
def __init__(self, input, output, chunksize):
self.input = input
self.output = output
self.chunksize = chunksize
super(Worker, self).__init__()
def run(self):
for x in range(self.chunksize):
self.output.put(self.process(self.input.get()))
if __name__ == '__main__':
dataset = range(10)
processes = multiprocessing.cpu_count()
input = multiprocessing.Queue()
output = multiprocessing.Queue()
for obj in dataset:
input.put(obj)
for i in range(processes):
chunk = int(math.floor(len(dataset) / float(processes)))
if i + 1 == processes:
remainder = len(dataset) % processes
else: remainder = 0
Worker(input, output, chunk + remainder).start()
for i in range(len(dataset)):
print output.get()