]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Split binary contents into components.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
31
32 from multiprocessing import Pool
33 from shutil import rmtree
34 from subprocess import Popen, PIPE, check_call
35 from tempfile import mkdtemp
36
37 import os.path
38 import signal
39
40 class BinaryContentsWriter(object):
41     '''
42     BinaryContentsWriter writes the Contents-$arch.gz files.
43     '''
44     def __init__(self, suite, architecture, overridetype, component):
45         self.suite = suite
46         self.architecture = architecture
47         self.overridetype = overridetype
48         self.component = component
49         self.session = suite.session()
50
51     def query(self):
52         '''
53         Returns a query object that is doing most of the work.
54         '''
55         overridesuite = self.suite
56         if self.suite.overridesuite is not None:
57             overridesuite = get_suite(self.suite.overridesuite, self.session)
58         params = {
59             'suite':         self.suite.suite_id,
60             'overridesuite': overridesuite.suite_id,
61             'component':     self.component.component_id,
62             'arch_all':      get_architecture('all', self.session).arch_id,
63             'arch':          self.architecture.arch_id,
64             'type_id':       self.overridetype.overridetype_id,
65             'type':          self.overridetype.overridetype,
66         }
67
68         sql = '''
69 create temp table newest_binaries (
70     id integer primary key,
71     package text);
72
73 create index newest_binaries_by_package on newest_binaries (package);
74
75 insert into newest_binaries (id, package)
76     select distinct on (package) id, package from binaries
77         where type = :type and
78             (architecture = :arch_all or architecture = :arch) and
79             id in (select bin from bin_associations where suite = :suite)
80         order by package, version desc;
81
82 with
83
84 unique_override as
85     (select o.package, s.section
86         from override o, section s
87         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
88         o.component = :component)
89
90 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
91     from newest_binaries b, bin_contents bc, unique_override o
92     where b.id = bc.binary_id and o.package = b.package
93     group by bc.file'''
94
95         return self.session.query("file", "pkglist").from_statement(sql). \
96             params(params)
97
98     def formatline(self, filename, package_list):
99         '''
100         Returns a formatted string for the filename argument.
101         '''
102         return "%-55s %s\n" % (filename, package_list)
103
104     def fetch(self):
105         '''
106         Yields a new line of the Contents-$arch.gz file in filename order.
107         '''
108         for filename, package_list in self.query().yield_per(100):
109             yield self.formatline(filename, package_list)
110         # end transaction to return connection to pool
111         self.session.rollback()
112
113     def get_list(self):
114         '''
115         Returns a list of lines for the Contents-$arch.gz file.
116         '''
117         return [item for item in self.fetch()]
118
119     def writer(self):
120         '''
121         Returns a writer object.
122         '''
123         values = {
124             'suite':        self.suite.suite_name,
125             'debtype':      self.overridetype.overridetype,
126             'architecture': self.architecture.arch_string,
127         }
128         return BinaryContentsFileWriter(**values)
129
130     def get_header(self):
131         '''
132         Returns the header for the Contents files as a string.
133         '''
134         header_file = None
135         try:
136             filename = os.path.join(Config()['Dir::Templates'], 'contents')
137             header_file = open(filename)
138             return header_file.read()
139         finally:
140             if header_file:
141                 header_file.close()
142
143     def write_file(self):
144         '''
145         Write the output file.
146         '''
147         writer = self.writer()
148         file = writer.open()
149         file.write(self.get_header())
150         for item in self.fetch():
151             file.write(item)
152         writer.close()
153
154
155 class SourceContentsWriter(object):
156     '''
157     SourceContentsWriter writes the Contents-source.gz files.
158     '''
159     def __init__(self, suite, component):
160         self.suite = suite
161         self.component = component
162         self.session = suite.session()
163
164     def query(self):
165         '''
166         Returns a query object that is doing most of the work.
167         '''
168         params = {
169             'suite_id':     self.suite.suite_id,
170             'component_id': self.component.component_id,
171         }
172
173         sql = '''
174 create temp table newest_sources (
175     id integer primary key,
176     source text);
177
178 create index sources_binaries_by_source on newest_sources (source);
179
180 insert into newest_sources (id, source)
181     select distinct on (source) s.id, s.source from source s
182         join files f on f.id = s.file
183         join location l on l.id = f.location
184         where s.id in (select source from src_associations where suite = :suite_id)
185             and l.component = :component_id
186         order by source, version desc;
187
188 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
189     from newest_sources s, src_contents sc
190     where s.id = sc.source_id group by sc.file'''
191
192         return self.session.query("file", "pkglist").from_statement(sql). \
193             params(params)
194
195     def formatline(self, filename, package_list):
196         '''
197         Returns a formatted string for the filename argument.
198         '''
199         return "%s\t%s\n" % (filename, package_list)
200
201     def fetch(self):
202         '''
203         Yields a new line of the Contents-source.gz file in filename order.
204         '''
205         for filename, package_list in self.query().yield_per(100):
206             yield self.formatline(filename, package_list)
207         # end transaction to return connection to pool
208         self.session.rollback()
209
210     def get_list(self):
211         '''
212         Returns a list of lines for the Contents-source.gz file.
213         '''
214         return [item for item in self.fetch()]
215
216     def writer(self):
217         '''
218         Returns a writer object.
219         '''
220         values = {
221             'suite':     self.suite.suite_name,
222             'component': self.component.component_name
223         }
224         return SourceContentsFileWriter(**values)
225
226     def write_file(self):
227         '''
228         Write the output file.
229         '''
230         writer = self.writer()
231         file = writer.open()
232         for item in self.fetch():
233             file.write(item)
234         writer.close()
235
236
237 def binary_helper(suite_id, arch_id, overridetype_id, component_id):
238     '''
239     This function is called in a new subprocess and multiprocessing wants a top
240     level function.
241     '''
242     session = DBConn().session(work_mem = 1000)
243     suite = Suite.get(suite_id, session)
244     architecture = Architecture.get(arch_id, session)
245     overridetype = OverrideType.get(overridetype_id, session)
246     component = Component.get(component_id, session)
247     log_message = [suite.suite_name, architecture.arch_string, \
248         overridetype.overridetype, component.component_name]
249     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
250     contents_writer.write_file()
251     return log_message
252
253 def source_helper(suite_id, component_id):
254     '''
255     This function is called in a new subprocess and multiprocessing wants a top
256     level function.
257     '''
258     session = DBConn().session(work_mem = 1000)
259     suite = Suite.get(suite_id, session)
260     component = Component.get(component_id, session)
261     log_message = [suite.suite_name, 'source', component.component_name]
262     contents_writer = SourceContentsWriter(suite, component)
263     contents_writer.write_file()
264     return log_message
265
266 class ContentsWriter(object):
267     '''
268     Loop over all suites, architectures, overridetypes, and components to write
269     all contents files.
270     '''
271     @classmethod
272     def log_result(class_, result):
273         '''
274         Writes a result message to the logfile.
275         '''
276         class_.logger.log(result)
277
278     @classmethod
279     def write_all(class_, logger, suite_names = [], component_names = [], force = False):
280         '''
281         Writes all Contents files for suites in list suite_names which defaults
282         to all 'touchable' suites if not specified explicitely. Untouchable
283         suites will be included if the force argument is set to True.
284         '''
285         class_.logger = logger
286         session = DBConn().session()
287         suite_query = session.query(Suite)
288         if len(suite_names) > 0:
289             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
290         component_query = session.query(Component)
291         if len(component_names) > 0:
292             component_query = component_query.filter(Component.component_name.in_(component_names))
293         if not force:
294             suite_query = suite_query.filter_by(untouchable = False)
295         deb_id = get_override_type('deb', session).overridetype_id
296         udeb_id = get_override_type('udeb', session).overridetype_id
297         pool = Pool()
298         for suite in suite_query:
299             suite_id = suite.suite_id
300             for component in component_query:
301                 component_id = component.component_id
302                 # handle source packages
303                 pool.apply_async(source_helper, (suite_id, component_id),
304                     callback = class_.log_result)
305                 for architecture in suite.get_architectures(skipsrc = True, skipall = True):
306                     arch_id = architecture.arch_id
307                     # handle 'deb' packages
308                     pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id), \
309                         callback = class_.log_result)
310                     # handle 'udeb' packages
311                     pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id), \
312                         callback = class_.log_result)
313         pool.close()
314         pool.join()
315         session.close()
316
317
318 class BinaryContentsScanner(object):
319     '''
320     BinaryContentsScanner provides a threadsafe method scan() to scan the
321     contents of a DBBinary object.
322     '''
323     def __init__(self, binary_id):
324         '''
325         The argument binary_id is the id of the DBBinary object that
326         should be scanned.
327         '''
328         self.binary_id = binary_id
329
330     def scan(self, dummy_arg = None):
331         '''
332         This method does the actual scan and fills in the associated BinContents
333         property. It commits any changes to the database. The argument dummy_arg
334         is ignored but needed by our threadpool implementation.
335         '''
336         session = DBConn().session()
337         binary = session.query(DBBinary).get(self.binary_id)
338         fileset = set(binary.scan_contents())
339         if len(fileset) == 0:
340             fileset.add('EMPTY_PACKAGE')
341         for filename in fileset:
342             binary.contents.append(BinContents(file = filename))
343         session.commit()
344         session.close()
345
346     @classmethod
347     def scan_all(class_, limit = None):
348         '''
349         The class method scan_all() scans all binaries using multiple threads.
350         The number of binaries to be scanned can be limited with the limit
351         argument. Returns the number of processed and remaining packages as a
352         dict.
353         '''
354         session = DBConn().session()
355         query = session.query(DBBinary).filter(DBBinary.contents == None)
356         remaining = query.count
357         if limit is not None:
358             query = query.limit(limit)
359         processed = query.count()
360         pool = Pool()
361         for binary in query.yield_per(100):
362             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
363         pool.close()
364         pool.join()
365         remaining = remaining()
366         session.close()
367         return { 'processed': processed, 'remaining': remaining }
368
369 def binary_scan_helper(binary_id):
370     '''
371     This function runs in a subprocess.
372     '''
373     scanner = BinaryContentsScanner(binary_id)
374     scanner.scan()
375
376
377 def subprocess_setup():
378     # Python installs a SIGPIPE handler by default. This is usually not what
379     # non-Python subprocesses expect.
380     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
381
382 class UnpackedSource(object):
383     '''
384     UnpackedSource extracts a source package into a temporary location and
385     gives you some convinient function for accessing it.
386     '''
387     def __init__(self, dscfilename):
388         '''
389         The dscfilename is a name of a DSC file that will be extracted.
390         '''
391         temp_directory = mkdtemp(dir = Config()['Dir::TempPath'])
392         self.root_directory = os.path.join(temp_directory, 'root')
393         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
394             dscfilename, self.root_directory)
395         check_call(command, preexec_fn = subprocess_setup)
396
397     def get_root_directory(self):
398         '''
399         Returns the name of the package's root directory which is the directory
400         where the debian subdirectory is located.
401         '''
402         return self.root_directory
403
404     def get_changelog_file(self):
405         '''
406         Returns a file object for debian/changelog or None if no such file exists.
407         '''
408         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
409         try:
410             return open(changelog_name)
411         except IOError:
412             return None
413
414     def get_all_filenames(self):
415         '''
416         Returns an iterator over all filenames. The filenames will be relative
417         to the root directory.
418         '''
419         skip = len(self.root_directory) + 1
420         for root, _, files in os.walk(self.root_directory):
421             for name in files:
422                 yield os.path.join(root[skip:], name)
423
424     def cleanup(self):
425         '''
426         Removes all temporary files.
427         '''
428         if self.root_directory is None:
429             return
430         parent_directory = os.path.dirname(self.root_directory)
431         rmtree(parent_directory)
432         self.root_directory = None
433
434     def __del__(self):
435         '''
436         Enforce cleanup.
437         '''
438         self.cleanup()
439
440
441 class SourceContentsScanner(object):
442     '''
443     SourceContentsScanner provides a method scan() to scan the contents of a
444     DBSource object.
445     '''
446     def __init__(self, source_id):
447         '''
448         The argument source_id is the id of the DBSource object that
449         should be scanned.
450         '''
451         self.source_id = source_id
452
453     def scan(self):
454         '''
455         This method does the actual scan and fills in the associated SrcContents
456         property. It commits any changes to the database.
457         '''
458         session = DBConn().session()
459         source = session.query(DBSource).get(self.source_id)
460         fileset = set(source.scan_contents())
461         for filename in fileset:
462             source.contents.append(SrcContents(file = filename))
463         session.commit()
464         session.close()
465
466     @classmethod
467     def scan_all(class_, limit = None):
468         '''
469         The class method scan_all() scans all source using multiple processes.
470         The number of sources to be scanned can be limited with the limit
471         argument. Returns the number of processed and remaining packages as a
472         dict.
473         '''
474         session = DBConn().session()
475         query = session.query(DBSource).filter(DBSource.contents == None)
476         remaining = query.count
477         if limit is not None:
478             query = query.limit(limit)
479         processed = query.count()
480         pool = Pool()
481         for source in query.yield_per(100):
482             pool.apply_async(source_scan_helper, (source.source_id, ))
483         pool.close()
484         pool.join()
485         remaining = remaining()
486         session.close()
487         return { 'processed': processed, 'remaining': remaining }
488
489 def source_scan_helper(source_id):
490     '''
491     This function runs in a subprocess.
492     '''
493     try:
494         scanner = SourceContentsScanner(source_id)
495         scanner.scan()
496     except Exception, e:
497         print e
498