X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=daklib%2Fcontents.py;h=577a45aab3b6324d05bcb8c95b00d27c73f5e7f1;hb=ed48066b1d8974fc525ba0d67d3da6ca0f7fa02c;hp=4a0b3ae25237fe8a0b63b25f284d9d0e31903a04;hpb=e2ab59085f36e9fec253cb2b473595409ba99bd2;p=dak.git

diff --git a/daklib/contents.py b/daklib/contents.py
index 4a0b3ae2..577a45aa 100755
--- a/daklib/contents.py
+++ b/daklib/contents.py
@@ -29,20 +29,17 @@ from daklib.dbconn import *
 from daklib.config import Config
 
 from multiprocessing import Pool
-from subprocess import Popen, PIPE
+from shutil import rmtree
+from subprocess import Popen, PIPE, check_call
+from tempfile import mkdtemp
 
 import os.path
 
-class ContentsWriter(object):
+class BinaryContentsWriter(object):
     '''
-    ContentsWriter writes the Contents-$arch.gz files.
+    BinaryContentsWriter writes the Contents-$arch.gz files.
     '''
     def __init__(self, suite, architecture, overridetype, component = None):
-        '''
-        The constructor clones its arguments into a new session object to make
-        sure that the new ContentsWriter object can be executed in a different
-        thread.
-        '''
         self.suite = suite
         self.architecture = architecture
         self.overridetype = overridetype
@@ -188,13 +185,123 @@ select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package
         gzip.stdin.close()
         output_file.close()
         gzip.wait()
-        try:
-            os.remove(final_filename)
-        except:
-            pass
+        os.chmod(temp_filename, 0664)
+        os.rename(temp_filename, final_filename)
+
+
+class SourceContentsWriter(object):
+    '''
+    SourceContentsWriter writes the Contents-source.gz files.
+    '''
+    def __init__(self, suite, component):
+        self.suite = suite
+        self.component = component
+        self.session = suite.session()
+
+    def query(self):
+        '''
+        Returns a query object that does most of the work.
+        '''
+        params = {
+            'suite_id': self.suite.suite_id,
+            'component_id': self.component.component_id,
+        }
+
+        sql = '''
+create temp table newest_sources (
+    id integer primary key,
+    source text);
+
+create index sources_binaries_by_source on newest_sources (source);
+
+insert into newest_sources (id, source)
+    select distinct on (source) s.id, s.source from source s
+        join files f on f.id = s.file
+        join location l on l.id = f.location
+        where s.id in (select source from src_associations where suite = :suite_id)
+            and l.component = :component_id
+        order by source, version desc;
+
+select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
+    from newest_sources s, src_contents sc
+    where s.id = sc.source_id group by sc.file'''
+
+        return self.session.query("file", "pkglist").from_statement(sql). \
+            params(params)
+
+    def formatline(self, filename, package_list):
+        '''
+        Returns a formatted string for the filename argument.
+        '''
+        return "%s\t%s\n" % (filename, package_list)
+
+    def fetch(self):
+        '''
+        Yields a new line of the Contents-source.gz file in filename order.
+        '''
+        for filename, package_list in self.query().yield_per(100):
+            yield self.formatline(filename, package_list)
+        # end transaction to return connection to pool
+        self.session.rollback()
+
+    def get_list(self):
+        '''
+        Returns a list of lines for the Contents-source.gz file.
+        '''
+        return [item for item in self.fetch()]
+
+    def output_filename(self):
+        '''
+        Returns the name of the output file.
+        '''
+        values = {
+            'root': Config()['Dir::Root'],
+            'suite': self.suite.suite_name,
+            'component': self.component.component_name
+        }
+        return "%(root)s/dists/%(suite)s/%(component)s/Contents-source.gz" % values
+
+    def write_file(self):
+        '''
+        Write the output file.
+        '''
+        command = ['gzip', '--rsyncable']
+        final_filename = self.output_filename()
+        temp_filename = final_filename + '.new'
+        output_file = open(temp_filename, 'w')
+        gzip = Popen(command, stdin = PIPE, stdout = output_file)
+        for item in self.fetch():
+            gzip.stdin.write(item)
+        gzip.stdin.close()
+        output_file.close()
+        gzip.wait()
+        os.chmod(temp_filename, 0664)
         os.rename(temp_filename, final_filename)
-        os.chmod(final_filename, 0664)
+
 
+def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
+    '''
+    This function is called in a new subprocess.
+    '''
+    session = DBConn().session()
+    suite = Suite.get(suite_id, session)
+    architecture = Architecture.get(arch_id, session)
+    overridetype = OverrideType.get(overridetype_id, session)
+    log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
+    if component_id is None:
+        component = None
+    else:
+        component = Component.get(component_id, session)
+        log_message.append(component.component_name)
+    contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
+    contents_writer.write_file()
+    return log_message
+
+
+class ContentsWriter(object):
+    '''
+    Loops over all suites, architectures, overridetypes, and components to write
+    all contents files.
+    '''
     @classmethod
     def log_result(class_, result):
         '''
@@ -237,29 +344,11 @@ select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package
         pool.join()
         session.close()
 
-
-def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
-    '''
-    This function is called in a new subprocess.
-    '''
-    session = DBConn().session()
-    suite = Suite.get(suite_id, session)
-    architecture = Architecture.get(arch_id, session)
-    overridetype = OverrideType.get(overridetype_id, session)
-    log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
-    if component_id is None:
-        component = None
-    else:
-        component = Component.get(component_id, session)
-        log_message.append(component.component_name)
-    contents_writer = ContentsWriter(suite, architecture, overridetype, component)
-    contents_writer.write_file()
-    return log_message
-
-class ContentsScanner(object):
+
+class BinaryContentsScanner(object):
     '''
-    ContentsScanner provides a threadsafe method scan() to scan the contents of
-    a DBBinary object.
+    BinaryContentsScanner provides a threadsafe method scan() to scan the
+    contents of a DBBinary object.
     '''
     def __init__(self, binary_id):
         '''
@@ -300,16 +389,137 @@ class ContentsScanner(object):
         processed = query.count()
         pool = Pool()
         for binary in query.yield_per(100):
-            pool.apply_async(scan_helper, (binary.binary_id, ))
+            pool.apply_async(binary_scan_helper, (binary.binary_id, ))
         pool.close()
         pool.join()
         remaining = remaining()
         session.close()
         return { 'processed': processed, 'remaining': remaining }
 
-def scan_helper(binary_id):
+def binary_scan_helper(binary_id):
     '''
     This function runs in a subprocess.
     '''
-    scanner = ContentsScanner(binary_id)
+    scanner = BinaryContentsScanner(binary_id)
     scanner.scan()
+
+
+class UnpackedSource(object):
+    '''
+    UnpackedSource extracts a source package into a temporary location and
+    gives you some convenient functions for accessing it.
+    '''
+    def __init__(self, dscfilename):
+        '''
+        The dscfilename is the name of a DSC file that will be extracted.
+        '''
+        self.root_directory = os.path.join(mkdtemp(), 'root')
+        command = ('dpkg-source', '--no-copy', '--no-check', '-x', dscfilename,
+            self.root_directory)
+        # dpkg-source does not have a --quiet option
+        devnull = open(os.devnull, 'w')
+        check_call(command, stdout = devnull, stderr = devnull)
+        devnull.close()
+
+    def get_root_directory(self):
+        '''
+        Returns the name of the package's root directory which is the directory
+        where the debian subdirectory is located.
+        '''
+        return self.root_directory
+
+    def get_changelog_file(self):
+        '''
+        Returns a file object for debian/changelog or None if no such file exists.
+        '''
+        changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
+        try:
+            return open(changelog_name)
+        except IOError:
+            return None
+
+    def get_all_filenames(self):
+        '''
+        Returns an iterator over all filenames. The filenames will be relative
+        to the root directory.
+        '''
+        skip = len(self.root_directory) + 1
+        for root, _, files in os.walk(self.root_directory):
+            for name in files:
+                yield os.path.join(root[skip:], name)
+
+    def cleanup(self):
+        '''
+        Removes all temporary files.
+        '''
+        if self.root_directory is None:
+            return
+        parent_directory = os.path.dirname(self.root_directory)
+        rmtree(parent_directory)
+        self.root_directory = None
+
+    def __del__(self):
+        '''
+        Enforce cleanup.
+        '''
+        self.cleanup()
+
+
+class SourceContentsScanner(object):
+    '''
+    SourceContentsScanner provides a method scan() to scan the contents of a
+    DBSource object.
+    '''
+    def __init__(self, source_id):
+        '''
+        The argument source_id is the id of the DBSource object that
+        should be scanned.
+        '''
+        self.source_id = source_id
+
+    def scan(self):
+        '''
+        This method does the actual scan and fills in the associated SrcContents
+        property. It commits any changes to the database.
+        '''
+        session = DBConn().session()
+        source = session.query(DBSource).get(self.source_id)
+        fileset = set(source.scan_contents())
+        for filename in fileset:
+            source.contents.append(SrcContents(file = filename))
+        session.commit()
+        session.close()
+
+    @classmethod
+    def scan_all(class_, limit = None):
+        '''
+        The class method scan_all() scans all sources using multiple processes.
+        The number of sources to be scanned can be limited with the limit
+        argument. Returns the number of processed and remaining packages as a
+        dict.
+        '''
+        session = DBConn().session()
+        query = session.query(DBSource).filter(DBSource.contents == None)
+        remaining = query.count
+        if limit is not None:
+            query = query.limit(limit)
+        processed = query.count()
+        pool = Pool()
+        for source in query.yield_per(100):
+            pool.apply_async(source_scan_helper, (source.source_id, ))
+        pool.close()
+        pool.join()
+        remaining = remaining()
+        session.close()
+        return { 'processed': processed, 'remaining': remaining }
+
+def source_scan_helper(source_id):
+    '''
+    This function runs in a subprocess.
+    '''
+    try:
+        scanner = SourceContentsScanner(source_id)
+        scanner.scan()
+    except Exception, e:
+        print e
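
For anyone trying the new classes by hand, here is a minimal usage sketch (not part of the commit; the .dsc path and the limit value are invented, and it assumes a configured dak database plus dpkg-dev installed for the dpkg-source call):

    from daklib.contents import UnpackedSource, SourceContentsScanner

    # Unpack a source package and look around in it (hypothetical path).
    unpacked = UnpackedSource('/srv/dak/queue/done/hello_2.10-1.dsc')
    changelog = unpacked.get_changelog_file()
    if changelog is not None:
        print changelog.readline().rstrip()
        changelog.close()
    for name in unpacked.get_all_filenames():
        print name
    unpacked.cleanup()

    # Scan at most ten source packages that have no contents recorded yet.
    # Returns a dict such as {'processed': 10, 'remaining': 1234}.
    print SourceContentsScanner.scan_all(limit = 10)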
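Likewise, a sketch of driving SourceContentsWriter directly for a single suite/component pair; the suite and component names are placeholders, and production code would normally go through ContentsWriter, which loops over all of them:

    from daklib.dbconn import DBConn, Suite, Component
    from daklib.contents import SourceContentsWriter

    session = DBConn().session()
    suite = session.query(Suite).filter_by(suite_name = 'unstable').one()
    component = session.query(Component).filter_by(component_name = 'main').one()
    # write_file() streams the query results through 'gzip --rsyncable' into
    # Contents-source.gz.new, then chmods it to 0664 and renames it into place.
    SourceContentsWriter(suite, component).write_file()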