]> git.decadent.org.uk Git - dak.git/blobdiff - daklib/gpg.py
Add by-hash support
[dak.git] / daklib / gpg.py
index 90103afa0ddc20ae98457e34314d0407714f2f8b..94842083125a5633a27c8dd58e6014568f9c1b9f 100644 (file)
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
+import apt_pkg
+import datetime
 import errno
 import fcntl
 import os
 import select
-import sys
 
 try:
     _MAXFD = os.sysconf("SC_OPEN_MAX")
@@ -69,7 +70,7 @@ class SignedFile(object):
     def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
         """
         @param data: string containing the message
-        @param keyrings: seqeuence of keyrings
+        @param keyrings: sequence of keyrings
         @param require_signature: if True (the default), will raise an exception if no valid signature was found
         @param gpg: location of the gpg binary
         """
@@ -77,11 +78,29 @@ class SignedFile(object):
         self.keyrings = keyrings
 
         self.valid = False
-        self.fingerprint = None
-        self.primary_fingerprint = None
+        self.expired = False
+        self.invalid = False
+        self.fingerprints = []
+        self.primary_fingerprints = []
+        self.signature_ids = []
 
         self._verify(data, require_signature)
 
+    @property
+    def fingerprint(self):
+        assert len(self.fingerprints) == 1
+        return self.fingerprints[0]
+
+    @property
+    def primary_fingerprint(self):
+        assert len(self.primary_fingerprints) == 1
+        return self.primary_fingerprints[0]
+
+    @property
+    def signature_id(self):
+        assert len(self.signature_ids) == 1
+        return self.signature_ids[0]
+
     def _verify(self, data, require_signature):
         with _Pipe() as stdin:
          with _Pipe() as contents:
@@ -111,10 +130,20 @@ class SignedFile(object):
                 for line in self.status.splitlines():
                     self._parse_status(line)
 
+                if self.invalid:
+                    self.valid = False
+
                 if require_signature and not self.valid:
                     raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
 
+        assert len(self.fingerprints) == len(self.primary_fingerprints)
+        assert len(self.fingerprints) == len(self.signature_ids)
+
     def _do_io(self, read, write):
+        for fd in write.keys():
+            old = fcntl.fcntl(fd, fcntl.F_GETFL)
+            fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
+
         read_lines = dict( (fd, []) for fd in read )
         write_pos = dict( (fd, 0) for fd in write )
 
@@ -138,6 +167,26 @@ class SignedFile(object):
 
         return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() )
 
+    def _parse_timestamp(self, timestamp, datestring=None):
+        """parse timestamp in GnuPG's format
+
+        @rtype:   L{datetime.datetime}
+        @returns: datetime object for the given timestamp
+        """
+        # The old implementation did only return the date. As we already
+        # used this for replay production, return the legacy value for
+        # old signatures.
+        if datestring is not None:
+            year, month, day = datestring.split('-')
+            date = datetime.date(int(year), int(month), int(day))
+            time = datetime.time(0, 0)
+            if date < datetime.date(2014, 8, 4):
+                return datetime.datetime.combine(date, time)
+
+        if 'T' in timestamp:
+            raise Exception('No support for ISO 8601 timestamps.')
+        return datetime.datetime.utcfromtimestamp(long(timestamp))
+
     def _parse_status(self, line):
         fields = line.split()
         if fields[0] != "[GNUPG:]":
@@ -147,22 +196,44 @@ class SignedFile(object):
         #             <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
         #             <hash-algo> <sig-class> <primary-key-fpr>
         if fields[1] == "VALIDSIG":
+            # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
+            # which Debian 8 does not yet include.  We want to make sure
+            # to not accept uploads covered by a MD5-based signature.
+            if fields[9] == "1":
+                raise GpgException("Digest algorithm MD5 is not trusted.")
             self.valid = True
-            self.fingerprint = fields[2]
-            self.primary_fingerprint = fields[11]
+            self.fingerprints.append(fields[2])
+            self.primary_fingerprints.append(fields[11])
+            self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
 
-        if fields[1] == "BADARMOR":
+        elif fields[1] == "BADARMOR":
             raise GpgException("Bad armor.")
 
-        if fields[1] == "NODATA":
+        elif fields[1] == "NODATA":
             raise GpgException("No data.")
 
-        if fields[1] == "DECRYPTION_FAILED":
+        elif fields[1] == "DECRYPTION_FAILED":
             raise GpgException("Decryption failed.")
 
-        if fields[1] == "ERROR":
+        elif fields[1] == "ERROR":
             raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
 
+        elif fields[1] == "SIG_ID":
+            self.signature_ids.append(fields[2])
+
+        elif fields[1] in ('PLAINTEXT', 'GOODSIG', 'NOTATION_NAME', 'NOTATION_DATA', 'SIGEXPIRED', 'KEYEXPIRED', 'POLICY_URL'):
+            pass
+
+        elif fields[1] in ('EXPSIG', 'EXPKEYSIG'):
+            self.expired = True
+            self.invalid = True
+
+        elif fields[1] in ('REVKEYSIG', 'BADSIG', 'ERRSIG', 'KEYREVOKED', 'NO_PUBKEY'):
+            self.invalid = True
+
+        else:
+            raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(fields[1]))
+
     def _exec_gpg(self, stdin, stdout, stderr, statusfd):
         try:
             if stdin != 0:
@@ -178,13 +249,22 @@ class SignedFile(object):
                 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
             os.closerange(4, _MAXFD)
 
-            args = [self.gpg, "--status-fd=3", "--no-default-keyring"]
+            args = [self.gpg,
+                    "--status-fd=3",
+                    "--no-default-keyring",
+                    "--batch",
+                    "--no-tty",
+                    "--trust-model", "always",
+                    "--fixed-list-mode"]
             for k in self.keyrings:
                 args.append("--keyring=%s" % k)
             args.extend(["--decrypt", "-"])
 
             os.execvp(self.gpg, args)
         finally:
-            sys.exit(1)
+            os._exit(1)
+
+    def contents_sha1(self):
+        return apt_pkg.sha1sum(self.contents)
 
 # vim: set sw=4 et: