]> git.decadent.org.uk Git - dak.git/blob - dak/import_known_changes.py
Convert exception handling to Python3 syntax.
[dak.git] / dak / import_known_changes.py
1 #!/usr/bin/env python
2 # coding=utf8
3
4 """
5 Import known_changes files
6
7 @contact: Debian FTP Master <ftpmaster@debian.org>
8 @copyright: 2009  Mike O'Connor <stew@debian.org>
9 @license: GNU General Public License version 2 or later
10 """
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28
29 ################################################################################
30
31 import sys
32 import os
33 import logging
34 import threading
35 from daklib.dbconn import DBConn, get_dbchange, get_policy_queue
36 from daklib.config import Config
37 import apt_pkg
38 from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError
39 from daklib.changes import Changes
40 from daklib.utils import parse_changes, warn, gpgv_get_status_output, process_gpgv_output
41 import traceback
42
# Section of dak.conf under which every option of this command is stored.
options_prefix = "::".join(("KnownChanges", "Options"))

# Root logger shared by the whole module.
log = logging.getLogger()
48
49 ################################################################################
50
51
def usage(exit_code=0):
    """
    Print the command line help and terminate the process.

    @type exit_code: int
    @param exit_code: status handed to sys.exit() after the help is printed
    """
    # A single parenthesised argument parses identically as a Python 2
    # print statement and as a Python 3 function call.
    print("""Usage: dak import-known-changes [options]

OPTIONS
     -h, --help
        show this help and exit

     -j n, --concurrency n
        run with n threads concurrently

     -v, --verbose
        show verbose information messages

     -q, --quiet
        suppress all output but errors

""")
    sys.exit(exit_code)
67
def check_signature(sig_filename, data_filename=""):
    """
    Run gpgv over a signed file and extract the signing key's fingerprint.

    Only the presence of a VALIDSIG status line is required; no further
    checks on the signature are made here.

    @type sig_filename: string
    @param sig_filename: path of the signed file handed to gpgv

    @type data_filename: string
    @param data_filename: accepted for interface compatibility; not used

    @rtype: string or None
    @return: first argument of gpgv's VALIDSIG status line, or None when
             gpgv failed or its status output could not be used
    """
    keyrings = [
        "/home/joerg/keyring/keyrings/debian-keyring.gpg",
        "/home/joerg/keyring/keyrings/debian-maintainers.gpg",
        "/home/joerg/keyring/keyrings/debian-role-keys.gpg",
        "/home/joerg/keyring/keyrings/emeritus-keyring.pgp",
        "/home/joerg/keyring/keyrings/emeritus-keyring.gpg",
        "/home/joerg/keyring/keyrings/removed-keys.gpg",
        "/home/joerg/keyring/keyrings/removed-keys.pgp"
        ]

    keyringargs = " ".join(["--keyring %s" % x for x in keyrings])

    # Build the command line; gpgv writes its machine readable status to
    # the write end of the pipe.
    # NOTE(review): sig_filename is interpolated unquoted. If
    # gpgv_get_status_output passes this string to a shell, file names
    # containing shell metacharacters need quoting -- confirm.
    status_read, status_write = os.pipe()
    cmd = "gpgv --status-fd %s %s %s" % (status_write, keyringargs, sig_filename)

    # Invoke gpgv on the file
    (_output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write)

    # Process the status-fd output
    (keywords, internal_error) = process_gpgv_output(status)

    # If we failed to parse the status-fd output, whine and bail now
    if internal_error:
        warn("Couldn't parse signature")
        return None

    # usually one would check for bad things here. We, however, do not care.

    # gpgv must have exited with a zero return code
    if exit_status:
        warn("Couldn't parse signature")
        return None

    # Membership test instead of has_key(), which Python 3 no longer has
    if "VALIDSIG" not in keywords:
        warn("Couldn't parse signature")
        return None

    args = keywords["VALIDSIG"]
    if not args:
        warn("Couldn't parse signature")
        return None

    return args[0]
116
117
class EndOfChanges(object):
    """something enqueued to signify the last change"""
    pass


class OneAtATime(object):
    """
    a one space queue which sits between multiple possible producers
    and multiple possible consumers

    Producers block in enqueue() while the slot is occupied, consumers
    block in dequeue() while it is empty.  plsDie() wakes everybody up so
    that blocked callers can give up.
    """
    def __init__(self):
        self.next_in_line = None
        # consumers wait on read_lock, producers on write_lock
        self.read_lock = threading.Condition()
        self.write_lock = threading.Condition()
        self.die = False

    def plsDie(self):
        """Flag the queue as dead and wake every blocked producer/consumer."""
        self.die = True
        with self.write_lock:
            self.write_lock.notify_all()

        with self.read_lock:
            self.read_lock.notify_all()

    def enqueue(self, next):
        """
        Put C{next} into the slot, blocking while the slot is occupied.

        Returns without storing anything if the queue was told to die
        while the slot was still occupied.
        """
        # "with" guarantees the lock is released on the early return too;
        # returning while still holding it would stall other producers.
        with self.write_lock:
            while self.next_in_line:
                if self.die:
                    return
                self.write_lock.wait()

            self.next_in_line = next

        with self.read_lock:
            self.read_lock.notify()

    def dequeue(self):
        """
        Take the item out of the slot, blocking while the slot is empty.

        @return: the dequeued item, or None once the EndOfChanges marker
                 has been reached or the queue was told to die while empty
        """
        with self.read_lock:
            while not self.next_in_line:
                if self.die:
                    return None
                self.read_lock.wait()

            result = self.next_in_line

            if isinstance(result, EndOfChanges):
                # Keep the marker in the slot and wake the other waiting
                # consumers: each of them has to see the end of the
                # stream, otherwise all but one would block forever.
                self.read_lock.notify_all()
                return None

            self.next_in_line = None

        with self.write_lock:
            self.write_lock.notify()

        return result
177
class ChangesToImport(object):
    """
    Record describing one changes file waiting in the queue: the directory
    it was found in, its file name and its running number.
    """
    def __init__(self, checkdir, changesfile, count):
        self.count = count
        self.changesfile = changesfile
        self.dirpath = checkdir

    def __str__(self):
        fields = (self.count, self.changesfile, self.dirpath)
        return "#%d: %s in %s" % fields
187
class ChangesGenerator(threading.Thread):
    """
    Producer thread: walks the done directory and the policy queue
    directories and enqueues every .changes file not yet in the database.
    """
    def __init__(self, parent, queue):
        threading.Thread.__init__(self)
        self.parent = parent
        self.queue = queue
        self.die = False
        self.session = DBConn().session()

    def plsDie(self):
        """Ask the thread to stop before it enqueues its next file."""
        self.die = True

    def run(self):
        """
        Scan all directories, enqueue the unknown changes files and finish
        with an EndOfChanges marker.
        """
        cnf = Config()
        # Incremented before each use, so the first changes file is number 2.
        count = 1

        dirs = [cnf['Dir::Done']]

        for queue_name in ("byhand", "new", "proposedupdates", "oldproposedupdates"):
            policy_queue = get_policy_queue(queue_name)
            if not policy_queue:
                warn("Could not find queue %s in database" % queue_name)
                continue
            dirs.append(os.path.abspath(policy_queue.path))

        for checkdir in dirs:
            if not os.path.exists(checkdir):
                continue

            print("Looking into %s" % (checkdir))

            for dirpath, _dirnames, filenames in os.walk(checkdir, topdown=True):
                # A directory without files simply yields no iterations.
                for changesfile in filenames:
                    try:
                        # Only interested in changes files.
                        if not changesfile.endswith(".changes"):
                            continue
                        count += 1

                        # Already known to the database: nothing to do.
                        if get_dbchange(changesfile, self.session):
                            continue

                        to_import = ChangesToImport(dirpath, changesfile, count)
                        if self.die:
                            return
                        self.queue.enqueue(to_import)
                    except KeyboardInterrupt:
                        print("got Ctrl-c in enqueue thread.  terminating")
                        self.parent.plsDie()
                        sys.exit(1)

        self.queue.enqueue(EndOfChanges())
241
class ImportThread(threading.Thread):
    """
    Consumer thread: takes changes files off the queue, parses them,
    records the signing fingerprint and stores them as known changes.
    """
    def __init__(self, parent, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.session = DBConn().session()
        self.parent = parent
        self.die = False

    def plsDie(self):
        """Ask the thread to stop before it dequeues its next file."""
        self.die = True

    def run(self):
        """
        Import queued changes files until the queue reports its end (a
        false value from dequeue) or the thread is asked to die.

        Files that fail with InvalidDscError or ChangesUnicodeError are
        reported and skipped; any other error stops the whole import.
        """
        while True:
            # Bound before the try block so the handlers below can always
            # refer to the file being processed.
            changesfile = None
            try:
                if self.die:
                    return
                to_import = self.queue.dequeue()
                if not to_import:
                    return

                print( "Directory %s, file %7d, (%s)" % (to_import.dirpath[-10:], to_import.count, to_import.changesfile) )

                changes = Changes()
                changes.changes_file = to_import.changesfile
                changesfile = os.path.join(to_import.dirpath, to_import.changesfile)
                changes.changes = parse_changes(changesfile, signing_rules=-1)
                changes.changes["fingerprint"] = check_signature(changesfile)
                changes.add_known_changes(to_import.dirpath, session=self.session)
                self.session.commit()

            except InvalidDscError as line:
                # Report the file being processed; the name used here
                # previously was never defined and raised a NameError.
                warn("syntax error in .dsc file '%s', line %s." % (changesfile, line))

            except ChangesUnicodeError:
                warn("found invalid changes file, not properly utf-8 encoded")

            except KeyboardInterrupt:
                print("Caught C-c; on ImportThread. terminating.")
                self.parent.plsDie()
                sys.exit(1)

            except:
                # Deliberately catch-all at the thread boundary, but show
                # what went wrong before shutting everything down.
                traceback.print_exc()
                self.parent.plsDie()
                sys.exit(1)
286
class ImportKnownChanges(object):
    """
    Sets up one ChangesGenerator and C{num_threads} ImportThreads around a
    shared OneAtATime queue and starts them all.
    """
    def __init__(self,num_threads):
        self.queue = OneAtATime()
        self.threads = [ ChangesGenerator(self,self.queue) ]

        for i in range(num_threads):
            self.threads.append( ImportThread(self,self.queue) )

        try:
            for thread in self.threads:
                thread.start()

        except KeyboardInterrupt:
            print("Caught C-c; terminating.")
            warn("Caught C-c; terminating.")
            self.plsDie()

    def plsDie(self):
        """
        Ask every thread to stop, wake the ones blocked on the queue and
        exit the calling thread with status 1.
        """
        # Show where the shutdown request came from.
        traceback.print_stack()
        for thread in self.threads:
            print( "STU: before ask %s to die" % thread )
            thread.plsDie()
            print( "STU: after ask %s to die" % thread )

        # Threads blocked inside enqueue()/dequeue() only notice the
        # request once the queue itself wakes them up.
        self.queue.plsDie()

        self.threads=[]
        sys.exit(1)
313
314
def main():
    """
    Entry point: parse the command line, configure logging and start the
    import with the requested number of import threads (default 1).
    """
    cnf = Config()

    def option(name):
        # Full configuration key of one of this command's options.
        return "%s::%s" % (options_prefix, name)

    arguments = [('h',"help", option("Help")),
                 ('j',"concurrency", option("Concurrency"),"HasArg"),
                 ('q',"quiet", option("Quiet")),
                 ('v',"verbose", option("Verbose")),
                ]

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)

    num_threads = 1

    # Stray positional arguments are a usage error: exit non-zero.
    if len(args) > 0:
        usage(1)

    if cnf.has_key(option("Help")):
        usage()

    level=logging.INFO
    if cnf.has_key(option("Quiet")):
        level=logging.ERROR

    elif cnf.has_key(option("Verbose")):
        level=logging.DEBUG

    logging.basicConfig( level=level,
                         format='%(asctime)s %(levelname)s %(message)s',
                         stream = sys.stderr )

    # Read the value from the same Config object the command line was
    # parsed into.
    if cnf.has_key(option("Concurrency")):
        requested = cnf[option("Concurrency")]
        try:
            num_threads = int(requested)
        except ValueError:
            warn("Concurrency must be a positive integer, got %r" % (requested,))
            usage(1)

        # Without at least one import thread nothing ever drains the
        # queue and the generator thread would block forever.
        if num_threads < 1:
            warn("Concurrency must be a positive integer, got %r" % (requested,))
            usage(1)

    ImportKnownChanges(num_threads)


if __name__ == '__main__':
    main()