1 """Utilities for signed files
3 @contact: Debian FTP Master <ftpmaster@debian.org>
4 @copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
5 @license: GNU General Public License version 2 or later
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
29 _MAXFD = os.sysconf("SC_OPEN_MAX")
33 class GpgException(Exception):
37 """context manager for pipes
39 Note: When the pipe is closed by other means than the close_r and close_w
40 methods, you have to set self.r (self.w) to None.
43 (self.r, self.w) = os.pipe()
45 def __exit__(self, type, value, traceback):
50 """close reading side of the pipe"""
55 """close writing part of the pipe"""
60 class SignedFile(object):
61 """handle files signed with PGP
63 The following attributes are available:
64 contents - string with the content (after removing PGP armor)
65 valid - Boolean indicating a valid signature was found
66 fingerprint - fingerprint of the key used for signing
67 primary_fingerprint - fingerprint of the primary key associated to the key used for signing
69 def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
71 @param data: string containing the message
72 @param keyrings: seqeuence of keyrings
73 @param require_signature: if True (the default), will raise an exception if no valid signature was found
74 @param gpg: location of the gpg binary
77 self.keyrings = keyrings
80 self.fingerprint = None
81 self.primary_fingerprint = None
83 self._verify(data, require_signature)
85 def _verify(self, data, require_signature):
86 with _Pipe() as stdin:
87 with _Pipe() as contents:
88 with _Pipe() as status:
91 self._exec_gpg(stdin.r, contents.w, sys.stderr.fileno(), status.w)
97 read = self._do_io([contents.r, status.r], {stdin.w: data})
98 stdin.w = None # was closed by _do_io
100 (pid_, exit_code, usage_) = os.wait4(pid, 0)
102 self.contents = read[contents.r]
103 self.status = read[status.r]
105 if self.status == "":
106 raise GpgException("No status output from GPG. (GPG exited with status code %s)" % exit_code)
108 for line in self.status.splitlines():
109 self._parse_status(line)
111 if require_signature and not self.valid:
112 raise GpgException("No valid signature found. (GPG exited with status code %s)" % exit_code)
114 def _do_io(self, read, write):
115 read_lines = dict( (fd, []) for fd in read )
116 write_pos = dict( (fd, 0) for fd in write )
118 read_set = list(read)
119 write_set = write.keys()
120 while len(read_set) > 0 or len(write_set) > 0:
121 r, w, x_ = select.select(read_set, write_set, ())
123 data = os.read(fd, 4096)
126 read_lines[fd].append(data)
128 data = write[fd][write_pos[fd]:]
133 bytes_written = os.write(fd, data)
134 write_pos[fd] += bytes_written
136 return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() )
138 def _parse_status(self, line):
139 fields = line.split()
140 if fields[0] != "[GNUPG:]":
141 raise GpgException("Unexpected output on status-fd: %s" % line)
143 # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
144 # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
145 # <hash-algo> <sig-class> <primary-key-fpr>
146 if fields[1] == "VALIDSIG":
148 self.fingerprint = fields[2]
149 self.primary_fingerprint = fields[11]
151 if fields[1] == "BADARMOR":
152 raise GpgException("Bad armor.")
154 if fields[1] == "NODATA":
155 raise GpgException("No data.")
157 if fields[1] == "DECRYPTION_FAILED":
158 raise GpgException("Decryption failed.")
160 if fields[1] == "ERROR":
161 raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
163 def _exec_gpg(self, stdin, stdout, stderr, statusfd):
174 old = fcntl.fcntl(fd, fcntl.F_GETFD)
175 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
176 os.closerange(4, _MAXFD)
178 args = [self.gpg, "--status-fd=3", "--no-default-keyring"]
179 for k in self.keyrings:
180 args.append("--keyring=%s" % k)
181 args.extend(["--decrypt", "-"])
183 os.execvp(self.gpg, args)