]> git.decadent.org.uk Git - dak.git/blob - dak/process_policy.py
process-policy: Rollback transaction if dry run is requested.
[dak.git] / dak / process_policy.py
1 #!/usr/bin/env python
2 # vim:set et ts=4 sw=4:
3
4 """ Handles packages from policy queues
5
6 @contact: Debian FTP Master <ftpmaster@debian.org>
7 @copyright: 2001, 2002, 2003, 2004, 2005, 2006  James Troup <james@nocrew.org>
8 @copyright: 2009 Joerg Jaspert <joerg@debian.org>
9 @copyright: 2009 Frank Lichtenheld <djpig@debian.org>
10 @copyright: 2009 Mark Hymers <mhy@debian.org>
11 @license: GNU General Public License version 2 or later
12 """
13 # This program is free software; you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation; either version 2 of the License, or
16 # (at your option) any later version.
17
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 # GNU General Public License for more details.
22
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
27 ################################################################################
28
29 # <mhy> So how do we handle that at the moment?
30 # <stew> Probably incorrectly.
31
32 ################################################################################
33
34 import os
35 import datetime
36 import re
37 import sys
38 import traceback
39 import apt_pkg
40
41 from daklib.dbconn import *
42 from daklib import daklog
43 from daklib import utils
44 from daklib.dak_exceptions import CantOpenError, AlreadyLockedError, CantGetLockError
45 from daklib.config import Config
46 from daklib.archive import ArchiveTransaction, source_component_from_package_list
47 from daklib.urgencylog import UrgencyLog
48 from daklib.packagelist import PackageList
49
50 import daklib.announce
51
52 # Globals
53 Options = None
54 Logger = None
55
56 ################################################################################
57
def do_comments(dir, srcqueue, opref, npref, line, fn, transaction):
    """Apply the decisions stored in the comment files of a policy queue.

    Every file in C{dir} whose name starts with C{opref} and whose first line
    is exactly C{line} selects uploads of C{srcqueue}: the part of the file
    name after C{opref} is matched as a prefix of the .changes file name, and
    the remaining lines of the file become the comment text.  Once all files
    have been read, the collected (upload, comment) pairs are sorted and
    C{fn(upload, srcqueue, comment, transaction)} is called for each of them.

    @type  dir: str
    @param dir: directory containing the comment files

    @param srcqueue: policy queue whose uploads are looked up

    @type  opref: str
    @param opref: file name prefix of the comment files to process

    @type  npref: str
    @param npref: prefix a processed comment file is renamed to; nothing is
                  renamed when it equals C{opref}

    @type  line: str
    @param line: required content of the first line (without newline)

    @param fn: callable invoked once per selected upload

    @param transaction: object providing C{session} (database queries) and
                        C{fs} (used to move processed comment files)
    """
    session = transaction.session
    actions = []
    for comm in [x for x in os.listdir(dir) if x.startswith(opref)]:
        comment_path = os.path.join(dir, comm)
        # Read via a context manager so the file is closed right away; the
        # file may be renamed further down.
        with open(comment_path) as comment_file:
            lines = comment_file.readlines()
        if len(lines) == 0 or lines[0] != line + "\n":
            continue

        # If the ACCEPT includes a _<arch> we only accept that .changes.
        # Otherwise we accept all .changes that start with the given prefix
        changes_prefix = comm[len(opref):]
        if changes_prefix.count('_') < 2:
            changes_prefix = changes_prefix + '_'
        else:
            changes_prefix = changes_prefix + '.changes'

        # We need to escape "_" as we use it with the LIKE operator (via the
        # SQLA startwith) later.
        changes_prefix = changes_prefix.replace("_", r"\_")

        uploads = session.query(PolicyQueueUpload).filter_by(policy_queue=srcqueue) \
            .join(PolicyQueueUpload.changes).filter(DBChange.changesname.startswith(changes_prefix)) \
            .order_by(PolicyQueueUpload.source_id)
        reason = "".join(lines[1:])
        actions.extend((u, reason) for u in uploads)

        if opref != npref:
            newcomm = npref + comm[len(opref):]
            newcomm = utils.find_next_free(os.path.join(dir, newcomm))
            transaction.fs.move(comment_path, newcomm)

    actions.sort()

    for u, reason in actions:
        print("Processing changes file: {0}".format(u.changes.changesname))
        fn(u, srcqueue, reason, transaction)
93
94 ################################################################################
95
def try_or_reject(function):
    """Decorator that rejects the upload when the wrapped action fails.

    The returned wrapper calls C{function(upload, srcqueue, comments,
    transaction)}.  If that raises, the transaction is rolled back and the
    upload is rejected through real_comment_reject() with the traceback
    prepended to the comments.  If the rejection itself raises, the
    transaction is rolled back again and the rejection is retried once with
    C{notify=False}; an exception from that last attempt propagates.

    Afterwards the transaction is committed, or rolled back when the
    C{No-Action} option is set.

    @param function: callable taking (upload, srcqueue, comments, transaction)
    @return: the wrapping callable, carrying the name and docstring of
             C{function}
    """
    # Imported locally so this block does not depend on the file's import list.
    import functools

    @functools.wraps(function)
    def wrapper(upload, srcqueue, comments, transaction):
        try:
            function(upload, srcqueue, comments, transaction)
        except Exception:
            comments = 'An exception was raised while processing the package:\n{0}\nOriginal comments:\n{1}'.format(traceback.format_exc(), comments)
            try:
                # Discard whatever the failed action did before rejecting.
                transaction.rollback()
                real_comment_reject(upload, srcqueue, comments, transaction)
            except Exception:
                comments = 'In addition an exception was raised while trying to reject the upload:\n{0}\nOriginal rejection:\n{1}'.format(traceback.format_exc(), comments)
                transaction.rollback()
                # Second attempt, this time without sending a notification.
                real_comment_reject(upload, srcqueue, comments, transaction, notify=False)
        if not Options['No-Action']:
            transaction.commit()
        else:
            transaction.rollback()
    return wrapper
114
115 ################################################################################
116
@try_or_reject
def comment_accept(upload, srcqueue, comments, transaction):
    """Accept an upload from a policy queue.

    Copies the source and binaries of the upload into the target suite and
    into the suites of the target suite's copy queues, copies the .changes
    file to the target suite when the suite asks for it, optionally copies
    all files to Process-Policy::CopyDir, logs the urgency, announces the
    accept, copies the .changes file to a dated directory below Dir::Done and
    finally removes the upload from the policy queue.

    Because of the try_or_reject decorator an exception raised here leads to
    the upload being rejected instead.

    @type  upload: L{daklib.dbconn.PolicyQueueUpload}
    @param upload: upload to accept

    @type  srcqueue: L{daklib.dbconn.PolicyQueue}
    @param srcqueue: policy queue the upload is in

    @type  comments: str
    @param comments: text from the comment file; not used by this function

    @type  transaction: L{daklib.archive.ArchiveTransaction}

    @raise Exception: if a byhand file of the upload still exists in the
                      directory of C{srcqueue}
    """
    for byhand in upload.byhand:
        path = os.path.join(srcqueue.path, byhand.filename)
        if os.path.exists(path):
            raise Exception('E: cannot ACCEPT upload with unprocessed byhand file {0}'.format(byhand.filename))

    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    allow_tainted = srcqueue.suite.archive.tainted

    # We need overrides to get the target component
    overridesuite = upload.target_suite
    if overridesuite.overridesuite is not None:
        overridesuite = session.query(Suite).filter_by(suite_name=overridesuite.overridesuite).one()

    def binary_component_func(db_binary):
        # The component is the part of the Section field before the first
        # '/'; sections without a '/' are in 'main'.
        section = db_binary.proxy['Section']
        component_name = 'main'
        if '/' in section:
            component_name = section.split('/', 1)[0]
        return get_mapped_component(component_name, session=session)

    def source_component_func(db_source):
        package_list = PackageList(db_source.proxy)
        component = source_component_from_package_list(package_list, upload.target_suite)
        if component is not None:
            return get_mapped_component(component, session=session)

        # Fallback for packages without Package-List field
        query = session.query(Override).filter_by(suite=overridesuite, package=db_source.source) \
            .join(OverrideType).filter(OverrideType.overridetype == 'dsc') \
            .join(Component)
        return query.one().component

    all_target_suites = [upload.target_suite]
    all_target_suites.extend([q.suite for q in upload.target_suite.copy_queues])

    for suite in all_target_suites:
        if upload.source is not None:
            transaction.copy_source(upload.source, suite, source_component_func(upload.source), allow_tainted=allow_tainted)
        for db_binary in upload.binaries:
            # build queues may miss the source package if this is a binary-only upload
            if suite != upload.target_suite:
                transaction.copy_source(db_binary.source, suite, source_component_func(db_binary.source), allow_tainted=allow_tainted)
            transaction.copy_binary(db_binary, suite, binary_component_func(db_binary), allow_tainted=allow_tainted, extra_archives=[upload.target_suite.archive])

    # Location of the .changes file inside the policy queue; it is the source
    # of every copy of the .changes file made below.
    changes_src = os.path.join(upload.policy_queue.path, upload.changes.changesname)

    # Copy .changes if needed
    if upload.target_suite.copychanges:
        dst = os.path.join(upload.target_suite.path, upload.changes.changesname)
        fs.copy(changes_src, dst, mode=upload.target_suite.archive.mode)

    # Copy upload to Process-Policy::CopyDir
    # Used on security.d.o to sync accepted packages to ftp-master, but this
    # should eventually be replaced by something else.
    copydir = cnf.get('Process-Policy::CopyDir') or None
    if copydir is not None:
        mode = upload.target_suite.archive.mode
        if upload.source is not None:
            for f in [df.poolfile for df in upload.source.srcfiles]:
                dst = os.path.join(copydir, f.basename)
                if not os.path.exists(dst):
                    fs.copy(f.fullpath, dst, mode=mode)

        for db_binary in upload.binaries:
            f = db_binary.poolfile
            dst = os.path.join(copydir, f.basename)
            if not os.path.exists(dst):
                fs.copy(f.fullpath, dst, mode=mode)

        dst = os.path.join(copydir, upload.changes.changesname)
        if not os.path.exists(dst):
            fs.copy(changes_src, dst, mode=mode)

    if upload.source is not None and not Options['No-Action']:
        urgency = upload.changes.urgency
        if urgency not in cnf.value_list('Urgency::Valid'):
            urgency = cnf['Urgency::Default']
        UrgencyLog().log(upload.source.source, upload.source.version, urgency)

    # Single-argument call form: prints the same text under Python 2 and
    # matches the print() usage in do_comments.
    print("  ACCEPT")
    if not Options['No-Action']:
        Logger.log(["Policy Queue ACCEPT", srcqueue.queue_name, changesname])

    pu = get_processed_upload(upload)
    daklib.announce.announce_accept(pu)

    # TODO: code duplication. Similar code is in process-upload.
    # Move .changes to done
    now = datetime.datetime.now()
    donedir = os.path.join(cnf['Dir::Done'], now.strftime('%Y/%m/%d'))
    dst = os.path.join(donedir, upload.changes.changesname)
    dst = utils.find_next_free(dst)
    fs.copy(changes_src, dst, mode=0o644)

    remove_upload(upload, transaction)
219
220 ################################################################################
221
@try_or_reject
def comment_reject(*args):
    """Reject an upload because of a REJECT comment file.

    Passes its positional arguments (upload, srcqueue, comments, transaction)
    on to real_comment_reject() with C{manual=True}.
    """
    real_comment_reject(*args, manual=True)

def real_comment_reject(upload, srcqueue, comments, transaction, notify=True, manual=False):
    """Reject an upload from a policy queue.

    Copies the files of the upload (pool files known to the policy queue's
    archive, byhand files still present in the queue directory and the
    .changes file) to Dir::Reject, writes the comments to a C{.reason} file
    next to them, optionally sends the reject announcement, and removes the
    upload and its changes entry from the database session.

    @type  upload: L{daklib.dbconn.PolicyQueueUpload}
    @param upload: upload to reject

    @type  srcqueue: L{daklib.dbconn.PolicyQueue}
    @param srcqueue: policy queue the upload is in; used for the log entry

    @type  comments: str
    @param comments: rejection reason.  If it starts with a C{From: ...} line
                     followed by an empty line, that value is passed to the
                     announcement as the rejecting person and the first two
                     lines are dropped from the announced reason.

    @type  transaction: L{daklib.archive.ArchiveTransaction}

    @type  notify: bool
    @param notify: send the reject announcement

    @type  manual: bool
    @param manual: accepted for callers such as comment_reject(); not used
                   inside this function
    """
    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    queuedir = upload.policy_queue.path
    rejectdir = cnf['Dir::Reject']

    ### Copy files to reject/

    poolfiles = [b.poolfile for b in upload.binaries]
    if upload.source is not None:
        poolfiles.extend([df.poolfile for df in upload.source.srcfiles])
    # Not beautiful...
    files = [af.path for af in session.query(ArchiveFile)
                 .filter_by(archive=upload.policy_queue.suite.archive)
                 .join(ArchiveFile.file)
                 .filter(PoolFile.file_id.in_([f.file_id for f in poolfiles]))]
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        if os.path.exists(path):
            files.append(path)
    files.append(os.path.join(queuedir, changesname))

    for fn in files:
        dst = utils.find_next_free(os.path.join(rejectdir, os.path.basename(fn)))
        fs.copy(fn, dst, link=True)

    ### Write reason

    dst = utils.find_next_free(os.path.join(rejectdir, '{0}.reason'.format(changesname)))
    fh = fs.create(dst)
    try:
        fh.write(comments)
    finally:
        # Close the handle even if writing the reason fails.
        fh.close()

    ### Send mail notification

    if notify:
        rejected_by = None
        reason = comments

        # Try to use From: from comment file if there is one.
        # This is not very elegant...
        match = re.match(r"\AFrom: ([^\n]+)\n\n", comments)
        if match:
            rejected_by = match.group(1)
            reason = '\n'.join(comments.splitlines()[2:])

        pu = get_processed_upload(upload)
        daklib.announce.announce_reject(pu, reason, rejected_by)

    # Single-argument call form: prints the same text under Python 2 and
    # matches the print() usage in do_comments.
    print("  REJECT")
    if not Options["No-Action"]:
        Logger.log(["Policy Queue REJECT", srcqueue.queue_name, upload.changes.changesname])

    # Keep a reference to the changes entry: it is deleted after the upload
    # that refers to it has been removed.
    changes = upload.changes
    remove_upload(upload, transaction)
    session.delete(changes)
285
286 ################################################################################
287
def remove_upload(upload, transaction):
    """Remove an upload from its policy queue.

    Unlinks the byhand files that still exist in the queue directory and the
    .changes file, deletes the byhand entries and the upload itself from the
    database session and flushes the session.

    @type  upload: L{daklib.dbconn.PolicyQueueUpload}
    @param upload: upload to remove

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    @param transaction: provides C{fs} for file removal and C{session} for
                        the database changes
    """
    fs = transaction.fs
    session = transaction.session

    # Remove byhand and changes files. Binary and source packages will be
    # removed from {bin,src}_associations and eventually removed by clean-suites automatically.
    queuedir = upload.policy_queue.path
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        # The file may already be gone; the database entry is deleted either way.
        if os.path.exists(path):
            fs.unlink(path)
        session.delete(byhand)
    fs.unlink(os.path.join(queuedir, upload.changes.changesname))

    session.delete(upload)
    session.flush()
305
306 ################################################################################
307
def get_processed_upload(upload):
    """Build the announcement data for an upload in a policy queue.

    Fills a C{daklib.announce.ProcessedUpload} from the upload and its
    changes entry; the text of the .changes file is read from the policy
    queue directory.

    @type  upload: L{daklib.dbconn.PolicyQueueUpload}
    @param upload: upload to describe

    @rtype:  L{daklib.announce.ProcessedUpload}
    @return: the filled-in object, with C{program} set to "process-policy"
    """
    pu = daklib.announce.ProcessedUpload()

    pu.maintainer = upload.changes.maintainer
    pu.changed_by = upload.changes.changedby
    pu.fingerprint = upload.changes.fingerprint

    pu.suites = [upload.target_suite]
    pu.from_policy_suites = [upload.target_suite]

    changes_path = os.path.join(upload.policy_queue.path, upload.changes.changesname)
    # Read inside a context manager so the file handle is closed again.
    with open(changes_path, 'r') as changes_file:
        pu.changes = changes_file.read()
    pu.changes_filename = upload.changes.changesname
    pu.sourceful = upload.source is not None
    pu.source = upload.changes.source
    pu.version = upload.changes.version
    pu.architecture = upload.changes.architecture
    pu.bugs = upload.changes.closes

    pu.program = "process-policy"

    return pu
330
331 ################################################################################
332
def remove_unreferenced_binaries(policy_queue, transaction):
    """Drop binaries from the policy queue's suite that no upload refers to

    A binary is selected when it is associated with the suite of the policy
    queue and no upload of that policy queue lists it.  Every selected binary
    is logged and removed from the suite through the transaction.

    @type  policy_queue: L{daklib.dbconn.PolicyQueue}

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    """
    queue_suite = policy_queue.suite

    orphan_sql = """
       SELECT b.*
         FROM binaries b
         JOIN bin_associations ba ON b.id = ba.bin
        WHERE ba.suite = :suite_id
          AND NOT EXISTS (SELECT 1 FROM policy_queue_upload_binaries_map pqubm
                                   JOIN policy_queue_upload pqu ON pqubm.policy_queue_upload_id = pqu.id
                                  WHERE pqu.policy_queue_id = :policy_queue_id
                                    AND pqubm.binary_id = b.id)"""
    bind_values = {'suite_id': policy_queue.suite_id, 'policy_queue_id': policy_queue.policy_queue_id}
    orphans = transaction.session.query(DBBinary).from_statement(orphan_sql).params(bind_values)

    for orphan in orphans:
        log_entry = ["removed binary from policy queue", policy_queue.queue_name, orphan.package, orphan.version]
        Logger.log(log_entry)
        transaction.remove_binary(orphan, queue_suite)
358
def remove_unreferenced_sources(policy_queue, transaction):
    """Drop sources from the policy queue's suite that nothing refers to

    A source is selected when it is associated with the suite of the policy
    queue, no upload of that policy queue has it as its source, and no binary
    associated with the same suite was built from it.  Every selected source
    is logged and removed from the suite through the transaction.

    @type  policy_queue: L{daklib.dbconn.PolicyQueue}

    @type  transaction: L{daklib.archive.ArchiveTransaction}
    """
    queue_suite = policy_queue.suite

    orphan_sql = """
       SELECT s.*
         FROM source s
         JOIN src_associations sa ON s.id = sa.source
        WHERE sa.suite = :suite_id
          AND NOT EXISTS (SELECT 1 FROM policy_queue_upload pqu
                                  WHERE pqu.policy_queue_id = :policy_queue_id
                                    AND pqu.source_id = s.id)
          AND NOT EXISTS (SELECT 1 FROM binaries b
                                   JOIN bin_associations ba ON b.id = ba.bin
                                  WHERE b.source = s.id
                                    AND ba.suite = :suite_id)"""
    bind_values = {'suite_id': policy_queue.suite_id, 'policy_queue_id': policy_queue.policy_queue_id}
    orphans = transaction.session.query(DBSource).from_statement(orphan_sql).params(bind_values)

    for orphan in orphans:
        log_entry = ["removed source from policy queue", policy_queue.queue_name, orphan.source, orphan.version]
        Logger.log(log_entry)
        transaction.remove_source(orphan, queue_suite)
387
388 ################################################################################
389
def usage(exit_code=0):
    """Print a short usage message and exit.

    @type  exit_code: int
    @param exit_code: exit status passed to sys.exit()
    """
    print("""Usage: dak process-policy [OPTION]... QUEUE
Process the ACCEPT and REJECT comments of the policy queue QUEUE.

  -h, --help                 show this help and exit
  -n, --no-action            don't commit any changes""")
    sys.exit(exit_code)

def main():
    """Entry point: process the comment files of one policy queue.

    Parses the command line (exactly one policy queue name, plus the help
    and no-action options), then inside a single archive transaction handles
    the REJECT, ACCEPT and ACCEPTED comment files found in the C{COMMENTS}
    directory of the queue and removes binaries and sources that are no
    longer referenced.  Exits with status 1 when the number of queue names is
    not one or the queue does not exist.
    """
    global Options, Logger

    cnf = Config()
    # NOTE(review): this session is replaced by transaction.session below and
    # never used; left in place in case the DBConn() set-up is relied upon.
    session = DBConn().session()

    Arguments = [('h', "help", "Process-Policy::Options::Help"),
                 ('n', "no-action", "Process-Policy::Options::No-Action")]

    for i in ["help", "no-action"]:
        option_key = "Process-Policy::Options::%s" % (i)
        if not cnf.has_key(option_key):
            cnf[option_key] = ""

    queue_name = apt_pkg.parse_commandline(cnf.Cnf, Arguments, sys.argv)

    Options = cnf.subtree("Process-Policy::Options")

    # Handle --help before validating the arguments so that it also works
    # without a queue name.
    if Options["Help"]:
        usage()

    if len(queue_name) != 1:
        print("E: Specify exactly one policy queue")
        sys.exit(1)

    queue_name = queue_name[0]

    Logger = daklog.Logger("process-policy")
    if not Options["No-Action"]:
        urgencylog = UrgencyLog()

    with ArchiveTransaction() as transaction:
        session = transaction.session
        try:
            pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one()
        except NoResultFound:
            print("E: Cannot find policy queue %s" % queue_name)
            sys.exit(1)

        commentsdir = os.path.join(pq.path, 'COMMENTS')
        # The comments stuff relies on being in the right directory
        os.chdir(pq.path)

        do_comments(commentsdir, pq, "REJECT.", "REJECTED.", "NOTOK", comment_reject, transaction)
        do_comments(commentsdir, pq, "ACCEPT.", "ACCEPTED.", "OK", comment_accept, transaction)
        do_comments(commentsdir, pq, "ACCEPTED.", "ACCEPTED.", "OK", comment_accept, transaction)

        remove_unreferenced_binaries(pq, transaction)
        remove_unreferenced_sources(pq, transaction)

    # urgencylog only exists when No-Action is not set (see above).
    if not Options['No-Action']:
        urgencylog.close()
441
442 ################################################################################
443
444 if __name__ == '__main__':
445     main()