]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Refactor class ContentsWriter to prepare for source contents.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from shutil import rmtree
33 from subprocess import Popen, PIPE, check_call
34 from tempfile import mkdtemp
35
36 import os.path
37
38 class BinaryContentsWriter(object):
39     '''
40     BinaryContentsWriter writes the Contents-$arch.gz files.
41     '''
42     def __init__(self, suite, architecture, overridetype, component = None):
43         self.suite = suite
44         self.architecture = architecture
45         self.overridetype = overridetype
46         self.component = component
47         self.session = suite.session()
48
49     def query(self):
50         '''
51         Returns a query object that is doing most of the work.
52         '''
53         overridesuite = self.suite
54         if self.suite.overridesuite is not None:
55             overridesuite = get_suite(self.suite.overridesuite, self.session)
56         params = {
57             'suite':         self.suite.suite_id,
58             'overridesuite': overridesuite.suite_id,
59             'arch_all':      get_architecture('all', self.session).arch_id,
60             'arch':          self.architecture.arch_id,
61             'type_id':       self.overridetype.overridetype_id,
62             'type':          self.overridetype.overridetype,
63         }
64
65         if self.component is not None:
66             params['component'] = self.component.component_id
67             sql = '''
68 create temp table newest_binaries (
69     id integer primary key,
70     package text);
71
72 create index newest_binaries_by_package on newest_binaries (package);
73
74 insert into newest_binaries (id, package)
75     select distinct on (package) id, package from binaries
76         where type = :type and
77             (architecture = :arch_all or architecture = :arch) and
78             id in (select bin from bin_associations where suite = :suite)
79         order by package, version desc;
80
81 with
82
83 unique_override as
84     (select o.package, s.section
85         from override o, section s
86         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
87         o.component = :component)
88
89 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
90     from newest_binaries b, bin_contents bc, unique_override o
91     where b.id = bc.binary_id and o.package = b.package
92     group by bc.file'''
93
94         else:
95             sql = '''
96 create temp table newest_binaries (
97     id integer primary key,
98     package text);
99
100 create index newest_binaries_by_package on newest_binaries (package);
101
102 insert into newest_binaries (id, package)
103     select distinct on (package) id, package from binaries
104         where type = :type and
105             (architecture = :arch_all or architecture = :arch) and
106             id in (select bin from bin_associations where suite = :suite)
107         order by package, version desc;
108
109 with
110
111 unique_override as
112     (select distinct on (o.package, s.section) o.package, s.section
113         from override o, section s
114         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
115         order by o.package, s.section, o.modified desc)
116
117 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
118     from newest_binaries b, bin_contents bc, unique_override o
119     where b.id = bc.binary_id and o.package = b.package
120     group by bc.file'''
121
122         return self.session.query("file", "pkglist").from_statement(sql). \
123             params(params)
124
125     def formatline(self, filename, package_list):
126         '''
127         Returns a formatted string for the filename argument.
128         '''
129         return "%-55s %s\n" % (filename, package_list)
130
131     def fetch(self):
132         '''
133         Yields a new line of the Contents-$arch.gz file in filename order.
134         '''
135         for filename, package_list in self.query().yield_per(100):
136             yield self.formatline(filename, package_list)
137         # end transaction to return connection to pool
138         self.session.rollback()
139
140     def get_list(self):
141         '''
142         Returns a list of lines for the Contents-$arch.gz file.
143         '''
144         return [item for item in self.fetch()]
145
146     def output_filename(self):
147         '''
148         Returns the name of the output file.
149         '''
150         values = {
151             'root': Config()['Dir::Root'],
152             'suite': self.suite.suite_name,
153             'architecture': self.architecture.arch_string
154         }
155         if self.component is None:
156             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
157         values['component'] = self.component.component_name
158         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
159
160     def get_header(self):
161         '''
162         Returns the header for the Contents files as a string.
163         '''
164         header_file = None
165         try:
166             filename = os.path.join(Config()['Dir::Templates'], 'contents')
167             header_file = open(filename)
168             return header_file.read()
169         finally:
170             if header_file:
171                 header_file.close()
172
173     def write_file(self):
174         '''
175         Write the output file.
176         '''
177         command = ['gzip', '--rsyncable']
178         final_filename = self.output_filename()
179         temp_filename = final_filename + '.new'
180         output_file = open(temp_filename, 'w')
181         gzip = Popen(command, stdin = PIPE, stdout = output_file)
182         gzip.stdin.write(self.get_header())
183         for item in self.fetch():
184             gzip.stdin.write(item)
185         gzip.stdin.close()
186         output_file.close()
187         gzip.wait()
188         os.chmod(temp_filename, 0664)
189         os.rename(temp_filename, final_filename)
190
191
192 def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
193     '''
194     This function is called in a new subprocess.
195     '''
196     session = DBConn().session()
197     suite = Suite.get(suite_id, session)
198     architecture = Architecture.get(arch_id, session)
199     overridetype = OverrideType.get(overridetype_id, session)
200     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
201     if component_id is None:
202         component = None
203     else:
204         component = Component.get(component_id, session)
205         log_message.append(component.component_name)
206     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
207     contents_writer.write_file()
208     return log_message
209
210 class ContentsWriter(object):
211     '''
212     Loop over all suites, architectures, overridetypes, and components to write
213     all contents files.
214     '''
215     @classmethod
216     def log_result(class_, result):
217         '''
218         Writes a result message to the logfile.
219         '''
220         class_.logger.log(result)
221
222     @classmethod
223     def write_all(class_, logger, suite_names = [], force = False):
224         '''
225         Writes all Contents files for suites in list suite_names which defaults
226         to all 'touchable' suites if not specified explicitely. Untouchable
227         suites will be included if the force argument is set to True.
228         '''
229         class_.logger = logger
230         session = DBConn().session()
231         suite_query = session.query(Suite)
232         if len(suite_names) > 0:
233             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
234         if not force:
235             suite_query = suite_query.filter_by(untouchable = False)
236         deb_id = get_override_type('deb', session).overridetype_id
237         udeb_id = get_override_type('udeb', session).overridetype_id
238         main_id = get_component('main', session).component_id
239         non_free_id = get_component('non-free', session).component_id
240         pool = Pool()
241         for suite in suite_query:
242             suite_id = suite.suite_id
243             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
244                 arch_id = architecture.arch_id
245                 # handle 'deb' packages
246                 pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
247                     callback = class_.log_result)
248                 # handle 'udeb' packages for 'main' and 'non-free'
249                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
250                     callback = class_.log_result)
251                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
252                     callback = class_.log_result)
253         pool.close()
254         pool.join()
255         session.close()
256
257
258 class BinaryContentsScanner(object):
259     '''
260     BinaryContentsScanner provides a threadsafe method scan() to scan the
261     contents of a DBBinary object.
262     '''
263     def __init__(self, binary_id):
264         '''
265         The argument binary_id is the id of the DBBinary object that
266         should be scanned.
267         '''
268         self.binary_id = binary_id
269
270     def scan(self, dummy_arg = None):
271         '''
272         This method does the actual scan and fills in the associated BinContents
273         property. It commits any changes to the database. The argument dummy_arg
274         is ignored but needed by our threadpool implementation.
275         '''
276         session = DBConn().session()
277         binary = session.query(DBBinary).get(self.binary_id)
278         fileset = set(binary.scan_contents())
279         if len(fileset) == 0:
280             fileset.add('EMPTY_PACKAGE')
281         for filename in fileset:
282             binary.contents.append(BinContents(file = filename))
283         session.commit()
284         session.close()
285
286     @classmethod
287     def scan_all(class_, limit = None):
288         '''
289         The class method scan_all() scans all binaries using multiple threads.
290         The number of binaries to be scanned can be limited with the limit
291         argument. Returns the number of processed and remaining packages as a
292         dict.
293         '''
294         session = DBConn().session()
295         query = session.query(DBBinary).filter(DBBinary.contents == None)
296         remaining = query.count
297         if limit is not None:
298             query = query.limit(limit)
299         processed = query.count()
300         pool = Pool()
301         for binary in query.yield_per(100):
302             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
303         pool.close()
304         pool.join()
305         remaining = remaining()
306         session.close()
307         return { 'processed': processed, 'remaining': remaining }
308
309 def binary_scan_helper(binary_id):
310     '''
311     This function runs in a subprocess.
312     '''
313     scanner = BinaryContentsScanner(binary_id)
314     scanner.scan()
315
316
317 class UnpackedSource(object):
318     '''
319     UnpackedSource extracts a source package into a temporary location and
320     gives you some convinient function for accessing it.
321     '''
322     def __init__(self, dscfilename):
323         '''
324         The dscfilename is a name of a DSC file that will be extracted.
325         '''
326         self.root_directory = os.path.join(mkdtemp(), 'root')
327         command = ('dpkg-source', '--no-copy', '--no-check', '-x', dscfilename,
328             self.root_directory)
329         # dpkg-source does not have a --quiet option
330         devnull = open(os.devnull, 'w')
331         check_call(command, stdout = devnull, stderr = devnull)
332         devnull.close()
333
334     def get_root_directory(self):
335         '''
336         Returns the name of the package's root directory which is the directory
337         where the debian subdirectory is located.
338         '''
339         return self.root_directory
340
341     def get_changelog_file(self):
342         '''
343         Returns a file object for debian/changelog or None if no such file exists.
344         '''
345         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
346         try:
347             return open(changelog_name)
348         except IOError:
349             return None
350
351     def get_all_filenames(self):
352         '''
353         Returns an iterator over all filenames. The filenames will be relative
354         to the root directory.
355         '''
356         skip = len(self.root_directory) + 1
357         for root, _, files in os.walk(self.root_directory):
358             for name in files:
359                 yield os.path.join(root[skip:], name)
360
361     def cleanup(self):
362         '''
363         Removes all temporary files.
364         '''
365         if self.root_directory is None:
366             return
367         parent_directory = os.path.dirname(self.root_directory)
368         rmtree(parent_directory)
369         self.root_directory = None
370
371     def __del__(self):
372         '''
373         Enforce cleanup.
374         '''
375         self.cleanup()
376
377
378 class SourceContentsScanner(object):
379     '''
380     SourceContentsScanner provides a method scan() to scan the contents of a
381     DBSource object.
382     '''
383     def __init__(self, source_id):
384         '''
385         The argument source_id is the id of the DBSource object that
386         should be scanned.
387         '''
388         self.source_id = source_id
389
390     def scan(self):
391         '''
392         This method does the actual scan and fills in the associated SrcContents
393         property. It commits any changes to the database.
394         '''
395         session = DBConn().session()
396         source = session.query(DBSource).get(self.source_id)
397         fileset = set(source.scan_contents())
398         for filename in fileset:
399             source.contents.append(SrcContents(file = filename))
400         session.commit()
401         session.close()
402
403     @classmethod
404     def scan_all(class_, limit = None):
405         '''
406         The class method scan_all() scans all source using multiple processes.
407         The number of sources to be scanned can be limited with the limit
408         argument. Returns the number of processed and remaining packages as a
409         dict.
410         '''
411         session = DBConn().session()
412         query = session.query(DBSource).filter(DBSource.contents == None)
413         remaining = query.count
414         if limit is not None:
415             query = query.limit(limit)
416         processed = query.count()
417         pool = Pool()
418         for source in query.yield_per(100):
419             pool.apply_async(source_scan_helper, (source.source_id, ))
420         pool.close()
421         pool.join()
422         remaining = remaining()
423         session.close()
424         return { 'processed': processed, 'remaining': remaining }
425
426 def source_scan_helper(source_id):
427     '''
428     This function runs in a subprocess.
429     '''
430     try:
431         scanner = SourceContentsScanner(source_id)
432         scanner.scan()
433     except Exception, e:
434         print e
435