X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=daklib%2Fdakmultiprocessing.py;h=b709ecf558850f6f4ec2cf405e9e10b6a0c130cd;hb=391f5ec09a119131dc846b796ca791f4cecc69e4;hp=681755cc49c1f37e1458b7fbf008259e1bb480f3;hpb=1149b8e288d45a56c61b7d3804e25fd33de4f27a;p=dak.git

diff --git a/daklib/dakmultiprocessing.py b/daklib/dakmultiprocessing.py
index 681755cc..b709ecf5 100644
--- a/daklib/dakmultiprocessing.py
+++ b/daklib/dakmultiprocessing.py
@@ -25,20 +25,82 @@ multiprocessing for DAK
 ###############################################################################
 
-import multiprocessing
+from multiprocessing.pool import Pool
+from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
 
-class Pool():
+import sqlalchemy.orm.session
+
+__all__ = []
+
+PROC_STATUS_SUCCESS      = 0  # Everything ok
+PROC_STATUS_EXCEPTION    = 1  # An exception was caught
+PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
+PROC_STATUS_MISCFAILURE  = 3  # Process specific error; see message
+
+__all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION',
+                'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
+
+class SignalException(Exception):
+    def __init__(self, signum):
+        self.signum = signum
+
+    def __str__(self):
+        return "<SignalException: %d>" % self.signum
+
+__all__.append('SignalException')
+
+def signal_handler(signum, info):
+    raise SignalException(signum)
+
+def _func_wrapper(func, *args, **kwds):
+    # We need to handle signals to avoid hanging
+    signal(SIGHUP, signal_handler)
+    signal(SIGTERM, signal_handler)
+    signal(SIGPIPE, signal_handler)
+    signal(SIGALRM, signal_handler)
+
+    # We expect our callback function to return:
+    # (status, messages)
+    # Where:
+    #  status is one of PROC_STATUS_*
+    #  messages is a string used for logging
+    try:
+        return (func(*args, **kwds))
+    except SignalException as e:
+        return (PROC_STATUS_SIGNALRAISED, e.signum)
+    except Exception as e:
+        return (PROC_STATUS_EXCEPTION, str(e))
+    finally:
+        # Make sure connections are closed. We might die otherwise.
+        sqlalchemy.orm.session.Session.close_all()
+
+
+class DakProcessPool(Pool):
     def __init__(self, *args, **kwds):
-        self.pool = multiprocessing.Pool(*args, **kwds)
+        Pool.__init__(self, *args, **kwds)
         self.results = []
+        self.int_results = []
 
     def apply_async(self, func, args=(), kwds={}, callback=None):
-        self.results.append(self.pool.apply_async(func, args, kwds, callback))
-
-    def close(self):
-        self.pool.close()
+        wrapper_args = list(args)
+        wrapper_args.insert(0, func)
+        self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
 
     def join(self):
-        self.pool.join()
+        Pool.join(self)
+        for r in self.int_results:
+            # return values were already handled in the callbacks, but asking
+            # for them might raise exceptions which would otherwise be lost
+            self.results.append(r.get())
+
+    def overall_status(self):
+        # Return the highest of our status results
+        # This basically allows us to do sys.exit(overall_status()) and have us
+        # exit 0 if everything was good and non-zero if not
+        status = 0
         for r in self.results:
-            r.get()
+            if r[0] > status:
+                status = r[0]
+        return status
+
+__all__.append('DakProcessPool')
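
For context, a minimal usage sketch of the DakProcessPool API introduced by this patch (not part of the patch itself). The worker function process_changes and the callback log_result are hypothetical names used only for illustration; the only assumptions taken from the code above are that workers return a (status, message) tuple built from the PROC_STATUS_* constants, that _func_wrapper passes that tuple to the callback, and that overall_status() reports the worst status seen.

# Usage sketch (assumed, not part of the patch). process_changes and
# log_result are hypothetical names for illustration only.
import sys

from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS

def process_changes(filename):
    # Hypothetical worker: returns (status, message) as _func_wrapper expects.
    return (PROC_STATUS_SUCCESS, "processed %s" % filename)

def log_result(result):
    # Callback run in the parent process with the (status, message) tuple
    # produced by _func_wrapper for each task.
    status, message = result
    print("status %d: %s" % (status, message))

def main():
    pool = DakProcessPool()
    for filename in ('a.changes', 'b.changes'):
        pool.apply_async(process_changes, (filename,), callback=log_result)
    pool.close()
    pool.join()
    # overall_status() is the highest PROC_STATUS_* value seen, so the exit
    # code is 0 only if every task succeeded.
    sys.exit(pool.overall_status())

if __name__ == '__main__':
    main()

The point of the wrapper design is that worker failures, whether ordinary exceptions or delivered signals, are converted into status tuples rather than lost or left to hang the pool, so the parent process can simply exit with the worst status it collected.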