]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Reset signal handler to default action in child processes
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
31
32 from multiprocessing import Pool
33 from shutil import rmtree
34 from tempfile import mkdtemp
35
36 import daklib.daksubprocess
37 import os.path
38
39 class BinaryContentsWriter(object):
40     '''
41     BinaryContentsWriter writes the Contents-$arch.gz files.
42     '''
43     def __init__(self, suite, architecture, overridetype, component):
44         self.suite = suite
45         self.architecture = architecture
46         self.overridetype = overridetype
47         self.component = component
48         self.session = suite.session()
49
50     def query(self):
51         '''
52         Returns a query object that is doing most of the work.
53         '''
54         overridesuite = self.suite
55         if self.suite.overridesuite is not None:
56             overridesuite = get_suite(self.suite.overridesuite, self.session)
57         params = {
58             'suite':         self.suite.suite_id,
59             'overridesuite': overridesuite.suite_id,
60             'component':     self.component.component_id,
61             'arch_all':      get_architecture('all', self.session).arch_id,
62             'arch':          self.architecture.arch_id,
63             'type_id':       self.overridetype.overridetype_id,
64             'type':          self.overridetype.overridetype,
65         }
66
67         sql_create_temp = '''
68 create temp table newest_binaries (
69     id integer primary key,
70     package text);
71
72 create index newest_binaries_by_package on newest_binaries (package);
73
74 insert into newest_binaries (id, package)
75     select distinct on (package) id, package from binaries
76         where type = :type and
77             (architecture = :arch_all or architecture = :arch) and
78             id in (select bin from bin_associations where suite = :suite)
79         order by package, version desc;'''
80         self.session.execute(sql_create_temp, params=params)
81
82         sql = '''
83 with
84
85 unique_override as
86     (select o.package, s.section
87         from override o, section s
88         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
89         o.component = :component)
90
91 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
92     from newest_binaries b, bin_contents bc, unique_override o
93     where b.id = bc.binary_id and o.package = b.package
94     group by bc.file'''
95
96         return self.session.query("file", "pkglist").from_statement(sql). \
97             params(params)
98
99     def formatline(self, filename, package_list):
100         '''
101         Returns a formatted string for the filename argument.
102         '''
103         return "%-55s %s\n" % (filename, package_list)
104
105     def fetch(self):
106         '''
107         Yields a new line of the Contents-$arch.gz file in filename order.
108         '''
109         for filename, package_list in self.query().yield_per(100):
110             yield self.formatline(filename, package_list)
111         # end transaction to return connection to pool
112         self.session.rollback()
113
114     def get_list(self):
115         '''
116         Returns a list of lines for the Contents-$arch.gz file.
117         '''
118         return [item for item in self.fetch()]
119
120     def writer(self):
121         '''
122         Returns a writer object.
123         '''
124         values = {
125             'archive':      self.suite.archive.path,
126             'suite':        self.suite.suite_name,
127             'component':    self.component.component_name,
128             'debtype':      self.overridetype.overridetype,
129             'architecture': self.architecture.arch_string,
130         }
131         return BinaryContentsFileWriter(**values)
132
133     def get_header(self):
134         '''
135         Returns the header for the Contents files as a string.
136         '''
137         header_file = None
138         try:
139             filename = os.path.join(Config()['Dir::Templates'], 'contents')
140             header_file = open(filename)
141             return header_file.read()
142         finally:
143             if header_file:
144                 header_file.close()
145
146     def write_file(self):
147         '''
148         Write the output file.
149         '''
150         writer = self.writer()
151         file = writer.open()
152         file.write(self.get_header())
153         for item in self.fetch():
154             file.write(item)
155         writer.close()
156
157
158 class SourceContentsWriter(object):
159     '''
160     SourceContentsWriter writes the Contents-source.gz files.
161     '''
162     def __init__(self, suite, component):
163         self.suite = suite
164         self.component = component
165         self.session = suite.session()
166
167     def query(self):
168         '''
169         Returns a query object that is doing most of the work.
170         '''
171         params = {
172             'suite_id':     self.suite.suite_id,
173             'component_id': self.component.component_id,
174         }
175
176         sql_create_temp = '''
177 create temp table newest_sources (
178     id integer primary key,
179     source text);
180
181 create index sources_binaries_by_source on newest_sources (source);
182
183 insert into newest_sources (id, source)
184     select distinct on (source) s.id, s.source from source s
185         join files_archive_map af on s.file = af.file_id
186         where s.id in (select source from src_associations where suite = :suite_id)
187             and af.component_id = :component_id
188         order by source, version desc;'''
189         self.session.execute(sql_create_temp, params=params)
190
191         sql = '''
192 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
193     from newest_sources s, src_contents sc
194     where s.id = sc.source_id group by sc.file'''
195
196         return self.session.query("file", "pkglist").from_statement(sql). \
197             params(params)
198
199     def formatline(self, filename, package_list):
200         '''
201         Returns a formatted string for the filename argument.
202         '''
203         return "%s\t%s\n" % (filename, package_list)
204
205     def fetch(self):
206         '''
207         Yields a new line of the Contents-source.gz file in filename order.
208         '''
209         for filename, package_list in self.query().yield_per(100):
210             yield self.formatline(filename, package_list)
211         # end transaction to return connection to pool
212         self.session.rollback()
213
214     def get_list(self):
215         '''
216         Returns a list of lines for the Contents-source.gz file.
217         '''
218         return [item for item in self.fetch()]
219
220     def writer(self):
221         '''
222         Returns a writer object.
223         '''
224         values = {
225             'archive':   self.suite.archive.path,
226             'suite':     self.suite.suite_name,
227             'component': self.component.component_name
228         }
229         return SourceContentsFileWriter(**values)
230
231     def write_file(self):
232         '''
233         Write the output file.
234         '''
235         writer = self.writer()
236         file = writer.open()
237         for item in self.fetch():
238             file.write(item)
239         writer.close()
240
241
242 def binary_helper(suite_id, arch_id, overridetype_id, component_id):
243     '''
244     This function is called in a new subprocess and multiprocessing wants a top
245     level function.
246     '''
247     session = DBConn().session(work_mem = 1000)
248     suite = Suite.get(suite_id, session)
249     architecture = Architecture.get(arch_id, session)
250     overridetype = OverrideType.get(overridetype_id, session)
251     component = Component.get(component_id, session)
252     log_message = [suite.suite_name, architecture.arch_string, \
253         overridetype.overridetype, component.component_name]
254     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
255     contents_writer.write_file()
256     session.close()
257     return log_message
258
259 def source_helper(suite_id, component_id):
260     '''
261     This function is called in a new subprocess and multiprocessing wants a top
262     level function.
263     '''
264     session = DBConn().session(work_mem = 1000)
265     suite = Suite.get(suite_id, session)
266     component = Component.get(component_id, session)
267     log_message = [suite.suite_name, 'source', component.component_name]
268     contents_writer = SourceContentsWriter(suite, component)
269     contents_writer.write_file()
270     session.close()
271     return log_message
272
273 class ContentsWriter(object):
274     '''
275     Loop over all suites, architectures, overridetypes, and components to write
276     all contents files.
277     '''
278     @classmethod
279     def log_result(class_, result):
280         '''
281         Writes a result message to the logfile.
282         '''
283         class_.logger.log(result)
284
285     @classmethod
286     def write_all(class_, logger, archive_names = [], suite_names = [], component_names = [], force = False):
287         '''
288         Writes all Contents files for suites in list suite_names which defaults
289         to all 'touchable' suites if not specified explicitely. Untouchable
290         suites will be included if the force argument is set to True.
291         '''
292         class_.logger = logger
293         session = DBConn().session()
294         suite_query = session.query(Suite)
295         if len(archive_names) > 0:
296             suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
297         if len(suite_names) > 0:
298             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
299         component_query = session.query(Component)
300         if len(component_names) > 0:
301             component_query = component_query.filter(Component.component_name.in_(component_names))
302         if not force:
303             suite_query = suite_query.filter(Suite.untouchable == False)
304         deb_id = get_override_type('deb', session).overridetype_id
305         udeb_id = get_override_type('udeb', session).overridetype_id
306         pool = Pool()
307         for suite in suite_query:
308             suite_id = suite.suite_id
309             for component in component_query:
310                 component_id = component.component_id
311                 # handle source packages
312                 pool.apply_async(source_helper, (suite_id, component_id),
313                     callback = class_.log_result)
314                 for architecture in suite.get_architectures(skipsrc = True, skipall = True):
315                     arch_id = architecture.arch_id
316                     # handle 'deb' packages
317                     pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id), \
318                         callback = class_.log_result)
319                     # handle 'udeb' packages
320                     pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id), \
321                         callback = class_.log_result)
322         pool.close()
323         pool.join()
324         session.close()
325
326
327 class BinaryContentsScanner(object):
328     '''
329     BinaryContentsScanner provides a threadsafe method scan() to scan the
330     contents of a DBBinary object.
331     '''
332     def __init__(self, binary_id):
333         '''
334         The argument binary_id is the id of the DBBinary object that
335         should be scanned.
336         '''
337         self.binary_id = binary_id
338
339     def scan(self, dummy_arg = None):
340         '''
341         This method does the actual scan and fills in the associated BinContents
342         property. It commits any changes to the database. The argument dummy_arg
343         is ignored but needed by our threadpool implementation.
344         '''
345         session = DBConn().session()
346         binary = session.query(DBBinary).get(self.binary_id)
347         fileset = set(binary.scan_contents())
348         if len(fileset) == 0:
349             fileset.add('EMPTY_PACKAGE')
350         for filename in fileset:
351             binary.contents.append(BinContents(file = filename))
352         session.commit()
353         session.close()
354
355     @classmethod
356     def scan_all(class_, limit = None):
357         '''
358         The class method scan_all() scans all binaries using multiple threads.
359         The number of binaries to be scanned can be limited with the limit
360         argument. Returns the number of processed and remaining packages as a
361         dict.
362         '''
363         session = DBConn().session()
364         query = session.query(DBBinary).filter(DBBinary.contents == None)
365         remaining = query.count
366         if limit is not None:
367             query = query.limit(limit)
368         processed = query.count()
369         pool = Pool()
370         for binary in query.yield_per(100):
371             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
372         pool.close()
373         pool.join()
374         remaining = remaining()
375         session.close()
376         return { 'processed': processed, 'remaining': remaining }
377
378 def binary_scan_helper(binary_id):
379     '''
380     This function runs in a subprocess.
381     '''
382     scanner = BinaryContentsScanner(binary_id)
383     scanner.scan()
384
385 class UnpackedSource(object):
386     '''
387     UnpackedSource extracts a source package into a temporary location and
388     gives you some convinient function for accessing it.
389     '''
390     def __init__(self, dscfilename, tmpbasedir=None):
391         '''
392         The dscfilename is a name of a DSC file that will be extracted.
393         '''
394         basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
395         temp_directory = mkdtemp(dir = basedir)
396         self.root_directory = os.path.join(temp_directory, 'root')
397         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
398             dscfilename, self.root_directory)
399         daklib.daksubprocess.check_call(command)
400
401     def get_root_directory(self):
402         '''
403         Returns the name of the package's root directory which is the directory
404         where the debian subdirectory is located.
405         '''
406         return self.root_directory
407
408     def get_changelog_file(self):
409         '''
410         Returns a file object for debian/changelog or None if no such file exists.
411         '''
412         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
413         try:
414             return open(changelog_name)
415         except IOError:
416             return None
417
418     def get_all_filenames(self):
419         '''
420         Returns an iterator over all filenames. The filenames will be relative
421         to the root directory.
422         '''
423         skip = len(self.root_directory) + 1
424         for root, _, files in os.walk(self.root_directory):
425             for name in files:
426                 yield os.path.join(root[skip:], name)
427
428     def cleanup(self):
429         '''
430         Removes all temporary files.
431         '''
432         if self.root_directory is None:
433             return
434         parent_directory = os.path.dirname(self.root_directory)
435         rmtree(parent_directory)
436         self.root_directory = None
437
438     def __del__(self):
439         '''
440         Enforce cleanup.
441         '''
442         self.cleanup()
443
444
445 class SourceContentsScanner(object):
446     '''
447     SourceContentsScanner provides a method scan() to scan the contents of a
448     DBSource object.
449     '''
450     def __init__(self, source_id):
451         '''
452         The argument source_id is the id of the DBSource object that
453         should be scanned.
454         '''
455         self.source_id = source_id
456
457     def scan(self):
458         '''
459         This method does the actual scan and fills in the associated SrcContents
460         property. It commits any changes to the database.
461         '''
462         session = DBConn().session()
463         source = session.query(DBSource).get(self.source_id)
464         fileset = set(source.scan_contents())
465         for filename in fileset:
466             source.contents.append(SrcContents(file = filename))
467         session.commit()
468         session.close()
469
470     @classmethod
471     def scan_all(class_, limit = None):
472         '''
473         The class method scan_all() scans all source using multiple processes.
474         The number of sources to be scanned can be limited with the limit
475         argument. Returns the number of processed and remaining packages as a
476         dict.
477         '''
478         session = DBConn().session()
479         query = session.query(DBSource).filter(DBSource.contents == None)
480         remaining = query.count
481         if limit is not None:
482             query = query.limit(limit)
483         processed = query.count()
484         pool = Pool()
485         for source in query.yield_per(100):
486             pool.apply_async(source_scan_helper, (source.source_id, ))
487         pool.close()
488         pool.join()
489         remaining = remaining()
490         session.close()
491         return { 'processed': processed, 'remaining': remaining }
492
493 def source_scan_helper(source_id):
494     '''
495     This function runs in a subprocess.
496     '''
497     try:
498         scanner = SourceContentsScanner(source_id)
499         scanner.scan()
500     except Exception as e:
501         print e