Question

I have a pretty standard multiprocessing script that processes 2 million records in a database table. Before I even finish putting work into the worker_queue, memory usage balloons past 12 GB and the script crashes. Is there a better way to design this?

import math
import psycopg2
from psycopg2.extras import DictCursor
from multiprocessing import Process, Manager

from config import DB

connection = psycopg2.connect(DB)
cursor = connection.cursor(cursor_factory=DictCursor)

def worker(worker_queue, progress):
    for row in iter(worker_queue.get, None):
        # Do work
        progress.put(1)


if __name__ == "__main__":
    total, done = 0, 0

    cursor.execute("SELECT * from table")

    manager = Manager()
    worker_queue = manager.Queue()
    progress = manager.Queue()

    for row in cursor:
        worker_queue.put(row)
        total += 1

    workers = [Process(target=worker, args=(worker_queue, progress)) for i in range(50)]

    for each in workers:
        each.start()

    for i in iter(progress.get, None):
        done += 1

        remaining = total - done

        if remaining == 0:
            print 'Done'
        elif ((remaining % (10 ** int(math.log10(remaining)))) == 0):
            print str(remaining) + ' remaining'

Solution

Two things are worth noting:

1) Don't use SELECT *. There are two reasons for that: first, you load more data than you probably need. Second, you have no control over the order of the columns (which will matter in point 2).

2) Don't use DictCursor. It turns each row into a dict, which eats a lot of memory, since the column names are effectively duplicated in every row. Use the default cursor_factory instead, which returns plain tuples. To know the order of the fields in those tuples, you then have to spell that order out in your SELECT query.
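
As a minimal sketch of both changes together (the column names id and payload are placeholders here, since the question doesn't show the real schema):

import psycopg2

from config import DB

connection = psycopg2.connect(DB)

# Default cursor_factory: rows come back as plain tuples, so the column
# names are stored once in cursor.description instead of being repeated
# in every row the way DictCursor repeats them.
cursor = connection.cursor()

# Explicit column list instead of SELECT *: you fetch only what you need,
# and the field order of each tuple is exactly the order written here.
cursor.execute("SELECT id, payload FROM table")

for row in cursor:
    record_id, payload = row  # unpack in the order given in the query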

That should take care of your problem. If it does not, you will have to work through the data in smaller chunks.
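
One common way to do that with psycopg2 (a sketch of a standard pattern, not something from the answer above) is a server-side cursor: passing a name to connection.cursor() makes PostgreSQL hold the result set and hand it to the client in chunks, so you never materialize all 2 million rows at once. The chunk size of 2000 is an arbitrary starting point:

import psycopg2

from config import DB

connection = psycopg2.connect(DB)

# A named cursor is a server-side cursor: rows stay in PostgreSQL and
# are fetched on demand instead of being loaded client-side up front.
cursor = connection.cursor(name='table_scan')
cursor.execute("SELECT id, payload FROM table")

while True:
    batch = cursor.fetchmany(2000)  # pull the next small chunk of rows
    if not batch:
        break
    for row in batch:
        pass  # process or enqueue the row here

cursor.close()
connection.close()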
