from daklib.textutils import fix_maintainer, ParseMaintError
import daklib.lintian as lintian
import daklib.utils as utils
+from daklib.upload import InvalidHashException
import apt_inst
import apt_pkg
from apt_pkg import version_compare
+import errno
import os
import time
import yaml
"""exception raised by failing checks"""
pass
+class RejectStupidMaintainerException(Exception):
+ """exception raised by failing the external hashes check"""
+
+ def __str__(self):
+ return "'%s' has mismatching %s from the external files db ('%s' [current] vs '%s' [external])" % self.args[:4]
+
+class RejectACL(Reject):
+ """exception raise by failing ACL checks"""
+ def __init__(self, acl, reason):
+ self.acl = acl
+ self.reason = reason
+
+ def __str__(self):
+ return "ACL {0}: {1}".format(self.acl.name, self.reason)
+
class Check(object):
"""base class for checks
class HashesCheck(Check):
"""Check hashes in .changes and .dsc are valid."""
def check(self, upload):
+ what = None
+ try:
+ changes = upload.changes
+ what = changes.filename
+ for f in changes.files.itervalues():
+ f.check(upload.directory)
+ source = changes.source
+ if source is not None:
+ what = source.filename
+ for f in source.files.itervalues():
+ f.check(upload.directory)
+ except IOError as e:
+ if e.errno == errno.ENOENT:
+ raise Reject('{0} refers to non-existing file: {1}\n'
+ 'Perhaps you need to include it in your upload?'
+ .format(what, os.path.basename(e.filename)))
+ raise
+ except InvalidHashException as e:
+ raise Reject('{0}: {1}'.format(what, unicode(e)))
+
+class ExternalHashesCheck(Check):
+ """Checks hashes in .changes and .dsc against an external database."""
+ def check_single(self, session, f):
+ q = session.execute("SELECT size, md5sum, sha1sum, sha256sum FROM external_files WHERE filename LIKE '%%/%s'" % f.filename)
+ (ext_size, ext_md5sum, ext_sha1sum, ext_sha256sum) = q.fetchone() or (None, None, None, None)
+
+ if not ext_size:
+ return
+
+ if ext_size != f.size:
+ raise RejectStupidMaintainerException(f.filename, 'size', f.size, ext_size)
+
+ if ext_md5sum != f.md5sum:
+ raise RejectStupidMaintainerException(f.filename, 'md5sum', f.md5sum, ext_md5sum)
+
+ if ext_sha1sum != f.sha1sum:
+ raise RejectStupidMaintainerException(f.filename, 'sha1sum', f.sha1sum, ext_sha1sum)
+
+ if ext_sha256sum != f.sha256sum:
+ raise RejectStupidMaintainerException(f.filename, 'sha256sum', f.sha256sum, ext_sha256sum)
+
+ def check(self, upload):
+ cnf = Config()
+
+ if not cnf.use_extfiles:
+ return
+
+ session = upload.session
changes = upload.changes
+
for f in changes.files.itervalues():
- f.check(upload.directory)
+ self.check_single(session, f)
source = changes.source
if source is not None:
for f in source.files.itervalues():
- f.check(upload.directory)
+ self.check_single(session, f)
class BinaryCheck(Check):
"""Check binary packages for syntax errors."""
# XXX: Drop DMUA part here and switch to new implementation.
# XXX: Send warning mail once users can set the new DMUA flag
dmua_status, dmua_reason = self._check_dmua(upload)
- if not dmua_status:
- return False, dmua_reason
+ if acl_per_source is None:
+ if not dmua_status:
+ return False, dmua_reason
+ else:
+ upload.warn('DM flag not set, but accepted as DMUA was set.')
#if acl_per_source is None:
# return False, "not allowed to upload source package '{0}'".format(source_name)
if acl.deny_per_source and acl_per_source is not None:
raise Reject('No ACL for fingerprint {0}'.format(fingerprint.fingerprint))
result, reason = self._check_acl(session, upload, acl)
if not result:
- raise Reject(reason)
+ raise RejectACL(acl, reason)
for acl in session.query(ACL).filter_by(is_global=True):
result, reason = self._check_acl(session, upload, acl)
if result == False:
- raise Reject(reason)
+ raise RejectACL(acl, reason)
return True