]> git.decadent.org.uk Git - dak.git/blob - dak/import_known_changes.py
Merge commit 'stew/knownchanges' into merge
[dak.git] / dak / import_known_changes.py
1 #!/usr/bin/env python
2 # coding=utf8
3
4 """
5 Import known_changes files
6
7 @contact: Debian FTP Master <ftpmaster@debian.org>
8 @copyright: 2009  Mike O'Connor <stew@debian.org>
9 @license: GNU General Public License version 2 or later
10 """
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28
29 ################################################################################
30
31 import sys
32 import os
33 import logging
34 import threading
35 from daklib.dbconn import DBConn,get_knownchange
36 from daklib.config import Config
37 import apt_pkg
38 from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError
39 from daklib.changes import Changes
40 from daklib.utils import parse_changes, warn, gpgv_get_status_output, process_gpgv_output
41 import traceback
42
# Section of dak.conf under which every option of this command is stored
# (the command's name followed by the "Options" subtree).
options_prefix = "::".join(("KnownChanges", "Options"))

# Root logger; main() configures its level and output stream.
log = logging.getLogger()
48
49 ################################################################################
50
51
def usage (exit_code=0):
    """
    Print the command line help and terminate the process.

    @param exit_code: status handed to sys.exit() once the text is printed
    """
    help_text = """Usage: dak import-known-changes [options]

OPTIONS
     -j n
        run with n threads concurrently

     -v, --verbose
        show verbose information messages

     -q, --quiet
        supress all output but errors

"""
    print(help_text)
    sys.exit(exit_code)
67
def check_signature (sig_filename, data_filename=""):
    """
    Run gpgv over a signed file and return the fingerprint it reports.

    The fingerprint is the first argument of the VALIDSIG keyword found in
    gpgv's status output.  Other status keywords are deliberately not
    inspected: this importer only wants to record who signed the file.

    @param sig_filename: path of the signed file handed to gpgv
    @param data_filename: unused; kept so existing callers keep working
    @return: the fingerprint, or None when gpgv failed, its status output
             could not be parsed, or no usable VALIDSIG entry was present
    """
    # NOTE(review): keyring locations are hard-coded to one user's home.
    keyrings = [
        "/home/joerg/keyring/keyrings/debian-keyring.gpg",
        "/home/joerg/keyring/keyrings/debian-keyring.pgp",
        "/home/joerg/keyring/keyrings/debian-maintainers.gpg",
        "/home/joerg/keyring/keyrings/debian-role-keys.gpg",
        "/home/joerg/keyring/keyrings/emeritus-keyring.pgp",
        "/home/joerg/keyring/keyrings/emeritus-keyring.gpg",
        "/home/joerg/keyring/keyrings/removed-keys.gpg",
        "/home/joerg/keyring/keyrings/removed-keys.pgp"
        ]

    keyringargs = " ".join(["--keyring %s" % x for x in keyrings])

    # gpgv writes machine readable status lines to the write end of this pipe.
    status_read, status_write = os.pipe()

    # NOTE(review): sig_filename is interpolated unquoted; if the helper runs
    # the command through a shell, names containing spaces or shell
    # metacharacters will break it -- confirm against gpgv_get_status_output.
    cmd = "gpgv --status-fd %s %s %s" % (status_write, keyringargs, sig_filename)

    # Invoke gpgv on the file; its regular output is not needed here.
    (_output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write)

    # Process the status-fd output
    (keywords, internal_error) = process_gpgv_output(status)

    # Every failure below emits the same warning text as before, so log
    # output is unchanged.
    if internal_error:
        warn("Couldn't parse signature")
        return None

    # gpgv has to exit with a zero return code
    if exit_status:
        warn("Couldn't parse signature")
        return None

    # "in" replaces dict.has_key(), which no longer exists in Python 3.
    if "VALIDSIG" not in keywords:
        warn("Couldn't parse signature")
        return None

    args = keywords["VALIDSIG"]
    if len(args) < 1:
        warn("Couldn't parse signature")
        return None

    return args[0]
117
118
class EndOfChanges(object):
    """Sentinel enqueued after the last change to signal the end of input."""


class OneAtATime(object):
    """
    A one-slot queue which sits between multiple possible producers
    and multiple possible consumers.

    A single condition variable guards the slot.  Because producers and
    consumers wait on the same condition, every state change wakes all
    waiters: a lone notify() could wake a thread of the wrong kind and
    leave everyone asleep.
    """
    def __init__(self):
        self.next_in_line = None        # the slot; None means "empty"
        self.next_lock = threading.Condition()
        self.die = False                # set by plsDie() to abort waiters

    def plsDie(self):
        """Tell every thread blocked in enqueue() or dequeue() to give up."""
        # The condition must be held while notifying, otherwise notify
        # raises RuntimeError.
        with self.next_lock:
            self.die = True
            self.next_lock.notify_all()

    def enqueue(self, next):
        """
        Put an item into the slot, blocking while the slot is occupied.

        Returns without storing the item if plsDie() was called while the
        slot was still occupied.  Once an EndOfChanges sentinel has been
        enqueued the slot stays occupied for good, so it must be the last
        item enqueued.
        """
        with self.next_lock:
            while self.next_in_line is not None:
                if self.die:
                    # leaving the "with" block releases the lock
                    return
                self.next_lock.wait()

            self.next_in_line = next
            self.next_lock.notify_all()

    def dequeue(self):
        """
        Take the item out of the slot, blocking while the slot is empty.

        @return: the item, or None when the end of input was reached or
                 plsDie() was called while the slot was empty
        """
        with self.next_lock:
            while self.next_in_line is None:
                if self.die:
                    return None
                self.next_lock.wait()

            result = self.next_in_line

            if isinstance(result, EndOfChanges):
                # Leave the sentinel in the slot so that every consumer,
                # not just the first one, learns that input has ended.
                self.next_lock.notify_all()
                return None

            self.next_in_line = None
            self.next_lock.notify_all()

        return result
167
class ChangesToImport(object):
    """Work item describing one changes file waiting to be imported."""
    def __init__(self, checkdir, changesfile, count):
        # directory the file was found in, its name, and its running number
        self.count = count
        self.changesfile = changesfile
        self.dirpath = checkdir

    def __str__(self):
        details = (self.count, self.changesfile, self.dirpath)
        return "#%d: %s in %s" % details
177
class ChangesGenerator(threading.Thread):
    """
    Producer thread: enqueues changes files to be imported.

    Walks the configured queue directories and enqueues a ChangesToImport
    for every *.changes file for which get_knownchange() returns nothing,
    then enqueues an EndOfChanges marker.
    """
    def __init__(self, parent, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.session = DBConn().session()
        self.parent = parent    # object whose plsDie() stops all threads
        self.die = False

    def plsDie(self):
        """Ask the thread to stop at the next file it looks at."""
        self.die = True

    def run(self):
        cnf = Config()
        # NOTE(review): the counter starts at 1 and is incremented before it
        # is used, so the first changes file is reported as number 2.  Left
        # as is because the number is only used for display.
        count = 1
        for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]:
            checkdir = cnf["Dir::Queue::%s" % (directory) ]
            if not os.path.exists(checkdir):
                continue

            print("Looking into %s" % (checkdir))

            for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False):
                for changesfile in filenames:
                    try:
                        # Honour a shutdown request for every file, not only
                        # when a not-yet-known changes file turns up.
                        if self.die:
                            return

                        if not changesfile.endswith(".changes"):
                            # Only interested in changes files.
                            continue
                        count += 1

                        if get_knownchange(changesfile, self.session):
                            # already recorded, nothing to import
                            continue

                        to_import = ChangesToImport(dirpath, changesfile, count)
                        # the lookup above may have taken a while: re-check
                        if self.die:
                            return
                        self.queue.enqueue(to_import)
                    except KeyboardInterrupt:
                        print("got Ctrl-c in enqueue thread.  terminating")
                        self.parent.plsDie()
                        sys.exit(1)

        # tell the consumers that nothing more will come
        self.queue.enqueue(EndOfChanges())
221
class ImportThread(threading.Thread):
    """
    Consumer thread: imports the changes files taken from the queue.

    For each dequeued item the changes file is parsed, the fingerprint
    reported by check_signature() is stored alongside the parsed fields, and
    the result is recorded with Changes.add_known_changes() and committed.
    """
    def __init__(self, parent, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.session = DBConn().session()
        self.parent = parent    # object whose plsDie() stops all threads
        self.die = False

    def plsDie(self):
        """Ask the thread to stop before it takes the next item."""
        self.die = True

    def run(self):
        while True:
            # Bound before the try block so the error handlers below can
            # always refer to it.
            changesfile = None
            try:
                if self.die:
                    return
                to_import = self.queue.dequeue()
                if not to_import:
                    # end of input, or the queue was told to die
                    return

                print( "Directory %s, file %7d, (%s)" % (to_import.dirpath[-10:], to_import.count, to_import.changesfile) )

                changes = Changes()
                changes.changes_file = to_import.changesfile
                changesfile = os.path.join(to_import.dirpath, to_import.changesfile)
                changes.changes = parse_changes(changesfile, signing_rules=-1)
                changes.changes["fingerprint"] = check_signature(changesfile)
                changes.add_known_changes(to_import.dirpath, self.session)
                self.session.commit()

            except InvalidDscError as line:
                # The message used to reference an undefined name, which
                # turned this recoverable error into a NameError.
                warn("syntax error in .dsc file '%s', line %s." % (changesfile, line))

            except ChangesUnicodeError:
                warn("found invalid changes file, not properly utf-8 encoded")

            except KeyboardInterrupt:
                print("Caught C-c; on ImportThread. terminating.")
                self.parent.plsDie()
                sys.exit(1)
            except:
                # Deliberately catches everything, including SystemExit
                # raised by library code: any unexpected failure has to stop
                # the other threads too instead of ending this one silently.
                traceback.print_exc()
                self.parent.plsDie()
                sys.exit(1)
267
class ImportKnownChanges(object):
    """
    Sets up and starts one ChangesGenerator thread plus num_threads
    ImportThread workers, all sharing a single OneAtATime queue.
    """
    def __init__(self,num_threads):
        self.queue = OneAtATime()
        self.threads = [ ChangesGenerator(self,self.queue) ]

        for i in range(num_threads):
            self.threads.append( ImportThread(self,self.queue) )

        try:
            for thread in self.threads:
                thread.start()

        except KeyboardInterrupt:
            print("Caught C-c; terminating.")
            # warn() is imported directly from daklib.utils; no "utils"
            # module is imported in this file.
            warn("Caught C-c; terminating.")
            self.plsDie()

    def plsDie(self):
        """
        Ask every thread to stop, then exit the calling thread with status 1.

        NOTE(review): the queue itself is not told to die here, so a thread
        already blocked inside the queue only notices once it wakes up.
        """
        # show who requested the shutdown
        traceback.print_stack()
        for thread in self.threads:
            print( "STU: before ask %s to die" % thread )
            thread.plsDie()
            print( "STU: after ask %s to die" % thread )

        self.threads=[]
        sys.exit(1)
294
295
def main():
    """
    Entry point: parse the command line, configure logging and start the
    import with the requested number of import threads (default 1).
    """
    cnf = Config()

    def option_key(name):
        # full configuration key of one of this command's options
        return "%s::%s" % (options_prefix, name)

    arguments = [('h',"help", option_key("Help")),
                 ('j',"concurrency", option_key("Concurrency"),"HasArg"),
                 ('q',"quiet", option_key("Quiet")),
                 ('v',"verbose", option_key("Verbose")),
                ]

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)

    # this command takes no positional arguments
    if len(args) > 0:
        usage()

    if cnf.has_key(option_key("Help")):
        usage()

    level=logging.INFO
    if cnf.has_key(option_key("Quiet")):
        level=logging.ERROR

    elif cnf.has_key(option_key("Verbose")):
        level=logging.DEBUG

    logging.basicConfig( level=level,
                         format='%(asctime)s %(levelname)s %(message)s',
                         stream = sys.stderr )

    num_threads = 1
    if cnf.has_key(option_key("Concurrency")):
        requested = cnf[option_key("Concurrency")]
        try:
            num_threads = int(requested)
        except ValueError:
            warn("invalid concurrency '%s': expected a positive integer" % (requested))
            usage(1)

        # With no import thread nothing would ever empty the queue and the
        # generator thread would block forever.
        if num_threads < 1:
            warn("invalid concurrency '%s': expected a positive integer" % (requested))
            usage(1)

    ImportKnownChanges(num_threads)


if __name__ == '__main__':
    main()