Add by-hash support

diff --git a/daklib/contents.py b/daklib/contents.py
old mode 100755
new mode 100644
index 1148758..75fb5e5
--- a/daklib/contents.py
+++ b/daklib/contents.py
@@ -27,24 +27,20 @@ Helper code for contents generation.
 
 from daklib.dbconn import *
 from daklib.config import Config
-from daklib.threadpool import ThreadPool
-from multiprocessing import Pool
+from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
 
-from sqlalchemy import desc, or_
-from subprocess import Popen, PIPE, call
+from multiprocessing import Pool
+from shutil import rmtree
+from tempfile import mkdtemp
 
+import daklib.daksubprocess
 import os.path
 
-class ContentsWriter(object):
+class BinaryContentsWriter(object):
     '''
-    ContentsWriter writes the Contents-$arch.gz files.
+    BinaryContentsWriter writes the Contents-$arch.gz files.
     '''
-    def __init__(self, suite, architecture, overridetype, component = None):
-        '''
-        The constructor clones its arguments into a new session object to make
-        sure that the new ContentsWriter object can be executed in a different
-        thread.
-        '''
+    def __init__(self, suite, architecture, overridetype, component):
         self.suite = suite
         self.architecture = architecture
         self.overridetype = overridetype
@@ -55,17 +51,20 @@ class ContentsWriter(object):
         '''
         Returns a query object that does most of the work.
         '''
+        overridesuite = self.suite
+        if self.suite.overridesuite is not None:
+            overridesuite = get_suite(self.suite.overridesuite, self.session)
         params = {
-            'suite':    self.suite.suite_id,
-            'arch_all': get_architecture('all', self.session).arch_id,
-            'arch':     self.architecture.arch_id,
-            'type_id':  self.overridetype.overridetype_id,
-            'type':     self.overridetype.overridetype,
+            'suite':         self.suite.suite_id,
+            'overridesuite': overridesuite.suite_id,
+            'component':     self.component.component_id,
+            'arch_all':      get_architecture('all', self.session).arch_id,
+            'arch':          self.architecture.arch_id,
+            'type_id':       self.overridetype.overridetype_id,
+            'type':          self.overridetype.overridetype,
         }
 
-        if self.component is not None:
-            params['component'] = self.component.component_id
-            sql = '''
+        sql_create_temp = '''
 create temp table newest_binaries (
     id integer primary key,
     package text);
@@ -77,74 +76,38 @@ insert into newest_binaries (id, package)
         where type = :type and
             (architecture = :arch_all or architecture = :arch) and
             id in (select bin from bin_associations where suite = :suite)
-        order by package, version desc;
+        order by package, version desc;'''
+        self.session.execute(sql_create_temp, params=params)
 
+        sql = '''
 with
 
 unique_override as
     (select o.package, s.section
         from override o, section s
-        where o.suite = :suite and o.type = :type_id and o.section = s.id and
+        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
         o.component = :component)
 
-select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
+select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
     from newest_binaries b, bin_contents bc, unique_override o
     where b.id = bc.binary_id and o.package = b.package
-    order by bc.file, b.package'''
-
-        else:
-            sql = '''
-create temp table newest_binaries (
-    id integer primary key,
-    package text);
-
-create index newest_binaries_by_package on newest_binaries (package);
-
-insert into newest_binaries (id, package)
-    select distinct on (package) id, package from binaries
-        where type = :type and
-            (architecture = :arch_all or architecture = :arch) and
-            id in (select bin from bin_associations where suite = :suite)
-        order by package, version desc;
-
-with
+    group by bc.file'''
 
-unique_override as
-    (select distinct on (o.package, s.section) o.package, s.section
-        from override o, section s
-        where o.suite = :suite and o.type = :type_id and o.section = s.id
-        order by o.package, s.section, o.modified desc)
-
-select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
-    from newest_binaries b, bin_contents bc, unique_override o
-    where b.id = bc.binary_id and o.package = b.package
-    order by bc.file, b.package'''
-
-        return self.session.query("file", "package").from_statement(sql). \
+        return self.session.query("file", "pkglist").from_statement(sql). \
             params(params)
 
     def formatline(self, filename, package_list):
         '''
         Returns a formatted string for the filename argument.
         '''
-        package_list = ','.join(package_list)
         return "%-55s %s\n" % (filename, package_list)
 
     def fetch(self):
         '''
         Yields a new line of the Contents-$arch.gz file in filename order.
         '''
-        last_filename = None
-        package_list = []
-        for filename, package in self.query().yield_per(100):
-            if filename != last_filename:
-                if last_filename is not None:
-                    yield self.formatline(last_filename, package_list)
-                last_filename = filename
-                package_list = []
-            package_list.append(package)
-        if last_filename is not None:
-            yield self.formatline(last_filename, package_list)
+        for filename, package_list in self.query().yield_per(100):
+            yield self.formatline(filename, package_list)
         # end transaction to return connection to pool
         self.session.rollback()
 
@@ -154,90 +117,224 @@ select bc.file, substring(o.section from position('/' in o.section) + 1) || '/'
         '''
         return [item for item in self.fetch()]
 
-    def output_filename(self):
+    def writer(self):
         '''
-        Returns the name of the output file.
+        Returns a writer object.
         '''
         values = {
-            'root': Config()['Dir::Root'],
-            'suite': self.suite.suite_name,
-            'architecture': self.architecture.arch_string
+            'archive':      self.suite.archive.path,
+            'suite':        self.suite.suite_name,
+            'component':    self.component.component_name,
+            'debtype':      self.overridetype.overridetype,
+            'architecture': self.architecture.arch_string,
         }
-        if self.component is None:
-            return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
-        values['component'] = self.component.component_name
-        return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
+        return BinaryContentsFileWriter(**values)
 
     def get_header(self):
         '''
         Returns the header for the Contents files as a string.
         '''
-        header_file = None
-        try:
-            filename = os.path.join(Config()['Dir::Templates'], 'contents')
-            header_file = open(filename)
+        filename = os.path.join(Config()['Dir::Templates'], 'contents')
+        with open(filename) as header_file:
             return header_file.read()
-        finally:
-            if header_file:
-                header_file.close()
 
     def write_file(self):
         '''
         Write the output file.
         '''
-        command = ['gzip', '--rsyncable']
-        output_file = open(self.output_filename(), 'w')
-        gzip = Popen(command, stdin = PIPE, stdout = output_file)
-        gzip.stdin.write(self.get_header())
+        writer = self.writer()
+        file = writer.open()
+        file.write(self.get_header())
+        for item in self.fetch():
+            file.write(item)
+        writer.close()
+
+
+class SourceContentsWriter(object):
+    '''
+    SourceContentsWriter writes the Contents-source.gz files.
+    '''
+    def __init__(self, suite, component):
+        self.suite = suite
+        self.component = component
+        self.session = suite.session()
+
+    def query(self):
+        '''
+        Returns a query object that does most of the work.
+        '''
+        params = {
+            'suite_id':     self.suite.suite_id,
+            'component_id': self.component.component_id,
+        }
+
+        sql_create_temp = '''
+create temp table newest_sources (
+    id integer primary key,
+    source text);
+
+create index sources_binaries_by_source on newest_sources (source);
+
+insert into newest_sources (id, source)
+    select distinct on (source) s.id, s.source from source s
+        join files_archive_map af on s.file = af.file_id
+        where s.id in (select source from src_associations where suite = :suite_id)
+            and af.component_id = :component_id
+        order by source, version desc;'''
+        self.session.execute(sql_create_temp, params=params)
+
+        sql = '''
+select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
+    from newest_sources s, src_contents sc
+    where s.id = sc.source_id group by sc.file'''
+
+        return self.session.query("file", "pkglist").from_statement(sql). \
+            params(params)
+
+    def formatline(self, filename, package_list):
+        '''
+        Returns a formatted string for the filename argument.
+        '''
+        return "%s\t%s\n" % (filename, package_list)
+
+    def fetch(self):
+        '''
+        Yields a new line of the Contents-source.gz file in filename order.
+        '''
+        for filename, package_list in self.query().yield_per(100):
+            yield self.formatline(filename, package_list)
+        # end transaction to return connection to pool
+        self.session.rollback()
+
+    def get_list(self):
+        '''
+        Returns a list of lines for the Contents-source.gz file.
+        '''
+        return [item for item in self.fetch()]
+
+    def writer(self):
+        '''
+        Returns a writer object.
+        '''
+        values = {
+            'archive':   self.suite.archive.path,
+            'suite':     self.suite.suite_name,
+            'component': self.component.component_name
+        }
+        return SourceContentsFileWriter(**values)
+
+    def write_file(self):
+        '''
+        Write the output file.
+        '''
+        writer = self.writer()
+        file = writer.open()
         for item in self.fetch():
-            gzip.stdin.write(item)
-        gzip.stdin.close()
-        output_file.close()
-        gzip.wait()
+            file.write(item)
+        writer.close()
 
+
+def binary_helper(suite_id, arch_id, overridetype_id, component_id):
+    '''
+    This function is called in a new subprocess and multiprocessing wants a
+    top-level function.
+    '''
+    session = DBConn().session(work_mem = 1000)
+    suite = Suite.get(suite_id, session)
+    architecture = Architecture.get(arch_id, session)
+    overridetype = OverrideType.get(overridetype_id, session)
+    component = Component.get(component_id, session)
+    log_message = [suite.suite_name, architecture.arch_string, \
+        overridetype.overridetype, component.component_name]
+    contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
+    contents_writer.write_file()
+    session.close()
+    return log_message
+
+def source_helper(suite_id, component_id):
+    '''
+    This function is called in a new subprocess and multiprocessing wants a
+    top-level function.
+    '''
+    session = DBConn().session(work_mem = 1000)
+    suite = Suite.get(suite_id, session)
+    component = Component.get(component_id, session)
+    log_message = [suite.suite_name, 'source', component.component_name]
+    contents_writer = SourceContentsWriter(suite, component)
+    contents_writer.write_file()
+    session.close()
+    return log_message
+
+class ContentsWriter(object):
+    '''
+    Loop over all suites, architectures, overridetypes, and components to write
+    all Contents files.
+    '''
     @classmethod
-    def write_all(class_, suite_names = [], force = False):
+    def log_result(class_, result):
+        '''
+        Writes a result message to the logfile.
+        '''
+        class_.logger.log(result)
+
+    @classmethod
+    def write_all(class_, logger, archive_names = [], suite_names = [], component_names = [], force = False):
         '''
         Writes all Contents files for the suites in suite_names, which defaults
         to all 'touchable' suites if not specified explicitly. Untouchable
         suites will be included if the force argument is set to True.
         '''
+        class_.logger = logger
         session = DBConn().session()
         suite_query = session.query(Suite)
+        if len(archive_names) > 0:
+            suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
         if len(suite_names) > 0:
             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
+        component_query = session.query(Component)
+        if len(component_names) > 0:
+            component_query = component_query.filter(Component.component_name.in_(component_names))
         if not force:
-            suite_query = suite_query.filter_by(untouchable = False)
+            suite_query = suite_query.filter(Suite.untouchable == False)
+        deb_id = get_override_type('deb', session).overridetype_id
+        udeb_id = get_override_type('udeb', session).overridetype_id
         pool = Pool()
+
+        # Lock tables so that nobody can change things underneath us
+        session.execute("LOCK TABLE bin_contents IN SHARE MODE")
+        session.execute("LOCK TABLE src_contents IN SHARE MODE")
+
         for suite in suite_query:
-            for architecture in suite.get_architectures(skipsrc = True, skipall = True):
-                # handle 'deb' packages
-                command = ['dak', 'contents', '-s', suite.suite_name, \
-                    'generate_helper', architecture.arch_string, 'deb']
-                pool.apply_async(call, (command, ))
-                # handle 'udeb' packages for 'main' and 'non-free'
-                command = ['dak', 'contents', '-s', suite.suite_name, \
-                    'generate_helper', architecture.arch_string, 'udeb', 'main']
-                pool.apply_async(call, (command, ))
-                command = ['dak', 'contents', '-s', suite.suite_name, \
-                    'generate_helper', architecture.arch_string, 'udeb', 'non-free']
-                pool.apply_async(call, (command, ))
+            suite_id = suite.suite_id
+            for component in component_query:
+                component_id = component.component_id
+                # handle source packages
+                pool.apply_async(source_helper, (suite_id, component_id),
+                    callback = class_.log_result)
+                for architecture in suite.get_architectures(skipsrc = True, skipall = True):
+                    arch_id = architecture.arch_id
+                    # handle 'deb' packages
+                    pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id), \
+                        callback = class_.log_result)
+                    # handle 'udeb' packages
+                    pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id), \
+                        callback = class_.log_result)
         pool.close()
         pool.join()
         session.close()
 
 
-class ContentsScanner(object):
+class BinaryContentsScanner(object):
     '''
-    ContentsScanner provides a threadsafe method scan() to scan the contents of
-    a DBBinary object.
+    BinaryContentsScanner provides a threadsafe method scan() to scan the
+    contents of a DBBinary object.
     '''
-    def __init__(self, binary):
+    def __init__(self, binary_id):
         '''
-        The argument binary is the actual DBBinary object that should be
-        scanned.
+        The argument binary_id is the id of the DBBinary object that
+        should be scanned.
         '''
-        self.binary_id = binary.binary_id
+        self.binary_id = binary_id
 
     def scan(self, dummy_arg = None):
         '''
@@ -247,7 +344,10 @@ class ContentsScanner(object):
         '''
         session = DBConn().session()
         binary = session.query(DBBinary).get(self.binary_id)
-        for filename in binary.scan_contents():
+        fileset = set(binary.scan_contents())
+        if len(fileset) == 0:
+            fileset.add('EMPTY_PACKAGE')
+        for filename in fileset:
             binary.contents.append(BinContents(file = filename))
         session.commit()
         session.close()
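
The EMPTY_PACKAGE placeholder matters because scan_all() selects binaries with
DBBinary.contents == None: without at least one BinContents row, a file-less
package would be picked up and rescanned on every run. A small sketch of that
selection logic, using a stand-in class rather than the real ORM objects:

    class FakeBinary(object):
        '''Stand-in for DBBinary; only what the sketch needs.'''
        def __init__(self, name, contents):
            self.name = name
            self.contents = contents    # scanned filenames, possibly empty

    scanned_empty = FakeBinary('empty-pkg', ['EMPTY_PACKAGE'])  # sentinel row
    never_scanned = FakeBinary('new-pkg', [])                   # no rows yet

    # scan_all() only queues binaries that still have no contents rows; the
    # sentinel makes an empty-but-scanned package look finished.
    todo = [b for b in (scanned_empty, never_scanned) if not b.contents]
    assert [b.name for b in todo] == ['new-pkg']
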
@@ -266,10 +366,136 @@ class ContentsScanner(object):
         if limit is not None:
             query = query.limit(limit)
         processed = query.count()
-        threadpool = ThreadPool()
+        pool = Pool()
         for binary in query.yield_per(100):
-            threadpool.queueTask(ContentsScanner(binary).scan)
-        threadpool.joinAll()
+            pool.apply_async(binary_scan_helper, (binary.binary_id, ))
+        pool.close()
+        pool.join()
+        remaining = remaining()
+        session.close()
+        return { 'processed': processed, 'remaining': remaining }
+
+def binary_scan_helper(binary_id):
+    '''
+    This function runs in a subprocess.
+    '''
+    scanner = BinaryContentsScanner(binary_id)
+    scanner.scan()
+
+class UnpackedSource(object):
+    '''
+    UnpackedSource extracts a source package into a temporary location and
+    gives you some convenient functions for accessing it.
+    '''
+    def __init__(self, dscfilename, tmpbasedir=None):
+        '''
+        The dscfilename is the name of a DSC file that will be extracted.
+        '''
+        basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
+        temp_directory = mkdtemp(dir = basedir)
+        self.root_directory = os.path.join(temp_directory, 'root')
+        command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
+            dscfilename, self.root_directory)
+        daklib.daksubprocess.check_call(command)
+
+    def get_root_directory(self):
+        '''
+        Returns the name of the package's root directory which is the directory
+        where the debian subdirectory is located.
+        '''
+        return self.root_directory
+
+    def get_changelog_file(self):
+        '''
+        Returns a file object for debian/changelog or None if no such file exists.
+        '''
+        changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
+        try:
+            return open(changelog_name)
+        except IOError:
+            return None
+
+    def get_all_filenames(self):
+        '''
+        Returns an iterator over all filenames. The filenames will be relative
+        to the root directory.
+        '''
+        skip = len(self.root_directory) + 1
+        for root, _, files in os.walk(self.root_directory):
+            for name in files:
+                yield os.path.join(root[skip:], name)
+
+    def cleanup(self):
+        '''
+        Removes all temporary files.
+        '''
+        if self.root_directory is None:
+            return
+        parent_directory = os.path.dirname(self.root_directory)
+        rmtree(parent_directory)
+        self.root_directory = None
+
+    def __del__(self):
+        '''
+        Enforce cleanup.
+        '''
+        self.cleanup()
+
+
+class SourceContentsScanner(object):
+    '''
+    SourceContentsScanner provides a method scan() to scan the contents of a
+    DBSource object.
+    '''
+    def __init__(self, source_id):
+        '''
+        The argument source_id is the id of the DBSource object that
+        should be scanned.
+        '''
+        self.source_id = source_id
+
+    def scan(self):
+        '''
+        This method does the actual scan and fills in the associated SrcContents
+        property. It commits any changes to the database.
+        '''
+        session = DBConn().session()
+        source = session.query(DBSource).get(self.source_id)
+        fileset = set(source.scan_contents())
+        for filename in fileset:
+            source.contents.append(SrcContents(file = filename))
+        session.commit()
+        session.close()
+
+    @classmethod
+    def scan_all(class_, limit = None):
+        '''
+        The class method scan_all() scans all sources using multiple processes.
+        The number of sources to be scanned can be limited with the limit
+        argument. Returns the number of processed and remaining packages as a
+        dict.
+        '''
+        session = DBConn().session()
+        query = session.query(DBSource).filter(DBSource.contents == None)
+        remaining = query.count
+        if limit is not None:
+            query = query.limit(limit)
+        processed = query.count()
+        pool = Pool()
+        for source in query.yield_per(100):
+            pool.apply_async(source_scan_helper, (source.source_id, ))
+        pool.close()
+        pool.join()
         remaining = remaining()
         session.close()
         return { 'processed': processed, 'remaining': remaining }
+
+def source_scan_helper(source_id):
+    '''
+    This function runs in a subprocess.
+    '''
+    try:
+        scanner = SourceContentsScanner(source_id)
+        scanner.scan()
+    except Exception as e:
+        print e
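
For reference, a hedged usage sketch of the new UnpackedSource helper: the
.dsc filename below is hypothetical, dpkg-source must be installed, and the
default tmpbasedir comes from the Dir::TempPath configuration entry.

    # Sketch only: assumes foo_1.0-1.dsc and its referenced tarballs exist.
    unpacked = UnpackedSource('foo_1.0-1.dsc', tmpbasedir='/tmp')
    try:
        changelog = unpacked.get_changelog_file()
        if changelog is not None:
            print changelog.readline().strip()  # first changelog header line
            changelog.close()
        for name in unpacked.get_all_filenames():
            print name                          # relative to the unpacked root
    finally:
        unpacked.cleanup()   # __del__ would eventually run this too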