X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=daklib%2Fdakmultiprocessing.py;h=b709ecf558850f6f4ec2cf405e9e10b6a0c130cd;hb=391f5ec09a119131dc846b796ca791f4cecc69e4;hp=ded81a2902cf975161cb2c20d372a7f9bb1a477c;hpb=e8ef4a21ceff20319cde5002cb562ae05d7622c9;p=dak.git diff --git a/daklib/dakmultiprocessing.py b/daklib/dakmultiprocessing.py index ded81a29..b709ecf5 100644 --- a/daklib/dakmultiprocessing.py +++ b/daklib/dakmultiprocessing.py @@ -25,32 +25,82 @@ multiprocessing for DAK ############################################################################### -import multiprocessing +from multiprocessing.pool import Pool +from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM + import sqlalchemy.orm.session +__all__ = [] + +PROC_STATUS_SUCCESS = 0 # Everything ok +PROC_STATUS_EXCEPTION = 1 # An exception was caught +PROC_STATUS_SIGNALRAISED = 2 # A signal was generated +PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message + +__all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION', + 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE']) + +class SignalException(Exception): + def __init__(self, signum): + self.signum = signum + + def __str__(self): + return "" % self.signum + +__all__.append('SignalException') + +def signal_handler(signum, info): + raise SignalException(signum) + def _func_wrapper(func, *args, **kwds): + # We need to handle signals to avoid hanging + signal(SIGHUP, signal_handler) + signal(SIGTERM, signal_handler) + signal(SIGPIPE, signal_handler) + signal(SIGALRM, signal_handler) + + # We expect our callback function to return: + # (status, messages) + # Where: + # status is one of PROC_STATUS_* + # messages is a string used for logging try: - return func(*args, **kwds) + return (func(*args, **kwds)) + except SignalException as e: + return (PROC_STATUS_SIGNALRAISED, e.signum) + except Exception as e: + return (PROC_STATUS_EXCEPTION, str(e)) finally: # Make sure connections are closed. We might die otherwise. sqlalchemy.orm.session.Session.close_all() -class Pool(): + +class DakProcessPool(Pool): def __init__(self, *args, **kwds): - self.pool = multiprocessing.Pool(*args, **kwds) + Pool.__init__(self, *args, **kwds) self.results = [] + self.int_results = [] def apply_async(self, func, args=(), kwds={}, callback=None): wrapper_args = list(args) wrapper_args.insert(0, func) - self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback)) - - def close(self): - self.pool.close() + self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback)) def join(self): - self.pool.join() - for r in self.results: + Pool.join(self) + for r in self.int_results: # return values were already handled in the callbacks, but asking # for them might raise exceptions which would otherwise be lost - r.get() + self.results.append(r.get()) + + def overall_status(self): + # Return the highest of our status results + # This basically allows us to do sys.exit(overall_status()) and have us + # exit 0 if everything was good and non-zero if not + status = 0 + for r in self.results: + if r[0] > status: + status = r[0] + return status + +__all__.append('DakProcessPool')