X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=daklib%2Fgpg.py;h=94842083125a5633a27c8dd58e6014568f9c1b9f;hb=391f5ec09a119131dc846b796ca791f4cecc69e4;hp=62bfe096510453c180acb4fda1f5b80af6581ffa;hpb=438e50fc19e566ea0c986351681d865a36862713;p=dak.git diff --git a/daklib/gpg.py b/daklib/gpg.py index 62bfe096..94842083 100644 --- a/daklib/gpg.py +++ b/daklib/gpg.py @@ -19,6 +19,8 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +import apt_pkg +import datetime import errno import fcntl import os @@ -68,7 +70,7 @@ class SignedFile(object): def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"): """ @param data: string containing the message - @param keyrings: seqeuence of keyrings + @param keyrings: sequence of keyrings @param require_signature: if True (the default), will raise an exception if no valid signature was found @param gpg: location of the gpg binary """ @@ -76,11 +78,29 @@ class SignedFile(object): self.keyrings = keyrings self.valid = False - self.fingerprint = None - self.primary_fingerprint = None + self.expired = False + self.invalid = False + self.fingerprints = [] + self.primary_fingerprints = [] + self.signature_ids = [] self._verify(data, require_signature) + @property + def fingerprint(self): + assert len(self.fingerprints) == 1 + return self.fingerprints[0] + + @property + def primary_fingerprint(self): + assert len(self.primary_fingerprints) == 1 + return self.primary_fingerprints[0] + + @property + def signature_id(self): + assert len(self.signature_ids) == 1 + return self.signature_ids[0] + def _verify(self, data, require_signature): with _Pipe() as stdin: with _Pipe() as contents: @@ -110,9 +130,15 @@ class SignedFile(object): for line in self.status.splitlines(): self._parse_status(line) + if self.invalid: + self.valid = False + if require_signature and not self.valid: raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr)) + assert len(self.fingerprints) == len(self.primary_fingerprints) + assert len(self.fingerprints) == len(self.signature_ids) + def _do_io(self, read, write): for fd in write.keys(): old = fcntl.fcntl(fd, fcntl.F_GETFL) @@ -141,6 +167,26 @@ class SignedFile(object): return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() ) + def _parse_timestamp(self, timestamp, datestring=None): + """parse timestamp in GnuPG's format + + @rtype: L{datetime.datetime} + @returns: datetime object for the given timestamp + """ + # The old implementation did only return the date. As we already + # used this for replay production, return the legacy value for + # old signatures. + if datestring is not None: + year, month, day = datestring.split('-') + date = datetime.date(int(year), int(month), int(day)) + time = datetime.time(0, 0) + if date < datetime.date(2014, 8, 4): + return datetime.datetime.combine(date, time) + + if 'T' in timestamp: + raise Exception('No support for ISO 8601 timestamps.') + return datetime.datetime.utcfromtimestamp(long(timestamp)) + def _parse_status(self, line): fields = line.split() if fields[0] != "[GNUPG:]": @@ -150,22 +196,44 @@ class SignedFile(object): # # if fields[1] == "VALIDSIG": + # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20, + # which Debian 8 does not yet include. We want to make sure + # to not accept uploads covered by a MD5-based signature. + if fields[9] == "1": + raise GpgException("Digest algorithm MD5 is not trusted.") self.valid = True - self.fingerprint = fields[2] - self.primary_fingerprint = fields[11] + self.fingerprints.append(fields[2]) + self.primary_fingerprints.append(fields[11]) + self.signature_timestamp = self._parse_timestamp(fields[4], fields[3]) - if fields[1] == "BADARMOR": + elif fields[1] == "BADARMOR": raise GpgException("Bad armor.") - if fields[1] == "NODATA": + elif fields[1] == "NODATA": raise GpgException("No data.") - if fields[1] == "DECRYPTION_FAILED": + elif fields[1] == "DECRYPTION_FAILED": raise GpgException("Decryption failed.") - if fields[1] == "ERROR": + elif fields[1] == "ERROR": raise GpgException("Other error: %s %s" % (fields[2], fields[3])) + elif fields[1] == "SIG_ID": + self.signature_ids.append(fields[2]) + + elif fields[1] in ('PLAINTEXT', 'GOODSIG', 'NOTATION_NAME', 'NOTATION_DATA', 'SIGEXPIRED', 'KEYEXPIRED', 'POLICY_URL'): + pass + + elif fields[1] in ('EXPSIG', 'EXPKEYSIG'): + self.expired = True + self.invalid = True + + elif fields[1] in ('REVKEYSIG', 'BADSIG', 'ERRSIG', 'KEYREVOKED', 'NO_PUBKEY'): + self.invalid = True + + else: + raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(fields[1])) + def _exec_gpg(self, stdin, stdout, stderr, statusfd): try: if stdin != 0: @@ -181,7 +249,13 @@ class SignedFile(object): fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) os.closerange(4, _MAXFD) - args = [self.gpg, "--status-fd=3", "--no-default-keyring"] + args = [self.gpg, + "--status-fd=3", + "--no-default-keyring", + "--batch", + "--no-tty", + "--trust-model", "always", + "--fixed-list-mode"] for k in self.keyrings: args.append("--keyring=%s" % k) args.extend(["--decrypt", "-"]) @@ -190,4 +264,7 @@ class SignedFile(object): finally: os._exit(1) + def contents_sha1(self): + return apt_pkg.sha1sum(self.contents) + # vim: set sw=4 et: