from urgencylog import UrgencyLog
from dbconn import *
from summarystats import SummaryStats
-from utils import parse_changes, check_dsc_files, build_package_set
+from utils import parse_changes, check_dsc_files, build_package_list
from textutils import fix_maintainer
from lintian import parse_lintian_output, generate_reject_messages
from contents import UnpackedSource
-# suppress some deprecation warnings in squeeze related to apt_pkg
-# module
-import warnings
-warnings.filterwarnings('ignore', \
- "apt_pkg.ParseSection\(\) is deprecated. Please see apt_pkg\.TagSection\(\) for the replacement\.", \
- DeprecationWarning)
-warnings.filterwarnings('ignore', \
- "Attribute 'Find' of the 'apt_pkg\.TagSection' object is deprecated, use 'find' instead\.", \
- DeprecationWarning)
-
###############################################################################
def get_type(f, session):
file_type = f["dbtype"]
elif re_source_ext.match(f["type"]):
file_type = "dsc"
+ elif f['architecture'] == 'source' and f["type"] == 'unreadable':
+ utils.warn('unreadable source file (will continue and hope for the best)')
+ return f["type"]
else:
file_type = f["type"]
utils.fubar("invalid type (%s) for new. Dazed, confused and sure as heck not continuing." % (file_type))
# Determine what parts in a .changes are NEW
-def determine_new(filename, changes, files, warn=1, session = None, dsc = None, new = {}):
+def determine_new(filename, changes, files, warn=1, session = None, dsc = None, new = None):
"""
Determine what parts in a C{changes} file are NEW.
# TODO: This should all use the database instead of parsing the changes
# file again
byhand = {}
+ if new is None:
+ new = {}
dbchg = get_dbchange(filename, session)
if dbchg is None:
# Try to get the Package-Set field from an included .dsc file (if possible).
if dsc:
- for package, entry in build_package_set(dsc, session).items():
- if not new.has_key(package):
+ for package, entry in build_package_list(dsc, session).items():
+ if package not in new:
new[package] = entry
# Build up a list of potentially new things
self.future_files = {}
self.ancient_files = {}
- def callback(self, Kind, Name, Link, Mode, UID, GID, Size, MTime, Major, Minor):
- if MTime > self.future_cutoff:
- self.future_files[Name] = MTime
- if MTime < self.past_cutoff:
- self.ancient_files[Name] = MTime
+ def callback(self, member, data):
+ if member.mtime > self.future_cutoff:
+ self.future_files[Name] = member.mtime
+ if member.mtime < self.past_cutoff:
+ self.ancient_files[Name] = member.mtime
###############################################################################
cnf = Config()
self.Subst = {}
self.Subst["__ADMIN_ADDRESS__"] = cnf["Dinstall::MyAdminAddress"]
- self.Subst["__BUG_SERVER__"] = cnf["Dinstall::BugServer"]
+ if cnf.has_key("Dinstall::BugServer"):
+ self.Subst["__BUG_SERVER__"] = cnf["Dinstall::BugServer"]
self.Subst["__DISTRO__"] = cnf["Dinstall::MyDistribution"]
self.Subst["__DAK_ADDRESS__"] = cnf["Dinstall::MyEmailAddress"]
except CantOpenError:
self.rejects.append("%s: can't read file." % (filename))
return False
- except ParseChangesError, line:
+ except ParseChangesError as line:
self.rejects.append("%s: parse error, can't grok: %s." % (filename, line))
return False
except ChangesUnicodeError:
# Parse the Files field from the .changes into another dictionary
try:
self.pkg.files.update(utils.build_file_list(self.pkg.changes))
- except ParseChangesError, line:
+ except ParseChangesError as line:
self.rejects.append("%s: parse error, can't grok: %s." % (filename, line))
return False
- except UnknownFormatError, format:
+ except UnknownFormatError as format:
self.rejects.append("%s: unknown format '%s'." % (filename, format))
return False
self.pkg.changes["maintainername"],
self.pkg.changes["maintaineremail"]) = \
fix_maintainer (self.pkg.changes["maintainer"])
- except ParseMaintError, msg:
+ except ParseMaintError as msg:
self.rejects.append("%s: Maintainer field ('%s') failed to parse: %s" \
% (filename, self.pkg.changes["maintainer"], msg))
self.pkg.changes["changedbyname"],
self.pkg.changes["changedbyemail"]) = \
fix_maintainer (self.pkg.changes.get("changed-by", ""))
- except ParseMaintError, msg:
+ except ParseMaintError as msg:
self.pkg.changes["changedby822"] = ""
self.pkg.changes["changedby2047"] = ""
self.pkg.changes["changedbyname"] = ""
# Ensure target distributions exist
for suite in self.pkg.changes["distribution"].keys():
- if not Cnf.has_key("Suite::%s" % (suite)):
+ if not get_suite(suite.lower()):
self.rejects.append("Unknown distribution `%s'." % (suite))
###########################################################################
try:
control = apt_pkg.ParseSection(apt_inst.debExtractControl(deb_file))
except:
- self.rejects.append("%s: debExtractControl() raised %s." % (f, sys.exc_type))
+ self.rejects.append("%s: debExtractControl() raised %s." % (f, sys.exc_info()[0]))
deb_file.close()
# Can't continue, none of the checks on control would work.
return
self.rejects.append("%s: invalid version number '%s'." % (f, version))
# Ensure the architecture of the .deb is one we know about.
- default_suite = cnf.get("Dinstall::DefaultSuite", "Unstable")
+ default_suite = cnf.get("Dinstall::DefaultSuite", "unstable")
architecture = control.Find("Architecture")
upload_suite = self.pkg.changes["distribution"].keys()[0]
else:
entry["built-using"].append( (bu_so[0].source, bu_so[0].version, ) )
- except ValueError, e:
+ except ValueError as e:
self.rejects.append("%s: Cannot parse Built-Using field: %s" % (f, str(e)))
# Check in one of the other directories
source_epochless_version = re_no_epoch.sub('', source_version)
dsc_filename = "%s_%s.dsc" % (source_package, source_epochless_version)
- if os.path.exists(os.path.join(cnf["Dir::Queue::Byhand"], dsc_filename)):
+
+ byhand_dir = get_policy_queue('byhand', session).path
+ new_dir = get_policy_queue('new', session).path
+
+ if os.path.exists(os.path.join(byhand_dir, dsc_filename)):
entry["byhand"] = 1
- elif os.path.exists(os.path.join(cnf["Dir::Queue::New"], dsc_filename)):
+ elif os.path.exists(os.path.join(new_dir, dsc_filename)):
entry["new"] = 1
else:
dsc_file_exists = False
- for myq in ["Embargoed", "Unembargoed", "ProposedUpdates", "OldProposedUpdates"]:
- if cnf.has_key("Dir::Queue::%s" % (myq)):
- if os.path.exists(os.path.join(cnf["Dir::Queue::" + myq], dsc_filename)):
+ # TODO: Don't hardcode this list: use all relevant queues
+ # The question is how to determine what is relevant
+ for queue_name in ["embargoed", "unembargoed", "proposedupdates", "oldproposedupdates"]:
+ queue = get_policy_queue(queue_name, session)
+ if queue:
+ if os.path.exists(os.path.join(queue.path, dsc_filename)):
dsc_file_exists = True
break
entry["component"] = dest
# Ensure the component is valid for the target suite
- if cnf.has_key("Suite:%s::Components" % (suite)) and \
- entry["component"] not in cnf.ValueList("Suite::%s::Components" % (suite)):
+ if entry["component"] not in get_component_names(session):
self.rejects.append("unknown component `%s' for suite `%s'." % (entry["component"], suite))
return
or (dbc.in_queue is not None
and dbc.in_queue.queue_name not in ["unchecked", "newstage"]):
self.rejects.append("%s file already known to dak" % base_filename)
- except NoResultFound, e:
+ except NoResultFound as e:
# not known, good
pass
for f, entry in self.pkg.files.items():
# Ensure the file does not already exist in one of the accepted directories
- for d in [ "Byhand", "New", "ProposedUpdates", "OldProposedUpdates", "Embargoed", "Unembargoed" ]:
- if not cnf.has_key("Dir::Queue::%s" % (d)): continue
- if os.path.exists(os.path.join(cnf["Dir::Queue::%s" % (d) ], f)):
- self.rejects.append("%s file already exists in the %s directory." % (f, d))
+ # TODO: Dynamically generate this list
+ for queue_name in [ "byhand", "new", "proposedupdates", "oldproposedupdates", "embargoed", "unembargoed" ]:
+ queue = get_policy_queue(queue_name, session)
+ if queue and os.path.exists(os.path.join(queue.path, f)):
+ self.rejects.append("%s file already exists in the %s queue." % (f, queue_name))
if not re_taint_free.match(f):
self.rejects.append("!!WARNING!! tainted filename: '%s'." % (f))
if not has_source:
self.rejects.append("no source found and Architecture line in changes mention source.")
- if not has_binaries and cnf.FindB("Dinstall::Reject::NoSourceOnly"):
+ if (not has_binaries) and (not cnf.FindB("Dinstall::AllowSourceOnlyUploads")):
self.rejects.append("source only uploads are not supported.")
###########################################################################
except CantOpenError:
if not action:
return False, "%s: can't read file." % (dsc_filename)
- except ParseChangesError, line:
+ except ParseChangesError as line:
return False, "%s: parse error, can't grok: %s." % (dsc_filename, line)
- except InvalidDscError, line:
+ except InvalidDscError as line:
return False, "%s: syntax error on line %s." % (dsc_filename, line)
except ChangesUnicodeError:
return False, "%s: dsc file not proper utf-8." % (dsc_filename)
if not self.pkg.changes["architecture"].has_key("source"):
return True
+ if session is None:
+ session = DBConn().session()
+
(status, reason) = self.load_dsc(action=action)
if not status:
self.rejects.append(reason)
except NoFilesFieldError:
self.rejects.append("%s: no Files: field." % (dsc_filename))
return False
- except UnknownFormatError, format:
+ except UnknownFormatError as format:
self.rejects.append("%s: unknown format '%s'." % (dsc_filename, format))
return False
- except ParseChangesError, line:
+ except ParseChangesError as line:
self.rejects.append("%s: parse error, can't grok: %s." % (dsc_filename, line))
return False
# Only a limited list of source formats are allowed in each suite
for dist in self.pkg.changes["distribution"].keys():
- allowed = [ x.format_name for x in get_suite_src_formats(dist, session) ]
+ suite = get_suite(dist, session=session)
+ if not suite:
+ self.rejects.append("%s: cannot find suite %s when checking source formats" % (dsc_filename, dist))
+ continue
+ allowed = [ x.format_name for x in suite.srcformats ]
if self.pkg.dsc["format"] not in allowed:
self.rejects.append("%s: source format '%s' not allowed in %s (accepted: %s) " % (dsc_filename, self.pkg.dsc["format"], dist, ", ".join(allowed)))
try:
# We ignore the return value
fix_maintainer(self.pkg.dsc["maintainer"])
- except ParseMaintError, msg:
+ except ParseMaintError as msg:
self.rejects.append("%s: Maintainer field ('%s') failed to parse: %s" \
% (dsc_filename, self.pkg.dsc["maintainer"], msg))
session.close()
- return True
+ return (len(self.rejects) == 0)
###########################################################################
# Extract the source
try:
unpacked = UnpackedSource(dsc_filename)
- except:
- self.rejects.append("'dpkg-source -x' failed for %s." % dsc_filename)
+ except Exception as e:
+ self.rejects.append("'dpkg-source -x' failed for %s. (%s)" % (dsc_filename, str(e)))
return
- if not cnf.Find("Dir::Queue::BTSVersionTrack"):
+ if not cnf.Find("Dir::BTSVersionTrack"):
return
# Get the upstream version
try:
shutil.rmtree(tmpdir)
- except OSError, e:
+ except OSError as e:
if e.errno != errno.EACCES:
print "foobar"
utils.fubar("%s: couldn't remove tmp dir for source tree." % (self.pkg.dsc["source"]))
if result != 0:
utils.fubar("'%s' failed with result %s." % (cmd, result))
shutil.rmtree(tmpdir)
- except Exception, e:
+ except Exception as e:
print "foobar2 (%s)" % e
utils.fubar("%s: couldn't remove tmp dir for source tree." % (self.pkg.dsc["source"]))
continue
# Look in some other queues for the file
- queues = ('New', 'Byhand', 'ProposedUpdates',
- 'OldProposedUpdates', 'Embargoed', 'Unembargoed')
+ queue_names = ['new', 'byhand',
+ 'proposedupdates', 'oldproposedupdates',
+ 'embargoed', 'unembargoed']
- for queue in queues:
- if not cnf.get('Dir::Queue::%s' % queue):
+ for queue_name in queue_names:
+ queue = get_policy_queue(queue_name, session)
+ if not queue:
continue
- queuefile_path = os.path.join(
- cnf['Dir::Queue::%s' % queue], filename
- )
+ queuefile_path = os.path.join(queue.path, filename)
if not os.path.exists(queuefile_path):
# Does not exist in this queue
try:
lintiantags = yaml.load(sourcecontent)['lintian']
- except yaml.YAMLError, msg:
+ except yaml.YAMLError as msg:
utils.fubar("Can not read the lintian tags file %s, YAML error: %s." % (tagfile, msg))
return
if entry["type"] == "deb":
tar.reset()
try:
- deb_file = utils.open_file(filename)
- apt_inst.debExtract(deb_file, tar.callback, "control.tar.gz")
- deb_file.seek(0)
- try:
- apt_inst.debExtract(deb_file, tar.callback, "data.tar.gz")
- except SystemError, e:
- # If we can't find a data.tar.gz, look for data.tar.bz2 instead.
- if not re.search(r"Cannot f[ui]nd chunk data.tar.gz$", str(e)):
- raise
- deb_file.seek(0)
- apt_inst.debExtract(deb_file,tar.callback,"data.tar.bz2")
-
- deb_file.close()
+ deb = apt_inst.DebFile(filename)
+ deb.control.go(tar.callback)
future_files = tar.future_files.keys()
if future_files:
self.rejects.append("%s: has %s file(s) with a time stamp too ancient (e.g. %s [%s])."
% (filename, num_ancient_files, ancient_file, time.ctime(ancient_date)))
except:
- self.rejects.append("%s: deb contents timestamp check failed [%s: %s]" % (filename, sys.exc_type, sys.exc_value))
+ self.rejects.append("%s: deb contents timestamp check failed [%s: %s]" % (filename, sys.exc_info()[0], sys.exc_info()[1]))
def check_if_upload_is_sponsored(self, uid_email, uid_name):
+ for key in "maintaineremail", "changedbyemail", "maintainername", "changedbyname":
+ if not self.pkg.changes.has_key(key):
+ return False
+ uid_email = '@'.join(uid_email.split('@')[:2])
if uid_email in [self.pkg.changes["maintaineremail"], self.pkg.changes["changedbyemail"]]:
sponsored = False
elif uid_name in [self.pkg.changes["maintainername"], self.pkg.changes["changedbyname"]]:
sponsored = True
else:
sponsored = True
+ sponsor_addresses = utils.gpg_get_key_addresses(self.pkg.changes["fingerprint"])
+ debian_emails = filter(lambda addr: addr.endswith('@debian.org'), sponsor_addresses)
+ if uid_email not in debian_emails:
+ if debian_emails:
+ uid_email = debian_emails[0]
if ("source" in self.pkg.changes["architecture"] and uid_email and utils.is_email_alias(uid_email)):
- sponsor_addresses = utils.gpg_get_key_addresses(self.pkg.changes["fingerprint"])
if (self.pkg.changes["maintaineremail"] not in sponsor_addresses and
self.pkg.changes["changedbyemail"] not in sponsor_addresses):
self.pkg.changes["sponsoremail"] = uid_email
## experimental lists the uploader in the Maintainer: or Uploaders: fields (ie,
## non-developer maintainers cannot NMU or hijack packages)
- # srcuploaders includes the maintainer
+ # uploader includes the maintainer
accept = False
- for sup in r.srcuploaders:
- (rfc822, rfc2047, name, email) = sup.maintainer.get_split_maintainer()
+ for uploader in r.uploaders:
+ (rfc822, rfc2047, name, email) = uploader.get_split_maintainer()
# Eww - I hope we never have two people with the same name in Debian
if email == fpr.uid.uid or name == fpr.uid.name:
accept = True
# Also only check if there is a file defined (and existant) with
# checks.
- transpath = cnf.get("Dinstall::Reject::ReleaseTransitions", "")
+ transpath = cnf.get("Dinstall::ReleaseTransitions", "")
if transpath == "" or not os.path.exists(transpath):
return
sourcecontent = sourcefile.read()
try:
transitions = yaml.load(sourcecontent)
- except yaml.YAMLError, msg:
+ except yaml.YAMLError as msg:
# This shouldn't happen, there is a wrapper to edit the file which
# checks it, but we prefer to be safe than ending up rejecting
# everything.
"""
cnf = Config()
- announcetemplate = os.path.join(cnf["Dir::Templates"], 'process-unchecked.announce')
+
+ # Skip all of this if not sending mail to avoid confusing people
+ if cnf.has_key("Dinstall::Options::No-Mail") and cnf["Dinstall::Options::No-Mail"]:
+ return ""
# Only do announcements for source uploads with a recent dpkg-dev installed
if float(self.pkg.changes.get("format", 0)) < 1.6 or not \
self.pkg.changes["architecture"].has_key("source"):
return ""
- lists_done = {}
- summary = ""
+ announcetemplate = os.path.join(cnf["Dir::Templates"], 'process-unchecked.announce')
- self.Subst["__SHORT_SUMMARY__"] = short_summary
+ lists_todo = {}
+ summary = ""
+ # Get a unique list of target lists
for dist in self.pkg.changes["distribution"].keys():
suite = get_suite(dist)
if suite is None: continue
- announce_list = suite.announce
- if announce_list == "" or lists_done.has_key(announce_list):
- continue
+ for tgt in suite.announce:
+ lists_todo[tgt] = 1
+
+ self.Subst["__SHORT_SUMMARY__"] = short_summary
- lists_done[announce_list] = 1
+ for announce_list in lists_todo.keys():
summary += "Announcing to %s\n" % (announce_list)
if action:
del self.Subst["__ANNOUNCE_LIST_ADDRESS__"]
- if cnf.FindB("Dinstall::CloseBugs"):
+ if cnf.FindB("Dinstall::CloseBugs") and cnf.has_key("Dinstall::BugServer"):
summary = self.close_bugs(summary, action)
del self.Subst["__SHORT_SUMMARY__"]
session.commit()
# Move the .changes into the 'done' directory
+ ye, mo, da = time.gmtime()[0:3]
+ donedir = os.path.join(cnf["Dir::Done"], str(ye), "%0.2d" % mo, "%0.2d" % da)
+ if not os.path.isdir(donedir):
+ os.makedirs(donedir)
+
utils.move(self.pkg.changes_file,
- os.path.join(cnf["Dir::Queue::Done"], os.path.basename(self.pkg.changes_file)))
+ os.path.join(donedir, os.path.basename(self.pkg.changes_file)))
- if self.pkg.changes["architecture"].has_key("source") and cnf.get("Dir::UrgencyLog"):
+ if self.pkg.changes["architecture"].has_key("source"):
UrgencyLog().log(self.pkg.dsc["source"], self.pkg.dsc["version"], self.pkg.changes["urgency"])
self.update_subst()
self.announce(short_summary, 1)
## Helper stuff for DebBugs Version Tracking
- if cnf.Find("Dir::Queue::BTSVersionTrack"):
+ if cnf.Find("Dir::BTSVersionTrack"):
if self.pkg.changes["architecture"].has_key("source"):
- (fd, temp_filename) = utils.temp_filename(cnf["Dir::Queue::BTSVersionTrack"], prefix=".")
+ (fd, temp_filename) = utils.temp_filename(cnf["Dir::BTSVersionTrack"], prefix=".")
version_history = os.fdopen(fd, 'w')
version_history.write(self.pkg.dsc["bts changelog"])
version_history.close()
- filename = "%s/%s" % (cnf["Dir::Queue::BTSVersionTrack"],
+ filename = "%s/%s" % (cnf["Dir::BTSVersionTrack"],
self.pkg.changes_file[:-8]+".versions")
os.rename(temp_filename, filename)
- os.chmod(filename, 0644)
+ os.chmod(filename, 0o644)
# Write out the binary -> source mapping.
- (fd, temp_filename) = utils.temp_filename(cnf["Dir::Queue::BTSVersionTrack"], prefix=".")
+ (fd, temp_filename) = utils.temp_filename(cnf["Dir::BTSVersionTrack"], prefix=".")
debinfo = os.fdopen(fd, 'w')
for name, entry in sorted(self.pkg.files.items()):
if entry["type"] == "deb":
entry["source version"]])
debinfo.write(line+"\n")
debinfo.close()
- filename = "%s/%s" % (cnf["Dir::Queue::BTSVersionTrack"],
+ filename = "%s/%s" % (cnf["Dir::BTSVersionTrack"],
self.pkg.changes_file[:-8]+".debinfo")
os.rename(temp_filename, filename)
- os.chmod(filename, 0644)
+ os.chmod(filename, 0o644)
session.commit()
if os.access(file_entry, os.R_OK) == 0:
continue
- dest_file = os.path.join(cnf["Dir::Queue::Reject"], file_entry)
+ dest_file = os.path.join(cnf["Dir::Reject"], file_entry)
try:
- dest_fd = os.open(dest_file, os.O_RDWR | os.O_CREAT | os.O_EXCL, 0644)
- except OSError, e:
+ dest_fd = os.open(dest_file, os.O_RDWR | os.O_CREAT | os.O_EXCL, 0o644)
+ except OSError as e:
# File exists? Let's find a new name by adding a number
if e.errno == errno.EEXIST:
try:
except NoFreeFilenameError:
# Something's either gone badly Pete Tong, or
# someone is trying to exploit us.
- utils.warn("**WARNING** failed to find a free filename for %s in %s." % (file_entry, cnf["Dir::Queue::Reject"]))
+ utils.warn("**WARNING** failed to find a free filename for %s in %s." % (file_entry, cnf["Dir::Reject"]))
return
# Make sure we really got it
try:
- dest_fd = os.open(dest_file, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0644)
- except OSError, e:
+ dest_fd = os.open(dest_file, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0o644)
+ except OSError as e:
# Likewise
utils.warn("**WARNING** failed to claim %s in the reject directory." % (file_entry))
return
raise
# If we got here, we own the destination file, so we can
# safely overwrite it.
- utils.move(file_entry, dest_file, 1, perms=0660)
+ utils.move(file_entry, dest_file, 1, perms=0o660)
os.close(dest_fd)
###########################################################################
cnf = Config()
reason_filename = self.pkg.changes_file[:-8] + ".reason"
- reason_filename = os.path.join(cnf["Dir::Queue::Reject"], reason_filename)
+ reason_filename = os.path.join(cnf["Dir::Reject"], reason_filename)
# Move all the files into the reject directory
reject_files = self.pkg.files.keys() + [self.pkg.changes_file]
# so let's just raise an exception ...
if os.path.exists(reason_filename):
os.unlink(reason_filename)
- reason_fd = os.open(reason_filename, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0644)
+ reason_fd = os.open(reason_filename, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0o644)
rej_template = os.path.join(cnf["Dir::Templates"], "queue.rejected")
if self.logger:
self.logger.log(["rejected", self.pkg.changes_file])
+ stats = SummaryStats()
+ stats.reject_count += 1
return 0
################################################################################
"""
Cnf = Config()
anyversion = None
- anysuite = [suite] + Cnf.ValueList("Suite::%s::VersionChecks::Enhances" % (suite))
+ anysuite = [suite] + [ vc.reference.suite_name for vc in get_version_checks(suite, "Enhances") ]
for (s, v) in sv_list:
if s in [ x.lower() for x in anysuite ]:
if not anyversion or apt_pkg.VersionCompare(anyversion, v) <= 0:
# Check versions for each target suite
for target_suite in self.pkg.changes["distribution"].keys():
- must_be_newer_than = [ i.lower() for i in cnf.ValueList("Suite::%s::VersionChecks::MustBeNewerThan" % (target_suite)) ]
- must_be_older_than = [ i.lower() for i in cnf.ValueList("Suite::%s::VersionChecks::MustBeOlderThan" % (target_suite)) ]
+ # Check we can find the target suite
+ ts = get_suite(target_suite)
+ if ts is None:
+ self.rejects.append("Cannot find target suite %s to perform version checks" % target_suite)
+ continue
+
+ must_be_newer_than = [ vc.reference.suite_name for vc in get_version_checks(target_suite, "MustBeNewerThan") ]
+ must_be_older_than = [ vc.reference.suite_name for vc in get_version_checks(target_suite, "MustBeOlderThan") ]
# Enforce "must be newer than target suite" even if conffile omits it
if target_suite not in must_be_newer_than:
orig_files[dsc_name]["path"] = old_file
orig_files[dsc_name]["location"] = x.location.location_id
else:
- # TODO: Record the queues and info in the DB so we don't hardcode all this crap
+ # TODO: Determine queue list dynamically
# Not there? Check the queue directories...
- for directory in [ "New", "Byhand", "ProposedUpdates", "OldProposedUpdates", "Embargoed", "Unembargoed" ]:
- if not Cnf.has_key("Dir::Queue::%s" % (directory)):
+ for queue_name in [ "byhand", "new", "proposedupdates", "oldproposedupdates", "embargoed", "unembargoed" ]:
+ queue = get_policy_queue(queue_name, session)
+ if not queue:
continue
- in_otherdir = os.path.join(Cnf["Dir::Queue::%s" % (directory)], dsc_name)
+
+ in_otherdir = os.path.join(queue.path, dsc_name)
+
if os.path.exists(in_otherdir):
in_otherdir_fh = utils.open_file(in_otherdir)
actual_md5 = apt_pkg.md5sum(in_otherdir_fh)
source_epochless_version = re_no_epoch.sub('', source_version)
dsc_filename = "%s_%s.dsc" % (source_package, source_epochless_version)
found = False
- for q in ["Embargoed", "Unembargoed", "Newstage"]:
- if cnf.has_key("Dir::Queue::%s" % (q)):
- if os.path.exists(cnf["Dir::Queue::%s" % (q)] + '/' + dsc_filename):
- found = True
+ for queue_name in ["embargoed", "unembargoed", "newstage"]:
+ queue = get_policy_queue(queue_name, session)
+ if queue and os.path.exists(os.path.join(queue.path, dsc_filename)):
+ found = True
if not found:
self.rejects.append("no source found for %s %s (%s)." % (source_package, source_version, f))