]> git.decadent.org.uk Git - dak.git/blobdiff - dak/contents.py
before I rip out pending_*
[dak.git] / dak / contents.py
index 834cbccf0cc5027f99ec98433403fab8895392ee..c0d00c85abec80249c6b4f6a8726a4b87503b8d1 100755 (executable)
@@ -93,101 +93,240 @@ class EndOfContents(object):
     """
     pass
 
     """
     pass
 
-class GzippedContentWriter(object):
+class OneAtATime(object):
     """
     """
-    An object which will write contents out to a Contents-$arch.gz
-    file on a separate thread
     """
     """
+    def __init__(self):
+        """
+        Start with an empty slot and the condition variable that guards it.
+        """
+        # condition that enqueue() and dequeue() acquire and wait on
+        self.next_lock = threading.Condition()
+        # the single item waiting to be picked up; None means "empty"
+        self.next_in_line = None
+
+    def enqueue(self, next):
+        """
+        Hand one item over, blocking while the slot is still occupied.
+
+        @type next: object
+        @param next: the item to hand over; must not be None, because None
+        is what marks the slot as empty
+        """
+        self.next_lock.acquire()
+        try:
+            # compare against None rather than relying on truthiness, so a
+            # falsy item (0, "", an empty container) still occupies the slot
+            while self.next_in_line is not None:
+                self.next_lock.wait()
+
+            self.next_in_line = next
+            # NOTE(review): notify() wakes a single waiter; that is enough
+            # with one producer and one consumer per queue -- confirm before
+            # sharing one queue between several threads of the same role
+            self.next_lock.notify()
+        finally:
+            # release even when wait() is interrupted by an exception
+            self.next_lock.release()
+
+    def dequeue(self):
+        """
+        Take the waiting item, blocking until one is available.
+
+        @rtype: object
+        @return: the item that was stored in the slot
+        """
+        self.next_lock.acquire()
+        try:
+            # None is the only "empty" marker; a falsy item is still an item
+            while self.next_in_line is None:
+                self.next_lock.wait()
+            result = self.next_in_line
+            self.next_in_line = None
+            # the slot is free again: wake a thread waiting on the condition
+            self.next_lock.notify()
+            return result
+        finally:
+            # release even when wait() is interrupted by an exception
+            self.next_lock.release()
+
+class ContentsWorkThread(threading.Thread):
+    """
+    """
+    def __init__(self, upstream, downstream):
+        """
+        @param upstream: object whose dequeue() supplies the items to work on
+        @param downstream: object whose enqueue() receives finished items,
+        or a false value when nothing follows this stage
+        """
+        threading.Thread.__init__(self)
+        self.downstream = downstream
+        self.upstream = upstream
 
 
-    header = None # a class object holding the header section of contents file
+    def run(self):
+        """
+        Dequeue items from upstream, process each one and pass it on.
+
+        Every item is handed to self._run() and then enqueued downstream
+        (when there is a downstream).  The loop ends when an EndOfContents
+        object is dequeued; it is forwarded first so that later stages stop
+        as well.  An item whose processing raises is reported with a
+        traceback and is not forwarded.
+        """
+        while True:
+            try:
+                contents_file = self.upstream.dequeue()
+                if isinstance(contents_file,EndOfContents):
+                    if self.downstream:
+                        self.downstream.enqueue(contents_file)
+                    break
+
+                s = datetime.datetime.now()
+                print("%s start: %s" % (self,contents_file) )
+                self._run(contents_file)
+                elapsed = datetime.datetime.now() - s
+                # timedelta.seconds alone leaves out whole days
+                print("%s finished: %s in %d seconds" % (self, contents_file, elapsed.days * 86400 + elapsed.seconds ))
+                if self.downstream:
+                    self.downstream.enqueue(contents_file)
+            except Exception:
+                # best effort: report the failure and go on with the next
+                # item, but do not swallow SystemExit / KeyboardInterrupt
+                traceback.print_exc()
+
+class QueryThread(ContentsWorkThread):
+    """
+    Pipeline stage that calls query() on every item it is given.
+    """
+    # construction is inherited unchanged from ContentsWorkThread
+
+    def _run(self, contents_file):
+        contents_file.query()
+
+    def __repr__(self):
+        return "QueryThread"
+    __str__ = __repr__
+
+class IngestThread(ContentsWorkThread):
+    """
+    Pipeline stage that calls ingest() on every item it is given.
+    """
+    # construction is inherited unchanged from ContentsWorkThread
+
+    def _run(self, contents_file):
+        contents_file.ingest()
+
+    def __repr__(self):
+        return "IngestThread"
+    __str__ = __repr__
+
+class SortThread(ContentsWorkThread):
+    """
+    Pipeline stage that stores the sorted keys of an item's filenames
+    mapping in its sorted_keys attribute.
+    """
+    # construction is inherited unchanged from ContentsWorkThread
+
+    def _run(self, contents_file):
+        # iterating a dict yields its keys
+        ordered = sorted(contents_file.filenames)
+        contents_file.sorted_keys = ordered
+
+    def __repr__(self):
+        return "SortThread"
+    __str__ = __repr__
+
+class OutputThread(ContentsWorkThread):
+    """
+    Pipeline stage that writes an item's entries to its contents file.
+    """
+    def __init__(self, upstream, downstream):
+        ContentsWorkThread.__init__(self, upstream, downstream)
+
+    def __str__(self):
+        return "OutputThread"
+    __repr__ = __str__
+
+    def _run(self, contents_file):
+        """
+        Open the item's file, write one tab separated line per sorted key
+        and then drop the in-memory data.
+
+        The file handle is closed here so that everything written is on
+        disk before a later stage works on the file by name.
+        """
+        contents_file.open_file()
+        try:
+            for fname in contents_file.sorted_keys:
+                contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname]))
+        finally:
+            # closing flushes the buffered output; closing an already
+            # closed file later on is harmless
+            contents_file.filehandle.close()
+        contents_file.sorted_keys = None
+        contents_file.filenames.clear()
+    
+class GzipThread(ContentsWorkThread):
+    """
+    Pipeline stage that compresses an item's file with the gzip program.
+    """
+    def __init__(self, upstream, downstream):
+        ContentsWorkThread.__init__(self, upstream, downstream)
+
+    def __str__(self):
+        return "GzipThread"
+    __repr__ = __str__
+
+    def _run(self, contents_file):
+        """
+        Run "gzip -f" on contents_file.filename and report a non-zero exit.
+        """
+        # imported here so the block does not depend on a file-level import
+        import subprocess
+
+        # an argument list avoids the shell, so spaces or shell
+        # metacharacters in the file name cannot change the command;
+        # "--" keeps a name starting with "-" from being read as an option
+        status = subprocess.call(["gzip", "-f", "--", contents_file.filename])
+        if status != 0:
+            print("%s failed: gzip exited with status %s for %s" % (self, status, contents_file))
+
+class ContentFile(object):
+    def __init__(self,
+                 filename,
+                 suite_str,
+                 suite_id):
+        """
+        @type filename: string
+        @param filename: name of the contents file; also the str() of
+        this object
+        @param suite_str: stored as self.suite_str
+        @param suite_id: stored as self.suite_id
+        """
+        self.filename = filename
+        # maps a file path to the comma separated packages shipping it
+        self.filenames = {}
+        # sorted list of the keys of self.filenames, once computed
+        self.sorted_keys = None
+        self.suite_str = suite_str
+        self.suite_id = suite_id
+        # set by query() in the subclasses
+        self.cursor = None
+        # set by open_file()
+        self.filehandle = None
+
+    def __str__(self):
+        """
+        A ContentFile prints as its current filename; repr() is the same.
+        """
+        name = self.filename
+        return name
+    __repr__ = __str__
+
+
+    def cleanup(self):
+        """
+        Drop the collected data and close the file handle and the cursor.
+
+        filehandle and cursor start out as None and are skipped while they
+        still are, so cleanup() is safe on a partly processed object.
+        """
+        self.filenames = None
+        # the attribute is sorted_keys everywhere else in this class
+        self.sorted_keys = None
+        if self.filehandle is not None:
+            self.filehandle.close()
+        if self.cursor is not None:
+            self.cursor.close()
+
+    def ingest(self):
+        """
+        Read every (filename, package) row from self.cursor into
+        self.filenames.
+
+        Packages that share a filename are joined with commas in the order
+        the rows arrive.  The cursor is closed afterwards, also when
+        reading fails part way through.
+        """
+        try:
+            while True:
+                r = self.cursor.fetchone()
+                if not r:
+                    break
+                filename, package = r
+                if filename in self.filenames:
+                    self.filenames[filename] += ",%s" % (package)
+                else:
+                    self.filenames[filename] = "%s" % (package)
+        finally:
+            self.cursor.close()
 
 
-    def __init__(self, filename):
-        """
-        @type filename: string
-        @param filename: the name of the file to write to
-        """
-        self.queue = Queue.Queue()
-        self.current_file = None
-        self.first_package = True
-        self.output = self.open_file(filename)
-        self.thread = threading.Thread(target=self.write_thread,
-                                       name='Contents writer')
-        self.thread.start()
-
-    def open_file(self, filename):
+    def open_file(self):
         """
         opens a gzip stream to the contents file
         """
         """
         opens a gzip stream to the contents file
         """
-        filepath = Config()["Contents::Root"] + filename
-        filedir = os.path.dirname(filepath)
+#        filepath = Config()["Contents::Root"] + self.filename
+        # NOTE(review): the output root is hard-coded here instead of being
+        # read from Contents::Root, and self.filename is overwritten with the
+        # prefixed path, so a second open_file() call on the same object
+        # would add the prefix twice.
+        self.filename = "/home/stew/contents/" + self.filename
+        filedir = os.path.dirname(self.filename)
         if not os.path.isdir(filedir):
             os.makedirs(filedir)
         if not os.path.isdir(filedir):
             os.makedirs(filedir)
-        return gzip.open(filepath, "w")
-
-    def write(self, filename, section, package):
-        """
-        enqueue content to be written to the file on a separate thread
-        """
-        self.queue.put((filename,section,package))
-
-    def write_thread(self):
-        """
-        the target of a Thread which will do the actual writing
-        """
-        while True:
-            next = self.queue.get()
-            if isinstance(next, EndOfContents):
-                self.output.write('\n')
-                self.output.close()
-                break
+#        self.filehandle = gzip.open(self.filename, "w")
+        # the new code opens a plain, uncompressed file (the gzip.open()
+        # variant is commented out above) and stores the handle on the
+        # object instead of returning it; the header is written right away
+        self.filehandle = open(self.filename, "w")
+        self._write_header()
 
 
-            (filename,section,package)=next
-            if next != self.current_file:
-                # this is the first file, so write the header first
-                if not self.current_file:
-                    self.output.write(self._getHeader())
+    def _write_header(self):
+        """
+        Write the Contents header to self.filehandle, if there is one.
+        """
+        header = self._get_header()
+        # _get_header() returns None when no header is configured or the
+        # header file could not be read; writing None would raise
+        if header:
+            self.filehandle.write(header)
 
 
-                self.output.write('\n%s\t' % filename)
-                self.first_package = True
-
-            self.current_file=filename
-
-            if not self.first_package:
-                self.output.write(',')
-            else:
-                self.first_package=False
-            self.output.write('%s/%s' % (section,package))
-
-    def finish(self):
-        """
-        enqueue the sentry object so that writers will know to terminate
-        """
-        self.queue.put(EndOfContents())
+    header=None
 
     @classmethod
 
     @classmethod
-    def _getHeader(self):
+    def _get_header(self):
         """
         Internal method to return the header for Contents.gz files
 
         This is boilerplate which explains the contents of the file and how
         it can be used.
         """
         """
         Internal method to return the header for Contents.gz files
 
         This is boilerplate which explains the contents of the file and how
         it can be used.
         """
-        if not GzippedContentWriter.header:
+        # the header is cached on the class: the template file is only read
+        # while ContentFile.header is still unset (or empty)
+        if not ContentFile.header:
             if Config().has_key("Contents::Header"):
                 try:
                     h = open(os.path.join( Config()["Dir::Templates"],
                                            Config()["Contents::Header"] ), "r")
             if Config().has_key("Contents::Header"):
                 try:
                     h = open(os.path.join( Config()["Dir::Templates"],
                                            Config()["Contents::Header"] ), "r")
-                    GzippedContentWriter.header = h.read()
+                    ContentFile.header = h.read()
                     h.close()
                 except:
+                    # NOTE(review): %d is applied to the Contents::Header value; if
+                    # that is a string (it is used as a file name above), this
+                    # formatting raises inside the error handler -- confirm.
                     log.error( "error opening header file: %d\n%s" % (Config()["Contents::Header"],
                                                                       traceback.format_exc() ))
                     h.close()
                 except:
                     log.error( "error opening header file: %d\n%s" % (Config()["Contents::Header"],
                                                                       traceback.format_exc() ))
-                    GzippedContentWriter.header = None
+                    ContentFile.header = None
             else:
             else:
-                GzippedContentWriter.header = None
-
-        return GzippedContentWriter.header
-
+                ContentFile.header = None
+
+        # None when no header is configured or it could not be read
+        return ContentFile.header
+
+
+class DebContentFile(ContentFile):
+    """
+    A ContentFile that also records an architecture and selects its rows
+    from the deb_contents table.
+    """
+    def __init__(self,
+                 filename,
+                 suite_str,
+                 suite_id,
+                 arch_str,
+                 arch_id):
+        """
+        @param filename: passed on to ContentFile
+        @param suite_str: passed on to ContentFile
+        @param suite_id: passed on to ContentFile; bound as :suite in query()
+        @param arch_str: stored as self.arch_str
+        @param arch_id: stored as self.arch_id; bound as :arch in query()
+        """
+        ContentFile.__init__(self,
+                             filename,
+                             suite_str,
+                             suite_id )
+        self.arch_str = arch_str
+        self.arch_id = arch_id
+
+    def query(self):
+        """
+        Open a database session, keep it as self.cursor and run the
+        select for this suite and architecture on it.
+        """
+        self.cursor = DBConn().session()
+
+        # NOTE(review): arch=2 is hard-coded; presumably the id of the
+        # "all" architecture -- confirm against the architecture table
+        self.cursor.execute("""SELECT file, component || section || '/' || package
+        FROM deb_contents
+        WHERE ( arch=2 or arch = :arch) AND suite = :suite
+        """, { 'arch':self.arch_id, 'suite':self.suite_id } )
+
+class UdebContentFile(ContentFile):
+    """
+    A ContentFile that also records a section and selects its rows from
+    the udeb_contents table.
+    """
+    def __init__(self,
+                 filename,
+                 suite_str,
+                 suite_id,
+                 section_name,
+                 section_id):
+        """
+        @param filename: passed on to ContentFile
+        @param suite_str: passed on to ContentFile
+        @param suite_id: passed on to ContentFile; bound as :suite in query()
+        @param section_name: stored as self.section_name
+        @param section_id: stored as self.section_id
+        """
+        ContentFile.__init__(self,
+                             filename,
+                             suite_str,
+                             suite_id )
+        # kept on the object like the arch values of DebContentFile;
+        # previously both arguments were silently dropped
+        self.section_name = section_name
+        self.section_id = section_id
+
+    def query(self):
+        """
+        Open a database session, keep it as self.cursor and run the
+        select for this suite on it.
+        """
+        self.cursor = DBConn().session()
+
+        # NOTE(review): the query filters on suite only; the section is not
+        # part of the WHERE clause -- confirm that this is intended
+        self.cursor.execute("""SELECT file, component || section || '/' || package
+        FROM udeb_contents
+        WHERE suite = :suite
+        """ , { 'suite': self.suite_id } )
 
 class Contents(object):
     """
 
 class Contents(object):
     """
@@ -238,9 +377,9 @@ class Contents(object):
 
         s = DBConn().session()
 
 
         s = DBConn().session()
 
-        #        for binary in s.query(DBBinary).all() ):
-        binary = s.query(DBBinary).first()
-        if binary:
+        print( "bootstrap_bin" )
+        for binary in s.query(DBBinary).yield_per(1000):
+            print( "binary: %s" % binary.package )
             filename = binary.poolfile.filename
              # Check for existing contents
             existingq = s.execute( "select 1 from bin_contents where binary_id=:id", {'id':binary.binary_id} );
             filename = binary.poolfile.filename
              # Check for existing contents
             existingq = s.execute( "select 1 from bin_contents where binary_id=:id", {'id':binary.binary_id} );
@@ -262,29 +401,198 @@ class Contents(object):
         """
         scan the existing debs in the pool to populate the contents database tables
         """
         """
         scan the existing debs in the pool to populate the contents database tables
         """
-        pooldir = Config()[ 'Dir::Pool' ]
-
         s = DBConn().session()
 
         s = DBConn().session()
 
-        for suite in s.query(Suite).all():
-            for arch in get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=s):
-                q = s.query(BinAssociation).join(Suite)
-                q = q.join(Suite).filter_by(suite_name=suite.suite_name)
-                q = q.join(DBBinary).join(Architecture).filter_by(arch.arch_string)
-                for ba in q:
-                    filename = ba.binary.poolfile.filename
-                    # Check for existing contents
-                    existingq = s.query(ContentAssociations).filter_by(binary_pkg=ba.binary_id).limit(1)
-                    if existingq.count() > 0:
-                        log.debug( "already imported: %s" % (filename))
-                    else:
-                        # We don't have existing contents so import them
-                        log.debug( "scanning: %s" % (filename) )
-                        debfile = os.path.join(pooldir, filename)
-                        if os.path.exists(debfile):
-                            Binary(debfile, self.reject).scan_package(ba.binary_id, True)
-                        else:
-                            log.error("missing .deb: %s" % filename)
+        for override in s.query(Override).all():
+            binaries = s.execute("""SELECT b.binary_id, ba.arch
+                                    FROM binaries b
+                                    JOIN bin_associations ba ON ba.binary_id=b.binary_id
+                                    WHERE ba.suite=:suite
+                                    AND b.package=override.package""", {'suite':override.suite})
+            while True:
+                binary = binaries.fetchone()
+                if not binary:
+                    break
+
+                filenames = s.execute( """SELECT file from bin_contents where binary_id=:id""", { 'id': binary.binary_id } )
+                while True:
+                    filename = filenames.fetchone()
+                    if not binary:
+                        break
+
+                
+
+                    if override.type == 7:
+                        s.execute( """INSERT INTO deb_contents (file,section,package,binary_id,arch,suite,component)
+                                      VALUES (:filename, :section, :package, :binary_id, :arch, :suite, :component);""",
+                                   { 'filename' : filename,
+                                     'section' : override.section,
+                                     'package' : override.package,
+                                     'binary_id' : binary.binary_id,
+                                     'arch' : binary.arch,
+                                     'suite' : override.suite,
+                                     'component' : override.component } )
+
+                    
+                    elif override.type == 9:
+                        s.execute( """INSERT INTO deb_contents (file,section,package,binary_id,arch,suite,component)
+                                      VALUES (:filename, :section, :package, :binary_id, :arch, :suite, :component);""",
+                                   { 'filename' : filename,
+                                     'section' : override.section,
+                                     'package' : override.package,
+                                     'binary_id' : binary.binary_id,
+                                     'arch' : binary.arch,
+                                     'suite' : override.suite,
+                                     'component' : override.component } )
+
+#     def bootstrap(self):
+#         """
+#         scan the existing debs in the pool to populate the contents database tables
+#         """
+#         pooldir = Config()[ 'Dir::Pool' ]
+
+#         s = DBConn().session()
+
+#         for suite in s.query(Suite).all():
+#             for arch in get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=s):
+#                 q = s.query(BinAssociation).join(Suite)
+#                 q = q.join(Suite).filter_by(suite_name=suite.suite_name)
+#                 q = q.join(DBBinary).join(Architecture).filter_by(arch.arch_string)
+#                 for ba in q:
+#                     filename = ba.binary.poolfile.filename
+#                     # Check for existing contents
+#                     existingq = s.query(ContentAssociations).filter_by(binary_pkg=ba.binary_id).limit(1)
+#                     if existingq.count() > 0:
+#                         log.debug( "already imported: %s" % (filename))
+#                     else:
+#                         # We don't have existing contents so import them
+#                         log.debug( "scanning: %s" % (filename) )
+#                         debfile = os.path.join(pooldir, filename)
+#                         if os.path.exists(debfile):
+#                             Binary(debfile, self.reject).scan_package(ba.binary_id, True)
+#                         else:
+#                             log.error("missing .deb: %s" % filename)
+    def generate(self):
+        """
+        Prepare the arches_q statement and generate the deb contents files.
+
+        Generation of the udeb contents files is currently switched off.
+        """
+        # NOTE(review): another "def generate(self)" appears further down in
+        # this class; if both stay, the later definition replaces this one
+        conn = DBConn()
+        conn.prepare("arches_q", arches_q)
+        self.deb_generate()
+
+    def deb_generate(self):
+        """
+        Generate Contents-$arch.gz files for every available arch in each given suite.
+
+        Every file travels through a pipeline of worker threads (query,
+        ingest, sort, output, gzip) that are connected by OneAtATime
+        hand-over slots.  This method feeds the pipeline and waits until
+        all stages have finished.
+        """
+        cursor = DBConn().session()
+        suites = self._suites()
+
+        # one hand-over slot between each pair of neighbouring stages
+        inputtoquery = OneAtATime()
+        querytoingest = OneAtATime()
+        ingesttosort = OneAtATime()
+        sorttooutput = OneAtATime()
+        outputtogzip = OneAtATime()
+
+        # running several IngestThreads in parallel was tried and made
+        # things worse, so there is exactly one thread per stage
+        workers = [ QueryThread(inputtoquery,querytoingest),
+                    IngestThread(querytoingest,ingesttosort),
+                    SortThread(ingesttosort,sorttooutput),
+                    OutputThread(sorttooutput,outputtogzip),
+                    GzipThread(outputtogzip, None) ]
+        for worker in workers:
+            worker.start()
+
+        try:
+            # Get our suites, and the architectures
+            for suite in [i.lower() for i in suites]:
+                suite_id = DBConn().get_suite_id(suite)
+                arch_list = self._arches(cursor, suite_id)
+
+                for (arch_id,arch_str) in arch_list:
+                    print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )
+
+                    # no ".gz" suffix here: the file is written uncompressed
+                    # and compressed by the gzip stage afterwards
+                    filename = "dists/%s/Contents-%s" % (suite, arch_str)
+                    # DebContentFile is the class that accepts the arch
+                    # arguments and provides the query() the first stage calls
+                    cf = DebContentFile(filename, suite, suite_id, arch_str, arch_id)
+                    inputtoquery.enqueue( cf )
+        finally:
+            # always send the end marker, even if feeding failed: the
+            # workers would otherwise block forever in dequeue()
+            inputtoquery.enqueue( EndOfContents() )
+            for worker in workers:
+                worker.join()
+
+    def udeb_generate(self):
+        """
+        Generate Contents-$arch.gz files for every available arch in each given suite.
+        """
+        # NOTE(review): this method looks unfinished.  It builds and starts the
+        # same thread pipeline as deb_generate(), but nothing below enqueues
+        # anything (neither work items nor an EndOfContents) on inputtoquery,
+        # so the started threads would stay blocked in dequeue().
+        cursor = DBConn().session()
+        udebtype_id=DBConn().get_override_type_id("udeb")
+        suites = self._suites()
+
+        inputtoquery = OneAtATime()
+        querytoingest = OneAtATime()
+        ingesttosort = OneAtATime()
+        sorttooutput = OneAtATime()
+        outputtogzip = OneAtATime()
+
+        qt = QueryThread(inputtoquery,querytoingest)
+        it = IngestThread(querytoingest,ingesttosort)
+# these actually make things worse
+#        it2 = IngestThread(querytoingest,ingesttosort)
+#        it3 = IngestThread(querytoingest,ingesttosort)
+#        it4 = IngestThread(querytoingest,ingesttosort)
+        st = SortThread(ingesttosort,sorttooutput)
+        ot = OutputThread(sorttooutput,outputtogzip)
+        gt = GzipThread(outputtogzip, None)
+
+        qt.start()
+        it.start()
+#        it2.start()
+#        it3.start()
+#        it2.start()
+        st.start()
+        ot.start()
+        gt.start()
+        
+        # each entry pairs a section name with the file name pattern
+        # (filled with suite and architecture name) used for that section
+        for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s"),
+                                    ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s")]:
+
+            section_id = DBConn().get_section_id(section) # all udebs should be here)
+            if section_id != -1:
+
+                
+
+                # Get our suites, and the architectures
+                for suite in [i.lower() for i in suites]:
+                    suite_id = DBConn().get_suite_id(suite)
+                    arch_list = self._arches(cursor, suite_id)
+
+                    # despite its name, arch_id is an (id, name) tuple as
+                    # built by _arches()
+                    for arch_id in arch_list:
+
+                        # NOTE(review): GzippedContentWriter is the class this
+                        # change removes; its removed code shows write() and
+                        # finish() but no close(), which is called below.
+                        writer = GzippedContentWriter(fn_pattern % (suite, arch_id[1]))
+                        try:
+
+                            # NOTE(review): three %d placeholders but four values,
+                            # and the last value is the arch tuple: this
+                            # formatting raises TypeError as written.
+                            cursor.execute("EXECUTE udeb_contents_q(%d,%d,%d)" % (suite_id, udebtype_id, section_id, arch_id))
+
+                            while True:
+                                r = cursor.fetchone()
+                                if not r:
+                                    break
+
+                                filename, section, package, arch = r
+                                writer.write(filename, section, package)
+                        finally:
+                            writer.close()
+
+
 
 
     def generate(self):
 
 
     def generate(self):
@@ -335,6 +643,34 @@ class Contents(object):
                     # close all the files
                     for writer in file_writers.values():
                         writer.finish()
                     # close all the files
                     for writer in file_writers.values():
                         writer.finish()
+    def _suites(self):
+        """
+        return a list of suites to operate on
+
+        The "<options_prefix>::Suite" configuration value is split with
+        utils.split_args() when it is set; otherwise a fixed default of
+        unstable and testing is used.
+        """
+        key = "%s::%s" %(options_prefix,"Suite")
+        if not Config().has_key(key):
+            # nothing configured: fall back to the built-in default
+            return [ 'unstable', 'testing' ]
+
+        return utils.split_args(Config()[key])
+
+    def _arches(self, cursor, suite):
+        """
+        return a list of archs to operate on
+
+        Runs the prepared arches_q statement for the given suite and
+        collects the first two columns of every row as a tuple, leaving
+        out rows whose second column is "source" or "all".
+        """
+        cursor.execute("EXECUTE arches_q(%d)" % (suite))
+
+        wanted = []
+        row = cursor.fetchone()
+        while row:
+            if row[1] not in ("source", "all"):
+                wanted.append((row[0], row[1]))
+            row = cursor.fetchone()
+
+        return wanted
+
 
 ################################################################################
 
 
 ################################################################################