4 from daklib.config import Config
6 # This code is a modified copy of
7 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
8 # and is licensed under the Python License. The full text of the license
9 # is available in the file COPYING-PSF.
11 # Ensure booleans exist (not needed for Python 2.2.1 or higher)
# Default number of worker threads: use 'Common::ThreadCount' from the dak
# configuration when it is set, otherwise fall back to a single thread.
if Config().has_key('Common::ThreadCount'):
    defaultThreadCount = int(Config()['Common::ThreadCount'])
else:
    defaultThreadCount = 1
class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available thread.
    The argument numThreads defaults to the module-level
    defaultThreadCount."""

    def __init__(self, numThreads = 0):

        """Initialize the thread pool with numThreads workers.

        A numThreads of 0 (the default) selects defaultThreadCount."""

        if numThreads == 0:
            numThreads = defaultThreadCount

        # Worker threads currently owned by the pool (guarded by __resizeLock)
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        # Pending (task, args, taskCallback) tuples (guarded by __taskLock)
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """ External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work.  Returns False (and changes nothing) while the pool is
        being joined, True otherwise."""

        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False

        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            # Release even if spawning a thread raised
            self.__resizeLock.release()
        return True

    def __setThreadCountNolock(self, newNumThreads):

        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""

        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()

        # If we need to shrink the pool, do so
        while newNumThreads < len(self.__threads):
            # Ask the oldest worker to stop and forget it; it exits on its
            # own once its current task (if any) has finished.
            self.__threads.pop(0).goAway()

    def getThreadCount(self):

        """Return the number of threads in the pool."""

        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None.  Returns True if the task
        was queued, False if the pool is joining or task is not
        callable."""

        if self.__isJoining:
            return False
        if not callable(task):
            return False

        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()

    def getNextTask(self):

        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool.
        Returns a (task, args, taskCallback) tuple, or
        (None, None, None) when the queue is empty."""

        self.__taskLock.acquire()
        try:
            if self.__tasks == []:
                return (None, None, None)
            return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()

    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """ Terminate all pooled threads, optionally waiting for the
        queued tasks to be picked up (waitForTasks) and for the worker
        threads to exit (waitForThreads).  Tasks still queued when
        waitForTasks is false are left in the queue."""

        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks != []:
                sleep(0.1)

        # Tell all the threads to quit.  Remember them first: shrinking
        # the pool to zero empties self.__threads, which would leave
        # nothing to join afterwards.
        self.__resizeLock.acquire()
        try:
            stopping = list(self.__threads)
            self.__setThreadCountNolock(0)
        finally:
            self.__resizeLock.release()

        # Wait until all threads have exited.  This happens outside the
        # resize lock so a task that is still running can call
        # getThreadCount() without blocking against this join.
        if waitForThreads:
            caller = threading.current_thread()
            for worker in stopping:
                # A thread cannot join itself (joinAll called from a task)
                if worker is not caller:
                    worker.join()

        # Reset the pool for potential reuse
        self.__isJoining = False
class ThreadPoolThread(threading.Thread):

    """ Pooled thread class.  Repeatedly pulls tasks from the owning
    pool and runs them until asked to stop via goAway(). """

    # Seconds to sleep between polls when the pool has no task queued
    threadSleepTime = 0.1

    def __init__(self, pool):

        """ Initialize the thread and remember the pool. """

        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):

        """ Until told to quit, retrieve the next task and execute
        it, calling the callback if any.  The callback receives the
        task's return value. """

        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
            elif callback is None:
                cmd(args)
            else:
                callback(cmd(args))

    def goAway(self):

        """ Exit the run loop next time through."""

        self.__isDying = True
186 if __name__ == "__main__":
188 from random import randrange
190 # Sample task 1: given a start and end value, shuffle integers,
194 print "SortTask starting for ", data
195 numbers = range(data[0], data[1])
197 rnd = randrange(0, len(numbers) - 1)
198 a, numbers[rnd] = numbers[rnd], a
199 print "SortTask sorting for ", data
201 print "SortTask done for ", data
202 return "Sorter ", data
204 # Sample task 2: just sleep for a number of seconds.
207 print "WaitTask starting for ", data
208 print "WaitTask sleeping for %d seconds" % data
210 return "Waiter", data
212 # Both tasks use the same callback
214 def taskCallback(data):
215 print "Callback called for", data
217 # Create a pool with three worker threads
221 # Insert tasks into the queue and let them run
222 pool.queueTask(sortTask, (1000, 100000), taskCallback)
223 pool.queueTask(waitTask, 5, taskCallback)
224 pool.queueTask(sortTask, (200, 200000), taskCallback)
225 pool.queueTask(waitTask, 2, taskCallback)
226 pool.queueTask(sortTask, (3, 30000), taskCallback)
227 pool.queueTask(waitTask, 7, taskCallback)
229 # When all tasks are finished, allow the threads to terminate