4 from daklib.config import Config
6 # This code is a modified copy of
7 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
8 # and is licensed under the Python License. The full text of the license
9 # is available in the file COPYING-PSF.
11 # Ensure booleans exist (not needed for Python 2.2.1 or higher)
20 """Flexible thread pool class. Creates a pool of threads, then
21 accepts tasks that will be dispatched to the next available thread.
22 The argument numThreads defaults to 'Common::ThreadCount' which must
23 be specified in dak.conf."""
def __init__(self, numThreads=None):
    """Initialize the thread pool with numThreads workers.

    numThreads -- number of worker threads to start.  When omitted (or
    None), the value of 'Common::ThreadCount' from the configuration is
    used.  It is looked up when the pool is created, not when this
    module is imported, so importing the module no longer reads the
    configuration.
    """
    if numThreads is None:
        numThreads = Config()['Common::ThreadCount']

    # Workers currently owned by the pool; accessed under __resizeLock.
    self.__threads = []
    self.__resizeLock = threading.Condition(threading.Lock())

    # Pending (task, args, taskCallback) tuples; accessed under __taskLock.
    self.__tasks = []
    self.__taskLock = threading.Condition(threading.Lock())

    # True while joinAll() is shutting the pool down.
    self.__isJoining = False

    # Both lists must exist before this call: resizing reads them.
    self.setThreadCount(numThreads)
def setThreadCount(self, newNumThreads):
    """External method to set the current pool size.

    Acquires the resizing lock, then calls the internal version to do
    the real work.

    Returns False, leaving the pool untouched, if the pool is being shut
    down; returns True once the resize has been carried out.
    """
    # Can't change the thread count if we're shutting down the pool!
    if self.__isJoining:
        return False

    # The with-block releases the lock even if the resize raises.
    with self.__resizeLock:
        self.__setThreadCountNolock(newNumThreads)
    return True
def __setThreadCountNolock(self, newNumThreads):
    """Set the current pool size, spawning or terminating threads
    if necessary.

    Internal use only; assumes the resizing lock is held by the caller.
    """
    # If we need to grow the pool, do so
    while newNumThreads > len(self.__threads):
        newThread = ThreadPoolThread(self)
        self.__threads.append(newThread)
        # A worker does not begin polling for tasks until it is started.
        newThread.start()

    # If we need to shrink the pool, do so
    while newNumThreads < len(self.__threads):
        # Retire the oldest worker.  It has to leave the list as well,
        # otherwise the length never drops and this loop cannot end.
        self.__threads.pop(0).goAway()
def getThreadCount(self):
    """Return the number of threads in the pool."""
    # Read the list under the resizing lock.  The with-block guarantees
    # the lock is released on the way out of the return statement.
    with self.__resizeLock:
        return len(self.__threads)
def queueTask(self, task, args=None, taskCallback=None):
    """Insert a task into the queue.

    task must be callable; args and taskCallback can be None.  The
    three values are stored together as one (task, args, taskCallback)
    entry at the end of the queue.

    Returns False without queueing anything if the pool is shutting
    down or task is not callable; returns True once the task is queued.
    """
    if self.__isJoining or not callable(task):
        return False

    # The with-block releases the lock even if the append raises.
    with self.__taskLock:
        self.__tasks.append((task, args, taskCallback))
    return True
def getNextTask(self):
    """Retrieve the next task from the task queue.

    For use only by ThreadPoolThread objects contained in the pool.

    Returns the oldest queued (task, args, taskCallback) entry and
    removes it from the queue, or (None, None, None) when the queue is
    empty.
    """
    # The with-block releases the lock on either return path.
    with self.__taskLock:
        if not self.__tasks:
            return (None, None, None)
        return self.__tasks.pop(0)
def joinAll(self, waitForTasks=True, waitForThreads=True):
    """Shut the pool down: stop accepting tasks and terminate all
    pooled threads, optionally allowing the tasks and threads to finish.

    waitForTasks   -- when true, block until the task queue is empty
                      before telling the workers to quit.
    waitForThreads -- when true, block until every worker thread has
                      exited.

    The pool is marked as joining for the duration of the call and is
    reset afterwards so that it can be reused.
    """
    # Mark the pool as joining to prevent any more task queueing
    self.__isJoining = True

    # Wait for tasks to finish
    if waitForTasks:
        while self.__tasks:
            sleep(0.1)

    with self.__resizeLock:
        # Remember the workers before shrinking: resizing to zero
        # empties self.__threads, which would leave nothing to join.
        workers = list(self.__threads)

        # Tell all the threads to quit
        self.__setThreadCountNolock(0)

        # Wait until all threads have exited
        if waitForThreads:
            for worker in workers:
                worker.join()

        # Reset the pool for potential reuse
        self.__isJoining = False
class ThreadPoolThread(threading.Thread):
    """Pooled thread class.

    A worker repeatedly asks its pool for the next task and runs it
    until it is told to go away.
    """

    # Seconds an idle worker sleeps before asking the pool again.
    threadSleepTime = 0.1

    def __init__(self, pool):
        """Initialize the thread and remember the pool."""
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):
        """Until told to quit, retrieve the next task and execute
        it, calling the callback if any."""
        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            if cmd is None:
                # If there's nothing to do, just sleep a bit
                sleep(ThreadPoolThread.threadSleepTime)
            elif callback is None:
                cmd(args)
            else:
                # The callback receives whatever the task returned.
                callback(cmd(args))

    def goAway(self):
        """Exit the run loop next time through."""
        self.__isDying = True
178 if __name__ == "__main__":
180 from random import randrange
182 # Sample task 1: given a start and end value, shuffle integers,
186 print "SortTask starting for ", data
187 numbers = range(data[0], data[1])
189 rnd = randrange(0, len(numbers) - 1)
190 a, numbers[rnd] = numbers[rnd], a
191 print "SortTask sorting for ", data
193 print "SortTask done for ", data
194 return "Sorter ", data
196 # Sample task 2: just sleep for a number of seconds.
199 print "WaitTask starting for ", data
200 print "WaitTask sleeping for %d seconds" % data
202 return "Waiter", data
204 # Both tasks use the same callback
206 def taskCallback(data):
207 print "Callback called for", data
209 # Create a pool with three worker threads
213 # Insert tasks into the queue and let them run
214 pool.queueTask(sortTask, (1000, 100000), taskCallback)
215 pool.queueTask(waitTask, 5, taskCallback)
216 pool.queueTask(sortTask, (200, 200000), taskCallback)
217 pool.queueTask(waitTask, 2, taskCallback)
218 pool.queueTask(sortTask, (3, 30000), taskCallback)
219 pool.queueTask(waitTask, 7, taskCallback)
221 # When all tasks are finished, allow the threads to terminate