]> git.decadent.org.uk Git - dak.git/blobdiff - dak/contents.py
Refresh the wnpp bugs list on a daily basis
[dak.git] / dak / contents.py
index b94aa0de7ca7a99ba5d6f621aaba5b72336f450e..e5fb129da119028254f5fe847edb1489604e19b0 100755 (executable)
@@ -37,11 +37,13 @@ Create all the contents files
 import sys
 import os
 import logging
 import sys
 import os
 import logging
-import math
 import gzip
 import threading
 import gzip
 import threading
+import traceback
 import Queue
 import apt_pkg
 import Queue
 import apt_pkg
+import datetime
+import traceback
 from daklib import utils
 from daklib.binary import Binary
 from daklib.config import Config
 from daklib import utils
 from daklib.binary import Binary
 from daklib.config import Config
@@ -56,8 +58,11 @@ COMMANDS
     generate
         generate Contents-$arch.gz files
 
     generate
         generate Contents-$arch.gz files
 
+    bootstrap_bin
+        scan the debs in the existing pool and load contents into the bin_contents table
+
     bootstrap
     bootstrap
-        scan the debs in the existing pool and load contents in the the database
+        copy data from the bin_contents table into the deb_contents / udeb_contents tables
 
     cruft
         remove files/paths which are no longer referenced by a binary
 
     cruft
         remove files/paths which are no longer referenced by a binary
@@ -94,107 +99,259 @@ class EndOfContents(object):
     """
     pass
 
     """
     pass
 
-class GzippedContentWriter(object):
+class OneAtATime(object):
+    """
+    a one space queue which sits between multiple possible producers
+    and multiple possible consumers
     """
     """
-    An object which will write contents out to a Contents-$arch.gz
-    file on a separate thread
+    def __init__(self):
+        self.next_in_line = None
+        self.read_lock = threading.Condition()
+        self.write_lock = threading.Condition()
+        self.die = False
+
+    def enqueue(self, next):
+        self.write_lock.acquire()
+        while self.next_in_line:
+            if self.die:
+                return
+            self.write_lock.wait()
+
+        assert( not self.next_in_line )
+        self.next_in_line = next
+        self.write_lock.release()
+        self.read_lock.acquire()
+        self.read_lock.notify()
+        self.read_lock.release()
+
+    def dequeue(self):
+        self.read_lock.acquire()
+        while not self.next_in_line:
+            if self.die:
+                return
+            self.read_lock.wait()
+
+        result = self.next_in_line
+
+        self.next_in_line = None
+        self.read_lock.release()
+        self.write_lock.acquire()
+        self.write_lock.notify()
+        self.write_lock.release()
+
+        return result
+
+
+class ContentsWorkThread(threading.Thread):
     """
     """
+    """
+    def __init__(self, upstream, downstream):
+        threading.Thread.__init__(self)
+        self.upstream = upstream
+        self.downstream = downstream
 
 
-    header = None # a class object holding the header section of contents file
+    def run(self):
+        while True:
+            try:
+                contents_file = self.upstream.dequeue()
+                if isinstance(contents_file,EndOfContents):
+                    if self.downstream:
+                        self.downstream.enqueue(contents_file)
+                    break
+
+                s = datetime.datetime.now()
+                print("%s start: %s" % (self,contents_file) )
+                self._run(contents_file)
+                print("%s finished: %s in %d seconds" % (self, contents_file, (datetime.datetime.now()-s).seconds ))
+                if self.downstream:
+                    self.downstream.enqueue(contents_file)
+            except:
+                traceback.print_exc()
+
+class QueryThread(ContentsWorkThread):
+    def __init__(self, upstream, downstream):
+        ContentsWorkThread.__init__(self, upstream, downstream)
+
+    def __str__(self):
+        return "QueryThread"
+    __repr__ = __str__
+
+    def _run(self, contents_file):
+        contents_file.query()
+
+class IngestThread(ContentsWorkThread):
+    def __init__(self, upstream, downstream):
+        ContentsWorkThread.__init__(self, upstream, downstream)
+
+    def __str__(self):
+        return "IngestThread"
+    __repr__ = __str__
+
+    def _run(self, contents_file):
+        contents_file.ingest()
+
+class SortThread(ContentsWorkThread):
+    def __init__(self, upstream, downstream):
+        ContentsWorkThread.__init__(self, upstream, downstream)
+
+    def __str__(self):
+        return "SortThread"
+    __repr__ = __str__
+
+    def _run(self, contents_file):
+        contents_file.sorted_keys = sorted(contents_file.filenames.keys())
+
+class OutputThread(ContentsWorkThread):
+    def __init__(self, upstream, downstream):
+        ContentsWorkThread.__init__(self, upstream, downstream)
+
+    def __str__(self):
+        return "OutputThread"
+    __repr__ = __str__
+
+    def _run(self, contents_file):
+        contents_file.open_file()
+        for fname in contents_file.sorted_keys:
+            contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname]))
+        contents_file.sorted_keys = None
+        contents_file.filenames.clear()
+
+class GzipThread(ContentsWorkThread):
+    def __init__(self, upstream, downstream):
+        ContentsWorkThread.__init__(self, upstream, downstream)
+
+    def __str__(self):
+        return "GzipThread"
+    __repr__ = __str__
+
+    def _run(self, contents_file):
+        os.system("gzip -f %s" % contents_file.filename)
+
+class ContentFile(object):
+    def __init__(self,
+                 filename,
+                 suite_str,
+                 suite_id):
+
+        self.filename = filename
+        self.filenames = {}
+        self.sorted_keys = None
+        self.suite_str = suite_str
+        self.suite_id = suite_id
+        self.session = None
+        self.filehandle = None
+        self.results = None
+
+    def __str__(self):
+        return self.filename
+    __repr__ = __str__
+
+
+    def cleanup(self):
+        self.filenames = None
+        self.sortedkeys = None
+        self.filehandle.close()
+        self.session.close()
+
+    def ingest(self):
+        while True:
+            r = self.results.fetchone()
+            if not r:
+                break
+            filename, package = r
+            self.filenames[filename]=package
 
 
-    def __init__(self, filename):
-        """
-        @ptype filename: string
-        @param filename: the name of the file to write to
-        """
-        self.queue = Queue.Queue()
-        self.current_file = None
-        self.first_package = True
-        self.output = self.open_file(filename)
-        self.thread = threading.Thread(target=self.write_thread,
-                                       name='Contents writer')
-        self.thread.start()
-
-    def open_file(self, filename):
+        self.session.close()
+
+    def open_file(self):
         """
         opens a gzip stream to the contents file
         """
         """
         opens a gzip stream to the contents file
         """
-        filepath = Config()["Contents::Root"] + filename
-        filedir = os.path.dirname(filepath)
+        filepath = Config()["Contents::Root"] + self.filename
+        filedir = os.path.dirname(self.filename)
         if not os.path.isdir(filedir):
             os.makedirs(filedir)
         if not os.path.isdir(filedir):
             os.makedirs(filedir)
-        return gzip.open(filepath, "w")
-
-    def write(self, filename, section, package):
-        """
-        enqueue content to be written to the file on a separate thread
-        """
-        self.queue.put((filename,section,package))
-
-    def write_thread(self):
-        """
-        the target of a Thread which will do the actual writing
-        """
-        while True:
-            next = self.queue.get()
-            if isinstance(next, EndOfContents):
-                self.output.write('\n')
-                self.output.close()
-                break
+        self.filehandle = open(self.filename, "w")
+        self._write_header()
 
 
-            (filename,section,package)=next
-            if next != self.current_file:
-                # this is the first file, so write the header first
-                if not self.current_file:
-                    self.output.write(self._getHeader())
+    def _write_header(self):
+        self._get_header();
+        self.filehandle.write(ContentFile.header)
 
 
-                self.output.write('\n%s\t' % filename)
-                self.first_package = True
-
-            self.current_file=filename
-
-            if not self.first_package:
-                self.output.write(',')
-            else:
-                self.first_package=False
-            self.output.write('%s/%s' % (section,package))
-
-    def finish(self):
-        """
-        enqueue the sentry object so that writers will know to terminate
-        """
-        self.queue.put(EndOfContents())
+    header=None
 
     @classmethod
 
     @classmethod
-    def _getHeader(self):
+    def _get_header(self):
         """
         Internal method to return the header for Contents.gz files
 
         This is boilerplate which explains the contents of the file and how
         it can be used.
         """
         """
         Internal method to return the header for Contents.gz files
 
         This is boilerplate which explains the contents of the file and how
         it can be used.
         """
-        if not GzippedContentWriter.header:
+        if not ContentFile.header:
             if Config().has_key("Contents::Header"):
                 try:
                     h = open(os.path.join( Config()["Dir::Templates"],
                                            Config()["Contents::Header"] ), "r")
             if Config().has_key("Contents::Header"):
                 try:
                     h = open(os.path.join( Config()["Dir::Templates"],
                                            Config()["Contents::Header"] ), "r")
-                    GzippedContentWriter.header = h.read()
+                    ContentFile.header = h.read()
                     h.close()
                 except:
                     log.error( "error opening header file: %d\n%s" % (Config()["Contents::Header"],
                                                                       traceback.format_exc() ))
                     h.close()
                 except:
                     log.error( "error opening header file: %d\n%s" % (Config()["Contents::Header"],
                                                                       traceback.format_exc() ))
-                    GzippedContentWriter.header = None
+                    ContentFile.header = None
             else:
             else:
-                GzippedContentWriter.header = None
-
-        return GzippedContentWriter.header
-
+                ContentFile.header = None
+
+        return ContentFile.header
+
+
+class DebContentFile(ContentFile):
+    def __init__(self,
+                 filename,
+                 suite_str,
+                 suite_id,
+                 arch_str,
+                 arch_id):
+        ContentFile.__init__(self,
+                             filename,
+                             suite_str,
+                             suite_id )
+        self.arch_str = arch_str
+        self.arch_id = arch_id
+
+    def query(self):
+        self.session = DBConn().session();
+
+        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
+        FROM deb_contents
+        WHERE ( arch=2 or arch = :arch) AND suite = :suite
+        """, { 'arch':self.arch_id, 'suite':self.suite_id } )
+
+class UdebContentFile(ContentFile):
+    def __init__(self,
+                 filename,
+                 suite_str,
+                 suite_id,
+                 section_name,
+                 section_id):
+        ContentFile.__init__(self,
+                             filename,
+                             suite_str,
+                             suite_id )
+
+    def query(self):
+        self.session = DBConn().session();
+
+        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
+        FROM udeb_contents
+        WHERE suite = :suite
+        group by filename
+        """ , { 'suite': self.suite_id } )
 
 class Contents(object):
     """
     Class capable of generating Contents-$arch.gz files
     """
 
 class Contents(object):
     """
     Class capable of generating Contents-$arch.gz files
     """
-
     def __init__(self):
         self.header = None
 
     def __init__(self):
         self.header = None
 
@@ -231,33 +388,155 @@ class Contents(object):
         s.commit()
 
 
         s.commit()
 
 
-    def bootstrap(self):
+    def bootstrap_bin(self):
         """
         """
-        scan the existing debs in the pool to populate the contents database tables
+        scan the existing debs in the pool to populate the bin_contents table
         """
         pooldir = Config()[ 'Dir::Pool' ]
 
         s = DBConn().session()
 
         """
         pooldir = Config()[ 'Dir::Pool' ]
 
         s = DBConn().session()
 
-        for suite in s.query(Suite).all():
-            for arch in get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=s):
-                q = s.query(BinAssociation).join(Suite)
-                q = q.join(Suite).filter_by(suite_name=suite.suite_name)
-                q = q.join(DBBinary).join(Architecture).filter_by(arch.arch_string)
-                for ba in q:
-                    filename = ba.binary.poolfile.filename
-                    # Check for existing contents
-                    existingq = s.query(ContentAssociations).filter_by(binary_pkg=ba.binary_id).limit(1)
-                    if existingq.count() > 0:
-                        log.debug( "already imported: %s" % (filename))
-                    else:
-                        # We don't have existing contents so import them
-                        log.debug( "scanning: %s" % (filename) )
-                        debfile = os.path.join(pooldir, filename)
-                        if os.path.exists(debfile):
-                            Binary(debfile, self.reject).scan_package(ba.binary_id, True)
-                        else:
-                            log.error("missing .deb: %s" % filename)
+        for binary in s.query(DBBinary).yield_per(100):
+            print( "binary: %s" % binary.package )
+            filename = binary.poolfile.filename
+             # Check for existing contents
+            existingq = s.execute( "select 1 from bin_contents where binary_id=:id", {'id':binary.binary_id} );
+            if existingq.fetchone():
+                log.debug( "already imported: %s" % (filename))
+            else:
+                # We don't have existing contents so import them
+                log.debug( "scanning: %s" % (filename) )
+
+                debfile = os.path.join(pooldir, filename)
+                if os.path.exists(debfile):
+                    Binary(debfile, self.reject).scan_package(binary.binary_id, True)
+                else:
+                    log.error("missing .deb: %s" % filename)
+
+
+
+    def bootstrap(self):
+        """
+        scan the existing debs in the pool to populate the contents database tables
+        """
+        s = DBConn().session()
+
+
+        # get a mapping of all the override types we care about (right now .deb an .udeb)
+        override_type_map = {};
+        for override_type in s.query(OverrideType).all():
+            if override_type.overridetype.endswith('deb' ):
+                override_type_map[override_type.overridetype_id] = override_type.overridetype;
+
+        for override in s.query(Override).yield_per(100):
+            if not override_type_map.has_key(override.overridetype_id):
+                #this isn't an override we care about
+                continue
+
+            binaries = s.execute("""SELECT b.id, b.architecture
+                                    FROM binaries b
+                                    JOIN bin_associations ba ON ba.bin=b.id
+                                    WHERE ba.suite=:suite
+                                    AND b.package=:package""", {'suite':override.suite_id, 'package':override.package})
+            while True:
+                binary = binaries.fetchone()
+                if not binary:
+                    break
+
+                exists = s.execute("SELECT 1 FROM %s_contents WHERE binary_id=:id limit 1" % override_type_map[override.overridetype_id], {'id':binary.id})
+
+
+                if exists.fetchone():
+                    print '.',
+                    continue
+                else:
+                    print '+',
+
+                s.execute( """INSERT INTO %s_contents (filename,section,package,binary_id,arch,suite)
+                              SELECT file, :section, :package, :binary_id, :arch, :suite
+                              FROM bin_contents
+                              WHERE binary_id=:binary_id;""" % override_type_map[override.overridetype_id],
+                           { 'section' : override.section_id,
+                             'package' : override.package,
+                             'binary_id' : binary.id,
+                             'arch' : binary.architecture,
+                             'suite' : override.suite_id } )
+                s.commit()
+
+    def generate(self):
+        """
+        Generate contents files for both deb and udeb
+        """
+        self.deb_generate()
+        self.udeb_generate()
+
+    def deb_generate(self):
+        """
+        Generate Contents-$arch.gz files for every available arch in each given suite.
+        """
+        session = DBConn().session()
+        debtype_id = get_override_type("deb", session)
+        suites = self._suites()
+
+        inputtoquery = OneAtATime()
+        querytoingest = OneAtATime()
+        ingesttosort = OneAtATime()
+        sorttooutput = OneAtATime()
+        outputtogzip = OneAtATime()
+
+        qt = QueryThread(inputtoquery,querytoingest)
+        it = IngestThread(querytoingest,ingesttosort)
+        st = SortThread(ingesttosort,sorttooutput)
+        ot = OutputThread(sorttooutput,outputtogzip)
+        gt = GzipThread(outputtogzip, None)
+
+        qt.start()
+        it.start()
+        st.start()
+        ot.start()
+        gt.start()
+
+        # Get our suites, and the architectures
+        for suite in [i.lower() for i in suites]:
+            suite_id = get_suite(suite, session).suite_id
+            print( "got suite_id: %s for suite: %s" % (suite_id, suite ) )
+            arch_list = self._arches(suite_id, session)
+
+            for (arch_id,arch_str) in arch_list:
+                print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )
+
+                filename = "dists/%s/Contents-%s" % (suite, arch_str)
+                cf = DebContentFile(filename, suite, suite_id, arch_str, arch_id)
+                inputtoquery.enqueue( cf )
+
+        inputtoquery.enqueue( EndOfContents() )
+        gt.join()
+
+    def udeb_generate(self):
+        """
+        Generate Contents-$arch.gz files for every available arch in each given suite.
+        """
+        session = DBConn().session()
+        udebtype_id=DBConn().get_override_type_id("udeb")
+        suites = self._suites()
+
+        inputtoquery = OneAtATime()
+        querytoingest = OneAtATime()
+        ingesttosort = OneAtATime()
+        sorttooutput = OneAtATime()
+        outputtogzip = OneAtATime()
+
+        qt = QueryThread(inputtoquery,querytoingest)
+        it = IngestThread(querytoingest,ingesttosort)
+        st = SortThread(ingesttosort,sorttooutput)
+        ot = OutputThread(sorttooutput,outputtogzip)
+        gt = GzipThread(outputtogzip, None)
+
+        qt.start()
+        it.start()
+        st.start()
+        ot.start()
+        gt.start()
 
 
     def generate(self):
 
 
     def generate(self):
@@ -282,7 +561,7 @@ class Contents(object):
                 section = get_section(section, session)
 
             # Get our suites
                 section = get_section(section, session)
 
             # Get our suites
-            for suite in which_suites():
+            for suite in which_suites(session):
                 # Which architectures do we need to work on
                 arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session)
 
                 # Which architectures do we need to work on
                 arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session)
 
@@ -308,6 +587,39 @@ class Contents(object):
                     # close all the files
                     for writer in file_writers.values():
                         writer.finish()
                     # close all the files
                     for writer in file_writers.values():
                         writer.finish()
+    def _suites(self):
+        """
+        return a list of suites to operate on
+        """
+        if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
+            suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
+        else:
+            suites = Config().SubTree("Suite").List()
+
+        return suites
+
+    def _arches(self, suite, session):
+        """
+        return a list of archs to operate on
+        """
+        arch_list = []
+        arches = session.execute(
+            """SELECT s.architecture, a.arch_string
+            FROM suite_architectures s
+            JOIN architecture a ON (s.architecture=a.id)
+            WHERE suite = :suite_id""",
+            {'suite_id':suite } )
+
+        while True:
+            r = arches.fetchone()
+            if not r:
+                break
+
+            if r[1] != "source" and r[1] != "all":
+                arch_list.append((r[0], r[1]))
+
+        return arch_list
+
 
 ################################################################################
 
 
 ################################################################################
 
@@ -321,6 +633,7 @@ def main():
                 ]
 
     commands = {'generate' : Contents.generate,
                 ]
 
     commands = {'generate' : Contents.generate,
+                'bootstrap_bin' : Contents.bootstrap_bin,
                 'bootstrap' : Contents.bootstrap,
                 'cruft' : Contents.cruft,
                 }
                 'bootstrap' : Contents.bootstrap,
                 'cruft' : Contents.cruft,
                 }