X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=dak%2Fcontents.py;h=e5fb129da119028254f5fe847edb1489604e19b0;hb=6d03dbb954cd67e522374c12225aa08a129e5d60;hp=c0d00c85abec80249c6b4f6a8726a4b87503b8d1;hpb=3919b6a71bc7592a953e8149d3fba8c00a96a539;p=dak.git diff --git a/dak/contents.py b/dak/contents.py index c0d00c85..e5fb129d 100755 --- a/dak/contents.py +++ b/dak/contents.py @@ -39,8 +39,11 @@ import os import logging import gzip import threading +import traceback import Queue import apt_pkg +import datetime +import traceback from daklib import utils from daklib.binary import Binary from daklib.config import Config @@ -58,6 +61,9 @@ COMMANDS bootstrap_bin scan the debs in the existing pool and load contents into the bin_contents table + bootstrap + copy data from the bin_contents table into the deb_contents / udeb_contents tables + cruft remove files/paths which are no longer referenced by a binary @@ -95,31 +101,47 @@ class EndOfContents(object): class OneAtATime(object): """ + a one space queue which sits between multiple possible producers + and multiple possible consumers """ def __init__(self): self.next_in_line = None - self.next_lock = threading.Condition() + self.read_lock = threading.Condition() + self.write_lock = threading.Condition() + self.die = False def enqueue(self, next): - self.next_lock.acquire() + self.write_lock.acquire() while self.next_in_line: - self.next_lock.wait() - + if self.die: + return + self.write_lock.wait() + assert( not self.next_in_line ) self.next_in_line = next - self.next_lock.notify() - self.next_lock.release() + self.write_lock.release() + self.read_lock.acquire() + self.read_lock.notify() + self.read_lock.release() def dequeue(self): - self.next_lock.acquire() + self.read_lock.acquire() while not self.next_in_line: - self.next_lock.wait() + if self.die: + return + self.read_lock.wait() + result = self.next_in_line + self.next_in_line = None - self.next_lock.notify() - self.next_lock.release() + self.read_lock.release() + self.write_lock.acquire() + self.write_lock.notify() + self.write_lock.release() + return result + class ContentsWorkThread(threading.Thread): """ """ @@ -193,7 +215,7 @@ class OutputThread(ContentsWorkThread): contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname])) contents_file.sorted_keys = None contents_file.filenames.clear() - + class GzipThread(ContentsWorkThread): def __init__(self, upstream, downstream): ContentsWorkThread.__init__(self, upstream, downstream) @@ -209,15 +231,16 @@ class ContentFile(object): def __init__(self, filename, suite_str, - suite_id) + suite_id): self.filename = filename self.filenames = {} self.sorted_keys = None self.suite_str = suite_str self.suite_id = suite_id - self.cursor = None + self.session = None self.filehandle = None + self.results = None def __str__(self): return self.filename @@ -228,30 +251,26 @@ class ContentFile(object): self.filenames = None self.sortedkeys = None self.filehandle.close() - self.cursor.close() + self.session.close() def ingest(self): while True: - r = self.cursor.fetchone() + r = self.results.fetchone() if not r: break filename, package = r - if self.filenames.has_key(filename): - self.filenames[filename] += ",%s" % (package) - else: - self.filenames[filename] = "%s" % (package) - self.cursor.close() + self.filenames[filename]=package + + self.session.close() def open_file(self): """ opens a gzip stream to the contents file """ -# filepath = Config()["Contents::Root"] + self.filename - self.filename = 
"/home/stew/contents/" + self.filename + filepath = Config()["Contents::Root"] + self.filename filedir = os.path.dirname(self.filename) if not os.path.isdir(filedir): os.makedirs(filedir) -# self.filehandle = gzip.open(self.filename, "w") self.filehandle = open(self.filename, "w") self._write_header() @@ -301,12 +320,12 @@ class DebContentFile(ContentFile): self.arch_id = arch_id def query(self): - self.cursor = DBConn().session(); + self.session = DBConn().session(); - self.cursor.execute("""SELECT file, component || section || '/' || package + self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package) FROM deb_contents WHERE ( arch=2 or arch = :arch) AND suite = :suite - """, { 'arch':self.arch_id, 'suite':self.suite_id } + """, { 'arch':self.arch_id, 'suite':self.suite_id } ) class UdebContentFile(ContentFile): def __init__(self, @@ -314,25 +333,25 @@ class UdebContentFile(ContentFile): suite_str, suite_id, section_name, - section_id) + section_id): ContentFile.__init__(self, filename, suite_str, suite_id ) def query(self): - self.cursor = DBConn().session(); + self.session = DBConn().session(); - self.cursor.execute("""SELECT file, component || section || '/' || package + self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package) FROM udeb_contents WHERE suite = :suite + group by filename """ , { 'suite': self.suite_id } ) class Contents(object): """ Class capable of generating Contents-$arch.gz files """ - def __init__(self): self.header = None @@ -377,8 +396,7 @@ class Contents(object): s = DBConn().session() - print( "bootstrap_bin" ) - for binary in s.query(DBBinary).yield_per(1000): + for binary in s.query(DBBinary).yield_per(100): print( "binary: %s" % binary.package ) filename = binary.poolfile.filename # Check for existing contents @@ -403,89 +421,61 @@ class Contents(object): """ s = DBConn().session() - for override in s.query(Override).all(): - binaries = s.execute("""SELECT b.binary_id, ba.arch + + # get a mapping of all the override types we care about (right now .deb an .udeb) + override_type_map = {}; + for override_type in s.query(OverrideType).all(): + if override_type.overridetype.endswith('deb' ): + override_type_map[override_type.overridetype_id] = override_type.overridetype; + + for override in s.query(Override).yield_per(100): + if not override_type_map.has_key(override.overridetype_id): + #this isn't an override we care about + continue + + binaries = s.execute("""SELECT b.id, b.architecture FROM binaries b - JOIN bin_associations ba ON ba.binary_id=b.binary_id + JOIN bin_associations ba ON ba.bin=b.id WHERE ba.suite=:suite - AND b.package=override.package""", {'suite':override.suite}) + AND b.package=:package""", {'suite':override.suite_id, 'package':override.package}) while True: binary = binaries.fetchone() if not binary: break - filenames = s.execute( """SELECT file from bin_contents where binary_id=:id""", { 'id': binary.binary_id } ) - while True: - filename = filenames.fetchone() - if not binary: - break - - - - if override.type == 7: - s.execute( """INSERT INTO deb_contents (file,section,package,binary_id,arch,suite,component) - VALUES (:filename, :section, :package, :binary_id, :arch, :suite, :component);""", - { 'filename' : filename, - 'section' : override.section, - 'package' : override.package, - 'binary_id' : binary.binary_id, - 'arch' : binary.arch, - 'suite' : override.suite, - 'component' : override.component } ) - - - elif override.type == 9: - s.execute( 
"""INSERT INTO deb_contents (file,section,package,binary_id,arch,suite,component) - VALUES (:filename, :section, :package, :binary_id, :arch, :suite, :component);""", - { 'filename' : filename, - 'section' : override.section, - 'package' : override.package, - 'binary_id' : binary.binary_id, - 'arch' : binary.arch, - 'suite' : override.suite, - 'component' : override.component } ) - -# def bootstrap(self): -# """ -# scan the existing debs in the pool to populate the contents database tables -# """ -# pooldir = Config()[ 'Dir::Pool' ] - -# s = DBConn().session() - -# for suite in s.query(Suite).all(): -# for arch in get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=s): -# q = s.query(BinAssociation).join(Suite) -# q = q.join(Suite).filter_by(suite_name=suite.suite_name) -# q = q.join(DBBinary).join(Architecture).filter_by(arch.arch_string) -# for ba in q: -# filename = ba.binary.poolfile.filename -# # Check for existing contents -# existingq = s.query(ContentAssociations).filter_by(binary_pkg=ba.binary_id).limit(1) -# if existingq.count() > 0: -# log.debug( "already imported: %s" % (filename)) -# else: -# # We don't have existing contents so import them -# log.debug( "scanning: %s" % (filename) ) -# debfile = os.path.join(pooldir, filename) -# if os.path.exists(debfile): -# Binary(debfile, self.reject).scan_package(ba.binary_id, True) -# else: -# log.error("missing .deb: %s" % filename) + exists = s.execute("SELECT 1 FROM %s_contents WHERE binary_id=:id limit 1" % override_type_map[override.overridetype_id], {'id':binary.id}) + + + if exists.fetchone(): + print '.', + continue + else: + print '+', + + s.execute( """INSERT INTO %s_contents (filename,section,package,binary_id,arch,suite) + SELECT file, :section, :package, :binary_id, :arch, :suite + FROM bin_contents + WHERE binary_id=:binary_id;""" % override_type_map[override.overridetype_id], + { 'section' : override.section_id, + 'package' : override.package, + 'binary_id' : binary.id, + 'arch' : binary.architecture, + 'suite' : override.suite_id } ) + s.commit() + def generate(self): """ Generate contents files for both deb and udeb """ - DBConn().prepare("arches_q", arches_q) self.deb_generate() -# self.udeb_generate() + self.udeb_generate() def deb_generate(self): """ Generate Contents-$arch.gz files for every available arch in each given suite. 
""" - cursor = DBConn().session() - debtype_id = DBConn().get_override_type_id("deb") + session = DBConn().session() + debtype_id = get_override_type("deb", session) suites = self._suites() inputtoquery = OneAtATime() @@ -496,34 +486,27 @@ class Contents(object): qt = QueryThread(inputtoquery,querytoingest) it = IngestThread(querytoingest,ingesttosort) -# these actually make things worse -# it2 = IngestThread(querytoingest,ingesttosort) -# it3 = IngestThread(querytoingest,ingesttosort) -# it4 = IngestThread(querytoingest,ingesttosort) st = SortThread(ingesttosort,sorttooutput) ot = OutputThread(sorttooutput,outputtogzip) gt = GzipThread(outputtogzip, None) qt.start() it.start() -# it2.start() -# it3.start() -# it2.start() st.start() ot.start() gt.start() - + # Get our suites, and the architectures for suite in [i.lower() for i in suites]: - suite_id = DBConn().get_suite_id(suite) - arch_list = self._arches(cursor, suite_id) + suite_id = get_suite(suite, session).suite_id + print( "got suite_id: %s for suite: %s" % (suite_id, suite ) ) + arch_list = self._arches(suite_id, session) for (arch_id,arch_str) in arch_list: print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) ) -# filename = "dists/%s/Contents-%s.gz" % (suite, arch_str) filename = "dists/%s/Contents-%s" % (suite, arch_str) - cf = ContentFile(filename, suite, suite_id, arch_str, arch_id) + cf = DebContentFile(filename, suite, suite_id, arch_str, arch_id) inputtoquery.enqueue( cf ) inputtoquery.enqueue( EndOfContents() ) @@ -533,7 +516,7 @@ class Contents(object): """ Generate Contents-$arch.gz files for every available arch in each given suite. """ - cursor = DBConn().session() + session = DBConn().session() udebtype_id=DBConn().get_override_type_id("udeb") suites = self._suites() @@ -545,54 +528,15 @@ class Contents(object): qt = QueryThread(inputtoquery,querytoingest) it = IngestThread(querytoingest,ingesttosort) -# these actually make things worse -# it2 = IngestThread(querytoingest,ingesttosort) -# it3 = IngestThread(querytoingest,ingesttosort) -# it4 = IngestThread(querytoingest,ingesttosort) st = SortThread(ingesttosort,sorttooutput) ot = OutputThread(sorttooutput,outputtogzip) gt = GzipThread(outputtogzip, None) qt.start() it.start() -# it2.start() -# it3.start() -# it2.start() st.start() ot.start() gt.start() - - for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s"), - ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s")]: - - section_id = DBConn().get_section_id(section) # all udebs should be here) - if section_id != -1: - - - - # Get our suites, and the architectures - for suite in [i.lower() for i in suites]: - suite_id = DBConn().get_suite_id(suite) - arch_list = self._arches(cursor, suite_id) - - for arch_id in arch_list: - - writer = GzippedContentWriter(fn_pattern % (suite, arch_id[1])) - try: - - cursor.execute("EXECUTE udeb_contents_q(%d,%d,%d)" % (suite_id, udebtype_id, section_id, arch_id)) - - while True: - r = cursor.fetchone() - if not r: - break - - filename, section, package, arch = r - writer.write(filename, section, package) - finally: - writer.close() - - def generate(self): @@ -617,7 +561,7 @@ class Contents(object): section = get_section(section, session) # Get our suites - for suite in which_suites(): + for suite in which_suites(session): # Which architectures do we need to work on arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session) @@ -650,19 +594,24 @@ class Contents(object): if 
Config().has_key( "%s::%s" %(options_prefix,"Suite")): suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")]) else: - suites = [ 'unstable', 'testing' ] -# suites = Config().SubTree("Suite").List() + suites = Config().SubTree("Suite").List() return suites - def _arches(self, cursor, suite): + def _arches(self, suite, session): """ return a list of archs to operate on """ arch_list = [] - cursor.execute("EXECUTE arches_q(%d)" % (suite)) + arches = session.execute( + """SELECT s.architecture, a.arch_string + FROM suite_architectures s + JOIN architecture a ON (s.architecture=a.id) + WHERE suite = :suite_id""", + {'suite_id':suite } ) + while True: - r = cursor.fetchone() + r = arches.fetchone() if not r: break @@ -685,6 +634,7 @@ def main(): commands = {'generate' : Contents.generate, 'bootstrap_bin' : Contents.bootstrap_bin, + 'bootstrap' : Contents.bootstrap, 'cruft' : Contents.cruft, }
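Note on the thread pipeline used by deb_generate() and udeb_generate(): the patch chains QueryThread, IngestThread, SortThread, OutputThread and GzipThread together with OneAtATime instances, so each ContentFile is handed from one stage to the next one object at a time, and the EndOfContents sentinel enqueued at the end flows through and stops every stage. The ContentsWorkThread.run() loop itself is not part of these hunks, so the sketch below only illustrates that handoff pattern; PipelineThread, its process callback and the single-Condition queue are simplifications for illustration, not the dak implementation (which uses the read_lock/write_lock pair shown above).

import sys
import threading

class EndOfContents(object):
    """Sentinel object that tells each stage of the pipeline to shut down."""
    pass

class OneAtATime(object):
    """One-slot handoff queue: the producer blocks until the consumer has
    taken the previous item (simplified, single-Condition variant)."""
    def __init__(self):
        self.item = None
        self.cond = threading.Condition()

    def enqueue(self, item):
        with self.cond:
            while self.item is not None:      # wait for the slot to empty
                self.cond.wait()
            self.item = item
            self.cond.notify_all()

    def dequeue(self):
        with self.cond:
            while self.item is None:          # wait for something to arrive
                self.cond.wait()
            item, self.item = self.item, None
            self.cond.notify_all()
            return item

class PipelineThread(threading.Thread):
    """Rough analogue of ContentsWorkThread: pull from upstream, process,
    push the result downstream, and propagate the shutdown sentinel."""
    def __init__(self, upstream, downstream, process):
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream
        self.process = process

    def run(self):
        while True:
            item = self.upstream.dequeue()
            if isinstance(item, EndOfContents):
                if self.downstream:
                    self.downstream.enqueue(item)   # pass the sentinel along
                return
            result = self.process(item)
            if self.downstream:
                self.downstream.enqueue(result)

if __name__ == "__main__":
    # two-stage demo: upper-case each string, then write it out
    a, b = OneAtATime(), OneAtATime()
    stages = [PipelineThread(a, b, lambda s: s.upper()),
              PipelineThread(b, None, lambda s: sys.stdout.write(s + "\n"))]
    for t in stages:
        t.start()
    for name in ["contents", "generation", "pipeline"]:
        a.enqueue(name)
    a.enqueue(EndOfContents())                      # shut the pipeline down
    for t in stages:
        t.join()

Keeping the queue one slot deep means each stage holds at most one ContentFile's worth of data at a time, which presumably is the point here: memory stays bounded while the slower sort and gzip stages catch up, at the cost of stages blocking on one another.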
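Both new ContentFile queries aggregate package names per path with comma_separated_list(section || '/' || package); that custom aggregate has to already exist in the projectb database and is not created by this patch. If it were missing, a minimal equivalent could be installed along the following lines; ensure_comma_separated_list() and the comma_concat() definition here are assumptions for illustration only, not necessarily how dak's schema actually defines the aggregate.

from daklib.dbconn import DBConn

def ensure_comma_separated_list(session):
    # Hypothetical helper: define a minimal comma_separated_list aggregate.
    # dak's real schema may already provide one with a different definition.
    session.execute("""
        CREATE OR REPLACE FUNCTION comma_concat(text, text) RETURNS text AS $$
            SELECT CASE
                       WHEN $1 IS NULL OR $1 = '' THEN $2
                       WHEN $2 IS NULL OR $2 = '' THEN $1
                       ELSE $1 || ',' || $2
                   END
        $$ LANGUAGE sql""")
    session.execute("""
        CREATE AGGREGATE comma_separated_list (text) (
            sfunc = comma_concat,
            stype = text,
            initcond = '' )""")
    session.commit()

# usage sketch:
# ensure_comma_separated_list(DBConn().session())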