]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Add by-hash support
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
31
32 from multiprocessing import Pool
33 from shutil import rmtree
34 from tempfile import mkdtemp
35
36 import daklib.daksubprocess
37 import os.path
38
39 class BinaryContentsWriter(object):
40     '''
41     BinaryContentsWriter writes the Contents-$arch.gz files.
42     '''
43     def __init__(self, suite, architecture, overridetype, component):
44         self.suite = suite
45         self.architecture = architecture
46         self.overridetype = overridetype
47         self.component = component
48         self.session = suite.session()
49
50     def query(self):
51         '''
52         Returns a query object that is doing most of the work.
53         '''
54         overridesuite = self.suite
55         if self.suite.overridesuite is not None:
56             overridesuite = get_suite(self.suite.overridesuite, self.session)
57         params = {
58             'suite':         self.suite.suite_id,
59             'overridesuite': overridesuite.suite_id,
60             'component':     self.component.component_id,
61             'arch_all':      get_architecture('all', self.session).arch_id,
62             'arch':          self.architecture.arch_id,
63             'type_id':       self.overridetype.overridetype_id,
64             'type':          self.overridetype.overridetype,
65         }
66
67         sql_create_temp = '''
68 create temp table newest_binaries (
69     id integer primary key,
70     package text);
71
72 create index newest_binaries_by_package on newest_binaries (package);
73
74 insert into newest_binaries (id, package)
75     select distinct on (package) id, package from binaries
76         where type = :type and
77             (architecture = :arch_all or architecture = :arch) and
78             id in (select bin from bin_associations where suite = :suite)
79         order by package, version desc;'''
80         self.session.execute(sql_create_temp, params=params)
81
82         sql = '''
83 with
84
85 unique_override as
86     (select o.package, s.section
87         from override o, section s
88         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
89         o.component = :component)
90
91 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
92     from newest_binaries b, bin_contents bc, unique_override o
93     where b.id = bc.binary_id and o.package = b.package
94     group by bc.file'''
95
96         return self.session.query("file", "pkglist").from_statement(sql). \
97             params(params)
98
99     def formatline(self, filename, package_list):
100         '''
101         Returns a formatted string for the filename argument.
102         '''
103         return "%-55s %s\n" % (filename, package_list)
104
105     def fetch(self):
106         '''
107         Yields a new line of the Contents-$arch.gz file in filename order.
108         '''
109         for filename, package_list in self.query().yield_per(100):
110             yield self.formatline(filename, package_list)
111         # end transaction to return connection to pool
112         self.session.rollback()
113
114     def get_list(self):
115         '''
116         Returns a list of lines for the Contents-$arch.gz file.
117         '''
118         return [item for item in self.fetch()]
119
120     def writer(self):
121         '''
122         Returns a writer object.
123         '''
124         values = {
125             'archive':      self.suite.archive.path,
126             'suite':        self.suite.suite_name,
127             'component':    self.component.component_name,
128             'debtype':      self.overridetype.overridetype,
129             'architecture': self.architecture.arch_string,
130         }
131         return BinaryContentsFileWriter(**values)
132
133     def get_header(self):
134         '''
135         Returns the header for the Contents files as a string.
136         '''
137         filename = os.path.join(Config()['Dir::Templates'], 'contents')
138         with open(filename) as header_file:
139             return header_file.read()
140
141     def write_file(self):
142         '''
143         Write the output file.
144         '''
145         writer = self.writer()
146         file = writer.open()
147         file.write(self.get_header())
148         for item in self.fetch():
149             file.write(item)
150         writer.close()
151
152
153 class SourceContentsWriter(object):
154     '''
155     SourceContentsWriter writes the Contents-source.gz files.
156     '''
157     def __init__(self, suite, component):
158         self.suite = suite
159         self.component = component
160         self.session = suite.session()
161
162     def query(self):
163         '''
164         Returns a query object that is doing most of the work.
165         '''
166         params = {
167             'suite_id':     self.suite.suite_id,
168             'component_id': self.component.component_id,
169         }
170
171         sql_create_temp = '''
172 create temp table newest_sources (
173     id integer primary key,
174     source text);
175
176 create index sources_binaries_by_source on newest_sources (source);
177
178 insert into newest_sources (id, source)
179     select distinct on (source) s.id, s.source from source s
180         join files_archive_map af on s.file = af.file_id
181         where s.id in (select source from src_associations where suite = :suite_id)
182             and af.component_id = :component_id
183         order by source, version desc;'''
184         self.session.execute(sql_create_temp, params=params)
185
186         sql = '''
187 select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
188     from newest_sources s, src_contents sc
189     where s.id = sc.source_id group by sc.file'''
190
191         return self.session.query("file", "pkglist").from_statement(sql). \
192             params(params)
193
194     def formatline(self, filename, package_list):
195         '''
196         Returns a formatted string for the filename argument.
197         '''
198         return "%s\t%s\n" % (filename, package_list)
199
200     def fetch(self):
201         '''
202         Yields a new line of the Contents-source.gz file in filename order.
203         '''
204         for filename, package_list in self.query().yield_per(100):
205             yield self.formatline(filename, package_list)
206         # end transaction to return connection to pool
207         self.session.rollback()
208
209     def get_list(self):
210         '''
211         Returns a list of lines for the Contents-source.gz file.
212         '''
213         return [item for item in self.fetch()]
214
215     def writer(self):
216         '''
217         Returns a writer object.
218         '''
219         values = {
220             'archive':   self.suite.archive.path,
221             'suite':     self.suite.suite_name,
222             'component': self.component.component_name
223         }
224         return SourceContentsFileWriter(**values)
225
226     def write_file(self):
227         '''
228         Write the output file.
229         '''
230         writer = self.writer()
231         file = writer.open()
232         for item in self.fetch():
233             file.write(item)
234         writer.close()
235
236
237 def binary_helper(suite_id, arch_id, overridetype_id, component_id):
238     '''
239     This function is called in a new subprocess and multiprocessing wants a top
240     level function.
241     '''
242     session = DBConn().session(work_mem = 1000)
243     suite = Suite.get(suite_id, session)
244     architecture = Architecture.get(arch_id, session)
245     overridetype = OverrideType.get(overridetype_id, session)
246     component = Component.get(component_id, session)
247     log_message = [suite.suite_name, architecture.arch_string, \
248         overridetype.overridetype, component.component_name]
249     contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
250     contents_writer.write_file()
251     session.close()
252     return log_message
253
254 def source_helper(suite_id, component_id):
255     '''
256     This function is called in a new subprocess and multiprocessing wants a top
257     level function.
258     '''
259     session = DBConn().session(work_mem = 1000)
260     suite = Suite.get(suite_id, session)
261     component = Component.get(component_id, session)
262     log_message = [suite.suite_name, 'source', component.component_name]
263     contents_writer = SourceContentsWriter(suite, component)
264     contents_writer.write_file()
265     session.close()
266     return log_message
267
268 class ContentsWriter(object):
269     '''
270     Loop over all suites, architectures, overridetypes, and components to write
271     all contents files.
272     '''
273     @classmethod
274     def log_result(class_, result):
275         '''
276         Writes a result message to the logfile.
277         '''
278         class_.logger.log(result)
279
280     @classmethod
281     def write_all(class_, logger, archive_names = [], suite_names = [], component_names = [], force = False):
282         '''
283         Writes all Contents files for suites in list suite_names which defaults
284         to all 'touchable' suites if not specified explicitely. Untouchable
285         suites will be included if the force argument is set to True.
286         '''
287         class_.logger = logger
288         session = DBConn().session()
289         suite_query = session.query(Suite)
290         if len(archive_names) > 0:
291             suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
292         if len(suite_names) > 0:
293             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
294         component_query = session.query(Component)
295         if len(component_names) > 0:
296             component_query = component_query.filter(Component.component_name.in_(component_names))
297         if not force:
298             suite_query = suite_query.filter(Suite.untouchable == False)
299         deb_id = get_override_type('deb', session).overridetype_id
300         udeb_id = get_override_type('udeb', session).overridetype_id
301         pool = Pool()
302
303         # Lock tables so that nobody can change things underneath us
304         session.execute("LOCK TABLE bin_contents IN SHARE MODE")
305         session.execute("LOCK TABLE src_contents IN SHARE MODE")
306
307         for suite in suite_query:
308             suite_id = suite.suite_id
309             for component in component_query:
310                 component_id = component.component_id
311                 # handle source packages
312                 pool.apply_async(source_helper, (suite_id, component_id),
313                     callback = class_.log_result)
314                 for architecture in suite.get_architectures(skipsrc = True, skipall = True):
315                     arch_id = architecture.arch_id
316                     # handle 'deb' packages
317                     pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id), \
318                         callback = class_.log_result)
319                     # handle 'udeb' packages
320                     pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id), \
321                         callback = class_.log_result)
322         pool.close()
323         pool.join()
324         session.close()
325
326
327 class BinaryContentsScanner(object):
328     '''
329     BinaryContentsScanner provides a threadsafe method scan() to scan the
330     contents of a DBBinary object.
331     '''
332     def __init__(self, binary_id):
333         '''
334         The argument binary_id is the id of the DBBinary object that
335         should be scanned.
336         '''
337         self.binary_id = binary_id
338
339     def scan(self, dummy_arg = None):
340         '''
341         This method does the actual scan and fills in the associated BinContents
342         property. It commits any changes to the database. The argument dummy_arg
343         is ignored but needed by our threadpool implementation.
344         '''
345         session = DBConn().session()
346         binary = session.query(DBBinary).get(self.binary_id)
347         fileset = set(binary.scan_contents())
348         if len(fileset) == 0:
349             fileset.add('EMPTY_PACKAGE')
350         for filename in fileset:
351             binary.contents.append(BinContents(file = filename))
352         session.commit()
353         session.close()
354
355     @classmethod
356     def scan_all(class_, limit = None):
357         '''
358         The class method scan_all() scans all binaries using multiple threads.
359         The number of binaries to be scanned can be limited with the limit
360         argument. Returns the number of processed and remaining packages as a
361         dict.
362         '''
363         session = DBConn().session()
364         query = session.query(DBBinary).filter(DBBinary.contents == None)
365         remaining = query.count
366         if limit is not None:
367             query = query.limit(limit)
368         processed = query.count()
369         pool = Pool()
370         for binary in query.yield_per(100):
371             pool.apply_async(binary_scan_helper, (binary.binary_id, ))
372         pool.close()
373         pool.join()
374         remaining = remaining()
375         session.close()
376         return { 'processed': processed, 'remaining': remaining }
377
378 def binary_scan_helper(binary_id):
379     '''
380     This function runs in a subprocess.
381     '''
382     scanner = BinaryContentsScanner(binary_id)
383     scanner.scan()
384
385 class UnpackedSource(object):
386     '''
387     UnpackedSource extracts a source package into a temporary location and
388     gives you some convinient function for accessing it.
389     '''
390     def __init__(self, dscfilename, tmpbasedir=None):
391         '''
392         The dscfilename is a name of a DSC file that will be extracted.
393         '''
394         basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
395         temp_directory = mkdtemp(dir = basedir)
396         self.root_directory = os.path.join(temp_directory, 'root')
397         command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
398             dscfilename, self.root_directory)
399         daklib.daksubprocess.check_call(command)
400
401     def get_root_directory(self):
402         '''
403         Returns the name of the package's root directory which is the directory
404         where the debian subdirectory is located.
405         '''
406         return self.root_directory
407
408     def get_changelog_file(self):
409         '''
410         Returns a file object for debian/changelog or None if no such file exists.
411         '''
412         changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
413         try:
414             return open(changelog_name)
415         except IOError:
416             return None
417
418     def get_all_filenames(self):
419         '''
420         Returns an iterator over all filenames. The filenames will be relative
421         to the root directory.
422         '''
423         skip = len(self.root_directory) + 1
424         for root, _, files in os.walk(self.root_directory):
425             for name in files:
426                 yield os.path.join(root[skip:], name)
427
428     def cleanup(self):
429         '''
430         Removes all temporary files.
431         '''
432         if self.root_directory is None:
433             return
434         parent_directory = os.path.dirname(self.root_directory)
435         rmtree(parent_directory)
436         self.root_directory = None
437
438     def __del__(self):
439         '''
440         Enforce cleanup.
441         '''
442         self.cleanup()
443
444
445 class SourceContentsScanner(object):
446     '''
447     SourceContentsScanner provides a method scan() to scan the contents of a
448     DBSource object.
449     '''
450     def __init__(self, source_id):
451         '''
452         The argument source_id is the id of the DBSource object that
453         should be scanned.
454         '''
455         self.source_id = source_id
456
457     def scan(self):
458         '''
459         This method does the actual scan and fills in the associated SrcContents
460         property. It commits any changes to the database.
461         '''
462         session = DBConn().session()
463         source = session.query(DBSource).get(self.source_id)
464         fileset = set(source.scan_contents())
465         for filename in fileset:
466             source.contents.append(SrcContents(file = filename))
467         session.commit()
468         session.close()
469
470     @classmethod
471     def scan_all(class_, limit = None):
472         '''
473         The class method scan_all() scans all source using multiple processes.
474         The number of sources to be scanned can be limited with the limit
475         argument. Returns the number of processed and remaining packages as a
476         dict.
477         '''
478         session = DBConn().session()
479         query = session.query(DBSource).filter(DBSource.contents == None)
480         remaining = query.count
481         if limit is not None:
482             query = query.limit(limit)
483         processed = query.count()
484         pool = Pool()
485         for source in query.yield_per(100):
486             pool.apply_async(source_scan_helper, (source.source_id, ))
487         pool.close()
488         pool.join()
489         remaining = remaining()
490         session.close()
491         return { 'processed': processed, 'remaining': remaining }
492
493 def source_scan_helper(source_id):
494     '''
495     This function runs in a subprocess.
496     '''
497     try:
498         scanner = SourceContentsScanner(source_id)
499         scanner.scan()
500     except Exception as e:
501         print e