X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=dak%2Fimport_known_changes.py;h=84444ff1132601f421b6e58ec1dc49c8f261d4b8;hb=8f1a6ffdbf7683fba88d8d5a1d8cea4dc5512abb;hp=475c8f76433d87b7a8c86abc3da1363085cfd805;hpb=efda164c1f3209e2522f222dfb6553736a10449c;p=dak.git diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py index 475c8f76..84444ff1 100755 --- a/dak/import_known_changes.py +++ b/dak/import_known_changes.py @@ -32,7 +32,7 @@ import sys import os import logging import threading -from daklib.dbconn import DBConn,get_knownchange +from daklib.dbconn import DBConn, get_dbchange from daklib.config import Config import apt_pkg from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError @@ -66,6 +66,8 @@ OPTIONS sys.exit(exit_code) def check_signature (sig_filename, data_filename=""): + fingerprint = None + keyrings = [ "/home/joerg/keyring/keyrings/debian-keyring.gpg", "/home/joerg/keyring/keyrings/debian-keyring.pgp", @@ -126,30 +128,52 @@ class OneAtATime(object): """ def __init__(self): self.next_in_line = None - self.next_lock = threading.Condition() + self.read_lock = threading.Condition() + self.write_lock = threading.Condition() + self.die = False + + def plsDie(self): + self.die = True + self.write_lock.acquire() + self.write_lock.notifyAll() + self.write_lock.release() + + self.read_lock.acquire() + self.read_lock.notifyAll() + self.read_lock.release() def enqueue(self, next): - self.next_lock.acquire() + self.write_lock.acquire() while self.next_in_line: - self.next_lock.wait() + if self.die: + return + self.write_lock.wait() assert( not self.next_in_line ) self.next_in_line = next - self.next_lock.notify() - self.next_lock.release() + self.write_lock.release() + self.read_lock.acquire() + self.read_lock.notify() + self.read_lock.release() def dequeue(self): - self.next_lock.acquire() + self.read_lock.acquire() while not self.next_in_line: - self.next_lock.wait() + if self.die: + return + self.read_lock.wait() + result = self.next_in_line + self.next_in_line = None + self.read_lock.release() + self.write_lock.acquire() + self.write_lock.notify() + self.write_lock.release() + if isinstance(result, EndOfChanges): return None - self.next_in_line = None - self.next_lock.notify() - self.next_lock.release() return result class ChangesToImport(object): @@ -164,44 +188,64 @@ class ChangesToImport(object): class ChangesGenerator(threading.Thread): """enqueues changes files to be imported""" - def __init__(self, queue): + def __init__(self, parent, queue): threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent + self.die = False + + def plsDie(self): + self.die = True def run(self): cnf = Config() count = 1 - for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: + for directory in [ "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: checkdir = cnf["Dir::Queue::%s" % (directory) ] if os.path.exists(checkdir): print "Looking into %s" % (checkdir) - for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False): + for dirpath, dirnames, filenames in os.walk(checkdir, topdown=True): if not filenames: # Empty directory (or only subdirectories), next continue - for changesfile in filenames: - if not changesfile.endswith(".changes"): - # Only interested in changes files. - continue - count += 1 - if not get_knownchange(changesfile, self.session): - to_import = ChangesToImport(dirpath, changesfile, count) - self.queue.enqueue(to_import) + for changesfile in filenames: + try: + if not changesfile.endswith(".changes"): + # Only interested in changes files. + continue + count += 1 + + if not get_dbchange(changesfile, self.session): + to_import = ChangesToImport(dirpath, changesfile, count) + if self.die: + return + self.queue.enqueue(to_import) + except KeyboardInterrupt: + print("got Ctrl-c in enqueue thread. terminating") + self.parent.plsDie() + sys.exit(1) self.queue.enqueue(EndOfChanges()) class ImportThread(threading.Thread): - def __init__(self, queue): + def __init__(self, parent, queue): threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent + self.die = False + + def plsDie(self): + self.die = True def run(self): while True: try: + if self.die: + return to_import = self.queue.dequeue() if not to_import: return @@ -218,17 +262,46 @@ class ImportThread(threading.Thread): except InvalidDscError, line: warn("syntax error in .dsc file '%s', line %s." % (f, line)) -# failure += 1 except ChangesUnicodeError: warn("found invalid changes file, not properly utf-8 encoded") -# failure += 1 - - print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile) + except KeyboardInterrupt: + print("Caught C-c; on ImportThread. terminating.") + self.parent.plsDie() + sys.exit(1) except: - traceback.print_exc() + self.parent.plsDie() + sys.exit(1) + +class ImportKnownChanges(object): + def __init__(self,num_threads): + self.queue = OneAtATime() + self.threads = [ ChangesGenerator(self,self.queue) ] + + for i in range(num_threads): + self.threads.append( ImportThread(self,self.queue) ) + + try: + for thread in self.threads: + thread.start() + + except KeyboardInterrupt: + print("Caught C-c; terminating.") + utils.warn("Caught C-c; terminating.") + self.plsDie() + + def plsDie(self): + traceback.print_stack90 + for thread in self.threads: + print( "STU: before ask %s to die" % thread ) + thread.plsDie() + print( "STU: after ask %s to die" % thread ) + + self.threads=[] + sys.exit(1) + def main(): cnf = Config() @@ -264,12 +337,9 @@ def main(): if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")): num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")]) + ImportKnownChanges(num_threads) - queue = OneAtATime() - ChangesGenerator(queue).start() - for i in range(num_threads): - ImportThread(queue).start() if __name__ == '__main__':