X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=dak%2Fimport_known_changes.py;h=5d666b6b6f4ef58b730cd3a33656cb829d9dd247;hb=ab58bad73376ccc69685fcb731916ac03edadc68;hp=dcd3a3350e29867d2361459647b0b903e0489349;hpb=2fb2e3cbd37af78c703bab894ff51664be1f41ae;p=dak.git diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py index dcd3a335..5d666b6b 100755 --- a/dak/import_known_changes.py +++ b/dak/import_known_changes.py @@ -32,7 +32,7 @@ import sys import os import logging import threading -from daklib.dbconn import DBConn,get_knownchange +from daklib.dbconn import DBConn, get_dbchange from daklib.config import Config import apt_pkg from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError @@ -66,9 +66,10 @@ OPTIONS sys.exit(exit_code) def check_signature (sig_filename, data_filename=""): + fingerprint = None + keyrings = [ "/home/joerg/keyring/keyrings/debian-keyring.gpg", - "/home/joerg/keyring/keyrings/debian-keyring.pgp", "/home/joerg/keyring/keyrings/debian-maintainers.gpg", "/home/joerg/keyring/keyrings/debian-role-keys.gpg", "/home/joerg/keyring/keyrings/emeritus-keyring.pgp", @@ -126,30 +127,52 @@ class OneAtATime(object): """ def __init__(self): self.next_in_line = None - self.next_lock = threading.Condition() + self.read_lock = threading.Condition() + self.write_lock = threading.Condition() + self.die = False + + def plsDie(self): + self.die = True + self.write_lock.acquire() + self.write_lock.notifyAll() + self.write_lock.release() + + self.read_lock.acquire() + self.read_lock.notifyAll() + self.read_lock.release() def enqueue(self, next): - self.next_lock.acquire() + self.write_lock.acquire() while self.next_in_line: - self.next_lock.wait() + if self.die: + return + self.write_lock.wait() assert( not self.next_in_line ) self.next_in_line = next - self.next_lock.notify() - self.next_lock.release() + self.write_lock.release() + self.read_lock.acquire() + self.read_lock.notify() + self.read_lock.release() def dequeue(self): - self.next_lock.acquire() + self.read_lock.acquire() while not self.next_in_line: - self.next_lock.wait() + if self.die: + return + self.read_lock.wait() + result = self.next_in_line + self.next_in_line = None + self.read_lock.release() + self.write_lock.acquire() + self.write_lock.notify() + self.write_lock.release() + if isinstance(result, EndOfChanges): return None - self.next_in_line = None - self.next_lock.notify() - self.next_lock.release() return result class ChangesToImport(object): @@ -164,45 +187,74 @@ class ChangesToImport(object): class ChangesGenerator(threading.Thread): """enqueues changes files to be imported""" - def __init__(self, queue): + def __init__(self, parent, queue): threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent + self.die = False + + def plsDie(self): + self.die = True def run(self): cnf = Config() count = 1 - for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: - checkdir = cnf["Dir::Queue::%s" % (directory) ] + + dirs = [] + dirs.append(cnf['Dir::Done']) + + for queue_name in [ "byhand", "new", "proposedupdates", "oldproposedupdates" ]: + queue = get_policy_queue(queue_name) + if queue: + dirs.append(os.path.abspath(queue.path)) + else: + utils.warn("Could not find queue %s in database" % queue_name) + + for checkdir in dirs if os.path.exists(checkdir): print "Looking into %s" % (checkdir) - for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False): + for dirpath, dirnames, filenames in os.walk(checkdir, topdown=True): if not filenames: # Empty directory (or only subdirectories), next continue - for changesfile in filenames: - if not changesfile.endswith(".changes"): - # Only interested in changes files. - continue - count += 1 - if not get_knownchange(changesfile, self.session): - to_import = ChangesToImport(dirpath, changesfile, count) - print("enqueue: %s" % to_import) - self.queue.enqueue(to_import) + for changesfile in filenames: + try: + if not changesfile.endswith(".changes"): + # Only interested in changes files. + continue + count += 1 + + if not get_dbchange(changesfile, self.session): + to_import = ChangesToImport(dirpath, changesfile, count) + if self.die: + return + self.queue.enqueue(to_import) + except KeyboardInterrupt: + print("got Ctrl-c in enqueue thread. terminating") + self.parent.plsDie() + sys.exit(1) self.queue.enqueue(EndOfChanges()) class ImportThread(threading.Thread): - def __init__(self, queue): + def __init__(self, parent, queue): threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent + self.die = False + + def plsDie(self): + self.die = True def run(self): while True: try: + if self.die: + return to_import = self.queue.dequeue() if not to_import: return @@ -212,25 +264,53 @@ class ImportThread(threading.Thread): changes = Changes() changes.changes_file = to_import.changesfile changesfile = os.path.join(to_import.dirpath, to_import.changesfile) - print( "STU: %s / %s" % (to_import.dirpath, to_import.changesfile)) changes.changes = parse_changes(changesfile, signing_rules=-1) changes.changes["fingerprint"] = check_signature(changesfile) - changes.add_known_changes(to_import.dirpath, self.session) + changes.add_known_changes(to_import.dirpath, session=self.session) self.session.commit() except InvalidDscError, line: warn("syntax error in .dsc file '%s', line %s." % (f, line)) -# failure += 1 except ChangesUnicodeError: warn("found invalid changes file, not properly utf-8 encoded") -# failure += 1 - - print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile) + except KeyboardInterrupt: + print("Caught C-c; on ImportThread. terminating.") + self.parent.plsDie() + sys.exit(1) except: - traceback.print_exc() + self.parent.plsDie() + sys.exit(1) + +class ImportKnownChanges(object): + def __init__(self,num_threads): + self.queue = OneAtATime() + self.threads = [ ChangesGenerator(self,self.queue) ] + + for i in range(num_threads): + self.threads.append( ImportThread(self,self.queue) ) + + try: + for thread in self.threads: + thread.start() + + except KeyboardInterrupt: + print("Caught C-c; terminating.") + utils.warn("Caught C-c; terminating.") + self.plsDie() + + def plsDie(self): + traceback.print_stack90 + for thread in self.threads: + print( "STU: before ask %s to die" % thread ) + thread.plsDie() + print( "STU: after ask %s to die" % thread ) + + self.threads=[] + sys.exit(1) + def main(): cnf = Config() @@ -266,12 +346,9 @@ def main(): if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")): num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")]) + ImportKnownChanges(num_threads) - queue = OneAtATime() - ChangesGenerator(queue).start() - for i in range(num_threads): - ImportThread(queue).start() if __name__ == '__main__':