X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=dak%2Fimport_known_changes.py;h=b3f8e8a8359b2c7ac991862d174550e84222a3f7;hb=066d01be84db56933591117e616ac35203dffd18;hp=d95ad2c50012daf6afef68170ef1ee9eef6094ae;hpb=1ac75877715b72de5903cc6c16aeb44bd2d4b41f;p=dak.git diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py index d95ad2c5..b3f8e8a8 100755 --- a/dak/import_known_changes.py +++ b/dak/import_known_changes.py @@ -32,9 +32,13 @@ import sys import os import logging import threading -from daklib.dbconn import DBConn,get_knownchange - +from daklib.dbconn import DBConn, get_dbchange from daklib.config import Config +import apt_pkg +from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError +from daklib.changes import Changes +from daklib.utils import parse_changes, warn, gpgv_get_status_output, process_gpgv_output +import traceback # where in dak.conf all of our configuration will be stowed options_prefix = "KnownChanges" @@ -62,9 +66,10 @@ OPTIONS sys.exit(exit_code) def check_signature (sig_filename, data_filename=""): + fingerprint = None + keyrings = [ "/home/joerg/keyring/keyrings/debian-keyring.gpg", - "/home/joerg/keyring/keyrings/debian-keyring.pgp", "/home/joerg/keyring/keyrings/debian-maintainers.gpg", "/home/joerg/keyring/keyrings/debian-role-keys.gpg", "/home/joerg/keyring/keyrings/emeritus-keyring.pgp", @@ -122,100 +127,179 @@ class OneAtATime(object): """ def __init__(self): self.next_in_line = None - self.next_lock = threading.Condition() + self.read_lock = threading.Condition() + self.write_lock = threading.Condition() + self.die = False + + def plsDie(self): + self.die = True + self.write_lock.acquire() + self.write_lock.notifyAll() + self.write_lock.release() + + self.read_lock.acquire() + self.read_lock.notifyAll() + self.read_lock.release() def enqueue(self, next): - self.next_lock.acquire() + self.write_lock.acquire() while self.next_in_line: - self.next_lock.wait() + if self.die: + return + self.write_lock.wait() assert( not self.next_in_line ) self.next_in_line = next - self.next_lock.notify() - self.next_lock.release() + self.write_lock.release() + self.read_lock.acquire() + self.read_lock.notify() + self.read_lock.release() def dequeue(self): - self.next_lock.acquire() + self.read_lock.acquire() while not self.next_in_line: - self.next_lock.wait() + if self.die: + return + self.read_lock.wait() + result = self.next_in_line - if isinstance(next, EndOfChanges): + self.next_in_line = None + self.read_lock.release() + self.write_lock.acquire() + self.write_lock.notify() + self.write_lock.release() + + if isinstance(result, EndOfChanges): return None - self.next_in_line = None - self.next_lock.notify() - self.next_lock.release() return result class ChangesToImport(object): """A changes file to be enqueued to be processed""" - def __init__(self, queue, checkdir, changesfile, count): - self.queue = queue - self.checkdir = checkdir + def __init__(self, checkdir, changesfile, count): + self.dirpath = checkdir self.changesfile = changesfile self.count = count + def __str__(self): + return "#%d: %s in %s" % (self.count, self.changesfile, self.dirpath) + class ChangesGenerator(threading.Thread): """enqueues changes files to be imported""" - def __init__(self, queue): + def __init__(self, parent, queue): + threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent + self.die = False + + def plsDie(self): + self.die = True def run(self): cnf = Config() - for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: + count = 1 + for directory in [ "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: checkdir = cnf["Dir::Queue::%s" % (directory) ] if os.path.exists(checkdir): print "Looking into %s" % (checkdir) - for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False): + for dirpath, dirnames, filenames in os.walk(checkdir, topdown=True): if not filenames: # Empty directory (or only subdirectories), next continue + for changesfile in filenames: - if not changesfile.endswith(".changes"): - # Only interested in changes files. - continue + try: + if not changesfile.endswith(".changes"): + # Only interested in changes files. + continue count += 1 - if not get_knownchange(session): - self.queue.enqueue(ChangesToImport(directory, checkdir, changesfile, count)) + if not get_dbchange(changesfile, self.session): + to_import = ChangesToImport(dirpath, changesfile, count) + if self.die: + return + self.queue.enqueue(to_import) + except KeyboardInterrupt: + print("got Ctrl-c in enqueue thread. terminating") + self.parent.plsDie() + sys.exit(1) self.queue.enqueue(EndOfChanges()) class ImportThread(threading.Thread): - def __init__(self, queue): + def __init__(self, parent, queue): + threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent + self.die = False + + def plsDie(self): + self.die = True def run(self): while True: try: - to_import = queue.dequeue() + if self.die: + return + to_import = self.queue.dequeue() if not to_import: return - print( "Directory %s, file %7d, failures %3d. (%s)" % (to_import.dirpath[-10:], to_import.count, failure, to_import.changesfile) ) + print( "Directory %s, file %7d, (%s)" % (to_import.dirpath[-10:], to_import.count, to_import.changesfile) ) changes = Changes() changes.changes_file = to_import.changesfile changesfile = os.path.join(to_import.dirpath, to_import.changesfile) changes.changes = parse_changes(changesfile, signing_rules=-1) changes.changes["fingerprint"] = check_signature(changesfile) - changes.add_known_changes(to_import.queue, self.session) + changes.add_known_changes(to_import.dirpath, session=self.session) self.session.commit() except InvalidDscError, line: warn("syntax error in .dsc file '%s', line %s." % (f, line)) - failure += 1 except ChangesUnicodeError: warn("found invalid changes file, not properly utf-8 encoded") - failure += 1 - print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile) + except KeyboardInterrupt: + print("Caught C-c; on ImportThread. terminating.") + self.parent.plsDie() + sys.exit(1) + + except: + self.parent.plsDie() + sys.exit(1) + +class ImportKnownChanges(object): + def __init__(self,num_threads): + self.queue = OneAtATime() + self.threads = [ ChangesGenerator(self,self.queue) ] + + for i in range(num_threads): + self.threads.append( ImportThread(self,self.queue) ) + + try: + for thread in self.threads: + thread.start() + + except KeyboardInterrupt: + print("Caught C-c; terminating.") + utils.warn("Caught C-c; terminating.") + self.plsDie() + def plsDie(self): + traceback.print_stack90 + for thread in self.threads: + print( "STU: before ask %s to die" % thread ) + thread.plsDie() + print( "STU: after ask %s to die" % thread ) + + self.threads=[] + sys.exit(1) def main(): @@ -231,7 +315,7 @@ def main(): num_threads = 1 - if (len(args) < 1) or not commands.has_key(args[0]): + if len(args) > 0: usage() if cnf.has_key("%s::%s" % (options_prefix,"Help")): @@ -250,25 +334,11 @@ def main(): stream = sys.stderr ) if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")): - num_threads = int(Config()[ "%s::%s" %(options_prefix,"Suite")]) - - - queue = OneAtATime() - ChangesGenerator(queue).start() + num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")]) - for i in range(num_threads): - ImportThread(queue).start() + ImportKnownChanges(num_threads) -def which_suites(session): - """ - return a list of suites to operate on - """ - if Config().has_key( "%s::%s" %(options_prefix,"Suite")): - suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")]) - else: - suites = Config().SubTree("Suite").List() - return [get_suite(s.lower(), session) for s in suites] if __name__ == '__main__':