+
+ def _does_hijack(self, session, upload, suite):
+ for binary_name in upload.changes.binary_names:
+ binaries = session.query(DBBinary).join(DBBinary.source) \
+ .filter(DBBinary.suites.contains(suite)) \
+ .filter(DBBinary.package == binary_name)
+ for binary in binaries:
+ if binary.source.source != upload.changes.changes['Source']:
+ return True, binary, binary.source.source
+ return False, None, None
+
+ def _check_acl(self, session, upload, acl):
+ source_name = upload.changes.source_name
+
+ if acl.match_fingerprint and upload.fingerprint not in acl.fingerprints:
+ return None, None
+ if acl.match_keyring is not None and upload.fingerprint.keyring != acl.match_keyring:
+ return None, None
+
+ if not acl.allow_new:
+ if upload.new:
+ return False, "NEW uploads are not allowed"
+ for f in upload.changes.files.itervalues():
+ if f.section == 'byhand' or f.section.startswith("raw-"):
+ return False, "BYHAND uploads are not allowed"
+ if not acl.allow_source and upload.changes.source is not None:
+ return False, "sourceful uploads are not allowed"
+ binaries = upload.changes.binaries
+ if len(binaries) != 0:
+ if not acl.allow_binary:
+ return False, "binary uploads are not allowed"
+ if upload.changes.source is None and not acl.allow_binary_only:
+ return False, "binary-only uploads are not allowed"
+ if not acl.allow_binary_all:
+ uploaded_arches = set(upload.changes.architectures)
+ uploaded_arches.discard('source')
+ allowed_arches = set(a.arch_string for a in acl.architectures)
+ for a in uploaded_arches:
+ if a not in allowed_arches:
+ return False, "uploads for architecture {0} are not allowed".format(a)
+ if not acl.allow_hijack:
+ for suite in upload.final_suites:
+ does_hijack, hijacked_binary, hijacked_from = self._does_hijack(session, upload, suite)
+ if does_hijack:
+ return False, "hijacks are not allowed (binary={0}, other-source={1})".format(hijacked_binary, hijacked_from)
+
+ acl_per_source = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=upload.fingerprint, source=source_name).first()
+ if acl.allow_per_source:
+ # XXX: Drop DMUA part here and switch to new implementation.
+ dmua_status, dmua_reason = self._check_dmua(upload)
+ if not dmua_status:
+ return False, dmua_reason
+ #if acl_per_source is None:
+ # return False, "not allowed to upload source package '{0}'".format(source_name)
+ if acl.deny_per_source and acl_per_source is not None:
+ return False, acl_per_source.reason or "forbidden to upload source package '{0}'".format(source_name)
+
+ return True, None
+
+ def _check_dmua(self, upload):