]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Add class SourceContentsScanner.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from shutil import rmtree
33 from subprocess import Popen, PIPE, check_call
34 from tempfile import mkdtemp
35
36 import os.path
37
38 class ContentsWriter(object):
39     '''
40     ContentsWriter writes the Contents-$arch.gz files.
41     '''
42     def __init__(self, suite, architecture, overridetype, component = None):
43         '''
44         The constructor clones its arguments into a new session object to make
45         sure that the new ContentsWriter object can be executed in a different
46         thread.
47         '''
48         self.suite = suite
49         self.architecture = architecture
50         self.overridetype = overridetype
51         self.component = component
52         self.session = suite.session()
53
54     def query(self):
55         '''
56         Returns a query object that is doing most of the work.
57         '''
58         overridesuite = self.suite
59         if self.suite.overridesuite is not None:
60             overridesuite = get_suite(self.suite.overridesuite, self.session)
61         params = {
62             'suite':         self.suite.suite_id,
63             'overridesuite': overridesuite.suite_id,
64             'arch_all':      get_architecture('all', self.session).arch_id,
65             'arch':          self.architecture.arch_id,
66             'type_id':       self.overridetype.overridetype_id,
67             'type':          self.overridetype.overridetype,
68         }
69
70         if self.component is not None:
71             params['component'] = self.component.component_id
72             sql = '''
73 create temp table newest_binaries (
74     id integer primary key,
75     package text);
76
77 create index newest_binaries_by_package on newest_binaries (package);
78
79 insert into newest_binaries (id, package)
80     select distinct on (package) id, package from binaries
81         where type = :type and
82             (architecture = :arch_all or architecture = :arch) and
83             id in (select bin from bin_associations where suite = :suite)
84         order by package, version desc;
85
86 with
87
88 unique_override as
89     (select o.package, s.section
90         from override o, section s
91         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
92         o.component = :component)
93
94 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
95     from newest_binaries b, bin_contents bc, unique_override o
96     where b.id = bc.binary_id and o.package = b.package
97     group by bc.file'''
98
99         else:
100             sql = '''
101 create temp table newest_binaries (
102     id integer primary key,
103     package text);
104
105 create index newest_binaries_by_package on newest_binaries (package);
106
107 insert into newest_binaries (id, package)
108     select distinct on (package) id, package from binaries
109         where type = :type and
110             (architecture = :arch_all or architecture = :arch) and
111             id in (select bin from bin_associations where suite = :suite)
112         order by package, version desc;
113
114 with
115
116 unique_override as
117     (select distinct on (o.package, s.section) o.package, s.section
118         from override o, section s
119         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
120         order by o.package, s.section, o.modified desc)
121
122 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
123     from newest_binaries b, bin_contents bc, unique_override o
124     where b.id = bc.binary_id and o.package = b.package
125     group by bc.file'''
126
127         return self.session.query("file", "pkglist").from_statement(sql). \
128             params(params)
129
130     def formatline(self, filename, package_list):
131         '''
132         Returns a formatted string for the filename argument.
133         '''
134         return "%-55s %s\n" % (filename, package_list)
135
136     def fetch(self):
137         '''
138         Yields a new line of the Contents-$arch.gz file in filename order.
139         '''
140         for filename, package_list in self.query().yield_per(100):
141             yield self.formatline(filename, package_list)
142         # end transaction to return connection to pool
143         self.session.rollback()
144
145     def get_list(self):
146         '''
147         Returns a list of lines for the Contents-$arch.gz file.
148         '''
149         return [item for item in self.fetch()]
150
151     def output_filename(self):
152         '''
153         Returns the name of the output file.
154         '''
155         values = {
156             'root': Config()['Dir::Root'],
157             'suite': self.suite.suite_name,
158             'architecture': self.architecture.arch_string
159         }
160         if self.component is None:
161             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
162         values['component'] = self.component.component_name
163         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
164
165     def get_header(self):
166         '''
167         Returns the header for the Contents files as a string.
168         '''
169         header_file = None
170         try:
171             filename = os.path.join(Config()['Dir::Templates'], 'contents')
172             header_file = open(filename)
173             return header_file.read()
174         finally:
175             if header_file:
176                 header_file.close()
177
178     def write_file(self):
179         '''
180         Write the output file.
181         '''
182         command = ['gzip', '--rsyncable']
183         final_filename = self.output_filename()
184         temp_filename = final_filename + '.new'
185         output_file = open(temp_filename, 'w')
186         gzip = Popen(command, stdin = PIPE, stdout = output_file)
187         gzip.stdin.write(self.get_header())
188         for item in self.fetch():
189             gzip.stdin.write(item)
190         gzip.stdin.close()
191         output_file.close()
192         gzip.wait()
193         try:
194             os.remove(final_filename)
195         except:
196             pass
197         os.rename(temp_filename, final_filename)
198         os.chmod(final_filename, 0664)
199
200     @classmethod
201     def log_result(class_, result):
202         '''
203         Writes a result message to the logfile.
204         '''
205         class_.logger.log(result)
206
207     @classmethod
208     def write_all(class_, logger, suite_names = [], force = False):
209         '''
210         Writes all Contents files for suites in list suite_names which defaults
211         to all 'touchable' suites if not specified explicitely. Untouchable
212         suites will be included if the force argument is set to True.
213         '''
214         class_.logger = logger
215         session = DBConn().session()
216         suite_query = session.query(Suite)
217         if len(suite_names) > 0:
218             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
219         if not force:
220             suite_query = suite_query.filter_by(untouchable = False)
221         deb_id = get_override_type('deb', session).overridetype_id
222         udeb_id = get_override_type('udeb', session).overridetype_id
223         main_id = get_component('main', session).component_id
224         non_free_id = get_component('non-free', session).component_id
225         pool = Pool()
226         for suite in suite_query:
227             suite_id = suite.suite_id
228             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
229                 arch_id = architecture.arch_id
230                 # handle 'deb' packages
231                 pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
232                     callback = class_.log_result)
233                 # handle 'udeb' packages for 'main' and 'non-free'
234                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
235                     callback = class_.log_result)
236                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
237                     callback = class_.log_result)
238         pool.close()
239         pool.join()
240         session.close()
241
242 def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
243     '''
244     This function is called in a new subprocess.
245     '''
246     session = DBConn().session()
247     suite = Suite.get(suite_id, session)
248     architecture = Architecture.get(arch_id, session)
249     overridetype = OverrideType.get(overridetype_id, session)
250     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
251     if component_id is None:
252         component = None
253     else:
254         component = Component.get(component_id, session)
255         log_message.append(component.component_name)
256     contents_writer = ContentsWriter(suite, architecture, overridetype, component)
257     contents_writer.write_file()
258     return log_message
259
260
261 class BinaryContentsScanner(object):
262     '''
263     BinaryContentsScanner provides a threadsafe method scan() to scan the
264     contents of a DBBinary object.
265     '''
266     def __init__(self, binary_id):
267         '''
268         The argument binary_id is the id of the DBBinary object that
269         should be scanned.
270         '''
271         self.binary_id = binary_id
272
273     def scan(self, dummy_arg = None):
274         '''
275         This method does the actual scan and fills in the associated BinContents
276         property. It commits any changes to the database. The argument dummy_arg
277         is ignored but needed by our threadpool implementation.
278         '''
279         session = DBConn().session()
280         binary = session.query(DBBinary).get(self.binary_id)
281         fileset = set(binary.scan_contents())
282         if len(fileset) == 0:
283             fileset.add('EMPTY_PACKAGE')
284         for filename in fileset:
285             binary.contents.append(BinContents(file = filename))
286         session.commit()
287         session.close()
288
289     @classmethod
290     def scan_all(class_, limit = None):
291         '''
292         The class method scan_all() scans all binaries using multiple threads.
293         The number of binaries to be scanned can be limited with the limit
294         argument. Returns the number of processed and remaining packages as a
295         dict.
296         '''
297         session = DBConn().session()
298         query = session.query(DBBinary).filter(DBBinary.contents == None)
299         remaining = query.count
300         if limit is not None:
301             query = query.limit(limit)
302         processed = query.count()
303         pool = Pool()
304         for binary in query.yield_per(100):
305             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
306         pool.close()
307         pool.join()
308         remaining = remaining()
309         session.close()
310         return { 'processed': processed, 'remaining': remaining }
311
312 def binary_scan_helper(binary_id):
313     '''
314     This function runs in a subprocess.
315     '''
316     scanner = BinaryContentsScanner(binary_id)
317     scanner.scan()
318
319
320 class UnpackedSource(object):
321     '''
322     UnpackedSource extracts a source package into a temporary location and
323     gives you some convinient function for accessing it.
324     '''
325     def __init__(self, dscfilename):
326         '''
327         The dscfilename is a name of a DSC file that will be extracted.
328         '''
329         self.root_directory = os.path.join(mkdtemp(), 'root')
330         command = ('dpkg-source', '--no-copy', '--no-check', '-x', dscfilename,
331             self.root_directory)
332         # dpkg-source does not have a --quiet option
333         devnull = open(os.devnull, 'w')
334         check_call(command, stdout = devnull, stderr = devnull)
335         devnull.close()
336
337     def get_root_directory(self):
338         '''
339         Returns the name of the package's root directory which is the directory
340         where the debian subdirectory is located.
341         '''
342         return self.root_directory
343
344     def get_changelog_file(self):
345         '''
346         Returns a file object for debian/changelog or None if no such file exists.
347         '''
348         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
349         try:
350             return open(changelog_name)
351         except IOError:
352             return None
353
354     def get_all_filenames(self):
355         '''
356         Returns an iterator over all filenames. The filenames will be relative
357         to the root directory.
358         '''
359         skip = len(self.root_directory) + 1
360         for root, _, files in os.walk(self.root_directory):
361             for name in files:
362                 yield os.path.join(root[skip:], name)
363
364     def cleanup(self):
365         '''
366         Removes all temporary files.
367         '''
368         if self.root_directory is None:
369             return
370         parent_directory = os.path.dirname(self.root_directory)
371         rmtree(parent_directory)
372         self.root_directory = None
373
374     def __del__(self):
375         '''
376         Enforce cleanup.
377         '''
378         self.cleanup()
379
380
381 class SourceContentsScanner(object):
382     '''
383     SourceContentsScanner provides a method scan() to scan the contents of a
384     DBSource object.
385     '''
386     def __init__(self, source_id):
387         '''
388         The argument source_id is the id of the DBSource object that
389         should be scanned.
390         '''
391         self.source_id = source_id
392
393     def scan(self):
394         '''
395         This method does the actual scan and fills in the associated SrcContents
396         property. It commits any changes to the database.
397         '''
398         session = DBConn().session()
399         source = session.query(DBSource).get(self.source_id)
400         fileset = set(source.scan_contents())
401         for filename in fileset:
402             source.contents.append(SrcContents(file = filename))
403         session.commit()
404         session.close()
405
406     @classmethod
407     def scan_all(class_, limit = None):
408         '''
409         The class method scan_all() scans all source using multiple processes.
410         The number of sources to be scanned can be limited with the limit
411         argument. Returns the number of processed and remaining packages as a
412         dict.
413         '''
414         session = DBConn().session()
415         query = session.query(DBSource).filter(DBSource.contents == None)
416         remaining = query.count
417         if limit is not None:
418             query = query.limit(limit)
419         processed = query.count()
420         pool = Pool()
421         for source in query.yield_per(100):
422             pool.apply_async(source_scan_helper, (source.source_id, ))
423         pool.close()
424         pool.join()
425         remaining = remaining()
426         session.close()
427         return { 'processed': processed, 'remaining': remaining }
428
429 def source_scan_helper(binary_id):
430     '''
431     This function runs in a subprocess.
432     '''
433     scanner = SourceContentsScanner(source_id)
434     scanner.scan()
435