]> git.decadent.org.uk Git - dak.git/blob - daklib/gpg.py
Merge remote branch 'ftpmaster/master'
[dak.git] / daklib / gpg.py
1 """Utilities for signed files
2
3 @contact: Debian FTP Master <ftpmaster@debian.org>
4 @copyright: 2011  Ansgar Burchardt <ansgar@debian.org>
5 @license: GNU General Public License version 2 or later
6 """
7
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
12
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
22 import errno
23 import fcntl
24 import os
25 import select
26
27 try:
28     _MAXFD = os.sysconf("SC_OPEN_MAX")
29 except:
30     _MAXFD = 256
31
32 class GpgException(Exception):
33     pass
34
35 class _Pipe(object):
36     """context manager for pipes
37
38     Note: When the pipe is closed by other means than the close_r and close_w
39     methods, you have to set self.r (self.w) to None.
40     """
41     def __enter__(self):
42         (self.r, self.w) = os.pipe()
43         return self
44     def __exit__(self, type, value, traceback):
45         self.close_w()
46         self.close_r()
47         return False
48     def close_r(self):
49         """close reading side of the pipe"""
50         if self.r:
51             os.close(self.r)
52             self.r = None
53     def close_w(self):
54         """close writing part of the pipe"""
55         if self.w:
56             os.close(self.w)
57             self.w = None
58
59 class SignedFile(object):
60     """handle files signed with PGP
61
62     The following attributes are available:
63       contents            - string with the content (after removing PGP armor)
64       valid               - Boolean indicating a valid signature was found
65       fingerprint         - fingerprint of the key used for signing
66       primary_fingerprint - fingerprint of the primary key associated to the key used for signing
67     """
68     def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
69         """
70         @param data: string containing the message
71         @param keyrings: seqeuence of keyrings
72         @param require_signature: if True (the default), will raise an exception if no valid signature was found
73         @param gpg: location of the gpg binary
74         """
75         self.gpg = gpg
76         self.keyrings = keyrings
77
78         self.valid = False
79         self.fingerprint = None
80         self.primary_fingerprint = None
81
82         self._verify(data, require_signature)
83
84     def _verify(self, data, require_signature):
85         with _Pipe() as stdin:
86          with _Pipe() as contents:
87           with _Pipe() as status:
88            with _Pipe() as stderr:
89             pid = os.fork()
90             if pid == 0:
91                 self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
92             else:
93                 stdin.close_r()
94                 contents.close_w()
95                 stderr.close_w()
96                 status.close_w()
97
98                 read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
99                 stdin.w = None # was closed by _do_io
100
101                 (pid_, exit_code, usage_) = os.wait4(pid, 0)
102
103                 self.contents = read[contents.r]
104                 self.status   = read[status.r]
105                 self.stderr   = read[stderr.r]
106
107                 if self.status == "":
108                     raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
109
110                 for line in self.status.splitlines():
111                     self._parse_status(line)
112
113                 if require_signature and not self.valid:
114                     raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
115
116     def _do_io(self, read, write):
117         for fd in write.keys():
118             old = fcntl.fcntl(fd, fcntl.F_GETFL)
119             fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
120
121         read_lines = dict( (fd, []) for fd in read )
122         write_pos = dict( (fd, 0) for fd in write )
123
124         read_set = list(read)
125         write_set = write.keys()
126         while len(read_set) > 0 or len(write_set) > 0:
127             r, w, x_ = select.select(read_set, write_set, ())
128             for fd in r:
129                 data = os.read(fd, 4096)
130                 if data == "":
131                     read_set.remove(fd)
132                 read_lines[fd].append(data)
133             for fd in w:
134                 data = write[fd][write_pos[fd]:]
135                 if data == "":
136                     os.close(fd)
137                     write_set.remove(fd)
138                 else:
139                     bytes_written = os.write(fd, data)
140                     write_pos[fd] += bytes_written
141
142         return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() )
143
144     def _parse_status(self, line):
145         fields = line.split()
146         if fields[0] != "[GNUPG:]":
147             raise GpgException("Unexpected output on status-fd: %s" % line)
148
149         # VALIDSIG    <fingerprint in hex> <sig_creation_date> <sig-timestamp>
150         #             <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
151         #             <hash-algo> <sig-class> <primary-key-fpr>
152         if fields[1] == "VALIDSIG":
153             self.valid = True
154             self.fingerprint = fields[2]
155             self.primary_fingerprint = fields[11]
156
157         if fields[1] == "BADARMOR":
158             raise GpgException("Bad armor.")
159
160         if fields[1] == "NODATA":
161             raise GpgException("No data.")
162
163         if fields[1] == "DECRYPTION_FAILED":
164             raise GpgException("Decryption failed.")
165
166         if fields[1] == "ERROR":
167             raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
168
169     def _exec_gpg(self, stdin, stdout, stderr, statusfd):
170         try:
171             if stdin != 0:
172                 os.dup2(stdin, 0)
173             if stdout != 1:
174                 os.dup2(stdout, 1)
175             if stderr != 2:
176                 os.dup2(stderr, 2)
177             if statusfd != 3:
178                 os.dup2(statusfd, 3)
179             for fd in range(4):
180                 old = fcntl.fcntl(fd, fcntl.F_GETFD)
181                 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
182             os.closerange(4, _MAXFD)
183
184             args = [self.gpg, "--status-fd=3", "--no-default-keyring"]
185             for k in self.keyrings:
186                 args.append("--keyring=%s" % k)
187             args.extend(["--decrypt", "-"])
188
189             os.execvp(self.gpg, args)
190         finally:
191             os._exit(1)
192
193 # vim: set sw=4 et: