From: Mark Hymers
Date: Sat, 7 May 2011 12:35:26 +0000 (+0100)
Subject: Merge remote branch 'ftpmaster/master' into multiproc
X-Git-Url: https://git.decadent.org.uk/gitweb/?a=commitdiff_plain;h=99749f86a3ca945f6f9f322f5694255e47ee7809;hp=e6a26ca720356e214b73b834e02ff94d3b873862;p=dak.git

Merge remote branch 'ftpmaster/master' into multiproc

Conflicts:
	dak/generate_packages_sources2.py

Signed-off-by: Mark Hymers
---

diff --git a/dak/generate_filelist.py b/dak/generate_filelist.py
index 2a566e06..98d239c5 100755
--- a/dak/generate_filelist.py
+++ b/dak/generate_filelist.py
@@ -39,7 +39,7 @@ Generate file lists for apt-ftparchive.
 from daklib.dbconn import *
 from daklib.config import Config
 from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
 import apt_pkg, os, stat, sys
 
 from daklib.lists import getSources, getBinaries, getArchAll
@@ -78,7 +78,7 @@ def writeSourceList(suite_id, component_id, incremental_mode):
         file.write(filename + '\n')
     session.rollback()
     file.close()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 def writeAllList(suite_id, component_id, architecture_id, type, incremental_mode):
     session = DBConn().session()
@@ -95,7 +95,7 @@ def writeAllList(suite_id, component_id, architecture_id, type, incremental_mode
         file.write(filename + '\n')
     session.rollback()
     file.close()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 def writeBinaryList(suite_id, component_id, architecture_id, type, incremental_mode):
     session = DBConn().session()
@@ -112,7 +112,7 @@ def writeBinaryList(suite_id, component_id, architecture_id, type, incremental_m
         file.write(filename + '\n')
     session.rollback()
     file.close()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 def usage():
     print """Usage: dak generate_filelist [OPTIONS]
@@ -159,7 +159,7 @@ def main():
     Options = cnf.SubTree("Filelist::Options")
     if Options['Help']:
         usage()
-    #pool = Pool()
+    pool = DakProcessPool()
     query_suites = query_suites. \
         filter(Suite.suite_name.in_(utils.split_args(Options['Suite'])))
     query_components = query_components. \
@@ -167,8 +167,15 @@ def main():
     query_architectures = query_architectures. \
         filter(Architecture.arch_string.in_(utils.split_args(Options['Architecture'])))
 
-    def log(message):
-        Logger.log([message])
+    def parse_results(message):
+        # Split out into (code, msg)
+        code, msg = message
+        if code == PROC_STATUS_SUCCESS:
+            Logger.log([msg])
+        elif code == PROC_STATUS_SIGNALRAISED:
+            Logger.log(['E: Subprocess received signal ', msg])
+        else:
+            Logger.log(['E: ', msg])
 
     for suite in query_suites:
         suite_id = suite.suite_id
@@ -179,34 +186,32 @@ def main():
             if architecture not in suite.architectures:
                 pass
             elif architecture.arch_string == 'source':
-                Logger.log([writeSourceList(suite_id, component_id, Options['Incremental'])])
-                #pool.apply_async(writeSourceList,
-                #    (suite_id, component_id, Options['Incremental']), callback=log)
+                pool.apply_async(writeSourceList,
+                    (suite_id, component_id, Options['Incremental']), callback=parse_results)
             elif architecture.arch_string == 'all':
-                Logger.log([writeAllList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
-                #pool.apply_async(writeAllList,
-                #    (suite_id, component_id, architecture_id, 'deb',
-                #    Options['Incremental']), callback=log)
-                Logger.log([writeAllList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
-                #pool.apply_async(writeAllList,
-                #    (suite_id, component_id, architecture_id, 'udeb',
-                #    Options['Incremental']), callback=log)
+                pool.apply_async(writeAllList,
+                    (suite_id, component_id, architecture_id, 'deb',
+                    Options['Incremental']), callback=parse_results)
+                pool.apply_async(writeAllList,
+                    (suite_id, component_id, architecture_id, 'udeb',
+                    Options['Incremental']), callback=parse_results)
             else: # arch any
-                Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
-                #pool.apply_async(writeBinaryList,
-                #    (suite_id, component_id, architecture_id, 'deb',
-                #    Options['Incremental']), callback=log)
-                Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
-                #pool.apply_async(writeBinaryList,
-                #    (suite_id, component_id, architecture_id, 'udeb',
-                #    Options['Incremental']), callback=log)
-    #pool.close()
-    #pool.join()
+                pool.apply_async(writeBinaryList,
+                    (suite_id, component_id, architecture_id, 'deb',
+                    Options['Incremental']), callback=parse_results)
+                pool.apply_async(writeBinaryList,
+                    (suite_id, component_id, architecture_id, 'udeb',
+                    Options['Incremental']), callback=parse_results)
+    pool.close()
+    pool.join()
+
+    # this script doesn't change the database
     session.close()
 
     Logger.close()
 
+    sys.exit(pool.overall_status())
+
 if __name__ == '__main__':
     main()
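The change above replaces the serialised Logger.log(write...List(...)) calls with pool workers that hand back a (status, message) tuple for the parse_results callback to unpack. A minimal standalone sketch of that convention, using the stock multiprocessing.Pool in place of DakProcessPool (write_list, the print-based logging and the IDs are invented for illustration):

    # Sketch of the (status, message) worker convention, stdlib only.
    from multiprocessing import Pool

    PROC_STATUS_SUCCESS = 0   # mirrors daklib.dakmultiprocessing

    def write_list(suite_id, component_id):
        # Workers return (status, message) instead of a bare message
        return (PROC_STATUS_SUCCESS,
                "filelist written for suite %d, component %d" % (suite_id, component_id))

    def parse_results(result):
        # The callback runs in the parent process and unpacks the tuple
        code, msg = result
        if code == PROC_STATUS_SUCCESS:
            print msg
        else:
            print "E:", msg

    if __name__ == '__main__':
        pool = Pool()
        for suite_id in (1, 2):
            pool.apply_async(write_list, (suite_id, 10), callback=parse_results)
        pool.close()
        pool.join()

Because the callback fires in the parent, logging stays single-writer even though the list generation is fanned out across CPU cores.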
diff --git a/dak/generate_packages_sources2.py b/dak/generate_packages_sources2.py
index 4523252c..8908e3cf 100755
--- a/dak/generate_packages_sources2.py
+++ b/dak/generate_packages_sources2.py
@@ -31,7 +31,7 @@ Generate Packages/Sources files
 from daklib.dbconn import *
 from daklib.config import Config
 from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
 from daklib.filewriter import PackagesFileWriter, SourcesFileWriter
 import apt_pkg, os, stat, sys
 
@@ -119,7 +119,7 @@ def generate_sources(suite_id, component_id):
     message = ["generate sources", suite.suite_name, component.component_name]
 
     session.rollback()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 #############################################################################
 
@@ -225,7 +225,7 @@ def generate_packages(suite_id, component_id, architecture_id, type_name):
     message = ["generate-packages", suite.suite_name, component.component_name, architecture.arch_string]
 
     session.rollback()
-    return message
+    return (PROC_STATUS_SUCCESS, message)
 
 #############################################################################
 
@@ -265,30 +265,37 @@ def main():
 
     component_ids = [ c.component_id for c in session.query(Component).all() ]
 
-    def log(details):
-        logger.log(details)
-
-    #pool = Pool()
+    def parse_results(message):
+        # Split out into (code, msg)
+        code, msg = message
+        if code == PROC_STATUS_SUCCESS:
+            logger.log([msg])
+        elif code == PROC_STATUS_SIGNALRAISED:
+            logger.log(['E: Subprocess received signal ', msg])
+        else:
+            logger.log(['E: ', msg])
+
+    pool = DakProcessPool()
     for s in suites:
         if s.untouchable and not force:
             utils.fubar("Refusing to touch %s (untouchable and not forced)" % s.suite_name)
         for c in component_ids:
-            logger.log(generate_sources(s.suite_id, c))
-            #pool.apply_async(generate_sources, [s.suite_id, c], callback=log)
+            pool.apply_async(generate_sources, [s.suite_id, c], callback=parse_results)
             for a in s.architectures:
                 if a == 'source':
                     continue
-                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'deb'))
-                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
-                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'udeb'))
-                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=parse_results)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=parse_results)
+
+    pool.close()
+    pool.join()
 
-    #pool.close()
-    #pool.join()
     # this script doesn't change the database
     session.close()
 
     logger.close()
 
+    sys.exit(pool.overall_status())
+
 if __name__ == '__main__':
     main()
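Both generator scripts now finish with sys.exit(pool.overall_status()), so wrappers and cron jobs can detect partial failure from the shell exit code. The PROC_STATUS_* values are ordered by severity, and overall_status() (defined in daklib/dakmultiprocessing.py below) simply takes the maximum over the collected results; a reduced, self-contained sketch of the idea:

    # Illustration only: the exit code is the worst (highest) status seen.
    PROC_STATUS_SUCCESS = 0
    PROC_STATUS_SIGNALRAISED = 2

    results = [(PROC_STATUS_SUCCESS, 'ok'),
               (PROC_STATUS_SIGNALRAISED, 15),   # a worker was killed by SIGTERM
               (PROC_STATUS_SUCCESS, 'ok')]

    def overall_status(results):
        status = 0
        for code, _ in results:
            if code > status:
                status = code
        return status

    print overall_status(results)   # 2 -> non-zero, suitable for sys.exit()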
diff --git a/dak/generate_releases.py b/dak/generate_releases.py
index 39a76203..30063646 100755
--- a/dak/generate_releases.py
+++ b/dak/generate_releases.py
@@ -40,7 +40,6 @@ import bz2
 import apt_pkg
 from tempfile import mkstemp, mkdtemp
 import commands
-from multiprocessing import Pool, TimeoutError
 from sqlalchemy.orm import object_session
 
 from daklib import utils, daklog
@@ -48,10 +47,10 @@ from daklib.regexes import re_gensubrelease, re_includeinrelease
 from daklib.dak_exceptions import *
 from daklib.dbconn import *
 from daklib.config import Config
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS
 
 ################################################################################
 Logger = None               #: Our logging object
-results = []                #: Results of the subprocesses
 
 ################################################################################
 
@@ -74,11 +73,6 @@ SUITE can be a space separated list, e.g.
 
 ########################################################################
 
-def get_result(arg):
-    global results
-    if arg:
-        results.append(arg)
-
 def sign_release_dir(suite, dirname):
     cnf = Config()
 
@@ -294,7 +288,7 @@ class ReleaseWriter(object):
 
 def main ():
-    global Logger, results
+    global Logger
 
     cnf = Config()
 
@@ -329,10 +323,8 @@ def main ():
         suites = session.query(Suite).filter(Suite.untouchable == False).all()
 
     broken=[]
-    # For each given suite, run one process
-    results = []
 
-    pool = Pool()
+    pool = DakProcessPool()
 
     for s in suites:
         # Setup a multiprocessing Pool. As many workers as we have CPU cores.
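generate_helper (below) loses its try/except in this diff: an exception escaping a worker is now caught by the pool's function wrapper and converted into a (PROC_STATUS_EXCEPTION, str(e)) result rather than being stringified ad hoc. A reduced sketch of that mechanism (func_wrapper is a simplified stand-in for the _func_wrapper defined in daklib/dakmultiprocessing.py below; helper and the suite names are illustrative):

    PROC_STATUS_SUCCESS = 0
    PROC_STATUS_EXCEPTION = 1

    def func_wrapper(func, *args):
        # Convert an uncaught exception into a status tuple instead of
        # letting it vanish inside the child process
        try:
            return func(*args)
        except Exception, e:
            return (PROC_STATUS_EXCEPTION, str(e))

    def helper(suite_name):
        if suite_name == 'broken':
            raise IOError('cannot write Release file')
        return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite_name)

    print func_wrapper(helper, 'sid')      # (0, 'Release file written for sid')
    print func_wrapper(helper, 'broken')   # (1, 'cannot write Release file')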
@@ -342,18 +334,17 @@ def main ():
 
         print "Processing %s" % s.suite_name
         Logger.log(['Processing release file for Suite: %s' % (s.suite_name)])
-        pool.apply_async(generate_helper, (s.suite_id, ), callback=get_result)
+        pool.apply_async(generate_helper, (s.suite_id, ))
 
     # No more work will be added to our pool, close it and then wait for all to finish
     pool.close()
     pool.join()
 
-    retcode = 0
+    retcode = pool.overall_status()
 
-    if len(results) > 0:
-        Logger.log(['Release file generation broken: %s' % (results)])
-        print "Release file generation broken:\n", '\n'.join(results)
-        retcode = 1
+    if retcode > 0:
+        # TODO: CENTRAL FUNCTION FOR THIS / IMPROVE LOGGING
+        Logger.log(['Release file generation broken: %s' % (','.join([str(x[1]) for x in pool.results]))])
 
     Logger.close()
 
@@ -365,13 +356,12 @@ def generate_helper(suite_id):
     '''
     session = DBConn().session()
     suite = Suite.get(suite_id, session)
-    try:
-        rw = ReleaseWriter(suite)
-        rw.generate_release_files()
-    except Exception, e:
-        return str(e)
 
-    return
+    # We allow the process handler to catch and deal with any exceptions
+    rw = ReleaseWriter(suite)
+    rw.generate_release_files()
+
+    return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite.suite_name)
 
 #######################################################################################

diff --git a/dak/show_new.py b/dak/show_new.py
index f50b7853..513129f9 100755
--- a/dak/show_new.py
+++ b/dak/show_new.py
@@ -37,7 +37,7 @@ from daklib.regexes import re_source_ext
 from daklib.config import Config
 from daklib import daklog
 from daklib.changesutils import *
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS
 
 # Globals
 Cnf = None
@@ -176,7 +176,7 @@ def do_pkg(changes_file):
             os.stat(htmlfile).st_mtime > os.stat(origchanges).st_mtime:
             sources.add(htmlname)
             session.close()
-            return
+            return (PROC_STATUS_SUCCESS, '%s already up-to-date' % htmlfile)
 
     # Now we'll load the fingerprint
     (u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changes_file, session=session)
@@ -221,6 +221,8 @@ def do_pkg(changes_file):
     outfile.close()
     session.close()
 
+    return (PROC_STATUS_SUCCESS, '%s already updated' % htmlfile)
+
 ################################################################################
 
 def usage (exit_code=0):
@@ -266,16 +268,16 @@ def main():
 
     examine_package.use_html=1
 
-    #pool = Pool(processes=1)
+    pool = DakProcessPool()
     for changes_file in changes_files:
         changes_file = utils.validate_changes_file_arg(changes_file, 0)
         if not changes_file:
             continue
         print "\n" + changes_file
-        #pool.apply_async(do_pkg, (changes_file,))
+        pool.apply_async(do_pkg, (changes_file,))
         do_pkg(changes_file)
-    #pool.close()
-    #pool.join()
+    pool.close()
+    pool.join()
 
     files = set(os.listdir(cnf["Show-New::HTMLPath"]))
     to_delete = filter(lambda x: x.endswith(".html"), files.difference(sources))

diff --git a/daklib/daklog.py b/daklib/daklog.py
index fb33b0bd..856dc841 100644
--- a/daklib/daklog.py
+++ b/daklib/daklog.py
@@ -32,32 +32,53 @@ import utils
 
 ################################################################################
 
-class Logger:
+class Logger(object):
     "Logger object"
-    Cnf = None
-    logfile = None
-    program = None
+    __shared_state = {}
 
-    def __init__ (self, Cnf, program, debug=0, print_starting=True):
+    def __init__(self, *args, **kwargs):
+        self.__dict__ = self.__shared_state
+
+        if not getattr(self, 'initialised', False):
+            from daklib.config import Config
+            self.initialised = True
+
+            # To be backwards compatible, dump the first argument if it's a
+            # Config object.  TODO: Fix up all callers and remove this
+            if len(args) > 0 and isinstance(args[0], Config):
+                args = list(args)
+                args.pop(0)
+
+            self.__setup(*args, **kwargs)
+
+
+    def __setup(self, program='unknown', debug=False, print_starting=True, include_pid=False):
         "Initialize a new Logger object"
-        self.Cnf = Cnf
         self.program = program
+        self.debug = debug
+        self.include_pid = include_pid
+
         # Create the log directory if it doesn't exist
-        logdir = Cnf["Dir::Log"]
+        from daklib.config import Config
+        logdir = Config()["Dir::Log"]
         if not os.path.exists(logdir):
             umask = os.umask(00000)
             os.makedirs(logdir, 02775)
             os.umask(umask)
+
         # Open the logfile
         logfilename = "%s/%s" % (logdir, time.strftime("%Y-%m"))
         logfile = None
+
         if debug:
             logfile = sys.stderr
         else:
             umask = os.umask(00002)
             logfile = utils.open_file(logfilename, 'a')
             os.umask(umask)
+
         self.logfile = logfile
+
         if print_starting:
             self.log(["program start"])
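The rewritten Logger uses the Borg (shared-state) pattern: every instance aliases its __dict__ to a single class-level __shared_state dictionary, so a Logger() constructed anywhere in the process shares the same open logfile, and __setup runs only on first construction. The idiom in isolation (a generic sketch, not dak code):

    class Borg(object):
        __shared_state = {}

        def __init__(self):
            # All instances share one attribute dictionary, so state
            # written through any instance is visible through all of them.
            self.__dict__ = self.__shared_state

    first = Borg()
    first.logfile = 'dak.log'
    second = Borg()
    print second.logfile    # 'dak.log' -- same state
    print first is second   # False -- distinct instances, unlike a singleton

Unlike a classic singleton, callers still receive distinct objects; only the attribute state is shared, which is what makes the backwards-compatible constructor juggling above workable.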
diff --git a/daklib/dakmultiprocessing.py b/daklib/dakmultiprocessing.py
index 8e66cfdb..86fa74d5 100644
--- a/daklib/dakmultiprocessing.py
+++ b/daklib/dakmultiprocessing.py
@@ -25,32 +25,82 @@ multiprocessing for DAK
 
 ###############################################################################
 
-import multiprocessing
+from multiprocessing.pool import Pool
+from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
+
 import sqlalchemy.orm.session
 
+__all__ = []
+
+PROC_STATUS_SUCCESS      = 0  # Everything ok
+PROC_STATUS_EXCEPTION    = 1  # An exception was caught
+PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
+PROC_STATUS_MISCFAILURE  = 3  # Process specific error; see message
+
+__all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION',
+                'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
+
+class SignalException(Exception):
+    def __init__(self, signum):
+        self.signum = signum
+
+    def __str__(self):
+        return "<SignalException: %d>" % self.signum
+
+__all__.append('SignalException')
+
+def signal_handler(signum, info):
+    raise SignalException(signum)
+
 def _func_wrapper(func, *args, **kwds):
+    # We need to handle signals to avoid hanging
+    signal(SIGHUP, signal_handler)
+    signal(SIGTERM, signal_handler)
+    signal(SIGPIPE, signal_handler)
+    signal(SIGALRM, signal_handler)
+
+    # We expect our callback function to return:
+    #  (status, messages)
+    # Where:
+    #  status is one of PROC_STATUS_*
+    #  messages is a string used for logging
     try:
-        return func(*args, **kwds)
+        return (func(*args, **kwds))
+    except SignalException, e:
+        return (PROC_STATUS_SIGNALRAISED, e.signum)
+    except Exception, e:
+        return (PROC_STATUS_EXCEPTION, str(e))
     finally:
         # Make sure connections are closed. We might die otherwise.
         sqlalchemy.orm.session.Session.close_all()
 
-class Pool():
+
+class DakProcessPool(Pool):
     def __init__(self, *args, **kwds):
-        self.pool = multiprocessing.Pool(*args, **kwds)
+        Pool.__init__(self, *args, **kwds)
         self.results = []
+        self.int_results = []
 
     def apply_async(self, func, args=(), kwds={}, callback=None):
         wrapper_args = list(args)
         wrapper_args.insert(0, func)
-        self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback))
-
-    def close(self):
-        self.pool.close()
+        self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
 
     def join(self):
-        self.pool.join()
-        #for r in self.results:
-        #    # return values were already handled in the callbacks, but asking
-        #    # for them might raise exceptions which would otherwise be lost
-        #    r.get()
+        Pool.join(self)
+        for r in self.int_results:
+            # return values were already handled in the callbacks, but asking
+            # for them might raise exceptions which would otherwise be lost
+            self.results.append(r.get())
+
+    def overall_status(self):
+        # Return the highest of our status results
+        # This basically allows us to do sys.exit(overall_status()) and have us
+        # exit 0 if everything was good and non-zero if not
+        status = 0
+        for r in self.results:
+            if r[0] > status:
+                status = r[0]
+        return status
+
+__all__.append('DakProcessPool')
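The heart of the wrapper is the signal-to-exception conversion: each worker installs handlers that raise SignalException, so delivery of SIGHUP, SIGTERM, SIGPIPE or SIGALRM drops into the except clause and is reported as PROC_STATUS_SIGNALRAISED instead of killing the child silently or hanging the pool. The mechanism on its own (a self-contained sketch mirroring the handler above):

    import os, signal

    class SignalException(Exception):
        def __init__(self, signum):
            self.signum = signum
        def __str__(self):
            return "<SignalException: %d>" % self.signum

    def signal_handler(signum, frame):
        raise SignalException(signum)

    signal.signal(signal.SIGTERM, signal_handler)

    try:
        os.kill(os.getpid(), signal.SIGTERM)   # simulate the worker being terminated
    except SignalException, e:
        print "caught signal %d" % e.signum    # caught signal 15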
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
new file mode 100755
index 00000000..2d93e8aa
--- /dev/null
+++ b/tests/test_multiprocessing.py
@@ -0,0 +1,61 @@
+#!/usr/bin/python
+
+from base_test import DakTestCase
+
+from daklib.dakmultiprocessing import DakProcessPool, \
+                                      PROC_STATUS_SUCCESS, PROC_STATUS_MISCFAILURE, \
+                                      PROC_STATUS_EXCEPTION, PROC_STATUS_SIGNALRAISED
+import signal
+
+def test_function(num, num2):
+    from os import kill, getpid
+
+    if num == 1:
+        sigs = [signal.SIGTERM, signal.SIGPIPE, signal.SIGALRM, signal.SIGHUP]
+        kill(getpid(), sigs[num2])
+
+    if num2 == 3:
+        raise Exception('Test uncaught exception handling')
+
+    if num == 0 and num2 == 1:
+        return (PROC_STATUS_MISCFAILURE, 'Test custom error return')
+
+    return (PROC_STATUS_SUCCESS, 'blah, %d, %d' % (num, num2))
+
+class DakProcessPoolTestCase(DakTestCase):
+    def testPool(self):
+        def alarm_handler(signum, frame):
+            raise AssertionError('Timed out')
+
+        # Shouldn't take us more than 15 seconds to run this test
+        signal.signal(signal.SIGALRM, alarm_handler)
+        signal.alarm(15)
+
+        p = DakProcessPool()
+        for s in range(3):
+            for j in range(4):
+                p.apply_async(test_function, [s, j])
+
+        p.close()
+        p.join()
+
+        signal.alarm(0)
+        signal.signal(signal.SIGALRM, signal.SIG_DFL)
+
+        expected = [(PROC_STATUS_SUCCESS,      'blah, 0, 0'),
+                    (PROC_STATUS_MISCFAILURE,  'Test custom error return'),
+                    (PROC_STATUS_SUCCESS,      'blah, 0, 2'),
+                    (PROC_STATUS_EXCEPTION,    'Test uncaught exception handling'),
+                    (PROC_STATUS_SIGNALRAISED, 15),
+                    (PROC_STATUS_SIGNALRAISED, 13),
+                    (PROC_STATUS_SIGNALRAISED, 14),
+                    (PROC_STATUS_SIGNALRAISED, 1),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 0'),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 1'),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 2'),
+                    (PROC_STATUS_EXCEPTION,    'Test uncaught exception handling')]
+
+        self.assertEqual(len(p.results), len(expected))
+
+        for r in range(len(p.results)):
+            self.assertEqual(p.results[r], expected[r])
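One property the test relies on: p.results is ordered by submission, not completion, because join() collects r.get() from int_results in queueing order, so expected can be written deterministically even though the workers run in parallel. The four num == 1 tasks, for instance, map directly onto the raised signal numbers (an illustrative check):

    import signal

    # num == 1 tasks raise sigs[num2]; the result payloads are the signal numbers
    sigs = [signal.SIGTERM, signal.SIGPIPE, signal.SIGALRM, signal.SIGHUP]
    print [int(s) for s in sigs]   # [15, 13, 14, 1] -- the PROC_STATUS_SIGNALRAISED
                                   # entries in expected, in the same order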