]> git.decadent.org.uk Git - dak.git/blobdiff - dak/import_known_changes.py
Fix typo
[dak.git] / dak / import_known_changes.py
index d95ad2c50012daf6afef68170ef1ee9eef6094ae..6901144f4eea2c864e8ca66f2dac9d2378cf2554 100755 (executable)
@@ -32,9 +32,13 @@ import sys
 import os
 import logging
 import threading
-from daklib.dbconn import DBConn,get_knownchange
-
+from daklib.dbconn import DBConn, get_dbchange
 from daklib.config import Config
+import apt_pkg
+from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError
+from daklib.changes import Changes
+from daklib.utils import parse_changes, warn, gpgv_get_status_output, process_gpgv_output
+import traceback
 
 # where in dak.conf all of our configuration will be stowed
 options_prefix = "KnownChanges"
@@ -62,9 +66,10 @@ OPTIONS
     sys.exit(exit_code)
 
 def check_signature (sig_filename, data_filename=""):
+    fingerprint = None
+
     keyrings = [
         "/home/joerg/keyring/keyrings/debian-keyring.gpg",
-        "/home/joerg/keyring/keyrings/debian-keyring.pgp",
         "/home/joerg/keyring/keyrings/debian-maintainers.gpg",
         "/home/joerg/keyring/keyrings/debian-role-keys.gpg",
         "/home/joerg/keyring/keyrings/emeritus-keyring.pgp",
@@ -122,100 +127,189 @@ class OneAtATime(object):
     """
     def __init__(self):
         self.next_in_line = None
-        self.next_lock = threading.Condition()
+        self.read_lock = threading.Condition()
+        self.write_lock = threading.Condition()
+        self.die = False
+
+    def plsDie(self):
+        self.die = True
+        self.write_lock.acquire()
+        self.write_lock.notifyAll()
+        self.write_lock.release()
+
+        self.read_lock.acquire()
+        self.read_lock.notifyAll()
+        self.read_lock.release()
 
     def enqueue(self, next):
-        self.next_lock.acquire()
+        self.write_lock.acquire()
         while self.next_in_line:
-            self.next_lock.wait()
+            if self.die:
+                return
+            self.write_lock.wait()
 
         assert( not self.next_in_line )
         self.next_in_line = next
-        self.next_lock.notify()
-        self.next_lock.release()
+        self.write_lock.release()
+        self.read_lock.acquire()
+        self.read_lock.notify()
+        self.read_lock.release()
 
     def dequeue(self):
-        self.next_lock.acquire()
+        self.read_lock.acquire()
         while not self.next_in_line:
-            self.next_lock.wait()
+            if self.die:
+                return
+            self.read_lock.wait()
+
         result = self.next_in_line
 
-        if isinstance(next, EndOfChanges):
+        self.next_in_line = None
+        self.read_lock.release()
+        self.write_lock.acquire()
+        self.write_lock.notify()
+        self.write_lock.release()
+
+        if isinstance(result, EndOfChanges):
             return None
 
-        self.next_in_line = None
-        self.next_lock.notify()
-        self.next_lock.release()
         return result
 
 class ChangesToImport(object):
     """A changes file to be enqueued to be processed"""
-    def __init__(self, queue, checkdir, changesfile, count):
-        self.queue = queue
-        self.checkdir = checkdir
+    def __init__(self, checkdir, changesfile, count):
+        self.dirpath = checkdir
         self.changesfile = changesfile
         self.count = count
 
+    def __str__(self):
+        return "#%d: %s in %s" % (self.count, self.changesfile, self.dirpath)
+
 class ChangesGenerator(threading.Thread):
     """enqueues changes files to be imported"""
-    def __init__(self, queue):
+    def __init__(self, parent, queue):
+        threading.Thread.__init__(self)
         self.queue = queue
         self.session = DBConn().session()
+        self.parent = parent
+        self.die = False
+
+    def plsDie(self):
+        self.die = True
 
     def run(self):
         cnf = Config()
-        for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]:
-            checkdir = cnf["Dir::Queue::%s" % (directory) ]
+        count = 1
+
+        dirs = []
+        dirs.append(cnf['Dir::Done'])
+
+        for queue_name in [ "byhand", "new", "proposedupdates", "oldproposedupdates" ]:
+            queue = get_policy_queue(queue_name)
+            if queue:
+                dirs.append(os.path.abspath(queue.path))
+            else:
+                utils.warn("Could not find queue %s in database" % queue_name)
+
+        for checkdir in dirs:
             if os.path.exists(checkdir):
                 print "Looking into %s" % (checkdir)
 
-                for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False):
+                for dirpath, dirnames, filenames in os.walk(checkdir, topdown=True):
                     if not filenames:
                         # Empty directory (or only subdirectories), next
                         continue
+
                     for changesfile in filenames:
-                        if not changesfile.endswith(".changes"):
-                            # Only interested in changes files.
-                            continue
+                        try:
+                            if not changesfile.endswith(".changes"):
+                                # Only interested in changes files.
+                                continue
                             count += 1
 
-                            if not get_knownchange(session):
-                                self.queue.enqueue(ChangesToImport(directory, checkdir, changesfile, count))
+                            if not get_dbchange(changesfile, self.session):
+                                to_import = ChangesToImport(dirpath, changesfile, count)
+                                if self.die:
+                                    return
+                                self.queue.enqueue(to_import)
+                        except KeyboardInterrupt:
+                            print("got Ctrl-c in enqueue thread.  terminating")
+                            self.parent.plsDie()
+                            sys.exit(1)
 
         self.queue.enqueue(EndOfChanges())
 
 class ImportThread(threading.Thread):
-    def __init__(self, queue):
+    def __init__(self, parent, queue):
+        threading.Thread.__init__(self)
         self.queue = queue
         self.session = DBConn().session()
+        self.parent = parent
+        self.die = False
+
+    def plsDie(self):
+        self.die = True
 
     def run(self):
         while True:
             try:
-                to_import = queue.dequeue()
+                if self.die:
+                    return
+                to_import = self.queue.dequeue()
                 if not to_import:
                     return
 
-                print( "Directory %s, file %7d, failures %3d. (%s)" % (to_import.dirpath[-10:], to_import.count, failure, to_import.changesfile) )
+                print( "Directory %s, file %7d, (%s)" % (to_import.dirpath[-10:], to_import.count, to_import.changesfile) )
 
                 changes = Changes()
                 changes.changes_file = to_import.changesfile
                 changesfile = os.path.join(to_import.dirpath, to_import.changesfile)
                 changes.changes = parse_changes(changesfile, signing_rules=-1)
                 changes.changes["fingerprint"] = check_signature(changesfile)
-                changes.add_known_changes(to_import.queue, self.session)
+                changes.add_known_changes(to_import.dirpath, session=self.session)
                 self.session.commit()
 
             except InvalidDscError, line:
                 warn("syntax error in .dsc file '%s', line %s." % (f, line))
-                failure += 1
 
             except ChangesUnicodeError:
                 warn("found invalid changes file, not properly utf-8 encoded")
-                failure += 1
 
-                print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile)
+            except KeyboardInterrupt:
+                print("Caught C-c; on ImportThread. terminating.")
+                self.parent.plsDie()
+                sys.exit(1)
+
+            except:
+                self.parent.plsDie()
+                sys.exit(1)
+
+class ImportKnownChanges(object):
+    def __init__(self,num_threads):
+        self.queue = OneAtATime()
+        self.threads = [ ChangesGenerator(self,self.queue) ]
+
+        for i in range(num_threads):
+            self.threads.append( ImportThread(self,self.queue) )
+
+        try:
+            for thread in self.threads:
+                thread.start()
 
+        except KeyboardInterrupt:
+            print("Caught C-c; terminating.")
+            utils.warn("Caught C-c; terminating.")
+            self.plsDie()
+
+    def plsDie(self):
+        traceback.print_stack90
+        for thread in self.threads:
+            print( "STU: before ask %s to die" % thread )
+            thread.plsDie()
+            print( "STU: after ask %s to die" % thread )
+
+        self.threads=[]
+        sys.exit(1)
 
 
 def main():
@@ -231,7 +325,7 @@ def main():
 
     num_threads = 1
 
-    if (len(args) < 1) or not commands.has_key(args[0]):
+    if len(args) > 0:
         usage()
 
     if cnf.has_key("%s::%s" % (options_prefix,"Help")):
@@ -250,25 +344,11 @@ def main():
                          stream = sys.stderr )
 
     if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")):
-        num_threads = int(Config()[ "%s::%s" %(options_prefix,"Suite")])
-
-
-    queue = OneAtATime()
-    ChangesGenerator(queue).start()
+        num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")])
 
-    for i in range(num_threads):
-        ImportThread(queue).start()
+    ImportKnownChanges(num_threads)
 
-def which_suites(session):
-    """
-    return a list of suites to operate on
-    """
-    if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
-        suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
-    else:
-        suites = Config().SubTree("Suite").List()
 
-    return [get_suite(s.lower(), session) for s in suites]
 
 
 if __name__ == '__main__':