From 1dbc111b490bf36846f6ad32d6e583f9fcf23e17 Mon Sep 17 00:00:00 2001 From: Mike O'Connor Date: Fri, 30 Oct 2009 11:28:09 +0000 Subject: [PATCH] now actuall working --- dak/import_known_changes.py | 55 +++++++++++++++++++------------------ dak/update_db.py | 2 +- daklib/changes.py | 4 +-- 3 files changed, 32 insertions(+), 29 deletions(-) diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py index d95ad2c5..dcd3a335 100755 --- a/dak/import_known_changes.py +++ b/dak/import_known_changes.py @@ -33,8 +33,12 @@ import os import logging import threading from daklib.dbconn import DBConn,get_knownchange - from daklib.config import Config +import apt_pkg +from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError +from daklib.changes import Changes +from daklib.utils import parse_changes, warn, gpgv_get_status_output, process_gpgv_output +import traceback # where in dak.conf all of our configuration will be stowed options_prefix = "KnownChanges" @@ -140,7 +144,7 @@ class OneAtATime(object): self.next_lock.wait() result = self.next_in_line - if isinstance(next, EndOfChanges): + if isinstance(result, EndOfChanges): return None self.next_in_line = None @@ -150,20 +154,24 @@ class OneAtATime(object): class ChangesToImport(object): """A changes file to be enqueued to be processed""" - def __init__(self, queue, checkdir, changesfile, count): - self.queue = queue - self.checkdir = checkdir + def __init__(self, checkdir, changesfile, count): + self.dirpath = checkdir self.changesfile = changesfile self.count = count + def __str__(self): + return "#%d: %s in %s" % (self.count, self.changesfile, self.dirpath) + class ChangesGenerator(threading.Thread): """enqueues changes files to be imported""" def __init__(self, queue): + threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() def run(self): cnf = Config() + count = 1 for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: checkdir = cnf["Dir::Queue::%s" % (directory) ] if os.path.exists(checkdir): @@ -177,46 +185,52 @@ class ChangesGenerator(threading.Thread): if not changesfile.endswith(".changes"): # Only interested in changes files. continue - count += 1 + count += 1 - if not get_knownchange(session): - self.queue.enqueue(ChangesToImport(directory, checkdir, changesfile, count)) + if not get_knownchange(changesfile, self.session): + to_import = ChangesToImport(dirpath, changesfile, count) + print("enqueue: %s" % to_import) + self.queue.enqueue(to_import) self.queue.enqueue(EndOfChanges()) class ImportThread(threading.Thread): def __init__(self, queue): + threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() def run(self): while True: try: - to_import = queue.dequeue() + to_import = self.queue.dequeue() if not to_import: return - print( "Directory %s, file %7d, failures %3d. (%s)" % (to_import.dirpath[-10:], to_import.count, failure, to_import.changesfile) ) + print( "Directory %s, file %7d, (%s)" % (to_import.dirpath[-10:], to_import.count, to_import.changesfile) ) changes = Changes() changes.changes_file = to_import.changesfile changesfile = os.path.join(to_import.dirpath, to_import.changesfile) + print( "STU: %s / %s" % (to_import.dirpath, to_import.changesfile)) changes.changes = parse_changes(changesfile, signing_rules=-1) changes.changes["fingerprint"] = check_signature(changesfile) - changes.add_known_changes(to_import.queue, self.session) + changes.add_known_changes(to_import.dirpath, self.session) self.session.commit() except InvalidDscError, line: warn("syntax error in .dsc file '%s', line %s." % (f, line)) - failure += 1 +# failure += 1 except ChangesUnicodeError: warn("found invalid changes file, not properly utf-8 encoded") - failure += 1 +# failure += 1 print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile) + except: + traceback.print_exc() def main(): cnf = Config() @@ -231,7 +245,7 @@ def main(): num_threads = 1 - if (len(args) < 1) or not commands.has_key(args[0]): + if len(args) > 0: usage() if cnf.has_key("%s::%s" % (options_prefix,"Help")): @@ -250,7 +264,7 @@ def main(): stream = sys.stderr ) if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")): - num_threads = int(Config()[ "%s::%s" %(options_prefix,"Suite")]) + num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")]) queue = OneAtATime() @@ -259,17 +273,6 @@ def main(): for i in range(num_threads): ImportThread(queue).start() -def which_suites(session): - """ - return a list of suites to operate on - """ - if Config().has_key( "%s::%s" %(options_prefix,"Suite")): - suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")]) - else: - suites = Config().SubTree("Suite").List() - - return [get_suite(s.lower(), session) for s in suites] - if __name__ == '__main__': main() diff --git a/dak/update_db.py b/dak/update_db.py index 701fdfdc..c54971cf 100755 --- a/dak/update_db.py +++ b/dak/update_db.py @@ -44,7 +44,7 @@ from daklib.dak_exceptions import DBUpdateError ################################################################################ Cnf = None -required_database_schema = 18 +required_database_schema = 16 ################################################################################ diff --git a/daklib/changes.py b/daklib/changes.py index 1bca6dc1..2413bf4c 100755 --- a/daklib/changes.py +++ b/daklib/changes.py @@ -196,14 +196,14 @@ class Changes(object): if (not self.changes.has_key(key)) or (not self.changes[key]): self.changes[key]='missing' - def add_known_changes(self, queue, session=None): + def add_known_changes(self, dirpath, session=None): """add "missing" in fields which we will require for the known_changes table""" cnf = Config() + privatetrans = False if session is None: session = DBConn().session() privatetrans = True - dirpath = cnf["Dir::Queue::%s" % (queue) ] changesfile = os.path.join(dirpath, self.changes_file) filetime = datetime.datetime.fromtimestamp(os.path.getctime(changesfile)) -- 2.39.2