]> git.decadent.org.uk Git - dak.git/blobdiff - daklib/dakmultiprocessing.py
Add by-hash support
[dak.git] / daklib / dakmultiprocessing.py
index ded81a2902cf975161cb2c20d372a7f9bb1a477c..b709ecf558850f6f4ec2cf405e9e10b6a0c130cd 100644 (file)
@@ -25,32 +25,82 @@ multiprocessing for DAK
 
 ###############################################################################
 
-import multiprocessing
+from multiprocessing.pool import Pool
+from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
+
 import sqlalchemy.orm.session
 
+__all__ = []
+
+PROC_STATUS_SUCCESS      = 0  # Everything ok
+PROC_STATUS_EXCEPTION    = 1  # An exception was caught
+PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
+PROC_STATUS_MISCFAILURE  = 3  # Process specific error; see message
+
+__all__.extend(['PROC_STATUS_SUCCESS',      'PROC_STATUS_EXCEPTION',
+                'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
+
+class SignalException(Exception):
+    def __init__(self, signum):
+        self.signum = signum
+
+    def __str__(self):
+        return "<SignalException: %d>" % self.signum
+
+__all__.append('SignalException')
+
+def signal_handler(signum, info):
+    raise SignalException(signum)
+
 def _func_wrapper(func, *args, **kwds):
+    # We need to handle signals to avoid hanging
+    signal(SIGHUP, signal_handler)
+    signal(SIGTERM, signal_handler)
+    signal(SIGPIPE, signal_handler)
+    signal(SIGALRM, signal_handler)
+
+    # We expect our callback function to return:
+    # (status, messages)
+    # Where:
+    #  status is one of PROC_STATUS_*
+    #  messages is a string used for logging
     try:
-        return func(*args, **kwds)
+        return (func(*args, **kwds))
+    except SignalException as e:
+        return (PROC_STATUS_SIGNALRAISED, e.signum)
+    except Exception as e:
+        return (PROC_STATUS_EXCEPTION, str(e))
     finally:
         # Make sure connections are closed. We might die otherwise.
         sqlalchemy.orm.session.Session.close_all()
 
-class Pool():
+
+class DakProcessPool(Pool):
     def __init__(self, *args, **kwds):
-        self.pool = multiprocessing.Pool(*args, **kwds)
+        Pool.__init__(self, *args, **kwds)
         self.results = []
+        self.int_results = []
 
     def apply_async(self, func, args=(), kwds={}, callback=None):
         wrapper_args = list(args)
         wrapper_args.insert(0, func)
-        self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback))
-
-    def close(self):
-        self.pool.close()
+        self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
 
     def join(self):
-        self.pool.join()
-        for r in self.results:
+        Pool.join(self)
+        for r in self.int_results:
             # return values were already handled in the callbacks, but asking
             # for them might raise exceptions which would otherwise be lost
-            r.get()
+            self.results.append(r.get())
+
+    def overall_status(self):
+        # Return the highest of our status results
+        # This basically allows us to do sys.exit(overall_status()) and have us
+        # exit 0 if everything was good and non-zero if not
+        status = 0
+        for r in self.results:
+            if r[0] > status:
+                status = r[0]
+        return status
+
+__all__.append('DakProcessPool')