###############################################################################
-import multiprocessing
+from multiprocessing.pool import Pool
+from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
-class Pool():
+import sqlalchemy.orm.session
+
+__all__ = []
+
+PROC_STATUS_SUCCESS = 0 # Everything ok
+PROC_STATUS_EXCEPTION = 1 # An exception was caught
+PROC_STATUS_SIGNALRAISED = 2 # A signal was generated
+PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message
+
+__all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION',
+ 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
+
+class SignalException(Exception):
+ def __init__(self, signum):
+ self.signum = signum
+
+ def __str__(self):
+ return "<SignalException: %d>" % self.signum
+
+__all__.append('SignalException')
+
+def signal_handler(signum, info):
+ raise SignalException(signum)
+
+def _func_wrapper(func, *args, **kwds):
+ # We need to handle signals to avoid hanging
+ signal(SIGHUP, signal_handler)
+ signal(SIGTERM, signal_handler)
+ signal(SIGPIPE, signal_handler)
+ signal(SIGALRM, signal_handler)
+
+ # We expect our callback function to return:
+ # (status, messages)
+ # Where:
+ # status is one of PROC_STATUS_*
+ # messages is a string used for logging
+ try:
+ return (func(*args, **kwds))
+ except SignalException, e:
+ return (PROC_STATUS_SIGNALRAISED, e.signum)
+ except Exception, e:
+ return (PROC_STATUS_EXCEPTION, str(e))
+ finally:
+ # Make sure connections are closed. We might die otherwise.
+ sqlalchemy.orm.session.Session.close_all()
+
+
+class DakProcessPool(Pool):
def __init__(self, *args, **kwds):
- self.pool = multiprocessing.Pool(*args, **kwds)
+ Pool.__init__(self, *args, **kwds)
self.results = []
+ self.int_results = []
def apply_async(self, func, args=(), kwds={}, callback=None):
- self.results.append(self.pool.apply_async(func, args, kwds, callback))
-
- def close(self):
- self.pool.close()
+ wrapper_args = list(args)
+ wrapper_args.insert(0, func)
+ self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
def join(self):
- self.pool.join()
+ Pool.join(self)
+ for r in self.int_results:
+ # return values were already handled in the callbacks, but asking
+ # for them might raise exceptions which would otherwise be lost
+ self.results.append(r.get())
+
+ def overall_status(self):
+ # Return the highest of our status results
+ # This basically allows us to do sys.exit(overall_status()) and have us
+ # exit 0 if everything was good and non-zero if not
+ status = 0
for r in self.results:
- r.get()
+ if r[0] > status:
+ status = r[0]
+ return status
+
+__all__.append('DakProcessPool')