import os
import logging
import threading
-from daklib.dbconn import DBConn,get_knownchange
-
+from daklib.dbconn import DBConn, get_dbchange, get_policy_queue
from daklib.config import Config
+import apt_pkg
+from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError
+from daklib.changes import Changes
+from daklib.utils import parse_changes, warn, gpgv_get_status_output, process_gpgv_output
+import traceback
# where in dak.conf all of our configuration will be stowed
options_prefix = "KnownChanges"
sys.exit(exit_code)
def check_signature (sig_filename, data_filename=""):
+ fingerprint = None
+
keyrings = [
"/home/joerg/keyring/keyrings/debian-keyring.gpg",
- "/home/joerg/keyring/keyrings/debian-keyring.pgp",
"/home/joerg/keyring/keyrings/debian-maintainers.gpg",
"/home/joerg/keyring/keyrings/debian-role-keys.gpg",
"/home/joerg/keyring/keyrings/emeritus-keyring.pgp",
"""
def __init__(self):
self.next_in_line = None
- self.next_lock = threading.Condition()
+ self.read_lock = threading.Condition()
+ self.write_lock = threading.Condition()
+ self.die = False
+
+ def plsDie(self):
+ self.die = True
+ self.write_lock.acquire()
+ self.write_lock.notifyAll()
+ self.write_lock.release()
+
+ self.read_lock.acquire()
+ self.read_lock.notifyAll()
+ self.read_lock.release()
def enqueue(self, next):
- self.next_lock.acquire()
+ self.write_lock.acquire()
while self.next_in_line:
- self.next_lock.wait()
+ if self.die:
+ return
+ self.write_lock.wait()
assert( not self.next_in_line )
self.next_in_line = next
- self.next_lock.notify()
- self.next_lock.release()
+ self.write_lock.release()
+ self.read_lock.acquire()
+ self.read_lock.notify()
+ self.read_lock.release()
def dequeue(self):
- self.next_lock.acquire()
+ self.read_lock.acquire()
while not self.next_in_line:
- self.next_lock.wait()
+ if self.die:
+ return
+ self.read_lock.wait()
+
result = self.next_in_line
- if isinstance(next, EndOfChanges):
+ self.next_in_line = None
+ self.read_lock.release()
+ self.write_lock.acquire()
+ self.write_lock.notify()
+ self.write_lock.release()
+
+ if isinstance(result, EndOfChanges):
return None
- self.next_in_line = None
- self.next_lock.notify()
- self.next_lock.release()
return result
class ChangesToImport(object):
"""A changes file to be enqueued to be processed"""
- def __init__(self, queue, checkdir, changesfile, count):
- self.queue = queue
- self.checkdir = checkdir
+ def __init__(self, checkdir, changesfile, count):
+ self.dirpath = checkdir
self.changesfile = changesfile
self.count = count
+ def __str__(self):
+ return "#%d: %s in %s" % (self.count, self.changesfile, self.dirpath)
+
class ChangesGenerator(threading.Thread):
"""enqueues changes files to be imported"""
- def __init__(self, queue):
+ def __init__(self, parent, queue):
+ threading.Thread.__init__(self)
self.queue = queue
self.session = DBConn().session()
+ self.parent = parent
+ self.die = False
+
+ def plsDie(self):
+ self.die = True
def run(self):
cnf = Config()
- for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]:
- checkdir = cnf["Dir::Queue::%s" % (directory) ]
+ count = 1
+
+ dirs = []
+ dirs.append(cnf['Dir::Done'])
+
+ for queue_name in [ "byhand", "new", "proposedupdates", "oldproposedupdates" ]:
+ queue = get_policy_queue(queue_name)
+ if queue:
+ dirs.append(os.path.abspath(queue.path))
+ else:
+ utils.warn("Could not find queue %s in database" % queue_name)
+
+ for checkdir in dirs:
if os.path.exists(checkdir):
print "Looking into %s" % (checkdir)
- for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False):
+ for dirpath, dirnames, filenames in os.walk(checkdir, topdown=True):
if not filenames:
# Empty directory (or only subdirectories), next
continue
+
for changesfile in filenames:
- if not changesfile.endswith(".changes"):
- # Only interested in changes files.
- continue
+ try:
+ if not changesfile.endswith(".changes"):
+ # Only interested in changes files.
+ continue
count += 1
- if not get_knownchange(session):
- self.queue.enqueue(ChangesToImport(directory, checkdir, changesfile, count))
+ if not get_dbchange(changesfile, self.session):
+ to_import = ChangesToImport(dirpath, changesfile, count)
+ if self.die:
+ return
+ self.queue.enqueue(to_import)
+ except KeyboardInterrupt:
+ print("got Ctrl-c in enqueue thread. terminating")
+ self.parent.plsDie()
+ sys.exit(1)
self.queue.enqueue(EndOfChanges())
class ImportThread(threading.Thread):
- def __init__(self, queue):
+ def __init__(self, parent, queue):
+ threading.Thread.__init__(self)
self.queue = queue
self.session = DBConn().session()
+ self.parent = parent
+ self.die = False
+
+ def plsDie(self):
+ self.die = True
def run(self):
while True:
try:
- to_import = queue.dequeue()
+ if self.die:
+ return
+ to_import = self.queue.dequeue()
if not to_import:
return
- print( "Directory %s, file %7d, failures %3d. (%s)" % (to_import.dirpath[-10:], to_import.count, failure, to_import.changesfile) )
+ print( "Directory %s, file %7d, (%s)" % (to_import.dirpath[-10:], to_import.count, to_import.changesfile) )
changes = Changes()
changes.changes_file = to_import.changesfile
changesfile = os.path.join(to_import.dirpath, to_import.changesfile)
changes.changes = parse_changes(changesfile, signing_rules=-1)
changes.changes["fingerprint"] = check_signature(changesfile)
- changes.add_known_changes(to_import.queue, self.session)
+ changes.add_known_changes(to_import.dirpath, session=self.session)
self.session.commit()
except InvalidDscError, line:
warn("syntax error in .dsc file '%s', line %s." % (f, line))
- failure += 1
except ChangesUnicodeError:
warn("found invalid changes file, not properly utf-8 encoded")
- failure += 1
- print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile)
+ except KeyboardInterrupt:
+ print("Caught C-c; on ImportThread. terminating.")
+ self.parent.plsDie()
+ sys.exit(1)
+
+ except:
+ self.parent.plsDie()
+ sys.exit(1)
+
+class ImportKnownChanges(object):
+ def __init__(self,num_threads):
+ self.queue = OneAtATime()
+ self.threads = [ ChangesGenerator(self,self.queue) ]
+
+ for i in range(num_threads):
+ self.threads.append( ImportThread(self,self.queue) )
+
+ try:
+ for thread in self.threads:
+ thread.start()
+ except KeyboardInterrupt:
+ print("Caught C-c; terminating.")
+ utils.warn("Caught C-c; terminating.")
+ self.plsDie()
+
+ def plsDie(self):
+ traceback.print_stack90
+ for thread in self.threads:
+ print( "STU: before ask %s to die" % thread )
+ thread.plsDie()
+ print( "STU: after ask %s to die" % thread )
+
+ self.threads=[]
+ sys.exit(1)
def main():
num_threads = 1
- if (len(args) < 1) or not commands.has_key(args[0]):
+ if len(args) > 0:
usage()
if cnf.has_key("%s::%s" % (options_prefix,"Help")):
stream = sys.stderr )
if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")):
- num_threads = int(Config()[ "%s::%s" %(options_prefix,"Suite")])
-
-
- queue = OneAtATime()
- ChangesGenerator(queue).start()
+ num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")])
- for i in range(num_threads):
- ImportThread(queue).start()
+ ImportKnownChanges(num_threads)
-def which_suites(session):
- """
- return a list of suites to operate on
- """
- if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
- suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
- else:
- suites = Config().SubTree("Suite").List()
- return [get_suite(s.lower(), session) for s in suites]
if __name__ == '__main__':