]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Merge remote-tracking branch 'ansgar/pu/wheezy' into merge
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
31
32 from multiprocessing import Pool
33 from shutil import rmtree
34 from subprocess import Popen, PIPE, check_call
35 from tempfile import mkdtemp
36
37 import os.path
38 import signal
39
40 class BinaryContentsWriter(object):
41     '''
42     BinaryContentsWriter writes the Contents-$arch.gz files.
43     '''
44     def __init__(self, suite, architecture, overridetype, component):
45         self.suite = suite
46         self.architecture = architecture
47         self.overridetype = overridetype
48         self.component = component
49         self.session = suite.session()
50
51     def query(self):
52         '''
53         Returns a query object that is doing most of the work.
54         '''
55         overridesuite = self.suite
56         if self.suite.overridesuite is not None:
57             overridesuite = get_suite(self.suite.overridesuite, self.session)
58         params = {
59             'suite':         self.suite.suite_id,
60             'overridesuite': overridesuite.suite_id,
61             'component':     self.component.component_id,
62             'arch_all':      get_architecture('all', self.session).arch_id,
63             'arch':          self.architecture.arch_id,
64             'type_id':       self.overridetype.overridetype_id,
65             'type':          self.overridetype.overridetype,
66         }
67
68         sql_create_temp = '''
69 create temp table newest_binaries (
70     id integer primary key,
71     package text);
72
73 create index newest_binaries_by_package on newest_binaries (package);
74
75 insert into newest_binaries (id, package)
76     select distinct on (package) id, package from binaries
77         where type = :type and
78             (architecture = :arch_all or architecture = :arch) and
79             id in (select bin from bin_associations where suite = :suite)
80         order by package, version desc;'''
81         self.session.execute(sql_create_temp, params=params)
82
83         sql = '''
84 with
85
86 unique_override as
87     (select o.package, s.section
88         from override o, section s
89         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
90         o.component = :component)
91
92 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
93     from newest_binaries b, bin_contents bc, unique_override o
94     where b.id = bc.binary_id and o.package = b.package
95     group by bc.file'''
96
97         return self.session.query("file", "pkglist").from_statement(sql). \
98             params(params)
99
100     def formatline(self, filename, package_list):
101         '''
102         Returns a formatted string for the filename argument.
103         '''
104         return "%-55s %s\n" % (filename, package_list)
105
106     def fetch(self):
107         '''
108         Yields a new line of the Contents-$arch.gz file in filename order.
109         '''
110         for filename, package_list in self.query().yield_per(100):
111             yield self.formatline(filename, package_list)
112         # end transaction to return connection to pool
113         self.session.rollback()
114
115     def get_list(self):
116         '''
117         Returns a list of lines for the Contents-$arch.gz file.
118         '''
119         return [item for item in self.fetch()]
120
121     def writer(self):
122         '''
123         Returns a writer object.
124         '''
125         values = {
126             'suite':        self.suite.suite_name,
127             'component':    self.component.component_name,
128             'debtype':      self.overridetype.overridetype,
129             'architecture': self.architecture.arch_string,
130         }
131         return BinaryContentsFileWriter(**values)
132
133     def get_header(self):
134         '''
135         Returns the header for the Contents files as a string.
136         '''
137         header_file = None
138         try:
139             filename = os.path.join(Config()['Dir::Templates'], 'contents')
140             header_file = open(filename)
141             return header_file.read()
142         finally:
143             if header_file:
144                 header_file.close()
145
146     def write_file(self):
147         '''
148         Write the output file.
149         '''
150         writer = self.writer()
151         file = writer.open()
152         file.write(self.get_header())
153         for item in self.fetch():
154             file.write(item)
155         writer.close()
156
157
158 class SourceContentsWriter(object):
159     '''
160     SourceContentsWriter writes the Contents-source.gz files.
161     '''
162     def __init__(self, suite, component):
163         self.suite = suite
164         self.component = component
165         self.session = suite.session()
166
167     def query(self):
168         '''
169         Returns a query object that is doing most of the work.
170         '''
171         params = {
172             'suite_id':     self.suite.suite_id,
173             'component_id': self.component.component_id,
174         }
175
176         sql_create_temp = '''
177 create temp table newest_sources (
178     id integer primary key,
179     source text);
180
181 create index sources_binaries_by_source on newest_sources (source);
182
183 insert into newest_sources (id, source)
184     select distinct on (source) s.id, s.source from source s
185         join files f on f.id = s.file
186         join location l on l.id = f.location
187         where s.id in (select source from src_associations where suite = :suite_id)
188             and l.component = :component_id
189         order by source, version desc;'''
190         self.session.execute(sql_create_temp, params=params)
191
192         sql = '''
193 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
194     from newest_sources s, src_contents sc
195     where s.id = sc.source_id group by sc.file'''
196
197         return self.session.query("file", "pkglist").from_statement(sql). \
198             params(params)
199
200     def formatline(self, filename, package_list):
201         '''
202         Returns a formatted string for the filename argument.
203         '''
204         return "%s\t%s\n" % (filename, package_list)
205
206     def fetch(self):
207         '''
208         Yields a new line of the Contents-source.gz file in filename order.
209         '''
210         for filename, package_list in self.query().yield_per(100):
211             yield self.formatline(filename, package_list)
212         # end transaction to return connection to pool
213         self.session.rollback()
214
215     def get_list(self):
216         '''
217         Returns a list of lines for the Contents-source.gz file.
218         '''
219         return [item for item in self.fetch()]
220
221     def writer(self):
222         '''
223         Returns a writer object.
224         '''
225         values = {
226             'suite':     self.suite.suite_name,
227             'component': self.component.component_name
228         }
229         return SourceContentsFileWriter(**values)
230
231     def write_file(self):
232         '''
233         Write the output file.
234         '''
235         writer = self.writer()
236         file = writer.open()
237         for item in self.fetch():
238             file.write(item)
239         writer.close()
240
241
242 def binary_helper(suite_id, arch_id, overridetype_id, component_id):
243     '''
244     This function is called in a new subprocess and multiprocessing wants a top
245     level function.
246     '''
247     session = DBConn().session(work_mem = 1000)
248     suite = Suite.get(suite_id, session)
249     architecture = Architecture.get(arch_id, session)
250     overridetype = OverrideType.get(overridetype_id, session)
251     component = Component.get(component_id, session)
252     log_message = [suite.suite_name, architecture.arch_string, \
253         overridetype.overridetype, component.component_name]
254     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
255     contents_writer.write_file()
256     return log_message
257
258 def source_helper(suite_id, component_id):
259     '''
260     This function is called in a new subprocess and multiprocessing wants a top
261     level function.
262     '''
263     session = DBConn().session(work_mem = 1000)
264     suite = Suite.get(suite_id, session)
265     component = Component.get(component_id, session)
266     log_message = [suite.suite_name, 'source', component.component_name]
267     contents_writer = SourceContentsWriter(suite, component)
268     contents_writer.write_file()
269     return log_message
270
271 class ContentsWriter(object):
272     '''
273     Loop over all suites, architectures, overridetypes, and components to write
274     all contents files.
275     '''
276     @classmethod
277     def log_result(class_, result):
278         '''
279         Writes a result message to the logfile.
280         '''
281         class_.logger.log(result)
282
283     @classmethod
284     def write_all(class_, logger, suite_names = [], component_names = [], force = False):
285         '''
286         Writes all Contents files for suites in list suite_names which defaults
287         to all 'touchable' suites if not specified explicitely. Untouchable
288         suites will be included if the force argument is set to True.
289         '''
290         class_.logger = logger
291         session = DBConn().session()
292         suite_query = session.query(Suite)
293         if len(suite_names) > 0:
294             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
295         component_query = session.query(Component)
296         if len(component_names) > 0:
297             component_query = component_query.filter(Component.component_name.in_(component_names))
298         if not force:
299             suite_query = suite_query.filter_by(untouchable = False)
300         deb_id = get_override_type('deb', session).overridetype_id
301         udeb_id = get_override_type('udeb', session).overridetype_id
302         pool = Pool()
303         for suite in suite_query:
304             suite_id = suite.suite_id
305             for component in component_query:
306                 component_id = component.component_id
307                 # handle source packages
308                 pool.apply_async(source_helper, (suite_id, component_id),
309                     callback = class_.log_result)
310                 for architecture in suite.get_architectures(skipsrc = True, skipall = True):
311                     arch_id = architecture.arch_id
312                     # handle 'deb' packages
313                     pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id), \
314                         callback = class_.log_result)
315                     # handle 'udeb' packages
316                     pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id), \
317                         callback = class_.log_result)
318         pool.close()
319         pool.join()
320         session.close()
321
322
323 class BinaryContentsScanner(object):
324     '''
325     BinaryContentsScanner provides a threadsafe method scan() to scan the
326     contents of a DBBinary object.
327     '''
328     def __init__(self, binary_id):
329         '''
330         The argument binary_id is the id of the DBBinary object that
331         should be scanned.
332         '''
333         self.binary_id = binary_id
334
335     def scan(self, dummy_arg = None):
336         '''
337         This method does the actual scan and fills in the associated BinContents
338         property. It commits any changes to the database. The argument dummy_arg
339         is ignored but needed by our threadpool implementation.
340         '''
341         session = DBConn().session()
342         binary = session.query(DBBinary).get(self.binary_id)
343         fileset = set(binary.scan_contents())
344         if len(fileset) == 0:
345             fileset.add('EMPTY_PACKAGE')
346         for filename in fileset:
347             binary.contents.append(BinContents(file = filename))
348         session.commit()
349         session.close()
350
351     @classmethod
352     def scan_all(class_, limit = None):
353         '''
354         The class method scan_all() scans all binaries using multiple threads.
355         The number of binaries to be scanned can be limited with the limit
356         argument. Returns the number of processed and remaining packages as a
357         dict.
358         '''
359         session = DBConn().session()
360         query = session.query(DBBinary).filter(DBBinary.contents == None)
361         remaining = query.count
362         if limit is not None:
363             query = query.limit(limit)
364         processed = query.count()
365         pool = Pool()
366         for binary in query.yield_per(100):
367             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
368         pool.close()
369         pool.join()
370         remaining = remaining()
371         session.close()
372         return { 'processed': processed, 'remaining': remaining }
373
374 def binary_scan_helper(binary_id):
375     '''
376     This function runs in a subprocess.
377     '''
378     scanner = BinaryContentsScanner(binary_id)
379     scanner.scan()
380
381
382 def subprocess_setup():
383     # Python installs a SIGPIPE handler by default. This is usually not what
384     # non-Python subprocesses expect.
385     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
386
387 class UnpackedSource(object):
388     '''
389     UnpackedSource extracts a source package into a temporary location and
390     gives you some convinient function for accessing it.
391     '''
392     def __init__(self, dscfilename):
393         '''
394         The dscfilename is a name of a DSC file that will be extracted.
395         '''
396         temp_directory = mkdtemp(dir = Config()['Dir::TempPath'])
397         self.root_directory = os.path.join(temp_directory, 'root')
398         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
399             dscfilename, self.root_directory)
400         check_call(command, preexec_fn = subprocess_setup)
401
402     def get_root_directory(self):
403         '''
404         Returns the name of the package's root directory which is the directory
405         where the debian subdirectory is located.
406         '''
407         return self.root_directory
408
409     def get_changelog_file(self):
410         '''
411         Returns a file object for debian/changelog or None if no such file exists.
412         '''
413         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
414         try:
415             return open(changelog_name)
416         except IOError:
417             return None
418
419     def get_all_filenames(self):
420         '''
421         Returns an iterator over all filenames. The filenames will be relative
422         to the root directory.
423         '''
424         skip = len(self.root_directory) + 1
425         for root, _, files in os.walk(self.root_directory):
426             for name in files:
427                 yield os.path.join(root[skip:], name)
428
429     def cleanup(self):
430         '''
431         Removes all temporary files.
432         '''
433         if self.root_directory is None:
434             return
435         parent_directory = os.path.dirname(self.root_directory)
436         rmtree(parent_directory)
437         self.root_directory = None
438
439     def __del__(self):
440         '''
441         Enforce cleanup.
442         '''
443         self.cleanup()
444
445
446 class SourceContentsScanner(object):
447     '''
448     SourceContentsScanner provides a method scan() to scan the contents of a
449     DBSource object.
450     '''
451     def __init__(self, source_id):
452         '''
453         The argument source_id is the id of the DBSource object that
454         should be scanned.
455         '''
456         self.source_id = source_id
457
458     def scan(self):
459         '''
460         This method does the actual scan and fills in the associated SrcContents
461         property. It commits any changes to the database.
462         '''
463         session = DBConn().session()
464         source = session.query(DBSource).get(self.source_id)
465         fileset = set(source.scan_contents())
466         for filename in fileset:
467             source.contents.append(SrcContents(file = filename))
468         session.commit()
469         session.close()
470
471     @classmethod
472     def scan_all(class_, limit = None):
473         '''
474         The class method scan_all() scans all source using multiple processes.
475         The number of sources to be scanned can be limited with the limit
476         argument. Returns the number of processed and remaining packages as a
477         dict.
478         '''
479         session = DBConn().session()
480         query = session.query(DBSource).filter(DBSource.contents == None)
481         remaining = query.count
482         if limit is not None:
483             query = query.limit(limit)
484         processed = query.count()
485         pool = Pool()
486         for source in query.yield_per(100):
487             pool.apply_async(source_scan_helper, (source.source_id, ))
488         pool.close()
489         pool.join()
490         remaining = remaining()
491         session.close()
492         return { 'processed': processed, 'remaining': remaining }
493
494 def source_scan_helper(source_id):
495     '''
496     This function runs in a subprocess.
497     '''
498     try:
499         scanner = SourceContentsScanner(source_id)
500         scanner.scan()
501     except Exception as e:
502         print e
503