git.decadent.org.uk Git - dak.git/blob - daklib/threadpool.py
Merge remote branch 'drkranz/master' into merge
[dak.git] / daklib / threadpool.py
1 import threading
2 from time import sleep
3
4 from daklib.config import Config
5
6 # This code is a modified copy of
7 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
8 # and is licensed under the Python License. The full text of the license
9 # is available in the file COPYING-PSF.
10
# True and False are builtins on Python 2.2.1 and later, so the old
# NameError fallback that re-created them was dead code (and rebinding
# them is a syntax error on Python 3).  Nothing is needed here.

# Default number of worker threads for ThreadPool: the value of the
# 'Common::ThreadCount' configuration key when it is set, otherwise 1.
if Config().has_key('Common::ThreadCount'):
    defaultThreadCount = int(Config()['Common::ThreadCount'])
else:
    defaultThreadCount = 1
22
class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available thread.
    The argument numThreads defaults to the module-level
    defaultThreadCount, which is taken from 'Common::ThreadCount' in the
    configuration and is 1 when that key is not set."""

    def __init__(self, numThreads = 0):

        """Initialize the thread pool with numThreads workers.

        A value of 0 selects defaultThreadCount."""

        if numThreads == 0:
            numThreads = defaultThreadCount

        self.__threads = []
        # Guards self.__threads (pool size changes).
        self.__resizeLock = threading.Condition(threading.Lock())
        # Guards self.__tasks (the pending task queue).
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        # True only while joinAll() is shutting the pool down.
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work.

        Returns False without changing anything while joinAll() is in
        progress, True otherwise.  Negative sizes are treated as 0."""

        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False

        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            self.__resizeLock.release()
        return True

    def __setThreadCountNolock(self, newNumThreads):

        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""

        # A negative size used to run the shrink loop past an empty list
        # and raise IndexError; an empty pool is the sensible meaning.
        newNumThreads = max(newNumThreads, 0)

        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so.  The oldest workers are
        # asked to stop; each one exits after its current task, if any.
        while newNumThreads < len(self.__threads):
            self.__threads[0].goAway()
            del self.__threads[0]

    def getThreadCount(self):

        """Return the number of threads in the pool."""

        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None.

        Returns True when the task was queued, False when task is not
        callable or the pool is currently being shut down."""

        if self.__isJoining:
            return False
        if not callable(task):
            return False

        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()

    def getNextTask(self):

        """Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool.

        Returns a (task, args, taskCallback) tuple in FIFO order, or
        (None, None, None) when the queue is empty."""

        self.__taskLock.acquire()
        try:
            if not self.__tasks:
                return (None, None, None)
            return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()

    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish.

        waitForTasks:   when true, block until every queued task has
                        been picked up by a worker; when false, discard
                        the tasks that have not been started yet.
        waitForThreads: when true, block until every worker thread has
                        exited (and so has finished its current task).

        Afterwards the pool is empty and can be reused by calling
        setThreadCount() and queueTask() again."""

        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True
        try:
            if waitForTasks:
                # Wait for the workers to drain the queue
                while self.__tasks:
                    sleep(.1)
            else:
                # Drop everything that has not been started
                self.__taskLock.acquire()
                try:
                    del self.__tasks[:]
                finally:
                    self.__taskLock.release()

            # Tell all the threads to quit.  Shrinking the pool empties
            # self.__threads, so remember the workers first; otherwise
            # there would be nothing left to join below.
            self.__resizeLock.acquire()
            try:
                workers = list(self.__threads)
                self.__setThreadCountNolock(0)
            finally:
                self.__resizeLock.release()

            # Wait until all threads have exited.  This happens outside
            # the resize lock so a task that is still running can call
            # getThreadCount() without deadlocking against us.
            if waitForThreads:
                for worker in workers:
                    worker.join()
        finally:
            # Reset the pool for potential reuse, even if we were
            # interrupted while waiting.
            self.__isJoining = False
147
148
149
class ThreadPoolThread(threading.Thread):

    """Pooled thread class.

    Each worker repeatedly asks its pool for the next task via
    getNextTask() and runs it until goAway() is called."""

    # Seconds to sleep between polls while the task queue is empty.
    threadSleepTime = 0.1

    def __init__(self, pool):

        """Initialize the thread and remember the pool."""

        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):

        """Until told to quit, retrieve the next task and execute
        it, calling the callback if any.

        The task is called with its args value as the single argument;
        the callback, when given, receives the task's return value.  An
        exception raised by either is reported on stderr and the worker
        carries on with the next task."""

        import traceback

        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
                continue
            try:
                result = cmd(args)
                if callback is not None:
                    callback(result)
            except Exception:
                # Letting the exception escape would end this thread and
                # silently shrink the pool; with every worker gone the
                # remaining queued tasks would never be run.
                traceback.print_exc()

    def goAway(self):

        """Exit the run loop next time through."""

        self.__isDying = True
184
185 # Usage example
if __name__ == "__main__":

    # Demonstration: run a mix of CPU-bound and sleeping tasks on a
    # three-worker pool.  Each print passes one pre-formatted string so
    # the output is the same under the print statement and the print
    # function.

    from random import shuffle

    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them

    def sortTask(data):
        print("SortTask starting for  %s" % (data,))
        numbers = list(range(data[0], data[1]))
        # random.shuffle permutes the list in place; the previous
        # hand-written swap assigned to the loop variable, so it
        # duplicated values instead of shuffling them.
        shuffle(numbers)
        print("SortTask sorting for  %s" % (data,))
        numbers.sort()
        print("SortTask done for  %s" % (data,))
        return "Sorter ", data

    # Sample task 2: just sleep for a number of seconds.

    def waitTask(data):
        print("WaitTask starting for  %s" % (data,))
        print("WaitTask sleeping for %d seconds" % data)
        sleep(data)
        return "Waiter", data

    # Both tasks use the same callback

    def taskCallback(data):
        print("Callback called for %s" % (data,))

    # Create a pool with three worker threads

    pool = ThreadPool(3)

    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)

    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()