]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Merge branch 'master' of ftp-master.debian.org:/srv/ftp.debian.org/git/dak
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
31
32 from multiprocessing import Pool
33 from shutil import rmtree
34 from subprocess import Popen, PIPE, check_call
35 from tempfile import mkdtemp
36
37 import os.path
38 import signal
39
40 class BinaryContentsWriter(object):
41     '''
42     BinaryContentsWriter writes the Contents-$arch.gz files.
43     '''
44     def __init__(self, suite, architecture, overridetype, component = None):
45         self.suite = suite
46         self.architecture = architecture
47         self.overridetype = overridetype
48         self.component = component
49         self.session = suite.session()
50
51     def query(self):
52         '''
53         Returns a query object that is doing most of the work.
54         '''
55         overridesuite = self.suite
56         if self.suite.overridesuite is not None:
57             overridesuite = get_suite(self.suite.overridesuite, self.session)
58         params = {
59             'suite':         self.suite.suite_id,
60             'overridesuite': overridesuite.suite_id,
61             'arch_all':      get_architecture('all', self.session).arch_id,
62             'arch':          self.architecture.arch_id,
63             'type_id':       self.overridetype.overridetype_id,
64             'type':          self.overridetype.overridetype,
65         }
66
67         if self.component is not None:
68             params['component'] = self.component.component_id
69             sql = '''
70 create temp table newest_binaries (
71     id integer primary key,
72     package text);
73
74 create index newest_binaries_by_package on newest_binaries (package);
75
76 insert into newest_binaries (id, package)
77     select distinct on (package) id, package from binaries
78         where type = :type and
79             (architecture = :arch_all or architecture = :arch) and
80             id in (select bin from bin_associations where suite = :suite)
81         order by package, version desc;
82
83 with
84
85 unique_override as
86     (select o.package, s.section
87         from override o, section s
88         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
89         o.component = :component)
90
91 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
92     from newest_binaries b, bin_contents bc, unique_override o
93     where b.id = bc.binary_id and o.package = b.package
94     group by bc.file'''
95
96         else:
97             sql = '''
98 create temp table newest_binaries (
99     id integer primary key,
100     package text);
101
102 create index newest_binaries_by_package on newest_binaries (package);
103
104 insert into newest_binaries (id, package)
105     select distinct on (package) id, package from binaries
106         where type = :type and
107             (architecture = :arch_all or architecture = :arch) and
108             id in (select bin from bin_associations where suite = :suite)
109         order by package, version desc;
110
111 with
112
113 unique_override as
114     (select distinct on (o.package, s.section) o.package, s.section
115         from override o, section s
116         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
117         order by o.package, s.section, o.modified desc)
118
119 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
120     from newest_binaries b, bin_contents bc, unique_override o
121     where b.id = bc.binary_id and o.package = b.package
122     group by bc.file'''
123
124         return self.session.query("file", "pkglist").from_statement(sql). \
125             params(params)
126
127     def formatline(self, filename, package_list):
128         '''
129         Returns a formatted string for the filename argument.
130         '''
131         return "%-55s %s\n" % (filename, package_list)
132
133     def fetch(self):
134         '''
135         Yields a new line of the Contents-$arch.gz file in filename order.
136         '''
137         for filename, package_list in self.query().yield_per(100):
138             yield self.formatline(filename, package_list)
139         # end transaction to return connection to pool
140         self.session.rollback()
141
142     def get_list(self):
143         '''
144         Returns a list of lines for the Contents-$arch.gz file.
145         '''
146         return [item for item in self.fetch()]
147
148     def writer(self):
149         '''
150         Returns a writer object.
151         '''
152         values = {
153             'suite':        self.suite.suite_name,
154             'architecture': self.architecture.arch_string,
155         }
156         if self.component is not None:
157             values['component'] = self.component.component_name
158         return BinaryContentsFileWriter(**values)
159
160     def get_header(self):
161         '''
162         Returns the header for the Contents files as a string.
163         '''
164         header_file = None
165         try:
166             filename = os.path.join(Config()['Dir::Templates'], 'contents')
167             header_file = open(filename)
168             return header_file.read()
169         finally:
170             if header_file:
171                 header_file.close()
172
173     def write_file(self):
174         '''
175         Write the output file.
176         '''
177         writer = self.writer()
178         file = writer.open()
179         file.write(self.get_header())
180         for item in self.fetch():
181             file.write(item)
182         writer.close()
183
184
185 class SourceContentsWriter(object):
186     '''
187     SourceContentsWriter writes the Contents-source.gz files.
188     '''
189     def __init__(self, suite, component):
190         self.suite = suite
191         self.component = component
192         self.session = suite.session()
193
194     def query(self):
195         '''
196         Returns a query object that is doing most of the work.
197         '''
198         params = {
199             'suite_id':     self.suite.suite_id,
200             'component_id': self.component.component_id,
201         }
202
203         sql = '''
204 create temp table newest_sources (
205     id integer primary key,
206     source text);
207
208 create index sources_binaries_by_source on newest_sources (source);
209
210 insert into newest_sources (id, source)
211     select distinct on (source) s.id, s.source from source s
212         join files f on f.id = s.file
213         join location l on l.id = f.location
214         where s.id in (select source from src_associations where suite = :suite_id)
215             and l.component = :component_id
216         order by source, version desc;
217
218 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
219     from newest_sources s, src_contents sc
220     where s.id = sc.source_id group by sc.file'''
221
222         return self.session.query("file", "pkglist").from_statement(sql). \
223             params(params)
224
225     def formatline(self, filename, package_list):
226         '''
227         Returns a formatted string for the filename argument.
228         '''
229         return "%s\t%s\n" % (filename, package_list)
230
231     def fetch(self):
232         '''
233         Yields a new line of the Contents-source.gz file in filename order.
234         '''
235         for filename, package_list in self.query().yield_per(100):
236             yield self.formatline(filename, package_list)
237         # end transaction to return connection to pool
238         self.session.rollback()
239
240     def get_list(self):
241         '''
242         Returns a list of lines for the Contents-source.gz file.
243         '''
244         return [item for item in self.fetch()]
245
246     def writer(self):
247         '''
248         Returns a writer object.
249         '''
250         values = {
251             'suite':     self.suite.suite_name,
252             'component': self.component.component_name
253         }
254         return SourceContentsFileWriter(**values)
255
256     def write_file(self):
257         '''
258         Write the output file.
259         '''
260         writer = self.writer()
261         file = writer.open()
262         for item in self.fetch():
263             file.write(item)
264         writer.close()
265
266
267 def binary_helper(suite_id, arch_id, overridetype_id, component_id = None):
268     '''
269     This function is called in a new subprocess and multiprocessing wants a top
270     level function.
271     '''
272     session = DBConn().session(work_mem = 1000)
273     suite = Suite.get(suite_id, session)
274     architecture = Architecture.get(arch_id, session)
275     overridetype = OverrideType.get(overridetype_id, session)
276     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
277     if component_id is None:
278         component = None
279     else:
280         component = Component.get(component_id, session)
281         log_message.append(component.component_name)
282     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
283     contents_writer.write_file()
284     return log_message
285
286 def source_helper(suite_id, component_id):
287     '''
288     This function is called in a new subprocess and multiprocessing wants a top
289     level function.
290     '''
291     session = DBConn().session(work_mem = 1000)
292     suite = Suite.get(suite_id, session)
293     component = Component.get(component_id, session)
294     log_message = [suite.suite_name, 'source', component.component_name]
295     contents_writer = SourceContentsWriter(suite, component)
296     contents_writer.write_file()
297     return log_message
298
299 class ContentsWriter(object):
300     '''
301     Loop over all suites, architectures, overridetypes, and components to write
302     all contents files.
303     '''
304     @classmethod
305     def log_result(class_, result):
306         '''
307         Writes a result message to the logfile.
308         '''
309         class_.logger.log(result)
310
311     @classmethod
312     def write_all(class_, logger, suite_names = [], force = False):
313         '''
314         Writes all Contents files for suites in list suite_names which defaults
315         to all 'touchable' suites if not specified explicitely. Untouchable
316         suites will be included if the force argument is set to True.
317         '''
318         class_.logger = logger
319         session = DBConn().session()
320         suite_query = session.query(Suite)
321         if len(suite_names) > 0:
322             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
323         if not force:
324             suite_query = suite_query.filter_by(untouchable = False)
325         deb_id = get_override_type('deb', session).overridetype_id
326         udeb_id = get_override_type('udeb', session).overridetype_id
327         main_id = get_component('main', session).component_id
328         contrib_id = get_component('contrib', session).component_id
329         non_free_id = get_component('non-free', session).component_id
330         pool = Pool()
331         for suite in suite_query:
332             suite_id = suite.suite_id
333             # handle source packages
334             pool.apply_async(source_helper, (suite_id, main_id),
335                 callback = class_.log_result)
336             pool.apply_async(source_helper, (suite_id, contrib_id),
337                 callback = class_.log_result)
338             pool.apply_async(source_helper, (suite_id, non_free_id),
339                 callback = class_.log_result)
340             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
341                 arch_id = architecture.arch_id
342                 # handle 'deb' packages
343                 pool.apply_async(binary_helper, (suite_id, arch_id, deb_id), \
344                     callback = class_.log_result)
345                 # handle 'udeb' packages for 'main' and 'non-free'
346                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, main_id), \
347                     callback = class_.log_result)
348                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, non_free_id), \
349                     callback = class_.log_result)
350         pool.close()
351         pool.join()
352         session.close()
353
354
355 class BinaryContentsScanner(object):
356     '''
357     BinaryContentsScanner provides a threadsafe method scan() to scan the
358     contents of a DBBinary object.
359     '''
360     def __init__(self, binary_id):
361         '''
362         The argument binary_id is the id of the DBBinary object that
363         should be scanned.
364         '''
365         self.binary_id = binary_id
366
367     def scan(self, dummy_arg = None):
368         '''
369         This method does the actual scan and fills in the associated BinContents
370         property. It commits any changes to the database. The argument dummy_arg
371         is ignored but needed by our threadpool implementation.
372         '''
373         session = DBConn().session()
374         binary = session.query(DBBinary).get(self.binary_id)
375         fileset = set(binary.scan_contents())
376         if len(fileset) == 0:
377             fileset.add('EMPTY_PACKAGE')
378         for filename in fileset:
379             binary.contents.append(BinContents(file = filename))
380         session.commit()
381         session.close()
382
383     @classmethod
384     def scan_all(class_, limit = None):
385         '''
386         The class method scan_all() scans all binaries using multiple threads.
387         The number of binaries to be scanned can be limited with the limit
388         argument. Returns the number of processed and remaining packages as a
389         dict.
390         '''
391         session = DBConn().session()
392         query = session.query(DBBinary).filter(DBBinary.contents == None)
393         remaining = query.count
394         if limit is not None:
395             query = query.limit(limit)
396         processed = query.count()
397         pool = Pool()
398         for binary in query.yield_per(100):
399             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
400         pool.close()
401         pool.join()
402         remaining = remaining()
403         session.close()
404         return { 'processed': processed, 'remaining': remaining }
405
406 def binary_scan_helper(binary_id):
407     '''
408     This function runs in a subprocess.
409     '''
410     scanner = BinaryContentsScanner(binary_id)
411     scanner.scan()
412
413
414 def subprocess_setup():
415     # Python installs a SIGPIPE handler by default. This is usually not what
416     # non-Python subprocesses expect.
417     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
418
419 class UnpackedSource(object):
420     '''
421     UnpackedSource extracts a source package into a temporary location and
422     gives you some convinient function for accessing it.
423     '''
424     def __init__(self, dscfilename):
425         '''
426         The dscfilename is a name of a DSC file that will be extracted.
427         '''
428         temp_directory = mkdtemp(dir = Config()['Dir::TempPath'])
429         self.root_directory = os.path.join(temp_directory, 'root')
430         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
431             dscfilename, self.root_directory)
432         check_call(command, preexec_fn = subprocess_setup)
433
434     def get_root_directory(self):
435         '''
436         Returns the name of the package's root directory which is the directory
437         where the debian subdirectory is located.
438         '''
439         return self.root_directory
440
441     def get_changelog_file(self):
442         '''
443         Returns a file object for debian/changelog or None if no such file exists.
444         '''
445         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
446         try:
447             return open(changelog_name)
448         except IOError:
449             return None
450
451     def get_all_filenames(self):
452         '''
453         Returns an iterator over all filenames. The filenames will be relative
454         to the root directory.
455         '''
456         skip = len(self.root_directory) + 1
457         for root, _, files in os.walk(self.root_directory):
458             for name in files:
459                 yield os.path.join(root[skip:], name)
460
461     def cleanup(self):
462         '''
463         Removes all temporary files.
464         '''
465         if self.root_directory is None:
466             return
467         parent_directory = os.path.dirname(self.root_directory)
468         rmtree(parent_directory)
469         self.root_directory = None
470
471     def __del__(self):
472         '''
473         Enforce cleanup.
474         '''
475         self.cleanup()
476
477
478 class SourceContentsScanner(object):
479     '''
480     SourceContentsScanner provides a method scan() to scan the contents of a
481     DBSource object.
482     '''
483     def __init__(self, source_id):
484         '''
485         The argument source_id is the id of the DBSource object that
486         should be scanned.
487         '''
488         self.source_id = source_id
489
490     def scan(self):
491         '''
492         This method does the actual scan and fills in the associated SrcContents
493         property. It commits any changes to the database.
494         '''
495         session = DBConn().session()
496         source = session.query(DBSource).get(self.source_id)
497         fileset = set(source.scan_contents())
498         for filename in fileset:
499             source.contents.append(SrcContents(file = filename))
500         session.commit()
501         session.close()
502
503     @classmethod
504     def scan_all(class_, limit = None):
505         '''
506         The class method scan_all() scans all source using multiple processes.
507         The number of sources to be scanned can be limited with the limit
508         argument. Returns the number of processed and remaining packages as a
509         dict.
510         '''
511         session = DBConn().session()
512         query = session.query(DBSource).filter(DBSource.contents == None)
513         remaining = query.count
514         if limit is not None:
515             query = query.limit(limit)
516         processed = query.count()
517         pool = Pool()
518         for source in query.yield_per(100):
519             pool.apply_async(source_scan_helper, (source.source_id, ))
520         pool.close()
521         pool.join()
522         remaining = remaining()
523         session.close()
524         return { 'processed': processed, 'remaining': remaining }
525
526 def source_scan_helper(source_id):
527     '''
528     This function runs in a subprocess.
529     '''
530     try:
531         scanner = SourceContentsScanner(source_id)
532         scanner.scan()
533     except Exception, e:
534         print e
535