Question

I am currently programming a python-based datagram-Server using threads and all of that.

I encountered the following problem: I am using multiple Allocation Threads to allocate incoming packages to the different processing threads. Inside the Processing Threads, I am using threading.local() to keep track of thread-local variables.

I am currently testing how my server reacts during high load (2000 Packets in ~2 seconds), and have come upon a strange behaviour of the local()-Object.

It seems like it works just fine for a while, and then, at some point, it throws an exception:

Exception in thread 192.168.1.102: # <-- This is the Processing Thread
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 37, in run
    print self.loc.father
AttributeError: 'thread._local' object has no attribute 'father'

# The following three lines are debug
# This is the Allocation thread that has called the Processing thread
<Thread(192.168.1.102, started 1106023568)> 
# This confirms that the queue it tries to access exists
<Queue.Queue instance at 0x40662b48>
# This is the Processing thread, which has stopped executing on exception
<Thread(192.168.1.102, stopped 1106023568)> 

Exception in thread pyAlloc-0: # <-- This is the Allocation thread
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyAllocThread.py", line 60, in run
    foundThread.addTask(str(data))
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 58, in addTask
    print self.loc.todo
AttributeError: 'thread._local' object has no attribute 'todo'

A part of the Processing Thread

# Imports
import threading
import time
import Queue

class Thread(threading.Thread):
    def __init__(self,pThread):
        threading.Thread.__init__(self)
        self.loc = threading.local()
        self.loc.todo = Queue.Queue()
        self.loc.father = pThread
        print "Konstruktor ausgefuehrt"
        print self.loc.todo

    def run(self):
        self.loc.run = True;
        print self.loc.father
        while (self.loc.run):
        try:
            task = self.loc.todo.get(True, 1)
            print "processing..."
            if(task == self):
                self.loc.run=False
            else:
                print task
        except Queue.Empty:
            pass
        self.loc.father.threadTerminated(self)
        print self.name, "terminating..."

    def addTask(self, pTask):
        print self
        print self.loc.todo
        self.loc.todo.put(pTask)

And the allocation thread:

import threading
import pyProcThread # My processing Thread
import Queue
import time
class Thread(threading.Thread):
    # Lock-Objects
    threadListLock = threading.Lock()
    waitListLock = threading.Lock()

    alive = True;

    taskQueue = Queue.Queue()
    # Lists
    # List of all running threads
    threads = []

    def threadExists(self,pIP):
        """Checks if there is already a thread with the given Name"""
        for x in self.threads:
            if x.name == pIP:
                return x
        return None

    def threadTerminated(self,pThread):
        """Called when a Processing Thread terminates"""
        with self.threadListLock:
            self.threads.remove(pThread)
            print "Thread removed"

    def threadRegistered(self,pThread):
        """Registers a new Thread"""
        self.threads.append(pThread)

    def killThread(self):
        self.alive = False

    def run(self):
        while(self.alive):
            # print "Verarbeite Nachricht ", self.Message
            # print "Von ", self.IP
            try:
                data, addtemp = self.taskQueue.get(True, 1)
                addr, _junk = addtemp
                with self.threadListLock:
                    foundThread=self.threadExists(str(addr))
                    # print "Thread " + self.name + " verarbeitet " + data
                    if (foundThread!=None):
                        #print "recycling thread"
                        foundThread.addTask(str(data))
                    else:
                        print "running new Thread"
                        t = pyProcThread.Thread(self)
                        t.name = str(addr)
                        t.addTask(str(data))
                        t.start()
                        self.threadRegistered(t)
                self.taskQueue.task_done()
            except Queue.Empty:
                pass
        print self.name, "terminating..."
        with self.threadListLock:
            for thread in self.threads:
                thread.addTask(thread)

The full output, including all debug print's:

running new Thread
Konstruktor ausgefuehrt
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, initial)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
Exception in thread 192.168.1.102:
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 37, in run
    print self.loc.father
AttributeError: 'thread._local' object has no attribute 'father'

<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, stopped 1106023568)>
Exception in thread pyAlloc-0:
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyAllocThread.py", line 60, in run
    foundThread.addTask(str(data))
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 58, in addTask
    print self.loc.todo
AttributeError: 'thread._local' object has no attribute 'todo'

Terminating main
192.168.1.102
DEINEMUDDA  sent to  192.168.1.102 : 8082
pyAlloc-1 terminating...
<Thread(192.168.1.102, stopped 1106023568)>
<Queue.Queue instance at 0x40662b48>

As you can see in the log, everything seems to work fine for a while, although it never reaches the print "processing in the main function of the processing thread.

I have searched the web and StackOverflow for similar problems, but was unable to find any. Thanks in advance for any help, and excuse my coding style, I'm only programming Python for a few days now and still learning the ropes with some of its features.

EDIT: Of course, there is more to this Server than this. There is a main thread that is receiving the packets and sending them to the allocation thread, and a bunch of other stuff in the background. Additionally, the Server is far from done, but I want to get the receiving working before I start on other things.

Was it helpful?

Solution

threading.local is class that represents thread-local data. Thread-local data are data whose values are thread specific and thus available for local thread only (the thread that created local() object). In other words only the thread that set the value sees the value.

Because you set the loc.parent value in __init__ method of your processing thread, but executed inside the allocator thread, the thread locals of proccessing threads will be available only in the allocator thread. So it won't work in the run() of processing thread, as it will be executed by different thread (not the allocator).

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top