Merge remote branch 'ftpmaster/master' into multiproc
author    Mark Hymers <mhy@debian.org>
          Sat, 9 Apr 2011 08:42:22 +0000 (09:42 +0100)
committer Mark Hymers <mhy@debian.org>
          Sat, 9 Apr 2011 08:42:22 +0000 (09:42 +0100)
Conflicts:
dak/show_new.py

Signed-off-by: Mark Hymers <mhy@debian.org>
dak/generate_filelist.py
dak/generate_packages_sources2.py
dak/generate_releases.py
dak/show_new.py
daklib/daklog.py
daklib/dakmultiprocessing.py
tests/test_multiprocessing.py [new file with mode: 0755]
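
Each converted script follows the same pattern: worker functions return a (status, message) tuple instead of a bare message, a callback logs each result, and the pool's overall_status() becomes the process exit code. An illustrative sketch of the pattern (the real callers pass suite/component IDs rather than integers):

    import sys
    from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS

    def worker(n):
        # Workers must return a (PROC_STATUS_*, message) tuple.
        return (PROC_STATUS_SUCCESS, 'processed %d' % n)

    def parse_results(result):
        code, msg = result
        print code, msg

    pool = DakProcessPool()
    for n in range(4):
        pool.apply_async(worker, [n], callback=parse_results)
    pool.close()
    pool.join()

    # Exits 0 only if every worker returned PROC_STATUS_SUCCESS.
    sys.exit(pool.overall_status())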

diff --git a/dak/generate_filelist.py b/dak/generate_filelist.py
index 2a566e06ed405e9e37a36a999e4d129fe3d980ee..98d239c58aaeb805bdbadf9b25d086805aa101a4 100755 (executable)
@@ -39,7 +39,7 @@ Generate file lists for apt-ftparchive.
 from daklib.dbconn import *
 from daklib.config import Config
 from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
 import apt_pkg, os, stat, sys
 
 from daklib.lists import getSources, getBinaries, getArchAll
@@ -78,7 +78,7 @@ def writeSourceList(suite_id, component_id, incremental_mode):
         file.write(filename + '\n')
     session.rollback()
     file.close()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 def writeAllList(suite_id, component_id, architecture_id, type, incremental_mode):
     session = DBConn().session()
@@ -95,7 +95,7 @@ def writeAllList(suite_id, component_id, architecture_id, type, incremental_mode
         file.write(filename + '\n')
     session.rollback()
     file.close()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 def writeBinaryList(suite_id, component_id, architecture_id, type, incremental_mode):
     session = DBConn().session()
@@ -112,7 +112,7 @@ def writeBinaryList(suite_id, component_id, architecture_id, type, incremental_m
         file.write(filename + '\n')
     session.rollback()
     file.close()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 def usage():
     print """Usage: dak generate_filelist [OPTIONS]
@@ -159,7 +159,7 @@ def main():
     Options = cnf.SubTree("Filelist::Options")
     if Options['Help']:
         usage()
-    #pool = Pool()
+    pool = DakProcessPool()
     query_suites = query_suites. \
         filter(Suite.suite_name.in_(utils.split_args(Options['Suite'])))
     query_components = query_components. \
@@ -167,8 +167,15 @@ def main():
     query_architectures = query_architectures. \
         filter(Architecture.arch_string.in_(utils.split_args(Options['Architecture'])))
 
-    def log(message):
-        Logger.log([message])
+    def parse_results(message):
+        # Split out into (code, msg)
+        code, msg = message
+        if code == PROC_STATUS_SUCCESS:
+            Logger.log([msg])
+        elif code == PROC_STATUS_SIGNALRAISED:
+            Logger.log(['E: Subprocess received signal ', msg])
+        else:
+            Logger.log(['E: ', msg])
 
     for suite in query_suites:
         suite_id = suite.suite_id
@@ -179,34 +186,32 @@ def main():
                 if architecture not in suite.architectures:
                     pass
                 elif architecture.arch_string == 'source':
-                    Logger.log([writeSourceList(suite_id, component_id, Options['Incremental'])])
-                    #pool.apply_async(writeSourceList,
-                    #    (suite_id, component_id, Options['Incremental']), callback=log)
+                    pool.apply_async(writeSourceList,
+                        (suite_id, component_id, Options['Incremental']), callback=parse_results)
                 elif architecture.arch_string == 'all':
-                    Logger.log([writeAllList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
-                    #pool.apply_async(writeAllList,
-                    #    (suite_id, component_id, architecture_id, 'deb',
-                    #        Options['Incremental']), callback=log)
-                    Logger.log([writeAllList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
-                    #pool.apply_async(writeAllList,
-                    #    (suite_id, component_id, architecture_id, 'udeb',
-                    #        Options['Incremental']), callback=log)
+                    pool.apply_async(writeAllList,
+                        (suite_id, component_id, architecture_id, 'deb',
+                            Options['Incremental']), callback=parse_results)
+                    pool.apply_async(writeAllList,
+                        (suite_id, component_id, architecture_id, 'udeb',
+                            Options['Incremental']), callback=parse_results)
                 else: # arch any
-                    Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
-                    #pool.apply_async(writeBinaryList,
-                    #    (suite_id, component_id, architecture_id, 'deb',
-                    #        Options['Incremental']), callback=log)
-                    Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
-                    #pool.apply_async(writeBinaryList,
-                    #    (suite_id, component_id, architecture_id, 'udeb',
-                    #        Options['Incremental']), callback=log)
-    #pool.close()
-    #pool.join()
+                    pool.apply_async(writeBinaryList,
+                        (suite_id, component_id, architecture_id, 'deb',
+                            Options['Incremental']), callback=parse_results)
+                    pool.apply_async(writeBinaryList,
+                        (suite_id, component_id, architecture_id, 'udeb',
+                            Options['Incremental']), callback=parse_results)
+    pool.close()
+    pool.join()
+
     # this script doesn't change the database
     session.close()
 
     Logger.close()
 
+    sys.exit(pool.overall_status())
+
 if __name__ == '__main__':
     main()
 
diff --git a/dak/generate_packages_sources2.py b/dak/generate_packages_sources2.py
index fbae045fd1676ce2afc323f34c7404b9b3dbd01c..b157fcbe5fc989d0bb1adce55b9237c1d11313e1 100755 (executable)
@@ -31,7 +31,7 @@ Generate Packages/Sources files
 from daklib.dbconn import *
 from daklib.config import Config
 from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
 from daklib.filewriter import PackagesFileWriter, SourcesFileWriter
 
 import apt_pkg, os, stat, sys
@@ -119,7 +119,7 @@ def generate_sources(suite_id, component_id):
 
     message = ["generate sources", suite.suite_name, component.component_name]
     session.rollback()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 #############################################################################
 
@@ -225,7 +225,7 @@ def generate_packages(suite_id, component_id, architecture_id, type_name):
 
     message = ["generate-packages", suite.suite_name, component.component_name, architecture.arch_string]
     session.rollback()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 #############################################################################
 
@@ -265,28 +265,35 @@ def main():
 
     component_ids = [ c.component_id for c in session.query(Component).all() ]
 
-    def log(details):
-        logger.log(details)
-
-    #pool = Pool()
+    def parse_results(message):
+        # Split out into (code, msg)
+        code, msg = message
+        if code == PROC_STATUS_SUCCESS:
+            logger.log([msg])
+        elif code == PROC_STATUS_SIGNALRAISED:
+            logger.log(['E: Subprocess received signal ', msg])
+        else:
+            logger.log(['E: ', msg])
+
+    pool = DakProcessPool()
     for s in suites:
         if s.untouchable and not force:
             utils.fubar("Refusing to touch %s (untouchable and not forced)" % s.suite_name)
         for c in component_ids:
-            logger.log(generate_sources(s.suite_id, c))
-            #pool.apply_async(generate_sources, [s.suite_id, c], callback=log)
+            pool.apply_async(generate_sources, [s.suite_id, c], callback=parse_results)
             for a in s.architectures:
-                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'deb'))
-                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
-                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'udeb'))
-                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=parse_results)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=parse_results)
+
+    pool.close()
+    pool.join()
 
-    #pool.close()
-    #pool.join()
     # this script doesn't change the database
     session.close()
 
     logger.close()
 
+    sys.exit(pool.overall_status())
+
 if __name__ == '__main__':
     main()
diff --git a/dak/generate_releases.py b/dak/generate_releases.py
index 39a76203967de8fbab9bd379e7a130cdab2ea685..3006364602517c17466c075afa82efc0ee7fed56 100755 (executable)
@@ -40,7 +40,6 @@ import bz2
 import apt_pkg
 from tempfile import mkstemp, mkdtemp
 import commands
-from multiprocessing import Pool, TimeoutError
 from sqlalchemy.orm import object_session
 
 from daklib import utils, daklog
@@ -48,10 +47,10 @@ from daklib.regexes import re_gensubrelease, re_includeinrelease
 from daklib.dak_exceptions import *
 from daklib.dbconn import *
 from daklib.config import Config
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS
 
 ################################################################################
 Logger = None                  #: Our logging object
-results = []                   #: Results of the subprocesses
 
 ################################################################################
 
@@ -74,11 +73,6 @@ SUITE can be a space separated list, e.g.
 
 ########################################################################
 
-def get_result(arg):
-    global results
-    if arg:
-        results.append(arg)
-
 def sign_release_dir(suite, dirname):
     cnf = Config()
 
@@ -294,7 +288,7 @@ class ReleaseWriter(object):
 
 
 def main ():
-    global Logger, results
+    global Logger
 
     cnf = Config()
 
@@ -329,10 +323,8 @@ def main ():
         suites = session.query(Suite).filter(Suite.untouchable == False).all()
 
     broken=[]
-    # For each given suite, run one process
-    results = []
 
-    pool = Pool()
+    pool = DakProcessPool()
 
     for s in suites:
         # Setup a multiprocessing Pool. As many workers as we have CPU cores.
@@ -342,18 +334,17 @@ def main ():
 
         print "Processing %s" % s.suite_name
         Logger.log(['Processing release file for Suite: %s' % (s.suite_name)])
-        pool.apply_async(generate_helper, (s.suite_id, ), callback=get_result)
+        pool.apply_async(generate_helper, (s.suite_id, ))
 
     # No more work will be added to our pool, close it and then wait for all to finish
     pool.close()
     pool.join()
 
-    retcode = 0
+    retcode = pool.overall_status()
 
-    if len(results) > 0:
-        Logger.log(['Release file generation broken: %s' % (results)])
-        print "Release file generation broken:\n", '\n'.join(results)
-        retcode = 1
+    if retcode > 0:
+        # TODO: CENTRAL FUNCTION FOR THIS / IMPROVE LOGGING
+        Logger.log(['Release file generation broken: %s' % (','.join([str(x[1]) for x in pool.results]))])
 
     Logger.close()
 
@@ -365,13 +356,12 @@ def generate_helper(suite_id):
     '''
     session = DBConn().session()
     suite = Suite.get(suite_id, session)
-    try:
-        rw = ReleaseWriter(suite)
-        rw.generate_release_files()
-    except Exception, e:
-        return str(e)
 
-    return
+    # We allow the process handler to catch and deal with any exceptions
+    rw = ReleaseWriter(suite)
+    rw.generate_release_files()
+
+    return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite.suite_name)
 
 #######################################################################################
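
generate_helper no longer catches exceptions itself: _func_wrapper in daklib/dakmultiprocessing.py (below) traps anything the helper raises and records it as (PROC_STATUS_EXCEPTION, str(e)). A sketch of the failure path under the same pool:

    def failing_helper(suite_id):
        # An uncaught exception becomes a result tuple, not a crash.
        raise IOError('disk full')

    # After apply_async(failing_helper, (42,)), close() and join(),
    # pool.results contains (PROC_STATUS_EXCEPTION, 'disk full').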
 
diff --git a/dak/show_new.py b/dak/show_new.py
index f50b7853def339f89466fa63da9a1049aebbd590..513129f9c5d34f91623878ce8e305410bb38a89d 100755 (executable)
@@ -37,7 +37,7 @@ from daklib.regexes import re_source_ext
 from daklib.config import Config
 from daklib import daklog
 from daklib.changesutils import *
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS
 
 # Globals
 Cnf = None
@@ -176,7 +176,7 @@ def do_pkg(changes_file):
         os.stat(htmlfile).st_mtime > os.stat(origchanges).st_mtime:
             sources.add(htmlname)
             session.close()
-            return
+            return (PROC_STATUS_SUCCESS, '%s already up-to-date' % htmlfile)
 
     # Now we'll load the fingerprint
     (u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changes_file, session=session)
@@ -221,6 +221,8 @@ def do_pkg(changes_file):
     outfile.close()
     session.close()
 
+    return (PROC_STATUS_SUCCESS, '%s already updated' % htmlfile)
+
 ################################################################################
 
 def usage (exit_code=0):
@@ -266,16 +268,16 @@ def main():
 
     examine_package.use_html=1
 
-    #pool = Pool(processes=1)
+    pool = DakProcessPool()
     for changes_file in changes_files:
         changes_file = utils.validate_changes_file_arg(changes_file, 0)
         if not changes_file:
             continue
         print "\n" + changes_file
-        #pool.apply_async(do_pkg, (changes_file,))
+        pool.apply_async(do_pkg, (changes_file,))
         do_pkg(changes_file)
-    #pool.close()
-    #pool.join()
+    pool.close()
+    pool.join()
 
     files = set(os.listdir(cnf["Show-New::HTMLPath"]))
     to_delete = filter(lambda x: x.endswith(".html"), files.difference(sources))
diff --git a/daklib/daklog.py b/daklib/daklog.py
index fb33b0bdab18160525e1c3183d828f1559abba0e..856dc84103b385f5be41f799c3543ad8ad433d3b 100644 (file)
@@ -32,32 +32,53 @@ import utils
 
 ################################################################################
 
-class Logger:
+class Logger(object):
     "Logger object"
-    Cnf = None
-    logfile = None
-    program = None
+    __shared_state = {}
 
-    def __init__ (self, Cnf, program, debug=0, print_starting=True):
+    def __init__(self, *args, **kwargs):
+        self.__dict__ = self.__shared_state
+
+        if not getattr(self, 'initialised', False):
+            from daklib.config import Config
+            self.initialised = True
+
+            # To be backwards compatible, dump the first argument if it's a
+            # Config object.  TODO: Fix up all callers and remove this
+            if len(args) > 0 and isinstance(args[0], Config):
+                args = list(args)
+                args.pop(0)
+
+            self.__setup(*args, **kwargs)
+
+
+    def __setup(self, program='unknown', debug=False, print_starting=True, include_pid=False):
         "Initialize a new Logger object"
-        self.Cnf = Cnf
         self.program = program
+        self.debug = debug
+        self.include_pid = include_pid
+
         # Create the log directory if it doesn't exist
-        logdir = Cnf["Dir::Log"]
+        from daklib.config import Config
+        logdir = Config()["Dir::Log"]
         if not os.path.exists(logdir):
             umask = os.umask(00000)
             os.makedirs(logdir, 02775)
             os.umask(umask)
+
         # Open the logfile
         logfilename = "%s/%s" % (logdir, time.strftime("%Y-%m"))
         logfile = None
+
         if debug:
             logfile = sys.stderr
         else:
             umask = os.umask(00002)
             logfile = utils.open_file(logfilename, 'a')
             os.umask(umask)
+
         self.logfile = logfile
+
         if print_starting:
             self.log(["program start"])
 
diff --git a/daklib/dakmultiprocessing.py b/daklib/dakmultiprocessing.py
index 8e66cfdb177342f94b7adcbac1c1c23e47674ba4..86fa74d5208a860a718edfa479568fcd0269608a 100644 (file)
@@ -25,32 +25,82 @@ multiprocessing for DAK
 
 ###############################################################################
 
-import multiprocessing
+from multiprocessing.pool import Pool
+from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
+
 import sqlalchemy.orm.session
 
+__all__ = []
+
+PROC_STATUS_SUCCESS      = 0  # Everything ok
+PROC_STATUS_EXCEPTION    = 1  # An exception was caught
+PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
+PROC_STATUS_MISCFAILURE  = 3  # Process specific error; see message
+
+__all__.extend(['PROC_STATUS_SUCCESS',      'PROC_STATUS_EXCEPTION',
+                'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
+
+class SignalException(Exception):
+    def __init__(self, signum):
+        self.signum = signum
+
+    def __str__(self):
+        return "<SignalException: %d>" % self.signum
+
+__all__.append('SignalException')
+
+def signal_handler(signum, info):
+    raise SignalException(signum)
+
 def _func_wrapper(func, *args, **kwds):
+    # We need to handle signals to avoid hanging
+    signal(SIGHUP, signal_handler)
+    signal(SIGTERM, signal_handler)
+    signal(SIGPIPE, signal_handler)
+    signal(SIGALRM, signal_handler)
+
+    # We expect our callback function to return:
+    # (status, messages)
+    # Where:
+    #  status is one of PROC_STATUS_*
+    #  messages is a string used for logging
     try:
-        return func(*args, **kwds)
+        return (func(*args, **kwds))
+    except SignalException, e:
+        return (PROC_STATUS_SIGNALRAISED, e.signum)
+    except Exception, e:
+        return (PROC_STATUS_EXCEPTION, str(e))
     finally:
         # Make sure connections are closed. We might die otherwise.
         sqlalchemy.orm.session.Session.close_all()
 
-class Pool():
+
+class DakProcessPool(Pool):
     def __init__(self, *args, **kwds):
-        self.pool = multiprocessing.Pool(*args, **kwds)
+        Pool.__init__(self, *args, **kwds)
         self.results = []
+        self.int_results = []
 
     def apply_async(self, func, args=(), kwds={}, callback=None):
         wrapper_args = list(args)
         wrapper_args.insert(0, func)
-        self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback))
-
-    def close(self):
-        self.pool.close()
+        self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
 
     def join(self):
-        self.pool.join()
-        #for r in self.results:
-        #    # return values were already handled in the callbacks, but asking
-        #    # for them might raise exceptions which would otherwise be lost
-        #    r.get()
+        Pool.join(self)
+        for r in self.int_results:
+            # return values were already handled in the callbacks, but asking
+            # for them might raise exceptions which would otherwise be lost
+            self.results.append(r.get())
+
+    def overall_status(self):
+        # Return the highest of our status results
+        # This basically allows us to do sys.exit(overall_status()) and have us
+        # exit 0 if everything was good and non-zero if not
+        status = 0
+        for r in self.results:
+            if r[0] > status:
+                status = r[0]
+        return status
+
+__all__.append('DakProcessPool')
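
Because the PROC_STATUS_* codes are ordered by severity, overall_status() amounts to a max() over the status element of each result. Workers report process-specific problems by returning PROC_STATUS_MISCFAILURE instead of raising; a minimal sketch, where check_suite() stands in for a hypothetical validation helper (not part of dak):

    from daklib.dakmultiprocessing import PROC_STATUS_SUCCESS, PROC_STATUS_MISCFAILURE

    def validate_worker(suite_name):
        if not check_suite(suite_name):  # check_suite() is illustrative only
            return (PROC_STATUS_MISCFAILURE, 'validation failed for %s' % suite_name)
        return (PROC_STATUS_SUCCESS, '%s ok' % suite_name)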
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
new file mode 100755 (executable)
index 0000000..2d93e8a
--- /dev/null
+++ b/tests/test_multiprocessing.py
@@ -0,0 +1,61 @@
+#!/usr/bin/python
+
+from base_test import DakTestCase
+
+from daklib.dakmultiprocessing import DakProcessPool, \
+                                      PROC_STATUS_SUCCESS,   PROC_STATUS_MISCFAILURE, \
+                                      PROC_STATUS_EXCEPTION, PROC_STATUS_SIGNALRAISED 
+import signal
+
+def test_function(num, num2):
+    from os import kill, getpid
+
+    if num == 1:
+        sigs = [signal.SIGTERM, signal.SIGPIPE, signal.SIGALRM, signal.SIGHUP]
+        kill(getpid(), sigs[num2])
+
+    if num2 == 3:
+        raise Exception('Test uncaught exception handling')
+
+    if num == 0 and num2 == 1:
+        return (PROC_STATUS_MISCFAILURE, 'Test custom error return')
+
+    return (PROC_STATUS_SUCCESS, 'blah, %d, %d' % (num, num2))
+
+class DakProcessPoolTestCase(DakTestCase):
+    def testPool(self):
+        def alarm_handler(signum, frame):
+            raise AssertionError('Timed out')
+
+        # Shouldn't take us more than 15 seconds to run this test
+        signal.signal(signal.SIGALRM, alarm_handler)
+        signal.alarm(15)
+
+        p = DakProcessPool()
+        for s in range(3):
+            for j in range(4):
+                p.apply_async(test_function, [s, j])
+
+        p.close()
+        p.join()
+
+        signal.alarm(0)
+        signal.signal(signal.SIGALRM, signal.SIG_DFL)
+
+        expected = [(PROC_STATUS_SUCCESS,      'blah, 0, 0'),
+                    (PROC_STATUS_MISCFAILURE,  'Test custom error return'),
+                    (PROC_STATUS_SUCCESS,      'blah, 0, 2'),
+                    (PROC_STATUS_EXCEPTION,    'Test uncaught exception handling'),
+                    (PROC_STATUS_SIGNALRAISED, 15),
+                    (PROC_STATUS_SIGNALRAISED, 13),
+                    (PROC_STATUS_SIGNALRAISED, 14),
+                    (PROC_STATUS_SIGNALRAISED, 1),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 0'),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 1'),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 2'),
+                    (PROC_STATUS_EXCEPTION,    'Test uncaught exception handling')]
+
+        self.assertEqual( len(p.results), len(expected) )
+
+        for r in range(len(p.results)):
+            self.assertEqual(p.results[r], expected[r])
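
The integer payloads in the PROC_STATUS_SIGNALRAISED entries are raw signal numbers, so the expected list assumes Linux numbering:

    import signal
    # On Linux: SIGHUP=1, SIGPIPE=13, SIGALRM=14, SIGTERM=15, exactly the
    # payloads _func_wrapper returns for the signalled workers above.
    assert signal.SIGTERM == 15 and signal.SIGPIPE == 13
    assert signal.SIGALRM == 14 and signal.SIGHUP == 1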