X-Git-Url: https://git.decadent.org.uk/gitweb/?p=dak.git;a=blobdiff_plain;f=dak%2Fprocess_upload.py;h=d5425e058ccfa2de6c87dc43e8ec24cce83c5915;hp=22c333c9910a86cf8148492f28a18ea1c724a1e2;hb=026bdb9580e6d18bf504d2f44d46890df5d89f1a;hpb=e6372e0f8b1889b6ec463392b973a99e4096d661 diff --git a/dak/process_upload.py b/dak/process_upload.py index 22c333c9..d5425e05 100755 --- a/dak/process_upload.py +++ b/dak/process_upload.py @@ -159,24 +159,29 @@ Checks Debian packages from Incoming ## Queue builds +import datetime +import errno from errno import EACCES, EAGAIN import fcntl import os import sys import traceback import apt_pkg +import time from sqlalchemy.orm.exc import NoResultFound from daklib import daklog -from daklib.queue import * -from daklib.queue_install import * -from daklib import utils from daklib.dbconn import * from daklib.urgencylog import UrgencyLog from daklib.summarystats import SummaryStats -from daklib.holding import Holding from daklib.config import Config -from daklib.regexes import re_match_expired +import daklib.utils as utils +from daklib.regexes import * + +import daklib.announce +import daklib.archive +import daklib.checks +import daklib.upload ############################################################################### @@ -188,6 +193,7 @@ Logger = None def usage (exit_code=0): print """Usage: dak process-upload [OPTION]... [CHANGES]... -a, --automatic automatic run + -d, --directory process uploads in -h, --help show this help and exit. -n, --no-action don't do anything -p, --no-lock don't check lockfile !! for cron.daily only !! @@ -197,91 +203,214 @@ def usage (exit_code=0): ############################################################################### -def byebye(): - if not Options["No-Action"]: - # Clean out the queue files - session = DBConn().session() - session.execute("DELETE FROM changes_pending_files WHERE id NOT IN (SELECT file_id FROM changes_pending_files_map )") - session.commit() +def try_or_reject(function): + """Try to call function or reject the upload if that fails + """ + def wrapper(directory, upload, *args, **kwargs): + reason = 'No exception caught. This should not happen.' + + try: + return function(directory, upload, *args, **kwargs) + except (daklib.archive.ArchiveException, daklib.checks.Reject) as e: + reason = unicode(e) + except Exception as e: + reason = "There was an uncaught exception when processing your upload:\n{0}\nAny original reject reason follows below.".format(traceback.format_exc()) + + try: + upload.rollback() + return real_reject(directory, upload, reason=reason) + except Exception as e: + reason = "In addition there was an exception when rejecting the package:\n{0}\nPrevious reasons:\n{1}".format(traceback.format_exc(), reason) + upload.rollback() + return real_reject(directory, upload, reason=reason, notify=False) + + raise Exception('Rejecting upload failed after multiple tries. Giving up. Last reason:\n{0}'.format(reason)) + + return wrapper + +def get_processed_upload(upload): + changes = upload.changes + control = upload.changes.changes + + pu = daklib.announce.ProcessedUpload() + + pu.maintainer = control.get('Maintainer') + pu.changed_by = control.get('Changed-By') + pu.fingerprint = changes.primary_fingerprint + + pu.suites = upload.final_suites or [] + pu.from_policy_suites = [] + + pu.changes = open(upload.changes.path, 'r').read() + pu.changes_filename = upload.changes.filename + pu.sourceful = upload.changes.sourceful + pu.source = control.get('Source') + pu.version = control.get('Version') + pu.architecture = control.get('Architecture') + pu.bugs = changes.closed_bugs + + pu.program = "process-upload" + + pu.warnings = upload.warnings + + return pu + +@try_or_reject +def accept(directory, upload): + cnf = Config() + + Logger.log(['ACCEPT', upload.changes.filename]) + print "ACCEPT" + + upload.install() + + accepted_to_real_suite = False + for suite in upload.final_suites: + accepted_to_real_suite = accepted_to_real_suite or suite.policy_queue is None + + sourceful_upload = 'source' in upload.changes.architectures + + control = upload.changes.changes + if sourceful_upload and not Options['No-Action']: + urgency = control.get('Urgency') + if urgency not in cnf.value_list('Urgency::Valid'): + urgency = cnf['Urgency::Default'] + UrgencyLog().log(control['Source'], control['Version'], urgency) + + pu = get_processed_upload(upload) + daklib.announce.announce_accept(pu) + + # Move .changes to done, but only for uploads that were accepted to a + # real suite. process-policy will handle this for uploads to queues. + if accepted_to_real_suite: + src = os.path.join(upload.directory, upload.changes.filename) + + now = datetime.datetime.now() + donedir = os.path.join(cnf['Dir::Done'], now.strftime('%Y/%m/%d')) + dst = os.path.join(donedir, upload.changes.filename) + dst = utils.find_next_free(dst) + + upload.transaction.fs.copy(src, dst, mode=0o644) + + SummaryStats().accept_count += 1 + SummaryStats().accept_bytes += upload.changes.bytes + +@try_or_reject +def accept_to_new(directory, upload): + + Logger.log(['ACCEPT-TO-NEW', upload.changes.filename]) + print "ACCEPT-TO-NEW" + + upload.install_to_new() + # TODO: tag bugs pending + + pu = get_processed_upload(upload) + daklib.announce.announce_new(pu) + + SummaryStats().accept_count += 1 + SummaryStats().accept_bytes += upload.changes.bytes + +@try_or_reject +def reject(directory, upload, reason=None, notify=True): + real_reject(directory, upload, reason, notify) + +def real_reject(directory, upload, reason=None, notify=True): + # XXX: rejection itself should go to daklib.archive.ArchiveUpload + cnf = Config() + + Logger.log(['REJECT', upload.changes.filename]) + print "REJECT" + fs = upload.transaction.fs + rejectdir = cnf['Dir::Reject'] + files = [ f.filename for f in upload.changes.files.itervalues() ] + files.append(upload.changes.filename) + + for fn in files: + src = os.path.join(upload.directory, fn) + dst = utils.find_next_free(os.path.join(rejectdir, fn)) + if not os.path.exists(src): + continue + fs.copy(src, dst) + + if upload.reject_reasons is not None: + if reason is None: + reason = '' + reason = reason + '\n' + '\n'.join(upload.reject_reasons) + + if reason is None: + reason = '(Unknown reason. Please check logs.)' + + dst = utils.find_next_free(os.path.join(rejectdir, '{0}.reason'.format(upload.changes.filename))) + fh = fs.create(dst) + fh.write(reason) + fh.close() + + if notify: + pu = get_processed_upload(upload) + daklib.announce.announce_reject(pu, reason) + + SummaryStats().reject_count += 1 + +############################################################################### + +def action(directory, upload): + changes = upload.changes + processed = True -def action(u, session): global Logger cnf = Config() - holding = Holding() - # changes["distribution"] may not exist in corner cases - # (e.g. unreadable changes files) - if not u.pkg.changes.has_key("distribution") or not isinstance(u.pkg.changes["distribution"], dict): - u.pkg.changes["distribution"] = {} + okay = upload.check() - (summary, short_summary) = u.build_summaries() + summary = changes.changes.get('Changes', '') + + package_info = [] + if okay: + if changes.source is not None: + package_info.append("source:{0}".format(changes.source.dsc['Source'])) + for binary in changes.binaries: + package_info.append("binary:{0}".format(binary.control['Package'])) (prompt, answer) = ("", "XXX") if Options["No-Action"] or Options["Automatic"]: answer = 'S' - queuekey = '' - - pi = u.package_info() - - try: - chg = session.query(DBChange).filter_by(changesname=os.path.basename(u.pkg.changes_file)).one() - except NoResultFound, e: - chg = None - - if len(u.rejects) > 0: - if u.upload_too_new(): - print "SKIP (too new)\n" + pi, + print summary + print + print "\n".join(package_info) + print + if len(upload.warnings) > 0: + print "\n".join(upload.warnings) + print + + if len(upload.reject_reasons) > 0: + print "Reason:" + print "\n".join(upload.reject_reasons) + print + + path = os.path.join(directory, changes.filename) + created = os.stat(path).st_mtime + now = time.time() + too_new = (now - created < int(cnf['Dinstall::SkipTime'])) + + if too_new: + print "SKIP (too new)" prompt = "[S]kip, Quit ?" else: - print "REJECT\n" + pi prompt = "[R]eject, Skip, Quit ?" if Options["Automatic"]: answer = 'R' + elif upload.new: + prompt = "[N]ew, Skip, Quit ?" + if Options['Automatic']: + answer = 'N' else: - # Are we headed for NEW / BYHAND / AUTOBYHAND? - # Note that policy queues are no longer handled here - qu = determine_target(u) - if qu: - print "%s for %s\n%s%s" % ( qu.upper(), ", ".join(u.pkg.changes["distribution"].keys()), pi, summary) - queuekey = qu[0].upper() - if queuekey in "RQSA": - queuekey = "D" - prompt = "[D]ivert, Skip, Quit ?" - else: - prompt = "[%s]%s, Skip, Quit ?" % (queuekey, qu[1:].lower()) - if Options["Automatic"]: - answer = queuekey - else: - # Does suite have a policy_queue configured - divert = False - for s in u.pkg.changes["distribution"].keys(): - suite = get_suite(s, session) - if suite.policy_queue: - if not chg or chg.approved_for_id != suite.policy_queue.policy_queue_id: - # This routine will check whether the upload is a binary - # upload when the source is already in the target suite. If - # so, we skip the policy queue, otherwise we go there. - divert = package_to_suite(u, suite.suite_name, session=session) - if divert: - print "%s for %s\n%s%s" % ( suite.policy_queue.queue_name.upper(), - ", ".join(u.pkg.changes["distribution"].keys()), - pi, summary) - queuekey = "P" - prompt = "[P]olicy, Skip, Quit ?" - policyqueue = suite.policy_queue - if Options["Automatic"]: - answer = 'P' - break - - if not divert: - print "ACCEPT\n" + pi + summary, - prompt = "[A]ccept, Skip, Quit ?" - if Options["Automatic"]: - answer = 'A' + prompt = "[A]ccept, Skip, Quit ?" + if Options['Automatic']: + answer = 'A' while prompt.find(answer) == -1: answer = utils.our_raw_input(prompt) @@ -291,131 +420,78 @@ def action(u, session): answer = answer[:1].upper() if answer == 'R': - os.chdir(u.pkg.directory) - u.do_reject(0, pi) + reject(directory, upload) elif answer == 'A': - if not chg: - chg = u.pkg.add_known_changes(holding.holding_dir, session=session, logger=Logger) - session.commit() - u.accept(summary, short_summary, session) - u.check_override() - chg.clean_from_queue() - session.commit() - u.remove() - elif answer == 'P': - if not chg: - chg = u.pkg.add_known_changes(holding.holding_dir, session=session, logger=Logger) - package_to_queue(u, summary, short_summary, policyqueue, chg, session) - session.commit() - u.remove() - elif answer == queuekey: - if not chg: - chg = u.pkg.add_known_changes(holding.holding_dir, session=session, logger=Logger) - QueueInfo[qu]["process"](u, summary, short_summary, chg, session) - session.commit() - u.remove() + # upload.try_autobyhand must not be run with No-Action. + if Options['No-Action']: + accept(directory, upload) + elif upload.try_autobyhand(): + accept(directory, upload) + else: + print "W: redirecting to BYHAND as automatic processing failed." + accept_to_new(directory, upload) + elif answer == 'N': + accept_to_new(directory, upload) elif answer == 'Q': - byebye() sys.exit(0) + elif answer == 'S': + processed = False - session.commit() + if not Options['No-Action']: + upload.commit() + + return processed ############################################################################### -def cleanup(): - h = Holding() - if not Options["No-Action"]: - h.clean() +def unlink_if_exists(path): + try: + os.unlink(path) + except OSError as e: + if e.errno != errno.ENOENT: + raise -def process_it(changes_file, session): +def process_it(directory, changes, keyrings): global Logger - Logger.log(["Processing changes file", changes_file]) - - cnf = Config() + print "\n{0}\n".format(changes.filename) + Logger.log(["Processing changes file", changes.filename]) - holding = Holding() + with daklib.archive.ArchiveUpload(directory, changes, keyrings) as upload: + processed = action(directory, upload) + if processed and not Options['No-Action']: + session = DBConn().session() + history = SignatureHistory.from_signed_file(upload.changes) + if history.query(session) is None: + session.add(history) + session.commit() + session.close() - # TODO: Actually implement using pending* tables so that we don't lose track - # of what is where + unlink_if_exists(os.path.join(directory, changes.filename)) + for fn in changes.files: + unlink_if_exists(os.path.join(directory, fn)) - u = Upload() - u.pkg.changes_file = changes_file - u.pkg.directory = os.getcwd() - u.logger = Logger - origchanges = os.path.abspath(u.pkg.changes_file) - - # Some defaults in case we can't fully process the .changes file - u.pkg.changes["maintainer2047"] = cnf["Dinstall::MyEmailAddress"] - u.pkg.changes["changedby2047"] = cnf["Dinstall::MyEmailAddress"] - - # debian-{devel-,}-changes@lists.debian.org toggles writes access based on this header - bcc = "X-DAK: dak process-upload" - if cnf.has_key("Dinstall::Bcc"): - u.Subst["__BCC__"] = bcc + "\nBcc: %s" % (cnf["Dinstall::Bcc"]) - else: - u.Subst["__BCC__"] = bcc +############################################################################### - # Remember where we are so we can come back after cd-ing into the - # holding directory. TODO: Fix this stupid hack - u.prevdir = os.getcwd() +def process_changes(changes_filenames): + session = DBConn().session() + keyrings = session.query(Keyring).filter_by(active=True).order_by(Keyring.priority) + keyring_files = [ k.keyring_name for k in keyrings ] + session.close() - try: - # If this is the Real Thing(tm), copy things into a private - # holding directory first to avoid replacable file races. - if not Options["No-Action"]: - os.chdir(cnf["Dir::Queue::Holding"]) - - # Absolutize the filename to avoid the requirement of being in the - # same directory as the .changes file. - holding.copy_to_holding(origchanges) - - # Relativize the filename so we use the copy in holding - # rather than the original... - changespath = os.path.basename(u.pkg.changes_file) - else: - changespath = origchanges + changes = [] + for fn in changes_filenames: + try: + directory, filename = os.path.split(fn) + c = daklib.upload.Changes(directory, filename, keyring_files) + changes.append([directory, c]) + except Exception as e: + Logger.log([filename, "Error while loading changes: {0}".format(e)]) - (u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changespath) + changes.sort(key=lambda x: x[1]) - if u.pkg.changes["fingerprint"]: - valid_changes_p = u.load_changes(changespath) - else: - for reason in rejects: - if re_match_expired.match(reason): - # Hrm, key expired. Lets see if we can still parse the .changes before - # we reject. Then we would be able to mail the maintainer, instead of - # just silently dropping the upload. - u.load_changes(changespath) - valid_changes_p = False - u.rejects.extend(rejects) - - if valid_changes_p: - u.check_distributions() - u.check_files(not Options["No-Action"]) - valid_dsc_p = u.check_dsc(not Options["No-Action"]) - if valid_dsc_p and not Options["No-Action"]: - u.check_source() - u.check_hashes() - if valid_dsc_p and not Options["No-Action"] and not len(u.rejects): - u.check_lintian() - u.check_urgency() - u.check_timestamps() - u.check_signed_by_key() - - action(u, session) - - except (SystemExit, KeyboardInterrupt): - cleanup() - raise - - except: - print "ERROR" - traceback.print_exc(file=sys.stderr) - - cleanup() - # Restore previous WD - os.chdir(u.prevdir) + for directory, c in changes: + process_it(directory, c, keyring_files) ############################################################################### @@ -424,9 +500,6 @@ def main(): cnf = Config() summarystats = SummaryStats() - log_urgency = False - - DBConn() Arguments = [('a',"automatic","Dinstall::Options::Automatic"), ('h',"help","Dinstall::Options::Help"), @@ -440,8 +513,8 @@ def main(): if not cnf.has_key("Dinstall::Options::%s" % (i)): cnf["Dinstall::Options::%s" % (i)] = "" - changes_files = apt_pkg.ParseCommandLine(cnf.Cnf, Arguments, sys.argv) - Options = cnf.SubTree("Dinstall::Options") + changes_files = apt_pkg.parse_commandline(cnf.Cnf, Arguments, sys.argv) + Options = cnf.subtree("Dinstall::Options") if Options["Help"]: usage() @@ -456,20 +529,20 @@ def main(): # Obtain lock if not in no-action mode and initialize the log if not Options["No-Action"]: - lock_fd = os.open(cnf["Dinstall::LockFile"], os.O_RDWR | os.O_CREAT) + lock_fd = os.open(os.path.join(cnf["Dir::Lock"], 'dinstall.lock'), os.O_RDWR | os.O_CREAT) try: fcntl.lockf(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError, e: + except IOError as e: if errno.errorcode[e.errno] == 'EACCES' or errno.errorcode[e.errno] == 'EAGAIN': utils.fubar("Couldn't obtain lock; assuming another 'dak process-upload' is already running.") else: raise - if cnf.get("Dir::UrgencyLog"): - # Initialise UrgencyLog() - log_urgency = True - UrgencyLog() - Logger = daklog.Logger(cnf, "process-upload", Options["No-Action"]) + # Initialise UrgencyLog() - it will deal with the case where we don't + # want to log urgencies + urgencylog = UrgencyLog() + + Logger = daklog.Logger("process-upload", Options["No-Action"]) # If we have a directory flag, use it to find our files if cnf["Dinstall::Options::Directory"] != "": @@ -485,15 +558,7 @@ def main(): else: Logger.log(["Using changes files from command-line", len(changes_files)]) - # Sort the .changes files so that we process sourceful ones first - changes_files.sort(utils.changes_compare) - - # Process the changes files - for changes_file in changes_files: - print "\n" + changes_file - session = DBConn().session() - process_it(changes_file, session) - session.close() + process_changes(changes_files) if summarystats.accept_count: sets = "set" @@ -503,11 +568,15 @@ def main(): utils.size_type(int(summarystats.accept_bytes))) Logger.log(["total", summarystats.accept_count, summarystats.accept_bytes]) - byebye() + if summarystats.reject_count: + sets = "set" + if summarystats.reject_count > 1: + sets = "sets" + print "Rejected %d package %s." % (summarystats.reject_count, sets) + Logger.log(["rejected", summarystats.reject_count]) if not Options["No-Action"]: - if log_urgency: - UrgencyLog().close() + urgencylog.close() Logger.close()