#!/usr/bin/env python
+# vim:set et ts=4 sw=4:
-# Handles NEW and BYHAND packages
-# Copyright (C) 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
+""" Handles NEW and BYHAND packages
+@contact: Debian FTP Master <ftpmaster@debian.org>
+@copyright: 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
+@license: GNU General Public License version 2 or later
+"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
################################################################################
-import copy, errno, os, readline, stat, sys, time
+import copy
+import errno
+import os
+import readline
+import stat
+import sys
+import time
import apt_pkg, apt_inst
import examine_package
from daklib import database
from daklib import logging
from daklib import queue
from daklib import utils
+from daklib.regexes import re_no_epoch, re_default_answer, re_isanum
# Globals
-Cnf = None
+Cnf = None #: Configuration, apt_pkg.Configuration
Options = None
Upload = None
-projectB = None
+projectB = None #: database connection, pgobject
Logger = None
Priorities = None
source_package = files[f]["source package"]
if not Upload.pkg.changes["architecture"].has_key("source") \
and not Upload.source_exists(source_package, source_version, Upload.pkg.changes["distribution"].keys()):
- source_epochless_version = utils.re_no_epoch.sub('', source_version)
+ source_epochless_version = re_no_epoch.sub('', source_version)
dsc_filename = "%s_%s.dsc" % (source_package, source_epochless_version)
found = 0
for q in ["Accepted", "Embargoed", "Unembargoed"]:
# Version and file overwrite checks
if files[f]["type"] == "deb":
- reject(Upload.check_binary_against_db(f))
+ reject(Upload.check_binary_against_db(f), "")
elif files[f]["type"] == "dsc":
- reject(Upload.check_source_against_db(f))
+ reject(Upload.check_source_against_db(f), "")
(reject_msg, is_in_incoming) = Upload.check_dsc_against_db(f)
reject(reject_msg, "")
while prompt.find(answer) == -1:
answer = utils.our_raw_input(prompt)
- m = queue.re_default_answer.match(prompt)
+ m = re_default_answer.match(prompt)
if answer == "":
answer = m.group(1)
answer = answer[:1].upper()
class Section_Completer:
def __init__ (self):
self.sections = []
+ self.matches = []
q = projectB.query("SELECT section FROM section")
for i in q.getresult():
self.sections.append(i[0])
class Priority_Completer:
def __init__ (self):
self.priorities = []
+ self.matches = []
q = projectB.query("SELECT priority FROM priority")
for i in q.getresult():
self.priorities.append(i[0])
def edit_new (new):
# Write the current data to a temporary file
- temp_filename = utils.temp_filename()
- temp_file = utils.open_file(temp_filename, 'w')
+ (fd, temp_filename) = utils.temp_filename()
+ temp_file = os.fdopen(fd, 'w')
print_new (new, 0, temp_file)
temp_file.close()
# Spawn an editor on that file
while prompt.find(answer) == -1:
answer = utils.our_raw_input(prompt)
- m = queue.re_default_answer.match(prompt)
+ m = re_default_answer.match(prompt)
if answer == "":
answer = m.group(1)
answer = answer[:1].upper()
answer = answer[:1].upper()
if answer == "E" or answer == "D":
got_answer = 1
- elif queue.re_isanum.match (answer):
+ elif re_isanum.match (answer):
answer = int(answer)
if (answer < 1) or (answer > index):
print "%s is not a valid index (%s). Please retry." % (answer, index_range(index))
def edit_note(note):
# Write the current data to a temporary file
- temp_filename = utils.temp_filename()
- temp_file = utils.open_file(temp_filename, 'w')
+ (fd, temp_filename) = utils.temp_filename()
+ temp_file = os.fdopen(fd, 'w')
temp_file.write(note)
temp_file.close()
editor = os.environ.get("EDITOR","vi")
answer = "XXX"
while prompt.find(answer) == -1:
answer = utils.our_raw_input(prompt)
- m = queue.re_default_answer.search(prompt)
+ m = re_default_answer.search(prompt)
if answer == "":
answer = m.group(1)
answer = answer[:1].upper()
stdout_fd = sys.stdout
try:
sys.stdout = less_fd
- examine_package.display_changes(Upload.pkg.changes_file)
+ changes = utils.parse_changes (Upload.pkg.changes_file)
+ examine_package.display_changes(changes['distribution'], Upload.pkg.changes_file)
files = Upload.pkg.files
for f in files.keys():
if files[f].has_key("new"):
ftype = files[f]["type"]
if ftype == "deb":
- examine_package.check_deb(f)
+ examine_package.check_deb(changes['distribution'], f)
elif ftype == "dsc":
- examine_package.check_dsc(f)
+ examine_package.check_dsc(changes['distribution'], f)
finally:
sys.stdout = stdout_fd
except IOError, e:
def prod_maintainer ():
# Here we prepare an editor and get them ready to prod...
- temp_filename = utils.temp_filename()
+ (fd, temp_filename) = utils.temp_filename()
editor = os.environ.get("EDITOR","vi")
answer = 'E'
while answer == 'E':
os.system("%s %s" % (editor, temp_filename))
- f = utils.open_file(temp_filename)
+ f = os.fdopen(fd)
prod_message = "".join(f.readlines())
f.close()
print "Prod message:"
answer = "XXX"
while prompt.find(answer) == -1:
answer = utils.our_raw_input(prompt)
- m = queue.re_default_answer.search(prompt)
+ m = re_default_answer.search(prompt)
if answer == "":
answer = m.group(1)
answer = answer[:1].upper()
while prompt.find(answer) == -1:
answer = utils.our_raw_input(prompt)
- m = queue.re_default_answer.search(prompt)
+ m = re_default_answer.search(prompt)
if answer == "":
answer = m.group(1)
answer = answer[:1].upper()
Cnf["Process-New::Options::%s" % (i)] = ""
changes_files = apt_pkg.ParseCommandLine(Cnf,Arguments,sys.argv)
+ if len(changes_files) == 0 and not Cnf.get("Process-New::Options::Comments-Dir",""):
+ changes_files = utils.get_changes_files(Cnf["Dir::Queue::New"])
+
Options = Cnf.SubTree("Process-New::Options")
if Options["Help"]:
while prompt.find(answer) == -1:
answer = utils.our_raw_input(prompt)
- m = queue.re_default_answer.search(prompt)
+ m = re_default_answer.search(prompt)
if answer == "":
answer = m.group(1)
answer = answer[:1].upper()
for entry in entries:
# read the .dak
u = queue.Upload(Cnf)
- u.pkg.changes_file = entry
+ u.pkg.changes_file = os.path.join(qdir, entry)
u.update_vars()
- if not changes["architecture"].has_key("source"):
+ if not u.pkg.changes["architecture"].has_key("source"):
# another binary upload, ignore
continue
if Upload.pkg.changes["version"] != u.pkg.changes["version"]:
def move_to_holding(suite, queue_dir):
print "Moving to %s holding area." % (suite.upper(),)
+ if Options["No-Action"]:
+ return
Logger.log(["Moving to %s" % (suite,), Upload.pkg.changes_file])
Upload.dump_vars(queue_dir)
move_to_dir(queue_dir)
os.unlink(Upload.pkg.changes_file[:-8]+".dak")
+def _accept():
+ if Options["No-Action"]:
+ return
+ (summary, short_summary) = Upload.build_summaries()
+ Upload.accept(summary, short_summary)
+ os.unlink(Upload.pkg.changes_file[:-8]+".dak")
+
def do_accept_stableupdate(suite, q):
queue_dir = Cnf["Dir::Queue::%s" % (q,)]
if not Upload.pkg.changes["architecture"].has_key("source"):
# It is not a sourceful upload. So its source may be either in p-u
# holding, in new, in accepted or already installed.
- if is_source_in_qdir(queue_dir):
+ if is_source_in_queue_dir(queue_dir):
# It's in p-u holding, so move it there.
+ print "Binary-only upload, source in %s." % (q,)
move_to_holding(suite, queue_dir)
- elif is_source_in_qdir(Cnf["Dir::Queue::New"]):
- # It's in NEW. We expect the source to land in p-u holding
- # pretty soon.
- move_to_holding(suite, queue_dir)
- elif is_source_in_qdir(Cnf["Dir::Queue::Accepted"]):
- # The source is in accepted, the binary cleared NEW: accept it.
- Upload.accept(summary, short_summary)
- os.unlink(Upload.pkg.changes_file[:-8]+".dak")
elif Upload.source_exists(Upload.pkg.changes["source"],
Upload.pkg.changes["version"]):
# dak tells us that there is source available. At time of
# writing this means that it is installed, so put it into
# accepted.
- Upload.accept(summary, short_summary)
- os.unlink(Upload.pkg.changes_file[:-8]+".dak")
- return
- move_to_holding(suite, queue_dir)
+ print "Binary-only upload, source installed."
+ _accept()
+ elif is_source_in_queue_dir(Cnf["Dir::Queue::Accepted"]):
+ # The source is in accepted, the binary cleared NEW: accept it.
+ print "Binary-only upload, source in accepted."
+ _accept()
+ elif is_source_in_queue_dir(Cnf["Dir::Queue::New"]):
+ # It's in NEW. We expect the source to land in p-u holding
+ # pretty soon.
+ print "Binary-only upload, source in new."
+ move_to_holding(suite, queue_dir)
+ else:
+ # No case applicable. Bail out. Return will cause the upload
+ # to be skipped.
+ print "ERROR"
+ print "Stable update failed. Source not found."
+ return
+ else:
+ # We are handling a sourceful upload. Move to accepted if currently
+ # in p-u holding and to p-u holding otherwise.
+ if is_source_in_queue_dir(queue_dir):
+ print "Sourceful upload in %s, accepting." % (q,)
+ _accept()
+ else:
+ move_to_holding(suite, queue_dir)
def do_accept():
print "ACCEPT"
continue
return do_accept_stableupdate(suite, q)
# Just a normal upload, accept it...
- Upload.accept(summary, short_summary)
- os.unlink(Upload.pkg.changes_file[:-8]+".dak")
+ _accept()
finally:
if not Options["No-Action"]:
os.unlink(Cnf["Process-New::AcceptedLockFile"])