X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;ds=sidebyside;f=daklib%2Fchecks.py;h=3c22f390aa0ba393a67ba5089cd22223d2405d71;hb=0476947fa4da88744730e9918c4e2be9a38c142a;hp=cae801e489f341c370d22ad982313ff0553fe4b6;hpb=dbd3ceac44904f678181a189f34dc75c56178953;p=dak.git diff --git a/daklib/checks.py b/daklib/checks.py index cae801e4..3c22f390 100644 --- a/daklib/checks.py +++ b/daklib/checks.py @@ -53,6 +53,15 @@ class RejectStupidMaintainerException(Exception): def __str__(self): return "'%s' has mismatching %s from the external files db ('%s' [current] vs '%s' [external])" % self.args[:4] +class RejectACL(Reject): + """exception raise by failing ACL checks""" + def __init__(self, acl, reason): + self.acl = acl + self.reason = reason + + def __str__(self): + return "ACL {0}: {1}".format(self.acl.name, self.reason) + class Check(object): """base class for checks @@ -461,58 +470,13 @@ class ACLCheck(Check): acl_per_source = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=upload.fingerprint, source=source_name).first() if acl.allow_per_source: - # XXX: Drop DMUA part here and switch to new implementation. - # XXX: Send warning mail once users can set the new DMUA flag - dmua_status, dmua_reason = self._check_dmua(upload) if acl_per_source is None: - if not dmua_status: - return False, dmua_reason - else: - upload.warn('DM flag not set, but accepted as DMUA was set.') - #if acl_per_source is None: - # return False, "not allowed to upload source package '{0}'".format(source_name) + return False, "not allowed to upload source package '{0}'".format(source_name) if acl.deny_per_source and acl_per_source is not None: return False, acl_per_source.reason or "forbidden to upload source package '{0}'".format(source_name) return True, None - def _check_dmua(self, upload): - # This code is not very nice, but hopefully works until we can replace - # DM-Upload-Allowed, cf. https://lists.debian.org/debian-project/2012/06/msg00029.html - session = upload.session - - # Check DM-Upload-Allowed - suites = upload.final_suites - assert len(suites) == 1 - suite = list(suites)[0] - - last_suites = ['unstable', 'experimental'] - if suite.suite_name.endswith('-backports'): - last_suites = [suite.suite_name] - last = session.query(DBSource).filter_by(source=upload.changes.changes['Source']) \ - .join(DBSource.suites).filter(Suite.suite_name.in_(last_suites)) \ - .order_by(DBSource.version.desc()).limit(1).first() - if last is None: - return False, 'No existing source found in {0}'.format(' or '.join(last_suites)) - if not last.dm_upload_allowed: - return False, 'DM-Upload-Allowed is not set in {0}={1}'.format(last.source, last.version) - - # check current Changed-by is in last Maintainer or Uploaders - uploader_names = [ u.name for u in last.uploaders ] - changed_by_field = upload.changes.changes.get('Changed-By', upload.changes.changes['Maintainer']) - if changed_by_field not in uploader_names: - return False, '{0} is not an uploader for {1}={2}'.format(changed_by_field, last.source, last.version) - - # check Changed-by is the DM - changed_by = fix_maintainer(changed_by_field) - uid = upload.fingerprint.uid - if uid is None: - return False, 'Unknown uid for fingerprint {0}'.format(upload.fingerprint.fingerprint) - if uid.uid != changed_by[3] and uid.name != changed_by[2]: - return False, 'DMs are not allowed to sponsor uploads (expected {0} <{1}> as maintainer, but got {2})'.format(uid.name, uid.uid, changed_by_field) - - return True, None - def check(self, upload): session = upload.session fingerprint = upload.fingerprint @@ -528,12 +492,12 @@ class ACLCheck(Check): raise Reject('No ACL for fingerprint {0}'.format(fingerprint.fingerprint)) result, reason = self._check_acl(session, upload, acl) if not result: - raise Reject(reason) + raise RejectACL(acl, reason) for acl in session.query(ACL).filter_by(is_global=True): result, reason = self._check_acl(session, upload, acl) if result == False: - raise Reject(reason) + raise RejectACL(acl, reason) return True @@ -704,7 +668,7 @@ class SuiteArchitectureCheck(Check): for arch in upload.changes.architectures: query = session.query(Architecture).filter_by(arch_string=arch).filter(Architecture.suites.contains(suite)) if query.first() is None: - raise Reject('Architecture {0} is not allowed in suite {2}'.format(arch, suite.suite_name)) + raise Reject('Architecture {0} is not allowed in suite {1}'.format(arch, suite.suite_name)) return True