]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
2a29b2e55b5080eadc574e559edf38a22ae1615a
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from shutil import rmtree
33 from subprocess import Popen, PIPE, check_call
34 from tempfile import mkdtemp
35
36 import os.path
37
38 class ContentsWriter(object):
39     '''
40     ContentsWriter writes the Contents-$arch.gz files.
41     '''
42     def __init__(self, suite, architecture, overridetype, component = None):
43         '''
44         The constructor clones its arguments into a new session object to make
45         sure that the new ContentsWriter object can be executed in a different
46         thread.
47         '''
48         self.suite = suite
49         self.architecture = architecture
50         self.overridetype = overridetype
51         self.component = component
52         self.session = suite.session()
53
54     def query(self):
55         '''
56         Returns a query object that is doing most of the work.
57         '''
58         overridesuite = self.suite
59         if self.suite.overridesuite is not None:
60             overridesuite = get_suite(self.suite.overridesuite, self.session)
61         params = {
62             'suite':         self.suite.suite_id,
63             'overridesuite': overridesuite.suite_id,
64             'arch_all':      get_architecture('all', self.session).arch_id,
65             'arch':          self.architecture.arch_id,
66             'type_id':       self.overridetype.overridetype_id,
67             'type':          self.overridetype.overridetype,
68         }
69
70         if self.component is not None:
71             params['component'] = self.component.component_id
72             sql = '''
73 create temp table newest_binaries (
74     id integer primary key,
75     package text);
76
77 create index newest_binaries_by_package on newest_binaries (package);
78
79 insert into newest_binaries (id, package)
80     select distinct on (package) id, package from binaries
81         where type = :type and
82             (architecture = :arch_all or architecture = :arch) and
83             id in (select bin from bin_associations where suite = :suite)
84         order by package, version desc;
85
86 with
87
88 unique_override as
89     (select o.package, s.section
90         from override o, section s
91         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
92         o.component = :component)
93
94 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
95     from newest_binaries b, bin_contents bc, unique_override o
96     where b.id = bc.binary_id and o.package = b.package
97     group by bc.file'''
98
99         else:
100             sql = '''
101 create temp table newest_binaries (
102     id integer primary key,
103     package text);
104
105 create index newest_binaries_by_package on newest_binaries (package);
106
107 insert into newest_binaries (id, package)
108     select distinct on (package) id, package from binaries
109         where type = :type and
110             (architecture = :arch_all or architecture = :arch) and
111             id in (select bin from bin_associations where suite = :suite)
112         order by package, version desc;
113
114 with
115
116 unique_override as
117     (select distinct on (o.package, s.section) o.package, s.section
118         from override o, section s
119         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
120         order by o.package, s.section, o.modified desc)
121
122 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
123     from newest_binaries b, bin_contents bc, unique_override o
124     where b.id = bc.binary_id and o.package = b.package
125     group by bc.file'''
126
127         return self.session.query("file", "pkglist").from_statement(sql). \
128             params(params)
129
130     def formatline(self, filename, package_list):
131         '''
132         Returns a formatted string for the filename argument.
133         '''
134         return "%-55s %s\n" % (filename, package_list)
135
136     def fetch(self):
137         '''
138         Yields a new line of the Contents-$arch.gz file in filename order.
139         '''
140         for filename, package_list in self.query().yield_per(100):
141             yield self.formatline(filename, package_list)
142         # end transaction to return connection to pool
143         self.session.rollback()
144
145     def get_list(self):
146         '''
147         Returns a list of lines for the Contents-$arch.gz file.
148         '''
149         return [item for item in self.fetch()]
150
151     def output_filename(self):
152         '''
153         Returns the name of the output file.
154         '''
155         values = {
156             'root': Config()['Dir::Root'],
157             'suite': self.suite.suite_name,
158             'architecture': self.architecture.arch_string
159         }
160         if self.component is None:
161             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
162         values['component'] = self.component.component_name
163         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
164
165     def get_header(self):
166         '''
167         Returns the header for the Contents files as a string.
168         '''
169         header_file = None
170         try:
171             filename = os.path.join(Config()['Dir::Templates'], 'contents')
172             header_file = open(filename)
173             return header_file.read()
174         finally:
175             if header_file:
176                 header_file.close()
177
178     def write_file(self):
179         '''
180         Write the output file.
181         '''
182         command = ['gzip', '--rsyncable']
183         final_filename = self.output_filename()
184         temp_filename = final_filename + '.new'
185         output_file = open(temp_filename, 'w')
186         gzip = Popen(command, stdin = PIPE, stdout = output_file)
187         gzip.stdin.write(self.get_header())
188         for item in self.fetch():
189             gzip.stdin.write(item)
190         gzip.stdin.close()
191         output_file.close()
192         gzip.wait()
193         os.chmod(temp_filename, 0664)
194         os.rename(temp_filename, final_filename)
195
196     @classmethod
197     def log_result(class_, result):
198         '''
199         Writes a result message to the logfile.
200         '''
201         class_.logger.log(result)
202
203     @classmethod
204     def write_all(class_, logger, suite_names = [], force = False):
205         '''
206         Writes all Contents files for suites in list suite_names which defaults
207         to all 'touchable' suites if not specified explicitely. Untouchable
208         suites will be included if the force argument is set to True.
209         '''
210         class_.logger = logger
211         session = DBConn().session()
212         suite_query = session.query(Suite)
213         if len(suite_names) > 0:
214             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
215         if not force:
216             suite_query = suite_query.filter_by(untouchable = False)
217         deb_id = get_override_type('deb', session).overridetype_id
218         udeb_id = get_override_type('udeb', session).overridetype_id
219         main_id = get_component('main', session).component_id
220         non_free_id = get_component('non-free', session).component_id
221         pool = Pool()
222         for suite in suite_query:
223             suite_id = suite.suite_id
224             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
225                 arch_id = architecture.arch_id
226                 # handle 'deb' packages
227                 pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
228                     callback = class_.log_result)
229                 # handle 'udeb' packages for 'main' and 'non-free'
230                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
231                     callback = class_.log_result)
232                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
233                     callback = class_.log_result)
234         pool.close()
235         pool.join()
236         session.close()
237
238 def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
239     '''
240     This function is called in a new subprocess.
241     '''
242     session = DBConn().session()
243     suite = Suite.get(suite_id, session)
244     architecture = Architecture.get(arch_id, session)
245     overridetype = OverrideType.get(overridetype_id, session)
246     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
247     if component_id is None:
248         component = None
249     else:
250         component = Component.get(component_id, session)
251         log_message.append(component.component_name)
252     contents_writer = ContentsWriter(suite, architecture, overridetype, component)
253     contents_writer.write_file()
254     return log_message
255
256
257 class BinaryContentsScanner(object):
258     '''
259     BinaryContentsScanner provides a threadsafe method scan() to scan the
260     contents of a DBBinary object.
261     '''
262     def __init__(self, binary_id):
263         '''
264         The argument binary_id is the id of the DBBinary object that
265         should be scanned.
266         '''
267         self.binary_id = binary_id
268
269     def scan(self, dummy_arg = None):
270         '''
271         This method does the actual scan and fills in the associated BinContents
272         property. It commits any changes to the database. The argument dummy_arg
273         is ignored but needed by our threadpool implementation.
274         '''
275         session = DBConn().session()
276         binary = session.query(DBBinary).get(self.binary_id)
277         fileset = set(binary.scan_contents())
278         if len(fileset) == 0:
279             fileset.add('EMPTY_PACKAGE')
280         for filename in fileset:
281             binary.contents.append(BinContents(file = filename))
282         session.commit()
283         session.close()
284
285     @classmethod
286     def scan_all(class_, limit = None):
287         '''
288         The class method scan_all() scans all binaries using multiple threads.
289         The number of binaries to be scanned can be limited with the limit
290         argument. Returns the number of processed and remaining packages as a
291         dict.
292         '''
293         session = DBConn().session()
294         query = session.query(DBBinary).filter(DBBinary.contents == None)
295         remaining = query.count
296         if limit is not None:
297             query = query.limit(limit)
298         processed = query.count()
299         pool = Pool()
300         for binary in query.yield_per(100):
301             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
302         pool.close()
303         pool.join()
304         remaining = remaining()
305         session.close()
306         return { 'processed': processed, 'remaining': remaining }
307
308 def binary_scan_helper(binary_id):
309     '''
310     This function runs in a subprocess.
311     '''
312     scanner = BinaryContentsScanner(binary_id)
313     scanner.scan()
314
315
316 class UnpackedSource(object):
317     '''
318     UnpackedSource extracts a source package into a temporary location and
319     gives you some convinient function for accessing it.
320     '''
321     def __init__(self, dscfilename):
322         '''
323         The dscfilename is a name of a DSC file that will be extracted.
324         '''
325         self.root_directory = os.path.join(mkdtemp(), 'root')
326         command = ('dpkg-source', '--no-copy', '--no-check', '-x', dscfilename,
327             self.root_directory)
328         # dpkg-source does not have a --quiet option
329         devnull = open(os.devnull, 'w')
330         check_call(command, stdout = devnull, stderr = devnull)
331         devnull.close()
332
333     def get_root_directory(self):
334         '''
335         Returns the name of the package's root directory which is the directory
336         where the debian subdirectory is located.
337         '''
338         return self.root_directory
339
340     def get_changelog_file(self):
341         '''
342         Returns a file object for debian/changelog or None if no such file exists.
343         '''
344         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
345         try:
346             return open(changelog_name)
347         except IOError:
348             return None
349
350     def get_all_filenames(self):
351         '''
352         Returns an iterator over all filenames. The filenames will be relative
353         to the root directory.
354         '''
355         skip = len(self.root_directory) + 1
356         for root, _, files in os.walk(self.root_directory):
357             for name in files:
358                 yield os.path.join(root[skip:], name)
359
360     def cleanup(self):
361         '''
362         Removes all temporary files.
363         '''
364         if self.root_directory is None:
365             return
366         parent_directory = os.path.dirname(self.root_directory)
367         rmtree(parent_directory)
368         self.root_directory = None
369
370     def __del__(self):
371         '''
372         Enforce cleanup.
373         '''
374         self.cleanup()
375
376
377 class SourceContentsScanner(object):
378     '''
379     SourceContentsScanner provides a method scan() to scan the contents of a
380     DBSource object.
381     '''
382     def __init__(self, source_id):
383         '''
384         The argument source_id is the id of the DBSource object that
385         should be scanned.
386         '''
387         self.source_id = source_id
388
389     def scan(self):
390         '''
391         This method does the actual scan and fills in the associated SrcContents
392         property. It commits any changes to the database.
393         '''
394         session = DBConn().session()
395         source = session.query(DBSource).get(self.source_id)
396         fileset = set(source.scan_contents())
397         for filename in fileset:
398             source.contents.append(SrcContents(file = filename))
399         session.commit()
400         session.close()
401
402     @classmethod
403     def scan_all(class_, limit = None):
404         '''
405         The class method scan_all() scans all source using multiple processes.
406         The number of sources to be scanned can be limited with the limit
407         argument. Returns the number of processed and remaining packages as a
408         dict.
409         '''
410         session = DBConn().session()
411         query = session.query(DBSource).filter(DBSource.contents == None)
412         remaining = query.count
413         if limit is not None:
414             query = query.limit(limit)
415         processed = query.count()
416         pool = Pool()
417         for source in query.yield_per(100):
418             pool.apply_async(source_scan_helper, (source.source_id, ))
419         pool.close()
420         pool.join()
421         remaining = remaining()
422         session.close()
423         return { 'processed': processed, 'remaining': remaining }
424
425 def source_scan_helper(source_id):
426     '''
427     This function runs in a subprocess.
428     '''
429     try:
430         scanner = SourceContentsScanner(source_id)
431         scanner.scan()
432     except Exception, e:
433         print e
434