# Utility functions
# Copyright (C) 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
-# $Id: utils.py,v 1.73 2005-03-18 05:24:38 troup Exp $
################################################################################
################################################################################
import codecs, commands, email.Header, os, pwd, re, select, socket, shutil, \
- string, sys, tempfile, traceback
+ sys, tempfile, traceback
import apt_pkg
import database
re_taint_free = re.compile(r"^[-+~/\.\w]+$")
re_parse_maintainer = re.compile(r"^\s*(\S.*\S)\s*\<([^\>]+)\>")
+re_gpg_uid = re.compile(r'^uid.*<([^>]*)>')
+
+re_srchasver = re.compile(r"^(\S+)\s+\((\S+)\)$")
+re_verwithext = re.compile(r"^(\d+)(?:\.(\d+))?(?:\s+\((\S+)\))?$")
changes_parse_error_exc = "Can't parse line in .changes file"
invalid_dsc_format_exc = "Invalid .dsc file"
default_config = "/etc/dak/dak.conf"
default_apt_config = "/etc/dak/apt.conf"
+alias_cache = None
+key_uid_email_cache = {}
+
################################################################################
class Error(Exception):
################################################################################
-def str_isnum (s):
- for c in s:
- if c not in string.digits:
- return 0
- return 1
-
-################################################################################
-
def extract_component_from_section(section):
component = ""
if section.find('/') != -1:
component = section.split('/')[0]
- if component.lower() == "non-us" and section.find('/') != -1:
- s = component + '/' + section.split('/')[1]
- if Cnf.has_key("Component::%s" % s): # Avoid e.g. non-US/libs
- component = s
-
- if section.lower() == "non-us":
- component = "non-US/main"
-
- # non-US prefix is case insensitive
- if component.lower()[:6] == "non-us":
- component = "non-US"+component[6:]
# Expand default component
if component == "":
component = section
else:
component = "main"
- elif component == "non-US":
- component = "non-US/main"
return (section, component)
changes_in.close()
changes["filecontents"] = "".join(lines)
+ if changes.has_key("source"):
+ # Strip the source version in brackets from the source field,
+ # put it in the "source-version" field instead.
+ srcver = re_srchasver.search(changes["source"])
+ if srcver:
+ changes["source"] = srcver.group(1)
+ changes["source-version"] = srcver.group(2)
+
if error:
raise changes_parse_error_exc, error
# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl
-def build_file_list(changes, is_a_dsc=0):
+def build_file_list(changes, is_a_dsc=0, field="files", hashname="md5sum"):
files = {}
# Make sure we have a Files: field to parse...
- if not changes.has_key("files"):
- raise no_files_exc
+ if not changes.has_key(field):
+ raise no_files_exc
# Make sure we recognise the format of the Files: field
- format = changes.get("format", "")
- if format != "":
- format = float(format)
- if not is_a_dsc and (format < 1.5 or format > 2.0):
- raise nk_format_exc, format
+ format = re_verwithext.search(changes.get("format", "0.0"))
+ if not format:
+ raise nk_format_exc, "%s" % (changes.get("format","0.0"))
+
+ format = format.groups()
+    if format[1] is None:
+ format = int(float(format[0])), 0, format[2]
+ else:
+ format = int(format[0]), int(format[1]), format[2]
+    if format[2] is None:
+ format = format[:2]
+
+ if is_a_dsc:
+ if format != (1,0):
+ raise nk_format_exc, "%s" % (changes.get("format","0.0"))
+ else:
+ if (format < (1,5) or format > (1,8)):
+ raise nk_format_exc, "%s" % (changes.get("format","0.0"))
+ if field != "files" and format < (1,8):
+ raise nk_format_exc, "%s" % (changes.get("format","0.0"))
+
+ includes_section = (not is_a_dsc) and field == "files"
# Parse each entry/line:
- for i in changes["files"].split('\n'):
+ for i in changes[field].split('\n'):
if not i:
break
s = i.split()
section = priority = ""
try:
- if is_a_dsc:
- (md5, size, name) = s
- else:
+ if includes_section:
(md5, size, section, priority, name) = s
+ else:
+ (md5, size, name) = s
except ValueError:
raise changes_parse_error_exc, i
(section, component) = extract_component_from_section(section)
- files[name] = Dict(md5sum=md5, size=size, section=section,
+ files[name] = Dict(size=size, section=section,
priority=priority, component=component)
+ files[name][hashname] = md5
return files
def poolify (source, component):
if component:
component += '/'
- # FIXME: this is nasty
- component = component.lower().replace("non-us/", "non-US/")
if source[:3] == "lib":
return component + source[:4] + '/' + source + '/'
else:
else:
return default_apt_config
+def which_alias_file():
+ hostname = socket.gethostbyaddr(socket.gethostname())[0]
+ aliasfn = '/var/lib/misc/'+hostname+'/forward-alias'
+ if os.path.exists(aliasfn):
+ return aliasfn
+ else:
+ return None
+
################################################################################
# Escape characters which have meaning to SQL's regex comparison operator ('~')
orig_filename = filename
if filename.endswith(".dak"):
- filename = filename[:-6]+".changes"
+ filename = filename[:-4]+".changes"
if not filename.endswith(".changes"):
error = "invalid file type; not a changes file"
if Options["Suite"]:
suite_ids_list = []
for suite in split_args(Options["Suite"]):
- suite_id = dak.lib.database.get_suite_id(suite)
+ suite_id = database.get_suite_id(suite)
if suite_id == -1:
warn("suite '%s' not recognised." % (suite))
else:
suite_ids_list.append(suite_id)
if suite_ids_list:
- con_suites = "AND su.id IN (%s)" % ", ".join(map(str, suite_ids_list))
+ con_suites = "AND su.id IN (%s)" % ", ".join([ str(i) for i in suite_ids_list ])
else:
fubar("No valid suite given.")
else:
if Options["Component"]:
component_ids_list = []
for component in split_args(Options["Component"]):
- component_id = dak.lib.database.get_component_id(component)
+ component_id = database.get_component_id(component)
if component_id == -1:
warn("component '%s' not recognised." % (component))
else:
component_ids_list.append(component_id)
if component_ids_list:
- con_components = "AND c.id IN (%s)" % ", ".join(map(str, component_ids_list))
+ con_components = "AND c.id IN (%s)" % ", ".join([ str(i) for i in component_ids_list ])
else:
fubar("No valid component given.")
else:
if architecture == "source":
check_source = 1
else:
- architecture_id = dak.lib.database.get_architecture_id(architecture)
+ architecture_id = database.get_architecture_id(architecture)
if architecture_id == -1:
warn("architecture '%s' not recognised." % (architecture))
else:
arch_ids_list.append(architecture_id)
if arch_ids_list:
- con_architectures = "AND a.id IN (%s)" % ", ".join(map(str, arch_ids_list))
+ con_architectures = "AND a.id IN (%s)" % ", ".join([ str(i) for i in arch_ids_list ])
else:
if not check_source:
fubar("No valid architecture given.")
return output, status, exit_status
-############################################################
+################################################################################
+
+def process_gpgv_output(status):
+ # Process the status-fd output
+ keywords = {}
+ internal_error = ""
+ for line in status.split('\n'):
+ line = line.strip()
+ if line == "":
+ continue
+ split = line.split()
+ if len(split) < 2:
+ internal_error += "gpgv status line is malformed (< 2 atoms) ['%s'].\n" % (line)
+ continue
+ (gnupg, keyword) = split[:2]
+ if gnupg != "[GNUPG:]":
+ internal_error += "gpgv status line is malformed (incorrect prefix '%s').\n" % (gnupg)
+ continue
+ args = split[2:]
+ if keywords.has_key(keyword) and keyword not in [ "NODATA", "SIGEXPIRED", "KEYEXPIRED" ]:
+ internal_error += "found duplicate status token ('%s').\n" % (keyword)
+ continue
+ else:
+ keywords[keyword] = args
+
+ return (keywords, internal_error)
+
+################################################################################
+
+def retrieve_key (filename, keyserver=None, keyring=None):
+ """Retrieve the key that signed 'filename' from 'keyserver' and
+add it to 'keyring'. Returns nothing on success, or an error message
+on error."""
+
+ # Defaults for keyserver and keyring
+ if not keyserver:
+ keyserver = Cnf["Dinstall::KeyServer"]
+ if not keyring:
+ keyring = Cnf.ValueList("Dinstall::GPGKeyring")[0]
+
+ # Ensure the filename contains no shell meta-characters or other badness
+ if not re_taint_free.match(filename):
+ return "%s: tainted filename" % (filename)
+
+ # Invoke gpgv on the file
+    status_read, status_write = os.pipe()
+ cmd = "gpgv --status-fd %s --keyring /dev/null %s" % (status_write, filename)
+ (_, status, _) = gpgv_get_status_output(cmd, status_read, status_write)
+
+ # Process the status-fd output
+ (keywords, internal_error) = process_gpgv_output(status)
+ if internal_error:
+ return internal_error
+
+ if not keywords.has_key("NO_PUBKEY"):
+ return "didn't find expected NO_PUBKEY in gpgv status-fd output"
+
+ fingerprint = keywords["NO_PUBKEY"][0]
+ # XXX - gpg sucks. You can't use --secret-keyring=/dev/null as
+ # it'll try to create a lockfile in /dev. A better solution might
+ # be a tempfile or something.
+ cmd = "gpg --no-default-keyring --secret-keyring=%s --no-options" \
+ % (Cnf["Dinstall::SigningKeyring"])
+ cmd += " --keyring %s --keyserver %s --recv-key %s" \
+ % (keyring, keyserver, fingerprint)
+ (result, output) = commands.getstatusoutput(cmd)
+ if (result != 0):
+ return "'%s' failed with exit code %s" % (cmd, result)
+
+ return ""
+
+################################################################################
+def gpg_keyring_args(keyrings=None):
+ if not keyrings:
+ keyrings = Cnf.ValueList("Dinstall::GPGKeyring")
+
+ return " ".join(["--keyring %s" % x for x in keyrings])
+
+################################################################################
-def check_signature (sig_filename, reject, data_filename="", keyrings=None):
+def check_signature (sig_filename, reject, data_filename="", keyrings=None, autofetch=None):
"""Check the signature of a file and return the fingerprint if the
signature is valid or 'None' if it's not. The first argument is the
filename whose signature should be checked. The second argument is a
to be called more than once during an invocation of check_signature().
The third argument is optional and is the name of the files the
detached signature applies to. The fourth argument is optional and is
-a *list* of keyrings to use.
-"""
+a *list* of keyrings to use. 'autofetch' can either be None, True or
+False. If None, the default behaviour specified in the config will be
+used."""
# Ensure the filename contains no shell meta-characters or other badness
if not re_taint_free.match(sig_filename):
return None
if not keyrings:
- keyrings = (Cnf["Dinstall::PGPKeyring"], Cnf["Dinstall::GPGKeyring"])
+ keyrings = Cnf.ValueList("Dinstall::GPGKeyring")
+
+ # Autofetch the signing key if that's enabled
+    if autofetch is None:
+ autofetch = Cnf.get("Dinstall::KeyAutoFetch")
+ if autofetch:
+ error_msg = retrieve_key(sig_filename)
+ if error_msg:
+ reject(error_msg)
+ return None
# Build the command line
status_read, status_write = os.pipe();
- cmd = "gpgv --status-fd %s" % (status_write)
- for keyring in keyrings:
- cmd += " --keyring %s" % (keyring)
- cmd += " %s %s" % (sig_filename, data_filename)
+ cmd = "gpgv --status-fd %s %s %s %s" % (
+ status_write, gpg_keyring_args(keyrings), sig_filename, data_filename)
+
# Invoke gpgv on the file
(output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write)
# Process the status-fd output
- keywords = {}
- bad = internal_error = ""
- for line in status.split('\n'):
- line = line.strip()
- if line == "":
- continue
- split = line.split()
- if len(split) < 2:
- internal_error += "gpgv status line is malformed (< 2 atoms) ['%s'].\n" % (line)
- continue
- (gnupg, keyword) = split[:2]
- if gnupg != "[GNUPG:]":
- internal_error += "gpgv status line is malformed (incorrect prefix '%s').\n" % (gnupg)
- continue
- args = split[2:]
- if keywords.has_key(keyword) and (keyword != "NODATA" and keyword != "SIGEXPIRED"):
- internal_error += "found duplicate status token ('%s').\n" % (keyword)
- continue
- else:
- keywords[keyword] = args
+ (keywords, internal_error) = process_gpgv_output(status)
# If we failed to parse the status-fd output, let's just whine and bail now
if internal_error:
reject("Please report the above errors to the Archive maintainers by replying to this mail.", "")
return None
+ bad = ""
# Now check for obviously bad things in the processed output
- if keywords.has_key("SIGEXPIRED"):
- reject("The key used to sign %s has expired." % (sig_filename))
- bad = 1
if keywords.has_key("KEYREVOKED"):
reject("The key used to sign %s has been revoked." % (sig_filename))
bad = 1
if keywords.has_key("NODATA"):
reject("no signature found in %s." % (sig_filename))
bad = 1
+ if keywords.has_key("KEYEXPIRED") and not keywords.has_key("GOODSIG"):
+ args = keywords["KEYEXPIRED"]
+ if len(args) >= 1:
+ key = args[0]
+ reject("The key (0x%s) used to sign %s has expired." % (key, sig_filename))
+ bad = 1
if bad:
return None
# Finally ensure there's not something we don't recognise
known_keywords = Dict(VALIDSIG="",SIG_ID="",GOODSIG="",BADSIG="",ERRSIG="",
SIGEXPIRED="",KEYREVOKED="",NO_PUBKEY="",BADARMOR="",
- NODATA="")
+ NODATA="",NOTATION_DATA="",NOTATION_NAME="",KEYEXPIRED="")
for keyword in keywords.keys():
if not known_keywords.has_key(keyword):
################################################################################
+def gpg_get_key_addresses(fingerprint):
+    """Retrieve email addresses from gpg key uids for a given fingerprint"""
+ addresses = key_uid_email_cache.get(fingerprint)
+    if addresses is not None:
+ return addresses
+ addresses = set()
+ cmd = "gpg --no-default-keyring %s --fingerprint %s" \
+ % (gpg_keyring_args(), fingerprint)
+ (result, output) = commands.getstatusoutput(cmd)
+ if result == 0:
+ for l in output.split('\n'):
+ m = re_gpg_uid.match(l)
+ if m:
+ addresses.add(m.group(1))
+ key_uid_email_cache[fingerprint] = addresses
+ return addresses
+
+################################################################################
+
# Inspired(tm) by http://www.zopelabs.com/cookbook/1022242603
def wrap(paragraph, max_length, prefix=""):
################################################################################
+# checks if the user part of the email is listed in the alias file
+
+def is_email_alias(email):
+ global alias_cache
+    if alias_cache is None:
+ aliasfn = which_alias_file()
+ alias_cache = set()
+ if aliasfn:
+ for l in open(aliasfn):
+ alias_cache.add(l.split(':')[0])
+ uid = email.split('@')[0]
+ return uid in alias_cache
+
+################################################################################
+
apt_pkg.init()
Cnf = apt_pkg.newConfiguration()