]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Add class SourceContentsWriter.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from shutil import rmtree
33 from subprocess import Popen, PIPE, check_call
34 from tempfile import mkdtemp
35
36 import os.path
37
38 class BinaryContentsWriter(object):
39     '''
40     BinaryContentsWriter writes the Contents-$arch.gz files.
41     '''
42     def __init__(self, suite, architecture, overridetype, component = None):
43         self.suite = suite
44         self.architecture = architecture
45         self.overridetype = overridetype
46         self.component = component
47         self.session = suite.session()
48
49     def query(self):
50         '''
51         Returns a query object that is doing most of the work.
52         '''
53         overridesuite = self.suite
54         if self.suite.overridesuite is not None:
55             overridesuite = get_suite(self.suite.overridesuite, self.session)
56         params = {
57             'suite':         self.suite.suite_id,
58             'overridesuite': overridesuite.suite_id,
59             'arch_all':      get_architecture('all', self.session).arch_id,
60             'arch':          self.architecture.arch_id,
61             'type_id':       self.overridetype.overridetype_id,
62             'type':          self.overridetype.overridetype,
63         }
64
65         if self.component is not None:
66             params['component'] = self.component.component_id
67             sql = '''
68 create temp table newest_binaries (
69     id integer primary key,
70     package text);
71
72 create index newest_binaries_by_package on newest_binaries (package);
73
74 insert into newest_binaries (id, package)
75     select distinct on (package) id, package from binaries
76         where type = :type and
77             (architecture = :arch_all or architecture = :arch) and
78             id in (select bin from bin_associations where suite = :suite)
79         order by package, version desc;
80
81 with
82
83 unique_override as
84     (select o.package, s.section
85         from override o, section s
86         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
87         o.component = :component)
88
89 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
90     from newest_binaries b, bin_contents bc, unique_override o
91     where b.id = bc.binary_id and o.package = b.package
92     group by bc.file'''
93
94         else:
95             sql = '''
96 create temp table newest_binaries (
97     id integer primary key,
98     package text);
99
100 create index newest_binaries_by_package on newest_binaries (package);
101
102 insert into newest_binaries (id, package)
103     select distinct on (package) id, package from binaries
104         where type = :type and
105             (architecture = :arch_all or architecture = :arch) and
106             id in (select bin from bin_associations where suite = :suite)
107         order by package, version desc;
108
109 with
110
111 unique_override as
112     (select distinct on (o.package, s.section) o.package, s.section
113         from override o, section s
114         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
115         order by o.package, s.section, o.modified desc)
116
117 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
118     from newest_binaries b, bin_contents bc, unique_override o
119     where b.id = bc.binary_id and o.package = b.package
120     group by bc.file'''
121
122         return self.session.query("file", "pkglist").from_statement(sql). \
123             params(params)
124
125     def formatline(self, filename, package_list):
126         '''
127         Returns a formatted string for the filename argument.
128         '''
129         return "%-55s %s\n" % (filename, package_list)
130
131     def fetch(self):
132         '''
133         Yields a new line of the Contents-$arch.gz file in filename order.
134         '''
135         for filename, package_list in self.query().yield_per(100):
136             yield self.formatline(filename, package_list)
137         # end transaction to return connection to pool
138         self.session.rollback()
139
140     def get_list(self):
141         '''
142         Returns a list of lines for the Contents-$arch.gz file.
143         '''
144         return [item for item in self.fetch()]
145
146     def output_filename(self):
147         '''
148         Returns the name of the output file.
149         '''
150         values = {
151             'root': Config()['Dir::Root'],
152             'suite': self.suite.suite_name,
153             'architecture': self.architecture.arch_string
154         }
155         if self.component is None:
156             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
157         values['component'] = self.component.component_name
158         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
159
160     def get_header(self):
161         '''
162         Returns the header for the Contents files as a string.
163         '''
164         header_file = None
165         try:
166             filename = os.path.join(Config()['Dir::Templates'], 'contents')
167             header_file = open(filename)
168             return header_file.read()
169         finally:
170             if header_file:
171                 header_file.close()
172
173     def write_file(self):
174         '''
175         Write the output file.
176         '''
177         command = ['gzip', '--rsyncable']
178         final_filename = self.output_filename()
179         temp_filename = final_filename + '.new'
180         output_file = open(temp_filename, 'w')
181         gzip = Popen(command, stdin = PIPE, stdout = output_file)
182         gzip.stdin.write(self.get_header())
183         for item in self.fetch():
184             gzip.stdin.write(item)
185         gzip.stdin.close()
186         output_file.close()
187         gzip.wait()
188         os.chmod(temp_filename, 0664)
189         os.rename(temp_filename, final_filename)
190
191
192 class SourceContentsWriter(object):
193     '''
194     SourceContentsWriter writes the Contents-source.gz files.
195     '''
196     def __init__(self, suite, component):
197         self.suite = suite
198         self.component = component
199         self.session = suite.session()
200
201     def query(self):
202         '''
203         Returns a query object that is doing most of the work.
204         '''
205         params = {
206             'suite_id':     self.suite.suite_id,
207             'component_id': self.component.component_id,
208         }
209
210         sql = '''
211 create temp table newest_sources (
212     id integer primary key,
213     source text);
214
215 create index sources_binaries_by_source on newest_sources (source);
216
217 insert into newest_sources (id, source)
218     select distinct on (source) s.id, s.source from source s
219         join files f on f.id = s.file
220         join location l on l.id = f.location
221         where s.id in (select source from src_associations where suite = :suite_id)
222             and l.component = :component_id
223         order by source, version desc;
224
225 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
226     from newest_sources s, src_contents sc
227     where s.id = sc.source_id group by sc.file'''
228
229         return self.session.query("file", "pkglist").from_statement(sql). \
230             params(params)
231
232     def formatline(self, filename, package_list):
233         '''
234         Returns a formatted string for the filename argument.
235         '''
236         return "%s\t%s\n" % (filename, package_list)
237
238     def fetch(self):
239         '''
240         Yields a new line of the Contents-source.gz file in filename order.
241         '''
242         for filename, package_list in self.query().yield_per(100):
243             yield self.formatline(filename, package_list)
244         # end transaction to return connection to pool
245         self.session.rollback()
246
247     def get_list(self):
248         '''
249         Returns a list of lines for the Contents-source.gz file.
250         '''
251         return [item for item in self.fetch()]
252
253     def output_filename(self):
254         '''
255         Returns the name of the output file.
256         '''
257         values = {
258             'root':      Config()['Dir::Root'],
259             'suite':     self.suite.suite_name,
260             'component': self.component.component_name
261         }
262         return "%(root)s/dists/%(suite)s/%(component)s/Contents-source.gz" % values
263
264     def write_file(self):
265         '''
266         Write the output file.
267         '''
268         command = ['gzip', '--rsyncable']
269         final_filename = self.output_filename()
270         temp_filename = final_filename + '.new'
271         output_file = open(temp_filename, 'w')
272         gzip = Popen(command, stdin = PIPE, stdout = output_file)
273         for item in self.fetch():
274             gzip.stdin.write(item)
275         gzip.stdin.close()
276         output_file.close()
277         gzip.wait()
278         os.chmod(temp_filename, 0664)
279         os.rename(temp_filename, final_filename)
280
281
282 def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
283     '''
284     This function is called in a new subprocess.
285     '''
286     session = DBConn().session()
287     suite = Suite.get(suite_id, session)
288     architecture = Architecture.get(arch_id, session)
289     overridetype = OverrideType.get(overridetype_id, session)
290     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
291     if component_id is None:
292         component = None
293     else:
294         component = Component.get(component_id, session)
295         log_message.append(component.component_name)
296     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
297     contents_writer.write_file()
298     return log_message
299
300 class ContentsWriter(object):
301     '''
302     Loop over all suites, architectures, overridetypes, and components to write
303     all contents files.
304     '''
305     @classmethod
306     def log_result(class_, result):
307         '''
308         Writes a result message to the logfile.
309         '''
310         class_.logger.log(result)
311
312     @classmethod
313     def write_all(class_, logger, suite_names = [], force = False):
314         '''
315         Writes all Contents files for suites in list suite_names which defaults
316         to all 'touchable' suites if not specified explicitely. Untouchable
317         suites will be included if the force argument is set to True.
318         '''
319         class_.logger = logger
320         session = DBConn().session()
321         suite_query = session.query(Suite)
322         if len(suite_names) > 0:
323             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
324         if not force:
325             suite_query = suite_query.filter_by(untouchable = False)
326         deb_id = get_override_type('deb', session).overridetype_id
327         udeb_id = get_override_type('udeb', session).overridetype_id
328         main_id = get_component('main', session).component_id
329         non_free_id = get_component('non-free', session).component_id
330         pool = Pool()
331         for suite in suite_query:
332             suite_id = suite.suite_id
333             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
334                 arch_id = architecture.arch_id
335                 # handle 'deb' packages
336                 pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
337                     callback = class_.log_result)
338                 # handle 'udeb' packages for 'main' and 'non-free'
339                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
340                     callback = class_.log_result)
341                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
342                     callback = class_.log_result)
343         pool.close()
344         pool.join()
345         session.close()
346
347
348 class BinaryContentsScanner(object):
349     '''
350     BinaryContentsScanner provides a threadsafe method scan() to scan the
351     contents of a DBBinary object.
352     '''
353     def __init__(self, binary_id):
354         '''
355         The argument binary_id is the id of the DBBinary object that
356         should be scanned.
357         '''
358         self.binary_id = binary_id
359
360     def scan(self, dummy_arg = None):
361         '''
362         This method does the actual scan and fills in the associated BinContents
363         property. It commits any changes to the database. The argument dummy_arg
364         is ignored but needed by our threadpool implementation.
365         '''
366         session = DBConn().session()
367         binary = session.query(DBBinary).get(self.binary_id)
368         fileset = set(binary.scan_contents())
369         if len(fileset) == 0:
370             fileset.add('EMPTY_PACKAGE')
371         for filename in fileset:
372             binary.contents.append(BinContents(file = filename))
373         session.commit()
374         session.close()
375
376     @classmethod
377     def scan_all(class_, limit = None):
378         '''
379         The class method scan_all() scans all binaries using multiple threads.
380         The number of binaries to be scanned can be limited with the limit
381         argument. Returns the number of processed and remaining packages as a
382         dict.
383         '''
384         session = DBConn().session()
385         query = session.query(DBBinary).filter(DBBinary.contents == None)
386         remaining = query.count
387         if limit is not None:
388             query = query.limit(limit)
389         processed = query.count()
390         pool = Pool()
391         for binary in query.yield_per(100):
392             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
393         pool.close()
394         pool.join()
395         remaining = remaining()
396         session.close()
397         return { 'processed': processed, 'remaining': remaining }
398
399 def binary_scan_helper(binary_id):
400     '''
401     This function runs in a subprocess.
402     '''
403     scanner = BinaryContentsScanner(binary_id)
404     scanner.scan()
405
406
407 class UnpackedSource(object):
408     '''
409     UnpackedSource extracts a source package into a temporary location and
410     gives you some convinient function for accessing it.
411     '''
412     def __init__(self, dscfilename):
413         '''
414         The dscfilename is a name of a DSC file that will be extracted.
415         '''
416         self.root_directory = os.path.join(mkdtemp(), 'root')
417         command = ('dpkg-source', '--no-copy', '--no-check', '-x', dscfilename,
418             self.root_directory)
419         # dpkg-source does not have a --quiet option
420         devnull = open(os.devnull, 'w')
421         check_call(command, stdout = devnull, stderr = devnull)
422         devnull.close()
423
424     def get_root_directory(self):
425         '''
426         Returns the name of the package's root directory which is the directory
427         where the debian subdirectory is located.
428         '''
429         return self.root_directory
430
431     def get_changelog_file(self):
432         '''
433         Returns a file object for debian/changelog or None if no such file exists.
434         '''
435         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
436         try:
437             return open(changelog_name)
438         except IOError:
439             return None
440
441     def get_all_filenames(self):
442         '''
443         Returns an iterator over all filenames. The filenames will be relative
444         to the root directory.
445         '''
446         skip = len(self.root_directory) + 1
447         for root, _, files in os.walk(self.root_directory):
448             for name in files:
449                 yield os.path.join(root[skip:], name)
450
451     def cleanup(self):
452         '''
453         Removes all temporary files.
454         '''
455         if self.root_directory is None:
456             return
457         parent_directory = os.path.dirname(self.root_directory)
458         rmtree(parent_directory)
459         self.root_directory = None
460
461     def __del__(self):
462         '''
463         Enforce cleanup.
464         '''
465         self.cleanup()
466
467
468 class SourceContentsScanner(object):
469     '''
470     SourceContentsScanner provides a method scan() to scan the contents of a
471     DBSource object.
472     '''
473     def __init__(self, source_id):
474         '''
475         The argument source_id is the id of the DBSource object that
476         should be scanned.
477         '''
478         self.source_id = source_id
479
480     def scan(self):
481         '''
482         This method does the actual scan and fills in the associated SrcContents
483         property. It commits any changes to the database.
484         '''
485         session = DBConn().session()
486         source = session.query(DBSource).get(self.source_id)
487         fileset = set(source.scan_contents())
488         for filename in fileset:
489             source.contents.append(SrcContents(file = filename))
490         session.commit()
491         session.close()
492
493     @classmethod
494     def scan_all(class_, limit = None):
495         '''
496         The class method scan_all() scans all source using multiple processes.
497         The number of sources to be scanned can be limited with the limit
498         argument. Returns the number of processed and remaining packages as a
499         dict.
500         '''
501         session = DBConn().session()
502         query = session.query(DBSource).filter(DBSource.contents == None)
503         remaining = query.count
504         if limit is not None:
505             query = query.limit(limit)
506         processed = query.count()
507         pool = Pool()
508         for source in query.yield_per(100):
509             pool.apply_async(source_scan_helper, (source.source_id, ))
510         pool.close()
511         pool.join()
512         remaining = remaining()
513         session.close()
514         return { 'processed': processed, 'remaining': remaining }
515
516 def source_scan_helper(source_id):
517     '''
518     This function runs in a subprocess.
519     '''
520     try:
521         scanner = SourceContentsScanner(source_id)
522         scanner.scan()
523     except Exception, e:
524         print e
525