From: Joerg Jaspert Date: Fri, 30 Oct 2009 13:48:38 +0000 (+0100) Subject: Merge commit 'stew/knownchanges' into merge X-Git-Url: https://git.decadent.org.uk/gitweb/?a=commitdiff_plain;h=f9264640fc54af9950119c956973fe5573ada5ab;hp=b2e566ae329d943620488436bf09101151396ea7;p=dak.git Merge commit 'stew/knownchanges' into merge * commit 'stew/knownchanges': I LOVE WHITESPACE i was trying to interrupt this with the keyboards and failing get rid of log msgs import known_changes with correct name. catch keybord exception Signed-off-by: Joerg Jaspert --- diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py index dcd3a335..3702411d 100755 --- a/dak/import_known_changes.py +++ b/dak/import_known_changes.py @@ -66,6 +66,8 @@ OPTIONS sys.exit(exit_code) def check_signature (sig_filename, data_filename=""): + fingerprint = None + keyrings = [ "/home/joerg/keyring/keyrings/debian-keyring.gpg", "/home/joerg/keyring/keyrings/debian-keyring.pgp", @@ -127,10 +129,17 @@ class OneAtATime(object): def __init__(self): self.next_in_line = None self.next_lock = threading.Condition() + self.die = False + + def plsDie(self): + self.die = True + self.next_lock.notify() def enqueue(self, next): self.next_lock.acquire() while self.next_in_line: + if self.die: + return self.next_lock.wait() assert( not self.next_in_line ) @@ -141,15 +150,19 @@ class OneAtATime(object): def dequeue(self): self.next_lock.acquire() while not self.next_in_line: + if self.die: + return self.next_lock.wait() - result = self.next_in_line - if isinstance(result, EndOfChanges): - return None + result = self.next_in_line self.next_in_line = None self.next_lock.notify() self.next_lock.release() + + if isinstance(result, EndOfChanges): + return None + return result class ChangesToImport(object): @@ -164,10 +177,15 @@ class ChangesToImport(object): class ChangesGenerator(threading.Thread): """enqueues changes files to be imported""" - def __init__(self, queue): + def __init__(self, parent, queue): threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent + self.die = False + + def plsDie(self): + self.die = True def run(self): cnf = Config() @@ -181,28 +199,42 @@ class ChangesGenerator(threading.Thread): if not filenames: # Empty directory (or only subdirectories), next continue - for changesfile in filenames: - if not changesfile.endswith(".changes"): - # Only interested in changes files. - continue - count += 1 - if not get_knownchange(changesfile, self.session): - to_import = ChangesToImport(dirpath, changesfile, count) - print("enqueue: %s" % to_import) - self.queue.enqueue(to_import) + for changesfile in filenames: + try: + if not changesfile.endswith(".changes"): + # Only interested in changes files. + continue + count += 1 + + if not get_knownchange(changesfile, self.session): + to_import = ChangesToImport(dirpath, changesfile, count) + if self.die: + return + self.queue.enqueue(to_import) + except KeyboardInterrupt: + print("got Ctrl-c in enqueue thread. terminating") + self.parent.plsDie() + sys.exit(1) self.queue.enqueue(EndOfChanges()) class ImportThread(threading.Thread): - def __init__(self, queue): + def __init__(self, parent, queue): threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent + self.die = False + + def plsDie(self): + self.die = True def run(self): while True: try: + if self.die: + return to_import = self.queue.dequeue() if not to_import: return @@ -212,7 +244,6 @@ class ImportThread(threading.Thread): changes = Changes() changes.changes_file = to_import.changesfile changesfile = os.path.join(to_import.dirpath, to_import.changesfile) - print( "STU: %s / %s" % (to_import.dirpath, to_import.changesfile)) changes.changes = parse_changes(changesfile, signing_rules=-1) changes.changes["fingerprint"] = check_signature(changesfile) changes.add_known_changes(to_import.dirpath, self.session) @@ -220,17 +251,47 @@ class ImportThread(threading.Thread): except InvalidDscError, line: warn("syntax error in .dsc file '%s', line %s." % (f, line)) -# failure += 1 except ChangesUnicodeError: warn("found invalid changes file, not properly utf-8 encoded") -# failure += 1 - - print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile) + except KeyboardInterrupt: + print("Caught C-c; on ImportThread. terminating.") + self.parent.plsDie() + sys.exit(1) except: traceback.print_exc() + self.parent.plsDie() + sys.exit(1) + +class ImportKnownChanges(object): + def __init__(self,num_threads): + self.queue = OneAtATime() + self.threads = [ ChangesGenerator(self,self.queue) ] + + for i in range(num_threads): + self.threads.append( ImportThread(self,self.queue) ) + + try: + for thread in self.threads: + thread.start() + + except KeyboardInterrupt: + print("Caught C-c; terminating.") + utils.warn("Caught C-c; terminating.") + self.plsDie() + + def plsDie(self): + traceback.print_stack90 + for thread in self.threads: + print( "STU: before ask %s to die" % thread ) + thread.plsDie() + print( "STU: after ask %s to die" % thread ) + + self.threads=[] + sys.exit(1) + def main(): cnf = Config() @@ -266,12 +327,9 @@ def main(): if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")): num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")]) + ImportKnownChanges(num_threads) - queue = OneAtATime() - ChangesGenerator(queue).start() - for i in range(num_threads): - ImportThread(queue).start() if __name__ == '__main__': diff --git a/daklib/changes.py b/daklib/changes.py index 2413bf4c..3eb842d2 100755 --- a/daklib/changes.py +++ b/daklib/changes.py @@ -192,7 +192,7 @@ class Changes(object): def mark_missing_fields(self): """add "missing" in fields which we will require for the known_changes table""" - for key in ['urgency', 'maintainer', 'fingerprint', 'changedby' ]: + for key in ['urgency', 'maintainer', 'fingerprint', 'changed-by' ]: if (not self.changes.has_key(key)) or (not self.changes[key]): self.changes[key]='missing' @@ -215,7 +215,7 @@ class Changes(object): distribution, urgency, maintainer, fingerprint, changedby, date) VALUES (:changesfile,:filetime,:source,:binary, :architecture, :version,:distribution,:urgency,:maintainer,:fingerprint,:changedby,:date)""", - { 'changesfile':changesfile, + { 'changesfile':self.changes_file, 'filetime':filetime, 'source':self.changes["source"], 'binary':self.changes["binary"],