]> git.decadent.org.uk Git - dak.git/blob - daklib/threadpool.py
Merge branch 'master' of ssh://ftp-master.debian.org/srv/ftp.debian.org/git/dak
[dak.git] / daklib / threadpool.py
1 """
2 thread pool implementation for Python
3
4 @contact: Debian FTPMaster <ftpmaster@debian.org>
5 @copyright: 2003 Tim Lesher
6 @copyright: 2004 Carl Kleffner
7 @copyright: 2010, 2011 Torsten Werner <twerner@debian.org>
8 """
9
10 # This code is a modified copy of
11 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
12 # and is licensed under the Python License. The full text of the license
13 # is available in the file COPYING-PSF.
14
15 import threading
16 from time import sleep
17
18 from daklib.config import Config
19
# Pool size used when a ThreadPool is created without an explicit thread
# count: the 'Common::ThreadCount' configuration value when that key is
# present, otherwise a single worker thread.
defaultThreadCount = (
    int(Config()['Common::ThreadCount'])
    if Config().has_key('Common::ThreadCount')
    else 1
)
24
class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available thread.
    The argument numThreads defaults to the module-level
    defaultThreadCount."""

    def __init__(self, numThreads = 0):

        """Initialize the thread pool with numThreads workers.

        A numThreads of 0 selects defaultThreadCount; a negative value
        creates an empty pool."""

        if numThreads == 0:
            numThreads = defaultThreadCount

        self.__threads = []
        # Guards self.__threads.
        self.__resizeLock = threading.Condition(threading.Lock())
        # Guards self.__tasks.
        self.__taskLock = threading.Condition(threading.Lock())
        # Pending (task, args, taskCallback) tuples, oldest first.
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work.

        Returns False (and changes nothing) while the pool is being
        joined, True otherwise."""

        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False

        with self.__resizeLock:
            self.__setThreadCountNolock(newNumThreads)
        return True

    def __setThreadCountNolock(self, newNumThreads):

        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held.  Negative sizes are treated as zero."""

        # A negative target would otherwise index into an empty list
        # once the pool has been drained.
        newNumThreads = max(0, newNumThreads)

        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so (oldest thread first)
        while newNumThreads < len(self.__threads):
            self.__threads.pop(0).goAway()

    def getThreadCount(self):

        """Return the number of threads in the pool."""

        with self.__resizeLock:
            return len(self.__threads)

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None.

        Returns True if the task was queued, False if the pool is being
        joined or task is not callable."""

        if self.__isJoining or not callable(task):
            return False

        with self.__taskLock:
            self.__tasks.append((task, args, taskCallback))
        return True

    def getNextTask(self):

        """Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool.

        Returns a (task, args, taskCallback) tuple, or
        (None, None, None) when the queue is empty."""

        with self.__taskLock:
            if not self.__tasks:
                return (None, None, None)
            return self.__tasks.pop(0)

    def __workersAlive(self):

        """Return True if at least one pooled thread is still running."""

        with self.__resizeLock:
            return any(t.is_alive() for t in self.__threads)

    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish.

        With waitForTasks, block until the queue has been drained by the
        workers; waiting stops early if no worker is alive, because the
        queue could then never drain.  With waitForThreads, block until
        every worker has exited.  Tasks still queued afterwards are
        discarded, and the pool is left empty and ready for reuse."""

        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True
        try:
            # Wait for tasks to be picked up, but only while somebody
            # can actually pick them up.
            if waitForTasks:
                while self.__tasks and self.__workersAlive():
                    sleep(.1)

            # Tell all the threads to quit
            with self.__resizeLock:
                # Wait until all threads have exited
                if waitForThreads:
                    for t in self.__threads:
                        t.goAway()
                    for t in self.__threads:
                        t.join()
                self.__setThreadCountNolock(0)

            # Drop whatever was never dispatched so that a reused pool
            # does not run stale tasks.
            with self.__taskLock:
                del self.__tasks[:]
        finally:
            # Reset the pool for potential reuse, even if the wait or
            # the join was interrupted.
            self.__isJoining = False
150
151
152
class ThreadPoolThread(threading.Thread):

    """Pooled thread class.  Repeatedly asks its pool for the next task
    and runs it until told to go away."""

    # Seconds to sleep when the pool has no task to hand out.
    threadSleepTime = 0.1

    def __init__(self, pool):

        """Initialize the thread and remember the pool.

        pool must provide getNextTask(), returning a
        (task, args, callback) tuple or (None, None, None)."""

        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):

        """Until told to quit, retrieve the next task and execute
        it, calling the callback (if any) with the task's result.

        An exception raised by a task or its callback is reported on
        stderr and the thread keeps serving the queue; letting it
        propagate would kill this worker and silently shrink the pool."""

        import traceback

        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
                continue
            try:
                result = cmd(args)
                if callback is not None:
                    callback(result)
            except Exception:
                traceback.print_exc()

    def goAway(self):

        """Exit the run loop next time through."""

        self.__isDying = True
187
188 # Usage example
if __name__ == "__main__":

    from random import shuffle

    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them

    def sortTask(data):
        """Shuffle and re-sort the integers in [data[0], data[1])."""
        print("SortTask starting for  %s" % (data,))
        # list() so the sequence is mutable on Python 3 as well.
        numbers = list(range(data[0], data[1]))
        # A real in-place shuffle; swapping through the loop variable
        # only overwrote elements and never reached the last index.
        shuffle(numbers)
        print("SortTask sorting for  %s" % (data,))
        numbers.sort()
        print("SortTask done for  %s" % (data,))
        return "Sorter ", data

    # Sample task 2: just sleep for a number of seconds.

    def waitTask(data):
        """Sleep for data seconds."""
        print("WaitTask starting for  %s" % (data,))
        print("WaitTask sleeping for %d seconds" % data)
        sleep(data)
        return "Waiter", data

    # Both tasks use the same callback

    def taskCallback(data):
        """Report the value returned by a finished task."""
        print("Callback called for %s" % (data,))

    # Create a pool with three worker threads

    pool = ThreadPool(3)

    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)

    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()