]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Contents: replace multithreading by multiprocessing.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from subprocess import Popen, PIPE
33
34 import os.path
35
36 class ContentsWriter(object):
37     '''
38     ContentsWriter writes the Contents-$arch.gz files.
39     '''
40     def __init__(self, suite, architecture, overridetype, component = None):
41         '''
42         The constructor clones its arguments into a new session object to make
43         sure that the new ContentsWriter object can be executed in a different
44         thread.
45         '''
46         self.suite = suite
47         self.architecture = architecture
48         self.overridetype = overridetype
49         self.component = component
50         self.session = suite.session()
51
52     def query(self):
53         '''
54         Returns a query object that is doing most of the work.
55         '''
56         overridesuite = self.suite
57         if self.suite.overridesuite is not None:
58             overridesuite = get_suite(self.suite.overridesuite, self.session)
59         params = {
60             'suite':         self.suite.suite_id,
61             'overridesuite': overridesuite.suite_id,
62             'arch_all':      get_architecture('all', self.session).arch_id,
63             'arch':          self.architecture.arch_id,
64             'type_id':       self.overridetype.overridetype_id,
65             'type':          self.overridetype.overridetype,
66         }
67
68         if self.component is not None:
69             params['component'] = self.component.component_id
70             sql = '''
71 create temp table newest_binaries (
72     id integer primary key,
73     package text);
74
75 create index newest_binaries_by_package on newest_binaries (package);
76
77 insert into newest_binaries (id, package)
78     select distinct on (package) id, package from binaries
79         where type = :type and
80             (architecture = :arch_all or architecture = :arch) and
81             id in (select bin from bin_associations where suite = :suite)
82         order by package, version desc;
83
84 with
85
86 unique_override as
87     (select o.package, s.section
88         from override o, section s
89         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
90         o.component = :component)
91
92 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
93     from newest_binaries b, bin_contents bc, unique_override o
94     where b.id = bc.binary_id and o.package = b.package
95     group by bc.file'''
96
97         else:
98             sql = '''
99 create temp table newest_binaries (
100     id integer primary key,
101     package text);
102
103 create index newest_binaries_by_package on newest_binaries (package);
104
105 insert into newest_binaries (id, package)
106     select distinct on (package) id, package from binaries
107         where type = :type and
108             (architecture = :arch_all or architecture = :arch) and
109             id in (select bin from bin_associations where suite = :suite)
110         order by package, version desc;
111
112 with
113
114 unique_override as
115     (select distinct on (o.package, s.section) o.package, s.section
116         from override o, section s
117         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
118         order by o.package, s.section, o.modified desc)
119
120 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
121     from newest_binaries b, bin_contents bc, unique_override o
122     where b.id = bc.binary_id and o.package = b.package
123     group by bc.file'''
124
125         return self.session.query("file", "pkglist").from_statement(sql). \
126             params(params)
127
128     def formatline(self, filename, package_list):
129         '''
130         Returns a formatted string for the filename argument.
131         '''
132         return "%-55s %s\n" % (filename, package_list)
133
134     def fetch(self):
135         '''
136         Yields a new line of the Contents-$arch.gz file in filename order.
137         '''
138         for filename, package_list in self.query().yield_per(100):
139             yield self.formatline(filename, package_list)
140         # end transaction to return connection to pool
141         self.session.rollback()
142
143     def get_list(self):
144         '''
145         Returns a list of lines for the Contents-$arch.gz file.
146         '''
147         return [item for item in self.fetch()]
148
149     def output_filename(self):
150         '''
151         Returns the name of the output file.
152         '''
153         values = {
154             'root': Config()['Dir::Root'],
155             'suite': self.suite.suite_name,
156             'architecture': self.architecture.arch_string
157         }
158         if self.component is None:
159             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
160         values['component'] = self.component.component_name
161         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
162
163     def get_header(self):
164         '''
165         Returns the header for the Contents files as a string.
166         '''
167         header_file = None
168         try:
169             filename = os.path.join(Config()['Dir::Templates'], 'contents')
170             header_file = open(filename)
171             return header_file.read()
172         finally:
173             if header_file:
174                 header_file.close()
175
176     def write_file(self):
177         '''
178         Write the output file.
179         '''
180         command = ['gzip', '--rsyncable']
181         final_filename = self.output_filename()
182         temp_filename = final_filename + '.new'
183         output_file = open(temp_filename, 'w')
184         gzip = Popen(command, stdin = PIPE, stdout = output_file)
185         gzip.stdin.write(self.get_header())
186         for item in self.fetch():
187             gzip.stdin.write(item)
188         gzip.stdin.close()
189         output_file.close()
190         gzip.wait()
191         try:
192             os.remove(final_filename)
193         except:
194             pass
195         os.rename(temp_filename, final_filename)
196         os.chmod(final_filename, 0664)
197
198     @classmethod
199     def log_result(class_, result):
200         '''
201         Writes a result message to the logfile.
202         '''
203         class_.logger.log(result)
204
205     @classmethod
206     def write_all(class_, logger, suite_names = [], force = False):
207         '''
208         Writes all Contents files for suites in list suite_names which defaults
209         to all 'touchable' suites if not specified explicitely. Untouchable
210         suites will be included if the force argument is set to True.
211         '''
212         class_.logger = logger
213         session = DBConn().session()
214         suite_query = session.query(Suite)
215         if len(suite_names) > 0:
216             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
217         if not force:
218             suite_query = suite_query.filter_by(untouchable = False)
219         deb_id = get_override_type('deb', session).overridetype_id
220         udeb_id = get_override_type('udeb', session).overridetype_id
221         main_id = get_component('main', session).component_id
222         non_free_id = get_component('non-free', session).component_id
223         pool = Pool()
224         for suite in suite_query:
225             suite_id = suite.suite_id
226             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
227                 arch_id = architecture.arch_id
228                 # handle 'deb' packages
229                 pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
230                     callback = class_.log_result)
231                 # handle 'udeb' packages for 'main' and 'non-free'
232                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
233                     callback = class_.log_result)
234                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
235                     callback = class_.log_result)
236         pool.close()
237         pool.join()
238         session.close()
239
240 def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
241     '''
242     This function is called in a new subprocess.
243     '''
244     DBConn().reset()
245     session = DBConn().session()
246     suite = Suite.get(suite_id, session)
247     architecture = Architecture.get(arch_id, session)
248     overridetype = OverrideType.get(overridetype_id, session)
249     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
250     if component_id is None:
251         component = None
252     else:
253         component = Component.get(component_id, session)
254         log_message.append(component.component_name)
255     contents_writer = ContentsWriter(suite, architecture, overridetype, component)
256     contents_writer.write_file()
257     return log_message
258
259
260 class ContentsScanner(object):
261     '''
262     ContentsScanner provides a threadsafe method scan() to scan the contents of
263     a DBBinary object.
264     '''
265     def __init__(self, binary_id):
266         '''
267         The argument binary_id is the id of the DBBinary object that
268         should be scanned.
269         '''
270         self.binary_id = binary_id
271
272     def scan(self, dummy_arg = None):
273         '''
274         This method does the actual scan and fills in the associated BinContents
275         property. It commits any changes to the database. The argument dummy_arg
276         is ignored but needed by our threadpool implementation.
277         '''
278         session = DBConn().session()
279         binary = session.query(DBBinary).get(self.binary_id)
280         fileset = set(binary.scan_contents())
281         if len(fileset) == 0:
282             fileset.add('EMPTY_PACKAGE')
283         for filename in fileset:
284             binary.contents.append(BinContents(file = filename))
285         session.commit()
286         session.close()
287
288     @classmethod
289     def scan_all(class_, limit = None):
290         '''
291         The class method scan_all() scans all binaries using multiple threads.
292         The number of binaries to be scanned can be limited with the limit
293         argument. Returns the number of processed and remaining packages as a
294         dict.
295         '''
296         session = DBConn().session()
297         query = session.query(DBBinary).filter(DBBinary.contents == None)
298         remaining = query.count
299         if limit is not None:
300             query = query.limit(limit)
301         processed = query.count()
302         pool = Pool()
303         for binary in query.yield_per(100):
304             pool.apply_async(scan_helper, (binary.binary_id, ))
305         pool.close()
306         pool.join()
307         remaining = remaining()
308         session.close()
309         return { 'processed': processed, 'remaining': remaining }
310
311 reset = False
312
313 def scan_helper(binary_id):
314     '''
315     This function runs in a subprocess.
316     '''
317     global reset
318     if not reset:
319         DBConn().reset()
320         reset = True
321     scanner = ContentsScanner(binary_id)
322     scanner.scan()