from daklib import utils
from daklib.dbconn import DBConn, get_component_by_package_suite
+from daklib.gpg import SignedFile
from daklib.regexes import html_escaping, re_html_escaping, re_version, re_spacestrip, \
re_contrib, re_nonfree, re_localhost, re_newlinespace, \
re_package, re_doc_directory
# Read a file, strip the signature and return the modified contents as
# a string.
def strip_pgp_signature (filename):
- inputfile = utils.open_file (filename)
- contents = ""
- inside_signature = 0
- skip_next = 0
- for line in inputfile.readlines():
- if line[:-1] == "":
- continue
- if inside_signature:
- continue
- if skip_next:
- skip_next = 0
- continue
- if line.startswith("-----BEGIN PGP SIGNED MESSAGE"):
- skip_next = 1
- continue
- if line.startswith("-----BEGIN PGP SIGNATURE"):
- inside_signature = 1
- continue
- if line.startswith("-----END PGP SIGNATURE"):
- inside_signature = 0
- continue
- contents += line
- inputfile.close()
- return contents
+ with utils.open_file(filename) as f:
+ data = f.read()
+ signedfile = SignedFile(data, keyrings=(), require_signature=False)
+ return signedfile.contents
def display_changes(suite, changes_filename):
global printed
--- /dev/null
+"""Utilities for signed files
+
+@contact: Debian FTP Master <ftpmaster@debian.org>
+@copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
+@license: GNU General Public License version 2 or later
+"""
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+import errno
+import fcntl
+import os
+import select
+import sys
+
+try:
+ _MAXFD = os.sysconf("SC_OPEN_MAX")
+except:
+ _MAXFD = 256
+
+class GpgException(Exception):
+ pass
+
+class _Pipe(object):
+ """context manager for pipes
+
+ Note: When the pipe is closed by other means than the close_r and close_w
+ methods, you have to set self.r (self.w) to None.
+ """
+ def __enter__(self):
+ (self.r, self.w) = os.pipe()
+ return self
+ def __exit__(self, type, value, traceback):
+ self.close_w()
+ self.close_r()
+ return False
+ def close_r(self):
+ """close reading side of the pipe"""
+ if self.r:
+ os.close(self.r)
+ self.r = None
+ def close_w(self):
+ """close writing part of the pipe"""
+ if self.w:
+ os.close(self.w)
+ self.w = None
+
+class SignedFile(object):
+ """handle files signed with PGP
+
+ The following attributes are available:
+ contents - string with the content (after removing PGP armor)
+ valid - Boolean indicating a valid signature was found
+ fingerprint - fingerprint of the key used for signing
+ primary_fingerprint - fingerprint of the primary key associated to the key used for signing
+ """
+ def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
+ """
+ @param data: string containing the message
+ @param keyrings: seqeuence of keyrings
+ @param require_signature: if True (the default), will raise an exception if no valid signature was found
+ @param gpg: location of the gpg binary
+ """
+ self.gpg = gpg
+ self.keyrings = keyrings
+
+ self.valid = False
+ self.fingerprint = None
+ self.primary_fingerprint = None
+
+ self._verify(data, require_signature)
+
+ def _verify(self, data, require_signature):
+ with _Pipe() as stdin:
+ with _Pipe() as contents:
+ with _Pipe() as status:
+ with _Pipe() as stderr:
+ pid = os.fork()
+ if pid == 0:
+ self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
+ else:
+ stdin.close_r()
+ contents.close_w()
+ stderr.close_w()
+ status.close_w()
+
+ read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
+ stdin.w = None # was closed by _do_io
+
+ (pid_, exit_code, usage_) = os.wait4(pid, 0)
+
+ self.contents = read[contents.r]
+ self.status = read[status.r]
+ self.stderr = read[stderr.r]
+
+ if self.status == "":
+ raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
+
+ for line in self.status.splitlines():
+ self._parse_status(line)
+
+ if require_signature and not self.valid:
+ raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
+
+ def _do_io(self, read, write):
+ read_lines = dict( (fd, []) for fd in read )
+ write_pos = dict( (fd, 0) for fd in write )
+
+ read_set = list(read)
+ write_set = write.keys()
+ while len(read_set) > 0 or len(write_set) > 0:
+ r, w, x_ = select.select(read_set, write_set, ())
+ for fd in r:
+ data = os.read(fd, 4096)
+ if data == "":
+ read_set.remove(fd)
+ read_lines[fd].append(data)
+ for fd in w:
+ data = write[fd][write_pos[fd]:]
+ if data == "":
+ os.close(fd)
+ write_set.remove(fd)
+ else:
+ bytes_written = os.write(fd, data)
+ write_pos[fd] += bytes_written
+
+ return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() )
+
+ def _parse_status(self, line):
+ fields = line.split()
+ if fields[0] != "[GNUPG:]":
+ raise GpgException("Unexpected output on status-fd: %s" % line)
+
+ # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
+ # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
+ # <hash-algo> <sig-class> <primary-key-fpr>
+ if fields[1] == "VALIDSIG":
+ self.valid = True
+ self.fingerprint = fields[2]
+ self.primary_fingerprint = fields[11]
+
+ if fields[1] == "BADARMOR":
+ raise GpgException("Bad armor.")
+
+ if fields[1] == "NODATA":
+ raise GpgException("No data.")
+
+ if fields[1] == "DECRYPTION_FAILED":
+ raise GpgException("Decryption failed.")
+
+ if fields[1] == "ERROR":
+ raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
+
+ def _exec_gpg(self, stdin, stdout, stderr, statusfd):
+ try:
+ if stdin != 0:
+ os.dup2(stdin, 0)
+ if stdout != 1:
+ os.dup2(stdout, 1)
+ if stderr != 2:
+ os.dup2(stderr, 2)
+ if statusfd != 3:
+ os.dup2(statusfd, 3)
+ for fd in range(4):
+ old = fcntl.fcntl(fd, fcntl.F_GETFD)
+ fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
+ os.closerange(4, _MAXFD)
+
+ args = [self.gpg, "--status-fd=3", "--no-default-keyring"]
+ for k in self.keyrings:
+ args.append("--keyring=%s" % k)
+ args.extend(["--decrypt", "-"])
+
+ os.execvp(self.gpg, args)
+ finally:
+ sys.exit(1)
+
+# vim: set sw=4 et:
from dbconn import DBConn, get_architecture, get_component, get_suite, get_override_type, Keyring, session_wrapper
from dak_exceptions import *
+from gpg import SignedFile
from textutils import fix_maintainer
from regexes import re_html_escaping, html_escaping, re_single_line_field, \
re_multi_line_field, re_srchasver, re_taint_free, \
################################################################################
-def parse_deb822(contents, signing_rules=0):
+def parse_deb822(armored_contents, signing_rules=0, keyrings=None):
+ if keyrings == None:
+ keyrings = [ k.keyring_name for k in DBConn().session().query(Keyring).filter(Keyring.active == True).all() ]
+ require_signature = True
+ if signing_rules == -1:
+ require_signature = False
+
+ signed_file = SignedFile(armored_contents, keyrings=keyrings, require_signature=require_signature)
+ contents = signed_file.contents
+
error = ""
changes = {}
index += 1
indexed_lines[index] = line[:-1]
- inside_signature = 0
-
num_of_lines = len(indexed_lines.keys())
index = 0
first = -1
while index < num_of_lines:
index += 1
line = indexed_lines[index]
- if line == "":
- if signing_rules == 1:
- index += 1
- if index > num_of_lines:
- raise InvalidDscError, index
- line = indexed_lines[index]
- if not line.startswith("-----BEGIN PGP SIGNATURE"):
- raise InvalidDscError, index
- inside_signature = 0
- break
- else:
- continue
- if line.startswith("-----BEGIN PGP SIGNATURE"):
+ if line == "" and signing_rules == 1:
+ if index != num_of_lines:
+ raise InvalidDscError, index
break
- if line.startswith("-----BEGIN PGP SIGNED MESSAGE"):
- inside_signature = 1
- if signing_rules == 1:
- while index < num_of_lines and line != "":
- index += 1
- line = indexed_lines[index]
- continue
- # If we're not inside the signed data, don't process anything
- if signing_rules >= 0 and not inside_signature:
- continue
slf = re_single_line_field.match(line)
if slf:
field = slf.groups()[0].lower()
continue
error += line
- if signing_rules == 1 and inside_signature:
- raise InvalidDscError, index
-
- changes["filecontents"] = "".join(lines)
+ changes["filecontents"] = armored_contents
if changes.has_key("source"):
# Strip the source version in brackets from the source field,
################################################################################
-def parse_changes(filename, signing_rules=0, dsc_file=0):
+def parse_changes(filename, signing_rules=0, dsc_file=0, keyrings=None):
"""
Parses a changes file and returns a dictionary where each field is a
key. The mandatory first argument is the filename of the .changes
unicode(content, 'utf-8')
except UnicodeError:
raise ChangesUnicodeError, "Changes file not proper utf-8"
- changes = parse_deb822(content, signing_rules)
+ changes = parse_deb822(content, signing_rules, keyrings=keyrings)
if not dsc_file:
--- /dev/null
+-----BEGIN PGP SIGNED MESSAGE OR SO-----
+Hash: SHA512
+
+This: is a bug.
+-----BEGIN PGP SIGNED MESSAGE-----
+Hash: SHA512
+
+Question: Is this a bug?
+-----BEGIN PGP SIGNATURE-----
+Version: GnuPG v1.4.11 (GNU/Linux)
+
+iQIcBAEBCgAGBQJN1s1eAAoJEIATJTTdNH3Is4IP/3ppCve+jzobPjacyqYGyAec
+Op2rnYkQulfln1tyaxr8A40MHSWUly1kFebPgO3XNgAQ8mIh7FCeL7tSsaDnrBwq
+v/S/6JK1ZGCSuL6dleoqxoBgViJWQEvd297zAe0CzIdJ+JYgTPxX5cHh4E23rWmG
+zG9ct3v+5J4mSeEGksZPn8/YalnWRwb72hj/0WTagA2SY89TVZ9onT6p8ftWf6aO
+ODXDtclP56GixfnA3jR3reKI5/aLHXSLSYWGDOyEXffr0NoFvgtbsO4Y0FF2+Np3
+MpmJitoIRuJWk3zInYt0GeJskhEbvuF5Fnhiqrg43W5tFxB8pz5QHpDa/oq8Gfea
+MU/2p6FHA12nwD7CVdKWv/ra3nAWcJPqqfV//xgnZaBdS7d4G+3+tMFFYk8sWqc1
+JphkXJ9M8eX67oEuKgwhwHGV/wGu96nkTergnvlqpxk6uesfnsy0ixXX0UgLzwEZ
+ty1sZcCgq8dhdnEatkvRy2M13pS8S9iONmrowAck15YZuHcudBmvh5PFeNbpldmM
+ABLFApnjtD3DljzrjBgnHQS5UHDzDhDiEEAiQrUM3nu/CNi6UPoxasGszJK8W0iV
+MQmYVybk2L2lVV3b1qXURMyaFRcmVnLBNad/IiCbQiWTUCwg8zxzJoty1+f7+EDa
+rPpj3R0qGxz01UsVtS3W
+=/dES
+-----END PGP SIGNATURE-----
import unittest
+from daklib.gpg import GpgException
from daklib.utils import parse_changes
from daklib.dak_exceptions import InvalidDscError, ParseChangesError
class ParseChangesTestCase(DakTestCase):
def assertParse(self, filename, *args):
- return parse_changes(fixture(filename), *args)
+ return parse_changes(fixture(filename), *args, keyrings=())
def assertFails(self, filename, line=None, *args):
try:
self.fail('%s was not recognised as invalid' % filename)
except ParseChangesError:
pass
+ except GpgException:
+ pass
except InvalidDscError, actual_line:
if line is not None:
assertEqual(actual_line, line)
class ParseDscTestCase(ParseChangesTestCase):
def test_1(self):
- self.assertParse('dsc/1.dsc', 0, 1)
+ self.assertParse('dsc/1.dsc', -1, 1)
def test_1_ignoreErrors(self):
# Valid .dsc ; ignoring errors
- self.assertParse('dsc/1.dsc', 0, 1)
+ self.assertParse('dsc/1.dsc', -1, 1)
def test_2(self):
# Missing blank line before signature body
- self.assertParse('dsc/2.dsc', 0, 1)
+ self.assertParse('dsc/2.dsc', -1, 1)
def test_2_ignoreErrors(self):
# Invalid .dsc ; ignoring errors
- self.assertParse('dsc/2.dsc', 0, 1)
+ self.assertParse('dsc/2.dsc', -1, 1)
def test_3(self):
# Missing blank line after signature header
- self.assertParse('dsc/3.dsc', 0, 1)
+ self.assertParse('dsc/3.dsc', -1, 1)
def test_4(self):
# No blank lines at all
- self.assertParse('dsc/4.dsc', 0, 1)
+ self.assertFails('dsc/4.dsc', -1, 1)
def test_5(self):
# Extra blank line before signature body
- self.assertParse('dsc/5.dsc', 0, 1)
+ self.assertParse('dsc/5.dsc', -1, 1)
def test_6(self):
# Extra blank line after signature header
- self.assertParse('dsc/6.dsc', 0, 1)
+ self.assertParse('dsc/6.dsc', -1, 1)
class ParseChangesTestCase(ParseChangesTestCase):
def test_1(self):
# Empty changes
- self.assertFails('changes/1.changes', line=5)
+ self.assertFails('changes/1.changes', 5, -1)
def test_2(self):
- changes = self.assertParse('changes/2.changes', 0)
+ changes = self.assertParse('changes/2.changes', -1)
binaries = changes['binary']
def test_3(self):
for filename in ('valid', 'bogus-pre', 'bogus-post'):
- for strict_whitespace in (0, 1):
+ for strict_whitespace in (-1,):
changes = self.assertParse(
'changes/%s.changes' % filename,
strict_whitespace,
)
self.failIf(changes.get('you'))
+ def test_4(self):
+ changes = self.assertParse('changes/two-beginnings.changes', -1, 1)
+ self.assert_(changes['question'] == 'Is this a bug?')
+ self.failIf(changes.get('this'))
+
if __name__ == '__main__':
unittest.main()