X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=daklib%2Fchecks.py;h=81bd629e481e171991d1ddf0da8d3b4dce282c12;hb=707a89a3b86961755a99cb9e1a0a5f23690f9529;hp=770615aa04d64235b2677176b71061e7f5e60038;hpb=6f5a4716955f6370dc740b62c907d0c0e83735be;p=dak.git diff --git a/daklib/checks.py b/daklib/checks.py index 770615aa..81bd629e 100644 --- a/daklib/checks.py +++ b/daklib/checks.py @@ -352,21 +352,70 @@ class SingleDistributionCheck(Check): class ACLCheck(Check): """Check the uploader is allowed to upload the packages in .changes""" - def _check_dm(self, upload): + + def _does_hijack(self, session, upload, suite): + for binary_name in upload.changes.binary_names: + binaries = session.query(DBBinary).join(DBBinary.source) \ + .filter(DBBinary.suites.contains(suite)) \ + .filter(DBBinary.package == binary_name) + for binary in binaries: + if binary.source.source != upload.changes.changes['Source']: + return True, binary, binary.source.source + return False, None, None + + def _check_acl(self, session, upload, acl): + source_name = upload.changes.source_name + + if acl.match_fingerprint and upload.fingerprint not in acl.fingerprints: + return None, None + if acl.match_keyring is not None and upload.fingerprint.keyring != acl.match_keyring: + return None, None + + if not acl.allow_new: + if upload.new: + return False, "NEW uploads are not allowed" + for f in upload.changes.files.itervalues(): + if f.section == 'byhand' or f.section.startswith("raw-"): + return False, "BYHAND uploads are not allowed" + if not acl.allow_source and upload.changes.source is not None: + return False, "sourceful uploads are not allowed" + binaries = upload.changes.binaries + if len(binaries) != 0: + if not acl.allow_binary: + return False, "binary uploads are not allowed" + if upload.changes.source is None and not acl.allow_binary_only: + return False, "binary-only uploads are not allowed" + if not acl.allow_binary_all: + uploaded_arches = set(upload.changes.architectures) + uploaded_arches.discard('source') + allowed_arches = set(a.arch_string for a in acl.architectures) + for a in uploaded_arches: + if a not in allowed_arches: + return False, "uploads for architecture {0} are not allowed".format(a) + if not acl.allow_hijack: + for suite in upload.final_suites: + does_hijack, hijacked_binary, hijacked_from = self._does_hijack(session, upload, suite) + if does_hijack: + return False, "hijacks are not allowed (binary={0}, other-source={1})".format(hijacked_binary, hijacked_from) + + acl_per_source = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=upload.fingerprint, source=source_name).first() + if acl.allow_per_source: + # XXX: Drop DMUA part here and switch to new implementation. + dmua_status, dmua_reason = self._check_dmua(upload) + if not dmua_status: + return False, dmua_reason + #if acl_per_source is None: + # return False, "not allowed to upload source package '{0}'".format(source_name) + if acl.deny_per_source and acl_per_source is not None: + return False, acl_per_source.reason or "forbidden to upload source package '{0}'".format(source_name) + + return True, None + + def _check_dmua(self, upload): # This code is not very nice, but hopefully works until we can replace # DM-Upload-Allowed, cf. https://lists.debian.org/debian-project/2012/06/msg00029.html session = upload.session - if 'source' not in upload.changes.architectures: - raise Reject('DM uploads must include source') - for f in upload.changes.files.itervalues(): - if f.section == 'byhand' or f.section[:4] == "raw-": - raise Reject("Uploading byhand packages is not allowed for DMs.") - - # Reject NEW packages - if upload.new: - raise Reject('Uploading NEW packages is not allowed for DMs.') - # Check DM-Upload-Allowed suites = upload.final_suites assert len(suites) == 1 @@ -379,84 +428,61 @@ class ACLCheck(Check): .join(DBSource.suites).filter(Suite.suite_name.in_(last_suites)) \ .order_by(DBSource.version.desc()).limit(1).first() if last is None: - raise Reject('No existing source found in {0}'.format(' or '.join(last_suites))) + return False, 'No existing source found in {0}'.format(' or '.join(last_suites)) if not last.dm_upload_allowed: - raise Reject('DM-Upload-Allowed is not set in {0}={1}'.format(last.source, last.version)) + return False, 'DM-Upload-Allowed is not set in {0}={1}'.format(last.source, last.version) # check current Changed-by is in last Maintainer or Uploaders uploader_names = [ u.name for u in last.uploaders ] changed_by_field = upload.changes.changes.get('Changed-By', upload.changes.changes['Maintainer']) if changed_by_field not in uploader_names: - raise Reject('{0} is not an uploader for {1}={2}'.format(changed_by_field, last.source, last.version)) + return False, '{0} is not an uploader for {1}={2}'.format(changed_by_field, last.source, last.version) # check Changed-by is the DM changed_by = fix_maintainer(changed_by_field) uid = upload.fingerprint.uid if uid is None: - raise Reject('Unknown uid for fingerprint {0}'.format(upload.fingerprint.fingerprint)) + return False, 'Unknown uid for fingerprint {0}'.format(upload.fingerprint.fingerprint) if uid.uid != changed_by[3] and uid.name != changed_by[2]: - raise Reject('DMs are not allowed to sponsor uploads (expected {0} <{1}> as maintainer, but got {2})'.format(uid.name, uid.uid, changed_by_field)) + return False, 'DMs are not allowed to sponsor uploads (expected {0} <{1}> as maintainer, but got {2})'.format(uid.name, uid.uid, changed_by_field) - # Try to catch hijacks. - # This doesn't work correctly. Uploads to experimental can still - # "hijack" binaries from unstable. Also one can hijack packages - # via buildds (but people who try this should not be DMs). - for binary_name in upload.changes.binary_names: - binaries = session.query(DBBinary).join(DBBinary.source) \ - .filter(DBBinary.suites.contains(suite)) \ - .filter(DBBinary.package == binary_name) - for binary in binaries: - if binary.source.source != upload.changes.changes['Source']: - raise Reject('DMs must not hijack binaries (binary={0}, other-source={1})'.format(binary_name, binary.source.source)) - - return True + return True, None def check(self, upload): + session = upload.session fingerprint = upload.fingerprint - source_acl = fingerprint.source_acl - if source_acl is None: - if 'source' in upload.changes.architectures: - raise Reject('Fingerprint {0} must not upload source'.format(fingerprint.fingerprint)) - elif source_acl.access_level == 'dm': - self._check_dm(upload) - elif source_acl.access_level != 'full': - raise Reject('Unknown source_acl access level {0} for fingerprint {1}'.format(source_acl.access_level, fingerprint.fingerprint)) - - bin_architectures = set(upload.changes.architectures) - bin_architectures.discard('source') - binary_acl = fingerprint.binary_acl - if binary_acl is None: - if len(bin_architectures) > 0: - raise Reject('Fingerprint {0} must not upload binary packages'.format(fingerprint.fingerprint)) - elif binary_acl.access_level == 'map': - query = upload.session.query(BinaryACLMap).filter_by(fingerprint=fingerprint) - allowed_architectures = [ m.architecture.arch_string for m in query ] - - for arch in upload.changes.architectures: - if arch not in allowed_architectures: - raise Reject('Fingerprint {0} must not upload binaries for architecture {1}'.format(fingerprint.fingerprint, arch)) - elif binary_acl.access_level != 'full': - raise Reject('Unknown binary_acl access level {0} for fingerprint {1}'.format(binary_acl.access_level, fingerprint.fingerprint)) + keyring = fingerprint.keyring - return True + if keyring is None: + raise Reject('No keyring for fingerprint {0}'.format(fingerprint.fingerprint)) + if not keyring.active: + raise Reject('Keyring {0} is not active'.format(keyring.name)) -class UploadBlockCheck(Check): - """check for upload blocks""" - def check(self, upload): - session = upload.session - control = upload.changes.changes + acl = fingerprint.acl or keyring.acl + if acl is None: + raise Reject('No ACL for fingerprint {0}'.format(fingerprint.fingerprint)) + result, reason = self._check_acl(session, upload, acl) + if not result: + raise Reject(reason) - source = re_field_source.match(control['Source']).group('package') - version = control['Version'] - blocks = session.query(UploadBlock).filter_by(source=source) \ - .filter((UploadBlock.version == version) | (UploadBlock.version == None)) + for acl in session.query(ACL).filter_by(is_global=True): + result, reason = self._check_acl(session, upload, acl) + if result == False: + raise Reject(reason) - for block in blocks: - if block.fingerprint == upload.fingerprint: - raise Reject('Manual upload block in place for package {0} and fingerprint {1}:\n{2}'.format(source, upload.fingerprint.fingerprint, block.reason)) - if block.uid == upload.fingerprint.uid: - raise Reject('Manual upload block in place for package {0} and uid {1}:\n{2}'.format(source, block.uid.uid, block.reason)) + return True + def per_suite_check(self, upload, suite): + acls = suite.acls + if len(acls) != 0: + accept = False + for acl in acls: + result, reason = self._check_acl(upload.session, upload, acl) + if result == False: + raise Reject(reason) + accept = accept or result + if not accept: + raise Reject('Not accepted by any per-suite acl (suite={0})'.format(suite.suite_name)) return True class TransitionCheck(Check):