]> git.decadent.org.uk Git - dak.git/blob - dak/import_known_changes.py
Import known_changes with the correct name; catch the keyboard interrupt exception
[dak.git] / dak / import_known_changes.py
1 #!/usr/bin/env python
2 # coding=utf8
3
4 """
5 Import known_changes files
6
7 @contact: Debian FTP Master <ftpmaster@debian.org>
8 @copyright: 2009  Mike O'Connor <stew@debian.org>
9 @license: GNU General Public License version 2 or later
10 """
11
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 # GNU General Public License for more details.
21
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
26 ################################################################################
27
28
29 ################################################################################
30
31 import sys
32 import os
33 import logging
34 import threading
35 from daklib.dbconn import DBConn,get_knownchange
36 from daklib.config import Config
37 import apt_pkg
38 from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError
39 from daklib.changes import Changes
40 from daklib.utils import parse_changes, warn, gpgv_get_status_output, process_gpgv_output
41 import traceback
42
# Root of this command's option tree in dak.conf; each command-line
# option is looked up as "<options_prefix>::<OptionName>".
options_prefix = "::".join(("KnownChanges", "Options"))

# Root logger of the process.
log = logging.getLogger()
48
49 ################################################################################
50
51
def usage(exit_code=0):
    """
    Print the command line help and terminate the process.

    @type exit_code: int
    @param exit_code: status handed to sys.exit() once the help text
    has been written to stdout

    @raise SystemExit: always
    """
    # A parenthesised single argument works both as a Python 2 print
    # statement and as a Python 3 print() call.
    print("""Usage: dak import-known-changes [options]

OPTIONS
     -h, --help
        show this help and exit

     -j n
        run with n threads concurrently

     -v, --verbose
        show verbose information messages

     -q, --quiet
        suppress all output but errors

""")
    sys.exit(exit_code)
67
def check_signature(sig_filename, data_filename=""):
    """
    Run gpgv over a signed file and extract the signer's fingerprint.

    Only the presence of a VALIDSIG status line is required; no further
    checks of the signature status are made here.

    @type sig_filename: string
    @param sig_filename: path of the signed file handed to gpgv

    @type data_filename: string
    @param data_filename: accepted for interface compatibility; not used
    by this function

    @rtype: string or None
    @return: first argument of gpgv's VALIDSIG status line, or None when
    the status output could not be parsed, gpgv exited non-zero, or no
    usable VALIDSIG line was reported
    """
    keyrings = [
        "/home/joerg/keyring/keyrings/debian-keyring.gpg",
        "/home/joerg/keyring/keyrings/debian-keyring.pgp",
        "/home/joerg/keyring/keyrings/debian-maintainers.gpg",
        "/home/joerg/keyring/keyrings/debian-role-keys.gpg",
        "/home/joerg/keyring/keyrings/emeritus-keyring.pgp",
        "/home/joerg/keyring/keyrings/emeritus-keyring.gpg",
        "/home/joerg/keyring/keyrings/removed-keys.gpg",
        "/home/joerg/keyring/keyrings/removed-keys.pgp"
        ]

    keyringargs = " ".join(["--keyring %s" % x for x in keyrings])

    # Build the command line
    # NOTE(review): the command is a single string; if the helper runs it
    # through a shell, a file name with spaces or shell metacharacters
    # will break it -- confirm against gpgv_get_status_output.
    status_read, status_write = os.pipe()
    cmd = "gpgv --status-fd %s %s %s" % (status_write, keyringargs, sig_filename)

    # Invoke gpgv on the file
    (output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write)

    # Process the status-fd output
    (keywords, internal_error) = process_gpgv_output(status)

    # If we failed to parse the status-fd output, let's just whine and bail now
    if internal_error:
        warn("Couldn't parse signature")
        return None

    # usually one would check for bad things here. We, however, do not care.

    # Next check gpgv exited with a zero return code
    if exit_status:
        warn("Couldn't parse signature")
        return None

    # Sanity check the good stuff we expect.  Every failure path returns
    # None explicitly; previously these fell through to a 'return' of a
    # variable that had never been assigned.
    if "VALIDSIG" not in keywords:
        warn("Couldn't parse signature")
        return None

    args = keywords["VALIDSIG"]
    if len(args) < 1:
        warn("Couldn't parse signature")
        return None

    return args[0]
115
116
class EndOfChanges(object):
    """Marker object enqueued to signify that no further changes follow."""
    pass


class OneAtATime(object):
    """
    a one space queue which sits between multiple possible producers
    and multiple possible consumers

    An empty slot is represented by None.  Once an EndOfChanges marker
    has been enqueued it stays in the slot, so every consumer calling
    dequeue() afterwards receives None.
    """
    def __init__(self):
        self.next_in_line = None
        # One condition is shared by producers and consumers, so wake-ups
        # always use notify_all(): a single notify() could wake a waiter
        # of the wrong kind, which would go back to sleep and leave the
        # intended waiter blocked forever.
        self.next_lock = threading.Condition()

    def enqueue(self, next):
        """
        Block until the slot is free, then store an item in it.

        @param next: item to hand to a consumer; None is the empty-slot
        value and therefore cannot be transported
        """
        with self.next_lock:
            while self.next_in_line is not None:
                self.next_lock.wait()

            self.next_in_line = next
            self.next_lock.notify_all()

    def dequeue(self):
        """
        Block until an item is available and return it.

        @return: the stored item, or None once the EndOfChanges marker
        has been reached
        """
        # The 'with' block guarantees the lock is released on every path,
        # including the early return for the end marker.
        with self.next_lock:
            while self.next_in_line is None:
                self.next_lock.wait()
            result = self.next_in_line

            if isinstance(result, EndOfChanges):
                # Leave the marker in place and wake the other consumers
                # so that they can observe it as well.
                self.next_lock.notify_all()
                return None

            self.next_in_line = None
            self.next_lock.notify_all()
            return result
154
class ChangesToImport(object):
    """Record describing one changes file waiting in the import queue."""
    def __init__(self, checkdir, changesfile, count):
        # directory holding the file, the file's name, and its running number
        self.dirpath, self.changesfile, self.count = checkdir, changesfile, count

    def __str__(self):
        fields = (self.count, self.changesfile, self.dirpath)
        return "#%d: %s in %s" % fields
164
class ChangesGenerator(threading.Thread):
    """
    enqueues changes files to be imported

    Walks the configured queue directories and enqueues a ChangesToImport
    for every .changes file that get_knownchange() does not find,
    followed by one EndOfChanges marker.
    """
    def __init__(self, queue):
        """
        @param queue: object whose enqueue() method receives the work items
        """
        threading.Thread.__init__(self)
        self.queue = queue
        self.session = DBConn().session()
        self.die = False

    def plsDie(self):
        """Ask the thread to stop before it looks at the next file."""
        self.die = True

    def run(self):
        """
        Scan the queue directories and enqueue unknown changes files.

        NOTE: when stopped through plsDie() the method returns without
        enqueueing EndOfChanges, and a stop request is not noticed while
        the thread is blocked inside enqueue().
        """
        cnf = Config()
        # Number of .changes files seen so far, known or not; the first
        # file is number 1.
        count = 0
        for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]:
            checkdir = cnf["Dir::Queue::%s" % (directory) ]
            if not os.path.exists(checkdir):
                continue

            print("Looking into %s" % (checkdir))

            for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False):
                # Honour a stop request even for directories without files.
                if self.die:
                    return

                for changesfile in filenames:
                    # ...and between files of a large directory.
                    if self.die:
                        return

                    if not changesfile.endswith(".changes"):
                        # Only interested in changes files.
                        continue
                    count += 1

                    if not get_knownchange(changesfile, self.session):
                        to_import = ChangesToImport(dirpath, changesfile, count)
                        print("enqueue: %s" % to_import)
                        self.queue.enqueue(to_import)

        self.queue.enqueue(EndOfChanges())
203
class ImportThread(threading.Thread):
    """
    Consumer thread: takes ChangesToImport items from the queue, parses
    each changes file, records the signature fingerprint and stores the
    result through add_known_changes().
    """
    def __init__(self, queue):
        """
        @param queue: object whose dequeue() method yields the work items
        and returns a false value when there is nothing more to do
        """
        threading.Thread.__init__(self)
        self.queue = queue
        self.session = DBConn().session()
        self.die = False

    def plsDie(self):
        """Ask the thread to stop before it fetches the next item."""
        self.die = True

    def _rollback(self):
        """Discard the failed transaction so that later imports can use the session."""
        # assumes an SQLAlchemy-style session providing rollback() -- TODO confirm
        try:
            self.session.rollback()
        except Exception:
            traceback.print_exc()

    def run(self):
        """
        Import changes files until the queue is exhausted or plsDie() was
        called.  A failure on one file is reported and the loop carries on
        with the next one.
        """
        failures = 0
        while True:
            if self.die:
                return

            # Fetched outside the try block: a broken queue must not be
            # swallowed by the catch-all handler and retried forever.
            to_import = self.queue.dequeue()
            if not to_import:
                return

            # Computed up front so that every handler can refer to it.
            changesfile = os.path.join(to_import.dirpath, to_import.changesfile)

            try:
                print( "Directory %s, file %7d, (%s)" % (to_import.dirpath[-10:], to_import.count, to_import.changesfile) )

                changes = Changes()
                changes.changes_file = to_import.changesfile
                print( "STU: %s / %s" % (to_import.dirpath, to_import.changesfile))
                changes.changes = parse_changes(changesfile, signing_rules=-1)
                changes.changes["fingerprint"] = check_signature(changesfile)
                changes.add_known_changes(to_import.dirpath, self.session)
                self.session.commit()

            except InvalidDscError as line:
                failures += 1
                self._rollback()
                warn("syntax error in .dsc file '%s', line %s." % (changesfile, line))

            except ChangesUnicodeError:
                failures += 1
                self._rollback()
                warn("found invalid changes file, not properly utf-8 encoded")
                print("Directory %s, file %7d, failures %3d. (%s)" % (to_import.dirpath[-10:], to_import.count, failures, changesfile))

            except Exception:
                # Keep going after an unexpected error, but let SystemExit
                # and KeyboardInterrupt through.
                failures += 1
                self._rollback()
                traceback.print_exc()
247
def main():
    """
    Entry point of 'dak import-known-changes'.

    Parses the command line, configures logging, then runs one
    ChangesGenerator and the requested number of ImportThread workers
    until they finish.  On Ctrl-C all threads are asked to stop and are
    waited for.
    """
    cnf = Config()

    arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
                 ('j',"concurrency", "%s::%s" % (options_prefix,"Concurrency"),"HasArg"),
                 ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
                 ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")),
                ]

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)

    num_threads = 1

    # Stray positional arguments are a usage error: exit non-zero.
    if len(args) > 0:
        usage(1)

    if cnf.has_key("%s::%s" % (options_prefix,"Help")):
        usage()

    level=logging.INFO
    if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
        level=logging.ERROR

    elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
        level=logging.DEBUG


    logging.basicConfig( level=level,
                         format='%(asctime)s %(levelname)s %(message)s',
                         stream = sys.stderr )

    if cnf.has_key( "%s::%s" %(options_prefix,"Concurrency")):
        num_threads = int(cnf[ "%s::%s" %(options_prefix,"Concurrency")])


    queue = OneAtATime()
    threads = [ ChangesGenerator(queue) ]

    for i in range(num_threads):
        threads.append( ImportThread(queue) )

    # Only threads that were actually started may be joined; an interrupt
    # can arrive while the start loop is still running.
    started = []

    try:
        for thread in threads:
            thread.start()
            started.append(thread)

        for thread in started:
            thread.join()

    except KeyboardInterrupt:
        warn("Caught C-c; terminating.")
        for thread in threads:
            thread.plsDie()

        for thread in started:
            thread.join()


if __name__ == '__main__':
    main()