X-Git-Url: https://git.decadent.org.uk/gitweb/?p=dak.git;a=blobdiff_plain;f=dak%2Fprocess_policy.py;h=ab37aa2216b3282fcfeb4657d36944b89b32b3fc;hp=c23a3cdc24e7a186d46ef2a1d2e097ebd85d5995;hb=026bdb9580e6d18bf504d2f44d46890df5d89f1a;hpb=3f2b87f4f4cae93e27c6e1b9fc5ff9dc3577e9d8 diff --git a/dak/process_policy.py b/dak/process_policy.py index c23a3cdc..ab37aa22 100755 --- a/dak/process_policy.py +++ b/dak/process_policy.py @@ -32,17 +32,24 @@ ################################################################################ import os -import copy +import datetime +import re import sys +import traceback import apt_pkg +from sqlalchemy.orm.exc import NoResultFound from daklib.dbconn import * -from daklib.queue import * from daklib import daklog from daklib import utils from daklib.dak_exceptions import CantOpenError, AlreadyLockedError, CantGetLockError from daklib.config import Config -from daklib.changesutils import * +from daklib.archive import ArchiveTransaction, source_component_from_package_list +from daklib.urgencylog import UrgencyLog +from daklib.packagelist import PackageList + +import daklib.announce +import daklib.utils # Globals Options = None @@ -50,56 +57,384 @@ Logger = None ################################################################################ -def do_comments(dir, srcqueue, opref, npref, line, fn, session): +def do_comments(dir, srcqueue, opref, npref, line, fn, transaction): + session = transaction.session + actions = [] for comm in [ x for x in os.listdir(dir) if x.startswith(opref) ]: - lines = open("%s/%s" % (dir, comm)).readlines() + lines = open(os.path.join(dir, comm)).readlines() if len(lines) == 0 or lines[0] != line + "\n": continue - changes_files = [ x for x in os.listdir(".") if x.startswith(comm[7:]+"_") - and x.endswith(".changes") ] - changes_files = sort_changes(changes_files, session) - for f in changes_files: - print "Processing changes file: %s" % f - f = utils.validate_changes_file_arg(f, 0) - if not f: - print "Couldn't validate changes file %s" % f - continue - fn(f, srcqueue, "".join(lines[1:]), session) - - if opref != npref and not Options["No-Action"]: + + # If the ACCEPT includes a _ we only accept that .changes. + # Otherwise we accept all .changes that start with the given prefix + changes_prefix = comm[len(opref):] + if changes_prefix.count('_') < 2: + changes_prefix = changes_prefix + '_' + else: + changes_prefix = changes_prefix + '.changes' + + # We need to escape "_" as we use it with the LIKE operator (via the + # SQLA startwith) later. + changes_prefix = changes_prefix.replace("_", r"\_") + + uploads = session.query(PolicyQueueUpload).filter_by(policy_queue=srcqueue) \ + .join(PolicyQueueUpload.changes).filter(DBChange.changesname.startswith(changes_prefix)) \ + .order_by(PolicyQueueUpload.source_id) + reason = "".join(lines[1:]) + actions.extend((u, reason) for u in uploads) + + if opref != npref: newcomm = npref + comm[len(opref):] - os.rename("%s/%s" % (dir, comm), "%s/%s" % (dir, newcomm)) + newcomm = utils.find_next_free(os.path.join(dir, newcomm)) + transaction.fs.move(os.path.join(dir, comm), newcomm) + + actions.sort() + + for u, reason in actions: + print("Processing changes file: {0}".format(u.changes.changesname)) + fn(u, srcqueue, reason, transaction) ################################################################################ -def comment_accept(changes_file, srcqueue, comments, session): - u = Upload() - u.pkg.changes_file = changes_file - u.load_changes(changes_file) - u.update_subst() +def try_or_reject(function): + def wrapper(upload, srcqueue, comments, transaction): + try: + function(upload, srcqueue, comments, transaction) + except Exception as e: + comments = 'An exception was raised while processing the package:\n{0}\nOriginal comments:\n{1}'.format(traceback.format_exc(), comments) + try: + transaction.rollback() + real_comment_reject(upload, srcqueue, comments, transaction) + except Exception as e: + comments = 'In addition an exception was raised while trying to reject the upload:\n{0}\nOriginal rejection:\n{1}'.format(traceback.format_exc(), comments) + transaction.rollback() + real_comment_reject(upload, srcqueue, comments, transaction, notify=False) + if not Options['No-Action']: + transaction.commit() + else: + transaction.rollback() + return wrapper - if not Options["No-Action"]: - destqueue = get_policy_queue('newstage', session) - if changes_to_queue(u, srcqueue, destqueue, session): - Logger.log(["Policy Queue ACCEPT: %s: %s" % (srcqueue.queue_name, u.pkg.changes_file)]) - else: - print "E: Failed to migrate %s" % u.pkg.changes_file +################################################################################ + +@try_or_reject +def comment_accept(upload, srcqueue, comments, transaction): + for byhand in upload.byhand: + path = os.path.join(srcqueue.path, byhand.filename) + if os.path.exists(path): + raise Exception('E: cannot ACCEPT upload with unprocessed byhand file {0}'.format(byhand.filename)) + + cnf = Config() + + fs = transaction.fs + session = transaction.session + changesname = upload.changes.changesname + allow_tainted = srcqueue.suite.archive.tainted + + # We need overrides to get the target component + overridesuite = upload.target_suite + if overridesuite.overridesuite is not None: + overridesuite = session.query(Suite).filter_by(suite_name=overridesuite.overridesuite).one() + + def binary_component_func(db_binary): + section = db_binary.proxy['Section'] + component_name = 'main' + if section.find('/') != -1: + component_name = section.split('/', 1)[0] + return get_mapped_component(component_name, session=session) + + def is_debug_binary(db_binary): + return daklib.utils.is_in_debug_section(db_binary.proxy) + + def has_debug_binaries(upload): + return any((is_debug_binary(x) for x in upload.binaries)) + + def source_component_func(db_source): + package_list = PackageList(db_source.proxy) + component = source_component_from_package_list(package_list, upload.target_suite) + if component is not None: + return get_mapped_component(component.component_name, session=session) + + # Fallback for packages without Package-List field + query = session.query(Override).filter_by(suite=overridesuite, package=db_source.source) \ + .join(OverrideType).filter(OverrideType.overridetype == 'dsc') \ + .join(Component) + return query.one().component + + all_target_suites = [upload.target_suite] + all_target_suites.extend([q.suite for q in upload.target_suite.copy_queues]) + + for suite in all_target_suites: + debug_suite = suite.debug_suite + + if upload.source is not None: + # If we have Source in this upload, let's include it into + # upload suite. + transaction.copy_source( + upload.source, + suite, + source_component_func(upload.source), + allow_tainted=allow_tainted, + ) + + if debug_suite is not None and has_debug_binaries(upload): + # If we're handing a debug package, we also need to include the + # source in the debug suite as well. + transaction.copy_source( + upload.source, + debug_suite, + source_component_func(upload.source), + allow_tainted=allow_tainted, + ) + + for db_binary in upload.binaries: + # Now, let's work out where to copy this guy to -- if it's + # a debug binary, and the suite has a debug suite, let's go + # ahead and target the debug suite rather then the stock + # suite. + copy_to_suite = suite + if debug_suite is not None and is_debug_binary(db_binary): + copy_to_suite = debug_suite + + # build queues and debug suites may miss the source package + # if this is a binary-only upload. + if copy_to_suite != upload.target_suite: + transaction.copy_source( + db_binary.source, + copy_to_suite, + source_component_func(db_binary.source), + allow_tainted=allow_tainted, + ) + + transaction.copy_binary( + db_binary, + copy_to_suite, + binary_component_func(db_binary), + allow_tainted=allow_tainted, + extra_archives=[upload.target_suite.archive], + ) + + suite.update_last_changed() + + # Copy .changes if needed + if upload.target_suite.copychanges: + src = os.path.join(upload.policy_queue.path, upload.changes.changesname) + dst = os.path.join(upload.target_suite.path, upload.changes.changesname) + fs.copy(src, dst, mode=upload.target_suite.archive.mode) + + # Copy upload to Process-Policy::CopyDir + # Used on security.d.o to sync accepted packages to ftp-master, but this + # should eventually be replaced by something else. + copydir = cnf.get('Process-Policy::CopyDir') or None + if copydir is not None: + mode = upload.target_suite.archive.mode + if upload.source is not None: + for f in [ df.poolfile for df in upload.source.srcfiles ]: + dst = os.path.join(copydir, f.basename) + if not os.path.exists(dst): + fs.copy(f.fullpath, dst, mode=mode) + + for db_binary in upload.binaries: + f = db_binary.poolfile + dst = os.path.join(copydir, f.basename) + if not os.path.exists(dst): + fs.copy(f.fullpath, dst, mode=mode) + + src = os.path.join(upload.policy_queue.path, upload.changes.changesname) + dst = os.path.join(copydir, upload.changes.changesname) + if not os.path.exists(dst): + fs.copy(src, dst, mode=mode) + + if upload.source is not None and not Options['No-Action']: + urgency = upload.changes.urgency + if urgency not in cnf.value_list('Urgency::Valid'): + urgency = cnf['Urgency::Default'] + UrgencyLog().log(upload.source.source, upload.source.version, urgency) + + print " ACCEPT" + if not Options['No-Action']: + Logger.log(["Policy Queue ACCEPT", srcqueue.queue_name, changesname]) + + pu = get_processed_upload(upload) + daklib.announce.announce_accept(pu) + + # TODO: code duplication. Similar code is in process-upload. + # Move .changes to done + src = os.path.join(upload.policy_queue.path, upload.changes.changesname) + now = datetime.datetime.now() + donedir = os.path.join(cnf['Dir::Done'], now.strftime('%Y/%m/%d')) + dst = os.path.join(donedir, upload.changes.changesname) + dst = utils.find_next_free(dst) + fs.copy(src, dst, mode=0o644) + + remove_upload(upload, transaction) ################################################################################ -def comment_reject(changes_file, srcqueue, comments, session): - u = Upload() - u.pkg.changes_file = changes_file - u.load_changes(changes_file) - u.update_subst() +@try_or_reject +def comment_reject(*args): + real_comment_reject(*args, manual=True) - u.rejects.append(comments) +def real_comment_reject(upload, srcqueue, comments, transaction, notify=True, manual=False): + cnf = Config() + fs = transaction.fs + session = transaction.session + changesname = upload.changes.changesname + queuedir = upload.policy_queue.path + rejectdir = cnf['Dir::Reject'] + + ### Copy files to reject/ + + poolfiles = [b.poolfile for b in upload.binaries] + if upload.source is not None: + poolfiles.extend([df.poolfile for df in upload.source.srcfiles]) + # Not beautiful... + files = [ af.path for af in session.query(ArchiveFile) \ + .filter_by(archive=upload.policy_queue.suite.archive) \ + .join(ArchiveFile.file) \ + .filter(PoolFile.file_id.in_([ f.file_id for f in poolfiles ])) ] + for byhand in upload.byhand: + path = os.path.join(queuedir, byhand.filename) + if os.path.exists(path): + files.append(path) + files.append(os.path.join(queuedir, changesname)) + + for fn in files: + dst = utils.find_next_free(os.path.join(rejectdir, os.path.basename(fn))) + fs.copy(fn, dst, link=True) + + ### Write reason + + dst = utils.find_next_free(os.path.join(rejectdir, '{0}.reason'.format(changesname))) + fh = fs.create(dst) + fh.write(comments) + fh.close() + + ### Send mail notification + + if notify: + rejected_by = None + reason = comments + + # Try to use From: from comment file if there is one. + # This is not very elegant... + match = re.match(r"\AFrom: ([^\n]+)\n\n", comments) + if match: + rejected_by = match.group(1) + reason = '\n'.join(comments.splitlines()[2:]) + + pu = get_processed_upload(upload) + daklib.announce.announce_reject(pu, reason, rejected_by) + + print " REJECT" if not Options["No-Action"]: - u.do_reject(manual=0, reject_message='\n'.join(u.rejects)) - u.pkg.remove_known_changes(session=session) + Logger.log(["Policy Queue REJECT", srcqueue.queue_name, upload.changes.changesname]) + + changes = upload.changes + remove_upload(upload, transaction) + session.delete(changes) + +################################################################################ - Logger.log(["Policy Queue REJECT: %s: %s" % (srcqueue.queue_name, u.pkg.changes_file)]) +def remove_upload(upload, transaction): + fs = transaction.fs + session = transaction.session + changes = upload.changes + # Remove byhand and changes files. Binary and source packages will be + # removed from {bin,src}_associations and eventually removed by clean-suites automatically. + queuedir = upload.policy_queue.path + for byhand in upload.byhand: + path = os.path.join(queuedir, byhand.filename) + if os.path.exists(path): + fs.unlink(path) + session.delete(byhand) + fs.unlink(os.path.join(queuedir, upload.changes.changesname)) + + session.delete(upload) + session.flush() + +################################################################################ + +def get_processed_upload(upload): + pu = daklib.announce.ProcessedUpload() + + pu.maintainer = upload.changes.maintainer + pu.changed_by = upload.changes.changedby + pu.fingerprint = upload.changes.fingerprint + + pu.suites = [ upload.target_suite ] + pu.from_policy_suites = [ upload.target_suite ] + + changes_path = os.path.join(upload.policy_queue.path, upload.changes.changesname) + pu.changes = open(changes_path, 'r').read() + pu.changes_filename = upload.changes.changesname + pu.sourceful = upload.source is not None + pu.source = upload.changes.source + pu.version = upload.changes.version + pu.architecture = upload.changes.architecture + pu.bugs = upload.changes.closes + + pu.program = "process-policy" + + return pu + +################################################################################ + +def remove_unreferenced_binaries(policy_queue, transaction): + """Remove binaries that are no longer referenced by an upload + + @type policy_queue: L{daklib.dbconn.PolicyQueue} + + @type transaction: L{daklib.archive.ArchiveTransaction} + """ + session = transaction.session + suite = policy_queue.suite + + query = """ + SELECT b.* + FROM binaries b + JOIN bin_associations ba ON b.id = ba.bin + WHERE ba.suite = :suite_id + AND NOT EXISTS (SELECT 1 FROM policy_queue_upload_binaries_map pqubm + JOIN policy_queue_upload pqu ON pqubm.policy_queue_upload_id = pqu.id + WHERE pqu.policy_queue_id = :policy_queue_id + AND pqubm.binary_id = b.id)""" + binaries = session.query(DBBinary).from_statement(query) \ + .params({'suite_id': policy_queue.suite_id, 'policy_queue_id': policy_queue.policy_queue_id}) + + for binary in binaries: + Logger.log(["removed binary from policy queue", policy_queue.queue_name, binary.package, binary.version]) + transaction.remove_binary(binary, suite) + +def remove_unreferenced_sources(policy_queue, transaction): + """Remove sources that are no longer referenced by an upload or a binary + + @type policy_queue: L{daklib.dbconn.PolicyQueue} + + @type transaction: L{daklib.archive.ArchiveTransaction} + """ + session = transaction.session + suite = policy_queue.suite + + query = """ + SELECT s.* + FROM source s + JOIN src_associations sa ON s.id = sa.source + WHERE sa.suite = :suite_id + AND NOT EXISTS (SELECT 1 FROM policy_queue_upload pqu + WHERE pqu.policy_queue_id = :policy_queue_id + AND pqu.source_id = s.id) + AND NOT EXISTS (SELECT 1 FROM binaries b + JOIN bin_associations ba ON b.id = ba.bin + WHERE b.source = s.id + AND ba.suite = :suite_id)""" + sources = session.query(DBSource).from_statement(query) \ + .params({'suite_id': policy_queue.suite_id, 'policy_queue_id': policy_queue.policy_queue_id}) + + for source in sources: + Logger.log(["removed source from policy queue", policy_queue.queue_name, source.source, source.version]) + transaction.remove_source(source, suite) ################################################################################ @@ -116,7 +451,7 @@ def main(): if not cnf.has_key("Process-Policy::Options::%s" % (i)): cnf["Process-Policy::Options::%s" % (i)] = "" - queue_name = apt_pkg.ParseCommandLine(cnf.Cnf,Arguments,sys.argv) + queue_name = apt_pkg.parse_commandline(cnf.Cnf,Arguments,sys.argv) if len(queue_name) != 1: print "E: Specify exactly one policy queue" @@ -124,32 +459,36 @@ def main(): queue_name = queue_name[0] - Options = cnf.SubTree("Process-Policy::Options") + Options = cnf.subtree("Process-Policy::Options") if Options["Help"]: usage() + Logger = daklog.Logger("process-policy") if not Options["No-Action"]: + urgencylog = UrgencyLog() + + with ArchiveTransaction() as transaction: + session = transaction.session try: - Logger = daklog.Logger(cnf, "process-policy") - except CantOpenError, e: - Logger = None + pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one() + except NoResultFound: + print "E: Cannot find policy queue %s" % queue_name + sys.exit(1) - # Find policy queue - session.query(PolicyQueue) + commentsdir = os.path.join(pq.path, 'COMMENTS') + # The comments stuff relies on being in the right directory + os.chdir(pq.path) - try: - pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one() - except NoResultFound: - print "E: Cannot find policy queue %s" % queue_name - sys.exit(1) + do_comments(commentsdir, pq, "REJECT.", "REJECTED.", "NOTOK", comment_reject, transaction) + do_comments(commentsdir, pq, "ACCEPT.", "ACCEPTED.", "OK", comment_accept, transaction) + do_comments(commentsdir, pq, "ACCEPTED.", "ACCEPTED.", "OK", comment_accept, transaction) - commentsdir = os.path.join(pq.path, 'COMMENTS') - # The comments stuff relies on being in the right directory - os.chdir(pq.path) - do_comments(commentsdir, pq, "ACCEPT.", "ACCEPTED.", "OK", comment_accept, session) - do_comments(commentsdir, pq, "REJECT.", "REJECTED.", "NOTOK", comment_reject, session) + remove_unreferenced_binaries(pq, transaction) + remove_unreferenced_sources(pq, transaction) + if not Options['No-Action']: + urgencylog.close() ################################################################################