]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Switch to multiprocessing module and reap child gzips.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.threadpool import ThreadPool
31 from multiprocessing import Pool
32
33 from sqlalchemy import desc, or_
34 from subprocess import Popen, PIPE
35
36 import os.path
37
38 class ContentsWriter(object):
39     '''
40     ContentsWriter writes the Contents-$arch.gz files.
41     '''
42     def __init__(self, suite, architecture, overridetype, component = None):
43         '''
44         The constructor clones its arguments into a new session object to make
45         sure that the new ContentsWriter object can be executed in a different
46         thread.
47         '''
48         self.suite = suite.clone()
49         self.session = self.suite.session()
50         self.architecture = architecture.clone(self.session)
51         self.overridetype = overridetype.clone(self.session)
52         if component is not None:
53             self.component = component.clone(self.session)
54         else:
55             self.component = None
56
57     def query(self):
58         '''
59         Returns a query object that is doing most of the work.
60         '''
61         params = {
62             'suite':    self.suite.suite_id,
63             'arch_all': get_architecture('all', self.session).arch_id,
64             'arch':     self.architecture.arch_id,
65             'type_id':  self.overridetype.overridetype_id,
66             'type':     self.overridetype.overridetype,
67         }
68
69         if self.component is not None:
70             params['component'] = self.component.component_id
71             sql = '''
72 create temp table newest_binaries (
73     id integer primary key,
74     package text);
75
76 create index newest_binaries_by_package on newest_binaries (package);
77
78 insert into newest_binaries (id, package)
79     select distinct on (package) id, package from binaries
80         where type = :type and
81             (architecture = :arch_all or architecture = :arch) and
82             id in (select bin from bin_associations where suite = :suite)
83         order by package, version desc;
84
85 with
86
87 unique_override as
88     (select o.package, s.section
89         from override o, section s
90         where o.suite = :suite and o.type = :type_id and o.section = s.id and
91         o.component = :component)
92
93 select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
94     from newest_binaries b, bin_contents bc, unique_override o
95     where b.id = bc.binary_id and o.package = b.package
96     order by bc.file, b.package'''
97
98         else:
99             sql = '''
100 create temp table newest_binaries (
101     id integer primary key,
102     package text);
103
104 create index newest_binaries_by_package on newest_binaries (package);
105
106 insert into newest_binaries (id, package)
107     select distinct on (package) id, package from binaries
108         where type = :type and
109             (architecture = :arch_all or architecture = :arch) and
110             id in (select bin from bin_associations where suite = :suite)
111         order by package, version desc;
112
113 with
114
115 unique_override as
116     (select distinct on (o.package, s.section) o.package, s.section
117         from override o, section s
118         where o.suite = :suite and o.type = :type_id and o.section = s.id
119         order by o.package, s.section, o.modified desc)
120
121 select bc.file, substring(o.section from position('/' in o.section) + 1) || '/' || b.package as package
122     from newest_binaries b, bin_contents bc, unique_override o
123     where b.id = bc.binary_id and o.package = b.package
124     order by bc.file, b.package'''
125
126         return self.session.query("file", "package").from_statement(sql). \
127             params(params)
128
129     def formatline(self, filename, package_list):
130         '''
131         Returns a formatted string for the filename argument.
132         '''
133         package_list = ','.join(package_list)
134         return "%-55s %s\n" % (filename, package_list)
135
136     def fetch(self):
137         '''
138         Yields a new line of the Contents-$arch.gz file in filename order.
139         '''
140         last_filename = None
141         package_list = []
142         for filename, package in self.query().yield_per(100):
143             if filename != last_filename:
144                 if last_filename is not None:
145                     yield self.formatline(last_filename, package_list)
146                 last_filename = filename
147                 package_list = []
148             package_list.append(package)
149         if last_filename is not None:
150             yield self.formatline(last_filename, package_list)
151         # end transaction to return connection to pool
152         self.session.rollback()
153
154     def get_list(self):
155         '''
156         Returns a list of lines for the Contents-$arch.gz file.
157         '''
158         return [item for item in self.fetch()]
159
160     def output_filename(self):
161         '''
162         Returns the name of the output file.
163         '''
164         values = {
165             'root': Config()['Dir::Root'],
166             'suite': self.suite.suite_name,
167             'architecture': self.architecture.arch_string
168         }
169         if self.component is None:
170             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
171         values['component'] = self.component.component_name
172         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
173
174     def get_header(self):
175         '''
176         Returns the header for the Contents files as a string.
177         '''
178         header_file = None
179         try:
180             filename = os.path.join(Config()['Dir::Templates'], 'contents')
181             header_file = open(filename)
182             return header_file.read()
183         finally:
184             if header_file:
185                 header_file.close()
186
187     def write_file(self):
188         '''
189         Write the output file.
190         '''
191         command = ['gzip', '--rsyncable']
192         output_file = open(self.output_filename(), 'w')
193         gzip = Popen(command, stdin = PIPE, stdout = output_file)
194         gzip.stdin.write(self.get_header())
195         for item in self.fetch():
196             gzip.stdin.write(item)
197         gzip.stdin.close()
198         output_file.close()
199         gzip.wait()
200
201     @classmethod
202     def write_all(class_, suite_names = [], force = False):
203         '''
204         Writes all Contents files for suites in list suite_names which defaults
205         to all 'touchable' suites if not specified explicitely. Untouchable
206         suites will be included if the force argument is set to True.
207         '''
208         session = DBConn().session()
209         suite_query = session.query(Suite)
210         if len(suite_names) > 0:
211             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
212         if not force:
213             suite_query = suite_query.filter_by(untouchable = False)
214         main = get_component('main', session)
215         non_free = get_component('non-free', session)
216         deb = get_override_type('deb', session)
217         udeb = get_override_type('udeb', session)
218         pool = Pool()
219         for suite in suite_query:
220             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
221                 # handle 'deb' packages
222                 writer = ContentsWriter(suite, architecture, deb)
223                 pool.apply(writer.write_file)
224                 # handle 'udeb' packages for 'main' and 'non-free'
225                 writer = ContentsWriter(suite, architecture, udeb, component = main)
226                 pool.apply(writer.write_file)
227                 writer = ContentsWriter(suite, architecture, udeb, component = non_free)
228                 pool.apply(writer.write_file)
229         pool.close()
230         pool.join()
231         session.close()
232
233
234 class ContentsScanner(object):
235     '''
236     ContentsScanner provides a threadsafe method scan() to scan the contents of
237     a DBBinary object.
238     '''
239     def __init__(self, binary):
240         '''
241         The argument binary is the actual DBBinary object that should be
242         scanned.
243         '''
244         self.binary_id = binary.binary_id
245
246     def scan(self, dummy_arg = None):
247         '''
248         This method does the actual scan and fills in the associated BinContents
249         property. It commits any changes to the database. The argument dummy_arg
250         is ignored but needed by our threadpool implementation.
251         '''
252         session = DBConn().session()
253         binary = session.query(DBBinary).get(self.binary_id)
254         for filename in binary.scan_contents():
255             binary.contents.append(BinContents(file = filename))
256         session.commit()
257         session.close()
258
259     @classmethod
260     def scan_all(class_, limit = None):
261         '''
262         The class method scan_all() scans all binaries using multiple threads.
263         The number of binaries to be scanned can be limited with the limit
264         argument. Returns the number of processed and remaining packages as a
265         dict.
266         '''
267         session = DBConn().session()
268         query = session.query(DBBinary).filter(DBBinary.contents == None)
269         remaining = query.count
270         if limit is not None:
271             query = query.limit(limit)
272         processed = query.count()
273         threadpool = ThreadPool()
274         for binary in query.yield_per(100):
275             threadpool.queueTask(ContentsScanner(binary).scan)
276         threadpool.joinAll()
277         remaining = remaining()
278         session.close()
279         return { 'processed': processed, 'remaining': remaining }