"""module provided pre-acceptance tests
-Please read the documentation for the `Check` class for the interface.
+Please read the documentation for the L{Check} class for the interface.
"""
from daklib.config import Config
-from .dbconn import *
+from daklib.dbconn import *
import daklib.dbconn as dbconn
-from .regexes import *
-from .textutils import fix_maintainer, ParseMaintError
+from daklib.regexes import *
+from daklib.textutils import fix_maintainer, ParseMaintError
import daklib.lintian as lintian
import daklib.utils as utils
+import apt_inst
import apt_pkg
from apt_pkg import version_compare
import os
+import time
import yaml
# TODO: replace by subprocess
class Check(object):
"""base class for checks
- checks are called by daklib.archive.ArchiveUpload. Failing tests should
- raise a `daklib.checks.Reject` exception including a human-readable
+ checks are called by L{daklib.archive.ArchiveUpload}. Failing tests should
+ raise a L{daklib.checks.Reject} exception including a human-readable
description why the upload should be rejected.
"""
def check(self, upload):
"""do checks
- Args:
- upload (daklib.archive.ArchiveUpload): upload to check
+ @type upload: L{daklib.archive.ArchiveUpload}
+ @param upload: upload to check
- Raises:
- daklib.checks.Reject
+ @raise daklib.checks.Reject: upload should be rejected
"""
raise NotImplemented
def per_suite_check(self, upload, suite):
"""do per-suite checks
- Args:
- upload (daklib.archive.ArchiveUpload): upload to check
- suite (daklib.dbconn.Suite): suite to check
+ @type upload: L{daklib.archive.ArchiveUpload}
+ @param upload: upload to check
- Raises:
- daklib.checks.Reject
+ @type suite: L{daklib.dbconn.Suite}
+ @param suite: suite to check
+
+ @raise daklib.checks.Reject: upload should be rejected
"""
raise NotImplemented
@property
def forcable(self):
"""allow to force ignore failing test
- True if it is acceptable to force ignoring a failing test,
- False otherwise
+ C{True} if it is acceptable to force ignoring a failing test,
+ C{False} otherwise
"""
return False
except:
raise Reject('{0}: APT could not parse {1} field'.format(fn, field))
+class BinaryTimestampCheck(Check):
+ """check timestamps of files in binary packages
+
+ Files in the near future cause ugly warnings and extreme time travel
+ can cause errors on extraction.
+ """
+ def check(self, upload):
+ cnf = Config()
+ future_cutoff = time.time() + cnf.find_i('Dinstall::FutureTimeTravelGrace', 24*3600)
+ past_cutoff = time.mktime(time.strptime(cnf.find('Dinstall::PastCutoffYear', '1984'), '%Y'))
+
+ class TarTime(object):
+ def __init__(self):
+ self.future_files = dict()
+ self.past_files = dict()
+ def callback(self, member, data):
+ if member.mtime > future_cutoff:
+ future_files[member.name] = member.mtime
+ elif member.mtime < past_cutoff:
+ past_files[member.name] = member.mtime
+
+ def format_reason(filename, direction, files):
+ reason = "{0}: has {1} file(s) with a timestamp too far in the {2}:\n".format(filename, len(files), direction)
+ for fn, ts in files.iteritems():
+ reason += " {0} ({1})".format(fn, time.ctime(ts))
+ return reason
+
+ for binary in upload.changes.binaries:
+ filename = binary.hashed_file.filename
+ path = os.path.join(upload.directory, filename)
+ deb = apt_inst.DebFile(path)
+ tar = TarTime()
+ deb.control.go(tar.callback)
+ if tar.future_files:
+ raise Reject(format_reason(filename, 'future', tar.future_files))
+ if tar.past_files:
+ raise Reject(format_reason(filename, 'past', tar.past_files))
+
class SourceCheck(Check):
"""Check source package for syntax errors."""
def check_filename(self, control, filename, regex):
except Exception as e:
raise Reject('{0}: APT could not parse {1} field: {2}'.format(dsc_fn, field, e))
- # TODO: check all expected files for given source format are included
+ rejects = utils.check_dsc_files(dsc_fn, control, source.files.keys())
+ if len(rejects) > 0:
+ raise Reject("\n".join(rejects))
+
+ return True
class SingleDistributionCheck(Check):
"""Check that the .changes targets only a single distribution."""
return True
+class UploadBlockCheck(Check):
+ """check for upload blocks"""
+ def check(self, upload):
+ session = upload.session
+ control = upload.changes.changes
+
+ source = re_field_source.match(control['Source']).group('package')
+ version = control['Version']
+ blocks = session.query(UploadBlock).filter_by(source=source) \
+ .filter((UploadBlock.version == version) | (UploadBlock.version == None))
+
+ for block in blocks:
+ if block.fingerprint == upload.fingerprint:
+ raise Reject('Manual upload block in place for package {0} and fingerprint {1}:\n{2}'.format(source, upload.fingerprint.fingerprint, block.reason))
+ if block.uid == upload.fingerprint.uid:
+ raise Reject('Manual upload block in place for package {0} and uid {1}:\n{2}'.format(source, block.uid.uid, block.reason))
+
+ return True
+
+class TransitionCheck(Check):
+ """check for a transition"""
+ def check(self, upload):
+ if 'source' not in upload.changes.architectures:
+ return True
+
+ transitions = self.get_transitions()
+ if transitions is None:
+ return True
+
+ control = upload.changes.changes
+ source = re_field_source.match(control['Source']).group('package')
+
+ for trans in transitions:
+ t = transitions[trans]
+ source = t["source"]
+ expected = t["new"]
+
+ # Will be None if nothing is in testing.
+ current = get_source_in_suite(source, "testing", session)
+ if current is not None:
+ compare = apt_pkg.version_compare(current.version, expected)
+
+ if current is None or compare < 0:
+ # This is still valid, the current version in testing is older than
+ # the new version we wait for, or there is none in testing yet
+
+ # Check if the source we look at is affected by this.
+ if source in t['packages']:
+ # The source is affected, lets reject it.
+
+ rejectmsg = "{0}: part of the {1} transition.\n\n".format(source, trans)
+
+ if current is not None:
+ currentlymsg = "at version {0}".format(current.version)
+ else:
+ currentlymsg = "not present in testing"
+
+ rejectmsg += "Transition description: {0}\n\n".format(t["reason"])
+
+ rejectmsg += "\n".join(textwrap.wrap("""Your package
+is part of a testing transition designed to get {0} migrated (it is
+currently {1}, we need version {2}). This transition is managed by the
+Release Team, and {3} is the Release-Team member responsible for it.
+Please mail debian-release@lists.debian.org or contact {3} directly if you
+need further assistance. You might want to upload to experimental until this
+transition is done.""".format(source, currentlymsg, expected,t["rm"])))
+
+ raise Reject(rejectmsg)
+
+ return True
+
+ def get_transitions(self):
+ cnf = Config()
+ path = cnf.get('Dinstall::ReleaseTransitions', '')
+ if path == '' or not os.path.exists(path):
+ return None
+
+ contents = file(path, 'r').read()
+ try:
+ transitions = yaml.load(contents)
+ return transitions
+ except yaml.YAMLError as msg:
+ utils.warn('Not checking transitions, the transitions file is broken: {0}'.format(msg))
+
+ return None
+
class NoSourceOnlyCheck(Check):
"""Check for source-only upload