from textutils import fix_maintainer
from regexes import re_html_escaping, html_escaping, re_single_line_field, \
re_multi_line_field, re_srchasver, re_verwithext, \
- re_parse_maintainer, re_taint_free, re_gpg_uid, re_re_mark, \
- re_whitespace_comment
+ re_parse_maintainer, re_taint_free, re_gpg_uid, \
+ re_re_mark, re_whitespace_comment, re_issource
+
+from srcformats import srcformats
################################################################################
################################################################################
+def check_dsc_files(dsc_filename, dsc=None, dsc_files=None):
+    """
+    Verify that the files listed in the Files field of the .dsc are
+    those expected given the announced Format.
+
+    @type dsc_filename: string
+    @param dsc_filename: path of .dsc file
+
+    @type dsc: dict
+    @param dsc: the content of the .dsc parsed by C{parse_changes()}
+
+    @type dsc_files: dict
+    @param dsc_files: the file list returned by C{build_file_list()}
+
+    @rtype: list
+    @return: all errors detected
+    """
+    rejmsg = []
+
+    # Parse the file if needed
+    if dsc is None:
+        dsc = parse_changes(dsc_filename, signing_rules=1)
+    if dsc_files is None:
+        dsc_files = build_file_list(dsc, is_a_dsc=1)
+
+    # Ensure .dsc lists proper set of source files according to the format
+    # announced
+    has_native_tar = 0
+    has_native_tar_gz = 0
+    has_orig_tar = 0
+    has_orig_tar_gz = 0
+    has_more_orig_tar = 0
+    has_debian_tar = 0
+    has_debian_diff = 0
+    for f in dsc_files.keys():
+        m = re_issource.match(f)
+        if not m:
+            rejmsg.append("%s: %s in Files field not recognised as source."
+                          % (dsc_filename, f))
+            continue
+        ftype = m.group(3)
+        if ftype == "orig.tar.gz":
+            has_orig_tar_gz += 1
+            has_orig_tar += 1
+        elif ftype == "diff.gz":
+            has_debian_diff += 1
+        elif ftype == "tar.gz":
+            has_native_tar_gz += 1
+            has_native_tar += 1
+        elif re.match(r"debian\.tar\.(gz|bz2|lzma)", ftype):
+            has_debian_tar += 1
+        elif re.match(r"orig\.tar\.(gz|bz2|lzma)", ftype):
+            has_orig_tar += 1
+        elif re.match(r"tar\.(gz|bz2|lzma)", ftype):
+            has_native_tar += 1
+        elif re.match(r"orig-.+\.tar\.(gz|bz2|lzma)", ftype):
+            has_more_orig_tar += 1
+        else:
+            # Accumulate into rejmsg like every other error path here;
+            # reject() is not defined in this function's scope.
+            rejmsg.append("%s: unexpected source file '%s'" % (dsc_filename, f))
+    if has_orig_tar > 1:
+        rejmsg.append("%s: lists multiple .orig tarballs." % (dsc_filename))
+    if has_native_tar > 1:
+        rejmsg.append("%s: lists multiple native tarballs." % (dsc_filename))
+    if has_debian_tar > 1 or has_debian_diff > 1:
+        rejmsg.append("%s: lists multiple debian diff/tarballs." % (dsc_filename))
+
+    # Delegate the per-format acceptance rules to the first matching
+    # source format handler.
+    for format in srcformats:
+        if format.re_format.match(dsc['format']):
+            rejmsg.extend(format.reject_msgs(
+                dsc_filename,
+                has_native_tar,
+                has_native_tar_gz,
+                has_debian_tar,
+                has_debian_diff,
+                has_orig_tar,
+                has_orig_tar_gz,
+                has_more_orig_tar
+            ))
+            break
+
+    return rejmsg
+
+################################################################################
+
def check_hash_fields(what, manifest):
"""
check_hash_fields ensures that there are no checksum fields in the
################################################################################
-def ensure_hashes(changes, dsc, files, dsc_files):
- rejmsg = []
-
- # Make sure we recognise the format of the Files: field in the .changes
- format = changes.get("format", "0.0").split(".", 1)
- if len(format) == 2:
- format = int(format[0]), int(format[1])
- else:
- format = int(float(format[0])), 0
-
- # We need to deal with the original changes blob, as the fields we need
- # might not be in the changes dict serialised into the .dak anymore.
- orig_changes = parse_deb822(changes['filecontents'])
-
- # Copy the checksums over to the current changes dict. This will keep
- # the existing modifications to it intact.
- for field in orig_changes:
- if field.startswith('checksums-'):
- changes[field] = orig_changes[field]
-
- # Check for unsupported hashes
- rejmsg.extend(check_hash_fields(".changes", changes))
- rejmsg.extend(check_hash_fields(".dsc", dsc))
-
- # We have to calculate the hash if we have an earlier changes version than
- # the hash appears in rather than require it exist in the changes file
- for hashname, hashfunc, version in known_hashes:
- rejmsg.extend(_ensure_changes_hash(changes, format, version, files,
- hashname, hashfunc))
- if "source" in changes["architecture"]:
- rejmsg.extend(_ensure_dsc_hash(dsc, dsc_files, hashname,
- hashfunc))
-
- return rejmsg
-
def parse_checksums(where, files, manifest, hashname):
rejmsg = []
field = 'checksums-%s' % hashname
for line in manifest[field].split('\n'):
if not line:
break
- checksum, size, checkfile = line.strip().split(' ')
+ clist = line.strip().split(' ')
+ if len(clist) == 3:
+ checksum, size, checkfile = clist
+ else:
+ rejmsg.append("Cannot parse checksum line [%s]" % (line))
+ continue
if not files.has_key(checkfile):
# TODO: check for the file's entry in the original files dict, not
# the one modified by (auto)byhand and other weird stuff
format = format[:2]
if is_a_dsc:
- # format = (1,0) are the only formats we currently accept,
# format = (0,0) are missing format headers of which we still
# have some in the archive.
- if format != (1,0) and format != (0,0):
+ if format != (1,0) and format != (0,0) and \
+ format != (3,0,"quilt") and format != (3,0,"native"):
raise UnknownFormatError, "%s" % (changes.get("format","0.0"))
else:
if (format < (1,5) or format > (1,8)):
################################################################################
-# Escape characters which have meaning to SQL's regex comparison operator ('~')
-# (woefully incomplete)
-
-def regex_safe (s):
- s = s.replace('+', '\\\\+')
- s = s.replace('.', '\\\\.')
- return s
-
-################################################################################
-
def TemplateSubst(map, filename):
""" Perform a substition of template """
templatefile = open_file(filename)
template = templatefile.read()
for x in map.keys():
- template = template.replace(x,map[x])
+ template = template.replace(x, str(map[x]))
templatefile.close()
return template
def whoami ():
return pwd.getpwuid(os.getuid())[4].split(',')[0].replace('.', '')
+def getusername ():
+ return pwd.getpwuid(os.getuid())[0]
+
################################################################################
def size_type (c):
suite_ids_list = []
for suitename in split_args(Options["Suite"]):
suite = get_suite(suitename, session=session)
- if suite_id is None:
- warn("suite '%s' not recognised." % (suitename))
+ if suite.suite_id is None:
+ warn("suite '%s' not recognised." % (suite.suite_name))
else:
suite_ids_list.append(suite.suite_id)
if suite_ids_list:
################################################################################
-def check_signature (sig_filename, reject, data_filename="", keyrings=None, autofetch=None):
+def check_signature (sig_filename, data_filename="", keyrings=None, autofetch=None):
"""
Check the signature of a file and return the fingerprint if the
signature is valid or 'None' if it's not. The first argument is the
used.
"""
+ rejects = []
+
# Ensure the filename contains no shell meta-characters or other badness
if not re_taint_free.match(sig_filename):
- reject("!!WARNING!! tainted signature filename: '%s'." % (sig_filename))
- return None
+ rejects.append("!!WARNING!! tainted signature filename: '%s'." % (sig_filename))
+ return (None, rejects)
if data_filename and not re_taint_free.match(data_filename):
- reject("!!WARNING!! tainted data filename: '%s'." % (data_filename))
- return None
+ rejects.append("!!WARNING!! tainted data filename: '%s'." % (data_filename))
+ return (None, rejects)
if not keyrings:
keyrings = Cnf.ValueList("Dinstall::GPGKeyring")
if autofetch:
error_msg = retrieve_key(sig_filename)
if error_msg:
- reject(error_msg)
- return None
+ rejects.append(error_msg)
+ return (None, rejects)
# Build the command line
status_read, status_write = os.pipe()
# If we failed to parse the status-fd output, let's just whine and bail now
if internal_error:
- reject("internal error while performing signature check on %s." % (sig_filename))
- reject(internal_error, "")
- reject("Please report the above errors to the Archive maintainers by replying to this mail.", "")
- return None
+ rejects.append("internal error while performing signature check on %s." % (sig_filename))
+ rejects.append(internal_error, "")
+ rejects.append("Please report the above errors to the Archive maintainers by replying to this mail.", "")
+ return (None, rejects)
- bad = ""
# Now check for obviously bad things in the processed output
if keywords.has_key("KEYREVOKED"):
- reject("The key used to sign %s has been revoked." % (sig_filename))
- bad = 1
+ rejects.append("The key used to sign %s has been revoked." % (sig_filename))
if keywords.has_key("BADSIG"):
- reject("bad signature on %s." % (sig_filename))
- bad = 1
+ rejects.append("bad signature on %s." % (sig_filename))
if keywords.has_key("ERRSIG") and not keywords.has_key("NO_PUBKEY"):
- reject("failed to check signature on %s." % (sig_filename))
- bad = 1
+ rejects.append("failed to check signature on %s." % (sig_filename))
if keywords.has_key("NO_PUBKEY"):
args = keywords["NO_PUBKEY"]
if len(args) >= 1:
key = args[0]
- reject("The key (0x%s) used to sign %s wasn't found in the keyring(s)." % (key, sig_filename))
- bad = 1
+ rejects.append("The key (0x%s) used to sign %s wasn't found in the keyring(s)." % (key, sig_filename))
if keywords.has_key("BADARMOR"):
- reject("ASCII armour of signature was corrupt in %s." % (sig_filename))
- bad = 1
+ rejects.append("ASCII armour of signature was corrupt in %s." % (sig_filename))
if keywords.has_key("NODATA"):
- reject("no signature found in %s." % (sig_filename))
- bad = 1
+ rejects.append("no signature found in %s." % (sig_filename))
if keywords.has_key("EXPKEYSIG"):
args = keywords["EXPKEYSIG"]
if len(args) >= 1:
key = args[0]
- reject("Signature made by expired key 0x%s" % (key))
- bad = 1
+ rejects.append("Signature made by expired key 0x%s" % (key))
if keywords.has_key("KEYEXPIRED") and not keywords.has_key("GOODSIG"):
args = keywords["KEYEXPIRED"]
expiredate=""
expiredate = "unknown (%s)" % (timestamp)
else:
expiredate = timestamp
- reject("The key used to sign %s has expired on %s" % (sig_filename, expiredate))
- bad = 1
+ rejects.append("The key used to sign %s has expired on %s" % (sig_filename, expiredate))
- if bad:
- return None
+ if len(rejects) > 0:
+ return (None, rejects)
# Next check gpgv exited with a zero return code
if exit_status:
- reject("gpgv failed while checking %s." % (sig_filename))
+ rejects.append("gpgv failed while checking %s." % (sig_filename))
if status.strip():
- reject(prefix_multi_line_string(status, " [GPG status-fd output:] "), "")
+ rejects.append(prefix_multi_line_string(status, " [GPG status-fd output:] "), "")
else:
- reject(prefix_multi_line_string(output, " [GPG output:] "), "")
- return None
+ rejects.append(prefix_multi_line_string(output, " [GPG output:] "), "")
+ return (None, rejects)
# Sanity check the good stuff we expect
if not keywords.has_key("VALIDSIG"):
- reject("signature on %s does not appear to be valid [No VALIDSIG]." % (sig_filename))
- bad = 1
+ rejects.append("signature on %s does not appear to be valid [No VALIDSIG]." % (sig_filename))
else:
args = keywords["VALIDSIG"]
if len(args) < 1:
- reject("internal error while checking signature on %s." % (sig_filename))
- bad = 1
+ rejects.append("internal error while checking signature on %s." % (sig_filename))
else:
fingerprint = args[0]
if not keywords.has_key("GOODSIG"):
- reject("signature on %s does not appear to be valid [No GOODSIG]." % (sig_filename))
- bad = 1
+ rejects.append("signature on %s does not appear to be valid [No GOODSIG]." % (sig_filename))
if not keywords.has_key("SIG_ID"):
- reject("signature on %s does not appear to be valid [No SIG_ID]." % (sig_filename))
- bad = 1
+ rejects.append("signature on %s does not appear to be valid [No SIG_ID]." % (sig_filename))
# Finally ensure there's not something we don't recognise
known_keywords = Dict(VALIDSIG="",SIG_ID="",GOODSIG="",BADSIG="",ERRSIG="",
for keyword in keywords.keys():
if not known_keywords.has_key(keyword):
- reject("found unknown status token '%s' from gpgv with args '%r' in %s." % (keyword, keywords[keyword], sig_filename))
- bad = 1
+ rejects.append("found unknown status token '%s' from gpgv with args '%r' in %s." % (keyword, keywords[keyword], sig_filename))
- if bad:
- return None
+ if len(rejects) > 0:
+ return (None, rejects)
else:
- return fingerprint
+ return (fingerprint, [])
################################################################################