]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Improve source contents logging a bit.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from shutil import rmtree
33 from subprocess import Popen, PIPE, check_call
34 from tempfile import mkdtemp
35
36 import os.path
37
38 class BinaryContentsWriter(object):
39     '''
40     BinaryContentsWriter writes the Contents-$arch.gz files.
41     '''
42     def __init__(self, suite, architecture, overridetype, component = None):
43         self.suite = suite
44         self.architecture = architecture
45         self.overridetype = overridetype
46         self.component = component
47         self.session = suite.session()
48
49     def query(self):
50         '''
51         Returns a query object that is doing most of the work.
52         '''
53         overridesuite = self.suite
54         if self.suite.overridesuite is not None:
55             overridesuite = get_suite(self.suite.overridesuite, self.session)
56         params = {
57             'suite':         self.suite.suite_id,
58             'overridesuite': overridesuite.suite_id,
59             'arch_all':      get_architecture('all', self.session).arch_id,
60             'arch':          self.architecture.arch_id,
61             'type_id':       self.overridetype.overridetype_id,
62             'type':          self.overridetype.overridetype,
63         }
64
65         if self.component is not None:
66             params['component'] = self.component.component_id
67             sql = '''
68 create temp table newest_binaries (
69     id integer primary key,
70     package text);
71
72 create index newest_binaries_by_package on newest_binaries (package);
73
74 insert into newest_binaries (id, package)
75     select distinct on (package) id, package from binaries
76         where type = :type and
77             (architecture = :arch_all or architecture = :arch) and
78             id in (select bin from bin_associations where suite = :suite)
79         order by package, version desc;
80
81 with
82
83 unique_override as
84     (select o.package, s.section
85         from override o, section s
86         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
87         o.component = :component)
88
89 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
90     from newest_binaries b, bin_contents bc, unique_override o
91     where b.id = bc.binary_id and o.package = b.package
92     group by bc.file'''
93
94         else:
95             sql = '''
96 create temp table newest_binaries (
97     id integer primary key,
98     package text);
99
100 create index newest_binaries_by_package on newest_binaries (package);
101
102 insert into newest_binaries (id, package)
103     select distinct on (package) id, package from binaries
104         where type = :type and
105             (architecture = :arch_all or architecture = :arch) and
106             id in (select bin from bin_associations where suite = :suite)
107         order by package, version desc;
108
109 with
110
111 unique_override as
112     (select distinct on (o.package, s.section) o.package, s.section
113         from override o, section s
114         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
115         order by o.package, s.section, o.modified desc)
116
117 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
118     from newest_binaries b, bin_contents bc, unique_override o
119     where b.id = bc.binary_id and o.package = b.package
120     group by bc.file'''
121
122         return self.session.query("file", "pkglist").from_statement(sql). \
123             params(params)
124
125     def formatline(self, filename, package_list):
126         '''
127         Returns a formatted string for the filename argument.
128         '''
129         return "%-55s %s\n" % (filename, package_list)
130
131     def fetch(self):
132         '''
133         Yields a new line of the Contents-$arch.gz file in filename order.
134         '''
135         for filename, package_list in self.query().yield_per(100):
136             yield self.formatline(filename, package_list)
137         # end transaction to return connection to pool
138         self.session.rollback()
139
140     def get_list(self):
141         '''
142         Returns a list of lines for the Contents-$arch.gz file.
143         '''
144         return [item for item in self.fetch()]
145
146     def output_filename(self):
147         '''
148         Returns the name of the output file.
149         '''
150         values = {
151             'root': Config()['Dir::Root'],
152             'suite': self.suite.suite_name,
153             'architecture': self.architecture.arch_string
154         }
155         if self.component is None:
156             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
157         values['component'] = self.component.component_name
158         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
159
160     def get_header(self):
161         '''
162         Returns the header for the Contents files as a string.
163         '''
164         header_file = None
165         try:
166             filename = os.path.join(Config()['Dir::Templates'], 'contents')
167             header_file = open(filename)
168             return header_file.read()
169         finally:
170             if header_file:
171                 header_file.close()
172
173     def write_file(self):
174         '''
175         Write the output file.
176         '''
177         command = ['gzip', '--rsyncable']
178         final_filename = self.output_filename()
179         temp_filename = final_filename + '.new'
180         output_file = open(temp_filename, 'w')
181         gzip = Popen(command, stdin = PIPE, stdout = output_file)
182         gzip.stdin.write(self.get_header())
183         for item in self.fetch():
184             gzip.stdin.write(item)
185         gzip.stdin.close()
186         output_file.close()
187         gzip.wait()
188         os.chmod(temp_filename, 0664)
189         os.rename(temp_filename, final_filename)
190
191
192 class SourceContentsWriter(object):
193     '''
194     SourceContentsWriter writes the Contents-source.gz files.
195     '''
196     def __init__(self, suite, component):
197         self.suite = suite
198         self.component = component
199         self.session = suite.session()
200
201     def query(self):
202         '''
203         Returns a query object that is doing most of the work.
204         '''
205         params = {
206             'suite_id':     self.suite.suite_id,
207             'component_id': self.component.component_id,
208         }
209
210         sql = '''
211 create temp table newest_sources (
212     id integer primary key,
213     source text);
214
215 create index sources_binaries_by_source on newest_sources (source);
216
217 insert into newest_sources (id, source)
218     select distinct on (source) s.id, s.source from source s
219         join files f on f.id = s.file
220         join location l on l.id = f.location
221         where s.id in (select source from src_associations where suite = :suite_id)
222             and l.component = :component_id
223         order by source, version desc;
224
225 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
226     from newest_sources s, src_contents sc
227     where s.id = sc.source_id group by sc.file'''
228
229         return self.session.query("file", "pkglist").from_statement(sql). \
230             params(params)
231
232     def formatline(self, filename, package_list):
233         '''
234         Returns a formatted string for the filename argument.
235         '''
236         return "%s\t%s\n" % (filename, package_list)
237
238     def fetch(self):
239         '''
240         Yields a new line of the Contents-source.gz file in filename order.
241         '''
242         for filename, package_list in self.query().yield_per(100):
243             yield self.formatline(filename, package_list)
244         # end transaction to return connection to pool
245         self.session.rollback()
246
247     def get_list(self):
248         '''
249         Returns a list of lines for the Contents-source.gz file.
250         '''
251         return [item for item in self.fetch()]
252
253     def output_filename(self):
254         '''
255         Returns the name of the output file.
256         '''
257         values = {
258             'root':      Config()['Dir::Root'],
259             'suite':     self.suite.suite_name,
260             'component': self.component.component_name
261         }
262         return "%(root)s/dists/%(suite)s/%(component)s/Contents-source.gz" % values
263
264     def write_file(self):
265         '''
266         Write the output file.
267         '''
268         command = ['gzip', '--rsyncable']
269         final_filename = self.output_filename()
270         temp_filename = final_filename + '.new'
271         output_file = open(temp_filename, 'w')
272         gzip = Popen(command, stdin = PIPE, stdout = output_file)
273         for item in self.fetch():
274             gzip.stdin.write(item)
275         gzip.stdin.close()
276         output_file.close()
277         gzip.wait()
278         os.chmod(temp_filename, 0664)
279         os.rename(temp_filename, final_filename)
280
281
282 def binary_helper(suite_id, arch_id, overridetype_id, component_id = None):
283     '''
284     This function is called in a new subprocess and multiprocessing wants a top
285     level function.
286     '''
287     session = DBConn().session()
288     suite = Suite.get(suite_id, session)
289     architecture = Architecture.get(arch_id, session)
290     overridetype = OverrideType.get(overridetype_id, session)
291     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
292     if component_id is None:
293         component = None
294     else:
295         component = Component.get(component_id, session)
296         log_message.append(component.component_name)
297     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
298     contents_writer.write_file()
299     return log_message
300
301 def source_helper(suite_id, component_id):
302     '''
303     This function is called in a new subprocess and multiprocessing wants a top
304     level function.
305     '''
306     session = DBConn().session()
307     suite = Suite.get(suite_id, session)
308     component = Component.get(component_id, session)
309     log_message = [suite.suite_name, 'source', component.component_name]
310     contents_writer = SourceContentsWriter(suite, component)
311     contents_writer.write_file()
312     return log_message
313
314 class ContentsWriter(object):
315     '''
316     Loop over all suites, architectures, overridetypes, and components to write
317     all contents files.
318     '''
319     @classmethod
320     def log_result(class_, result):
321         '''
322         Writes a result message to the logfile.
323         '''
324         class_.logger.log(result)
325
326     @classmethod
327     def write_all(class_, logger, suite_names = [], force = False):
328         '''
329         Writes all Contents files for suites in list suite_names which defaults
330         to all 'touchable' suites if not specified explicitely. Untouchable
331         suites will be included if the force argument is set to True.
332         '''
333         class_.logger = logger
334         session = DBConn().session()
335         suite_query = session.query(Suite)
336         if len(suite_names) > 0:
337             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
338         if not force:
339             suite_query = suite_query.filter_by(untouchable = False)
340         deb_id = get_override_type('deb', session).overridetype_id
341         udeb_id = get_override_type('udeb', session).overridetype_id
342         main_id = get_component('main', session).component_id
343         contrib_id = get_component('contrib', session).component_id
344         non_free_id = get_component('non-free', session).component_id
345         pool = Pool()
346         for suite in suite_query:
347             suite_id = suite.suite_id
348             # handle source packages
349             pool.apply_async(source_helper, (suite_id, main_id),
350                 callback = class_.log_result)
351             pool.apply_async(source_helper, (suite_id, contrib_id),
352                 callback = class_.log_result)
353             pool.apply_async(source_helper, (suite_id, non_free_id),
354                 callback = class_.log_result)
355             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
356                 arch_id = architecture.arch_id
357                 # handle 'deb' packages
358                 pool.apply_async(binary_helper, (suite_id, arch_id, deb_id), \
359                     callback = class_.log_result)
360                 # handle 'udeb' packages for 'main' and 'non-free'
361                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, main_id), \
362                     callback = class_.log_result)
363                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, non_free_id), \
364                     callback = class_.log_result)
365         pool.close()
366         pool.join()
367         session.close()
368
369
370 class BinaryContentsScanner(object):
371     '''
372     BinaryContentsScanner provides a threadsafe method scan() to scan the
373     contents of a DBBinary object.
374     '''
375     def __init__(self, binary_id):
376         '''
377         The argument binary_id is the id of the DBBinary object that
378         should be scanned.
379         '''
380         self.binary_id = binary_id
381
382     def scan(self, dummy_arg = None):
383         '''
384         This method does the actual scan and fills in the associated BinContents
385         property. It commits any changes to the database. The argument dummy_arg
386         is ignored but needed by our threadpool implementation.
387         '''
388         session = DBConn().session()
389         binary = session.query(DBBinary).get(self.binary_id)
390         fileset = set(binary.scan_contents())
391         if len(fileset) == 0:
392             fileset.add('EMPTY_PACKAGE')
393         for filename in fileset:
394             binary.contents.append(BinContents(file = filename))
395         session.commit()
396         session.close()
397
398     @classmethod
399     def scan_all(class_, limit = None):
400         '''
401         The class method scan_all() scans all binaries using multiple threads.
402         The number of binaries to be scanned can be limited with the limit
403         argument. Returns the number of processed and remaining packages as a
404         dict.
405         '''
406         session = DBConn().session()
407         query = session.query(DBBinary).filter(DBBinary.contents == None)
408         remaining = query.count
409         if limit is not None:
410             query = query.limit(limit)
411         processed = query.count()
412         pool = Pool()
413         for binary in query.yield_per(100):
414             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
415         pool.close()
416         pool.join()
417         remaining = remaining()
418         session.close()
419         return { 'processed': processed, 'remaining': remaining }
420
421 def binary_scan_helper(binary_id):
422     '''
423     This function runs in a subprocess.
424     '''
425     scanner = BinaryContentsScanner(binary_id)
426     scanner.scan()
427
428
429 class UnpackedSource(object):
430     '''
431     UnpackedSource extracts a source package into a temporary location and
432     gives you some convinient function for accessing it.
433     '''
434     def __init__(self, dscfilename):
435         '''
436         The dscfilename is a name of a DSC file that will be extracted.
437         '''
438         self.root_directory = os.path.join(mkdtemp(), 'root')
439         command = ('dpkg-source', '--no-copy', '--no-check', '-x', dscfilename,
440             self.root_directory)
441         # dpkg-source does not have a --quiet option
442         devnull = open(os.devnull, 'w')
443         check_call(command, stdout = devnull, stderr = devnull)
444         devnull.close()
445
446     def get_root_directory(self):
447         '''
448         Returns the name of the package's root directory which is the directory
449         where the debian subdirectory is located.
450         '''
451         return self.root_directory
452
453     def get_changelog_file(self):
454         '''
455         Returns a file object for debian/changelog or None if no such file exists.
456         '''
457         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
458         try:
459             return open(changelog_name)
460         except IOError:
461             return None
462
463     def get_all_filenames(self):
464         '''
465         Returns an iterator over all filenames. The filenames will be relative
466         to the root directory.
467         '''
468         skip = len(self.root_directory) + 1
469         for root, _, files in os.walk(self.root_directory):
470             for name in files:
471                 yield os.path.join(root[skip:], name)
472
473     def cleanup(self):
474         '''
475         Removes all temporary files.
476         '''
477         if self.root_directory is None:
478             return
479         parent_directory = os.path.dirname(self.root_directory)
480         rmtree(parent_directory)
481         self.root_directory = None
482
483     def __del__(self):
484         '''
485         Enforce cleanup.
486         '''
487         self.cleanup()
488
489
490 class SourceContentsScanner(object):
491     '''
492     SourceContentsScanner provides a method scan() to scan the contents of a
493     DBSource object.
494     '''
495     def __init__(self, source_id):
496         '''
497         The argument source_id is the id of the DBSource object that
498         should be scanned.
499         '''
500         self.source_id = source_id
501
502     def scan(self):
503         '''
504         This method does the actual scan and fills in the associated SrcContents
505         property. It commits any changes to the database.
506         '''
507         session = DBConn().session()
508         source = session.query(DBSource).get(self.source_id)
509         fileset = set(source.scan_contents())
510         for filename in fileset:
511             source.contents.append(SrcContents(file = filename))
512         session.commit()
513         session.close()
514
515     @classmethod
516     def scan_all(class_, limit = None):
517         '''
518         The class method scan_all() scans all source using multiple processes.
519         The number of sources to be scanned can be limited with the limit
520         argument. Returns the number of processed and remaining packages as a
521         dict.
522         '''
523         session = DBConn().session()
524         query = session.query(DBSource).filter(DBSource.contents == None)
525         remaining = query.count
526         if limit is not None:
527             query = query.limit(limit)
528         processed = query.count()
529         pool = Pool()
530         for source in query.yield_per(100):
531             pool.apply_async(source_scan_helper, (source.source_id, ))
532         pool.close()
533         pool.join()
534         remaining = remaining()
535         session.close()
536         return { 'processed': processed, 'remaining': remaining }
537
538 def source_scan_helper(source_id):
539     '''
540     This function runs in a subprocess.
541     '''
542     try:
543         scanner = SourceContentsScanner(source_id)
544         scanner.scan()
545     except Exception, e:
546         print e
547