+################################################################################
+
+def lookup_uid_from_fingerprint(fpr):
+ q = Upload.projectB.query("SELECT u.uid, u.name, k.debian_maintainer FROM fingerprint f JOIN keyrings k ON (f.keyring=k.id), uid u WHERE f.uid = u.id AND f.fingerprint = '%s'" % (fpr))
+ qs = q.getresult()
+ if len(qs) == 0:
+ return (None, None)
+ else:
+ return qs[0]
+
+def check_signed_by_key():
+ """Ensure the .changes is signed by an authorized uploader."""
+
+ (uid, uid_name, is_dm) = lookup_uid_from_fingerprint(changes["fingerprint"])
+ if uid_name == None:
+ uid_name = ""
+
+ # match claimed name with actual name:
+ if uid == None:
+ uid, uid_email = changes["fingerprint"], uid
+ may_nmu, may_sponsor = 1, 1
+ # XXX by default new dds don't have a fingerprint/uid in the db atm,
+ # and can't get one in there if we don't allow nmu/sponsorship
+ elif is_dm is "t":
+ uid_email = uid
+ may_nmu, may_sponsor = 0, 0
+ else:
+ uid_email = "%s@debian.org" % (uid)
+ may_nmu, may_sponsor = 1, 1
+
+ if uid_email in [changes["maintaineremail"], changes["changedbyemail"]]:
+ sponsored = 0
+ elif uid_name in [changes["maintainername"], changes["changedbyname"]]:
+ sponsored = 0
+ if uid_name == "": sponsored = 1
+ else:
+ sponsored = 1
+ if ("source" in changes["architecture"] and
+ uid_email and utils.is_email_alias(uid_email)):
+ sponsor_addresses = utils.gpg_get_key_addresses(changes["fingerprint"])
+ if (changes["maintaineremail"] not in sponsor_addresses and
+ changes["changedbyemail"] not in sponsor_addresses):
+ changes["sponsoremail"] = uid_email
+
+ if sponsored and not may_sponsor:
+ reject("%s is not authorised to sponsor uploads" % (uid))
+
+ if not sponsored and not may_nmu:
+ source_ids = []
+ q = Upload.projectB.query("SELECT s.id, s.version FROM source s JOIN src_associations sa ON (s.id = sa.source) WHERE s.source = '%s' AND s.dm_upload_allowed = 'yes'" % (changes["source"]))
+
+ highest_sid, highest_version = None, None
+
+ should_reject = True
+ for si in q.getresult():
+ if highest_version == None or apt_pkg.VersionCompare(si[1], highest_version) == 1:
+ highest_sid = si[0]
+ highest_version = si[1]
+
+ if highest_sid == None:
+ reject("Source package %s does not have 'DM-Upload-Allowed: yes' in its most recent version" % changes["source"])
+ else:
+ q = Upload.projectB.query("SELECT m.name FROM maintainer m WHERE m.id IN (SELECT su.maintainer FROM src_uploaders su JOIN source s ON (s.id = su.source) WHERE su.source = %s)" % (highest_sid))
+ for m in q.getresult():
+ (rfc822, rfc2047, name, email) = utils.fix_maintainer(m[0])
+ if email == uid_email or name == uid_name:
+ should_reject=False
+ break
+
+ if should_reject == True:
+ reject("%s is not in Maintainer or Uploaders of source package %s" % (uid, changes["source"]))
+
+ for b in changes["binary"].keys():
+ for suite in changes["distribution"].keys():
+ suite_id = database.get_suite_id(suite)
+ q = Upload.projectB.query("SELECT DISTINCT s.source FROM source s JOIN binaries b ON (s.id = b.source) JOIN bin_associations ba On (b.id = ba.bin) WHERE b.package = '%s' AND ba.suite = %s" % (b, suite_id))
+ for s in q.getresult():
+ if s[0] != changes["source"]:
+ reject("%s may not hijack %s from source package %s in suite %s" % (uid, b, s, suite))
+
+ for f in files.keys():
+ if files[f].has_key("byhand"):
+ reject("%s may not upload BYHAND file %s" % (uid, f))
+ if files[f].has_key("new"):
+ reject("%s may not upload NEW file %s" % (uid, f))
+
+