from multiprocessing import Pool
from sqlalchemy import desc, or_
-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, call
import os.path
sure that the new ContentsWriter object can be executed in a different
thread.
'''
- self.suite = suite.clone()
- self.session = self.suite.session()
- self.architecture = architecture.clone(self.session)
- self.overridetype = overridetype.clone(self.session)
- if component is not None:
- self.component = component.clone(self.session)
- else:
- self.component = None
+ self.suite = suite
+ self.architecture = architecture
+ self.overridetype = overridetype
+ self.component = component
+ self.session = suite.session()
def query(self):
'''
suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
if not force:
suite_query = suite_query.filter_by(untouchable = False)
- main = get_component('main', session)
- non_free = get_component('non-free', session)
- deb = get_override_type('deb', session)
- udeb = get_override_type('udeb', session)
pool = Pool()
for suite in suite_query:
for architecture in suite.get_architectures(skipsrc = True, skipall = True):
# handle 'deb' packages
- writer = ContentsWriter(suite, architecture, deb)
- pool.apply(writer.write_file)
+ command = ['dak', 'contents', '-s', suite.suite_name, \
+ 'generate_helper', architecture.arch_string, 'deb']
+ pool.apply_async(call, (command, ))
# handle 'udeb' packages for 'main' and 'non-free'
- writer = ContentsWriter(suite, architecture, udeb, component = main)
- pool.apply(writer.write_file)
- writer = ContentsWriter(suite, architecture, udeb, component = non_free)
- pool.apply(writer.write_file)
+ command = ['dak', 'contents', '-s', suite.suite_name, \
+ 'generate_helper', architecture.arch_string, 'udeb', 'main']
+ pool.apply_async(call, (command, ))
+ command = ['dak', 'contents', '-s', suite.suite_name, \
+ 'generate_helper', architecture.arch_string, 'udeb', 'non-free']
+ pool.apply_async(call, (command, ))
pool.close()
pool.join()
session.close()
from daklib.dbconn import *
from daklib.contents import ContentsWriter, ContentsScanner
+from os.path import normpath
from sqlalchemy.exc import FlushError, IntegrityError
import unittest
self.assertEqual('/usr/bin/hello editors/emacs,python/hello,utils/sl\n', \
cw.formatline('/usr/bin/hello', ['editors/emacs', 'python/hello', 'utils/sl']))
# test output_filename
- self.assertEqual('tests/fixtures/ftp/squeeze/Contents-i386.gz', \
- cw.output_filename())
+ self.assertEqual('tests/fixtures/ftp/dists/squeeze/Contents-i386.gz', \
+ normpath(cw.output_filename()))
cw = ContentsWriter(self.suite['squeeze'], self.arch['i386'], \
self.otype['udeb'], self.comp['main'])
- self.assertEqual('tests/fixtures/ftp/squeeze/main/Contents-i386.gz', \
- cw.output_filename())
+ self.assertEqual('tests/fixtures/ftp/dists/squeeze/main/Contents-i386.gz', \
+ normpath(cw.output_filename()))
# test unicode support
self.binary['hello_2.2-1_i386'].contents.append(BinContents(file = '\xc3\xb6'))
self.session.commit()