]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
daklib/contents.py: update for multi-archive changes
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
31
32 from multiprocessing import Pool
33 from shutil import rmtree
34 from subprocess import Popen, PIPE, check_call
35 from tempfile import mkdtemp
36
37 import os.path
38 import signal
39
40 class BinaryContentsWriter(object):
41     '''
42     BinaryContentsWriter writes the Contents-$arch.gz files.
43     '''
44     def __init__(self, suite, architecture, overridetype, component):
45         self.suite = suite
46         self.architecture = architecture
47         self.overridetype = overridetype
48         self.component = component
49         self.session = suite.session()
50
51     def query(self):
52         '''
53         Returns a query object that is doing most of the work.
54         '''
55         overridesuite = self.suite
56         if self.suite.overridesuite is not None:
57             overridesuite = get_suite(self.suite.overridesuite, self.session)
58         params = {
59             'suite':         self.suite.suite_id,
60             'overridesuite': overridesuite.suite_id,
61             'component':     self.component.component_id,
62             'arch_all':      get_architecture('all', self.session).arch_id,
63             'arch':          self.architecture.arch_id,
64             'type_id':       self.overridetype.overridetype_id,
65             'type':          self.overridetype.overridetype,
66         }
67
68         sql_create_temp = '''
69 create temp table newest_binaries (
70     id integer primary key,
71     package text);
72
73 create index newest_binaries_by_package on newest_binaries (package);
74
75 insert into newest_binaries (id, package)
76     select distinct on (package) id, package from binaries
77         where type = :type and
78             (architecture = :arch_all or architecture = :arch) and
79             id in (select bin from bin_associations where suite = :suite)
80         order by package, version desc;'''
81         self.session.execute(sql_create_temp, params=params)
82
83         sql = '''
84 with
85
86 unique_override as
87     (select o.package, s.section
88         from override o, section s
89         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
90         o.component = :component)
91
92 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
93     from newest_binaries b, bin_contents bc, unique_override o
94     where b.id = bc.binary_id and o.package = b.package
95     group by bc.file'''
96
97         return self.session.query("file", "pkglist").from_statement(sql). \
98             params(params)
99
100     def formatline(self, filename, package_list):
101         '''
102         Returns a formatted string for the filename argument.
103         '''
104         return "%-55s %s\n" % (filename, package_list)
105
106     def fetch(self):
107         '''
108         Yields a new line of the Contents-$arch.gz file in filename order.
109         '''
110         for filename, package_list in self.query().yield_per(100):
111             yield self.formatline(filename, package_list)
112         # end transaction to return connection to pool
113         self.session.rollback()
114
115     def get_list(self):
116         '''
117         Returns a list of lines for the Contents-$arch.gz file.
118         '''
119         return [item for item in self.fetch()]
120
121     def writer(self):
122         '''
123         Returns a writer object.
124         '''
125         values = {
126             'archive':      self.suite.archive.path,
127             'suite':        self.suite.suite_name,
128             'component':    self.component.component_name,
129             'debtype':      self.overridetype.overridetype,
130             'architecture': self.architecture.arch_string,
131         }
132         return BinaryContentsFileWriter(**values)
133
134     def get_header(self):
135         '''
136         Returns the header for the Contents files as a string.
137         '''
138         header_file = None
139         try:
140             filename = os.path.join(Config()['Dir::Templates'], 'contents')
141             header_file = open(filename)
142             return header_file.read()
143         finally:
144             if header_file:
145                 header_file.close()
146
147     def write_file(self):
148         '''
149         Write the output file.
150         '''
151         writer = self.writer()
152         file = writer.open()
153         file.write(self.get_header())
154         for item in self.fetch():
155             file.write(item)
156         writer.close()
157
158
159 class SourceContentsWriter(object):
160     '''
161     SourceContentsWriter writes the Contents-source.gz files.
162     '''
163     def __init__(self, suite, component):
164         self.suite = suite
165         self.component = component
166         self.session = suite.session()
167
168     def query(self):
169         '''
170         Returns a query object that is doing most of the work.
171         '''
172         params = {
173             'suite_id':     self.suite.suite_id,
174             'component_id': self.component.component_id,
175         }
176
177         sql_create_temp = '''
178 create temp table newest_sources (
179     id integer primary key,
180     source text);
181
182 create index sources_binaries_by_source on newest_sources (source);
183
184 insert into newest_sources (id, source)
185     select distinct on (source) s.id, s.source from source s
186         join files_archive_map af on s.file = af.file_id
187         where s.id in (select source from src_associations where suite = :suite_id)
188             and af.component_id = :component_id
189         order by source, version desc;'''
190         self.session.execute(sql_create_temp, params=params)
191
192         sql = '''
193 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
194     from newest_sources s, src_contents sc
195     where s.id = sc.source_id group by sc.file'''
196
197         return self.session.query("file", "pkglist").from_statement(sql). \
198             params(params)
199
200     def formatline(self, filename, package_list):
201         '''
202         Returns a formatted string for the filename argument.
203         '''
204         return "%s\t%s\n" % (filename, package_list)
205
206     def fetch(self):
207         '''
208         Yields a new line of the Contents-source.gz file in filename order.
209         '''
210         for filename, package_list in self.query().yield_per(100):
211             yield self.formatline(filename, package_list)
212         # end transaction to return connection to pool
213         self.session.rollback()
214
215     def get_list(self):
216         '''
217         Returns a list of lines for the Contents-source.gz file.
218         '''
219         return [item for item in self.fetch()]
220
221     def writer(self):
222         '''
223         Returns a writer object.
224         '''
225         values = {
226             'archive':   self.suite.archive.path,
227             'suite':     self.suite.suite_name,
228             'component': self.component.component_name
229         }
230         return SourceContentsFileWriter(**values)
231
232     def write_file(self):
233         '''
234         Write the output file.
235         '''
236         writer = self.writer()
237         file = writer.open()
238         for item in self.fetch():
239             file.write(item)
240         writer.close()
241
242
243 def binary_helper(suite_id, arch_id, overridetype_id, component_id):
244     '''
245     This function is called in a new subprocess and multiprocessing wants a top
246     level function.
247     '''
248     session = DBConn().session(work_mem = 1000)
249     suite = Suite.get(suite_id, session)
250     architecture = Architecture.get(arch_id, session)
251     overridetype = OverrideType.get(overridetype_id, session)
252     component = Component.get(component_id, session)
253     log_message = [suite.suite_name, architecture.arch_string, \
254         overridetype.overridetype, component.component_name]
255     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
256     contents_writer.write_file()
257     session.close()
258     return log_message
259
260 def source_helper(suite_id, component_id):
261     '''
262     This function is called in a new subprocess and multiprocessing wants a top
263     level function.
264     '''
265     session = DBConn().session(work_mem = 1000)
266     suite = Suite.get(suite_id, session)
267     component = Component.get(component_id, session)
268     log_message = [suite.suite_name, 'source', component.component_name]
269     contents_writer = SourceContentsWriter(suite, component)
270     contents_writer.write_file()
271     session.close()
272     return log_message
273
274 class ContentsWriter(object):
275     '''
276     Loop over all suites, architectures, overridetypes, and components to write
277     all contents files.
278     '''
279     @classmethod
280     def log_result(class_, result):
281         '''
282         Writes a result message to the logfile.
283         '''
284         class_.logger.log(result)
285
286     @classmethod
287     def write_all(class_, logger, suite_names = [], component_names = [], force = False):
288         '''
289         Writes all Contents files for suites in list suite_names which defaults
290         to all 'touchable' suites if not specified explicitely. Untouchable
291         suites will be included if the force argument is set to True.
292         '''
293         class_.logger = logger
294         session = DBConn().session()
295         suite_query = session.query(Suite)
296         if len(suite_names) > 0:
297             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
298         component_query = session.query(Component)
299         if len(component_names) > 0:
300             component_query = component_query.filter(Component.component_name.in_(component_names))
301         if not force:
302             suite_query = suite_query.filter_by(untouchable = False)
303         deb_id = get_override_type('deb', session).overridetype_id
304         udeb_id = get_override_type('udeb', session).overridetype_id
305         pool = Pool()
306         for suite in suite_query:
307             suite_id = suite.suite_id
308             for component in component_query:
309                 component_id = component.component_id
310                 # handle source packages
311                 pool.apply_async(source_helper, (suite_id, component_id),
312                     callback = class_.log_result)
313                 for architecture in suite.get_architectures(skipsrc = True, skipall = True):
314                     arch_id = architecture.arch_id
315                     # handle 'deb' packages
316                     pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id), \
317                         callback = class_.log_result)
318                     # handle 'udeb' packages
319                     pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id), \
320                         callback = class_.log_result)
321         pool.close()
322         pool.join()
323         session.close()
324
325
326 class BinaryContentsScanner(object):
327     '''
328     BinaryContentsScanner provides a threadsafe method scan() to scan the
329     contents of a DBBinary object.
330     '''
331     def __init__(self, binary_id):
332         '''
333         The argument binary_id is the id of the DBBinary object that
334         should be scanned.
335         '''
336         self.binary_id = binary_id
337
338     def scan(self, dummy_arg = None):
339         '''
340         This method does the actual scan and fills in the associated BinContents
341         property. It commits any changes to the database. The argument dummy_arg
342         is ignored but needed by our threadpool implementation.
343         '''
344         session = DBConn().session()
345         binary = session.query(DBBinary).get(self.binary_id)
346         fileset = set(binary.scan_contents())
347         if len(fileset) == 0:
348             fileset.add('EMPTY_PACKAGE')
349         for filename in fileset:
350             binary.contents.append(BinContents(file = filename))
351         session.commit()
352         session.close()
353
354     @classmethod
355     def scan_all(class_, limit = None):
356         '''
357         The class method scan_all() scans all binaries using multiple threads.
358         The number of binaries to be scanned can be limited with the limit
359         argument. Returns the number of processed and remaining packages as a
360         dict.
361         '''
362         session = DBConn().session()
363         query = session.query(DBBinary).filter(DBBinary.contents == None)
364         remaining = query.count
365         if limit is not None:
366             query = query.limit(limit)
367         processed = query.count()
368         pool = Pool()
369         for binary in query.yield_per(100):
370             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
371         pool.close()
372         pool.join()
373         remaining = remaining()
374         session.close()
375         return { 'processed': processed, 'remaining': remaining }
376
377 def binary_scan_helper(binary_id):
378     '''
379     This function runs in a subprocess.
380     '''
381     scanner = BinaryContentsScanner(binary_id)
382     scanner.scan()
383
384
385 def subprocess_setup():
386     # Python installs a SIGPIPE handler by default. This is usually not what
387     # non-Python subprocesses expect.
388     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
389
390 class UnpackedSource(object):
391     '''
392     UnpackedSource extracts a source package into a temporary location and
393     gives you some convinient function for accessing it.
394     '''
395     def __init__(self, dscfilename):
396         '''
397         The dscfilename is a name of a DSC file that will be extracted.
398         '''
399         temp_directory = mkdtemp(dir = Config()['Dir::TempPath'])
400         self.root_directory = os.path.join(temp_directory, 'root')
401         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
402             dscfilename, self.root_directory)
403         check_call(command, preexec_fn = subprocess_setup)
404
405     def get_root_directory(self):
406         '''
407         Returns the name of the package's root directory which is the directory
408         where the debian subdirectory is located.
409         '''
410         return self.root_directory
411
412     def get_changelog_file(self):
413         '''
414         Returns a file object for debian/changelog or None if no such file exists.
415         '''
416         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
417         try:
418             return open(changelog_name)
419         except IOError:
420             return None
421
422     def get_all_filenames(self):
423         '''
424         Returns an iterator over all filenames. The filenames will be relative
425         to the root directory.
426         '''
427         skip = len(self.root_directory) + 1
428         for root, _, files in os.walk(self.root_directory):
429             for name in files:
430                 yield os.path.join(root[skip:], name)
431
432     def cleanup(self):
433         '''
434         Removes all temporary files.
435         '''
436         if self.root_directory is None:
437             return
438         parent_directory = os.path.dirname(self.root_directory)
439         rmtree(parent_directory)
440         self.root_directory = None
441
442     def __del__(self):
443         '''
444         Enforce cleanup.
445         '''
446         self.cleanup()
447
448
449 class SourceContentsScanner(object):
450     '''
451     SourceContentsScanner provides a method scan() to scan the contents of a
452     DBSource object.
453     '''
454     def __init__(self, source_id):
455         '''
456         The argument source_id is the id of the DBSource object that
457         should be scanned.
458         '''
459         self.source_id = source_id
460
461     def scan(self):
462         '''
463         This method does the actual scan and fills in the associated SrcContents
464         property. It commits any changes to the database.
465         '''
466         session = DBConn().session()
467         source = session.query(DBSource).get(self.source_id)
468         fileset = set(source.scan_contents())
469         for filename in fileset:
470             source.contents.append(SrcContents(file = filename))
471         session.commit()
472         session.close()
473
474     @classmethod
475     def scan_all(class_, limit = None):
476         '''
477         The class method scan_all() scans all source using multiple processes.
478         The number of sources to be scanned can be limited with the limit
479         argument. Returns the number of processed and remaining packages as a
480         dict.
481         '''
482         session = DBConn().session()
483         query = session.query(DBSource).filter(DBSource.contents == None)
484         remaining = query.count
485         if limit is not None:
486             query = query.limit(limit)
487         processed = query.count()
488         pool = Pool()
489         for source in query.yield_per(100):
490             pool.apply_async(source_scan_helper, (source.source_id, ))
491         pool.close()
492         pool.join()
493         remaining = remaining()
494         session.close()
495         return { 'processed': processed, 'remaining': remaining }
496
497 def source_scan_helper(source_id):
498     '''
499     This function runs in a subprocess.
500     '''
501     try:
502         scanner = SourceContentsScanner(source_id)
503         scanner.scan()
504     except Exception as e:
505         print e
506