+++ /dev/null
-import sys, os, textwrap
-
-import apt_pkg
-import daklib.utils, daklib.database
-import yaml
-
-import daklib.extensions
-from daklib.extensions import replace_dak_function
-
-def check_transition():
- changes = dak_module.changes
- reject = dak_module.reject
- Cnf = dak_module.Cnf
-
- sourcepkg = changes["source"]
-
- # No sourceful upload -> no need to do anything else, direct return
- # We also work with unstable uploads, not experimental or those going to some
- # proposed-updates queue
- if "source" not in changes["architecture"] or "unstable" not in changes["distribution"]:
- return
-
- # Also only check if there is a file defined (and existant) with
- # checks.
- transpath = Cnf.get("Dinstall::Reject::ReleaseTransitions", "")
- if transpath == "" or not os.path.exists(transpath):
- return
-
- # Parse the yaml file
- sourcefile = file(transpath, 'r')
- sourcecontent = sourcefile.read()
- try:
- transitions = yaml.load(sourcecontent)
- except yaml.YAMLError, msg:
- # This shouldn't happen, there is a wrapper to edit the file which
- # checks it, but we prefer to be safe than ending up rejecting
- # everything.
- daklib.utils.warn("Not checking transitions, the transitions file is broken: %s." % (msg))
- return
-
- # Now look through all defined transitions
- for trans in transitions:
- t = transitions[trans]
- source = t["source"]
- expected = t["new"]
-
- # Will be None if nothing is in testing.
- current = daklib.database.get_suite_version(source, "testing")
- if current is not None:
- compare = apt_pkg.VersionCompare(current, expected)
-
- if current is None or compare < 0:
- # This is still valid, the current version in testing is older than
- # the new version we wait for, or there is none in testing yet
-
- # Check if the source we look at is affected by this.
- if sourcepkg in t['packages']:
- # The source is affected, lets reject it.
-
- rejectmsg = "%s: part of the %s transition.\n\n" % (
- sourcepkg, trans)
-
- if current is not None:
- currentlymsg = "at version %s" % (current)
- else:
- currentlymsg = "not present in testing"
-
- rejectmsg += "Transition description: %s\n\n" % (t["reason"])
-
- rejectmsg += "\n".join(textwrap.wrap("""Your package
-is part of a testing transition designed to get %s migrated (it is
-currently %s, we need version %s). This transition is managed by the
-Release Team, and %s is the Release-Team member responsible for it.
-Please mail debian-release@lists.debian.org or contact %s directly if you
-need further assistance. You might want to upload to experimental until this
-transition is done."""
- % (source, currentlymsg, expected,t["rm"], t["rm"])))
-
- reject(rejectmsg + "\n")
- return
-
-@replace_dak_function("process-unchecked", "check_signed_by_key")
-def check_signed_by_key(oldfn):
- changes = dak_module.changes
- reject = dak_module.reject
-
- if changes["source"] == "dpkg":
- fpr = changes["fingerprint"]
- (uid, uid_name, is_dm) = dak_module.lookup_uid_from_fingerprint(fpr)
- if fpr == "5906F687BD03ACAD0D8E602EFCF37657" or uid == "iwj":
- reject("Upload blocked due to hijack attempt 2008/03/19")
-
- # NB: 1.15.0, 1.15.2 signed by this key targetted at unstable
- # have been made available in the wild, and should remain
- # blocked until Debian's dpkg has revved past those version
- # numbers
-
- oldfn()
-
- check_transition()
import sys
import imp
import daklib.utils
-import daklib.extensions
-
-################################################################################
-
-class UserExtension:
- def __init__(self, user_extension = None):
- if user_extension:
- m = imp.load_source("dak_userext", user_extension)
- d = m.__dict__
- else:
- m, d = None, {}
- self.__dict__["_module"] = m
- self.__dict__["_d"] = d
-
- def __getattr__(self, a):
- if a in self.__dict__: return self.__dict__[a]
- if a[0] == "_": raise AttributeError, a
- return self._d.get(a, None)
-
- def __setattr__(self, a, v):
- self._d[a] = v
-
-################################################################################
-
-class UserExtension:
- def __init__(self, user_extension = None):
- if user_extension:
- m = imp.load_source("dak_userext", user_extension)
- d = m.__dict__
- else:
- m, d = None, {}
- self.__dict__["_module"] = m
- self.__dict__["_d"] = d
-
- def __getattr__(self, a):
- if a in self.__dict__: return self.__dict__[a]
- if a[0] == "_": raise AttributeError, a
- return self._d.get(a, None)
-
- def __setattr__(self, a, v):
- self._d[a] = v
################################################################################
def main():
"""Launch dak functionality."""
- Cnf = daklib.utils.get_conf()
-
- if Cnf.has_key("Dinstall::UserExtensions"):
- userext = UserExtension(Cnf["Dinstall::UserExtensions"])
- else:
- userext = UserExtension()
-
functionality = init()
modules = [ command for (command, _) in functionality ]
# Invoke the module
module = __import__(cmdname.replace("-","_"))
- module.dak_userext = userext
- userext.dak_module = module
-
- daklib.extensions.init(cmdname, module, userext)
- if userext.init is not None: userext.init(cmdname)
-
module.main()
################################################################################
import utils
import commands
import shutil
+import textwrap
from types import *
+import yaml
+
from dak_exceptions import *
from changes import *
from regexes import *
except:
self.rejects.append("%s: deb contents timestamp check failed [%s: %s]" % (filename, sys.exc_type, sys.exc_value))
+ ###########################################################################
+ def check_transition(self, session):
+ cnf = Config()
+
+ sourcepkg = self.pkg.changes["source"]
+
+ # No sourceful upload -> no need to do anything else, direct return
+ # We also work with unstable uploads, not experimental or those going to some
+ # proposed-updates queue
+ if "source" not in self.pkg.changes["architecture"] or \
+ "unstable" not in self.pkg.changes["distribution"]:
+ return
+
+ # Also only check if there is a file defined (and existant) with
+ # checks.
+ transpath = cnf.get("Dinstall::Reject::ReleaseTransitions", "")
+ if transpath == "" or not os.path.exists(transpath):
+ return
+
+ # Parse the yaml file
+ sourcefile = file(transpath, 'r')
+ sourcecontent = sourcefile.read()
+ try:
+ transitions = yaml.load(sourcecontent)
+ except yaml.YAMLError, msg:
+ # This shouldn't happen, there is a wrapper to edit the file which
+ # checks it, but we prefer to be safe than ending up rejecting
+ # everything.
+ utils.warn("Not checking transitions, the transitions file is broken: %s." % (msg))
+ return
+
+ # Now look through all defined transitions
+ for trans in transitions:
+ t = transitions[trans]
+ source = t["source"]
+ expected = t["new"]
+
+ # Will be None if nothing is in testing.
+ current = get_source_in_suite(source, "testing", session)
+ if current is not None:
+ compare = apt_pkg.VersionCompare(current.version, expected)
+
+ if current is None or compare < 0:
+ # This is still valid, the current version in testing is older than
+ # the new version we wait for, or there is none in testing yet
+
+ # Check if the source we look at is affected by this.
+ if sourcepkg in t['packages']:
+ # The source is affected, lets reject it.
+
+ rejectmsg = "%s: part of the %s transition.\n\n" % (
+ sourcepkg, trans)
+
+ if current is not None:
+ currentlymsg = "at version %s" % (current)
+ else:
+ currentlymsg = "not present in testing"
+
+ rejectmsg += "Transition description: %s\n\n" % (t["reason"])
+
+ rejectmsg += "\n".join(textwrap.wrap("""Your package
+ is part of a testing transition designed to get %s migrated (it is
+ currently %s, we need version %s). This transition is managed by the
+ Release Team, and %s is the Release-Team member responsible for it.
+ Please mail debian-release@lists.debian.org or contact %s directly if you
+ need further assistance. You might want to upload to experimental until this
+ transition is done."""
+ % (source, currentlymsg, expected,t["rm"], t["rm"])))
+
+ self.rejects.append(rejectmsg)
+ return
+
###########################################################################
def check_signed_by_key(self):
"""Ensure the .changes is signed by an authorized uploader."""
session = DBConn().session()
+ self.check_transition(session)
+
(uid, uid_name, is_dm) = lookup_uid_from_fingerprint(self.pkg.changes["fingerprint"], session=session)
# match claimed name with actual name: