]> git.decadent.org.uk Git - dak.git/blob - daklib/contents.py
Add more logging to contents generate.
[dak.git] / daklib / contents.py
1 #!/usr/bin/env python
2 """
3 Helper code for contents generation.
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2011 Torsten Werner <twerner@debian.org>
7 @license: GNU General Public License version 2 or later
8 """
9
10 ################################################################################
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28 from daklib.dbconn import *
29 from daklib.config import Config
30 from daklib.threadpool import ThreadPool
31 from multiprocessing import Pool
32
33 from sqlalchemy import desc, or_
34 from sqlalchemy.exc import IntegrityError
35 from subprocess import Popen, PIPE
36
37 import os.path
38
39 class ContentsWriter(object):
40     '''
41     ContentsWriter writes the Contents-$arch.gz files.
42     '''
43     def __init__(self, suite, architecture, overridetype, component = None):
44         '''
45         The constructor clones its arguments into a new session object to make
46         sure that the new ContentsWriter object can be executed in a different
47         thread.
48         '''
49         self.suite = suite
50         self.architecture = architecture
51         self.overridetype = overridetype
52         self.component = component
53         self.session = suite.session()
54
55     def query(self):
56         '''
57         Returns a query object that is doing most of the work.
58         '''
59         overridesuite = self.suite
60         if self.suite.overridesuite is not None:
61             overridesuite = get_suite(self.suite.overridesuite, self.session)
62         params = {
63             'suite':         self.suite.suite_id,
64             'overridesuite': overridesuite.suite_id,
65             'arch_all':      get_architecture('all', self.session).arch_id,
66             'arch':          self.architecture.arch_id,
67             'type_id':       self.overridetype.overridetype_id,
68             'type':          self.overridetype.overridetype,
69         }
70
71         if self.component is not None:
72             params['component'] = self.component.component_id
73             sql = '''
74 create temp table newest_binaries (
75     id integer primary key,
76     package text);
77
78 create index newest_binaries_by_package on newest_binaries (package);
79
80 insert into newest_binaries (id, package)
81     select distinct on (package) id, package from binaries
82         where type = :type and
83             (architecture = :arch_all or architecture = :arch) and
84             id in (select bin from bin_associations where suite = :suite)
85         order by package, version desc;
86
87 with
88
89 unique_override as
90     (select o.package, s.section
91         from override o, section s
92         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
93         o.component = :component)
94
95 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
96     from newest_binaries b, bin_contents bc, unique_override o
97     where b.id = bc.binary_id and o.package = b.package
98     group by bc.file'''
99
100         else:
101             sql = '''
102 create temp table newest_binaries (
103     id integer primary key,
104     package text);
105
106 create index newest_binaries_by_package on newest_binaries (package);
107
108 insert into newest_binaries (id, package)
109     select distinct on (package) id, package from binaries
110         where type = :type and
111             (architecture = :arch_all or architecture = :arch) and
112             id in (select bin from bin_associations where suite = :suite)
113         order by package, version desc;
114
115 with
116
117 unique_override as
118     (select distinct on (o.package, s.section) o.package, s.section
119         from override o, section s
120         where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
121         order by o.package, s.section, o.modified desc)
122
123 select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
124     from newest_binaries b, bin_contents bc, unique_override o
125     where b.id = bc.binary_id and o.package = b.package
126     group by bc.file'''
127
128         return self.session.query("file", "pkglist").from_statement(sql). \
129             params(params)
130
131     def formatline(self, filename, package_list):
132         '''
133         Returns a formatted string for the filename argument.
134         '''
135         return "%-55s %s\n" % (filename, package_list)
136
137     def fetch(self):
138         '''
139         Yields a new line of the Contents-$arch.gz file in filename order.
140         '''
141         for filename, package_list in self.query().yield_per(100):
142             yield self.formatline(filename, package_list)
143         # end transaction to return connection to pool
144         self.session.rollback()
145
146     def get_list(self):
147         '''
148         Returns a list of lines for the Contents-$arch.gz file.
149         '''
150         return [item for item in self.fetch()]
151
152     def output_filename(self):
153         '''
154         Returns the name of the output file.
155         '''
156         values = {
157             'root': Config()['Dir::Root'],
158             'suite': self.suite.suite_name,
159             'architecture': self.architecture.arch_string
160         }
161         if self.component is None:
162             return "%(root)s/dists/%(suite)s/Contents-%(architecture)s.gz" % values
163         values['component'] = self.component.component_name
164         return "%(root)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s.gz" % values
165
166     def get_header(self):
167         '''
168         Returns the header for the Contents files as a string.
169         '''
170         header_file = None
171         try:
172             filename = os.path.join(Config()['Dir::Templates'], 'contents')
173             header_file = open(filename)
174             return header_file.read()
175         finally:
176             if header_file:
177                 header_file.close()
178
179     def write_file(self):
180         '''
181         Write the output file.
182         '''
183         command = ['gzip', '--rsyncable']
184         final_filename = self.output_filename()
185         temp_filename = final_filename + '.new'
186         output_file = open(temp_filename, 'w')
187         gzip = Popen(command, stdin = PIPE, stdout = output_file)
188         gzip.stdin.write(self.get_header())
189         for item in self.fetch():
190             gzip.stdin.write(item)
191         gzip.stdin.close()
192         output_file.close()
193         gzip.wait()
194         try:
195             os.remove(final_filename)
196         except:
197             pass
198         os.rename(temp_filename, final_filename)
199         os.chmod(final_filename, 0664)
200
201     @classmethod
202     def log_result(class_, result):
203         '''
204         Writes a result message to the logfile.
205         '''
206         class_.logger.log(result)
207
208     @classmethod
209     def write_all(class_, logger, suite_names = [], force = False):
210         '''
211         Writes all Contents files for suites in list suite_names which defaults
212         to all 'touchable' suites if not specified explicitely. Untouchable
213         suites will be included if the force argument is set to True.
214         '''
215         class_.logger = logger
216         session = DBConn().session()
217         suite_query = session.query(Suite)
218         if len(suite_names) > 0:
219             suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
220         if not force:
221             suite_query = suite_query.filter_by(untouchable = False)
222         deb_id = get_override_type('deb', session).overridetype_id
223         udeb_id = get_override_type('udeb', session).overridetype_id
224         main_id = get_component('main', session).component_id
225         non_free_id = get_component('non-free', session).component_id
226         pool = Pool()
227         for suite in suite_query:
228             suite_id = suite.suite_id
229             for architecture in suite.get_architectures(skipsrc = True, skipall = True):
230                 arch_id = architecture.arch_id
231                 # handle 'deb' packages
232                 pool.apply_async(generate_helper, (suite_id, arch_id, deb_id), \
233                     callback = class_.log_result)
234                 # handle 'udeb' packages for 'main' and 'non-free'
235                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, main_id), \
236                     callback = class_.log_result)
237                 pool.apply_async(generate_helper, (suite_id, arch_id, udeb_id, non_free_id), \
238                     callback = class_.log_result)
239         pool.close()
240         pool.join()
241         session.close()
242
243 def generate_helper(suite_id, arch_id, overridetype_id, component_id = None):
244     '''
245     This function is called in a new subprocess.
246     '''
247     DBConn().reset()
248     session = DBConn().session()
249     suite = Suite.get(suite_id, session)
250     architecture = Architecture.get(arch_id, session)
251     overridetype = OverrideType.get(overridetype_id, session)
252     log_message = [suite.suite_name, architecture.arch_string, overridetype.overridetype]
253     if component_id is None:
254         component = None
255     else:
256         component = Component.get(component_id, session)
257         log_message.append(component.component_name)
258     contents_writer = ContentsWriter(suite, architecture, overridetype, component)
259     contents_writer.write_file()
260     return log_message
261
262
263 class ContentsScanner(object):
264     '''
265     ContentsScanner provides a threadsafe method scan() to scan the contents of
266     a DBBinary object.
267     '''
268     def __init__(self, binary):
269         '''
270         The argument binary is the actual DBBinary object that should be
271         scanned.
272         '''
273         self.binary_id = binary.binary_id
274
275     def scan(self, dummy_arg = None):
276         '''
277         This method does the actual scan and fills in the associated BinContents
278         property. It commits any changes to the database. The argument dummy_arg
279         is ignored but needed by our threadpool implementation.
280         '''
281         session = DBConn().session()
282         binary = session.query(DBBinary).get(self.binary_id)
283         fileset = set(binary.scan_contents())
284         if len(fileset) == 0:
285             fileset.add('EMPTY_PACKAGE')
286         for filename in fileset:
287             binary.contents.append(BinContents(file = filename))
288         session.commit()
289         session.close()
290
291     @classmethod
292     def scan_all(class_, limit = None):
293         '''
294         The class method scan_all() scans all binaries using multiple threads.
295         The number of binaries to be scanned can be limited with the limit
296         argument. Returns the number of processed and remaining packages as a
297         dict.
298         '''
299         session = DBConn().session()
300         query = session.query(DBBinary).filter(DBBinary.contents == None)
301         remaining = query.count
302         if limit is not None:
303             query = query.limit(limit)
304         processed = query.count()
305         threadpool = ThreadPool()
306         for binary in query.yield_per(100):
307             threadpool.queueTask(ContentsScanner(binary).scan)
308         threadpool.joinAll()
309         remaining = remaining()
310         session.close()
311         return { 'processed': processed, 'remaining': remaining }