from daklib.dbconn import *
from daklib.config import Config
+from daklib.threadpool import ThreadPool
+from multiprocessing import Pool
from sqlalchemy import desc, or_
-from subprocess import Popen, PIPE
+from sqlalchemy.exc import IntegrityError
+from subprocess import Popen, PIPE, call
+
+import os.path
class ContentsWriter(object):
'''
sure that the new ContentsWriter object can be executed in a different
thread.
'''
- self.suite = suite.clone()
- self.session = self.suite.session()
- self.architecture = architecture.clone(self.session)
- self.overridetype = overridetype.clone(self.session)
- if component is not None:
- self.component = component.clone(self.session)
- else:
- self.component = None
+ self.suite = suite
+ self.architecture = architecture
+ self.overridetype = overridetype
+ self.component = component
+ self.session = suite.session()
def query(self):
'''
Returns a query object that is doing most of the work.
'''
+ overridesuite = self.suite
+ if self.suite.overridesuite is not None:
+ overridesuite = get_suite(self.suite.overridesuite, self.session)
params = {
- 'suite': self.suite.suite_id,
- 'arch_all': get_architecture('all', self.session).arch_id,
- 'arch': self.architecture.arch_id,
- 'type_id': self.overridetype.overridetype_id,
- 'type': self.overridetype.overridetype,
+ 'suite': self.suite.suite_id,
+ 'overridesuite': overridesuite.suite_id,
+ 'arch_all': get_architecture('all', self.session).arch_id,
+ 'arch': self.architecture.arch_id,
+ 'type_id': self.overridetype.overridetype_id,
+ 'type': self.overridetype.overridetype,
}
if self.component is not None:
- params['component'] = component.component_id
+ params['component'] = self.component.component_id
sql = '''
create temp table newest_binaries (
id integer primary key,
unique_override as
(select o.package, s.section
from override o, section s
- where o.suite = :suite and o.type = :type_id and o.section = s.id and
+ where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
o.component = :component)
-select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
+select bc.file, o.section || '/' || b.package as package
from newest_binaries b, bin_contents bc, unique_override o
where b.id = bc.binary_id and o.package = b.package
order by bc.file, b.package'''
unique_override as
(select distinct on (o.package, s.section) o.package, s.section
from override o, section s
- where o.suite = :suite and o.type = :type_id and o.section = s.id
+ where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
order by o.package, s.section, o.modified desc)
-select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
+select bc.file, o.section || '/' || b.package as package
from newest_binaries b, bin_contents bc, unique_override o
where b.id = bc.binary_id and o.package = b.package
order by bc.file, b.package'''
Returns a formatted string for the filename argument.
'''
package_list = ','.join(package_list)
- return "%-60s%s\n" % (filename, package_list)
+ return "%-55s %s\n" % (filename, package_list)
def fetch(self):
'''
last_filename = filename
package_list = []
package_list.append(package)
- yield self.formatline(last_filename, package_list)
+ if last_filename is not None:
+ yield self.formatline(last_filename, package_list)
# end transaction to return connection to pool
self.session.rollback()
'architecture': self.architecture.arch_string
}
if self.component is None:
- return "%(root)s%(suite)s/Contents-%(architecture)s.gz" % values
+ return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
values['component'] = self.component.component_name
- return "%(root)s%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
+ return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
+
+ def get_header(self):
+ '''
+ Returns the header for the Contents files as a string.
+ '''
+ header_file = None
+ try:
+ filename = os.path.join(Config()['Dir::Templates'], 'contents')
+ header_file = open(filename)
+ return header_file.read()
+ finally:
+ if header_file:
+ header_file.close()
def write_file(self):
'''
'''
command = ['gzip', '--rsyncable']
output_file = open(self.output_filename(), 'w')
- pipe = Popen(command, stdin = PIPE, stdout = output_file).stdin
+ gzip = Popen(command, stdin = PIPE, stdout = output_file)
+ gzip.stdin.write(self.get_header())
for item in self.fetch():
- pipe.write(item)
- pipe.close()
+ gzip.stdin.write(item)
+ gzip.stdin.close()
output_file.close()
+ gzip.wait()
+
+ @classmethod
+ def write_all(class_, suite_names = [], force = False):
+ '''
+ Writes all Contents files for suites in list suite_names which defaults
+ to all 'touchable' suites if not specified explicitely. Untouchable
+ suites will be included if the force argument is set to True.
+ '''
+ session = DBConn().session()
+ suite_query = session.query(Suite)
+ if len(suite_names) > 0:
+ suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
+ if not force:
+ suite_query = suite_query.filter_by(untouchable = False)
+ pool = Pool()
+ for suite in suite_query:
+ for architecture in suite.get_architectures(skipsrc = True, skipall = True):
+ # handle 'deb' packages
+ command = ['dak', 'contents', '-s', suite.suite_name, \
+ 'generate_helper', architecture.arch_string, 'deb']
+ pool.apply_async(call, (command, ))
+ # handle 'udeb' packages for 'main' and 'non-free'
+ command = ['dak', 'contents', '-s', suite.suite_name, \
+ 'generate_helper', architecture.arch_string, 'udeb', 'main']
+ pool.apply_async(call, (command, ))
+ command = ['dak', 'contents', '-s', suite.suite_name, \
+ 'generate_helper', architecture.arch_string, 'udeb', 'non-free']
+ pool.apply_async(call, (command, ))
+ pool.close()
+ pool.join()
+ session.close()
+
+
class ContentsScanner(object):
    '''
    ContentsScanner provides a threadsafe method scan() to scan the contents
    of a DBBinary object.
    '''
    def __init__(self, binary):
        '''
        The argument binary is the actual DBBinary object that should be
        scanned.
        '''
        # store only the id; scan() runs in a worker thread and must
        # re-fetch the object through its own database session
        self.binary_id = binary.binary_id

    def scan(self, dummy_arg = None):
        '''
        This method does the actual scan and fills in the associated
        BinContents property. It commits any changes to the database. The
        argument dummy_arg is ignored but needed by our threadpool
        implementation.
        '''
        session = DBConn().session()
        try:
            binary = session.query(DBBinary).get(self.binary_id)
            empty_package = True
            for filename in binary.scan_contents():
                binary.contents.append(BinContents(file = filename))
                empty_package = False
            if empty_package:
                # marker row so packages without contents are not rescanned
                binary.contents.append(BinContents(file = 'EMPTY_PACKAGE'))
            try:
                session.commit()
            except IntegrityError:
                # duplicate filenames in the package violate the unique
                # constraint; roll back (discarding the rows added above)
                # and store a marker row instead
                session.rollback()
                binary.contents.append(BinContents(file = 'DUPLICATE_FILENAMES'))
                session.commit()
        finally:
            # release the connection even if scan_contents() raises
            session.close()

    @classmethod
    def scan_all(class_, limit = None):
        '''
        The class method scan_all() scans all binaries using multiple
        threads. The number of binaries to be scanned can be limited with
        the limit argument. Returns the number of processed and remaining
        packages as a dict.
        '''
        session = DBConn().session()
        # query for all binaries that have no contents rows yet
        unscanned = session.query(DBBinary).filter(DBBinary.contents == None)
        query = unscanned
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        threadpool = ThreadPool()
        for binary in query.yield_per(100):
            threadpool.queueTask(ContentsScanner(binary).scan)
        threadpool.joinAll()
        # count the unlimited query again only after all workers have
        # committed, so the number reflects the post-scan state
        remaining = unscanned.count()
        session.close()
        return { 'processed': processed, 'remaining': remaining }