Question

I'm running Jython 2.5.3 on Ubuntu 12.04 with the OpenJDK 64-bit 1.7.0_55 JVM.

I'm trying to create a simple threaded application to optimize data processing and loading. I have populator threads that read records from a database and mangle them a bit before putting them onto a queue. The queue is read by consumer threads that store the data in a different database. Here is the outline of my code:

import sys
import time
import threading
import Queue

class PopulatorThread(threading.Thread):
    def __init__(self, mod, mods, queue):
        super(PopulatorThread, self).__init__()
        self.mod = mod
        self.mods = mods
        self.queue = queue

    def run(self):
        # Create db connection
        # ...
        try:
            # Select one segment of records using 'id % mods = mod'
            # Process these records & slap them onto the queue
            # ...
        except:
            con.rollback()
            raise
        finally:
            print "Made it to 'finally' in populator %d" % self.mod
            con.close()


class ConsumerThread(threading.Thread):
    def __init__(self, mod, queue):
        super(ConsumerThread, self).__init__()
        self.mod = mod
        self.queue = queue

    def run(self):
        # Create db connection
        # ...
        try:
            while True:
                item = self.queue.get()
                if not item: break
                # Put records from the queue into
                # a different database
                # ...
                self.queue.task_done()
        except:
            con.rollback()
            raise
        finally:
            print "Made it to 'finally' in consumer %d" % self.mod
            con.close()


def main(argv):
    tread1Count = 3
    tread2Count = 4
    # This is the notefactsselector data queue
    nfsQueue = Queue.Queue()

    # Start consumer/writer threads
    j = 0
    treads2 = []
    while j < tread2Count:
        treads2.append(ConsumerThread(j, nfsQueue))
        treads2[-1].start()
        j += 1

    # Start reader/populator threads
    i = 0
    treads1 = []
    while i < tread1Count:
        treads1.append(PopulatorThread(i, tread1Count, nfsQueue))
        treads1[-1].start()
        i += 1

    # Wait for reader/populator threads
    print "Waiting to join %d populator threads" % len(treads1)
    i = 0
    for tread in treads1:
        print "Waiting to join a populator thread %d" % i
        tread.join()
        i += 1

    #Add one sentinel value to queue for each write thread
    print "Adding sentinel values to end of queue"
    for tread in treads2:
        nfsQueue.put(None)

    # Wait for consumer/writer threads
    print "Waiting to join consumer/writer threads"
    for tread in treads2:
        print "Waiting on a consumer/writer"
        tread.join()

    # Wait for Queue
    print "Waiting to join queue with %d items" % nfsQueue.qsize()
    nfsQueue.join()
    print "Queue has been joined"


if __name__ == '__main__':
    main(sys.argv)

I have simplified the database implementation somewhat to save space.

  1. When I run the code, the populator and consumer threads seem to reach the end, since I get the "Made it to finally in ..." messages.
  2. I get the "Waiting to join n populator threads" message, and eventually the "Waiting to join a populator thread n" messages.
  3. I get the "Waiting to join consumer/writer threads" message as well as each of the "Waiting on a consumer/writer" messages I expect.
  4. I get the "Waiting to join queue with 0 items" message I expect, but not the "Queue has been joined" message; apparently the program is blocking while waiting for the queue, and it never terminates.
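The symptom in item 4 can be reproduced without any database. The sketch below is written for Python 3 rather than Jython 2.5 (there the module is called Queue). It takes one item off a queue with get() but never acknowledges it with task_done(). qsize() then reports 0, yet join() keeps waiting. The helper join_returns_within is hypothetical. It runs join() in a daemon thread so the check can time out instead of hanging the script.

```python
import queue
import threading


def join_returns_within(q, seconds):
    """Run q.join() in a helper thread and report whether it finished in time."""
    waiter = threading.Thread(target=q.join)
    waiter.daemon = True  # don't keep the interpreter alive if join() never returns
    waiter.start()
    waiter.join(seconds)
    return not waiter.is_alive()


q = queue.Queue()
q.put(None)  # e.g. a sentinel value
q.get()      # taken off the queue, but no matching q.task_done() yet

print("qsize:", q.qsize())                            # qsize: 0
print("join returned:", join_returns_within(q, 0.5))  # join returned: False

q.task_done()  # acknowledge the item that was taken
print("join returned:", join_returns_within(q, 0.5))  # join returned: True
```

join() tracks a count of unfinished tasks that put() increments and task_done() decrements. get() does not change that count, so an empty queue can still have outstanding tasks.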

I suspect I have my thread initializations or thread joins in the wrong order somehow, but I have little experience with concurrent programming, so my intuitions about how to do things aren't well developed. I have found plenty of Python/Jython examples of queues that are filled by a while loop and read by threads, but none so far of a queue that is filled by one set of threads and read by a different set.

In short: the populator and consumer threads appear to finish, but the program then blocks forever in nfsQueue.join().

Thanks to any who have suggestions and lessons for me!


Solution

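Are you calling task_done() on each item in the queue when you are done processing it? If you don't tell the queue explicitly that each task is done, it'll never return from join(). In your consumer, "if not item: break" leaves the loop without calling task_done() for the None sentinel, so every sentinel you put on the queue stays counted as unfinished. That is why qsize() reports 0 (the items have been removed) while join() keeps waiting: join() blocks until every put() has been matched by a task_done(), not merely until the queue is empty.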
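A self-contained sketch of the same populator/consumer layout with the fix applied, written for Python 3 (on Jython 2.5 the module is Queue, and "with lock" needs "from __future__ import with_statement"). The database work is replaced by hypothetical stand-ins. Each populator "selects" the ids in range(n_records) with id % mods == mod and doubles them. Each consumer "stores" an item by appending it to a shared list under a lock. The key change is in consume(): task_done() sits in a finally clause, so it runs for every get(). That includes the None sentinel that ends the loop and any item whose processing raises.

```python
import queue
import threading


def populate(mod, mods, q, n_records):
    # Hypothetical stand-in for "select id % mods = mod, then mangle".
    for record_id in range(n_records):
        if record_id % mods == mod:
            q.put(record_id * 2)


def consume(q, results, lock):
    while True:
        item = q.get()
        try:
            if item is None:  # sentinel: stop this consumer
                break
            with lock:        # hypothetical stand-in for the database write
                results.append(item)
        finally:
            q.task_done()     # runs for real items AND for the sentinel


def run_pipeline(n_populators=3, n_consumers=4, n_records=100):
    q = queue.Queue()
    results = []
    lock = threading.Lock()

    consumers = [threading.Thread(target=consume, args=(q, results, lock))
                 for _ in range(n_consumers)]
    populators = [threading.Thread(target=populate,
                                   args=(m, n_populators, q, n_records))
                  for m in range(n_populators)]
    for t in consumers + populators:
        t.start()

    for t in populators:  # all records are now on the queue
        t.join()
    for _ in consumers:   # one sentinel per consumer
        q.put(None)
    for t in consumers:   # each consumer exits after taking one sentinel
        t.join()
    q.join()              # returns: every get() was matched by task_done()
    return results


results = run_pipeline()
print(len(results), sorted(results)[:5])  # prints: 100 [0, 2, 4, 6, 8]
```

The join order is the same as in the question. Once the sentinels are acknowledged too, q.join() returns whether it is called before or after the consumer threads are joined.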

PS: In an earlier version of the question, the "Waiting to join a populator thread %d" message didn't appear because the print in front of it was missing :)

Licensed under: CC-BY-SA with attribution