4 from daklib.config import Config
6 # This code is a modified copy of
7 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
8 # and is licensed under the Python License. The full text of the license
9 # is available in the file COPYING-PSF.
11 # Ensure booleans exist (not needed for Python 2.2.1 or higher)
# Choose the module-wide default worker count: when Config() reports the
# 'Common::ThreadCount' key, its value is converted to int and used.
# NOTE(review): the line between the two assignments is not visible in this
# excerpt; presumably it is an `else:` that makes 1 the fallback -- confirm.
18 if Config().has_key('Common::ThreadCount'):
19 defaultThreadCount = int(Config()['Common::ThreadCount'])
21 defaultThreadCount = 1
25 """Flexible thread pool class. Creates a pool of threads, then
26 accepts tasks that will be dispatched to the next available thread.
27 The argument numThreads defaults to the 'Common::ThreadCount' option
28 from dak.conf, or to 1 when that option is not set."""
# Constructor. The visible statements assign the module-level
# defaultThreadCount to numThreads, create the resize and task Condition
# objects (each wrapping its own threading.Lock), clear the joining flag and
# pass the requested size to setThreadCount().
# NOTE(review): the guard around the defaultThreadCount assignment and the
# initialisation of self.__threads / self.__tasks are not visible in this
# excerpt -- presumably the default only applies when numThreads is 0; confirm.
30 def __init__(self, numThreads = 0):
32 """Initialize the thread pool with numThreads workers."""
35 numThreads = defaultThreadCount
38 self.__resizeLock = threading.Condition(threading.Lock())
39 self.__taskLock = threading.Condition(threading.Lock())
41 self.__isJoining = False
42 self.setThreadCount(numThreads)
# Public resize entry point: acquires self.__resizeLock, delegates to
# __setThreadCountNolock(newNumThreads) and releases the lock afterwards.
# NOTE(review): the end of the docstring, the check announced by the
# "shutting down" comment, and any try/finally around the locked call are not
# visible in this excerpt -- confirm the lock is released on every path and
# what the method returns.
44 def setThreadCount(self, newNumThreads):
46 """ External method to set the current pool size. Acquires
47 the resizing lock, then calls the internal version to do real
50 # Can't change the thread count if we're shutting down the pool!
54 self.__resizeLock.acquire()
56 self.__setThreadCountNolock(newNumThreads)
58 self.__resizeLock.release()
# Resize worker used once the caller already holds the resize lock.
# Growing: while the requested size exceeds len(self.__threads), a
# ThreadPoolThread bound to this pool is constructed and appended.
# Shrinking: while the requested size is smaller, goAway() is called on the
# first thread in the list.
# NOTE(review): starting the new thread and removing the dismissed thread
# from self.__threads are not visible in this excerpt; the shrink loop can
# only terminate if the list gets shorter -- confirm against the full file.
61 def __setThreadCountNolock(self, newNumThreads):
63 """Set the current pool size, spawning or terminating threads
64 if necessary. Internal use only; assumes the resizing lock is
67 # If we need to grow the pool, do so
68 while newNumThreads > len(self.__threads):
69 newThread = ThreadPoolThread(self)
70 self.__threads.append(newThread)
72 # If we need to shrink the pool, do so
73 while newNumThreads < len(self.__threads):
74 self.__threads[0].goAway()
# Returns len(self.__threads), read after acquiring the resize lock.
# NOTE(review): the release() appears after the return statement; presumably
# it sits in a `finally:` clause that is not visible in this excerpt -- confirm.
77 def getThreadCount(self):
79 """Return the number of threads in the pool."""
81 self.__resizeLock.acquire()
83 return len(self.__threads)
85 self.__resizeLock.release()
# Appends the tuple (task, args, taskCallback) to the end of self.__tasks
# between acquire() and release() of the task lock.
# Two guards precede the append: one on the pool's joining flag and one on
# callable(task).
# NOTE(review): what each guard does (return value or exception) and the
# method's final return value are not visible in this excerpt -- confirm.
87 def queueTask(self, task, args=None, taskCallback=None):
89 """Insert a task into the queue. task must be callable;
90 args and taskCallback can be None."""
92 if self.__isJoining == True:
94 if not callable(task):
97 self.__taskLock.acquire()
99 self.__tasks.append((task, args, taskCallback))
102 self.__taskLock.release()
# Hands out queued work: returns (None, None, None) when self.__tasks is
# empty, otherwise removes and returns the entry at index 0. queueTask
# appends at the end, so entries come out in insertion order.
# NOTE(review): the release() appears after both return statements;
# presumably it sits in a `finally:` clause (and the pop in an `else:`) that
# are not visible in this excerpt -- confirm.
104 def getNextTask(self):
106 """ Retrieve the next task from the task queue. For use
107 only by ThreadPoolThread objects contained in the pool."""
109 self.__taskLock.acquire()
111 if self.__tasks == []:
112 return (None, None, None)
114 return self.__tasks.pop(0)
116 self.__taskLock.release()
# Shuts the pool down. Visible sequence: set the joining flag, loop while
# self.__tasks is non-empty, acquire the resize lock, iterate twice over
# self.__threads, shrink the pool to 0 via __setThreadCountNolock, clear the
# joining flag and release the resize lock.
# NOTE(review): the bodies of the wait loop and of both `for` loops, and the
# conditionals on waitForTasks / waitForThreads, are not visible in this
# excerpt; judging by the comments they wait for the queue to drain and for
# each thread to exit -- confirm against the full file.
# NOTE(review): the second `self.__isJoining = True` re-sets a flag that is
# already True and is overwritten with False by the next statement.
118 def joinAll(self, waitForTasks = True, waitForThreads = True):
120 """ Clear the task queue and terminate all pooled threads,
121 optionally allowing the tasks and threads to finish."""
123 # Mark the pool as joining to prevent any more task queueing
124 self.__isJoining = True
126 # Wait for tasks to finish
128 while self.__tasks != []:
131 # Tell all the threads to quit
132 self.__resizeLock.acquire()
134 # Wait until all threads have exited
136 for t in self.__threads:
138 for t in self.__threads:
141 self.__setThreadCountNolock(0)
142 self.__isJoining = True
144 # Reset the pool for potential reuse
145 self.__isJoining = False
147 self.__resizeLock.release()
# Worker thread type used by the pool; subclasses threading.Thread.
# NOTE(review): several lines of this class are not visible in this excerpt:
# the assignment that stores `pool` on the instance (self.__pool is read in
# the work loop), the `def` headers of the work-loop method and of the method
# that sets the dying flag (the pool calls goAway() on its threads), and the
# branches that actually invoke the task and its callback. Confirm against
# the full file before relying on the details below.
151 class ThreadPoolThread(threading.Thread):
153 """ Pooled thread class. """
# Sleep interval passed to sleep() in the work loop, in seconds.
155 threadSleepTime = 0.1
157 def __init__(self, pool):
159 """ Initialize the thread and remember the pool. """
161 threading.Thread.__init__(self)
# Flag polled by the work loop; setting it to True ends the loop.
163 self.__isDying = False
167 """ Until told to quit, retrieve the next task and execute
168 it, calling the callback if any. """
170 while self.__isDying == False:
# getNextTask() yields a (task, args, callback) triple, or three Nones
# when the queue is empty.
171 cmd, args, callback = self.__pool.getNextTask()
172 # If there's nothing to do, just sleep a bit
174 sleep(ThreadPoolThread.threadSleepTime)
175 elif callback is None:
182 """ Exit the run loop next time through."""
184 self.__isDying = True
# Demo that runs only when the module is executed directly. It uses
# Python 2 print statements, defines sample tasks plus one shared callback,
# and queues six tasks on a pool.
# NOTE(review): the `def` headers of the two sample tasks, the loop around
# the shuffle, the sort and sleep calls, the pool construction and the final
# join are not visible in this excerpt -- confirm against the full file.
187 if __name__ == "__main__":
189 from random import randrange
191 # Sample task 1: given a start and end value, shuffle integers,
195 print "SortTask starting for ", data
196 numbers = range(data[0], data[1])
# randrange() excludes its upper bound, so the last index of `numbers` is
# never chosen here.
198 rnd = randrange(0, len(numbers) - 1)
199 a, numbers[rnd] = numbers[rnd], a
200 print "SortTask sorting for ", data
202 print "SortTask done for ", data
203 return "Sorter ", data
205 # Sample task 2: just sleep for a number of seconds.
208 print "WaitTask starting for ", data
209 print "WaitTask sleeping for %d seconds" % data
211 return "Waiter", data
213 # Both tasks use the same callback
215 def taskCallback(data):
216 print "Callback called for", data
218 # Create a pool with three worker threads
222 # Insert tasks into the queue and let them run
# Each call passes the task, its single argument and the shared callback.
223 pool.queueTask(sortTask, (1000, 100000), taskCallback)
224 pool.queueTask(waitTask, 5, taskCallback)
225 pool.queueTask(sortTask, (200, 200000), taskCallback)
226 pool.queueTask(waitTask, 2, taskCallback)
227 pool.queueTask(sortTask, (3, 30000), taskCallback)
228 pool.queueTask(waitTask, 7, taskCallback)
230 # When all tasks are finished, allow the threads to terminate