X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=dak%2Fimport_known_changes.py;h=84444ff1132601f421b6e58ec1dc49c8f261d4b8;hb=8f1a6ffdbf7683fba88d8d5a1d8cea4dc5512abb;hp=d286775abcb977770b9f15ca99815d2f94017ff4;hpb=1cbe83ac12a34c4969b8725c54423db3d8170577;p=dak.git diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py index d286775a..84444ff1 100755 --- a/dak/import_known_changes.py +++ b/dak/import_known_changes.py @@ -32,7 +32,7 @@ import sys import os import logging import threading -from daklib.dbconn import DBConn,get_knownchange +from daklib.dbconn import DBConn, get_dbchange from daklib.config import Config import apt_pkg from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError @@ -128,37 +128,48 @@ class OneAtATime(object): """ def __init__(self): self.next_in_line = None - self.next_lock = threading.Condition() + self.read_lock = threading.Condition() + self.write_lock = threading.Condition() self.die = False def plsDie(self): self.die = True - self.next_lock.notify() + self.write_lock.acquire() + self.write_lock.notifyAll() + self.write_lock.release() + + self.read_lock.acquire() + self.read_lock.notifyAll() + self.read_lock.release() def enqueue(self, next): - self.next_lock.acquire() + self.write_lock.acquire() while self.next_in_line: if self.die: return - self.next_lock.wait() + self.write_lock.wait() assert( not self.next_in_line ) self.next_in_line = next - self.next_lock.notify() - self.next_lock.release() + self.write_lock.release() + self.read_lock.acquire() + self.read_lock.notify() + self.read_lock.release() def dequeue(self): - self.next_lock.acquire() + self.read_lock.acquire() while not self.next_in_line: if self.die: return - self.next_lock.wait() + self.read_lock.wait() result = self.next_in_line self.next_in_line = None - self.next_lock.notify() - self.next_lock.release() + self.read_lock.release() + self.write_lock.acquire() + self.write_lock.notify() + self.write_lock.release() if isinstance(result, EndOfChanges): return None @@ -190,7 +201,7 @@ class ChangesGenerator(threading.Thread): def run(self): cnf = Config() count = 1 - for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: + for directory in [ "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: checkdir = cnf["Dir::Queue::%s" % (directory) ] if os.path.exists(checkdir): print "Looking into %s" % (checkdir) @@ -207,7 +218,7 @@ class ChangesGenerator(threading.Thread): continue count += 1 - if not get_knownchange(changesfile, self.session): + if not get_dbchange(changesfile, self.session): to_import = ChangesToImport(dirpath, changesfile, count) if self.die: return