]> git.decadent.org.uk Git - dak.git/blob - daklib/threadpool.py
Merge branch 'master' into msfl
[dak.git] / daklib / threadpool.py
1 import threading
2 from time import sleep
3
4 from daklib.config import Config
5
6 # This code is a modified copy of
7 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
8 # and is licensed under the Python License. The full text of the license
9 # is available in the file COPYING-PSF.
10
11 # Ensure booleans exist (not needed for Python 2.2.1 or higher)
12 try:
13     True
14 except NameError:
15     False = 0
16     True = not False
17
18 class ThreadPool:
19
20     """Flexible thread pool class.  Creates a pool of threads, then
21     accepts tasks that will be dispatched to the next available thread.
22     The argument numThreads defaults to 'Common::ThreadCount' which must
23     be specified in dak.conf."""
24
25     def __init__(self, numThreads = Config()['Common::ThreadCount']):
26
27         """Initialize the thread pool with numThreads workers."""
28
29         self.__threads = []
30         self.__resizeLock = threading.Condition(threading.Lock())
31         self.__taskLock = threading.Condition(threading.Lock())
32         self.__tasks = []
33         self.__isJoining = False
34         self.setThreadCount(numThreads)
35
36     def setThreadCount(self, newNumThreads):
37
38         """ External method to set the current pool size.  Acquires
39         the resizing lock, then calls the internal version to do real
40         work."""
41
42         # Can't change the thread count if we're shutting down the pool!
43         if self.__isJoining:
44             return False
45
46         self.__resizeLock.acquire()
47         try:
48             self.__setThreadCountNolock(newNumThreads)
49         finally:
50             self.__resizeLock.release()
51         return True
52
53     def __setThreadCountNolock(self, newNumThreads):
54
55         """Set the current pool size, spawning or terminating threads
56         if necessary.  Internal use only; assumes the resizing lock is
57         held."""
58
59         # If we need to grow the pool, do so
60         while newNumThreads > len(self.__threads):
61             newThread = ThreadPoolThread(self)
62             self.__threads.append(newThread)
63             newThread.start()
64         # If we need to shrink the pool, do so
65         while newNumThreads < len(self.__threads):
66             self.__threads[0].goAway()
67             del self.__threads[0]
68
69     def getThreadCount(self):
70
71         """Return the number of threads in the pool."""
72
73         self.__resizeLock.acquire()
74         try:
75             return len(self.__threads)
76         finally:
77             self.__resizeLock.release()
78
79     def queueTask(self, task, args=None, taskCallback=None):
80
81         """Insert a task into the queue.  task must be callable;
82         args and taskCallback can be None."""
83
84         if self.__isJoining == True:
85             return False
86         if not callable(task):
87             return False
88
89         self.__taskLock.acquire()
90         try:
91             self.__tasks.append((task, args, taskCallback))
92             return True
93         finally:
94             self.__taskLock.release()
95
96     def getNextTask(self):
97
98         """ Retrieve the next task from the task queue.  For use
99         only by ThreadPoolThread objects contained in the pool."""
100
101         self.__taskLock.acquire()
102         try:
103             if self.__tasks == []:
104                 return (None, None, None)
105             else:
106                 return self.__tasks.pop(0)
107         finally:
108             self.__taskLock.release()
109
110     def joinAll(self, waitForTasks = True, waitForThreads = True):
111
112         """ Clear the task queue and terminate all pooled threads,
113         optionally allowing the tasks and threads to finish."""
114
115         # Mark the pool as joining to prevent any more task queueing
116         self.__isJoining = True
117
118         # Wait for tasks to finish
119         if waitForTasks:
120             while self.__tasks != []:
121                 sleep(.1)
122
123         # Tell all the threads to quit
124         self.__resizeLock.acquire()
125         try:
126             self.__setThreadCountNolock(0)
127             self.__isJoining = True
128
129             # Wait until all threads have exited
130             if waitForThreads:
131                 for t in self.__threads:
132                     t.join()
133                     del t
134
135             # Reset the pool for potential reuse
136             self.__isJoining = False
137         finally:
138             self.__resizeLock.release()
139
140
141
142 class ThreadPoolThread(threading.Thread):
143
144     """ Pooled thread class. """
145
146     threadSleepTime = 0.1
147
148     def __init__(self, pool):
149
150         """ Initialize the thread and remember the pool. """
151
152         threading.Thread.__init__(self)
153         self.__pool = pool
154         self.__isDying = False
155
156     def run(self):
157
158         """ Until told to quit, retrieve the next task and execute
159         it, calling the callback if any.  """
160
161         while self.__isDying == False:
162             cmd, args, callback = self.__pool.getNextTask()
163             # If there's nothing to do, just sleep a bit
164             if cmd is None:
165                 sleep(ThreadPoolThread.threadSleepTime)
166             elif callback is None:
167                 cmd(args)
168             else:
169                 callback(cmd(args))
170
171     def goAway(self):
172
173         """ Exit the run loop next time through."""
174
175         self.__isDying = True
176
177 # Usage example
178 if __name__ == "__main__":
179
180     from random import randrange
181
182     # Sample task 1: given a start and end value, shuffle integers,
183     # then sort them
184
185     def sortTask(data):
186         print "SortTask starting for ", data
187         numbers = range(data[0], data[1])
188         for a in numbers:
189             rnd = randrange(0, len(numbers) - 1)
190             a, numbers[rnd] = numbers[rnd], a
191         print "SortTask sorting for ", data
192         numbers.sort()
193         print "SortTask done for ", data
194         return "Sorter ", data
195
196     # Sample task 2: just sleep for a number of seconds.
197
198     def waitTask(data):
199         print "WaitTask starting for ", data
200         print "WaitTask sleeping for %d seconds" % data
201         sleep(data)
202         return "Waiter", data
203
204     # Both tasks use the same callback
205
206     def taskCallback(data):
207         print "Callback called for", data
208
209     # Create a pool with three worker threads
210
211     pool = ThreadPool(3)
212
213     # Insert tasks into the queue and let them run
214     pool.queueTask(sortTask, (1000, 100000), taskCallback)
215     pool.queueTask(waitTask, 5, taskCallback)
216     pool.queueTask(sortTask, (200, 200000), taskCallback)
217     pool.queueTask(waitTask, 2, taskCallback)
218     pool.queueTask(sortTask, (3, 30000), taskCallback)
219     pool.queueTask(waitTask, 7, taskCallback)
220
221     # When all tasks are finished, allow the threads to terminate
222     pool.joinAll()