2 thread pool implementation for Python
4 @contact: Debian FTPMaster <ftpmaster@debian.org>
5 @copyright: 2003 Tim Lesher
6 @copyright: 2004 Carl Kleffner
7 @copyright: 2010, 2011 Torsten Werner <twerner@debian.org>
10 # This code is a modified copy of
11 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
12 # and is licensed under the Python License. The full text of the license
13 # is available in the file COPYING-PSF.
16 from time import sleep
18 from daklib.config import Config
20 if Config().has_key('Common::ThreadCount'):
21 defaultThreadCount = int(Config()['Common::ThreadCount'])
23 defaultThreadCount = 1
27 """Flexible thread pool class. Creates a pool of threads, then
28 accepts tasks that will be dispatched to the next available thread.
29 The argument numThreads defaults to 'Common::ThreadCount' which must
30 be specified in dak.conf."""
32 def __init__(self, numThreads = 0):
34 """Initialize the thread pool with numThreads workers."""
37 numThreads = defaultThreadCount
40 self.__resizeLock = threading.Condition(threading.Lock())
41 self.__taskLock = threading.Condition(threading.Lock())
43 self.__isJoining = False
44 self.setThreadCount(numThreads)
46 def setThreadCount(self, newNumThreads):
48 """ External method to set the current pool size. Acquires
49 the resizing lock, then calls the internal version to do real
52 # Can't change the thread count if we're shutting down the pool!
56 self.__resizeLock.acquire()
58 self.__setThreadCountNolock(newNumThreads)
60 self.__resizeLock.release()
63 def __setThreadCountNolock(self, newNumThreads):
65 """Set the current pool size, spawning or terminating threads
66 if necessary. Internal use only; assumes the resizing lock is
69 # If we need to grow the pool, do so
70 while newNumThreads > len(self.__threads):
71 newThread = ThreadPoolThread(self)
72 self.__threads.append(newThread)
74 # If we need to shrink the pool, do so
75 while newNumThreads < len(self.__threads):
76 self.__threads[0].goAway()
79 def getThreadCount(self):
81 """Return the number of threads in the pool."""
83 self.__resizeLock.acquire()
85 return len(self.__threads)
87 self.__resizeLock.release()
89 def queueTask(self, task, args=None, taskCallback=None):
91 """Insert a task into the queue. task must be callable;
92 args and taskCallback can be None."""
94 if self.__isJoining == True:
96 if not callable(task):
99 self.__taskLock.acquire()
101 self.__tasks.append((task, args, taskCallback))
104 self.__taskLock.release()
106 def getNextTask(self):
108 """ Retrieve the next task from the task queue. For use
109 only by ThreadPoolThread objects contained in the pool."""
111 self.__taskLock.acquire()
113 if self.__tasks == []:
114 return (None, None, None)
116 return self.__tasks.pop(0)
118 self.__taskLock.release()
120 def joinAll(self, waitForTasks = True, waitForThreads = True):
122 """ Clear the task queue and terminate all pooled threads,
123 optionally allowing the tasks and threads to finish."""
125 # Mark the pool as joining to prevent any more task queueing
126 self.__isJoining = True
128 # Wait for tasks to finish
130 while self.__tasks != []:
133 # Tell all the threads to quit
134 self.__resizeLock.acquire()
136 # Wait until all threads have exited
138 for t in self.__threads:
140 for t in self.__threads:
143 self.__setThreadCountNolock(0)
144 self.__isJoining = True
146 # Reset the pool for potential reuse
147 self.__isJoining = False
149 self.__resizeLock.release()
153 class ThreadPoolThread(threading.Thread):
155 """ Pooled thread class. """
157 threadSleepTime = 0.1
159 def __init__(self, pool):
161 """ Initialize the thread and remember the pool. """
163 threading.Thread.__init__(self)
165 self.__isDying = False
169 """ Until told to quit, retrieve the next task and execute
170 it, calling the callback if any. """
172 while self.__isDying == False:
173 cmd, args, callback = self.__pool.getNextTask()
174 # If there's nothing to do, just sleep a bit
176 sleep(ThreadPoolThread.threadSleepTime)
177 elif callback is None:
184 """ Exit the run loop next time through."""
186 self.__isDying = True
189 if __name__ == "__main__":
191 from random import randrange
193 # Sample task 1: given a start and end value, shuffle integers,
197 print "SortTask starting for ", data
198 numbers = range(data[0], data[1])
200 rnd = randrange(0, len(numbers) - 1)
201 a, numbers[rnd] = numbers[rnd], a
202 print "SortTask sorting for ", data
204 print "SortTask done for ", data
205 return "Sorter ", data
207 # Sample task 2: just sleep for a number of seconds.
210 print "WaitTask starting for ", data
211 print "WaitTask sleeping for %d seconds" % data
213 return "Waiter", data
215 # Both tasks use the same callback
217 def taskCallback(data):
218 print "Callback called for", data
220 # Create a pool with three worker threads
224 # Insert tasks into the queue and let them run
225 pool.queueTask(sortTask, (1000, 100000), taskCallback)
226 pool.queueTask(waitTask, 5, taskCallback)
227 pool.queueTask(sortTask, (200, 200000), taskCallback)
228 pool.queueTask(waitTask, 2, taskCallback)
229 pool.queueTask(sortTask, (3, 30000), taskCallback)
230 pool.queueTask(waitTask, 7, taskCallback)
232 # When all tasks are finished, allow the threads to terminate