--- /dev/null
+#!/usr/bin/env python
+# coding=utf8
+
+"""
+Import known_changes files
+
+@contact: Debian FTP Master <ftpmaster@debian.org>
+@copyright: 2009 Mike O'Connor <stew@debian.org>
+@license: GNU General Public License version 2 or later
+"""
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+################################################################################
+
+
+################################################################################
+
+import sys
+import os
+import logging
+import threading
+from daklib.dbconn import DBConn,get_knownchange
+
+from daklib.config import Config
+
+# where in dak.conf all of our configuration will be stowed
+options_prefix = "KnownChanges"
+options_prefix = "%s::Options" % options_prefix
+
+log = logging.getLogger()
+
+################################################################################
+
+
+def usage (exit_code=0):
+ print """Usage: dak import-known-changes [options]
+
+OPTIONS
+ -j n
+ run with n threads concurrently
+
+ -v, --verbose
+ show verbose information messages
+
+ -q, --quiet
+ supress all output but errors
+
+"""
+ sys.exit(exit_code)
+
+def check_signature (sig_filename, data_filename=""):
+ keyrings = [
+ "/home/joerg/keyring/keyrings/debian-keyring.gpg",
+ "/home/joerg/keyring/keyrings/debian-keyring.pgp",
+ "/home/joerg/keyring/keyrings/debian-maintainers.gpg",
+ "/home/joerg/keyring/keyrings/debian-role-keys.gpg",
+ "/home/joerg/keyring/keyrings/emeritus-keyring.pgp",
+ "/home/joerg/keyring/keyrings/emeritus-keyring.gpg",
+ "/home/joerg/keyring/keyrings/removed-keys.gpg",
+ "/home/joerg/keyring/keyrings/removed-keys.pgp"
+ ]
+
+ keyringargs = " ".join(["--keyring %s" % x for x in keyrings ])
+
+ # Build the command line
+ status_read, status_write = os.pipe()
+ cmd = "gpgv --status-fd %s %s %s" % (status_write, keyringargs, sig_filename)
+
+ # Invoke gpgv on the file
+ (output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write)
+
+ # Process the status-fd output
+ (keywords, internal_error) = process_gpgv_output(status)
+
+ # If we failed to parse the status-fd output, let's just whine and bail now
+ if internal_error:
+ warn("Couldn't parse signature")
+ return None
+
+ # usually one would check for bad things here. We, however, do not care.
+
+ # Next check gpgv exited with a zero return code
+ if exit_status:
+ warn("Couldn't parse signature")
+ return None
+
+ # Sanity check the good stuff we expect
+ if not keywords.has_key("VALIDSIG"):
+ warn("Couldn't parse signature")
+ else:
+ args = keywords["VALIDSIG"]
+ if len(args) < 1:
+ warn("Couldn't parse signature")
+ else:
+ fingerprint = args[0]
+
+ return fingerprint
+
+
+class EndOfChanges(object):
+ """something enqueued to signify the last change"""
+ pass
+
+
+class OneAtATime(object):
+ """
+ a one space queue which sits between multiple possible producers
+ and multiple possible consumers
+ """
+ def __init__(self):
+ self.next_in_line = None
+ self.next_lock = threading.Condition()
+
+ def enqueue(self, next):
+ self.next_lock.acquire()
+ while self.next_in_line:
+ self.next_lock.wait()
+
+ assert( not self.next_in_line )
+ self.next_in_line = next
+ self.next_lock.notify()
+ self.next_lock.release()
+
+ def dequeue(self):
+ self.next_lock.acquire()
+ while not self.next_in_line:
+ self.next_lock.wait()
+ result = self.next_in_line
+
+ if isinstance(next, EndOfChanges):
+ return None
+
+ self.next_in_line = None
+ self.next_lock.notify()
+ self.next_lock.release()
+ return result
+
+class ChangesToImport(object):
+ """A changes file to be enqueued to be processed"""
+ def __init__(self, queue, checkdir, changesfile, count):
+ self.queue = queue
+ self.checkdir = checkdir
+ self.changesfile = changesfile
+ self.count = count
+
+class ChangesGenerator(threading.Thread):
+ """enqueues changes files to be imported"""
+ def __init__(self, queue):
+ self.queue = queue
+ self.session = DBConn().session()
+
+ def run(self):
+ cnf = Config()
+ for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]:
+ checkdir = cnf["Dir::Queue::%s" % (directory) ]
+ if os.path.exists(checkdir):
+ print "Looking into %s" % (checkdir)
+
+ for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False):
+ if not filenames:
+ # Empty directory (or only subdirectories), next
+ continue
+ for changesfile in filenames:
+ if not changesfile.endswith(".changes"):
+ # Only interested in changes files.
+ continue
+ count += 1
+
+ if not get_knownchange(session):
+ self.queue.enqueue(ChangesToImport(directory, checkdir, changesfile, count))
+
+ self.queue.enqueue(EndOfChanges())
+
+class ImportThread(threading.Thread):
+ def __init__(self, queue):
+ self.queue = queue
+ self.session = DBConn().session()
+
+ def run(self):
+ while True:
+ try:
+ to_import = queue.dequeue()
+ if not to_import:
+ return
+
+ print( "Directory %s, file %7d, failures %3d. (%s)" % (to_import.dirpath[-10:], to_import.count, failure, to_import.changesfile) )
+
+ changes = Changes()
+ changes.changes_file = to_import.changesfile
+ changesfile = os.path.join(to_import.dirpath, to_import.changesfile)
+ changes.changes = parse_changes(changesfile, signing_rules=-1)
+ changes.changes["fingerprint"] = check_signature(changesfile)
+ changes.add_known_changes(to_import.queue, self.session)
+ self.session.commit()
+
+ except InvalidDscError, line:
+ warn("syntax error in .dsc file '%s', line %s." % (f, line))
+ failure += 1
+
+ except ChangesUnicodeError:
+ warn("found invalid changes file, not properly utf-8 encoded")
+ failure += 1
+
+ print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile)
+
+
+
+def main():
+ cnf = Config()
+
+ arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
+ ('j',"concurrency", "%s::%s" % (options_prefix,"Concurrency"),"HasArg"),
+ ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
+ ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")),
+ ]
+
+ args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)
+
+ num_threads = 1
+
+ if (len(args) < 1) or not commands.has_key(args[0]):
+ usage()
+
+ if cnf.has_key("%s::%s" % (options_prefix,"Help")):
+ usage()
+
+ level=logging.INFO
+ if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
+ level=logging.ERROR
+
+ elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
+ level=logging.DEBUG
+
+
+ logging.basicConfig( level=level,
+ format='%(asctime)s %(levelname)s %(message)s',
+ stream = sys.stderr )
+
+ if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")):
+ num_threads = int(Config()[ "%s::%s" %(options_prefix,"Suite")])
+
+
+ queue = OneAtATime()
+ ChangesGenerator(queue).start()
+
+ for i in range(num_threads):
+ ImportThread(queue).start()
+
+def which_suites(session):
+ """
+ return a list of suites to operate on
+ """
+ if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
+ suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
+ else:
+ suites = Config().SubTree("Suite").List()
+
+ return [get_suite(s.lower(), session) for s in suites]
+
+
+if __name__ == '__main__':
+ main()