# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import apt_pkg
+import datetime
import errno
import fcntl
import os
def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
"""
@param data: string containing the message
- @param keyrings: seqeuence of keyrings
+ @param keyrings: sequence of keyrings
@param require_signature: if True (the default), will raise an exception if no valid signature was found
@param gpg: location of the gpg binary
"""
self.keyrings = keyrings
self.valid = False
- self.fingerprint = None
- self.primary_fingerprint = None
+ self.expired = False
+ self.invalid = False
+ self.fingerprints = []
+ self.primary_fingerprints = []
+ self.signature_ids = []
self._verify(data, require_signature)
+ @property
+ def fingerprint(self):
+ assert len(self.fingerprints) == 1
+ return self.fingerprints[0]
+
+ @property
+ def primary_fingerprint(self):
+ assert len(self.primary_fingerprints) == 1
+ return self.primary_fingerprints[0]
+
+ @property
+ def signature_id(self):
+ assert len(self.signature_ids) == 1
+ return self.signature_ids[0]
+
def _verify(self, data, require_signature):
with _Pipe() as stdin:
with _Pipe() as contents:
for line in self.status.splitlines():
self._parse_status(line)
+ if self.invalid:
+ self.valid = False
+
if require_signature and not self.valid:
raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
+ assert len(self.fingerprints) == len(self.primary_fingerprints)
+ assert len(self.fingerprints) == len(self.signature_ids)
+
def _do_io(self, read, write):
+ for fd in write.keys():
+ old = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
+
read_lines = dict( (fd, []) for fd in read )
write_pos = dict( (fd, 0) for fd in write )
return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() )
+ def _parse_timestamp(self, timestamp, datestring=None):
+ """parse timestamp in GnuPG's format
+
+ @rtype: L{datetime.datetime}
+ @returns: datetime object for the given timestamp
+ """
+ # The old implementation did only return the date. As we already
+ # used this for replay production, return the legacy value for
+ # old signatures.
+ if datestring is not None:
+ year, month, day = datestring.split('-')
+ date = datetime.date(int(year), int(month), int(day))
+ time = datetime.time(0, 0)
+ if date < datetime.date(2014, 8, 4):
+ return datetime.datetime.combine(date, time)
+
+ if 'T' in timestamp:
+ raise Exception('No support for ISO 8601 timestamps.')
+ return datetime.datetime.utcfromtimestamp(long(timestamp))
+
def _parse_status(self, line):
fields = line.split()
if fields[0] != "[GNUPG:]":
# <hash-algo> <sig-class> <primary-key-fpr>
if fields[1] == "VALIDSIG":
self.valid = True
- self.fingerprint = fields[2]
- self.primary_fingerprint = fields[11]
+ self.fingerprints.append(fields[2])
+ self.primary_fingerprints.append(fields[11])
+ self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
- if fields[1] == "BADARMOR":
+ elif fields[1] == "BADARMOR":
raise GpgException("Bad armor.")
- if fields[1] == "NODATA":
+ elif fields[1] == "NODATA":
raise GpgException("No data.")
- if fields[1] == "DECRYPTION_FAILED":
+ elif fields[1] == "DECRYPTION_FAILED":
raise GpgException("Decryption failed.")
- if fields[1] == "ERROR":
+ elif fields[1] == "ERROR":
raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
+ elif fields[1] == "SIG_ID":
+ self.signature_ids.append(fields[2])
+
+ elif fields[1] in ('PLAINTEXT', 'GOODSIG', 'NOTATION_NAME', 'NOTATION_DATA', 'SIGEXPIRED', 'KEYEXPIRED', 'POLICY_URL'):
+ pass
+
+ elif fields[1] in ('EXPSIG', 'EXPKEYSIG'):
+ self.expired = True
+ self.invalid = True
+
+ elif fields[1] in ('REVKEYSIG', 'BADSIG', 'ERRSIG', 'KEYREVOKED', 'NO_PUBKEY'):
+ self.invalid = True
+
+ else:
+ raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(fields[1]))
+
def _exec_gpg(self, stdin, stdout, stderr, statusfd):
try:
if stdin != 0:
fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
os.closerange(4, _MAXFD)
- args = [self.gpg, "--status-fd=3", "--no-default-keyring"]
+ args = [self.gpg,
+ "--status-fd=3",
+ "--no-default-keyring",
+ "--batch",
+ "--no-tty",
+ "--trust-model", "always",
+ "--fixed-list-mode"]
for k in self.keyrings:
args.append("--keyring=%s" % k)
args.extend(["--decrypt", "-"])
finally:
os._exit(1)
+ def contents_sha1(self):
+ return apt_pkg.sha1sum(self.contents)
+
# vim: set sw=4 et: