]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Add the *FileWriter helper classes.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from shutil import rmtree
33 from subprocess import Popen, PIPE, check_call
34 from tempfile import mkdtemp
35
36 import os.path
37 import signal
38
39 class BinaryContentsWriter(object):
40     '''
41     BinaryContentsWriter writes the Contents-$arch.gz files.
42     '''
43     def __init__(self, suite, architecture, overridetype, component = None):
44         self.suite = suite
45         self.architecture = architecture
46         self.overridetype = overridetype
47         self.component = component
48         self.session = suite.session()
49
50     def query(self):
51         '''
52         Returns a query object that is doing most of the work.
53         '''
54         overridesuite = self.suite
55         if self.suite.overridesuite is not None:
56             overridesuite = get_suite(self.suite.overridesuite, self.session)
57         params = {
58             'suite':         self.suite.suite_id,
59             'overridesuite': overridesuite.suite_id,
60             'arch_all':      get_architecture('all', self.session).arch_id,
61             'arch':          self.architecture.arch_id,
62             'type_id':       self.overridetype.overridetype_id,
63             'type':          self.overridetype.overridetype,
64         }
65
66         if self.component is not None:
67             params['component'] = self.component.component_id
68             sql = '''
69 create temp table newest_binaries (
70     id integer primary key,
71     package text);
72
73 create index newest_binaries_by_package on newest_binaries (package);
74
75 insert into newest_binaries (id, package)
76     select distinct on (package) id, package from binaries
77         where type = :type and
78             (architecture = :arch_all or architecture = :arch) and
79             id in (select bin from bin_associations where suite = :suite)
80         order by package, version desc;
81
82 with
83
84 unique_override as
85     (select o.package, s.section
86         from override o, section s
87         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
88         o.component = :component)
89
90 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
91     from newest_binaries b, bin_contents bc, unique_override o
92     where b.id = bc.binary_id and o.package = b.package
93     group by bc.file'''
94
95         else:
96             sql = '''
97 create temp table newest_binaries (
98     id integer primary key,
99     package text);
100
101 create index newest_binaries_by_package on newest_binaries (package);
102
103 insert into newest_binaries (id, package)
104     select distinct on (package) id, package from binaries
105         where type = :type and
106             (architecture = :arch_all or architecture = :arch) and
107             id in (select bin from bin_associations where suite = :suite)
108         order by package, version desc;
109
110 with
111
112 unique_override as
113     (select distinct on (o.package, s.section) o.package, s.section
114         from override o, section s
115         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
116         order by o.package, s.section, o.modified desc)
117
118 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
119     from newest_binaries b, bin_contents bc, unique_override o
120     where b.id = bc.binary_id and o.package = b.package
121     group by bc.file'''
122
123         return self.session.query("file", "pkglist").from_statement(sql). \
124             params(params)
125
126     def formatline(self, filename, package_list):
127         '''
128         Returns a formatted string for the filename argument.
129         '''
130         return "%-55s %s\n" % (filename, package_list)
131
132     def fetch(self):
133         '''
134         Yields a new line of the Contents-$arch.gz file in filename order.
135         '''
136         for filename, package_list in self.query().yield_per(100):
137             yield self.formatline(filename, package_list)
138         # end transaction to return connection to pool
139         self.session.rollback()
140
141     def get_list(self):
142         '''
143         Returns a list of lines for the Contents-$arch.gz file.
144         '''
145         return [item for item in self.fetch()]
146
147     def writer(self):
148         '''
149         Returns a writer object.
150         '''
151         values = {
152             'suite':        self.suite.suite_name,
153             'architecture': self.architecture.arch_string,
154         }
155         if self.component is not None:
156             values['component'] = self.component.component_name
157         return BinaryContentsWriter(values)
158
159     def get_header(self):
160         '''
161         Returns the header for the Contents files as a string.
162         '''
163         header_file = None
164         try:
165             filename = os.path.join(Config()['Dir::Templates'], 'contents')
166             header_file = open(filename)
167             return header_file.read()
168         finally:
169             if header_file:
170                 header_file.close()
171
172     def write_file(self):
173         '''
174         Write the output file.
175         '''
176         writer = self.writer()
177         file = writer.open()
178         file.write(self.get_header())
179         for item in self.fetch():
180             file.write(item)
181         writer.close()
182
183
184 class SourceContentsWriter(object):
185     '''
186     SourceContentsWriter writes the Contents-source.gz files.
187     '''
188     def __init__(self, suite, component):
189         self.suite = suite
190         self.component = component
191         self.session = suite.session()
192
193     def query(self):
194         '''
195         Returns a query object that is doing most of the work.
196         '''
197         params = {
198             'suite_id':     self.suite.suite_id,
199             'component_id': self.component.component_id,
200         }
201
202         sql = '''
203 create temp table newest_sources (
204     id integer primary key,
205     source text);
206
207 create index sources_binaries_by_source on newest_sources (source);
208
209 insert into newest_sources (id, source)
210     select distinct on (source) s.id, s.source from source s
211         join files f on f.id = s.file
212         join location l on l.id = f.location
213         where s.id in (select source from src_associations where suite = :suite_id)
214             and l.component = :component_id
215         order by source, version desc;
216
217 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
218     from newest_sources s, src_contents sc
219     where s.id = sc.source_id group by sc.file'''
220
221         return self.session.query("file", "pkglist").from_statement(sql). \
222             params(params)
223
224     def formatline(self, filename, package_list):
225         '''
226         Returns a formatted string for the filename argument.
227         '''
228         return "%s\t%s\n" % (filename, package_list)
229
230     def fetch(self):
231         '''
232         Yields a new line of the Contents-source.gz file in filename order.
233         '''
234         for filename, package_list in self.query().yield_per(100):
235             yield self.formatline(filename, package_list)
236         # end transaction to return connection to pool
237         self.session.rollback()
238
239     def get_list(self):
240         '''
241         Returns a list of lines for the Contents-source.gz file.
242         '''
243         return [item for item in self.fetch()]
244
245     def output_filename(self):
246         '''
247         Returns the name of the output file.
248         '''
249         values = {
250             'root':      Config()['Dir::Root'],
251             'suite':     self.suite.suite_name,
252             'component': self.component.component_name
253         }
254         return "%(root)s/dists/%(suite)s/%(component)s/Contents-source.gz" % values
255
256     def write_file(self):
257         '''
258         Write the output file.
259         '''
260         command = ['gzip', '--rsyncable']
261         final_filename = self.output_filename()
262         temp_filename = final_filename + '.new'
263         output_file = open(temp_filename, 'w')
264         gzip = Popen(command, stdin = PIPE, stdout = output_file)
265         for item in self.fetch():
266             gzip.stdin.write(item)
267         gzip.stdin.close()
268         output_file.close()
269         gzip.wait()
270         os.chmod(temp_filename, 0664)
271         os.rename(temp_filename, final_filename)
272
273
274 def binary_helper(suite_id, arch_id, overridetype_id, component_id = None):
275     '''
276     This function is called in a new subprocess and multiprocessing wants a top
277     level function.
278     '''
279     session = DBConn().session(work_mem = 1000)
280     suite = Suite.get(suite_id, session)
281     architecture = Architecture.get(arch_id, session)
282     overridetype = OverrideType.get(overridetype_id, session)
283     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
284     if component_id is None:
285         component = None
286     else:
287         component = Component.get(component_id, session)
288         log_message.append(component.component_name)
289     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
290     contents_writer.write_file()
291     return log_message
292
293 def source_helper(suite_id, component_id):
294     '''
295     This function is called in a new subprocess and multiprocessing wants a top
296     level function.
297     '''
298     session = DBConn().session(work_mem = 1000)
299     suite = Suite.get(suite_id, session)
300     component = Component.get(component_id, session)
301     log_message = [suite.suite_name, 'source', component.component_name]
302     contents_writer = SourceContentsWriter(suite, component)
303     contents_writer.write_file()
304     return log_message
305
306 class ContentsWriter(object):
307     '''
308     Loop over all suites, architectures, overridetypes, and components to write
309     all contents files.
310     '''
311     @classmethod
312     def log_result(class_, result):
313         '''
314         Writes a result message to the logfile.
315         '''
316         class_.logger.log(result)
317
318     @classmethod
319     def write_all(class_, logger, suite_names = [], force = False):
320         '''
321         Writes all Contents files for suites in list suite_names which defaults
322         to all 'touchable' suites if not specified explicitely. Untouchable
323         suites will be included if the force argument is set to True.
324         '''
325         class_.logger = logger
326         session = DBConn().session()
327         suite_query = session.query(Suite)
328         if len(suite_names) > 0:
329             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
330         if not force:
331             suite_query = suite_query.filter_by(untouchable = False)
332         deb_id = get_override_type('deb', session).overridetype_id
333         udeb_id = get_override_type('udeb', session).overridetype_id
334         main_id = get_component('main', session).component_id
335         contrib_id = get_component('contrib', session).component_id
336         non_free_id = get_component('non-free', session).component_id
337         pool = Pool()
338         for suite in suite_query:
339             suite_id = suite.suite_id
340             # handle source packages
341             pool.apply_async(source_helper, (suite_id, main_id),
342                 callback = class_.log_result)
343             pool.apply_async(source_helper, (suite_id, contrib_id),
344                 callback = class_.log_result)
345             pool.apply_async(source_helper, (suite_id, non_free_id),
346                 callback = class_.log_result)
347             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
348                 arch_id = architecture.arch_id
349                 # handle 'deb' packages
350                 pool.apply_async(binary_helper, (suite_id, arch_id, deb_id), \
351                     callback = class_.log_result)
352                 # handle 'udeb' packages for 'main' and 'non-free'
353                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, main_id), \
354                     callback = class_.log_result)
355                 pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, non_free_id), \
356                     callback = class_.log_result)
357         pool.close()
358         pool.join()
359         session.close()
360
361
362 class BinaryContentsScanner(object):
363     '''
364     BinaryContentsScanner provides a threadsafe method scan() to scan the
365     contents of a DBBinary object.
366     '''
367     def __init__(self, binary_id):
368         '''
369         The argument binary_id is the id of the DBBinary object that
370         should be scanned.
371         '''
372         self.binary_id = binary_id
373
374     def scan(self, dummy_arg = None):
375         '''
376         This method does the actual scan and fills in the associated BinContents
377         property. It commits any changes to the database. The argument dummy_arg
378         is ignored but needed by our threadpool implementation.
379         '''
380         session = DBConn().session()
381         binary = session.query(DBBinary).get(self.binary_id)
382         fileset = set(binary.scan_contents())
383         if len(fileset) == 0:
384             fileset.add('EMPTY_PACKAGE')
385         for filename in fileset:
386             binary.contents.append(BinContents(file = filename))
387         session.commit()
388         session.close()
389
390     @classmethod
391     def scan_all(class_, limit = None):
392         '''
393         The class method scan_all() scans all binaries using multiple threads.
394         The number of binaries to be scanned can be limited with the limit
395         argument. Returns the number of processed and remaining packages as a
396         dict.
397         '''
398         session = DBConn().session()
399         query = session.query(DBBinary).filter(DBBinary.contents == None)
400         remaining = query.count
401         if limit is not None:
402             query = query.limit(limit)
403         processed = query.count()
404         pool = Pool()
405         for binary in query.yield_per(100):
406             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
407         pool.close()
408         pool.join()
409         remaining = remaining()
410         session.close()
411         return { 'processed': processed, 'remaining': remaining }
412
413 def binary_scan_helper(binary_id):
414     '''
415     This function runs in a subprocess.
416     '''
417     scanner = BinaryContentsScanner(binary_id)
418     scanner.scan()
419
420
421 def subprocess_setup():
422     # Python installs a SIGPIPE handler by default. This is usually not what
423     # non-Python subprocesses expect.
424     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
425
426 class UnpackedSource(object):
427     '''
428     UnpackedSource extracts a source package into a temporary location and
429     gives you some convinient function for accessing it.
430     '''
431     def __init__(self, dscfilename):
432         '''
433         The dscfilename is a name of a DSC file that will be extracted.
434         '''
435         temp_directory = mkdtemp(dir = Config()['Dir::TempPath'])
436         self.root_directory = os.path.join(temp_directory, 'root')
437         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
438             dscfilename, self.root_directory)
439         check_call(command, preexec_fn = subprocess_setup)
440
441     def get_root_directory(self):
442         '''
443         Returns the name of the package's root directory which is the directory
444         where the debian subdirectory is located.
445         '''
446         return self.root_directory
447
448     def get_changelog_file(self):
449         '''
450         Returns a file object for debian/changelog or None if no such file exists.
451         '''
452         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
453         try:
454             return open(changelog_name)
455         except IOError:
456             return None
457
458     def get_all_filenames(self):
459         '''
460         Returns an iterator over all filenames. The filenames will be relative
461         to the root directory.
462         '''
463         skip = len(self.root_directory) + 1
464         for root, _, files in os.walk(self.root_directory):
465             for name in files:
466                 yield os.path.join(root[skip:], name)
467
468     def cleanup(self):
469         '''
470         Removes all temporary files.
471         '''
472         if self.root_directory is None:
473             return
474         parent_directory = os.path.dirname(self.root_directory)
475         rmtree(parent_directory)
476         self.root_directory = None
477
478     def __del__(self):
479         '''
480         Enforce cleanup.
481         '''
482         self.cleanup()
483
484
485 class SourceContentsScanner(object):
486     '''
487     SourceContentsScanner provides a method scan() to scan the contents of a
488     DBSource object.
489     '''
490     def __init__(self, source_id):
491         '''
492         The argument source_id is the id of the DBSource object that
493         should be scanned.
494         '''
495         self.source_id = source_id
496
497     def scan(self):
498         '''
499         This method does the actual scan and fills in the associated SrcContents
500         property. It commits any changes to the database.
501         '''
502         session = DBConn().session()
503         source = session.query(DBSource).get(self.source_id)
504         fileset = set(source.scan_contents())
505         for filename in fileset:
506             source.contents.append(SrcContents(file = filename))
507         session.commit()
508         session.close()
509
510     @classmethod
511     def scan_all(class_, limit = None):
512         '''
513         The class method scan_all() scans all source using multiple processes.
514         The number of sources to be scanned can be limited with the limit
515         argument. Returns the number of processed and remaining packages as a
516         dict.
517         '''
518         session = DBConn().session()
519         query = session.query(DBSource).filter(DBSource.contents == None)
520         remaining = query.count
521         if limit is not None:
522             query = query.limit(limit)
523         processed = query.count()
524         pool = Pool()
525         for source in query.yield_per(100):
526             pool.apply_async(source_scan_helper, (source.source_id, ))
527         pool.close()
528         pool.join()
529         remaining = remaining()
530         session.close()
531         return { 'processed': processed, 'remaining': remaining }
532
533 def source_scan_helper(source_id):
534     '''
535     This function runs in a subprocess.
536     '''
537     try:
538         scanner = SourceContentsScanner(source_id)
539         scanner.scan()
540     except Exception, e:
541         print e
542