From: Mark Hymers
Date: Sat, 9 Apr 2011 08:42:22 +0000 (+0100)
Subject: Merge remote branch 'ftpmaster/master' into multiproc
X-Git-Url: https://git.decadent.org.uk/gitweb/?p=dak.git;a=commitdiff_plain;h=475051efae41a30723cdc1ab82c521cd1accf75b;hp=-c

Merge remote branch 'ftpmaster/master' into multiproc

Conflicts:
	dak/show_new.py

Signed-off-by: Mark Hymers
---

475051efae41a30723cdc1ab82c521cd1accf75b
diff --combined dak/generate_packages_sources2.py
index 2b792f3c,fbae045f..b157fcbe
--- a/dak/generate_packages_sources2.py
+++ b/dak/generate_packages_sources2.py
@@@ -31,7 -31,7 +31,7 @@@ Generate Packages/Sources file
 from daklib.dbconn import *
 from daklib.config import Config
 from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
 from daklib.filewriter import PackagesFileWriter, SourcesFileWriter

 import apt_pkg, os, stat, sys
@@@ -119,7 -119,7 +119,7 @@@ def generate_sources(suite_id, componen
     message = ["generate sources", suite.suite_name, component.component_name]
     session.rollback()
-    return message
+    return (PROC_STATUS_SUCCESS, message)

 #############################################################################

@@@ -165,7 -165,9 +165,9 @@@ SELEC
        || COALESCE(E'\n' || (SELECT
           STRING_AGG(key || '\: ' || value, E'\n' ORDER BY key)
           FROM external_overrides eo
-          WHERE eo.package = tmp.package
+          WHERE
+            eo.package = tmp.package
+            AND eo.suite = :overridesuite AND eo.component = :component
           ), '')
        || E'\nSection\: ' || sec.section
        || E'\nPriority\: ' || pri.priority
@@@ -223,7 -225,7 +225,7 @@@ def generate_packages(suite_id, compone
     message = ["generate-packages", suite.suite_name, component.component_name, architecture.arch_string]
     session.rollback()
-    return message
+    return (PROC_STATUS_SUCCESS, message)

 #############################################################################

@@@ -263,35 -265,28 +265,35 @@@ def main()
     component_ids = [ c.component_id for c in session.query(Component).all() ]

-    def log(details):
-        logger.log(details)
-
-    #pool = Pool()
+    def parse_results(message):
+        # Split out into (code, msg)
+        code, msg = message
+        if code == PROC_STATUS_SUCCESS:
+            logger.log([msg])
+        elif code == PROC_STATUS_SIGNALRAISED:
+            logger.log(['E: Subprocess received signal ', msg])
+        else:
+            logger.log(['E: ', msg])
+
+    pool = DakProcessPool()
     for s in suites:
         if s.untouchable and not force:
             utils.fubar("Refusing to touch %s (untouchable and not forced)" % s.suite_name)
         for c in component_ids:
-            logger.log(generate_sources(s.suite_id, c))
-            #pool.apply_async(generate_sources, [s.suite_id, c], callback=log)
+            pool.apply_async(generate_sources, [s.suite_id, c], callback=parse_results)
             for a in s.architectures:
-                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'deb'))
-                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
-                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'udeb'))
-                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=parse_results)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=parse_results)
+
+    pool.close()
+    pool.join()

-    #pool.close()
-    #pool.join()
     # this script doesn't change the database
     session.close()

     logger.close()

+    sys.exit(pool.overall_status())
+
 if __name__ == '__main__':
     main()
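The hunks above switch generate_packages_sources2.py from the commented-out Pool calls to a DakProcessPool: each worker now returns a (status, message) tuple, the parse_results callback logs it in the parent process, and pool.overall_status() becomes the script's exit code. The sketch below illustrates that pattern using only the standard library; DemoProcessPool, the status constant values and the toy generate_sources() worker are assumptions for illustration, not the real daklib.dakmultiprocessing API.

# A minimal sketch of the worker/callback pattern above, built only on the
# standard library.  DemoProcessPool, the status constant values and the toy
# generate_sources() worker are illustrative assumptions; the real
# implementation lives in daklib.dakmultiprocessing and is not shown here.
import multiprocessing
import sys

PROC_STATUS_SUCCESS = 0    # assumed value: worker finished normally
PROC_STATUS_EXCEPTION = 1  # assumed value: worker failed

def generate_sources(suite_id, component_id):
    # A real worker would write a Sources file; this one only demonstrates
    # the (status, message) return convention used by the dak scripts.
    return (PROC_STATUS_SUCCESS, "generate sources %s %s" % (suite_id, component_id))

class DemoProcessPool(object):
    """Wraps multiprocessing.Pool and remembers every worker result."""
    def __init__(self):
        self.pool = multiprocessing.Pool()
        self.results = []

    def apply_async(self, func, args, callback=None):
        def collect(result):
            # Callbacks run in the parent process, so a plain list.append is fine.
            self.results.append(result)
            if callback is not None:
                callback(result)
        self.pool.apply_async(func, args, callback=collect)

    def close(self):
        self.pool.close()

    def join(self):
        self.pool.join()

    def overall_status(self):
        # Non-zero as soon as any worker reported a non-success status.
        return max([code for code, _ in self.results] or [0])

def parse_results(result):
    code, msg = result
    prefix = '' if code == PROC_STATUS_SUCCESS else 'E: '
    print(prefix + msg)

if __name__ == '__main__':
    pool = DemoProcessPool()
    for suite_id in (1, 2):
        for component_id in ('main', 'contrib'):
            pool.apply_async(generate_sources, (suite_id, component_id),
                             callback=parse_results)
    pool.close()
    pool.join()
    sys.exit(pool.overall_status())

Run directly, this prints one log line per (suite, component) task and exits non-zero only if some worker reported a failure status, which is the behaviour the new sys.exit(pool.overall_status()) line relies on.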
diff --combined dak/generate_releases.py
index 8befbc99,39a76203..30063646
--- a/dak/generate_releases.py
+++ b/dak/generate_releases.py
@@@ -40,6 -40,7 +40,6 @@@ import bz
 import apt_pkg
 from tempfile import mkstemp, mkdtemp
 import commands
-from multiprocessing import Pool, TimeoutError
 from sqlalchemy.orm import object_session

 from daklib import utils, daklog
@@@ -47,10 -48,10 +47,10 @@@ from daklib.regexes import re_gensubrel
 from daklib.dak_exceptions import *
 from daklib.dbconn import *
 from daklib.config import Config
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS

 ################################################################################
 Logger = None               #: Our logging object
-results = []                #: Results of the subprocesses

 ################################################################################

@@@ -73,6 -74,11 +73,6 @@@ SUITE can be a space seperated list, e.
 ########################################################################

-def get_result(arg):
-    global results
-    if arg:
-        results.append(arg)
-
 def sign_release_dir(suite, dirname):
     cnf = Config()

@@@ -152,7 -158,11 +152,11 @@@ class ReleaseWriter(object)
         for key, dbfield in attribs:
             if getattr(suite, dbfield) is not None:
-                out.write("%s: %s\n" % (key, getattr(suite, dbfield)))
+                # TEMPORARY HACK HACK HACK until we change the way we store the suite names etc
+                if key == 'Suite' and getattr(suite, dbfield) == 'squeeze-updates':
+                    out.write("Suite: stable-updates\n")
+                else:
+                    out.write("%s: %s\n" % (key, getattr(suite, dbfield)))

         out.write("Date: %s\n" % (time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime(time.time()))))

@@@ -284,7 -294,7 +288,7 @@@ def main ()

-    global Logger, results
+    global Logger

     cnf = Config()

@@@ -319,8 -329,10 +323,8 @@@
     suites = session.query(Suite).filter(Suite.untouchable == False).all()
     broken=[]
-    # For each given suite, run one process
-    results = []

-    pool = Pool()
+    pool = DakProcessPool()

     for s in suites:
         # Setup a multiprocessing Pool. As many workers as we have CPU cores.
@@@ -330,17 -342,18 +334,17 @@@
         print "Processing %s" % s.suite_name
         Logger.log(['Processing release file for Suite: %s' % (s.suite_name)])
-        pool.apply_async(generate_helper, (s.suite_id, ), callback=get_result)
+        pool.apply_async(generate_helper, (s.suite_id, ))

     # No more work will be added to our pool, close it and then wait for all to finish
     pool.close()
     pool.join()

-    retcode = 0
+    retcode = pool.overall_status()

-    if len(results) > 0:
-        Logger.log(['Release file generation broken: %s' % (results)])
-        print "Release file generation broken:\n", '\n'.join(results)
-        retcode = 1
+    if retcode > 0:
+        # TODO: CENTRAL FUNCTION FOR THIS / IMPROVE LOGGING
+        Logger.log(['Release file generation broken: %s' % (','.join([str(x[1]) for x in pool.results]))])

     Logger.close()

@@@ -352,12 -365,13 +356,12 @@@ def generate_helper(suite_id)
     '''
     session = DBConn().session()
     suite = Suite.get(suite_id, session)
-    try:
-        rw = ReleaseWriter(suite)
-        rw.generate_release_files()
-    except Exception, e:
-        return str(e)
-    return
+    # We allow the process handler to catch and deal with any exceptions
+    rw = ReleaseWriter(suite)
+    rw.generate_release_files()
+
+    return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite.suite_name)

 #######################################################################################
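In the new generate_releases.py, generate_helper no longer catches exceptions itself; the added comment assumes the process handler in daklib.dakmultiprocessing turns a crashed worker into a non-success entry in pool.results, which main() then reports via pool.overall_status(). The sketch below shows one plausible way such a wrapper could work; wrap_worker, the status values and demo_release_helper are hypothetical names, not taken from the dak code.

# A hypothetical sketch of how a pool wrapper could turn a crashed worker into
# a (status, message) result, which is what the comment in generate_helper
# assumes ("we allow the process handler to catch and deal with any
# exceptions").  wrap_worker, the status values and demo_release_helper are
# illustrative; the actual handling lives in daklib.dakmultiprocessing.
import functools
import traceback

PROC_STATUS_SUCCESS = 0    # assumed value
PROC_STATUS_EXCEPTION = 1  # assumed value

def wrap_worker(func):
    """Run func and always hand back a (status, message) tuple."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Render the traceback in the child: the exception object itself
            # would not necessarily survive the trip back to the parent.
            return (PROC_STATUS_EXCEPTION, traceback.format_exc())
    return wrapper

@wrap_worker
def demo_release_helper(suite_id):
    # Toy stand-in for dak's generate_helper: fail for an impossible suite id.
    if suite_id < 0:
        raise ValueError("no such suite: %s" % suite_id)
    return (PROC_STATUS_SUCCESS, "Release file written for suite %s" % suite_id)

if __name__ == '__main__':
    print(demo_release_helper(3))    # (0, 'Release file written for suite 3')
    print(demo_release_helper(-1))   # (1, 'Traceback (most recent call last): ...')

With a wrapper of this kind in the pool, main() only has to inspect the status codes it gets back, which matches how the new code derives its return value from pool.overall_status().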
diff --combined dak/show_new.py
index 3b052c18,f50b7853..513129f9
--- a/dak/show_new.py
+++ b/dak/show_new.py
@@@ -37,7 -37,7 +37,7 @@@ from daklib.regexes import re_source_ex
 from daklib.config import Config
 from daklib import daklog
 from daklib.changesutils import *
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool

 # Globals
 Cnf = None
@@@ -154,23 -154,39 +154,39 @@@ def do_pkg(changes_file)
     session = DBConn().session()
     u = Upload()
     u.pkg.changes_file = changes_file
-    (u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changes_file)
+    # We can afford not to check the signature before loading the changes file
+    # as we've validated it already (otherwise it couldn't be in new)
+    # and we can more quickly skip over already processed files this way
     u.load_changes(changes_file)
-    new_queue = get_policy_queue('new', session );
-    u.pkg.directory = new_queue.path
-    u.update_subst()
+    origchanges = os.path.abspath(u.pkg.changes_file)
-    files = u.pkg.files
-    changes = u.pkg.changes
-    htmlname = changes["source"] + "_" + changes["version"] + ".html"
-    sources.add(htmlname)
-    htmlfile = os.path.join(cnf["Show-New::HTMLPath"], htmlname)
+    # Still be cautious in case parsing the changes file went badly
+    if u.pkg.changes.has_key('source') and u.pkg.changes.has_key('version'):
+        htmlname = u.pkg.changes["source"] + "_" + u.pkg.changes["version"] + ".html"
+        htmlfile = os.path.join(cnf["Show-New::HTMLPath"], htmlname)
+    else:
+        # Changes file was bad
+        print "Changes file %s missing source or version field" % changes_file
+        session.close()
+        return
+
+    # Have we already processed this?
     if os.path.exists(htmlfile) and \
        os.stat(htmlfile).st_mtime > os.stat(origchanges).st_mtime:
+        sources.add(htmlname)
         session.close()
-        return
+        return (PROC_STATUS_SUCCESS, '%s already up-to-date' % htmlfile)
+
+    # Now we'll load the fingerprint
+    (u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changes_file, session=session)
+
+    new_queue = get_policy_queue('new', session );
+    u.pkg.directory = new_queue.path
+    u.update_subst()
+    files = u.pkg.files
+    changes = u.pkg.changes
+    sources.add(htmlname)
+
     for deb_filename, f in files.items():
         if deb_filename.endswith(".udeb") or deb_filename.endswith(".deb"):
             u.binary_file_checks(deb_filename, session)
@@@ -205,8 -221,6 +221,8 @@@
     outfile.close()
     session.close()

+    return (PROC_STATUS_SUCCESS, '%s already updated' % htmlfile)
+
 ################################################################################

 def usage (exit_code=0):
@@@ -252,15 -266,16 +268,16 @@@ def main()
     examine_package.use_html=1

-    #pool = Pool(processes=1)
+    pool = DakProcessPool()
     for changes_file in changes_files:
         changes_file = utils.validate_changes_file_arg(changes_file, 0)
         if not changes_file:
             continue
         print "\n" + changes_file
-        #pool.apply_async(do_pkg, (changes_file,))
+        pool.apply_async(do_pkg, (changes_file,))
+        do_pkg(changes_file)

-    #pool.close()
-    #pool.join()
+    pool.close()
+    pool.join()

     files = set(os.listdir(cnf["Show-New::HTMLPath"]))
     to_delete = filter(lambda x: x.endswith(".html"), files.difference(sources))
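The reworked do_pkg above defers the signature check and all package examination behind a cheap freshness test: when the generated HTML page is already newer than the .changes file, the task returns PROC_STATUS_SUCCESS straight away. A minimal standalone version of that mtime comparison might look like the following; the file names are placeholders rather than dak configuration values.

# A small standalone version of the freshness test do_pkg now performs before
# doing any expensive work: the HTML page is rebuilt only when it is missing
# or older than the .changes file it was generated from.  The file names below
# are placeholders, not dak configuration values.
import os

def needs_rebuild(changes_path, html_path):
    """Return True if html_path is missing or not newer than changes_path."""
    if not os.path.exists(html_path):
        return True
    return os.stat(html_path).st_mtime <= os.stat(changes_path).st_mtime

if __name__ == '__main__':
    changes_file = 'example_1.0-1_amd64.changes'   # placeholder path
    html_file = 'example_1.0-1.html'               # placeholder path
    if os.path.exists(changes_file):
        print("rebuild needed: %s" % needs_rebuild(changes_file, html_file))
    else:
        print("no such changes file: %s" % changes_file)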