]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
various bugfixes
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.filewriter import BinaryContentsFileWriter
31
32 from multiprocessing import Pool
33 from shutil import rmtree
34 from subprocess import Popen, PIPE, check_call
35 from tempfile import mkdtemp
36
37 import os.path
38 import signal
39
40 class BinaryContentsWriter(object):
41     '''
42     BinaryContentsWriter writes the Contents-$arch.gz files.
43     '''
44     def __init__(self, suite, architecture, overridetype, component = None):
45         self.suite = suite
46         self.architecture = architecture
47         self.overridetype = overridetype
48         self.component = component
49         self.session = suite.session()
50
51     def query(self):
52         '''
53         Returns a query object that is doing most of the work.
54         '''
55         overridesuite = self.suite
56         if self.suite.overridesuite is not None:
57             overridesuite = get_suite(self.suite.overridesuite, self.session)
58         params = {
59             'suite':         self.suite.suite_id,
60             'overridesuite': overridesuite.suite_id,
61             'arch_all':      get_architecture('all', self.session).arch_id,
62             'arch':          self.architecture.arch_id,
63             'type_id':       self.overridetype.overridetype_id,
64             'type':          self.overridetype.overridetype,
65         }
66
67         if self.component is not None:
68             params['component'] = self.component.component_id
69             sql = '''
70 create temp table newest_binaries (
71     id integer primary key,
72     package text);
73
74 create index newest_binaries_by_package on newest_binaries (package);
75
76 insert into newest_binaries (id, package)
77     select distinct on (package) id, package from binaries
78         where type = :type and
79             (architecture = :arch_all or architecture = :arch) and
80             id in (select bin from bin_associations where suite = :suite)
81         order by package, version desc;
82
83 with
84
85 unique_override as
86     (select o.package, s.section
87         from override o, section s
88         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
89         o.component = :component)
90
91 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
92     from newest_binaries b, bin_contents bc, unique_override o
93     where b.id = bc.binary_id and o.package = b.package
94     group by bc.file'''
95
96         else:
97             sql = '''
98 create temp table newest_binaries (
99     id integer primary key,
100     package text);
101
102 create index newest_binaries_by_package on newest_binaries (package);
103
104 insert into newest_binaries (id, package)
105     select distinct on (package) id, package from binaries
106         where type = :type and
107             (architecture = :arch_all or architecture = :arch) and
108             id in (select bin from bin_associations where suite = :suite)
109         order by package, version desc;
110
111 with
112
113 unique_override as
114     (select distinct on (o.package, s.section) o.package, s.section
115         from override o, section s
116         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
117         order by o.package, s.section, o.modified desc)
118
119 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
120     from newest_binaries b, bin_contents bc, unique_override o
121     where b.id = bc.binary_id and o.package = b.package
122     group by bc.file'''
123
124         return self.session.query("file", "pkglist").from_statement(sql). \
125             params(params)
126
127     def formatline(self, filename, package_list):
128         '''
129         Returns a formatted string for the filename argument.
130         '''
131         return "%-55s %s\n" % (filename, package_list)
132
133     def fetch(self):
134         '''
135         Yields a new line of the Contents-$arch.gz file in filename order.
136         '''
137         for filename, package_list in self.query().yield_per(100):
138             yield self.formatline(filename, package_list)
139         # end transaction to return connection to pool
140         self.session.rollback()
141
142     def get_list(self):
143         '''
144         Returns a list of lines for the Contents-$arch.gz file.
145         '''
146         return [item for item in self.fetch()]
147
148     def writer(self):
149         '''
150         Returns a writer object.
151         '''
152         values = {
153             'suite':        self.suite.suite_name,
154             'architecture': self.architecture.arch_string,
155         }
156         if self.component is not None:
157             values['component'] = self.component.component_name
158         return BinaryContentsFileWriter(**values)
159
160     def get_header(self):
161         '''
162         Returns the header for the Contents files as a string.
163         '''
164         header_file = None
165         try:
166             filename = os.path.join(Config()['Dir::Templates'], 'contents')
167             header_file = open(filename)
168             return header_file.read()
169         finally:
170             if header_file:
171                 header_file.close()
172
173     def write_file(self):
174         '''
175         Write the output file.
176         '''
177         writer = self.writer()
178         file = writer.open()
179         file.write(self.get_header())
180         for item in self.fetch():
181             file.write(item)
182         writer.close()
183
184
185 class SourceContentsWriter(object):
186     '''
187     SourceContentsWriter writes the Contents-source.gz files.
188     '''
189     def __init__(self, suite, component):
190         self.suite = suite
191         self.component = component
192         self.session = suite.session()
193
194     def query(self):
195         '''
196         Returns a query object that is doing most of the work.
197         '''
198         params = {
199             'suite_id':     self.suite.suite_id,
200             'component_id': self.component.component_id,
201         }
202
203         sql = '''
204 create temp table newest_sources (
205     id integer primary key,
206     source text);
207
208 create index sources_binaries_by_source on newest_sources (source);
209
210 insert into newest_sources (id, source)
211     select distinct on (source) s.id, s.source from source s
212         join files f on f.id = s.file
213         join location l on l.id = f.location
214         where s.id in (select source from src_associations where suite = :suite_id)
215             and l.component = :component_id
216         order by source, version desc;
217
218 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
219     from newest_sources s, src_contents sc
220     where s.id = sc.source_id group by sc.file'''
221
222         return self.session.query("file", "pkglist").from_statement(sql). \
223             params(params)
224
225     def formatline(self, filename, package_list):
226         '''
227         Returns a formatted string for the filename argument.
228         '''
229         return "%s\t%s\n" % (filename, package_list)
230
231     def fetch(self):
232         '''
233         Yields a new line of the Contents-source.gz file in filename order.
234         '''
235         for filename, package_list in self.query().yield_per(100):
236             yield self.formatline(filename, package_list)
237         # end transaction to return connection to pool
238         self.session.rollback()
239
240     def get_list(self):
241         '''
242         Returns a list of lines for the Contents-source.gz file.
243         '''
244         return [item for item in self.fetch()]
245
246     def output_filename(self):
247         '''
248         Returns the name of the output file.
249         '''
250         values = {
251             'root':      Config()['Dir::Root'],
252             'suite':     self.suite.suite_name,
253             'component': self.component.component_name
254         }
255         return "%(root)s/dists/%(suite)s/%(component)s/Contents-source.gz" % values
256
257     def write_file(self):
258         '''
259         Write the output file.
260         '''
261         command = ['gzip', '--rsyncable']
262         final_filename = self.output_filename()
263         temp_filename = final_filename + '.new'
264         output_file = open(temp_filename, 'w')
265         gzip = Popen(command, stdin = PIPE, stdout = output_file)
266         for item in self.fetch():
267             gzip.stdin.write(item)
268         gzip.stdin.close()
269         output_file.close()
270         gzip.wait()
271         os.chmod(temp_filename, 0664)
272         os.rename(temp_filename, final_filename)
273
274
275 def binary_helper(suite_id, arch_id, overridetype_id, component_id = None):
276     '''
277     This function is called in a new subprocess and multiprocessing wants a top
278     level function.
279     '''
280     session = DBConn().session(work_mem = 1000)
281     suite = Suite.get(suite_id, session)
282     architecture = Architecture.get(arch_id, session)
283     overridetype = OverrideType.get(overridetype_id, session)
284     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
285     if component_id is None:
286         component = None
287     else:
288         component = Component.get(component_id, session)
289         log_message.append(component.component_name)
290     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
291     contents_writer.write_file()
292     return log_message
293
294 def source_helper(suite_id, component_id):
295     '''
296     This function is called in a new subprocess and multiprocessing wants a top
297     level function.
298     '''
299     session = DBConn().session(work_mem = 1000)
300     suite = Suite.get(suite_id, session)
301     component = Component.get(component_id, session)
302     log_message = [suite.suite_name, 'source', component.component_name]
303     contents_writer = SourceContentsWriter(suite, component)
304     contents_writer.write_file()
305     return log_message
306
307 class ContentsWriter(object):
308     '''
309     Loop over all suites, architectures, overridetypes, and components to write
310     all contents files.
311     '''
312     @classmethod
313     def log_result(class_, result):
314         '''
315         Writes a result message to the logfile.
316         '''
317         class_.logger.log(result)
318
319     @classmethod
320     def write_all(class_, logger, suite_names = [], force = False):
321         '''
322         Writes all Contents files for suites in list suite_names which defaults
323         to all 'touchable' suites if not specified explicitely. Untouchable
324         suites will be included if the force argument is set to True.
325         '''
326         class_.logger = logger
327         session = DBConn().session()
328         suite_query = session.query(Suite)
329         if len(suite_names) > 0:
330             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
331         if not force:
332             suite_query = suite_query.filter_by(untouchable = False)
333         deb_id = get_override_type('deb', session).overridetype_id
334         udeb_id = get_override_type('udeb', session).overridetype_id
335         main_id = get_component('main', session).component_id
336         contrib_id = get_component('contrib', session).component_id
337         non_free_id = get_component('non-free', session).component_id
338         pool = Pool()
339         for suite in suite_query:
340             suite_id = suite.suite_id
341             # handle source packages
342             pool.apply_async(source_helper, (suite_id, main_id),
343                 callback = class_.log_result)
344             pool.apply_async(source_helper, (suite_id, contrib_id),
345                 callback = class_.log_result)
346             pool.apply_async(source_helper, (suite_id, non_free_id),
347                 callback = class_.log_result)
348             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
349                 arch_id = architecture.arch_id
350                 # handle 'deb' packages
351                 pool.apply_async(binary_helper, (suite_id, arch_id, deb_id), \
352                     callback = class_.log_result)
353                 # handle 'udeb' packages for 'main' and 'non-free'
354                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, main_id), \
355                     callback = class_.log_result)
356                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, non_free_id), \
357                     callback = class_.log_result)
358         pool.close()
359         pool.join()
360         session.close()
361
362
363 class BinaryContentsScanner(object):
364     '''
365     BinaryContentsScanner provides a threadsafe method scan() to scan the
366     contents of a DBBinary object.
367     '''
368     def __init__(self, binary_id):
369         '''
370         The argument binary_id is the id of the DBBinary object that
371         should be scanned.
372         '''
373         self.binary_id = binary_id
374
375     def scan(self, dummy_arg = None):
376         '''
377         This method does the actual scan and fills in the associated BinContents
378         property. It commits any changes to the database. The argument dummy_arg
379         is ignored but needed by our threadpool implementation.
380         '''
381         session = DBConn().session()
382         binary = session.query(DBBinary).get(self.binary_id)
383         fileset = set(binary.scan_contents())
384         if len(fileset) == 0:
385             fileset.add('EMPTY_PACKAGE')
386         for filename in fileset:
387             binary.contents.append(BinContents(file = filename))
388         session.commit()
389         session.close()
390
391     @classmethod
392     def scan_all(class_, limit = None):
393         '''
394         The class method scan_all() scans all binaries using multiple threads.
395         The number of binaries to be scanned can be limited with the limit
396         argument. Returns the number of processed and remaining packages as a
397         dict.
398         '''
399         session = DBConn().session()
400         query = session.query(DBBinary).filter(DBBinary.contents == None)
401         remaining = query.count
402         if limit is not None:
403             query = query.limit(limit)
404         processed = query.count()
405         pool = Pool()
406         for binary in query.yield_per(100):
407             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
408         pool.close()
409         pool.join()
410         remaining = remaining()
411         session.close()
412         return { 'processed': processed, 'remaining': remaining }
413
414 def binary_scan_helper(binary_id):
415     '''
416     This function runs in a subprocess.
417     '''
418     scanner = BinaryContentsScanner(binary_id)
419     scanner.scan()
420
421
422 def subprocess_setup():
423     # Python installs a SIGPIPE handler by default. This is usually not what
424     # non-Python subprocesses expect.
425     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
426
427 class UnpackedSource(object):
428     '''
429     UnpackedSource extracts a source package into a temporary location and
430     gives you some convinient function for accessing it.
431     '''
432     def __init__(self, dscfilename):
433         '''
434         The dscfilename is a name of a DSC file that will be extracted.
435         '''
436         temp_directory = mkdtemp(dir = Config()['Dir::TempPath'])
437         self.root_directory = os.path.join(temp_directory, 'root')
438         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
439             dscfilename, self.root_directory)
440         check_call(command, preexec_fn = subprocess_setup)
441
442     def get_root_directory(self):
443         '''
444         Returns the name of the package's root directory which is the directory
445         where the debian subdirectory is located.
446         '''
447         return self.root_directory
448
449     def get_changelog_file(self):
450         '''
451         Returns a file object for debian/changelog or None if no such file exists.
452         '''
453         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
454         try:
455             return open(changelog_name)
456         except IOError:
457             return None
458
459     def get_all_filenames(self):
460         '''
461         Returns an iterator over all filenames. The filenames will be relative
462         to the root directory.
463         '''
464         skip = len(self.root_directory) + 1
465         for root, _, files in os.walk(self.root_directory):
466             for name in files:
467                 yield os.path.join(root[skip:], name)
468
469     def cleanup(self):
470         '''
471         Removes all temporary files.
472         '''
473         if self.root_directory is None:
474             return
475         parent_directory = os.path.dirname(self.root_directory)
476         rmtree(parent_directory)
477         self.root_directory = None
478
479     def __del__(self):
480         '''
481         Enforce cleanup.
482         '''
483         self.cleanup()
484
485
486 class SourceContentsScanner(object):
487     '''
488     SourceContentsScanner provides a method scan() to scan the contents of a
489     DBSource object.
490     '''
491     def __init__(self, source_id):
492         '''
493         The argument source_id is the id of the DBSource object that
494         should be scanned.
495         '''
496         self.source_id = source_id
497
498     def scan(self):
499         '''
500         This method does the actual scan and fills in the associated SrcContents
501         property. It commits any changes to the database.
502         '''
503         session = DBConn().session()
504         source = session.query(DBSource).get(self.source_id)
505         fileset = set(source.scan_contents())
506         for filename in fileset:
507             source.contents.append(SrcContents(file = filename))
508         session.commit()
509         session.close()
510
511     @classmethod
512     def scan_all(class_, limit = None):
513         '''
514         The class method scan_all() scans all source using multiple processes.
515         The number of sources to be scanned can be limited with the limit
516         argument. Returns the number of processed and remaining packages as a
517         dict.
518         '''
519         session = DBConn().session()
520         query = session.query(DBSource).filter(DBSource.contents == None)
521         remaining = query.count
522         if limit is not None:
523             query = query.limit(limit)
524         processed = query.count()
525         pool = Pool()
526         for source in query.yield_per(100):
527             pool.apply_async(source_scan_helper, (source.source_id, ))
528         pool.close()
529         pool.join()
530         remaining = remaining()
531         session.close()
532         return { 'processed': processed, 'remaining': remaining }
533
534 def source_scan_helper(source_id):
535     '''
536     This function runs in a subprocess.
537     '''
538     try:
539         scanner = SourceContentsScanner(source_id)
540         scanner.scan()
541     except Exception, e:
542         print e
543