]> git.decadent.org.uk Git - dak.git/blobdiff - daklib/contents.py
Merge branch 'contents' of ftp-master.debian.org:public_html/dak into contents
[dak.git] / daklib / contents.py
index 99d852b37970112edfae1b20bbceca858f4cdc54..4a0b3ae25237fe8a0b63b25f284d9d0e31903a04 100755 (executable)
@@ -27,11 +27,8 @@ Helper code for contents generation.
 
 from daklib.dbconn import *
 from daklib.config import Config
-from daklib.threadpool import ThreadPool
-from multiprocessing import Pool
 
-from sqlalchemy import desc, or_
-from sqlalchemy.exc import IntegrityError
+from multiprocessing import Pool
 from subprocess import Popen, PIPE
 
 import os.path
@@ -191,17 +188,28 @@ select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package
         gzip.stdin.close()
         output_file.close()
         gzip.wait()
-        os.remove(final_filename)
+        try:
+            os.remove(final_filename)
+        except:
+            pass
         os.rename(temp_filename, final_filename)
         os.chmod(final_filename, 0664)
 
     @classmethod
-    def write_all(class_, suite_names = [], force = False):
+    def log_result(class_, result):
+        '''
+        Writes a result message to the logfile.
+        '''
+        class_.logger.log(result)
+
+    @classmethod
+    def write_all(class_, logger, suite_names = [], force = False):
         '''
         Writes all Contents files for suites in list suite_names which defaults
         to all 'touchable' suites if not specified explicitely. Untouchable
         suites will be included if the force argument is set to True.
         '''
+        class_.logger = logger
         session = DBConn().session()
         suite_query = session.query(Suite)
         if len(suite_names) > 0:
@@ -218,10 +226,13 @@ select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package
             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
                 arch_id = architecture.arch_id
                 # handle 'deb' packages
-                pool.apply_async(generate_helper, (suite_id, arch_id, deb_id))
+                pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
+                    callback = class_.log_result)
                 # handle 'udeb' packages for 'main' and 'non-free'
-                pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id))
-                pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id))
+                pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
+                    callback = class_.log_result)
+                pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
+                    callback = class_.log_result)
         pool.close()
         pool.join()
         session.close()
@@ -230,17 +241,19 @@ def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
     '''
     This function is called in a new subprocess.
     '''
-    DBConn().reset()
     session = DBConn().session()
     suite = Suite.get(suite_id, session)
     architecture = Architecture.get(arch_id, session)
     overridetype = OverrideType.get(overridetype_id, session)
+    log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
     if component_id is None:
         component = None
     else:
         component = Component.get(component_id, session)
+        log_message.append(component.component_name)
     contents_writer = ContentsWriter(suite, architecture, overridetype, component)
     contents_writer.write_file()
+    return log_message
 
 
 class ContentsScanner(object):
@@ -248,12 +261,12 @@ class ContentsScanner(object):
     ContentsScanner provides a threadsafe method scan() to scan the contents of
     a DBBinary object.
     '''
-    def __init__(self, binary):
+    def __init__(self, binary_id):
         '''
-        The argument binary is the actual DBBinary object that should be
-        scanned.
+        The argument binary_id is the id of the DBBinary object that
+        should be scanned.
         '''
-        self.binary_id = binary.binary_id
+        self.binary_id = binary_id
 
     def scan(self, dummy_arg = None):
         '''
@@ -285,10 +298,18 @@ class ContentsScanner(object):
         if limit is not None:
             query = query.limit(limit)
         processed = query.count()
-        threadpool = ThreadPool()
+        pool = Pool()
         for binary in query.yield_per(100):
-            threadpool.queueTask(ContentsScanner(binary).scan)
-        threadpool.joinAll()
+            pool.apply_async(scan_helper, (binary.binary_id, ))
+        pool.close()
+        pool.join()
         remaining = remaining()
         session.close()
         return { 'processed': processed, 'remaining': remaining }
+
+def scan_helper(binary_id):
+    '''
+    This function runs in a subprocess.
+    '''
+    scanner = ContentsScanner(binary_id)
+    scanner.scan()