+################################################################################
+
+def lookup_uid_from_fingerprint(fpr):
+ """
+ Return the uid,name,isdm for a given gpg fingerprint
+
+ @type fpr: string
+ @param fpr: a 40 byte GPG fingerprint
+
+ @return: (uid, name, isdm)
+ """
+ cursor = DBConn().cursor()
+ cursor.execute( "SELECT u.uid, u.name, k.debian_maintainer FROM fingerprint f JOIN keyrings k ON (f.keyring=k.id), uid u WHERE f.uid = u.id AND f.fingerprint = '%s'" % (fpr))
+ qs = cursor.fetchone()
+ if qs:
+ return qs
+ else:
+ return (None, None, False)
+
+def check_signed_by_key():
+ """Ensure the .changes is signed by an authorized uploader."""
+
+ (uid, uid_name, is_dm) = lookup_uid_from_fingerprint(changes["fingerprint"])
+ if uid_name == None:
+ uid_name = ""
+
+ # match claimed name with actual name:
+ if uid is None:
+ # This is fundamentally broken but need us to refactor how we get
+ # the UIDs/Fingerprints in order for us to fix it properly
+ uid, uid_email = changes["fingerprint"], uid
+ may_nmu, may_sponsor = 1, 1
+ # XXX by default new dds don't have a fingerprint/uid in the db atm,
+ # and can't get one in there if we don't allow nmu/sponsorship
+ elif is_dm is False:
+ # If is_dm is False, we allow full upload rights
+ uid_email = "%s@debian.org" % (uid)
+ may_nmu, may_sponsor = 1, 1
+ else:
+ # Assume limited upload rights unless we've discovered otherwise
+ uid_email = uid
+ may_nmu, may_sponsor = 0, 0
+
+
+ if uid_email in [changes["maintaineremail"], changes["changedbyemail"]]:
+ sponsored = 0
+ elif uid_name in [changes["maintainername"], changes["changedbyname"]]:
+ sponsored = 0
+ if uid_name == "": sponsored = 1
+ else:
+ sponsored = 1
+ if ("source" in changes["architecture"] and
+ uid_email and utils.is_email_alias(uid_email)):
+ sponsor_addresses = utils.gpg_get_key_addresses(changes["fingerprint"])
+ if (changes["maintaineremail"] not in sponsor_addresses and
+ changes["changedbyemail"] not in sponsor_addresses):
+ changes["sponsoremail"] = uid_email
+
+ if sponsored and not may_sponsor:
+ reject("%s is not authorised to sponsor uploads" % (uid))
+
+ cursor = DBConn().cursor()
+ if not sponsored and not may_nmu:
+ source_ids = []
+ cursor.execute( "SELECT s.id, s.version FROM source s JOIN src_associations sa ON (s.id = sa.source) WHERE s.source = %(source)s AND s.dm_upload_allowed = 'yes'", changes )
+
+ highest_sid, highest_version = None, None
+
+ should_reject = True
+ while True:
+ si = cursor.fetchone()
+ if not si:
+ break
+
+ if highest_version == None or apt_pkg.VersionCompare(si[1], highest_version) == 1:
+ highest_sid = si[0]
+ highest_version = si[1]
+
+ if highest_sid == None:
+ reject("Source package %s does not have 'DM-Upload-Allowed: yes' in its most recent version" % changes["source"])
+ else:
+
+ cursor.execute("SELECT m.name FROM maintainer m WHERE m.id IN (SELECT su.maintainer FROM src_uploaders su JOIN source s ON (s.id = su.source) WHERE su.source = %s)" % (highest_sid))
+
+ while True:
+ m = cursor.fetchone()
+ if not m:
+ break
+
+ (rfc822, rfc2047, name, email) = utils.fix_maintainer(m[0])
+ if email == uid_email or name == uid_name:
+ should_reject=False
+ break
+
+ if should_reject == True:
+ reject("%s is not in Maintainer or Uploaders of source package %s" % (uid, changes["source"]))
+
+ for b in changes["binary"].keys():
+ for suite in changes["distribution"].keys():
+ suite_id = DBConn().get_suite_id(suite)
+
+ cursor.execute("SELECT DISTINCT s.source FROM source s JOIN binaries b ON (s.id = b.source) JOIN bin_associations ba On (b.id = ba.bin) WHERE b.package = %(package)s AND ba.suite = %(suite)s" , {'package':b, 'suite':suite_id} )
+ while True:
+ s = cursor.fetchone()
+ if not s:
+ break
+
+ if s[0] != changes["source"]:
+ reject("%s may not hijack %s from source package %s in suite %s" % (uid, b, s, suite))
+
+ for f in files.keys():
+ if files[f].has_key("byhand"):
+ reject("%s may not upload BYHAND file %s" % (uid, f))
+ if files[f].has_key("new"):
+ reject("%s may not upload NEW file %s" % (uid, f))
+
+