Dynamic processes in Python
-
09-09-2019 - |
Question
I have a question concerning Python multiprocessing. I am trying to take a dataset, break into chunks, and pass those chunks to concurrently running processes. I need to transform large tables of data using simple calculations (eg. electrical resistance -> temperature for a thermistor).
The code listed below almost works as desired, but it doesn't seem to be spawning any new processes (or if so only one at a time). I am new to Python, so there is probably quite a simple solution to this problem.
Thanks in advance!
from multiprocessing import Process class Worker(Process): # example data transform def process(self, x): return (x * 2) / 3 def __init__(self, list): self.data = list self.result = map(self.process, self.data) super(Worker, self).__init__() if __name__ == '__main__': start = datetime.datetime.now() dataset = range(10000) # null dataset processes = 3 for i in range(processes): chunk = int(math.floor(len(dataset) / float(processes))) if i + 1 == processes: remainder = len(dataset) % processes else: remainder = 0 tmp = dataset[i * chunk : (i + 1) * chunk + remainder] exec('worker'+str(i)+' = Worker(tmp)') exec('worker'+str(i)+'.start()') for i in range(processes): exec('worker'+str(i)+'.join()') # just a placeholder to make sure the initial values of the set are as expected exec('print worker'+str(i)+'.result[0]')
Solution
No need to send the number of chunks to each process, just use get_nowait() and handle the eventual Queue.Empty exception. Every process will get different amounts of CPU time and this should keep them all busy.
import multiprocessing, Queue
class Worker(multiprocessing.Process):
def process(self, x):
for i in range(15):
x += (float(i) / 2.6)
return x
def __init__(self, input, output):
self.input = input
self.output = output
super(Worker, self).__init__()
def run(self):
try:
while True:
self.output.put(self.process(self.input.get_nowait()))
except Queue.Empty:
pass
if name == 'main':
dataset = range(10)
processes = multiprocessing.cpu_count()
input = multiprocessing.Queue()
output = multiprocessing.Queue()
for obj in dataset:
input.put(obj)
for i in range(processes):
Worker(input, output).start()
for i in range(len(dataset)):
print output.get()
OTHER TIPS
You haven't overridden the run
method. There are two ways with processes (or threads) to have it execute code:
- Create a process specifying target
- Subclass the process, overriding the
run
method.
Overriding __init__
just means your process is all dressed up with nowhere to go. It should be used to give it attributes that it needs to perform what it needs to perform, but it shouldn't specify the task to be performed.
In your code, all the heavy lifting is done in this line:
exec('worker'+str(i)+' = Worker(tmp)')
and nothing is done here:
exec('worker'+str(i)+'.start()')
So checking the results with exec('print worker'+str(i)+'.result[0]')
should give you something meaningful, but only because the code you want to be executed has been executed, but on process construction, not on process start.
Try this:
class Worker(Process):
# example data transform
def process(self, x): return (x * 2) / 3
def __init__(self, list):
self.data = list
self.result = []
super(Worker, self).__init__()
def run(self):
self.result = map(self.process, self.data)
EDIT:
Okay... so I was just flying based on my threading instincts here, and they were all wrong. What we both didn't understand about processes is that you can't directly share variables. Whatever you pass to a new process to start is read, copied, and gone forever. Unless you use one of the two standard ways to share data: queues and pipes. I've played around a little bit trying to get your code to work, but so far no luck. I think that will put you on the right track.
Ok, so it looks like the list was not thread safe, and I have moved to using a Queue (although it appears to be much slower). This code essentially accomplishes what I was trying to do:
import math, multiprocessing
class Worker(multiprocessing.Process):
def process(self, x):
for i in range(15):
x += (float(i) / 2.6)
return x
def __init__(self, input, output, chunksize):
self.input = input
self.output = output
self.chunksize = chunksize
super(Worker, self).__init__()
def run(self):
for x in range(self.chunksize):
self.output.put(self.process(self.input.get()))
if __name__ == '__main__':
dataset = range(10)
processes = multiprocessing.cpu_count()
input = multiprocessing.Queue()
output = multiprocessing.Queue()
for obj in dataset:
input.put(obj)
for i in range(processes):
chunk = int(math.floor(len(dataset) / float(processes)))
if i + 1 == processes:
remainder = len(dataset) % processes
else: remainder = 0
Worker(input, output, chunk + remainder).start()
for i in range(len(dataset)):
print output.get()