From 3b8862ae0e21fae9fc552d4df160f45684976d7d Mon Sep 17 00:00:00 2001 From: Mark Hymers Date: Mon, 28 Mar 2011 01:12:39 +0100 Subject: [PATCH] Enhance process pool implementation Signed-off-by: Mark Hymers --- dak/generate_filelist.py | 57 +++++++++++++---------- dak/generate_packages_sources2.py | 35 ++++++++------ dak/generate_releases.py | 33 +++++-------- dak/show_new.py | 4 +- daklib/dakmultiprocessing.py | 77 +++++++++++++++++++++++++------ tests/test_multiprocessing.py | 61 ++++++++++++++++++++++++ 6 files changed, 192 insertions(+), 75 deletions(-) create mode 100755 tests/test_multiprocessing.py diff --git a/dak/generate_filelist.py b/dak/generate_filelist.py index 2a566e06..d015b3ed 100755 --- a/dak/generate_filelist.py +++ b/dak/generate_filelist.py @@ -39,11 +39,13 @@ Generate file lists for apt-ftparchive. from daklib.dbconn import * from daklib.config import Config from daklib import utils, daklog -from daklib.dakmultiprocessing import Pool +from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED import apt_pkg, os, stat, sys from daklib.lists import getSources, getBinaries, getArchAll +EXIT_STATUS = 0 + def listPath(suite, component, architecture = None, type = None, incremental_mode = False): """returns full path to the list file""" @@ -159,7 +161,7 @@ def main(): Options = cnf.SubTree("Filelist::Options") if Options['Help']: usage() - #pool = Pool() + pool = DakProcessPool() query_suites = query_suites. \ filter(Suite.suite_name.in_(utils.split_args(Options['Suite']))) query_components = query_components. \ @@ -167,8 +169,15 @@ def main(): query_architectures = query_architectures. \ filter(Architecture.arch_string.in_(utils.split_args(Options['Architecture']))) - def log(message): - Logger.log([message]) + def parse_results(message): + # Split out into (code, msg) + code, msg = message + if code == PROC_STATUS_SUCCESS: + Logger.log([msg]) + elif code == PROC_STATUS_SIGNALRAISED: + Logger.log(['E: Subprocess recieved signal ', msg]) + else: + Logger.log(['E: ', msg]) for suite in query_suites: suite_id = suite.suite_id @@ -179,34 +188,32 @@ def main(): if architecture not in suite.architectures: pass elif architecture.arch_string == 'source': - Logger.log([writeSourceList(suite_id, component_id, Options['Incremental'])]) - #pool.apply_async(writeSourceList, - # (suite_id, component_id, Options['Incremental']), callback=log) + pool.apply_async(writeSourceList, + (suite_id, component_id, Options['Incremental']), callback=parse_results) elif architecture.arch_string == 'all': - Logger.log([writeAllList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])]) - #pool.apply_async(writeAllList, - # (suite_id, component_id, architecture_id, 'deb', - # Options['Incremental']), callback=log) - Logger.log([writeAllList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])]) - #pool.apply_async(writeAllList, - # (suite_id, component_id, architecture_id, 'udeb', - # Options['Incremental']), callback=log) + pool.apply_async(writeAllList, + (suite_id, component_id, architecture_id, 'deb', + Options['Incremental']), callback=parse_results) + pool.apply_async(writeAllList, + (suite_id, component_id, architecture_id, 'udeb', + Options['Incremental']), callback=parse_results) else: # arch any - Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])]) - #pool.apply_async(writeBinaryList, - # (suite_id, component_id, architecture_id, 'deb', - # Options['Incremental']), callback=log) - Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])]) - #pool.apply_async(writeBinaryList, - # (suite_id, component_id, architecture_id, 'udeb', - # Options['Incremental']), callback=log) - #pool.close() - #pool.join() + pool.apply_async(writeBinaryList, + (suite_id, component_id, architecture_id, 'deb', + Options['Incremental']), callback=parse_results) + pool.apply_async(writeBinaryList, + (suite_id, component_id, architecture_id, 'udeb', + Options['Incremental']), callback=parse_results) + pool.close() + pool.join() + # this script doesn't change the database session.close() Logger.close() + sys.exit(pool.overall_status()) + if __name__ == '__main__': main() diff --git a/dak/generate_packages_sources2.py b/dak/generate_packages_sources2.py index eea799f5..a7efea24 100755 --- a/dak/generate_packages_sources2.py +++ b/dak/generate_packages_sources2.py @@ -31,11 +31,13 @@ Generate Packages/Sources files from daklib.dbconn import * from daklib.config import Config from daklib import utils, daklog -from daklib.dakmultiprocessing import Pool +from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED from daklib.filewriter import PackagesFileWriter, SourcesFileWriter import apt_pkg, os, stat, sys +EXIT_STATUS = 0 + def usage(): print """Usage: dak generate-packages-sources2 [OPTIONS] Generate the Packages/Sources files @@ -263,28 +265,35 @@ def main(): component_ids = [ c.component_id for c in session.query(Component).all() ] - def log(details): - logger.log(details) - - #pool = Pool() + def parse_results(message): + # Split out into (code, msg) + code, msg = message + if code == PROC_STATUS_SUCCESS: + Logger.log([msg]) + elif code == PROC_STATUS_SIGNALRAISED: + Logger.log(['E: Subprocess recieved signal ', msg]) + else: + Logger.log(['E: ', msg]) + + pool = DakProcessPool() for s in suites: if s.untouchable and not force: utils.fubar("Refusing to touch %s (untouchable and not forced)" % s.suite_name) for c in component_ids: - logger.log(generate_sources(s.suite_id, c)) - #pool.apply_async(generate_sources, [s.suite_id, c], callback=log) + pool.apply_async(generate_sources, [s.suite_id, c], callback=parse_result) for a in s.architectures: - logger.log(generate_packages(s.suite_id, c, a.arch_id, 'deb')) - #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log) - logger.log(generate_packages(s.suite_id, c, a.arch_id, 'udeb')) - #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log) + pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=parse_result) + pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=parse_result) + + pool.close() + pool.join() - #pool.close() - #pool.join() # this script doesn't change the database session.close() logger.close() + sys.exit(pool.output_status()) + if __name__ == '__main__': main() diff --git a/dak/generate_releases.py b/dak/generate_releases.py index 7f5a9963..6c4fed0d 100755 --- a/dak/generate_releases.py +++ b/dak/generate_releases.py @@ -40,7 +40,6 @@ import bz2 import apt_pkg from tempfile import mkstemp, mkdtemp import commands -from multiprocessing import Pool, TimeoutError from sqlalchemy.orm import object_session from daklib import utils, daklog @@ -48,10 +47,10 @@ from daklib.regexes import re_gensubrelease, re_includeinrelease from daklib.dak_exceptions import * from daklib.dbconn import * from daklib.config import Config +from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS ################################################################################ Logger = None #: Our logging object -results = [] #: Results of the subprocesses ################################################################################ @@ -74,11 +73,6 @@ SUITE can be a space seperated list, e.g. ######################################################################## -def get_result(arg): - global results - if arg: - results.append(arg) - def sign_release_dir(suite, dirname): cnf = Config() @@ -290,7 +284,7 @@ class ReleaseWriter(object): def main (): - global Logger, results + global Logger cnf = Config() @@ -325,10 +319,8 @@ def main (): suites = session.query(Suite).filter(Suite.untouchable == False).all() broken=[] - # For each given suite, run one process - results = [] - pool = Pool() + pool = DakProcessPool() for s in suites: # Setup a multiprocessing Pool. As many workers as we have CPU cores. @@ -344,12 +336,10 @@ def main (): pool.close() pool.join() - retcode = 0 + retcode = p.overall_status() - if len(results) > 0: - Logger.log(['Release file generation broken: %s' % (results)]) - print "Release file generation broken:\n", '\n'.join(results) - retcode = 1 + if retcode > 0: + Logger.log(['Release file generation broken: %s' % (p.results)]) Logger.close() @@ -361,13 +351,12 @@ def generate_helper(suite_id): ''' session = DBConn().session() suite = Suite.get(suite_id, session) - try: - rw = ReleaseWriter(suite) - rw.generate_release_files() - except Exception, e: - return str(e) - return + # We allow the process handler to catch and deal with any exceptions + rw = ReleaseWriter(suite) + rw.generate_release_files() + + return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite.suite_name) ####################################################################################### diff --git a/dak/show_new.py b/dak/show_new.py index 84b45077..7396d3cc 100755 --- a/dak/show_new.py +++ b/dak/show_new.py @@ -37,7 +37,7 @@ from daklib.regexes import re_source_ext from daklib.config import Config from daklib import daklog from daklib.changesutils import * -from daklib.dakmultiprocessing import Pool +from daklib.dakmultiprocessing import DakProcessPool # Globals Cnf = None @@ -250,7 +250,7 @@ def main(): examine_package.use_html=1 - pool = Pool() + pool = DakProcessPool() for changes_file in changes_files: changes_file = utils.validate_changes_file_arg(changes_file, 0) if not changes_file: diff --git a/daklib/dakmultiprocessing.py b/daklib/dakmultiprocessing.py index 8e66cfdb..57152bf8 100644 --- a/daklib/dakmultiprocessing.py +++ b/daklib/dakmultiprocessing.py @@ -25,32 +25,83 @@ multiprocessing for DAK ############################################################################### -import multiprocessing +from multiprocessing.pool import Pool +from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGCHLD, SIGALRM + import sqlalchemy.orm.session +__all__ = [] + +PROC_STATUS_SUCCESS = 0 # Everything ok +PROC_STATUS_EXCEPTION = 1 # An exception was caught +PROC_STATUS_SIGNALRAISED = 2 # A signal was generated +PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message + +__all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION', + 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE']) + +class SignalException(Exception): + def __init__(self, signum): + self.signum = signum + + def __str__(self): + return "" % self.signum + +__all__.append('SignalException') + +def signal_handler(signum, info): + raise SignalException(signum) + def _func_wrapper(func, *args, **kwds): + # We need to handle signals to avoid hanging + signal(SIGHUP, signal_handler) + signal(SIGTERM, signal_handler) + signal(SIGPIPE, signal_handler) + signal(SIGCHLD, signal_handler) + signal(SIGALRM, signal_handler) + + # We expect our callback function to return: + # (status, messages) + # Where: + # status is one of PROC_STATUS_* + # messages is a string used for logging try: - return func(*args, **kwds) + return (func(*args, **kwds)) + except SignalException, e: + return (PROC_STATUS_SIGNALRAISED, e.signum) + except Exception, e: + return (PROC_STATUS_EXCEPTION, str(e)) finally: # Make sure connections are closed. We might die otherwise. sqlalchemy.orm.session.Session.close_all() -class Pool(): + +class DakProcessPool(Pool): def __init__(self, *args, **kwds): - self.pool = multiprocessing.Pool(*args, **kwds) + Pool.__init__(self, *args, **kwds) self.results = [] + self.int_results = [] def apply_async(self, func, args=(), kwds={}, callback=None): wrapper_args = list(args) wrapper_args.insert(0, func) - self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback)) - - def close(self): - self.pool.close() + self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback)) def join(self): - self.pool.join() - #for r in self.results: - # # return values were already handled in the callbacks, but asking - # # for them might raise exceptions which would otherwise be lost - # r.get() + Pool.join(self) + for r in self.int_results: + # return values were already handled in the callbacks, but asking + # for them might raise exceptions which would otherwise be lost + self.results.append(r.get()) + + def overall_status(self): + # Return the highest of our status results + # This basically allows us to do sys.exit(overall_status()) and have us + # exit 0 if everything was good and non-zero if not + status = 0 + for r in self.results: + if r[0] > status: + status = r[0] + return status + +__all__.append('DakProcessPool') diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py new file mode 100755 index 00000000..c67e51fb --- /dev/null +++ b/tests/test_multiprocessing.py @@ -0,0 +1,61 @@ +#!/usr/bin/python + +from base_test import DakTestCase + +from daklib.dakmultiprocessing import DakProcessPool, \ + PROC_STATUS_SUCCESS, PROC_STATUS_MISCFAILURE, \ + PROC_STATUS_EXCEPTION, PROC_STATUS_SIGNALRAISED +import signal + +def test_function(num, num2): + from os import kill, getpid + + if num == 1: + sigs = [signal.SIGTERM, signal.SIGCHLD, signal.SIGALRM, signal.SIGHUP] + kill(getpid(), sigs[num2]) + + if num2 == 3: + raise Exception('Test uncaught exception handling') + + if num == 0 and num2 == 1: + return (PROC_STATUS_MISCFAILURE, 'Test custom error return') + + return (PROC_STATUS_SUCCESS, 'blah, %d, %d' % (num, num2)) + +class DakProcessPoolTestCase(DakTestCase): + def testPool(self): + def alarm_handler(signum, frame): + raise AssertionError('Timed out') + + # Shouldn't take us more than 15 seconds to run this test + signal.signal(signal.SIGALRM, alarm_handler) + signal.alarm(15) + + p = DakProcessPool() + for s in range(3): + for j in range(4): + p.apply_async(test_function, [s, j]) + + p.close() + p.join() + + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) + + expected = [(PROC_STATUS_SUCCESS, 'blah, 0, 0'), + (PROC_STATUS_MISCFAILURE, 'Test custom error return'), + (PROC_STATUS_SUCCESS, 'blah, 0, 2'), + (PROC_STATUS_EXCEPTION, 'Test uncaught exception handling'), + (PROC_STATUS_SIGNALRAISED, 15), + (PROC_STATUS_SIGNALRAISED, 17), + (PROC_STATUS_SIGNALRAISED, 14), + (PROC_STATUS_SIGNALRAISED, 1), + (PROC_STATUS_SUCCESS, 'blah, 2, 0'), + (PROC_STATUS_SUCCESS, 'blah, 2, 1'), + (PROC_STATUS_SUCCESS, 'blah, 2, 2'), + (PROC_STATUS_EXCEPTION, 'Test uncaught exception handling')] + + self.assertEqual( len(p.results), len(expected) ) + + for r in range(len(p.results)): + self.assertEqual(p.results[r], expected[r]) -- 2.39.5