#!/usr/bin/env python
# Utility functions
-# Copyright (C) 2000, 2001, 2002, 2003, 2004 James Troup <james@nocrew.org>
-# $Id: utils.py,v 1.68 2004-06-23 23:11:47 troup Exp $
+# Copyright (C) 2000, 2001, 2002, 2003, 2004, 2005 James Troup <james@nocrew.org>
+# $Id: utils.py,v 1.73 2005-03-18 05:24:38 troup Exp $
################################################################################
################################################################################
-import commands, encodings.ascii, encodings.utf_8, encodings.latin_1, \
- email.Header, os, pwd, re, select, socket, shutil, string, sys, \
- tempfile, traceback;
+import codecs, commands, email.Header, os, pwd, re, select, socket, shutil, \
+ string, sys, tempfile, traceback;
import apt_pkg;
import db_access;
################################################################################
re_comments = re.compile(r"\#.*")
-re_no_epoch = re.compile(r"^\d*\:")
-re_no_revision = re.compile(r"\-[^-]*$")
+re_no_epoch = re.compile(r"^\d+\:")
+re_no_revision = re.compile(r"-[^-]+$")
re_arch_from_filename = re.compile(r"/binary-[^/]+/")
re_extract_src_version = re.compile (r"(\S+)\s*\((.*)\)")
re_isadeb = re.compile (r"(.+?)_(.+?)_(.+)\.u?deb$");
re_single_line_field = re.compile(r"^(\S*)\s*:\s*(.*)");
re_multi_line_field = re.compile(r"^\s(.*)");
-re_taint_free = re.compile(r"^[-+~\.\w]+$");
+re_taint_free = re.compile(r"^[-+~/\.\w]+$");
re_parse_maintainer = re.compile(r"^\s*(\S.*\S)\s*\<([^\>]+)\>");
invalid_dsc_format_exc = "Invalid .dsc file";
nk_format_exc = "Unknown Format: in .changes file";
no_files_exc = "No Files: field in .dsc or .changes file.";
-cant_open_exc = "Can't read file.";
+cant_open_exc = "Can't open file";
unknown_hostname_exc = "Unknown hostname";
cant_overwrite_exc = "Permission denied; can't overwrite existent file."
file_exists_exc = "Destination file exists";
################################################################################
-# Parses a changes file and returns a dictionary where each field is a
-# key. The mandatory first argument is the filename of the .changes
-# file.
-
-# dsc_whitespace_rules is an optional boolean argument which defaults
-# to off. If true, it turns on strict format checking to avoid
-# allowing in source packages which are unextracable by the
-# inappropriately fragile dpkg-source.
-#
-# The rules are:
-#
-# o The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
-# followed by any PGP header data and must end with a blank line.
-#
-# o The data section must end with a blank line and must be followed by
-# "-----BEGIN PGP SIGNATURE-----".
-
-def parse_changes(filename, dsc_whitespace_rules=0):
+def parse_changes(filename, signing_rules=0):
+ """Parses a changes file and returns a dictionary where each field is a
+key. The mandatory first argument is the filename of the .changes
+file.
+
+signing_rules is an optional argument:
+
+ o If signing_rules == -1, no signature is required.
+ o If signing_rules == 0 (the default), a signature is required.
+ o If signing_rules == 1, it turns on the same strict format checking
+ as dpkg-source.
+
+The rules for (signing_rules == 1)-mode are:
+
+ o The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
+ followed by any PGP header data and must end with a blank line.
+
+ o The data section must end with a blank line and must be followed by
+ "-----BEGIN PGP SIGNATURE-----".
+"""
+
error = "";
changes = {};
index += 1;
line = indexed_lines[index];
if line == "":
- if dsc_whitespace_rules:
+ if signing_rules == 1:
index += 1;
if index > num_of_lines:
raise invalid_dsc_format_exc, index;
break;
if line.startswith("-----BEGIN PGP SIGNED MESSAGE"):
inside_signature = 1;
- if dsc_whitespace_rules:
+ if signing_rules == 1:
while index < num_of_lines and line != "":
index += 1;
line = indexed_lines[index];
continue;
# If we're not inside the signed data, don't process anything
- if not inside_signature:
+ if signing_rules >= 0 and not inside_signature:
continue;
slf = re_single_line_field.match(line);
if slf:
continue;
error += line;
- if dsc_whitespace_rules and inside_signature:
+ if signing_rules == 1 and inside_signature:
raise invalid_dsc_format_exc, index;
changes_in.close();
"""Encodes a (header) string per RFC2047 if necessary. If the
string is neither ASCII nor UTF-8, it's assumed to be ISO-8859-1."""
try:
- encodings.ascii.Codec().decode(s);
+ codecs.lookup('ascii')[1](s)
return s;
except UnicodeError:
pass;
try:
- encodings.utf_8.Codec().decode(s);
+ codecs.lookup('utf-8')[1](s)
h = email.Header.Header(s, 'utf-8', 998);
return str(h);
except UnicodeError:
return q;
# Sort by source version
- a_version = a_changes.get("version");
- b_version = b_changes.get("version");
+ a_version = a_changes.get("version", "0");
+ b_version = b_changes.get("version", "0");
q = apt_pkg.VersionCompare(a_version, b_version);
if q:
return q;
################################################################################
-def validate_changes_file_arg(file, fatal=1):
+def validate_changes_file_arg(filename, require_changes=1):
+ """'filename' is either a .changes or .katie file. If 'filename' is a
+.katie file, it's changed to be the corresponding .changes file. The
+function then checks if the .changes file a) exists and b) is
+readable and returns the .changes filename if so. If there's a
+problem, the next action depends on the option 'require_changes'
+argument:
+
+ o If 'require_changes' == -1, errors are ignored and the .changes
+ filename is returned.
+ o If 'require_changes' == 0, a warning is given and 'None' is returned.
+ o If 'require_changes' == 1, a fatal error is raised.
+"""
error = None;
- orig_filename = file
- if file.endswith(".katie"):
- file = file[:-6]+".changes";
+ orig_filename = filename
+ if filename.endswith(".katie"):
+ filename = filename[:-6]+".changes";
- if not file.endswith(".changes"):
+ if not filename.endswith(".changes"):
error = "invalid file type; not a changes file";
else:
- if not os.access(file,os.R_OK):
- if os.path.exists(file):
+ if not os.access(filename,os.R_OK):
+ if os.path.exists(filename):
error = "permission denied";
else:
error = "file not found";
if error:
- if fatal:
+ if require_changes == 1:
fubar("%s: %s." % (orig_filename, error));
- else:
+ elif require_changes == 0:
warn("Skipping %s - %s" % (orig_filename, error));
return None;
+ else: # We only care about the .katie file
+ return filename;
else:
- return file;
+ return filename;
################################################################################
################################################################################
+def pp_deps (deps):
+ pp_deps = [];
+ for atom in deps:
+ (pkg, version, constraint) = atom;
+ if constraint:
+ pp_dep = "%s (%s %s)" % (pkg, constraint, version);
+ else:
+ pp_dep = pkg;
+ pp_deps.append(pp_dep);
+ return " |".join(pp_deps);
+
+################################################################################
+
def get_conf():
return Cnf;
############################################################
-def check_signature (filename, reject):
+def check_signature (sig_filename, reject, data_filename="", keyrings=None):
"""Check the signature of a file and return the fingerprint if the
signature is valid or 'None' if it's not. The first argument is the
filename whose signature should be checked. The second argument is a
reject function and is called when an error is found. The reject()
function must allow for two arguments: the first is the error message,
the second is an optional prefix string. It's possible for reject()
-to be called more than once during an invocation of check_signature()."""
+to be called more than once during an invocation of check_signature().
+The third argument is optional and is the name of the files the
+detached signature applies to. The fourth argument is optional and is
+a *list* of keyrings to use.
+"""
# Ensure the filename contains no shell meta-characters or other badness
- if not re_taint_free.match(os.path.basename(filename)):
- reject("!!WARNING!! tainted filename: '%s'." % (filename));
- return 0;
+ if not re_taint_free.match(sig_filename):
+ reject("!!WARNING!! tainted signature filename: '%s'." % (sig_filename));
+ return None;
+
+ if data_filename and not re_taint_free.match(data_filename):
+ reject("!!WARNING!! tainted data filename: '%s'." % (data_filename));
+ return None;
+
+ if not keyrings:
+ keyrings = (Cnf["Dinstall::PGPKeyring"], Cnf["Dinstall::GPGKeyring"])
+ # Build the command line
+ status_read, status_write = os.pipe();
+ cmd = "gpgv --status-fd %s" % (status_write);
+ for keyring in keyrings:
+ cmd += " --keyring %s" % (keyring);
+ cmd += " %s %s" % (sig_filename, data_filename);
# Invoke gpgv on the file
- status_read, status_write = os.pipe();
- cmd = "gpgv --status-fd %s --keyring %s --keyring %s %s" \
- % (status_write, Cnf["Dinstall::PGPKeyring"], Cnf["Dinstall::GPGKeyring"], filename);
(output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write);
# Process the status-fd output
# If we failed to parse the status-fd output, let's just whine and bail now
if internal_error:
- reject("internal error while performing signature check on %s." % (filename));
+ reject("internal error while performing signature check on %s." % (sig_filename));
reject(internal_error, "");
reject("Please report the above errors to the Archive maintainers by replying to this mail.", "");
return None;
# Now check for obviously bad things in the processed output
if keywords.has_key("SIGEXPIRED"):
- reject("The key used to sign %s has expired." % (filename));
+ reject("The key used to sign %s has expired." % (sig_filename));
bad = 1;
if keywords.has_key("KEYREVOKED"):
- reject("The key used to sign %s has been revoked." % (filename));
+ reject("The key used to sign %s has been revoked." % (sig_filename));
bad = 1;
if keywords.has_key("BADSIG"):
- reject("bad signature on %s." % (filename));
+ reject("bad signature on %s." % (sig_filename));
bad = 1;
if keywords.has_key("ERRSIG") and not keywords.has_key("NO_PUBKEY"):
- reject("failed to check signature on %s." % (filename));
+ reject("failed to check signature on %s." % (sig_filename));
bad = 1;
if keywords.has_key("NO_PUBKEY"):
args = keywords["NO_PUBKEY"];
if len(args) >= 1:
key = args[0];
- reject("The key (0x%s) used to sign %s wasn't found in the keyring(s)." % (key, filename));
+ reject("The key (0x%s) used to sign %s wasn't found in the keyring(s)." % (key, sig_filename));
bad = 1;
if keywords.has_key("BADARMOR"):
- reject("ASCII armour of signature was corrupt in %s." % (filename));
+ reject("ASCII armour of signature was corrupt in %s." % (sig_filename));
bad = 1;
if keywords.has_key("NODATA"):
- reject("no signature found in %s." % (filename));
+ reject("no signature found in %s." % (sig_filename));
bad = 1;
if bad:
# Next check gpgv exited with a zero return code
if exit_status:
- reject("gpgv failed while checking %s." % (filename));
+ reject("gpgv failed while checking %s." % (sig_filename));
if status.strip():
reject(prefix_multi_line_string(status, " [GPG status-fd output:] "), "");
else:
# Sanity check the good stuff we expect
if not keywords.has_key("VALIDSIG"):
- reject("signature on %s does not appear to be valid [No VALIDSIG]." % (filename));
+ reject("signature on %s does not appear to be valid [No VALIDSIG]." % (sig_filename));
bad = 1;
else:
args = keywords["VALIDSIG"];
if len(args) < 1:
- reject("internal error while checking signature on %s." % (filename));
+ reject("internal error while checking signature on %s." % (sig_filename));
bad = 1;
else:
fingerprint = args[0];
if not keywords.has_key("GOODSIG"):
- reject("signature on %s does not appear to be valid [No GOODSIG]." % (filename));
+ reject("signature on %s does not appear to be valid [No GOODSIG]." % (sig_filename));
bad = 1;
if not keywords.has_key("SIG_ID"):
- reject("signature on %s does not appear to be valid [No SIG_ID]." % (filename));
+ reject("signature on %s does not appear to be valid [No SIG_ID]." % (sig_filename));
bad = 1;
# Finally ensure there's not something we don't recognise
for keyword in keywords.keys():
if not known_keywords.has_key(keyword):
- reject("found unknown status token '%s' from gpgv with args '%r' in %s." % (keyword, keywords[keyword], filename));
+ reject("found unknown status token '%s' from gpgv with args '%r' in %s." % (keyword, keywords[keyword], sig_filename));
bad = 1;
if bad: