]> git.decadent.org.uk Git - dak.git/blob - dak/process_policy.py
dak/process_policy.py: escape '_' in string passed to SQL LIKE operator later
[dak.git] / dak / process_policy.py
1 #!/usr/bin/env python
2 # vim:set et ts=4 sw=4:
3
4 """ Handles packages from policy queues
5
6 @contact: Debian FTP Master <ftpmaster@debian.org>
7 @copyright: 2001, 2002, 2003, 2004, 2005, 2006  James Troup <james@nocrew.org>
8 @copyright: 2009 Joerg Jaspert <joerg@debian.org>
9 @copyright: 2009 Frank Lichtenheld <djpig@debian.org>
10 @copyright: 2009 Mark Hymers <mhy@debian.org>
11 @license: GNU General Public License version 2 or later
12 """
13 # This program is free software; you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation; either version 2 of the License, or
16 # (at your option) any later version.
17
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 # GNU General Public License for more details.
22
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
27 ################################################################################
28
29 # <mhy> So how do we handle that at the moment?
30 # <stew> Probably incorrectly.
31
32 ################################################################################
33
34 import os
35 import datetime
36 import re
37 import sys
38 import traceback
39 import apt_pkg
40
41 from daklib.dbconn import *
42 from daklib import daklog
43 from daklib import utils
44 from daklib.dak_exceptions import CantOpenError, AlreadyLockedError, CantGetLockError
45 from daklib.config import Config
46 from daklib.archive import ArchiveTransaction
47 from daklib.urgencylog import UrgencyLog
48
49 import daklib.announce
50
# Globals
# "Process-Policy::Options" configuration subtree; assigned in main().
Options = None
# daklog.Logger instance for "process-policy"; assigned in main().
Logger = None
54
55 ################################################################################
56
def do_comments(dir, srcqueue, opref, npref, line, fn, transaction):
    """Apply the comment files with a given prefix to the matching uploads

    Every file in C{dir} whose name starts with C{opref} and whose first
    line is exactly C{line} is handled: C{fn} is called for each upload in
    C{srcqueue} whose .changes name starts with the rest of the file name.
    Afterwards the comment file is renamed so that it starts with C{npref}
    (unless both prefixes are equal).

    @type  dir: str
    @param dir: directory containing the comment files

    @type  srcqueue: L{daklib.dbconn.PolicyQueue}
    @param srcqueue: policy queue whose uploads are looked up

    @type  opref: str
    @param opref: prefix of the comment files to process

    @type  npref: str
    @param npref: prefix the comment file gets after processing

    @type  line: str
    @param line: expected first line of the comment file (without newline)

    @type  fn: callable
    @param fn: called as C{fn(upload, srcqueue, comments, transaction)}
               where C{comments} is the file content after the first line

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    @param transaction: provides the database session and the filesystem
                        transaction used for the rename
    """
    session = transaction.session
    for comm in [ x for x in os.listdir(dir) if x.startswith(opref) ]:
        # Read the file inside a context manager so the handle is closed
        # immediately instead of lingering until garbage collection.
        with open(os.path.join(dir, comm)) as fh:
            lines = fh.readlines()
        if len(lines) == 0 or lines[0] != line + "\n":
            continue

        # If the ACCEPT includes a _<arch> we only accept that .changes.
        # Otherwise we accept all .changes that start with the given prefix
        changes_prefix = comm[len(opref):]
        if changes_prefix.count('_') < 2:
            changes_prefix = changes_prefix + '_'
        else:
            changes_prefix = changes_prefix + '.changes'

        # The prefix is used with the LIKE operator (via the SQLA
        # startswith) below, so every character that is special to LIKE has
        # to be escaped.  The backslash must come first, otherwise the
        # escapes added for "%" and "_" would be escaped again.
        for special in ("\\", "%", "_"):
            changes_prefix = changes_prefix.replace(special, "\\" + special)

        uploads = session.query(PolicyQueueUpload).filter_by(policy_queue=srcqueue) \
            .join(PolicyQueueUpload.changes).filter(DBChange.changesname.startswith(changes_prefix)) \
            .order_by(PolicyQueueUpload.source_id)
        for u in uploads:
            print("Processing changes file: %s" % u.changes.changesname)
            fn(u, srcqueue, "".join(lines[1:]), transaction)

        if opref != npref:
            newcomm = npref + comm[len(opref):]
            transaction.fs.move(os.path.join(dir, comm), os.path.join(dir, newcomm))
85
86 ################################################################################
87
def try_or_reject(function):
    """Decorator: reject the upload when processing it raises an exception

    The returned wrapper calls C{function}.  If that raises, the
    transaction is rolled back and the upload is rejected with the
    traceback prepended to the original comments.  If the rejection itself
    raises, the transaction is rolled back again and the rejection is
    retried without sending a notification.  Unless the No-Action option
    is set, the transaction is committed at the end.

    @type  function: callable
    @param function: called as
                     C{function(upload, srcqueue, comments, transaction)}

    @rtype:  callable
    @return: wrapper taking the same four arguments
    """
    # Imported here so the block does not depend on the file-level imports.
    import functools

    # Keep the name and docstring of the decorated function.
    @functools.wraps(function)
    def wrapper(upload, srcqueue, comments, transaction):
        try:
            function(upload, srcqueue, comments, transaction)
        except Exception:
            comments = 'An exception was raised while processing the package:\n{0}\nOriginal comments:\n{1}'.format(traceback.format_exc(), comments)
            try:
                transaction.rollback()
                real_comment_reject(upload, srcqueue, comments, transaction)
            except Exception:
                comments = 'In addition an exception was raised while trying to reject the upload:\n{0}\nOriginal rejection:\n{1}'.format(traceback.format_exc(), comments)
                transaction.rollback()
                real_comment_reject(upload, srcqueue, comments, transaction, notify=False)
        if not Options['No-Action']:
            transaction.commit()
    return wrapper
104
105 ################################################################################
106
@try_or_reject
def comment_accept(upload, srcqueue, comments, transaction):
    """Accept an upload from a policy queue

    Copies the source and binaries of the upload to the target suite and
    to the suites of its copy queues, optionally copies the .changes file,
    logs the urgency, announces the accept, copies the .changes file to
    the "done" directory and removes the upload from the policy queue.

    @type  upload: L{daklib.dbconn.PolicyQueueUpload}
    @param upload: upload to accept

    @type  srcqueue: L{daklib.dbconn.PolicyQueue}
    @param srcqueue: policy queue the upload is in

    @type  comments: str
    @param comments: content of the comment file (not used here)

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    @param transaction: transaction used for all database and file changes

    @raise Exception: if a byhand file of the upload still exists in the
                      queue directory
    """
    # An upload with byhand files still lying around must not be accepted.
    for byhand in upload.byhand:
        if os.path.exists(os.path.join(srcqueue.path, byhand.filename)):
            raise Exception('E: cannot ACCEPT upload with unprocessed byhand file {0}'.format(byhand.filename))

    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    allow_tainted = srcqueue.suite.archive.tainted

    # We need overrides to get the target component
    overridesuite = upload.target_suite
    if overridesuite.overridesuite is not None:
        overridesuite = session.query(Suite).filter_by(suite_name=overridesuite.overridesuite).one()

    def component_for(package, overridetype):
        # Look up the component recorded in the override for this package.
        lookup = session.query(Override).filter_by(suite=overridesuite, package=package)
        lookup = lookup.join(OverrideType).filter(OverrideType.overridetype == overridetype)
        return lookup.join(Component).one().component

    def binary_component_func(db_binary):
        return component_for(db_binary.package, db_binary.binarytype)

    def source_component_func(db_source):
        return component_for(db_source.source, 'dsc')

    all_target_suites = [upload.target_suite] + [q.suite for q in upload.target_suite.copy_queues]

    for suite in all_target_suites:
        if upload.source is not None:
            source_component = source_component_func(upload.source)
            transaction.copy_source(upload.source, suite, source_component, allow_tainted=allow_tainted)
        for db_binary in upload.binaries:
            # build queues may miss the source package if this is a binary-only upload
            if suite != upload.target_suite:
                source_component = source_component_func(db_binary.source)
                transaction.copy_source(db_binary.source, suite, source_component, allow_tainted=allow_tainted)
            binary_component = binary_component_func(db_binary)
            transaction.copy_binary(db_binary, suite, binary_component, allow_tainted=allow_tainted, extra_archives=[upload.target_suite.archive])

    # Copy .changes if needed
    if upload.target_suite.copychanges:
        fs.copy(os.path.join(upload.policy_queue.path, upload.changes.changesname),
                os.path.join(upload.target_suite.path, upload.changes.changesname),
                mode=upload.target_suite.archive.mode)

    if upload.source is not None and not Options['No-Action']:
        urgency = upload.changes.urgency
        # Fall back to the configured default for unknown urgencies.
        if urgency not in cnf.value_list('Urgency::Valid'):
            urgency = cnf['Urgency::Default']
        UrgencyLog().log(upload.source.source, upload.source.version, urgency)

    print("  ACCEPT")
    if not Options['No-Action']:
        Logger.log(["Policy Queue ACCEPT", srcqueue.queue_name, changesname])

    daklib.announce.announce_accept(get_processed_upload(upload))

    # TODO: code duplication. Similar code is in process-upload.
    # Move .changes to done
    src = os.path.join(upload.policy_queue.path, upload.changes.changesname)
    today = datetime.datetime.now().strftime('%Y/%m/%d')
    dst = utils.find_next_free(os.path.join(cnf['Dir::Done'], today, upload.changes.changesname))
    fs.copy(src, dst, mode=0o644)

    remove_upload(upload, transaction)
179
180 ################################################################################
181
@try_or_reject
def comment_reject(*args):
    """Reject an upload because of a REJECT comment file

    Forwards all positional arguments to L{real_comment_reject} and marks
    the rejection as a manual one.
    """
    return real_comment_reject(*args, manual=True)
185
def real_comment_reject(upload, srcqueue, comments, transaction, notify=True, manual=False):
    """Reject an upload from a policy queue

    Copies the files of the upload and its .changes file to the reject
    directory, writes C{comments} to a C{<changes>.reason} file there,
    optionally announces the rejection and finally removes the upload and
    its changes entry.

    @type  upload: L{daklib.dbconn.PolicyQueueUpload}
    @param upload: upload to reject

    @type  srcqueue: L{daklib.dbconn.PolicyQueue}
    @param srcqueue: policy queue the upload is in (used for logging)

    @type  comments: str
    @param comments: reason for the rejection; may start with a
                     C{From: <name>} line followed by an empty line

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    @param transaction: transaction used for all database and file changes

    @type  notify: bool
    @param notify: announce the rejection via L{daklib.announce}

    @type  manual: bool
    @param manual: accepted for callers, but not used by this function
    """
    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    queuedir = upload.policy_queue.path
    rejectdir = cnf['Dir::Reject']

    ### Copy files to reject/

    poolfiles = [b.poolfile for b in upload.binaries]
    if upload.source is not None:
        poolfiles.extend([df.poolfile for df in upload.source.srcfiles])
    # Not beautiful...
    files = [ af.path for af in session.query(ArchiveFile) \
                  .filter_by(archive=upload.policy_queue.suite.archive) \
                  .join(ArchiveFile.file) \
                  .filter(PoolFile.file_id.in_([ f.file_id for f in poolfiles ])) ]
    # Byhand files are only included if they still exist in the queue.
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        if os.path.exists(path):
            files.append(path)
    files.append(os.path.join(queuedir, changesname))

    for fn in files:
        # find_next_free picks a name that does not clash with earlier rejects
        dst = utils.find_next_free(os.path.join(rejectdir, os.path.basename(fn)))
        fs.copy(fn, dst, link=True)

    ### Write reason

    dst = utils.find_next_free(os.path.join(rejectdir, '{0}.reason'.format(changesname)))
    fh = fs.create(dst)
    # Make sure the handle is closed even if writing fails.
    try:
        fh.write(comments)
    finally:
        fh.close()

    ### Send mail notification

    if notify:
        rejected_by = None
        reason = comments

        # Try to use From: from comment file if there is one.
        # This is not very elegant...
        match = re.match(r"\AFrom: ([^\n]+)\n\n", comments)
        if match:
            rejected_by = match.group(1)
            # Drop the From: line and the empty line following it.
            reason = '\n'.join(comments.splitlines()[2:])

        pu = get_processed_upload(upload)
        daklib.announce.announce_reject(pu, reason, rejected_by)

    print("  REJECT")
    if not Options["No-Action"]:
        Logger.log(["Policy Queue REJECT", srcqueue.queue_name, upload.changes.changesname])

    # Keep a reference: the changes entry is deleted after the upload itself.
    changes = upload.changes
    remove_upload(upload, transaction)
    session.delete(changes)
245
246 ################################################################################
247
def remove_upload(upload, transaction):
    """Remove an upload from its policy queue

    Unlinks the byhand files that still exist and the .changes file from
    the queue directory, deletes the byhand entries and the upload itself
    from the database session and flushes the session.

    @type  upload: L{daklib.dbconn.PolicyQueueUpload}
    @param upload: upload to remove

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    @param transaction: provides the database session and the filesystem
                        transaction
    """
    fs = transaction.fs
    session = transaction.session

    # Remove byhand and changes files. Binary and source packages will be
    # removed from {bin,src}_associations and eventually removed by clean-suites automatically.
    queuedir = upload.policy_queue.path
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        # The file may already be gone; the database entry is removed anyway.
        if os.path.exists(path):
            fs.unlink(path)
        session.delete(byhand)
    fs.unlink(os.path.join(queuedir, upload.changes.changesname))

    session.delete(upload)
    session.flush()
265
266 ################################################################################
267
def get_processed_upload(upload):
    """Build the announcement data for an upload in a policy queue

    @type  upload: L{daklib.dbconn.PolicyQueueUpload}
    @param upload: upload to describe; its .changes file must still exist
                   in the policy queue directory

    @rtype:  L{daklib.announce.ProcessedUpload}
    @return: object filled from the upload and its changes entry
    """
    pu = daklib.announce.ProcessedUpload()

    pu.maintainer = upload.changes.maintainer
    pu.changed_by = upload.changes.changedby
    pu.fingerprint = upload.changes.fingerprint

    pu.suites = [ upload.target_suite ]
    pu.from_policy_suites = [ upload.target_suite ]

    changes_path = os.path.join(upload.policy_queue.path, upload.changes.changesname)
    # Use a context manager so the .changes file is closed after reading.
    with open(changes_path, 'r') as fh:
        pu.changes = fh.read()
    pu.changes_filename = upload.changes.changesname
    pu.sourceful = upload.source is not None
    pu.source = upload.changes.source
    pu.version = upload.changes.version
    pu.architecture = upload.changes.architecture
    pu.bugs = upload.changes.closes

    pu.program = "process-policy"

    return pu
290
291 ################################################################################
292
def remove_unreferenced_binaries(policy_queue, transaction):
    """Remove binaries that are no longer referenced by an upload

    Selects the binaries associated with the suite of the policy queue
    that no upload in this queue refers to, logs each of them and removes
    it from the suite.

    @type  policy_queue: L{daklib.dbconn.PolicyQueue}

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    """
    db_session = transaction.session
    queue_suite = policy_queue.suite

    statement = """
       SELECT b.*
         FROM binaries b
         JOIN bin_associations ba ON b.id = ba.bin
        WHERE ba.suite = :suite_id
          AND NOT EXISTS (SELECT 1 FROM policy_queue_upload_binaries_map pqubm
                                   JOIN policy_queue_upload pqu ON pqubm.policy_queue_upload_id = pqu.id
                                  WHERE pqu.policy_queue_id = :policy_queue_id
                                    AND pqubm.binary_id = b.id)"""
    bind_values = {
        'suite_id': policy_queue.suite_id,
        'policy_queue_id': policy_queue.policy_queue_id,
    }
    unreferenced = db_session.query(DBBinary).from_statement(statement).params(bind_values)

    for db_binary in unreferenced:
        Logger.log(["removed binary from policy queue", policy_queue.queue_name, db_binary.package, db_binary.version])
        transaction.remove_binary(db_binary, queue_suite)
318
def remove_unreferenced_sources(policy_queue, transaction):
    """Remove sources that are no longer referenced by an upload or a binary

    Selects the sources associated with the suite of the policy queue that
    neither an upload in this queue nor a binary in the suite refers to,
    logs each of them and removes it from the suite.

    @type  policy_queue: L{daklib.dbconn.PolicyQueue}

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    """
    db_session = transaction.session
    queue_suite = policy_queue.suite

    statement = """
       SELECT s.*
         FROM source s
         JOIN src_associations sa ON s.id = sa.source
        WHERE sa.suite = :suite_id
          AND NOT EXISTS (SELECT 1 FROM policy_queue_upload pqu
                                  WHERE pqu.policy_queue_id = :policy_queue_id
                                    AND pqu.source_id = s.id)
          AND NOT EXISTS (SELECT 1 FROM binaries b
                                   JOIN bin_associations ba ON b.id = ba.bin
                                  WHERE b.source = s.id
                                    AND ba.suite = :suite_id)"""
    bind_values = {
        'suite_id': policy_queue.suite_id,
        'policy_queue_id': policy_queue.policy_queue_id,
    }
    unreferenced = db_session.query(DBSource).from_statement(statement).params(bind_values)

    for db_source in unreferenced:
        Logger.log(["removed source from policy queue", policy_queue.queue_name, db_source.source, db_source.version])
        transaction.remove_source(db_source, queue_suite)
347
348 ################################################################################
349
def usage(exit_code=0):
    """Print the command line help and exit

    @type  exit_code: int
    @param exit_code: exit status of the process
    """
    print("""Usage: dak process-policy [OPTION]... QUEUE
Process the COMMENTS directory of the policy queue QUEUE.

  -h, --help                 show this help and exit
  -n, --no-action            do not commit any changes""")
    sys.exit(exit_code)

def main():
    """Process the comment files of the policy queue named on the command line

    Parses the command line, looks up the policy queue and, inside one
    L{daklib.archive.ArchiveTransaction}, applies the ACCEPT, ACCEPTED and
    REJECT comment files found in the queue's COMMENTS directory.
    Afterwards binaries and sources no longer referenced by the queue are
    removed.  Exits with status 1 if not exactly one queue name was given
    or the queue does not exist.
    """
    global Options, Logger

    cnf = Config()
    # NOTE(review): this session is replaced by transaction.session below
    # and never used; kept in case DBConn() setup is relied upon -- confirm.
    session = DBConn().session()

    Arguments = [('h',"help","Process-Policy::Options::Help"),
                 ('n',"no-action","Process-Policy::Options::No-Action")]

    # Make sure both options exist so they can be read unconditionally.
    for i in ["help", "no-action"]:
        if not cnf.has_key("Process-Policy::Options::%s" % (i)):
            cnf["Process-Policy::Options::%s" % (i)] = ""

    queue_name = apt_pkg.parse_commandline(cnf.Cnf,Arguments,sys.argv)

    Options = cnf.subtree("Process-Policy::Options")

    # Handle --help before validating the arguments so that asking for
    # help without a queue name does not end in an error message.
    if Options["Help"]:
        usage()

    if len(queue_name) != 1:
        print("E: Specify exactly one policy queue")
        sys.exit(1)

    queue_name = queue_name[0]

    Logger = daklog.Logger("process-policy")
    if not Options["No-Action"]:
        urgencylog = UrgencyLog()

    with ArchiveTransaction() as transaction:
        session = transaction.session
        try:
            pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one()
        except NoResultFound:
            print("E: Cannot find policy queue %s" % queue_name)
            sys.exit(1)

        commentsdir = os.path.join(pq.path, 'COMMENTS')
        # The comments stuff relies on being in the right directory
        os.chdir(pq.path)

        do_comments(commentsdir, pq, "ACCEPT.", "ACCEPTED.", "OK", comment_accept, transaction)
        do_comments(commentsdir, pq, "ACCEPTED.", "ACCEPTED.", "OK", comment_accept, transaction)
        do_comments(commentsdir, pq, "REJECT.", "REJECTED.", "NOTOK", comment_reject, transaction)

        remove_unreferenced_binaries(pq, transaction)
        remove_unreferenced_sources(pq, transaction)

    # urgencylog only exists when No-Action is not set (see above).
    if not Options['No-Action']:
        urgencylog.close()
401
402 ################################################################################
403
# Only run main() when this file is executed as a script.
if __name__ == '__main__':
    main()