]> git.decadent.org.uk Git - dak.git/blob - daklib/dakmultiprocessing.py
Enhance process pool implementation
[dak.git] / daklib / dakmultiprocessing.py
1 #!/usr/bin/env python
2 # vim:set et sw=4:
3
4 """
5 multiprocessing for DAK
6
7 @contact: Debian FTP Master <ftpmaster@debian.org>
8 @copyright: 2011  Ansgar Burchardt <ansgar@debian.org>
9 @license: GNU General Public License version 2 or later
10 """
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ###############################################################################
27
28 from multiprocessing.pool import Pool
29 from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGCHLD, SIGALRM
30
31 import sqlalchemy.orm.session
32
33 __all__ = []
34
35 PROC_STATUS_SUCCESS      = 0  # Everything ok
36 PROC_STATUS_EXCEPTION    = 1  # An exception was caught
37 PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
38 PROC_STATUS_MISCFAILURE  = 3  # Process specific error; see message
39
40 __all__.extend(['PROC_STATUS_SUCCESS',      'PROC_STATUS_EXCEPTION',
41                 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
42
43 class SignalException(Exception):
44     def __init__(self, signum):
45         self.signum = signum
46
47     def __str__(self):
48         return "<SignalException: %d>" % self.signum
49
50 __all__.append('SignalException')
51
52 def signal_handler(signum, info):
53     raise SignalException(signum)
54
55 def _func_wrapper(func, *args, **kwds):
56     # We need to handle signals to avoid hanging
57     signal(SIGHUP, signal_handler)
58     signal(SIGTERM, signal_handler)
59     signal(SIGPIPE, signal_handler)
60     signal(SIGCHLD, signal_handler)
61     signal(SIGALRM, signal_handler)
62
63     # We expect our callback function to return:
64     # (status, messages)
65     # Where:
66     #  status is one of PROC_STATUS_*
67     #  messages is a string used for logging
68     try:
69         return (func(*args, **kwds))
70     except SignalException, e:
71         return (PROC_STATUS_SIGNALRAISED, e.signum)
72     except Exception, e:
73         return (PROC_STATUS_EXCEPTION, str(e))
74     finally:
75         # Make sure connections are closed. We might die otherwise.
76         sqlalchemy.orm.session.Session.close_all()
77
78
79 class DakProcessPool(Pool):
80     def __init__(self, *args, **kwds):
81         Pool.__init__(self, *args, **kwds)
82         self.results = []
83         self.int_results = []
84
85     def apply_async(self, func, args=(), kwds={}, callback=None):
86         wrapper_args = list(args)
87         wrapper_args.insert(0, func)
88         self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
89
90     def join(self):
91         Pool.join(self)
92         for r in self.int_results:
93             # return values were already handled in the callbacks, but asking
94             # for them might raise exceptions which would otherwise be lost
95             self.results.append(r.get())
96
97     def overall_status(self):
98         # Return the highest of our status results
99         # This basically allows us to do sys.exit(overall_status()) and have us
100         # exit 0 if everything was good and non-zero if not
101         status = 0
102         for r in self.results:
103             if r[0] > status:
104                 status = r[0]
105         return status
106
107 __all__.append('DakProcessPool')