]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Merge branch 'contents' of ftp-master.debian.org:public_html/dak into contents
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30
31 from multiprocessing import Pool
32 from subprocess import Popen, PIPE
33
34 import os.path
35
36 class ContentsWriter(object):
37     '''
38     ContentsWriter writes the Contents-$arch.gz files.
39     '''
40     def __init__(self, suite, architecture, overridetype, component = None):
41         '''
42         The constructor clones its arguments into a new session object to make
43         sure that the new ContentsWriter object can be executed in a different
44         thread.
45         '''
46         self.suite = suite
47         self.architecture = architecture
48         self.overridetype = overridetype
49         self.component = component
50         self.session = suite.session()
51
52     def query(self):
53         '''
54         Returns a query object that is doing most of the work.
55         '''
56         overridesuite = self.suite
57         if self.suite.overridesuite is not None:
58             overridesuite = get_suite(self.suite.overridesuite, self.session)
59         params = {
60             'suite':         self.suite.suite_id,
61             'overridesuite': overridesuite.suite_id,
62             'arch_all':      get_architecture('all', self.session).arch_id,
63             'arch':          self.architecture.arch_id,
64             'type_id':       self.overridetype.overridetype_id,
65             'type':          self.overridetype.overridetype,
66         }
67
68         if self.component is not None:
69             params['component'] = self.component.component_id
70             sql = '''
71 create temp table newest_binaries (
72     id integer primary key,
73     package text);
74
75 create index newest_binaries_by_package on newest_binaries (package);
76
77 insert into newest_binaries (id, package)
78     select distinct on (package) id, package from binaries
79         where type = :type and
80             (architecture = :arch_all or architecture = :arch) and
81             id in (select bin from bin_associations where suite = :suite)
82         order by package, version desc;
83
84 with
85
86 unique_override as
87     (select o.package, s.section
88         from override o, section s
89         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
90         o.component = :component)
91
92 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
93     from newest_binaries b, bin_contents bc, unique_override o
94     where b.id = bc.binary_id and o.package = b.package
95     group by bc.file'''
96
97         else:
98             sql = '''
99 create temp table newest_binaries (
100     id integer primary key,
101     package text);
102
103 create index newest_binaries_by_package on newest_binaries (package);
104
105 insert into newest_binaries (id, package)
106     select distinct on (package) id, package from binaries
107         where type = :type and
108             (architecture = :arch_all or architecture = :arch) and
109             id in (select bin from bin_associations where suite = :suite)
110         order by package, version desc;
111
112 with
113
114 unique_override as
115     (select distinct on (o.package, s.section) o.package, s.section
116         from override o, section s
117         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
118         order by o.package, s.section, o.modified desc)
119
120 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
121     from newest_binaries b, bin_contents bc, unique_override o
122     where b.id = bc.binary_id and o.package = b.package
123     group by bc.file'''
124
125         return self.session.query("file", "pkglist").from_statement(sql). \
126             params(params)
127
128     def formatline(self, filename, package_list):
129         '''
130         Returns a formatted string for the filename argument.
131         '''
132         return "%-55s %s\n" % (filename, package_list)
133
134     def fetch(self):
135         '''
136         Yields a new line of the Contents-$arch.gz file in filename order.
137         '''
138         for filename, package_list in self.query().yield_per(100):
139             yield self.formatline(filename, package_list)
140         # end transaction to return connection to pool
141         self.session.rollback()
142
143     def get_list(self):
144         '''
145         Returns a list of lines for the Contents-$arch.gz file.
146         '''
147         return [item for item in self.fetch()]
148
149     def output_filename(self):
150         '''
151         Returns the name of the output file.
152         '''
153         values = {
154             'root': Config()['Dir::Root'],
155             'suite': self.suite.suite_name,
156             'architecture': self.architecture.arch_string
157         }
158         if self.component is None:
159             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
160         values['component'] = self.component.component_name
161         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
162
163     def get_header(self):
164         '''
165         Returns the header for the Contents files as a string.
166         '''
167         header_file = None
168         try:
169             filename = os.path.join(Config()['Dir::Templates'], 'contents')
170             header_file = open(filename)
171             return header_file.read()
172         finally:
173             if header_file:
174                 header_file.close()
175
176     def write_file(self):
177         '''
178         Write the output file.
179         '''
180         command = ['gzip', '--rsyncable']
181         final_filename = self.output_filename()
182         temp_filename = final_filename + '.new'
183         output_file = open(temp_filename, 'w')
184         gzip = Popen(command, stdin = PIPE, stdout = output_file)
185         gzip.stdin.write(self.get_header())
186         for item in self.fetch():
187             gzip.stdin.write(item)
188         gzip.stdin.close()
189         output_file.close()
190         gzip.wait()
191         try:
192             os.remove(final_filename)
193         except:
194             pass
195         os.rename(temp_filename, final_filename)
196         os.chmod(final_filename, 0664)
197
198     @classmethod
199     def log_result(class_, result):
200         '''
201         Writes a result message to the logfile.
202         '''
203         class_.logger.log(result)
204
205     @classmethod
206     def write_all(class_, logger, suite_names = [], force = False):
207         '''
208         Writes all Contents files for suites in list suite_names which defaults
209         to all 'touchable' suites if not specified explicitely. Untouchable
210         suites will be included if the force argument is set to True.
211         '''
212         class_.logger = logger
213         session = DBConn().session()
214         suite_query = session.query(Suite)
215         if len(suite_names) > 0:
216             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
217         if not force:
218             suite_query = suite_query.filter_by(untouchable = False)
219         deb_id = get_override_type('deb', session).overridetype_id
220         udeb_id = get_override_type('udeb', session).overridetype_id
221         main_id = get_component('main', session).component_id
222         non_free_id = get_component('non-free', session).component_id
223         pool = Pool()
224         for suite in suite_query:
225             suite_id = suite.suite_id
226             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
227                 arch_id = architecture.arch_id
228                 # handle 'deb' packages
229                 pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
230                     callback = class_.log_result)
231                 # handle 'udeb' packages for 'main' and 'non-free'
232                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
233                     callback = class_.log_result)
234                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
235                     callback = class_.log_result)
236         pool.close()
237         pool.join()
238         session.close()
239
240 def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
241     '''
242     This function is called in a new subprocess.
243     '''
244     session = DBConn().session()
245     suite = Suite.get(suite_id, session)
246     architecture = Architecture.get(arch_id, session)
247     overridetype = OverrideType.get(overridetype_id, session)
248     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
249     if component_id is None:
250         component = None
251     else:
252         component = Component.get(component_id, session)
253         log_message.append(component.component_name)
254     contents_writer = ContentsWriter(suite, architecture, overridetype, component)
255     contents_writer.write_file()
256     return log_message
257
258
259 class ContentsScanner(object):
260     '''
261     ContentsScanner provides a threadsafe method scan() to scan the contents of
262     a DBBinary object.
263     '''
264     def __init__(self, binary_id):
265         '''
266         The argument binary_id is the id of the DBBinary object that
267         should be scanned.
268         '''
269         self.binary_id = binary_id
270
271     def scan(self, dummy_arg = None):
272         '''
273         This method does the actual scan and fills in the associated BinContents
274         property. It commits any changes to the database. The argument dummy_arg
275         is ignored but needed by our threadpool implementation.
276         '''
277         session = DBConn().session()
278         binary = session.query(DBBinary).get(self.binary_id)
279         fileset = set(binary.scan_contents())
280         if len(fileset) == 0:
281             fileset.add('EMPTY_PACKAGE')
282         for filename in fileset:
283             binary.contents.append(BinContents(file = filename))
284         session.commit()
285         session.close()
286
287     @classmethod
288     def scan_all(class_, limit = None):
289         '''
290         The class method scan_all() scans all binaries using multiple threads.
291         The number of binaries to be scanned can be limited with the limit
292         argument. Returns the number of processed and remaining packages as a
293         dict.
294         '''
295         session = DBConn().session()
296         query = session.query(DBBinary).filter(DBBinary.contents == None)
297         remaining = query.count
298         if limit is not None:
299             query = query.limit(limit)
300         processed = query.count()
301         pool = Pool()
302         for binary in query.yield_per(100):
303             pool.apply_async(scan_helper, (binary.binary_id, ))
304         pool.close()
305         pool.join()
306         remaining = remaining()
307         session.close()
308         return { 'processed': processed, 'remaining': remaining }
309
310 def scan_helper(binary_id):
311     '''
312     This function runs in a subprocess.
313     '''
314     scanner = ContentsScanner(binary_id)
315     scanner.scan()