1 """Utilities for signed files
3 @contact: Debian FTP Master <ftpmaster@debian.org>
4 @copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
5 @license: GNU General Public License version 2 or later
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Value of the SC_OPEN_MAX sysconf variable (queried once at import time).
# It is used as the upper bound passed to os.closerange() in _exec_gpg.
_MAXFD = os.sysconf("SC_OPEN_MAX")
34 class GpgException(Exception):
38 """context manager for pipes
40 Note: When the pipe is closed by means other than the close_r and close_w
41 methods, you have to set self.r (or self.w) to None.
44 (self.r, self.w) = os.pipe()
46 def __exit__(self, type, value, traceback):
51 """close reading side of the pipe"""
56 """close writing part of the pipe"""
61 class SignedFile(object):
62 """handle files signed with PGP
64 The following attributes are available:
65 contents - string with the content (after removing PGP armor)
66 valid - Boolean indicating a valid signature was found
67 fingerprint - fingerprint of the key used for signing
68 primary_fingerprint - fingerprint of the primary key associated with the key used for signing
70 def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
72 @param data: string containing the message
73 @param keyrings: sequence of keyrings
74 @param require_signature: if True (the default), will raise an exception if no valid signature was found
75 @param gpg: location of the gpg binary
78 self.keyrings = keyrings
83 self.fingerprints = []
84 self.primary_fingerprints = []
85 self.signature_ids = []
87 self._verify(data, require_signature)
def fingerprint(self):
    """return the single entry of self.fingerprints

    Exactly one fingerprint has to be recorded; any other count fails
    the assertion.
    """
    recorded = self.fingerprints
    assert len(recorded) == 1
    return recorded[0]
def primary_fingerprint(self):
    """return the single entry of self.primary_fingerprints

    Exactly one primary fingerprint has to be recorded; any other count
    fails the assertion.
    """
    recorded = self.primary_fingerprints
    assert len(recorded) == 1
    return recorded[0]
def signature_id(self):
    """return the single entry of self.signature_ids

    Exactly one signature id has to be recorded; any other count fails
    the assertion.
    """
    recorded = self.signature_ids
    assert len(recorded) == 1
    return recorded[0]
104 def _verify(self, data, require_signature):
105 with _Pipe() as stdin:
106 with _Pipe() as contents:
107 with _Pipe() as status:
108 with _Pipe() as stderr:
111 self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
118 read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
119 stdin.w = None # was closed by _do_io
121 (pid_, exit_code, usage_) = os.wait4(pid, 0)
123 self.contents = read[contents.r]
124 self.status = read[status.r]
125 self.stderr = read[stderr.r]
127 if self.status == "":
128 raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
130 for line in self.status.splitlines():
131 self._parse_status(line)
136 if require_signature and not self.valid:
137 raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
139 assert len(self.fingerprints) == len(self.primary_fingerprints)
140 assert len(self.fingerprints) == len(self.signature_ids)
142 def _do_io(self, read, write):
143 for fd in write.keys():
144 old = fcntl.fcntl(fd, fcntl.F_GETFL)
145 fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
147 read_lines = dict( (fd, []) for fd in read )
148 write_pos = dict( (fd, 0) for fd in write )
150 read_set = list(read)
151 write_set = write.keys()
152 while len(read_set) > 0 or len(write_set) > 0:
153 r, w, x_ = select.select(read_set, write_set, ())
155 data = os.read(fd, 4096)
158 read_lines[fd].append(data)
160 data = write[fd][write_pos[fd]:]
165 bytes_written = os.write(fd, data)
166 write_pos[fd] += bytes_written
168 return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() )
170 def _parse_timestamp(self, timestamp, datestring=None):
171 """parse timestamp in GnuPG's format
173 @rtype: L{datetime.datetime}
174 @returns: datetime object for the given timestamp
176 # The old implementation returned only the date. As we already
177 # used this for replay protection, return the legacy value for
179 if datestring is not None:
180 year, month, day = datestring.split('-')
181 date = datetime.date(int(year), int(month), int(day))
182 time = datetime.time(0, 0)
183 if date < datetime.date(2014, 8, 4):
184 return datetime.datetime.combine(date, time)
187 raise Exception('No support for ISO 8601 timestamps.')
188 return datetime.datetime.utcfromtimestamp(long(timestamp))
190 def _parse_status(self, line):
191 fields = line.split()
192 if fields[0] != "[GNUPG:]":
193 raise GpgException("Unexpected output on status-fd: %s" % line)
195 # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
196 # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
197 # <hash-algo> <sig-class> <primary-key-fpr>
198 if fields[1] == "VALIDSIG":
199 # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
200 # which Debian 8 does not yet include. We want to make sure
201 # not to accept uploads covered by an MD5-based signature.
203 raise GpgException("Digest algorithm MD5 is not trusted.")
205 self.fingerprints.append(fields[2])
206 self.primary_fingerprints.append(fields[11])
207 self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
209 elif fields[1] == "BADARMOR":
210 raise GpgException("Bad armor.")
212 elif fields[1] == "NODATA":
213 raise GpgException("No data.")
215 elif fields[1] == "DECRYPTION_FAILED":
216 raise GpgException("Decryption failed.")
218 elif fields[1] == "ERROR":
219 raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
221 elif fields[1] == "SIG_ID":
222 self.signature_ids.append(fields[2])
224 elif fields[1] in ('PLAINTEXT', 'GOODSIG', 'NOTATION_NAME', 'NOTATION_DATA', 'SIGEXPIRED', 'KEYEXPIRED', 'POLICY_URL'):
227 elif fields[1] in ('EXPSIG', 'EXPKEYSIG'):
231 elif fields[1] in ('REVKEYSIG', 'BADSIG', 'ERRSIG', 'KEYREVOKED', 'NO_PUBKEY'):
235 raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(fields[1]))
237 def _exec_gpg(self, stdin, stdout, stderr, statusfd):
248 old = fcntl.fcntl(fd, fcntl.F_GETFD)
249 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
250 os.closerange(4, _MAXFD)
254 "--no-default-keyring",
257 "--trust-model", "always",
259 for k in self.keyrings:
260 args.append("--keyring=%s" % k)
261 args.extend(["--decrypt", "-"])
263 os.execvp(self.gpg, args)
def contents_sha1(self):
    """return apt_pkg.sha1sum() of self.contents"""
    payload = self.contents
    return apt_pkg.sha1sum(payload)