]> git.decadent.org.uk Git - dak.git/blob - daklib/threadpool.py
Use the real path, not my local one - whoops
[dak.git] / daklib / threadpool.py
1 import threading
2 from time import sleep
3
4 # This code is a modified copy of
5 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
6 # and is licensed under the Python License. The full text of the license
7 # is available in the file COPYING-PSF.
8
9 # FIXME:
10 # numThreads defaults to 16 in __init__ to work best on
11 # franck.debian.org but the default value should be specified in
12 # dak.conf
13
14 # Ensure booleans exist (not needed for Python 2.2.1 or higher)
15 try:
16     True
17 except NameError:
18     False = 0
19     True = not False
20
21 class ThreadPool:
22
23     """Flexible thread pool class.  Creates a pool of threads, then
24     accepts tasks that will be dispatched to the next available
25     thread."""
26
27     def __init__(self, numThreads = 16):
28
29         """Initialize the thread pool with numThreads workers."""
30
31         self.__threads = []
32         self.__resizeLock = threading.Condition(threading.Lock())
33         self.__taskLock = threading.Condition(threading.Lock())
34         self.__tasks = []
35         self.__isJoining = False
36         self.setThreadCount(numThreads)
37
38     def setThreadCount(self, newNumThreads):
39
40         """ External method to set the current pool size.  Acquires
41         the resizing lock, then calls the internal version to do real
42         work."""
43
44         # Can't change the thread count if we're shutting down the pool!
45         if self.__isJoining:
46             return False
47
48         self.__resizeLock.acquire()
49         try:
50             self.__setThreadCountNolock(newNumThreads)
51         finally:
52             self.__resizeLock.release()
53         return True
54
55     def __setThreadCountNolock(self, newNumThreads):
56
57         """Set the current pool size, spawning or terminating threads
58         if necessary.  Internal use only; assumes the resizing lock is
59         held."""
60
61         # If we need to grow the pool, do so
62         while newNumThreads > len(self.__threads):
63             newThread = ThreadPoolThread(self)
64             self.__threads.append(newThread)
65             newThread.start()
66         # If we need to shrink the pool, do so
67         while newNumThreads < len(self.__threads):
68             self.__threads[0].goAway()
69             del self.__threads[0]
70
71     def getThreadCount(self):
72
73         """Return the number of threads in the pool."""
74
75         self.__resizeLock.acquire()
76         try:
77             return len(self.__threads)
78         finally:
79             self.__resizeLock.release()
80
81     def queueTask(self, task, args=None, taskCallback=None):
82
83         """Insert a task into the queue.  task must be callable;
84         args and taskCallback can be None."""
85
86         if self.__isJoining == True:
87             return False
88         if not callable(task):
89             return False
90
91         self.__taskLock.acquire()
92         try:
93             self.__tasks.append((task, args, taskCallback))
94             return True
95         finally:
96             self.__taskLock.release()
97
98     def getNextTask(self):
99
100         """ Retrieve the next task from the task queue.  For use
101         only by ThreadPoolThread objects contained in the pool."""
102
103         self.__taskLock.acquire()
104         try:
105             if self.__tasks == []:
106                 return (None, None, None)
107             else:
108                 return self.__tasks.pop(0)
109         finally:
110             self.__taskLock.release()
111
112     def joinAll(self, waitForTasks = True, waitForThreads = True):
113
114         """ Clear the task queue and terminate all pooled threads,
115         optionally allowing the tasks and threads to finish."""
116
117         # Mark the pool as joining to prevent any more task queueing
118         self.__isJoining = True
119
120         # Wait for tasks to finish
121         if waitForTasks:
122             while self.__tasks != []:
123                 sleep(.1)
124
125         # Tell all the threads to quit
126         self.__resizeLock.acquire()
127         try:
128             self.__setThreadCountNolock(0)
129             self.__isJoining = True
130
131             # Wait until all threads have exited
132             if waitForThreads:
133                 for t in self.__threads:
134                     t.join()
135                     del t
136
137             # Reset the pool for potential reuse
138             self.__isJoining = False
139         finally:
140             self.__resizeLock.release()
141
142
143
144 class ThreadPoolThread(threading.Thread):
145
146     """ Pooled thread class. """
147
148     threadSleepTime = 0.1
149
150     def __init__(self, pool):
151
152         """ Initialize the thread and remember the pool. """
153
154         threading.Thread.__init__(self)
155         self.__pool = pool
156         self.__isDying = False
157
158     def run(self):
159
160         """ Until told to quit, retrieve the next task and execute
161         it, calling the callback if any.  """
162
163         while self.__isDying == False:
164             cmd, args, callback = self.__pool.getNextTask()
165             # If there's nothing to do, just sleep a bit
166             if cmd is None:
167                 sleep(ThreadPoolThread.threadSleepTime)
168             elif callback is None:
169                 cmd(args)
170             else:
171                 callback(cmd(args))
172
173     def goAway(self):
174
175         """ Exit the run loop next time through."""
176
177         self.__isDying = True
178
179 # Usage example
180 if __name__ == "__main__":
181
182     from random import randrange
183
184     # Sample task 1: given a start and end value, shuffle integers,
185     # then sort them
186
187     def sortTask(data):
188         print "SortTask starting for ", data
189         numbers = range(data[0], data[1])
190         for a in numbers:
191             rnd = randrange(0, len(numbers) - 1)
192             a, numbers[rnd] = numbers[rnd], a
193         print "SortTask sorting for ", data
194         numbers.sort()
195         print "SortTask done for ", data
196         return "Sorter ", data
197
198     # Sample task 2: just sleep for a number of seconds.
199
200     def waitTask(data):
201         print "WaitTask starting for ", data
202         print "WaitTask sleeping for %d seconds" % data
203         sleep(data)
204         return "Waiter", data
205
206     # Both tasks use the same callback
207
208     def taskCallback(data):
209         print "Callback called for", data
210
211     # Create a pool with three worker threads
212
213     pool = ThreadPool(3)
214
215     # Insert tasks into the queue and let them run
216     pool.queueTask(sortTask, (1000, 100000), taskCallback)
217     pool.queueTask(waitTask, 5, taskCallback)
218     pool.queueTask(sortTask, (200, 200000), taskCallback)
219     pool.queueTask(waitTask, 2, taskCallback)
220     pool.queueTask(sortTask, (3, 30000), taskCallback)
221     pool.queueTask(waitTask, 7, taskCallback)
222
223     # When all tasks are finished, allow the threads to terminate
224     pool.joinAll()