import utils
import commands
import shutil
+import textwrap
from types import *
+import yaml
+
from dak_exceptions import *
from changes import *
from regexes import *
(source, dest) = args[1:3]
if self.pkg.changes["distribution"].has_key(source):
for arch in self.pkg.changes["architecture"].keys():
- if arch not in [ arch_string for a in get_suite_architectures(source) ]:
+ if arch not in [ a.arch_string for a in get_suite_architectures(source) ]:
self.notes.append("Mapping %s to %s for unreleased architecture %s." % (source, dest, arch))
del self.pkg.changes["distribution"][source]
self.pkg.changes["distribution"][dest] = 1
if entry.has_key("byhand"):
return
+ # Check we have fields we need to do these checks
+ oktogo = True
+ for m in ['component', 'package', 'priority', 'size', 'md5sum']:
+ if not entry.has_key(m):
+ self.rejects.append("file '%s' does not have field %s set" % (f, m))
+ oktogo = False
+
+ if not oktogo:
+ return
+
# Handle component mappings
for m in cnf.ValueList("ComponentMappings"):
(source, dest) = m.split()
return
# Validate the component
- component = entry["component"]
- if not get_component(component, session):
+ if not get_component(entry["component"], session):
self.rejects.append("file '%s' has unknown component '%s'." % (f, component))
return
except:
self.rejects.append("%s: deb contents timestamp check failed [%s: %s]" % (filename, sys.exc_type, sys.exc_value))
+ ###########################################################################
+ def check_transition(self, session):
+ cnf = Config()
+
+ sourcepkg = self.pkg.changes["source"]
+
+ # No sourceful upload -> no need to do anything else, direct return
+ # We also work with unstable uploads, not experimental or those going to some
+ # proposed-updates queue
+ if "source" not in self.pkg.changes["architecture"] or \
+ "unstable" not in self.pkg.changes["distribution"]:
+ return
+
+ # Also only check if there is a file defined (and existant) with
+ # checks.
+ transpath = cnf.get("Dinstall::Reject::ReleaseTransitions", "")
+ if transpath == "" or not os.path.exists(transpath):
+ return
+
+ # Parse the yaml file
+ sourcefile = file(transpath, 'r')
+ sourcecontent = sourcefile.read()
+ try:
+ transitions = yaml.load(sourcecontent)
+ except yaml.YAMLError, msg:
+ # This shouldn't happen, there is a wrapper to edit the file which
+ # checks it, but we prefer to be safe than ending up rejecting
+ # everything.
+ utils.warn("Not checking transitions, the transitions file is broken: %s." % (msg))
+ return
+
+ # Now look through all defined transitions
+ for trans in transitions:
+ t = transitions[trans]
+ source = t["source"]
+ expected = t["new"]
+
+ # Will be None if nothing is in testing.
+ current = get_source_in_suite(source, "testing", session)
+ if current is not None:
+ compare = apt_pkg.VersionCompare(current.version, expected)
+
+ if current is None or compare < 0:
+ # This is still valid, the current version in testing is older than
+ # the new version we wait for, or there is none in testing yet
+
+ # Check if the source we look at is affected by this.
+ if sourcepkg in t['packages']:
+ # The source is affected, lets reject it.
+
+ rejectmsg = "%s: part of the %s transition.\n\n" % (
+ sourcepkg, trans)
+
+ if current is not None:
+ currentlymsg = "at version %s" % (current)
+ else:
+ currentlymsg = "not present in testing"
+
+ rejectmsg += "Transition description: %s\n\n" % (t["reason"])
+
+ rejectmsg += "\n".join(textwrap.wrap("""Your package
+ is part of a testing transition designed to get %s migrated (it is
+ currently %s, we need version %s). This transition is managed by the
+ Release Team, and %s is the Release-Team member responsible for it.
+ Please mail debian-release@lists.debian.org or contact %s directly if you
+ need further assistance. You might want to upload to experimental until this
+ transition is done."""
+ % (source, currentlymsg, expected,t["rm"], t["rm"])))
+
+ self.rejects.append(rejectmsg)
+ return
+
###########################################################################
def check_signed_by_key(self):
"""Ensure the .changes is signed by an authorized uploader."""
session = DBConn().session()
+ self.check_transition(session)
+
(uid, uid_name, is_dm) = lookup_uid_from_fingerprint(self.pkg.changes["fingerprint"], session=session)
# match claimed name with actual name: