+ self.next_in_line = None
+ self.read_lock = threading.Condition()
+ self.write_lock = threading.Condition()
+ self.die = False
+
def enqueue(self, next):
    """
    Put ``next`` into the single hand-off slot, blocking while it is occupied.

    Waits on ``write_lock`` until ``next_in_line`` is empty, stores the item
    and then notifies one thread waiting on ``read_lock``.  If ``die`` is
    found set while the slot is still occupied, the item is dropped and the
    method returns without storing it.

    ``next`` has to be truthy: an empty slot is recognised by the truthiness
    of ``next_in_line``.
    """
    # "with" releases the lock on every exit path; the early return used to
    # leave write_lock held, blocking any other thread that needed it.
    with self.write_lock:
        while self.next_in_line:
            if self.die:
                return
            self.write_lock.wait()

        self.next_in_line = next

    # wake a reader only after write_lock has been released
    with self.read_lock:
        self.read_lock.notify()
+
def dequeue(self):
    """
    Take the item out of the single hand-off slot, blocking while it is empty.

    Waits on ``read_lock`` until ``next_in_line`` holds an item, empties the
    slot and then notifies one thread waiting on ``write_lock``.

    Returns the stored item, or None if ``die`` is found set while the slot
    is empty.
    """
    # "with" releases the lock on every exit path; the early return used to
    # leave read_lock held, blocking any other thread that needed it.
    with self.read_lock:
        while not self.next_in_line:
            if self.die:
                return None
            self.read_lock.wait()

        result = self.next_in_line
        self.next_in_line = None

    # wake a writer only after read_lock has been released
    with self.write_lock:
        self.write_lock.notify()

    return result
+
+
class ContentsWorkThread(threading.Thread):
    """
    Base class for one stage of a pipeline of worker threads.

    A stage repeatedly takes an item from ``upstream``, processes it with the
    subclass's ``_run`` and passes it on to ``downstream`` when one is set.
    """
    def __init__(self, upstream, downstream):
        """
        Remember the two ends of this stage.

        ``upstream`` must provide ``dequeue()``; ``downstream`` must provide
        ``enqueue(item)`` or be a false value for the last stage.
        """
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream

    def _run(self, contents_file):
        """
        Process one item; subclasses override this.
        """
        raise NotImplementedError

    def run(self):
        """
        Process items from ``upstream`` until told to stop.

        The loop ends when an ``EndOfContents`` item arrives (it is forwarded
        downstream first) or when ``dequeue()`` returns None.  An exception
        raised while handling an item is printed and the item is not
        forwarded; the loop then carries on with the next item.
        """
        while True:
            try:
                contents_file = self.upstream.dequeue()
                # dequeue() in this file returns None once its "die" flag is
                # set (presumably upstream is that queue type).  Without this
                # check _run(None) raises on every pass and the loop spins.
                if contents_file is None:
                    break
                if isinstance(contents_file, EndOfContents):
                    if self.downstream:
                        self.downstream.enqueue(contents_file)
                    break

                started = datetime.datetime.now()
                print("%s start: %s" % (self, contents_file))
                self._run(contents_file)
                elapsed = datetime.datetime.now() - started
                print("%s finished: %s in %d seconds" % (self, contents_file, elapsed.seconds))
                if self.downstream:
                    self.downstream.enqueue(contents_file)
            except Exception:
                # keep the stage alive after a failed item, but let
                # SystemExit / KeyboardInterrupt propagate
                traceback.print_exc()
+
class QueryThread(ContentsWorkThread):
    """
    Pipeline stage that calls ``query()`` on every item it receives.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __repr__(self):
        # fixed label used in the start/finished progress lines
        return "QueryThread"
    __str__ = __repr__

    def _run(self, contents_file):
        """
        Run the query step of ``contents_file``.
        """
        contents_file.query()
+
class IngestThread(ContentsWorkThread):
    """
    Pipeline stage that calls ``ingest()`` on every item it receives.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __repr__(self):
        # fixed label used in the start/finished progress lines
        return "IngestThread"
    __str__ = __repr__

    def _run(self, contents_file):
        """
        Run the ingest step of ``contents_file``.
        """
        contents_file.ingest()
+
class SortThread(ContentsWorkThread):
    """
    Pipeline stage that stores the sorted keys of ``filenames`` on each item.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __repr__(self):
        # fixed label used in the start/finished progress lines
        return "SortThread"
    __str__ = __repr__

    def _run(self, contents_file):
        """
        Set ``contents_file.sorted_keys`` to a sorted list of its file names.
        """
        ordered = list(contents_file.filenames.keys())
        ordered.sort()
        contents_file.sorted_keys = ordered
+
class OutputThread(ContentsWorkThread):
    """
    Pipeline stage that writes each item's entries to its output file.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __repr__(self):
        # fixed label used in the start/finished progress lines
        return "OutputThread"
    __str__ = __repr__

    def _run(self, contents_file):
        """
        Open the item's file and write one "name<TAB>value" line per key.

        Keys are written in ``sorted_keys`` order; afterwards ``sorted_keys``
        is reset to None and ``filenames`` is emptied.
        """
        contents_file.open_file()
        # read the handle only after open_file() has created it
        out = contents_file.filehandle
        entries = contents_file.filenames
        for path in contents_file.sorted_keys:
            out.write("%s\t%s\n" % (path, entries[path]))
        contents_file.sorted_keys = None
        entries.clear()
+
class GzipThread(ContentsWorkThread):
    """
    Pipeline stage that compresses each item's file with the gzip program.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "GzipThread"
    __repr__ = __str__

    def _run(self, contents_file):
        """
        Run ``gzip -f`` on ``contents_file.filename``.

        A non-zero exit status is reported on stdout; it does not raise.
        """
        import subprocess

        # Pass an argument list instead of a shell string so that spaces or
        # shell metacharacters in the file name cannot alter the command;
        # "--" keeps a name starting with "-" from being read as an option.
        status = subprocess.call(["gzip", "-f", "--", contents_file.filename])
        if status != 0:
            print("%s: gzip exited with status %d for %s" % (self, status, contents_file))
+
+class ContentFile(object):
+ def __init__(self,
+ filename,
+ suite_str,
+ suite_id):
+
+ self.filename = filename
+ self.filenames = {}
+ self.sorted_keys = None
+ self.suite_str = suite_str
+ self.suite_id = suite_id
+ self.session = None
+ self.filehandle = None
+ self.results = None
+
+ def __str__(self):
+ return self.filename
+ __repr__ = __str__
+
+
def cleanup(self):
    """
    Drop the collected data and close the file handle and the session.

    ``filehandle`` and ``session`` start out as None, so each is closed only
    if it has been set.
    """
    self.filenames = None
    # was "self.sortedkeys", which created a new attribute and left the real
    # sorted_keys list alive
    self.sorted_keys = None
    if self.filehandle is not None:
        self.filehandle.close()
    if self.session is not None:
        self.session.close()
+
def ingest(self):
    """
    Copy every row of ``self.results`` into ``self.filenames``.

    Rows are fetched one at a time until ``fetchone()`` returns a false
    value.  Each row is unpacked as a (key, value) pair; the session is
    closed once all rows have been read.
    """
    row = self.results.fetchone()
    while row:
        path, package = row
        self.filenames[path] = package
        row = self.results.fetchone()

    self.session.close()
+
def open_file(self):
    """
    Open ``self.filename`` for writing as plain text and write the header.

    Missing parent directories are created first.  The handle is stored in
    ``self.filehandle`` and ``_write_header()`` is then called on this
    object.  The file itself is not compressed here.
    """
    # NOTE(review): the path is used exactly as given in self.filename and
    # is not joined with any configured root directory - confirm that
    # callers pass a path that is usable as-is.
    filedir = os.path.dirname(self.filename)
    # dirname is "" for a bare file name and os.makedirs("") raises
    if filedir and not os.path.isdir(filedir):
        os.makedirs(filedir)
    self.filehandle = open(self.filename, "w")
    self._write_header()