X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=daklib%2Fcontents.py;h=4a0b3ae25237fe8a0b63b25f284d9d0e31903a04;hb=e2ab59085f36e9fec253cb2b473595409ba99bd2;hp=740c0b2942951a5ff6f38df9581e9dd376698222;hpb=4bc88b4276c5b8cf2b03f24c81f19c90e8e12265;p=dak.git diff --git a/daklib/contents.py b/daklib/contents.py index 740c0b29..4a0b3ae2 100755 --- a/daklib/contents.py +++ b/daklib/contents.py @@ -27,11 +27,12 @@ Helper code for contents generation. from daklib.dbconn import * from daklib.config import Config -from daklib.threadpool import ThreadPool -from sqlalchemy import desc, or_ +from multiprocessing import Pool from subprocess import Popen, PIPE +import os.path + class ContentsWriter(object): ''' ContentsWriter writes the Contents-$arch.gz files. @@ -42,29 +43,30 @@ class ContentsWriter(object): sure that the new ContentsWriter object can be executed in a different thread. ''' - self.suite = suite.clone() - self.session = self.suite.session() - self.architecture = architecture.clone(self.session) - self.overridetype = overridetype.clone(self.session) - if component is not None: - self.component = component.clone(self.session) - else: - self.component = None + self.suite = suite + self.architecture = architecture + self.overridetype = overridetype + self.component = component + self.session = suite.session() def query(self): ''' Returns a query object that is doing most of the work. ''' + overridesuite = self.suite + if self.suite.overridesuite is not None: + overridesuite = get_suite(self.suite.overridesuite, self.session) params = { - 'suite': self.suite.suite_id, - 'arch_all': get_architecture('all', self.session).arch_id, - 'arch': self.architecture.arch_id, - 'type_id': self.overridetype.overridetype_id, - 'type': self.overridetype.overridetype, + 'suite': self.suite.suite_id, + 'overridesuite': overridesuite.suite_id, + 'arch_all': get_architecture('all', self.session).arch_id, + 'arch': self.architecture.arch_id, + 'type_id': self.overridetype.overridetype_id, + 'type': self.overridetype.overridetype, } if self.component is not None: - params['component'] = component.component_id + params['component'] = self.component.component_id sql = ''' create temp table newest_binaries ( id integer primary key, @@ -84,13 +86,13 @@ with unique_override as (select o.package, s.section from override o, section s - where o.suite = :suite and o.type = :type_id and o.section = s.id and + where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and o.component = :component) -select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package +select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist from newest_binaries b, bin_contents bc, unique_override o where b.id = bc.binary_id and o.package = b.package - order by bc.file, b.package''' + group by bc.file''' else: sql = ''' @@ -112,38 +114,29 @@ with unique_override as (select distinct on (o.package, s.section) o.package, s.section from override o, section s - where o.suite = :suite and o.type = :type_id and o.section = s.id + where o.suite = :overridesuite and o.type = :type_id and o.section = s.id order by o.package, s.section, o.modified desc) -select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package +select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist from newest_binaries b, bin_contents bc, unique_override o where b.id = bc.binary_id and o.package = b.package - order by bc.file, b.package''' + group by bc.file''' - return self.session.query("file", "package").from_statement(sql). \ + return self.session.query("file", "pkglist").from_statement(sql). \ params(params) def formatline(self, filename, package_list): ''' Returns a formatted string for the filename argument. ''' - package_list = ','.join(package_list) - return "%-60s%s\n" % (filename, package_list) + return "%-55s %s\n" % (filename, package_list) def fetch(self): ''' Yields a new line of the Contents-$arch.gz file in filename order. ''' - last_filename = None - package_list = [] - for filename, package in self.query().yield_per(100): - if filename != last_filename: - if last_filename is not None: - yield self.formatline(last_filename, package_list) - last_filename = filename - package_list = [] - package_list.append(package) - yield self.formatline(last_filename, package_list) + for filename, package_list in self.query().yield_per(100): + yield self.formatline(filename, package_list) # end transaction to return connection to pool self.session.rollback() @@ -163,21 +156,104 @@ select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' 'architecture': self.architecture.arch_string } if self.component is None: - return "%(root)s%(suite)s/Contents-%(architecture)s.gz" % values + return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values values['component'] = self.component.component_name - return "%(root)s%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values + return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values + + def get_header(self): + ''' + Returns the header for the Contents files as a string. + ''' + header_file = None + try: + filename = os.path.join(Config()['Dir::Templates'], 'contents') + header_file = open(filename) + return header_file.read() + finally: + if header_file: + header_file.close() def write_file(self): ''' Write the output file. ''' command = ['gzip', '--rsyncable'] - output_file = open(self.output_filename(), 'w') - pipe = Popen(command, stdin = PIPE, stdout = output_file).stdin + final_filename = self.output_filename() + temp_filename = final_filename + '.new' + output_file = open(temp_filename, 'w') + gzip = Popen(command, stdin = PIPE, stdout = output_file) + gzip.stdin.write(self.get_header()) for item in self.fetch(): - pipe.write(item) - pipe.close() + gzip.stdin.write(item) + gzip.stdin.close() output_file.close() + gzip.wait() + try: + os.remove(final_filename) + except: + pass + os.rename(temp_filename, final_filename) + os.chmod(final_filename, 0664) + + @classmethod + def log_result(class_, result): + ''' + Writes a result message to the logfile. + ''' + class_.logger.log(result) + + @classmethod + def write_all(class_, logger, suite_names = [], force = False): + ''' + Writes all Contents files for suites in list suite_names which defaults + to all 'touchable' suites if not specified explicitely. Untouchable + suites will be included if the force argument is set to True. + ''' + class_.logger = logger + session = DBConn().session() + suite_query = session.query(Suite) + if len(suite_names) > 0: + suite_query = suite_query.filter(Suite.suite_name.in_(suite_names)) + if not force: + suite_query = suite_query.filter_by(untouchable = False) + deb_id = get_override_type('deb', session).overridetype_id + udeb_id = get_override_type('udeb', session).overridetype_id + main_id = get_component('main', session).component_id + non_free_id = get_component('non-free', session).component_id + pool = Pool() + for suite in suite_query: + suite_id = suite.suite_id + for architecture in suite.get_architectures(skipsrc = True, skipall = True): + arch_id = architecture.arch_id + # handle 'deb' packages + pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \ + callback = class_.log_result) + # handle 'udeb' packages for 'main' and 'non-free' + pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \ + callback = class_.log_result) + pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \ + callback = class_.log_result) + pool.close() + pool.join() + session.close() + +def generate_helper(suite_id, arch_id, overridetype_id, component_id = None): + ''' + This function is called in a new subprocess. + ''' + session = DBConn().session() + suite = Suite.get(suite_id, session) + architecture = Architecture.get(arch_id, session) + overridetype = OverrideType.get(overridetype_id, session) + log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype] + if component_id is None: + component = None + else: + component = Component.get(component_id, session) + log_message.append(component.component_name) + contents_writer = ContentsWriter(suite, architecture, overridetype, component) + contents_writer.write_file() + return log_message class ContentsScanner(object): @@ -185,12 +261,12 @@ class ContentsScanner(object): ContentsScanner provides a threadsafe method scan() to scan the contents of a DBBinary object. ''' - def __init__(self, binary): + def __init__(self, binary_id): ''' - The argument binary is the actual DBBinary object that should be - scanned. + The argument binary_id is the id of the DBBinary object that + should be scanned. ''' - self.binary_id = binary.binary_id + self.binary_id = binary_id def scan(self, dummy_arg = None): ''' @@ -200,7 +276,10 @@ class ContentsScanner(object): ''' session = DBConn().session() binary = session.query(DBBinary).get(self.binary_id) - for filename in binary.scan_contents(): + fileset = set(binary.scan_contents()) + if len(fileset) == 0: + fileset.add('EMPTY_PACKAGE') + for filename in fileset: binary.contents.append(BinContents(file = filename)) session.commit() session.close() @@ -210,14 +289,27 @@ class ContentsScanner(object): ''' The class method scan_all() scans all binaries using multiple threads. The number of binaries to be scanned can be limited with the limit - argument. + argument. Returns the number of processed and remaining packages as a + dict. ''' session = DBConn().session() query = session.query(DBBinary).filter(DBBinary.contents == None) + remaining = query.count if limit is not None: query = query.limit(limit) - threadpool = ThreadPool() + processed = query.count() + pool = Pool() for binary in query.yield_per(100): - threadpool.queueTask(ContentsScanner(binary).scan) - threadpool.joinAll() + pool.apply_async(scan_helper, (binary.binary_id, )) + pool.close() + pool.join() + remaining = remaining() session.close() + return { 'processed': processed, 'remaining': remaining } + +def scan_helper(binary_id): + ''' + This function runs in a subprocess. + ''' + scanner = ContentsScanner(binary_id) + scanner.scan()