5 multiprocessing for DAK
7 @contact: Debian FTP Master <ftpmaster@debian.org>
8 @copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
9 @license: GNU General Public License version 2 or later
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 ###############################################################################
28 from multiprocessing.pool import Pool
29 from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
31 import sqlalchemy.orm.session
35 PROC_STATUS_SUCCESS = 0 # Everything ok
36 PROC_STATUS_EXCEPTION = 1 # An exception was caught
37 PROC_STATUS_SIGNALRAISED = 2 # A signal was generated
38 PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message
40 __all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION',
41 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
43 class SignalException(Exception):
44 def __init__(self, signum):
48 return "<SignalException: %d>" % self.signum
50 __all__.append('SignalException')
52 def signal_handler(signum, info):
53 raise SignalException(signum)
55 def _func_wrapper(func, *args, **kwds):
56 # We need to handle signals to avoid hanging
57 signal(SIGHUP, signal_handler)
58 signal(SIGTERM, signal_handler)
59 signal(SIGPIPE, signal_handler)
60 signal(SIGALRM, signal_handler)
62 # We expect our callback function to return:
65 # status is one of PROC_STATUS_*
66 # messages is a string used for logging
68 return (func(*args, **kwds))
69 except SignalException as e:
70 return (PROC_STATUS_SIGNALRAISED, e.signum)
71 except Exception as e:
72 return (PROC_STATUS_EXCEPTION, str(e))
74 # Make sure connections are closed. We might die otherwise.
75 sqlalchemy.orm.session.Session.close_all()
78 class DakProcessPool(Pool):
79 def __init__(self, *args, **kwds):
80 Pool.__init__(self, *args, **kwds)
84 def apply_async(self, func, args=(), kwds={}, callback=None):
85 wrapper_args = list(args)
86 wrapper_args.insert(0, func)
87 self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
91 for r in self.int_results:
92 # return values were already handled in the callbacks, but asking
93 # for them might raise exceptions which would otherwise be lost
94 self.results.append(r.get())
96 def overall_status(self):
97 # Return the highest of our status results
98 # This basically allows us to do sys.exit(overall_status()) and have us
99 # exit 0 if everything was good and non-zero if not
101 for r in self.results:
106 __all__.append('DakProcessPool')