]> git.decadent.org.uk Git - dak.git/blobdiff - daklib/contents.py
Merge branch 'contents' of ftp-master.debian.org:public_html/dak into contents
[dak.git] / daklib / contents.py
index 730d21488b27916a08f0e0771c7533904fe635b5..4a0b3ae25237fe8a0b63b25f284d9d0e31903a04 100755 (executable)
@@ -27,10 +27,8 @@ Helper code for contents generation.
 
 from daklib.dbconn import *
 from daklib.config import Config
 
 from daklib.dbconn import *
 from daklib.config import Config
-from daklib.threadpool import ThreadPool
-from multiprocessing import Pool
 
 
-from sqlalchemy import desc, or_
+from multiprocessing import Pool
 from subprocess import Popen, PIPE
 
 import os.path
 from subprocess import Popen, PIPE
 
 import os.path
@@ -45,25 +43,26 @@ class ContentsWriter(object):
         sure that the new ContentsWriter object can be executed in a different
         thread.
         '''
         sure that the new ContentsWriter object can be executed in a different
         thread.
         '''
-        self.suite = suite.clone()
-        self.session = self.suite.session()
-        self.architecture = architecture.clone(self.session)
-        self.overridetype = overridetype.clone(self.session)
-        if component is not None:
-            self.component = component.clone(self.session)
-        else:
-            self.component = None
+        self.suite = suite
+        self.architecture = architecture
+        self.overridetype = overridetype
+        self.component = component
+        self.session = suite.session()
 
     def query(self):
         '''
         Returns a query object that is doing most of the work.
         '''
 
     def query(self):
         '''
         Returns a query object that is doing most of the work.
         '''
+        overridesuite = self.suite
+        if self.suite.overridesuite is not None:
+            overridesuite = get_suite(self.suite.overridesuite, self.session)
         params = {
         params = {
-            'suite':    self.suite.suite_id,
-            'arch_all': get_architecture('all', self.session).arch_id,
-            'arch':     self.architecture.arch_id,
-            'type_id':  self.overridetype.overridetype_id,
-            'type':     self.overridetype.overridetype,
+            'suite':         self.suite.suite_id,
+            'overridesuite': overridesuite.suite_id,
+            'arch_all':      get_architecture('all', self.session).arch_id,
+            'arch':          self.architecture.arch_id,
+            'type_id':       self.overridetype.overridetype_id,
+            'type':          self.overridetype.overridetype,
         }
 
         if self.component is not None:
         }
 
         if self.component is not None:
@@ -87,13 +86,13 @@ with
 unique_override as
     (select o.package, s.section
         from override o, section s
 unique_override as
     (select o.package, s.section
         from override o, section s
-        where o.suite = :suite and o.type = :type_id and o.section = s.id and
+        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
         o.component = :component)
 
         o.component = :component)
 
-select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
+select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
     from newest_binaries b, bin_contents bc, unique_override o
     where b.id = bc.binary_id and o.package = b.package
     from newest_binaries b, bin_contents bc, unique_override o
     where b.id = bc.binary_id and o.package = b.package
-    order by bc.file, b.package'''
+    group by bc.file'''
 
         else:
             sql = '''
 
         else:
             sql = '''
@@ -115,39 +114,29 @@ with
 unique_override as
     (select distinct on (o.package, s.section) o.package, s.section
         from override o, section s
 unique_override as
     (select distinct on (o.package, s.section) o.package, s.section
         from override o, section s
-        where o.suite = :suite and o.type = :type_id and o.section = s.id
+        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
         order by o.package, s.section, o.modified desc)
 
         order by o.package, s.section, o.modified desc)
 
-select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
+select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
     from newest_binaries b, bin_contents bc, unique_override o
     where b.id = bc.binary_id and o.package = b.package
     from newest_binaries b, bin_contents bc, unique_override o
     where b.id = bc.binary_id and o.package = b.package
-    order by bc.file, b.package'''
+    group by bc.file'''
 
 
-        return self.session.query("file", "package").from_statement(sql). \
+        return self.session.query("file", "pkglist").from_statement(sql). \
             params(params)
 
     def formatline(self, filename, package_list):
         '''
         Returns a formatted string for the filename argument.
         '''
             params(params)
 
     def formatline(self, filename, package_list):
         '''
         Returns a formatted string for the filename argument.
         '''
-        package_list = ','.join(package_list)
         return "%-55s %s\n" % (filename, package_list)
 
     def fetch(self):
         '''
         Yields a new line of the Contents-$arch.gz file in filename order.
         '''
         return "%-55s %s\n" % (filename, package_list)
 
     def fetch(self):
         '''
         Yields a new line of the Contents-$arch.gz file in filename order.
         '''
-        last_filename = None
-        package_list = []
-        for filename, package in self.query().yield_per(100):
-            if filename != last_filename:
-                if last_filename is not None:
-                    yield self.formatline(last_filename, package_list)
-                last_filename = filename
-                package_list = []
-            package_list.append(package)
-        if last_filename is not None:
-            yield self.formatline(last_filename, package_list)
+        for filename, package_list in self.query().yield_per(100):
+            yield self.formatline(filename, package_list)
         # end transaction to return connection to pool
         self.session.rollback()
 
         # end transaction to return connection to pool
         self.session.rollback()
 
@@ -189,7 +178,9 @@ select bc.file, substring(o.section from position('/' in o.section) + 1) || '/'
         Write the output file.
         '''
         command = ['gzip', '--rsyncable']
         Write the output file.
         '''
         command = ['gzip', '--rsyncable']
-        output_file = open(self.output_filename(), 'w')
+        final_filename = self.output_filename()
+        temp_filename = final_filename + '.new'
+        output_file = open(temp_filename, 'w')
         gzip = Popen(command, stdin = PIPE, stdout = output_file)
         gzip.stdin.write(self.get_header())
         for item in self.fetch():
         gzip = Popen(command, stdin = PIPE, stdout = output_file)
         gzip.stdin.write(self.get_header())
         for item in self.fetch():
@@ -197,51 +188,85 @@ select bc.file, substring(o.section from position('/' in o.section) + 1) || '/'
         gzip.stdin.close()
         output_file.close()
         gzip.wait()
         gzip.stdin.close()
         output_file.close()
         gzip.wait()
+        try:
+            os.remove(final_filename)
+        except:
+            pass
+        os.rename(temp_filename, final_filename)
+        os.chmod(final_filename, 0664)
 
     @classmethod
 
     @classmethod
-    def write_all(class_, suite_names = [], force = False):
+    def log_result(class_, result):
+        '''
+        Writes a result message to the logfile.
+        '''
+        class_.logger.log(result)
+
+    @classmethod
+    def write_all(class_, logger, suite_names = [], force = False):
         '''
         Writes all Contents files for suites in list suite_names which defaults
         to all 'touchable' suites if not specified explicitely. Untouchable
         suites will be included if the force argument is set to True.
         '''
         '''
         Writes all Contents files for suites in list suite_names which defaults
         to all 'touchable' suites if not specified explicitely. Untouchable
         suites will be included if the force argument is set to True.
         '''
+        class_.logger = logger
         session = DBConn().session()
         suite_query = session.query(Suite)
         if len(suite_names) > 0:
             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
         if not force:
             suite_query = suite_query.filter_by(untouchable = False)
         session = DBConn().session()
         suite_query = session.query(Suite)
         if len(suite_names) > 0:
             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
         if not force:
             suite_query = suite_query.filter_by(untouchable = False)
-        main = get_component('main', session)
-        non_free = get_component('non-free', session)
-        deb = get_override_type('deb', session)
-        udeb = get_override_type('udeb', session)
+        deb_id = get_override_type('deb', session).overridetype_id
+        udeb_id = get_override_type('udeb', session).overridetype_id
+        main_id = get_component('main', session).component_id
+        non_free_id = get_component('non-free', session).component_id
         pool = Pool()
         for suite in suite_query:
         pool = Pool()
         for suite in suite_query:
+            suite_id = suite.suite_id
             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
+                arch_id = architecture.arch_id
                 # handle 'deb' packages
                 # handle 'deb' packages
-                writer = ContentsWriter(suite, architecture, deb)
-                pool.apply(writer.write_file)
+                pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
+                    callback = class_.log_result)
                 # handle 'udeb' packages for 'main' and 'non-free'
                 # handle 'udeb' packages for 'main' and 'non-free'
-                writer = ContentsWriter(suite, architecture, udeb, component = main)
-                pool.apply(writer.write_file)
-                writer = ContentsWriter(suite, architecture, udeb, component = non_free)
-                pool.apply(writer.write_file)
+                pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
+                    callback = class_.log_result)
+                pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
+                    callback = class_.log_result)
         pool.close()
         pool.join()
         session.close()
 
         pool.close()
         pool.join()
         session.close()
 
+def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
+    '''
+    This function is called in a new subprocess.
+    '''
+    session = DBConn().session()
+    suite = Suite.get(suite_id, session)
+    architecture = Architecture.get(arch_id, session)
+    overridetype = OverrideType.get(overridetype_id, session)
+    log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
+    if component_id is None:
+        component = None
+    else:
+        component = Component.get(component_id, session)
+        log_message.append(component.component_name)
+    contents_writer = ContentsWriter(suite, architecture, overridetype, component)
+    contents_writer.write_file()
+    return log_message
+
 
 class ContentsScanner(object):
     '''
     ContentsScanner provides a threadsafe method scan() to scan the contents of
     a DBBinary object.
     '''
 
 class ContentsScanner(object):
     '''
     ContentsScanner provides a threadsafe method scan() to scan the contents of
     a DBBinary object.
     '''
-    def __init__(self, binary):
+    def __init__(self, binary_id):
         '''
         '''
-        The argument binary is the actual DBBinary object that should be
-        scanned.
+        The argument binary_id is the id of the DBBinary object that
+        should be scanned.
         '''
         '''
-        self.binary_id = binary.binary_id
+        self.binary_id = binary_id
 
     def scan(self, dummy_arg = None):
         '''
 
     def scan(self, dummy_arg = None):
         '''
@@ -251,7 +276,10 @@ class ContentsScanner(object):
         '''
         session = DBConn().session()
         binary = session.query(DBBinary).get(self.binary_id)
         '''
         session = DBConn().session()
         binary = session.query(DBBinary).get(self.binary_id)
-        for filename in binary.scan_contents():
+        fileset = set(binary.scan_contents())
+        if len(fileset) == 0:
+            fileset.add('EMPTY_PACKAGE')
+        for filename in fileset:
             binary.contents.append(BinContents(file = filename))
         session.commit()
         session.close()
             binary.contents.append(BinContents(file = filename))
         session.commit()
         session.close()
@@ -270,10 +298,18 @@ class ContentsScanner(object):
         if limit is not None:
             query = query.limit(limit)
         processed = query.count()
         if limit is not None:
             query = query.limit(limit)
         processed = query.count()
-        threadpool = ThreadPool()
+        pool = Pool()
         for binary in query.yield_per(100):
         for binary in query.yield_per(100):
-            threadpool.queueTask(ContentsScanner(binary).scan)
-        threadpool.joinAll()
+            pool.apply_async(scan_helper, (binary.binary_id, ))
+        pool.close()
+        pool.join()
         remaining = remaining()
         session.close()
         return { 'processed': processed, 'remaining': remaining }
         remaining = remaining()
         session.close()
         return { 'processed': processed, 'remaining': remaining }
+
+def scan_helper(binary_id):
+    '''
+    This function runs in a subprocess.
+    '''
+    scanner = ContentsScanner(binary_id)
+    scanner.scan()