"""
return False
-class SignatureCheck(Check):
+class SignatureAndHashesCheck(Check):
"""Check signature of changes and dsc file (if included in upload)
Make sure the signature is valid and done by a known user.
changes = upload.changes
if not changes.valid_signature:
raise Reject("Signature for .changes not valid.")
- if changes.source is not None:
- if not changes.source.valid_signature:
+ self._check_hashes(upload, changes.filename, changes.files.itervalues())
+
+ source = None
+ try:
+ source = changes.source
+ except Exception as e:
+ raise Reject("Invalid dsc file: {0}".format(e))
+ if source is not None:
+ if not source.valid_signature:
raise Reject("Signature for .dsc not valid.")
- if changes.source.primary_fingerprint != changes.primary_fingerprint:
+ if source.primary_fingerprint != changes.primary_fingerprint:
raise Reject(".changes and .dsc not signed by the same key.")
+ self._check_hashes(upload, source.filename, source.files.itervalues())
+
if upload.fingerprint is None or upload.fingerprint.uid is None:
raise Reject(".changes signed by unknown key.")
+ """Make sure hashes match existing files
+
+ @type upload: L{daklib.archive.ArchiveUpload}
+ @param upload: upload we are processing
+
+ @type filename: str
+ @param filename: name of the file the expected hash values are taken from
+
+ @type files: sequence of L{daklib.upload.HashedFile}
+ @param files: files to check the hashes for
+ """
+ def _check_hashes(self, upload, filename, files):
+ try:
+ for f in files:
+ f.check(upload.directory)
+ except IOError as e:
+ if e.errno == errno.ENOENT:
+ raise Reject('{0} refers to non-existing file: {1}\n'
+ 'Perhaps you need to include it in your upload?'
+ .format(filename, os.path.basename(e.filename)))
+ raise
+ except InvalidHashException as e:
+ raise Reject('{0}: {1}'.format(filename, unicode(e)))
+
class ChangesCheck(Check):
"""Check changes file for syntax errors."""
def check(self, upload):
return True
-class HashesCheck(Check):
- """Check hashes in .changes and .dsc are valid."""
- def check(self, upload):
- what = None
- try:
- changes = upload.changes
- what = changes.filename
- for f in changes.files.itervalues():
- f.check(upload.directory)
- source = changes.source
- if source is not None:
- what = source.filename
- for f in source.files.itervalues():
- f.check(upload.directory)
- except IOError as e:
- if e.errno == errno.ENOENT:
- raise Reject('{0} refers to non-existing file: {1}\n'
- 'Perhaps you need to include it in your upload?'
- .format(what, os.path.basename(e.filename)))
- raise
- except InvalidHashException as e:
- raise Reject('{0}: {1}'.format(what, unicode(e)))
-
class ExternalHashesCheck(Check):
"""Checks hashes in .changes and .dsc against an external database."""
def check_single(self, session, f):
changespath = os.path.join(upload.directory, changes.filename)
try:
- if cnf.unprivgroup:
- cmd = "sudo -H -u {0} -- /usr/bin/lintian --show-overrides --tags-from-file {1} {2}".format(cnf.unprivgroup, temp_filename, changespath)
- else:
- cmd = "/usr/bin/lintian --show-overrides --tags-from-file {0} {1}".format(temp_filename, changespath)
- result, output = commands.getstatusoutput(cmd)
+ cmd = []
+
+ user = cnf.get('Dinstall::UnprivUser') or None
+ if user is not None:
+ cmd.extend(['sudo', '-H', '-u', user])
+
+ cmd.extend(['/usr/bin/lintian', '--show-overrides', '--tags-from-file', temp_filename, changespath])
+ result, output = commands.getstatusoutput(" ".join(cmd))
finally:
os.unlink(temp_filename)
for arch in upload.changes.architectures:
query = session.query(Architecture).filter_by(arch_string=arch).filter(Architecture.suites.contains(suite))
if query.first() is None:
- raise Reject('Architecture {0} is not allowed in suite {2}'.format(arch, suite.suite_name))
+ raise Reject('Architecture {0} is not allowed in suite {1}'.format(arch, suite.suite_name))
return True