]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Merge remote branch 'mhy/g-r' into merge
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from shutil import rmtree
33 from subprocess import Popen, PIPE, check_call
34 from tempfile import mkdtemp
35
36 import os.path
37 import signal
38
39 class BinaryContentsWriter(object):
40     '''
41     BinaryContentsWriter writes the Contents-$arch.gz files.
42     '''
43     def __init__(self, suite, architecture, overridetype, component = None):
44         self.suite = suite
45         self.architecture = architecture
46         self.overridetype = overridetype
47         self.component = component
48         self.session = suite.session()
49
50     def query(self):
51         '''
52         Returns a query object that is doing most of the work.
53         '''
54         overridesuite = self.suite
55         if self.suite.overridesuite is not None:
56             overridesuite = get_suite(self.suite.overridesuite, self.session)
57         params = {
58             'suite':         self.suite.suite_id,
59             'overridesuite': overridesuite.suite_id,
60             'arch_all':      get_architecture('all', self.session).arch_id,
61             'arch':          self.architecture.arch_id,
62             'type_id':       self.overridetype.overridetype_id,
63             'type':          self.overridetype.overridetype,
64         }
65
66         if self.component is not None:
67             params['component'] = self.component.component_id
68             sql = '''
69 create temp table newest_binaries (
70     id integer primary key,
71     package text);
72
73 create index newest_binaries_by_package on newest_binaries (package);
74
75 insert into newest_binaries (id, package)
76     select distinct on (package) id, package from binaries
77         where type = :type and
78             (architecture = :arch_all or architecture = :arch) and
79             id in (select bin from bin_associations where suite = :suite)
80         order by package, version desc;
81
82 with
83
84 unique_override as
85     (select o.package, s.section
86         from override o, section s
87         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
88         o.component = :component)
89
90 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
91     from newest_binaries b, bin_contents bc, unique_override o
92     where b.id = bc.binary_id and o.package = b.package
93     group by bc.file'''
94
95         else:
96             sql = '''
97 create temp table newest_binaries (
98     id integer primary key,
99     package text);
100
101 create index newest_binaries_by_package on newest_binaries (package);
102
103 insert into newest_binaries (id, package)
104     select distinct on (package) id, package from binaries
105         where type = :type and
106             (architecture = :arch_all or architecture = :arch) and
107             id in (select bin from bin_associations where suite = :suite)
108         order by package, version desc;
109
110 with
111
112 unique_override as
113     (select distinct on (o.package, s.section) o.package, s.section
114         from override o, section s
115         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
116         order by o.package, s.section, o.modified desc)
117
118 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
119     from newest_binaries b, bin_contents bc, unique_override o
120     where b.id = bc.binary_id and o.package = b.package
121     group by bc.file'''
122
123         return self.session.query("file", "pkglist").from_statement(sql). \
124             params(params)
125
126     def formatline(self, filename, package_list):
127         '''
128         Returns a formatted string for the filename argument.
129         '''
130         return "%-55s %s\n" % (filename, package_list)
131
132     def fetch(self):
133         '''
134         Yields a new line of the Contents-$arch.gz file in filename order.
135         '''
136         for filename, package_list in self.query().yield_per(100):
137             yield self.formatline(filename, package_list)
138         # end transaction to return connection to pool
139         self.session.rollback()
140
141     def get_list(self):
142         '''
143         Returns a list of lines for the Contents-$arch.gz file.
144         '''
145         return [item for item in self.fetch()]
146
147     def output_filename(self):
148         '''
149         Returns the name of the output file.
150         '''
151         values = {
152             'root': Config()['Dir::Root'],
153             'suite': self.suite.suite_name,
154             'architecture': self.architecture.arch_string
155         }
156         if self.component is None:
157             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
158         values['component'] = self.component.component_name
159         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
160
161     def get_header(self):
162         '''
163         Returns the header for the Contents files as a string.
164         '''
165         header_file = None
166         try:
167             filename = os.path.join(Config()['Dir::Templates'], 'contents')
168             header_file = open(filename)
169             return header_file.read()
170         finally:
171             if header_file:
172                 header_file.close()
173
174     def write_file(self):
175         '''
176         Write the output file.
177         '''
178         command = ['gzip', '--rsyncable']
179         final_filename = self.output_filename()
180         temp_filename = final_filename + '.new'
181         output_file = open(temp_filename, 'w')
182         gzip = Popen(command, stdin = PIPE, stdout = output_file)
183         gzip.stdin.write(self.get_header())
184         for item in self.fetch():
185             gzip.stdin.write(item)
186         gzip.stdin.close()
187         output_file.close()
188         gzip.wait()
189         os.chmod(temp_filename, 0664)
190         os.rename(temp_filename, final_filename)
191
192
193 class SourceContentsWriter(object):
194     '''
195     SourceContentsWriter writes the Contents-source.gz files.
196     '''
197     def __init__(self, suite, component):
198         self.suite = suite
199         self.component = component
200         self.session = suite.session()
201
202     def query(self):
203         '''
204         Returns a query object that is doing most of the work.
205         '''
206         params = {
207             'suite_id':     self.suite.suite_id,
208             'component_id': self.component.component_id,
209         }
210
211         sql = '''
212 create temp table newest_sources (
213     id integer primary key,
214     source text);
215
216 create index sources_binaries_by_source on newest_sources (source);
217
218 insert into newest_sources (id, source)
219     select distinct on (source) s.id, s.source from source s
220         join files f on f.id = s.file
221         join location l on l.id = f.location
222         where s.id in (select source from src_associations where suite = :suite_id)
223             and l.component = :component_id
224         order by source, version desc;
225
226 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
227     from newest_sources s, src_contents sc
228     where s.id = sc.source_id group by sc.file'''
229
230         return self.session.query("file", "pkglist").from_statement(sql). \
231             params(params)
232
233     def formatline(self, filename, package_list):
234         '''
235         Returns a formatted string for the filename argument.
236         '''
237         return "%s\t%s\n" % (filename, package_list)
238
239     def fetch(self):
240         '''
241         Yields a new line of the Contents-source.gz file in filename order.
242         '''
243         for filename, package_list in self.query().yield_per(100):
244             yield self.formatline(filename, package_list)
245         # end transaction to return connection to pool
246         self.session.rollback()
247
248     def get_list(self):
249         '''
250         Returns a list of lines for the Contents-source.gz file.
251         '''
252         return [item for item in self.fetch()]
253
254     def output_filename(self):
255         '''
256         Returns the name of the output file.
257         '''
258         values = {
259             'root':      Config()['Dir::Root'],
260             'suite':     self.suite.suite_name,
261             'component': self.component.component_name
262         }
263         return "%(root)s/dists/%(suite)s/%(component)s/Contents-source.gz" % values
264
265     def write_file(self):
266         '''
267         Write the output file.
268         '''
269         command = ['gzip', '--rsyncable']
270         final_filename = self.output_filename()
271         temp_filename = final_filename + '.new'
272         output_file = open(temp_filename, 'w')
273         gzip = Popen(command, stdin = PIPE, stdout = output_file)
274         for item in self.fetch():
275             gzip.stdin.write(item)
276         gzip.stdin.close()
277         output_file.close()
278         gzip.wait()
279         os.chmod(temp_filename, 0664)
280         os.rename(temp_filename, final_filename)
281
282
283 def binary_helper(suite_id, arch_id, overridetype_id, component_id = None):
284     '''
285     This function is called in a new subprocess and multiprocessing wants a top
286     level function.
287     '''
288     session = DBConn().session()
289     suite = Suite.get(suite_id, session)
290     architecture = Architecture.get(arch_id, session)
291     overridetype = OverrideType.get(overridetype_id, session)
292     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
293     if component_id is None:
294         component = None
295     else:
296         component = Component.get(component_id, session)
297         log_message.append(component.component_name)
298     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
299     contents_writer.write_file()
300     return log_message
301
302 def source_helper(suite_id, component_id):
303     '''
304     This function is called in a new subprocess and multiprocessing wants a top
305     level function.
306     '''
307     session = DBConn().session()
308     suite = Suite.get(suite_id, session)
309     component = Component.get(component_id, session)
310     log_message = [suite.suite_name, 'source', component.component_name]
311     contents_writer = SourceContentsWriter(suite, component)
312     contents_writer.write_file()
313     return log_message
314
315 class ContentsWriter(object):
316     '''
317     Loop over all suites, architectures, overridetypes, and components to write
318     all contents files.
319     '''
320     @classmethod
321     def log_result(class_, result):
322         '''
323         Writes a result message to the logfile.
324         '''
325         class_.logger.log(result)
326
327     @classmethod
328     def write_all(class_, logger, suite_names = [], force = False):
329         '''
330         Writes all Contents files for suites in list suite_names which defaults
331         to all 'touchable' suites if not specified explicitely. Untouchable
332         suites will be included if the force argument is set to True.
333         '''
334         class_.logger = logger
335         session = DBConn().session()
336         suite_query = session.query(Suite)
337         if len(suite_names) > 0:
338             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
339         if not force:
340             suite_query = suite_query.filter_by(untouchable = False)
341         deb_id = get_override_type('deb', session).overridetype_id
342         udeb_id = get_override_type('udeb', session).overridetype_id
343         main_id = get_component('main', session).component_id
344         contrib_id = get_component('contrib', session).component_id
345         non_free_id = get_component('non-free', session).component_id
346         pool = Pool()
347         for suite in suite_query:
348             suite_id = suite.suite_id
349             # handle source packages
350             pool.apply_async(source_helper, (suite_id, main_id),
351                 callback = class_.log_result)
352             pool.apply_async(source_helper, (suite_id, contrib_id),
353                 callback = class_.log_result)
354             pool.apply_async(source_helper, (suite_id, non_free_id),
355                 callback = class_.log_result)
356             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
357                 arch_id = architecture.arch_id
358                 # handle 'deb' packages
359                 pool.apply_async(binary_helper, (suite_id, arch_id, deb_id), \
360                     callback = class_.log_result)
361                 # handle 'udeb' packages for 'main' and 'non-free'
362                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, main_id), \
363                     callback = class_.log_result)
364                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, non_free_id), \
365                     callback = class_.log_result)
366         pool.close()
367         pool.join()
368         session.close()
369
370
371 class BinaryContentsScanner(object):
372     '''
373     BinaryContentsScanner provides a threadsafe method scan() to scan the
374     contents of a DBBinary object.
375     '''
376     def __init__(self, binary_id):
377         '''
378         The argument binary_id is the id of the DBBinary object that
379         should be scanned.
380         '''
381         self.binary_id = binary_id
382
383     def scan(self, dummy_arg = None):
384         '''
385         This method does the actual scan and fills in the associated BinContents
386         property. It commits any changes to the database. The argument dummy_arg
387         is ignored but needed by our threadpool implementation.
388         '''
389         session = DBConn().session()
390         binary = session.query(DBBinary).get(self.binary_id)
391         fileset = set(binary.scan_contents())
392         if len(fileset) == 0:
393             fileset.add('EMPTY_PACKAGE')
394         for filename in fileset:
395             binary.contents.append(BinContents(file = filename))
396         session.commit()
397         session.close()
398
399     @classmethod
400     def scan_all(class_, limit = None):
401         '''
402         The class method scan_all() scans all binaries using multiple threads.
403         The number of binaries to be scanned can be limited with the limit
404         argument. Returns the number of processed and remaining packages as a
405         dict.
406         '''
407         session = DBConn().session()
408         query = session.query(DBBinary).filter(DBBinary.contents == None)
409         remaining = query.count
410         if limit is not None:
411             query = query.limit(limit)
412         processed = query.count()
413         pool = Pool()
414         for binary in query.yield_per(100):
415             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
416         pool.close()
417         pool.join()
418         remaining = remaining()
419         session.close()
420         return { 'processed': processed, 'remaining': remaining }
421
422 def binary_scan_helper(binary_id):
423     '''
424     This function runs in a subprocess.
425     '''
426     scanner = BinaryContentsScanner(binary_id)
427     scanner.scan()
428
429
430 def subprocess_setup():
431     # Python installs a SIGPIPE handler by default. This is usually not what
432     # non-Python subprocesses expect.
433     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
434
435 class UnpackedSource(object):
436     '''
437     UnpackedSource extracts a source package into a temporary location and
438     gives you some convinient function for accessing it.
439     '''
440     def __init__(self, dscfilename):
441         '''
442         The dscfilename is a name of a DSC file that will be extracted.
443         '''
444         self.root_directory = os.path.join(mkdtemp(), 'root')
445         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
446             dscfilename, self.root_directory)
447         check_call(command, preexec_fn = subprocess_setup)
448
449     def get_root_directory(self):
450         '''
451         Returns the name of the package's root directory which is the directory
452         where the debian subdirectory is located.
453         '''
454         return self.root_directory
455
456     def get_changelog_file(self):
457         '''
458         Returns a file object for debian/changelog or None if no such file exists.
459         '''
460         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
461         try:
462             return open(changelog_name)
463         except IOError:
464             return None
465
466     def get_all_filenames(self):
467         '''
468         Returns an iterator over all filenames. The filenames will be relative
469         to the root directory.
470         '''
471         skip = len(self.root_directory) + 1
472         for root, _, files in os.walk(self.root_directory):
473             for name in files:
474                 yield os.path.join(root[skip:], name)
475
476     def cleanup(self):
477         '''
478         Removes all temporary files.
479         '''
480         if self.root_directory is None:
481             return
482         parent_directory = os.path.dirname(self.root_directory)
483         rmtree(parent_directory)
484         self.root_directory = None
485
486     def __del__(self):
487         '''
488         Enforce cleanup.
489         '''
490         self.cleanup()
491
492
493 class SourceContentsScanner(object):
494     '''
495     SourceContentsScanner provides a method scan() to scan the contents of a
496     DBSource object.
497     '''
498     def __init__(self, source_id):
499         '''
500         The argument source_id is the id of the DBSource object that
501         should be scanned.
502         '''
503         self.source_id = source_id
504
505     def scan(self):
506         '''
507         This method does the actual scan and fills in the associated SrcContents
508         property. It commits any changes to the database.
509         '''
510         session = DBConn().session()
511         source = session.query(DBSource).get(self.source_id)
512         fileset = set(source.scan_contents())
513         for filename in fileset:
514             source.contents.append(SrcContents(file = filename))
515         session.commit()
516         session.close()
517
518     @classmethod
519     def scan_all(class_, limit = None):
520         '''
521         The class method scan_all() scans all source using multiple processes.
522         The number of sources to be scanned can be limited with the limit
523         argument. Returns the number of processed and remaining packages as a
524         dict.
525         '''
526         session = DBConn().session()
527         query = session.query(DBSource).filter(DBSource.contents == None)
528         remaining = query.count
529         if limit is not None:
530             query = query.limit(limit)
531         processed = query.count()
532         pool = Pool()
533         for source in query.yield_per(100):
534             pool.apply_async(source_scan_helper, (source.source_id, ))
535         pool.close()
536         pool.join()
537         remaining = remaining()
538         session.close()
539         return { 'processed': processed, 'remaining': remaining }
540
541 def source_scan_helper(source_id):
542     '''
543     This function runs in a subprocess.
544     '''
545     try:
546         scanner = SourceContentsScanner(source_id)
547         scanner.scan()
548     except Exception, e:
549         print e
550