]> git.decadent.org.uk Git - dak.git/blob - daklib/gpg.py
Add by-hash support
[dak.git] / daklib / gpg.py
1 """Utilities for signed files
2
3 @contact: Debian FTP Master <ftpmaster@debian.org>
4 @copyright: 2011  Ansgar Burchardt <ansgar@debian.org>
5 @license: GNU General Public License version 2 or later
6 """
7
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
12
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
22 import apt_pkg
23 import datetime
24 import errno
25 import fcntl
26 import os
27 import select
28
29 try:
30     _MAXFD = os.sysconf("SC_OPEN_MAX")
31 except:
32     _MAXFD = 256
33
34 class GpgException(Exception):
35     pass
36
37 class _Pipe(object):
38     """context manager for pipes
39
40     Note: When the pipe is closed by other means than the close_r and close_w
41     methods, you have to set self.r (self.w) to None.
42     """
43     def __enter__(self):
44         (self.r, self.w) = os.pipe()
45         return self
46     def __exit__(self, type, value, traceback):
47         self.close_w()
48         self.close_r()
49         return False
50     def close_r(self):
51         """close reading side of the pipe"""
52         if self.r:
53             os.close(self.r)
54             self.r = None
55     def close_w(self):
56         """close writing part of the pipe"""
57         if self.w:
58             os.close(self.w)
59             self.w = None
60
61 class SignedFile(object):
62     """handle files signed with PGP
63
64     The following attributes are available:
65       contents            - string with the content (after removing PGP armor)
66       valid               - Boolean indicating a valid signature was found
67       fingerprint         - fingerprint of the key used for signing
68       primary_fingerprint - fingerprint of the primary key associated to the key used for signing
69     """
70     def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
71         """
72         @param data: string containing the message
73         @param keyrings: sequence of keyrings
74         @param require_signature: if True (the default), will raise an exception if no valid signature was found
75         @param gpg: location of the gpg binary
76         """
77         self.gpg = gpg
78         self.keyrings = keyrings
79
80         self.valid = False
81         self.expired = False
82         self.invalid = False
83         self.fingerprints = []
84         self.primary_fingerprints = []
85         self.signature_ids = []
86
87         self._verify(data, require_signature)
88
89     @property
90     def fingerprint(self):
91         assert len(self.fingerprints) == 1
92         return self.fingerprints[0]
93
94     @property
95     def primary_fingerprint(self):
96         assert len(self.primary_fingerprints) == 1
97         return self.primary_fingerprints[0]
98
99     @property
100     def signature_id(self):
101         assert len(self.signature_ids) == 1
102         return self.signature_ids[0]
103
104     def _verify(self, data, require_signature):
105         with _Pipe() as stdin:
106          with _Pipe() as contents:
107           with _Pipe() as status:
108            with _Pipe() as stderr:
109             pid = os.fork()
110             if pid == 0:
111                 self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
112             else:
113                 stdin.close_r()
114                 contents.close_w()
115                 stderr.close_w()
116                 status.close_w()
117
118                 read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
119                 stdin.w = None # was closed by _do_io
120
121                 (pid_, exit_code, usage_) = os.wait4(pid, 0)
122
123                 self.contents = read[contents.r]
124                 self.status   = read[status.r]
125                 self.stderr   = read[stderr.r]
126
127                 if self.status == "":
128                     raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
129
130                 for line in self.status.splitlines():
131                     self._parse_status(line)
132
133                 if self.invalid:
134                     self.valid = False
135
136                 if require_signature and not self.valid:
137                     raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
138
139         assert len(self.fingerprints) == len(self.primary_fingerprints)
140         assert len(self.fingerprints) == len(self.signature_ids)
141
142     def _do_io(self, read, write):
143         for fd in write.keys():
144             old = fcntl.fcntl(fd, fcntl.F_GETFL)
145             fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
146
147         read_lines = dict( (fd, []) for fd in read )
148         write_pos = dict( (fd, 0) for fd in write )
149
150         read_set = list(read)
151         write_set = write.keys()
152         while len(read_set) > 0 or len(write_set) > 0:
153             r, w, x_ = select.select(read_set, write_set, ())
154             for fd in r:
155                 data = os.read(fd, 4096)
156                 if data == "":
157                     read_set.remove(fd)
158                 read_lines[fd].append(data)
159             for fd in w:
160                 data = write[fd][write_pos[fd]:]
161                 if data == "":
162                     os.close(fd)
163                     write_set.remove(fd)
164                 else:
165                     bytes_written = os.write(fd, data)
166                     write_pos[fd] += bytes_written
167
168         return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() )
169
170     def _parse_timestamp(self, timestamp, datestring=None):
171         """parse timestamp in GnuPG's format
172
173         @rtype:   L{datetime.datetime}
174         @returns: datetime object for the given timestamp
175         """
176         # The old implementation did only return the date. As we already
177         # used this for replay production, return the legacy value for
178         # old signatures.
179         if datestring is not None:
180             year, month, day = datestring.split('-')
181             date = datetime.date(int(year), int(month), int(day))
182             time = datetime.time(0, 0)
183             if date < datetime.date(2014, 8, 4):
184                 return datetime.datetime.combine(date, time)
185
186         if 'T' in timestamp:
187             raise Exception('No support for ISO 8601 timestamps.')
188         return datetime.datetime.utcfromtimestamp(long(timestamp))
189
190     def _parse_status(self, line):
191         fields = line.split()
192         if fields[0] != "[GNUPG:]":
193             raise GpgException("Unexpected output on status-fd: %s" % line)
194
195         # VALIDSIG    <fingerprint in hex> <sig_creation_date> <sig-timestamp>
196         #             <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
197         #             <hash-algo> <sig-class> <primary-key-fpr>
198         if fields[1] == "VALIDSIG":
199             # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
200             # which Debian 8 does not yet include.  We want to make sure
201             # to not accept uploads covered by a MD5-based signature.
202             if fields[9] == "1":
203                 raise GpgException("Digest algorithm MD5 is not trusted.")
204             self.valid = True
205             self.fingerprints.append(fields[2])
206             self.primary_fingerprints.append(fields[11])
207             self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
208
209         elif fields[1] == "BADARMOR":
210             raise GpgException("Bad armor.")
211
212         elif fields[1] == "NODATA":
213             raise GpgException("No data.")
214
215         elif fields[1] == "DECRYPTION_FAILED":
216             raise GpgException("Decryption failed.")
217
218         elif fields[1] == "ERROR":
219             raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
220
221         elif fields[1] == "SIG_ID":
222             self.signature_ids.append(fields[2])
223
224         elif fields[1] in ('PLAINTEXT', 'GOODSIG', 'NOTATION_NAME', 'NOTATION_DATA', 'SIGEXPIRED', 'KEYEXPIRED', 'POLICY_URL'):
225             pass
226
227         elif fields[1] in ('EXPSIG', 'EXPKEYSIG'):
228             self.expired = True
229             self.invalid = True
230
231         elif fields[1] in ('REVKEYSIG', 'BADSIG', 'ERRSIG', 'KEYREVOKED', 'NO_PUBKEY'):
232             self.invalid = True
233
234         else:
235             raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(fields[1]))
236
237     def _exec_gpg(self, stdin, stdout, stderr, statusfd):
238         try:
239             if stdin != 0:
240                 os.dup2(stdin, 0)
241             if stdout != 1:
242                 os.dup2(stdout, 1)
243             if stderr != 2:
244                 os.dup2(stderr, 2)
245             if statusfd != 3:
246                 os.dup2(statusfd, 3)
247             for fd in range(4):
248                 old = fcntl.fcntl(fd, fcntl.F_GETFD)
249                 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
250             os.closerange(4, _MAXFD)
251
252             args = [self.gpg,
253                     "--status-fd=3",
254                     "--no-default-keyring",
255                     "--batch",
256                     "--no-tty",
257                     "--trust-model", "always",
258                     "--fixed-list-mode"]
259             for k in self.keyrings:
260                 args.append("--keyring=%s" % k)
261             args.extend(["--decrypt", "-"])
262
263             os.execvp(self.gpg, args)
264         finally:
265             os._exit(1)
266
267     def contents_sha1(self):
268         return apt_pkg.sha1sum(self.contents)
269
270 # vim: set sw=4 et: