]> git.decadent.org.uk Git - dak.git/blob - daklib/threadpool.py
Merge branch 'master' into dbtests
[dak.git] / daklib / threadpool.py
1 import threading
2 from time import sleep
3
4 from daklib.config import Config
5
6 # This code is a modified copy of
7 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
8 # and is licensed under the Python License. The full text of the license
9 # is available in the file COPYING-PSF.
10
11 # Ensure booleans exist (not needed for Python 2.2.1 or higher)
12 try:
13     True
14 except NameError:
15     False = 0
16     True = not False
17
18 if Config().has_key('Common::ThreadCount'):
19     defaultThreadCount = int(Config()['Common::ThreadCount'])
20 else:
21     defaultThreadCount = 1
22
23 class ThreadPool:
24
25     """Flexible thread pool class.  Creates a pool of threads, then
26     accepts tasks that will be dispatched to the next available thread.
27     The argument numThreads defaults to 'Common::ThreadCount' which must
28     be specified in dak.conf."""
29
30     def __init__(self, numThreads = 0):
31
32         """Initialize the thread pool with numThreads workers."""
33
34         if numThreads == 0:
35             numThreads = defaultThreadCount
36
37         self.__threads = []
38         self.__resizeLock = threading.Condition(threading.Lock())
39         self.__taskLock = threading.Condition(threading.Lock())
40         self.__tasks = []
41         self.__isJoining = False
42         self.setThreadCount(numThreads)
43
44     def setThreadCount(self, newNumThreads):
45
46         """ External method to set the current pool size.  Acquires
47         the resizing lock, then calls the internal version to do real
48         work."""
49
50         # Can't change the thread count if we're shutting down the pool!
51         if self.__isJoining:
52             return False
53
54         self.__resizeLock.acquire()
55         try:
56             self.__setThreadCountNolock(newNumThreads)
57         finally:
58             self.__resizeLock.release()
59         return True
60
61     def __setThreadCountNolock(self, newNumThreads):
62
63         """Set the current pool size, spawning or terminating threads
64         if necessary.  Internal use only; assumes the resizing lock is
65         held."""
66
67         # If we need to grow the pool, do so
68         while newNumThreads > len(self.__threads):
69             newThread = ThreadPoolThread(self)
70             self.__threads.append(newThread)
71             newThread.start()
72         # If we need to shrink the pool, do so
73         while newNumThreads < len(self.__threads):
74             self.__threads[0].goAway()
75             del self.__threads[0]
76
77     def getThreadCount(self):
78
79         """Return the number of threads in the pool."""
80
81         self.__resizeLock.acquire()
82         try:
83             return len(self.__threads)
84         finally:
85             self.__resizeLock.release()
86
87     def queueTask(self, task, args=None, taskCallback=None):
88
89         """Insert a task into the queue.  task must be callable;
90         args and taskCallback can be None."""
91
92         if self.__isJoining == True:
93             return False
94         if not callable(task):
95             return False
96
97         self.__taskLock.acquire()
98         try:
99             self.__tasks.append((task, args, taskCallback))
100             return True
101         finally:
102             self.__taskLock.release()
103
104     def getNextTask(self):
105
106         """ Retrieve the next task from the task queue.  For use
107         only by ThreadPoolThread objects contained in the pool."""
108
109         self.__taskLock.acquire()
110         try:
111             if self.__tasks == []:
112                 return (None, None, None)
113             else:
114                 return self.__tasks.pop(0)
115         finally:
116             self.__taskLock.release()
117
118     def joinAll(self, waitForTasks = True, waitForThreads = True):
119
120         """ Clear the task queue and terminate all pooled threads,
121         optionally allowing the tasks and threads to finish."""
122
123         # Mark the pool as joining to prevent any more task queueing
124         self.__isJoining = True
125
126         # Wait for tasks to finish
127         if waitForTasks:
128             while self.__tasks != []:
129                 sleep(.1)
130
131         # Tell all the threads to quit
132         self.__resizeLock.acquire()
133         try:
134             # Wait until all threads have exited
135             if waitForThreads:
136                 for t in self.__threads:
137                     t.goAway()
138                 for t in self.__threads:
139                     t.join()
140                     del t
141             self.__setThreadCountNolock(0)
142             self.__isJoining = True
143
144             # Reset the pool for potential reuse
145             self.__isJoining = False
146         finally:
147             self.__resizeLock.release()
148
149
150
151 class ThreadPoolThread(threading.Thread):
152
153     """ Pooled thread class. """
154
155     threadSleepTime = 0.1
156
157     def __init__(self, pool):
158
159         """ Initialize the thread and remember the pool. """
160
161         threading.Thread.__init__(self)
162         self.__pool = pool
163         self.__isDying = False
164
165     def run(self):
166
167         """ Until told to quit, retrieve the next task and execute
168         it, calling the callback if any.  """
169
170         while self.__isDying == False:
171             cmd, args, callback = self.__pool.getNextTask()
172             # If there's nothing to do, just sleep a bit
173             if cmd is None:
174                 sleep(ThreadPoolThread.threadSleepTime)
175             elif callback is None:
176                 cmd(args)
177             else:
178                 callback(cmd(args))
179
180     def goAway(self):
181
182         """ Exit the run loop next time through."""
183
184         self.__isDying = True
185
186 # Usage example
187 if __name__ == "__main__":
188
189     from random import randrange
190
191     # Sample task 1: given a start and end value, shuffle integers,
192     # then sort them
193
194     def sortTask(data):
195         print "SortTask starting for ", data
196         numbers = range(data[0], data[1])
197         for a in numbers:
198             rnd = randrange(0, len(numbers) - 1)
199             a, numbers[rnd] = numbers[rnd], a
200         print "SortTask sorting for ", data
201         numbers.sort()
202         print "SortTask done for ", data
203         return "Sorter ", data
204
205     # Sample task 2: just sleep for a number of seconds.
206
207     def waitTask(data):
208         print "WaitTask starting for ", data
209         print "WaitTask sleeping for %d seconds" % data
210         sleep(data)
211         return "Waiter", data
212
213     # Both tasks use the same callback
214
215     def taskCallback(data):
216         print "Callback called for", data
217
218     # Create a pool with three worker threads
219
220     pool = ThreadPool(3)
221
222     # Insert tasks into the queue and let them run
223     pool.queueTask(sortTask, (1000, 100000), taskCallback)
224     pool.queueTask(waitTask, 5, taskCallback)
225     pool.queueTask(sortTask, (200, 200000), taskCallback)
226     pool.queueTask(waitTask, 2, taskCallback)
227     pool.queueTask(sortTask, (3, 30000), taskCallback)
228     pool.queueTask(waitTask, 7, taskCallback)
229
230     # When all tasks are finished, allow the threads to terminate
231     pool.joinAll()