from daklib.dbconn import *
from daklib.config import Config
+from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
from multiprocessing import Pool
from shutil import rmtree
-from subprocess import Popen, PIPE, check_call
from tempfile import mkdtemp
+import daklib.daksubprocess
import os.path
-import signal
class BinaryContentsWriter(object):
'''
BinaryContentsWriter writes the Contents-$arch.gz files.
'''
- def __init__(self, suite, architecture, overridetype, component = None):
+ def __init__(self, suite, architecture, overridetype, component):
self.suite = suite
self.architecture = architecture
self.overridetype = overridetype
params = {
'suite': self.suite.suite_id,
'overridesuite': overridesuite.suite_id,
+ 'component': self.component.component_id,
'arch_all': get_architecture('all', self.session).arch_id,
'arch': self.architecture.arch_id,
'type_id': self.overridetype.overridetype_id,
'type': self.overridetype.overridetype,
}
- if self.component is not None:
- params['component'] = self.component.component_id
- sql = '''
+ sql_create_temp = '''
create temp table newest_binaries (
id integer primary key,
package text);
where type = :type and
(architecture = :arch_all or architecture = :arch) and
id in (select bin from bin_associations where suite = :suite)
- order by package, version desc;
+ order by package, version desc;'''
+ self.session.execute(sql_create_temp, params=params)
+ sql = '''
with
unique_override as
where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
o.component = :component)
-select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
- from newest_binaries b, bin_contents bc, unique_override o
- where b.id = bc.binary_id and o.package = b.package
- group by bc.file'''
-
- else:
- sql = '''
-create temp table newest_binaries (
- id integer primary key,
- package text);
-
-create index newest_binaries_by_package on newest_binaries (package);
-
-insert into newest_binaries (id, package)
- select distinct on (package) id, package from binaries
- where type = :type and
- (architecture = :arch_all or architecture = :arch) and
- id in (select bin from bin_associations where suite = :suite)
- order by package, version desc;
-
-with
-
-unique_override as
- (select distinct on (o.package, s.section) o.package, s.section
- from override o, section s
- where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
- order by o.package, s.section, o.modified desc)
-
select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
from newest_binaries b, bin_contents bc, unique_override o
where b.id = bc.binary_id and o.package = b.package
'''
return [item for item in self.fetch()]
- def output_filename(self):
+ def writer(self):
'''
- Returns the name of the output file.
+ Returns a writer object.
'''
values = {
- 'root': Config()['Dir::Root'],
- 'suite': self.suite.suite_name,
- 'architecture': self.architecture.arch_string
+ 'archive': self.suite.archive.path,
+ 'suite': self.suite.suite_name,
+ 'component': self.component.component_name,
+ 'debtype': self.overridetype.overridetype,
+ 'architecture': self.architecture.arch_string,
}
- if self.component is None:
- return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
- values['component'] = self.component.component_name
- return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
+ return BinaryContentsFileWriter(**values)
def get_header(self):
'''
'''
Write the output file.
'''
- command = ['gzip', '--rsyncable']
- final_filename = self.output_filename()
- temp_filename = final_filename + '.new'
- output_file = open(temp_filename, 'w')
- gzip = Popen(command, stdin = PIPE, stdout = output_file)
- gzip.stdin.write(self.get_header())
+ writer = self.writer()
+ file = writer.open()
+ file.write(self.get_header())
for item in self.fetch():
- gzip.stdin.write(item)
- gzip.stdin.close()
- output_file.close()
- gzip.wait()
- os.chmod(temp_filename, 0664)
- os.rename(temp_filename, final_filename)
+ file.write(item)
+ writer.close()
class SourceContentsWriter(object):
'component_id': self.component.component_id,
}
- sql = '''
+ sql_create_temp = '''
create temp table newest_sources (
id integer primary key,
source text);
insert into newest_sources (id, source)
select distinct on (source) s.id, s.source from source s
- join files f on f.id = s.file
- join location l on l.id = f.location
+ join files_archive_map af on s.file = af.file_id
where s.id in (select source from src_associations where suite = :suite_id)
- and l.component = :component_id
- order by source, version desc;
+ and af.component_id = :component_id
+ order by source, version desc;'''
+ self.session.execute(sql_create_temp, params=params)
+ sql = '''
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
from newest_sources s, src_contents sc
where s.id = sc.source_id group by sc.file'''
'''
return [item for item in self.fetch()]
- def output_filename(self):
+ def writer(self):
'''
- Returns the name of the output file.
+ Returns a writer object.
'''
values = {
- 'root': Config()['Dir::Root'],
+ 'archive': self.suite.archive.path,
'suite': self.suite.suite_name,
'component': self.component.component_name
}
- return "%(root)s/dists/%(suite)s/%(component)s/Contents-source.gz" % values
+ return SourceContentsFileWriter(**values)
def write_file(self):
'''
Write the output file.
'''
- command = ['gzip', '--rsyncable']
- final_filename = self.output_filename()
- temp_filename = final_filename + '.new'
- output_file = open(temp_filename, 'w')
- gzip = Popen(command, stdin = PIPE, stdout = output_file)
+ writer = self.writer()
+ file = writer.open()
for item in self.fetch():
- gzip.stdin.write(item)
- gzip.stdin.close()
- output_file.close()
- gzip.wait()
- os.chmod(temp_filename, 0664)
- os.rename(temp_filename, final_filename)
+ file.write(item)
+ writer.close()
-def binary_helper(suite_id, arch_id, overridetype_id, component_id = None):
+def binary_helper(suite_id, arch_id, overridetype_id, component_id):
'''
This function is called in a new subprocess and multiprocessing wants a top
level function.
'''
- session = DBConn().session()
+ session = DBConn().session(work_mem = 1000)
suite = Suite.get(suite_id, session)
architecture = Architecture.get(arch_id, session)
overridetype = OverrideType.get(overridetype_id, session)
- log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
- if component_id is None:
- component = None
- else:
- component = Component.get(component_id, session)
- log_message.append(component.component_name)
+ component = Component.get(component_id, session)
+    log_message = [suite.suite_name, architecture.arch_string,
+        overridetype.overridetype, component.component_name]
contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
contents_writer.write_file()
+ session.close()
return log_message
def source_helper(suite_id, component_id):
This function is called in a new subprocess and multiprocessing wants a top
level function.
'''
- session = DBConn().session()
+ session = DBConn().session(work_mem = 1000)
suite = Suite.get(suite_id, session)
component = Component.get(component_id, session)
log_message = [suite.suite_name, 'source', component.component_name]
contents_writer = SourceContentsWriter(suite, component)
contents_writer.write_file()
+ session.close()
return log_message
class ContentsWriter(object):
class_.logger.log(result)
@classmethod
- def write_all(class_, logger, suite_names = [], force = False):
+ def write_all(class_, logger, archive_names = [], suite_names = [], component_names = [], force = False):
'''
Writes all Contents files for suites in list suite_names which defaults
to all 'touchable' suites if not specified explicitely. Untouchable
class_.logger = logger
session = DBConn().session()
suite_query = session.query(Suite)
+ if len(archive_names) > 0:
+ suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
if len(suite_names) > 0:
suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
+ component_query = session.query(Component)
+ if len(component_names) > 0:
+ component_query = component_query.filter(Component.component_name.in_(component_names))
if not force:
- suite_query = suite_query.filter_by(untouchable = False)
+ suite_query = suite_query.filter(Suite.untouchable == False)
deb_id = get_override_type('deb', session).overridetype_id
udeb_id = get_override_type('udeb', session).overridetype_id
- main_id = get_component('main', session).component_id
- contrib_id = get_component('contrib', session).component_id
- non_free_id = get_component('non-free', session).component_id
pool = Pool()
for suite in suite_query:
suite_id = suite.suite_id
- # handle source packages
- pool.apply_async(source_helper, (suite_id, main_id),
- callback = class_.log_result)
- pool.apply_async(source_helper, (suite_id, contrib_id),
- callback = class_.log_result)
- pool.apply_async(source_helper, (suite_id, non_free_id),
- callback = class_.log_result)
- for architecture in suite.get_architectures(skipsrc = True, skipall = True):
- arch_id = architecture.arch_id
- # handle 'deb' packages
- pool.apply_async(binary_helper, (suite_id, arch_id, deb_id), \
- callback = class_.log_result)
- # handle 'udeb' packages for 'main' and 'non-free'
- pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, main_id), \
- callback = class_.log_result)
- pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, non_free_id), \
+ for component in component_query:
+ component_id = component.component_id
+ # handle source packages
+ pool.apply_async(source_helper, (suite_id, component_id),
callback = class_.log_result)
+ for architecture in suite.get_architectures(skipsrc = True, skipall = True):
+ arch_id = architecture.arch_id
+ # handle 'deb' packages
+ pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id), \
+ callback = class_.log_result)
+ # handle 'udeb' packages
+ pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id), \
+ callback = class_.log_result)
pool.close()
pool.join()
session.close()
scanner = BinaryContentsScanner(binary_id)
scanner.scan()
-
-def subprocess_setup():
- # Python installs a SIGPIPE handler by default. This is usually not what
- # non-Python subprocesses expect.
- signal.signal(signal.SIGPIPE, signal.SIG_DFL)
-
class UnpackedSource(object):
'''
UnpackedSource extracts a source package into a temporary location and
gives you some convinient function for accessing it.
'''
- def __init__(self, dscfilename):
+ def __init__(self, dscfilename, tmpbasedir=None):
'''
The dscfilename is a name of a DSC file that will be extracted.
'''
- temp_directory = mkdtemp(dir = Config()['Dir::TempPath'])
+ basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
+ temp_directory = mkdtemp(dir = basedir)
self.root_directory = os.path.join(temp_directory, 'root')
command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
dscfilename, self.root_directory)
- check_call(command, preexec_fn = subprocess_setup)
+ daklib.daksubprocess.check_call(command)
def get_root_directory(self):
'''
try:
scanner = SourceContentsScanner(source_id)
scanner.scan()
- except Exception, e:
+ except Exception as e:
print e
-