]> git.decadent.org.uk Git - dak.git/blobdiff - daklib/contents.py
Add by-hash support
[dak.git] / daklib / contents.py
old mode 100755 (executable)
new mode 100644 (file)
index 1148758..75fb5e5
@@ -27,24 +27,20 @@ Helper code for contents generation.
 
 from daklib.dbconn import *
 from daklib.config import Config
 
 from daklib.dbconn import *
 from daklib.config import Config
-from daklib.threadpool import ThreadPool
-from multiprocessing import Pool
+from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
 
 
-from sqlalchemy import desc, or_
-from subprocess import Popen, PIPE, call
+from multiprocessing import Pool
+from shutil import rmtree
+from tempfile import mkdtemp
 
 
+import daklib.daksubprocess
 import os.path
 
 import os.path
 
-class ContentsWriter(object):
+class BinaryContentsWriter(object):
     '''
     '''
-    ContentsWriter writes the Contents-$arch.gz files.
+    BinaryContentsWriter writes the Contents-$arch.gz files.
     '''
     '''
-    def __init__(self, suite, architecture, overridetype, component = None):
-        '''
-        The constructor clones its arguments into a new session object to make
-        sure that the new ContentsWriter object can be executed in a different
-        thread.
-        '''
+    def __init__(self, suite, architecture, overridetype, component):
         self.suite = suite
         self.architecture = architecture
         self.overridetype = overridetype
         self.suite = suite
         self.architecture = architecture
         self.overridetype = overridetype
@@ -55,17 +51,20 @@ class ContentsWriter(object):
         '''
         Returns a query object that is doing most of the work.
         '''
         '''
         Returns a query object that is doing most of the work.
         '''
+        overridesuite = self.suite
+        if self.suite.overridesuite is not None:
+            overridesuite = get_suite(self.suite.overridesuite, self.session)
         params = {
         params = {
-            'suite':    self.suite.suite_id,
-            'arch_all': get_architecture('all', self.session).arch_id,
-            'arch':     self.architecture.arch_id,
-            'type_id':  self.overridetype.overridetype_id,
-            'type':     self.overridetype.overridetype,
+            'suite':         self.suite.suite_id,
+            'overridesuite': overridesuite.suite_id,
+            'component':     self.component.component_id,
+            'arch_all':      get_architecture('all', self.session).arch_id,
+            'arch':          self.architecture.arch_id,
+            'type_id':       self.overridetype.overridetype_id,
+            'type':          self.overridetype.overridetype,
         }
 
         }
 
-        if self.component is not None:
-            params['component'] = self.component.component_id
-            sql = '''
+        sql_create_temp = '''
 create temp table newest_binaries (
     id integer primary key,
     package text);
 create temp table newest_binaries (
     id integer primary key,
     package text);
@@ -77,74 +76,38 @@ insert into newest_binaries (id, package)
         where type = :type and
             (architecture = :arch_all or architecture = :arch) and
             id in (select bin from bin_associations where suite = :suite)
         where type = :type and
             (architecture = :arch_all or architecture = :arch) and
             id in (select bin from bin_associations where suite = :suite)
-        order by package, version desc;
+        order by package, version desc;'''
+        self.session.execute(sql_create_temp, params=params)
 
 
+        sql = '''
 with
 
 unique_override as
     (select o.package, s.section
         from override o, section s
 with
 
 unique_override as
     (select o.package, s.section
         from override o, section s
-        where o.suite = :suite and o.type = :type_id and o.section = s.id and
+        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
         o.component = :component)
 
         o.component = :component)
 
-select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
+select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
     from newest_binaries b, bin_contents bc, unique_override o
     where b.id = bc.binary_id and o.package = b.package
     from newest_binaries b, bin_contents bc, unique_override o
     where b.id = bc.binary_id and o.package = b.package
-    order by bc.file, b.package'''
-
-        else:
-            sql = '''
-create temp table newest_binaries (
-    id integer primary key,
-    package text);
-
-create index newest_binaries_by_package on newest_binaries (package);
-
-insert into newest_binaries (id, package)
-    select distinct on (package) id, package from binaries
-        where type = :type and
-            (architecture = :arch_all or architecture = :arch) and
-            id in (select bin from bin_associations where suite = :suite)
-        order by package, version desc;
-
-with
+    group by bc.file'''
 
 
-unique_override as
-    (select distinct on (o.package, s.section) o.package, s.section
-        from override o, section s
-        where o.suite = :suite and o.type = :type_id and o.section = s.id
-        order by o.package, s.section, o.modified desc)
-
-select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
-    from newest_binaries b, bin_contents bc, unique_override o
-    where b.id = bc.binary_id and o.package = b.package
-    order by bc.file, b.package'''
-
-        return self.session.query("file", "package").from_statement(sql). \
+        return self.session.query("file", "pkglist").from_statement(sql). \
             params(params)
 
     def formatline(self, filename, package_list):
         '''
         Returns a formatted string for the filename argument.
         '''
             params(params)
 
     def formatline(self, filename, package_list):
         '''
         Returns a formatted string for the filename argument.
         '''
-        package_list = ','.join(package_list)
         return "%-55s %s\n" % (filename, package_list)
 
     def fetch(self):
         '''
         Yields a new line of the Contents-$arch.gz file in filename order.
         '''
         return "%-55s %s\n" % (filename, package_list)
 
     def fetch(self):
         '''
         Yields a new line of the Contents-$arch.gz file in filename order.
         '''
-        last_filename = None
-        package_list = []
-        for filename, package in self.query().yield_per(100):
-            if filename != last_filename:
-                if last_filename is not None:
-                    yield self.formatline(last_filename, package_list)
-                last_filename = filename
-                package_list = []
-            package_list.append(package)
-        if last_filename is not None:
-            yield self.formatline(last_filename, package_list)
+        for filename, package_list in self.query().yield_per(100):
+            yield self.formatline(filename, package_list)
         # end transaction to return connection to pool
         self.session.rollback()
 
         # end transaction to return connection to pool
         self.session.rollback()
 
@@ -154,90 +117,224 @@ select bc.file, substring(o.section from position('/' in o.section) + 1) || '/'
         '''
         return [item for item in self.fetch()]
 
         '''
         return [item for item in self.fetch()]
 
-    def output_filename(self):
+    def writer(self):
         '''
         '''
-        Returns the name of the output file.
+        Returns a writer object.
         '''
         values = {
         '''
         values = {
-            'root': Config()['Dir::Root'],
-            'suite': self.suite.suite_name,
-            'architecture': self.architecture.arch_string
+            'archive':      self.suite.archive.path,
+            'suite':        self.suite.suite_name,
+            'component':    self.component.component_name,
+            'debtype':      self.overridetype.overridetype,
+            'architecture': self.architecture.arch_string,
         }
         }
-        if self.component is None:
-            return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
-        values['component'] = self.component.component_name
-        return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
+        return BinaryContentsFileWriter(**values)
 
     def get_header(self):
         '''
         Returns the header for the Contents files as a string.
         '''
 
     def get_header(self):
         '''
         Returns the header for the Contents files as a string.
         '''
-        header_file = None
-        try:
-            filename = os.path.join(Config()['Dir::Templates'], 'contents')
-            header_file = open(filename)
+        filename = os.path.join(Config()['Dir::Templates'], 'contents')
+        with open(filename) as header_file:
             return header_file.read()
             return header_file.read()
-        finally:
-            if header_file:
-                header_file.close()
 
     def write_file(self):
         '''
         Write the output file.
         '''
 
     def write_file(self):
         '''
         Write the output file.
         '''
-        command = ['gzip', '--rsyncable']
-        output_file = open(self.output_filename(), 'w')
-        gzip = Popen(command, stdin = PIPE, stdout = output_file)
-        gzip.stdin.write(self.get_header())
+        writer = self.writer()
+        file = writer.open()
+        file.write(self.get_header())
+        for item in self.fetch():
+            file.write(item)
+        writer.close()
+
+
+class SourceContentsWriter(object):
+    '''
+    SourceContentsWriter writes the Contents-source.gz files.
+    '''
+    def __init__(self, suite, component):
+        self.suite = suite
+        self.component = component
+        self.session = suite.session()
+
+    def query(self):
+        '''
+        Returns a query object that is doing most of the work.
+        '''
+        params = {
+            'suite_id':     self.suite.suite_id,
+            'component_id': self.component.component_id,
+        }
+
+        sql_create_temp = '''
+create temp table newest_sources (
+    id integer primary key,
+    source text);
+
+create index sources_binaries_by_source on newest_sources (source);
+
+insert into newest_sources (id, source)
+    select distinct on (source) s.id, s.source from source s
+        join files_archive_map af on s.file = af.file_id
+        where s.id in (select source from src_associations where suite = :suite_id)
+            and af.component_id = :component_id
+        order by source, version desc;'''
+        self.session.execute(sql_create_temp, params=params)
+
+        sql = '''
+select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
+    from newest_sources s, src_contents sc
+    where s.id = sc.source_id group by sc.file'''
+
+        return self.session.query("file", "pkglist").from_statement(sql). \
+            params(params)
+
+    def formatline(self, filename, package_list):
+        '''
+        Returns a formatted string for the filename argument.
+        '''
+        return "%s\t%s\n" % (filename, package_list)
+
+    def fetch(self):
+        '''
+        Yields a new line of the Contents-source.gz file in filename order.
+        '''
+        for filename, package_list in self.query().yield_per(100):
+            yield self.formatline(filename, package_list)
+        # end transaction to return connection to pool
+        self.session.rollback()
+
+    def get_list(self):
+        '''
+        Returns a list of lines for the Contents-source.gz file.
+        '''
+        return [item for item in self.fetch()]
+
+    def writer(self):
+        '''
+        Returns a writer object.
+        '''
+        values = {
+            'archive':   self.suite.archive.path,
+            'suite':     self.suite.suite_name,
+            'component': self.component.component_name
+        }
+        return SourceContentsFileWriter(**values)
+
+    def write_file(self):
+        '''
+        Write the output file.
+        '''
+        writer = self.writer()
+        file = writer.open()
         for item in self.fetch():
         for item in self.fetch():
-            gzip.stdin.write(item)
-        gzip.stdin.close()
-        output_file.close()
-        gzip.wait()
+            file.write(item)
+        writer.close()
 
 
+
+def binary_helper(suite_id, arch_id, overridetype_id, component_id):
+    '''
+    This function is called in a new subprocess and multiprocessing wants a top
+    level function.
+    '''
+    session = DBConn().session(work_mem = 1000)
+    suite = Suite.get(suite_id, session)
+    architecture = Architecture.get(arch_id, session)
+    overridetype = OverrideType.get(overridetype_id, session)
+    component = Component.get(component_id, session)
+    log_message = [suite.suite_name, architecture.arch_string, \
+        overridetype.overridetype, component.component_name]
+    contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
+    contents_writer.write_file()
+    session.close()
+    return log_message
+
+def source_helper(suite_id, component_id):
+    '''
+    This function is called in a new subprocess and multiprocessing wants a top
+    level function.
+    '''
+    session = DBConn().session(work_mem = 1000)
+    suite = Suite.get(suite_id, session)
+    component = Component.get(component_id, session)
+    log_message = [suite.suite_name, 'source', component.component_name]
+    contents_writer = SourceContentsWriter(suite, component)
+    contents_writer.write_file()
+    session.close()
+    return log_message
+
+class ContentsWriter(object):
+    '''
+    Loop over all suites, architectures, overridetypes, and components to write
+    all contents files.
+    '''
     @classmethod
     @classmethod
-    def write_all(class_, suite_names = [], force = False):
+    def log_result(class_, result):
+        '''
+        Writes a result message to the logfile.
+        '''
+        class_.logger.log(result)
+
+    @classmethod
+    def write_all(class_, logger, archive_names = [], suite_names = [], component_names = [], force = False):
         '''
         Writes all Contents files for suites in list suite_names which defaults
         to all 'touchable' suites if not specified explicitely. Untouchable
         suites will be included if the force argument is set to True.
         '''
         '''
         Writes all Contents files for suites in list suite_names which defaults
         to all 'touchable' suites if not specified explicitely. Untouchable
         suites will be included if the force argument is set to True.
         '''
+        class_.logger = logger
         session = DBConn().session()
         suite_query = session.query(Suite)
         session = DBConn().session()
         suite_query = session.query(Suite)
+        if len(archive_names) > 0:
+            suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
         if len(suite_names) > 0:
             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
         if len(suite_names) > 0:
             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
+        component_query = session.query(Component)
+        if len(component_names) > 0:
+            component_query = component_query.filter(Component.component_name.in_(component_names))
         if not force:
         if not force:
-            suite_query = suite_query.filter_by(untouchable = False)
+            suite_query = suite_query.filter(Suite.untouchable == False)
+        deb_id = get_override_type('deb', session).overridetype_id
+        udeb_id = get_override_type('udeb', session).overridetype_id
         pool = Pool()
         pool = Pool()
+
+        # Lock tables so that nobody can change things underneath us
+        session.execute("LOCK TABLE bin_contents IN SHARE MODE")
+        session.execute("LOCK TABLE src_contents IN SHARE MODE")
+
         for suite in suite_query:
         for suite in suite_query:
-            for architecture in suite.get_architectures(skipsrc = True, skipall = True):
-                # handle 'deb' packages
-                command = ['dak', 'contents', '-s', suite.suite_name, \
-                    'generate_helper', architecture.arch_string, 'deb']
-                pool.apply_async(call, (command, ))
-                # handle 'udeb' packages for 'main' and 'non-free'
-                command = ['dak', 'contents', '-s', suite.suite_name, \
-                    'generate_helper', architecture.arch_string, 'udeb', 'main']
-                pool.apply_async(call, (command, ))
-                command = ['dak', 'contents', '-s', suite.suite_name, \
-                    'generate_helper', architecture.arch_string, 'udeb', 'non-free']
-                pool.apply_async(call, (command, ))
+            suite_id = suite.suite_id
+            for component in component_query:
+                component_id = component.component_id
+                # handle source packages
+                pool.apply_async(source_helper, (suite_id, component_id),
+                    callback = class_.log_result)
+                for architecture in suite.get_architectures(skipsrc = True, skipall = True):
+                    arch_id = architecture.arch_id
+                    # handle 'deb' packages
+                    pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id), \
+                        callback = class_.log_result)
+                    # handle 'udeb' packages
+                    pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id), \
+                        callback = class_.log_result)
         pool.close()
         pool.join()
         session.close()
 
 
         pool.close()
         pool.join()
         session.close()
 
 
-class ContentsScanner(object):
+class BinaryContentsScanner(object):
     '''
     '''
-    ContentsScanner provides a threadsafe method scan() to scan the contents of
-    a DBBinary object.
+    BinaryContentsScanner provides a threadsafe method scan() to scan the
+    contents of a DBBinary object.
     '''
     '''
-    def __init__(self, binary):
+    def __init__(self, binary_id):
         '''
         '''
-        The argument binary is the actual DBBinary object that should be
-        scanned.
+        The argument binary_id is the id of the DBBinary object that
+        should be scanned.
         '''
         '''
-        self.binary_id = binary.binary_id
+        self.binary_id = binary_id
 
     def scan(self, dummy_arg = None):
         '''
 
     def scan(self, dummy_arg = None):
         '''
@@ -247,7 +344,10 @@ class ContentsScanner(object):
         '''
         session = DBConn().session()
         binary = session.query(DBBinary).get(self.binary_id)
         '''
         session = DBConn().session()
         binary = session.query(DBBinary).get(self.binary_id)
-        for filename in binary.scan_contents():
+        fileset = set(binary.scan_contents())
+        if len(fileset) == 0:
+            fileset.add('EMPTY_PACKAGE')
+        for filename in fileset:
             binary.contents.append(BinContents(file = filename))
         session.commit()
         session.close()
             binary.contents.append(BinContents(file = filename))
         session.commit()
         session.close()
@@ -266,10 +366,136 @@ class ContentsScanner(object):
         if limit is not None:
             query = query.limit(limit)
         processed = query.count()
         if limit is not None:
             query = query.limit(limit)
         processed = query.count()
-        threadpool = ThreadPool()
+        pool = Pool()
         for binary in query.yield_per(100):
         for binary in query.yield_per(100):
-            threadpool.queueTask(ContentsScanner(binary).scan)
-        threadpool.joinAll()
+            pool.apply_async(binary_scan_helper, (binary.binary_id, ))
+        pool.close()
+        pool.join()
+        remaining = remaining()
+        session.close()
+        return { 'processed': processed, 'remaining': remaining }
+
+def binary_scan_helper(binary_id):
+    '''
+    This function runs in a subprocess.
+    '''
+    scanner = BinaryContentsScanner(binary_id)
+    scanner.scan()
+
+class UnpackedSource(object):
+    '''
+    UnpackedSource extracts a source package into a temporary location and
+    gives you some convinient function for accessing it.
+    '''
+    def __init__(self, dscfilename, tmpbasedir=None):
+        '''
+        The dscfilename is a name of a DSC file that will be extracted.
+        '''
+        basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
+        temp_directory = mkdtemp(dir = basedir)
+        self.root_directory = os.path.join(temp_directory, 'root')
+        command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
+            dscfilename, self.root_directory)
+        daklib.daksubprocess.check_call(command)
+
+    def get_root_directory(self):
+        '''
+        Returns the name of the package's root directory which is the directory
+        where the debian subdirectory is located.
+        '''
+        return self.root_directory
+
+    def get_changelog_file(self):
+        '''
+        Returns a file object for debian/changelog or None if no such file exists.
+        '''
+        changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
+        try:
+            return open(changelog_name)
+        except IOError:
+            return None
+
+    def get_all_filenames(self):
+        '''
+        Returns an iterator over all filenames. The filenames will be relative
+        to the root directory.
+        '''
+        skip = len(self.root_directory) + 1
+        for root, _, files in os.walk(self.root_directory):
+            for name in files:
+                yield os.path.join(root[skip:], name)
+
+    def cleanup(self):
+        '''
+        Removes all temporary files.
+        '''
+        if self.root_directory is None:
+            return
+        parent_directory = os.path.dirname(self.root_directory)
+        rmtree(parent_directory)
+        self.root_directory = None
+
+    def __del__(self):
+        '''
+        Enforce cleanup.
+        '''
+        self.cleanup()
+
+
+class SourceContentsScanner(object):
+    '''
+    SourceContentsScanner provides a method scan() to scan the contents of a
+    DBSource object.
+    '''
+    def __init__(self, source_id):
+        '''
+        The argument source_id is the id of the DBSource object that
+        should be scanned.
+        '''
+        self.source_id = source_id
+
+    def scan(self):
+        '''
+        This method does the actual scan and fills in the associated SrcContents
+        property. It commits any changes to the database.
+        '''
+        session = DBConn().session()
+        source = session.query(DBSource).get(self.source_id)
+        fileset = set(source.scan_contents())
+        for filename in fileset:
+            source.contents.append(SrcContents(file = filename))
+        session.commit()
+        session.close()
+
+    @classmethod
+    def scan_all(class_, limit = None):
+        '''
+        The class method scan_all() scans all source using multiple processes.
+        The number of sources to be scanned can be limited with the limit
+        argument. Returns the number of processed and remaining packages as a
+        dict.
+        '''
+        session = DBConn().session()
+        query = session.query(DBSource).filter(DBSource.contents == None)
+        remaining = query.count
+        if limit is not None:
+            query = query.limit(limit)
+        processed = query.count()
+        pool = Pool()
+        for source in query.yield_per(100):
+            pool.apply_async(source_scan_helper, (source.source_id, ))
+        pool.close()
+        pool.join()
         remaining = remaining()
         session.close()
         return { 'processed': processed, 'remaining': remaining }
         remaining = remaining()
         session.close()
         return { 'processed': processed, 'remaining': remaining }
+
+def source_scan_helper(source_id):
+    '''
+    This function runs in a subprocess.
+    '''
+    try:
+        scanner = SourceContentsScanner(source_id)
+        scanner.scan()
+    except Exception as e:
+        print e