- def run(self):
- while True:
- try:
- contents_file = self.upstream.dequeue()
- if isinstance(contents_file,EndOfContents):
- if self.downstream:
- self.downstream.enqueue(contents_file)
- break
-
- s = datetime.datetime.now()
- print("%s start: %s" % (self,contents_file) )
- self._run(contents_file)
- print("%s finished: %s in %d seconds" % (self, contents_file, (datetime.datetime.now()-s).seconds ))
- if self.downstream:
- self.downstream.enqueue(contents_file)
- except:
- traceback.print_exc()
-
-class QueryThread(ContentsWorkThread):
- def __init__(self, upstream, downstream):
- ContentsWorkThread.__init__(self, upstream, downstream)
-
- def __str__(self):
- return "QueryThread"
- __repr__ = __str__
-
- def _run(self, contents_file):
- contents_file.query()
-
-class IngestThread(ContentsWorkThread):
- def __init__(self, upstream, downstream):
- ContentsWorkThread.__init__(self, upstream, downstream)
-
- def __str__(self):
- return "IngestThread"
- __repr__ = __str__
-
- def _run(self, contents_file):
- contents_file.ingest()
-
-class SortThread(ContentsWorkThread):
- def __init__(self, upstream, downstream):
- ContentsWorkThread.__init__(self, upstream, downstream)
-
- def __str__(self):
- return "SortThread"
- __repr__ = __str__
-
- def _run(self, contents_file):
- contents_file.sorted_keys = sorted(contents_file.filenames.keys())
-
-class OutputThread(ContentsWorkThread):
- def __init__(self, upstream, downstream):
- ContentsWorkThread.__init__(self, upstream, downstream)
-
- def __str__(self):
- return "OutputThread"
- __repr__ = __str__
-
- def _run(self, contents_file):
- contents_file.open_file()
- for fname in contents_file.sorted_keys:
- contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname]))
- contents_file.sorted_keys = None
- contents_file.filenames.clear()
-
-class GzipThread(ContentsWorkThread):
- def __init__(self, upstream, downstream):
- ContentsWorkThread.__init__(self, upstream, downstream)
-
- def __str__(self):
- return "GzipThread"
- __repr__ = __str__
-
- def _run(self, contents_file):
- os.system("gzip -f %s" % contents_file.filename)
-
-class ContentFile(object):
- def __init__(self,
- filename,
- suite_str,
- suite_id,
- arch_str,
- arch_id):
-
- self.filename = filename
- self.filenames = {}
- self.sorted_keys = None
- self.suite_str = suite_str
- self.suite_id = suite_id
- self.arch_str = arch_str
- self.arch_id = arch_id
- self.cursor = None
- self.filehandle = None
-
- def __str__(self):
- return self.filename
- __repr__ = __str__
-
-
- def cleanup(self):
- self.filenames = None
- self.sortedkeys = None
- self.filehandle.close()
- self.cursor.close()
-
- def query(self):
- self.cursor = DBConn().cursor();
-
- self.cursor.execute("""SELECT file, section || '/' || package
- FROM deb_contents
- WHERE ( arch=2 or arch = %d) AND suite = %d
- """ % (self.arch_id, self.suite_id))
-
- def ingest(self):
- while True:
- r = self.cursor.fetchone()
- if not r:
- break
- filename, package = r
- if self.filenames.has_key(filename):
- self.filenames[filename] += ",%s" % (package)
- else:
- self.filenames[filename] = "%s" % (package)
- self.cursor.close()