5 Import known_changes files
7 @contact: Debian FTP Master <ftpmaster@debian.org>
8 @copyright: 2009 Mike O'Connor <stew@debian.org>
9 @license: GNU General Public License version 2 or later
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 ################################################################################
29 ################################################################################
35 from daklib.dbconn import DBConn, get_dbchange, get_policy_queue
36 from daklib.config import Config
38 from daklib.dak_exceptions import DBUpdateError, InvalidDscError, ChangesUnicodeError
39 from daklib.changes import Changes
40 from daklib.utils import parse_changes, warn, gpgv_get_status_output, process_gpgv_output
# Configuration namespace for this command. The second assignment rebuilds the
# name from the first, so the final value is "KnownChanges::Options".
43 # where in dak.conf all of our configuration will be stowed
44 options_prefix = "KnownChanges"
45 options_prefix = "%s::Options" % options_prefix
# Logger obtained with no name argument (i.e. the root logger).
47 log = logging.getLogger()
49 ################################################################################
# Print the command-line help text for "dak import-known-changes".
#
# exit_code -- defaults to 0. Its use is not visible in this listing (the
#              embedded numbering skips the lines that follow the help text);
#              presumably it is handed to the process exit call -- confirm
#              against the full file.
#
# Uses the Python 2 print statement.
# NOTE(review): "supress" in the help text is a misspelling of "suppress". It is
# left untouched here because it is runtime output, not a comment.
52 def usage (exit_code=0):
53 print """Usage: dak import-known-changes [options]
57 run with n threads concurrently
60 show verbose information messages
63 supress all output but errors
# Run gpgv on sig_filename against a hard-coded list of keyrings and take the
# first argument of the VALIDSIG status keyword as the fingerprint.
#
# sig_filename  -- path interpolated into the gpgv command line.
# data_filename -- accepted, but not referenced in any line visible here.
#
# Every failure path visible below emits the same warning text. The return
# statements are not visible in this listing (the embedded numbering skips
# them); ImportThread stores the result under the "fingerprint" key, so
# presumably None is returned on failure -- confirm against the full file.
# The keyring paths are fixed under /home/joerg/keyring/keyrings.
68 def check_signature (sig_filename, data_filename=""):
72 "/home/joerg/keyring/keyrings/debian-keyring.gpg",
73 "/home/joerg/keyring/keyrings/debian-maintainers.gpg",
74 "/home/joerg/keyring/keyrings/debian-role-keys.gpg",
75 "/home/joerg/keyring/keyrings/emeritus-keyring.pgp",
76 "/home/joerg/keyring/keyrings/emeritus-keyring.gpg",
77 "/home/joerg/keyring/keyrings/removed-keys.gpg",
78 "/home/joerg/keyring/keyrings/removed-keys.pgp"
# Turn the keyring list into repeated "--keyring <path>" arguments.
81 keyringargs = " ".join(["--keyring %s" % x for x in keyrings ])
83 # Build the command line
# The write end of the pipe is what gpgv receives as --status-fd.
84 status_read, status_write = os.pipe()
# NOTE(review): the command is assembled by plain string interpolation, so a
# sig_filename containing spaces or shell metacharacters would change the
# command -- confirm how gpgv_get_status_output executes it.
85 cmd = "gpgv --status-fd %s %s %s" % (status_write, keyringargs, sig_filename)
87 # Invoke gpgv on the file
# `output` is not used in any line visible here.
88 (output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write)
90 # Process the status-fd output
91 (keywords, internal_error) = process_gpgv_output(status)
93 # If we failed to parse the status-fd output, let's just whine and bail now
# (Embedded line 94, the condition guarding this warning, is not visible;
# the comment above suggests it tests internal_error -- TODO confirm.)
95 warn("Couldn't parse signature")
98 # usually one would check for bad things here. We, however, do not care.
100 # Next check gpgv exited with a zero return code
# (Embedded line 101, the guard, is not visible; presumably it tests
# exit_status -- TODO confirm.)
102 warn("Couldn't parse signature")
105 # Sanity check the good stuff we expect
# has_key() is the Python 2 mapping membership test.
106 if not keywords.has_key("VALIDSIG"):
107 warn("Couldn't parse signature")
109 args = keywords["VALIDSIG"]
# (Embedded line 110, the guard for this warning, is not visible.)
111 warn("Couldn't parse signature")
# The first VALIDSIG argument is taken as the fingerprint.
113 fingerprint = args[0]
# Marker type with no visible members. ChangesGenerator enqueues an instance
# (embedded line 240), and the dequeue path of OneAtATime tests for it with
# isinstance().
118 class EndOfChanges(object):
119 """something enqueued to signify the last change"""
# Single-slot hand-off between threads, built from two threading.Condition
# objects: writers wait on write_lock while the slot is occupied, readers wait
# on read_lock while it is empty.
# NOTE(review): occupancy is tested by truthiness of next_in_line, so a falsy
# item (None, 0, "") would look like an empty slot.
# Several method headers are missing from this listing (the embedded numbering
# skips them); the notes below mark where.
123 class OneAtATime(object):
125 a one space queue which sits between multiple possible producers
126 and multiple possible consumers
# (Embedded lines 127-128 are not visible; the three assignments below are
# presumably the constructor body -- TODO confirm.)
129 self.next_in_line = None
130 self.read_lock = threading.Condition()
131 self.write_lock = threading.Condition()
# (Embedded lines 132-135 are not visible, including the header of the method
# that owns the next six statements.) They wake every waiter on both
# conditions -- presumably the shutdown path; confirm.
136 self.write_lock.acquire()
137 self.write_lock.notifyAll()
138 self.write_lock.release()
140 self.read_lock.acquire()
141 self.read_lock.notifyAll()
142 self.read_lock.release()
# Store one item in the slot, waiting while the slot is occupied, then wake
# one reader. The parameter name `next` shadows the builtin.
144 def enqueue(self, next):
145 self.write_lock.acquire()
146 while self.next_in_line:
# (Embedded lines 147-148 inside this loop are not visible.)
149 self.write_lock.wait()
151 assert( not self.next_in_line )
152 self.next_in_line = next
# The write lock is released before the read lock is taken; the two are never
# held together in the visible lines.
153 self.write_lock.release()
154 self.read_lock.acquire()
155 self.read_lock.notify()
156 self.read_lock.release()
# (Embedded line 158, a method header, is not visible. ImportThread calls
# self.queue.dequeue(), so presumably that is this method -- confirm.)
# Waits while the slot is empty, takes the item, clears the slot, then wakes
# one writer.
159 self.read_lock.acquire()
160 while not self.next_in_line:
# (Embedded lines 161-162 inside this loop are not visible.)
163 self.read_lock.wait()
165 result = self.next_in_line
167 self.next_in_line = None
168 self.read_lock.release()
169 self.write_lock.acquire()
170 self.write_lock.notify()
171 self.write_lock.release()
# The end-of-stream marker is special-cased here; what is returned in either
# case is not visible (embedded lines 174 onward are missing).
173 if isinstance(result, EndOfChanges):
# Record describing one .changes file handed from ChangesGenerator to
# ImportThread.
178 class ChangesToImport(object):
179 """A changes file to be enqueued to be processed"""
# checkdir    -- directory holding the file; stored as self.dirpath.
# changesfile -- file name within that directory; stored as self.changesfile.
# count       -- read back as self.count in the format string below; the
#                assignment itself is not visible (embedded line 183).
180 def __init__(self, checkdir, changesfile, count):
181 self.dirpath = checkdir
182 self.changesfile = changesfile
# Renders as "#<count>: <changesfile> in <dirpath>". The header of the method
# that returns it (embedded line 185) is not visible -- presumably a string
# conversion method; confirm.
186 return "#%d: %s in %s" % (self.count, self.changesfile, self.dirpath)
# Producer thread. Walks a set of directories and enqueues a ChangesToImport
# for each *.changes file for which get_dbchange() returns a falsy value, then
# enqueues an EndOfChanges marker.
# Many lines of this class are missing from the listing (the embedded numbering
# skips them), including the run-method header, the `try:` and the origin of
# `cnf`, `dirs` and `count`.
188 class ChangesGenerator(threading.Thread):
189 """enqueues changes files to be imported"""
# parent, queue -- how they are stored is not visible; later lines read
#                  self.queue.
190 def __init__(self, parent, queue):
191 threading.Thread.__init__(self)
# This thread opens its own database session.
193 self.session = DBConn().session()
# Directories searched: cnf['Dir::Done'] plus the path of each policy queue
# named below.
205 dirs.append(cnf['Dir::Done'])
207 for queue_name in [ "byhand", "new", "proposedupdates", "oldproposedupdates" ]:
208 queue = get_policy_queue(queue_name)
# (Embedded lines 209 and 211 are not visible; presumably an if/else on
# whether the queue was found -- TODO confirm.)
210 dirs.append(os.path.abspath(queue.path))
212 warn("Could not find queue %s in database" % queue_name)
214 for checkdir in dirs:
# Directories that do not exist are not walked.
215 if os.path.exists(checkdir):
216 print "Looking into %s" % (checkdir)
218 for dirpath, dirnames, filenames in os.walk(checkdir, topdown=True):
220 # Empty directory (or only subdirectories), next
223 for changesfile in filenames:
225 if not changesfile.endswith(".changes"):
226 # Only interested in changes files.
# Lookup is by bare file name, not by full path. Presumably a falsy result
# means the file is not yet recorded in the database -- confirm.
230 if not get_dbchange(changesfile, self.session):
231 to_import = ChangesToImport(dirpath, changesfile, count)
# May block: the queue holds a single item.
234 self.queue.enqueue(to_import)
235 except KeyboardInterrupt:
236 print("got Ctrl-c in enqueue thread. terminating")
# The end marker is enqueued once, although several ImportThread consumers may
# be waiting. NOTE(review): how the others learn of the end is not visible.
240 self.queue.enqueue(EndOfChanges())
# Consumer thread. Takes ChangesToImport items off the shared queue, parses
# each .changes file, records the signature fingerprint, stores the result via
# add_known_changes() and commits.
# The run-method header, the loop and the `try:` are missing from this listing
# (the embedded numbering skips them).
242 class ImportThread(threading.Thread):
# parent, queue -- how they are stored is not visible; later lines read
#                  self.queue.
243 def __init__(self, parent, queue):
244 threading.Thread.__init__(self)
# This thread opens its own database session.
246 self.session = DBConn().session()
258 to_import = self.queue.dequeue()
# Progress line: the last 10 characters of the directory, the running count
# and the file name.
262 print( "Directory %s, file %7d, (%s)" % (to_import.dirpath[-10:], to_import.count, to_import.changesfile) )
# (Embedded line 264, where `changes` is created, is not visible.)
265 changes.changes_file = to_import.changesfile
266 changesfile = os.path.join(to_import.dirpath, to_import.changesfile)
# NOTE(review): the meaning of signing_rules=-1 is defined by parse_changes and
# is not established in this file.
267 changes.changes = parse_changes(changesfile, signing_rules=-1)
268 changes.changes["fingerprint"] = check_signature(changesfile)
269 changes.add_known_changes(to_import.dirpath, session=self.session)
# One commit per imported file.
270 self.session.commit()
272 except InvalidDscError as line:
# NOTE(review): `f` is not assigned in any line visible in this listing. If it
# is undefined at runtime this handler raises NameError; `changesfile` looks
# like the intended name -- confirm.
273 warn("syntax error in .dsc file '%s', line %s." % (f, line))
275 except ChangesUnicodeError:
276 warn("found invalid changes file, not properly utf-8 encoded")
278 except KeyboardInterrupt:
279 print("Caught C-c; on ImportThread. terminating.")
# Builds one ChangesGenerator plus num_threads ImportThread objects, all
# sharing a single OneAtATime queue, and handles Ctrl-C.
# The `try:` and the bodies of both thread loops are missing from this listing
# (the embedded numbering skips them).
287 class ImportKnownChanges(object):
# num_threads -- number of ImportThread consumers to create.
288 def __init__(self,num_threads):
289 self.queue = OneAtATime()
# The producer is first in the list; the consumers follow.
290 self.threads = [ ChangesGenerator(self,self.queue) ]
292 for i in range(num_threads):
293 self.threads.append( ImportThread(self,self.queue) )
# (The loop body, embedded line 297, is not visible; presumably it starts or
# joins each thread -- TODO confirm.)
296 for thread in self.threads:
299 except KeyboardInterrupt:
# The same message goes out twice: once via print, once via warn().
300 print("Caught C-c; terminating.")
301 warn("Caught C-c; terminating.")
# NOTE(review): `print_stack90` looks like a typo for `print_stack()` (the
# digits 9 and 0 are the unshifted parenthesis keys). As written this is an
# attribute lookup, not a call; if `traceback` is the standard-library module
# it raises AttributeError when reached.
305 traceback.print_stack90
306 for thread in self.threads:
# "STU:" lines look like leftover debug output. The action between them
# (embedded line 308) is not visible.
307 print( "STU: before ask %s to die" % thread )
309 print( "STU: after ask %s to die" % thread )
# Command-line entry point. Its `def` line is missing from this listing (the
# embedded numbering skips it), as are the assignments of `cnf`, `level` and
# the default `num_threads`.
#
# Option table for apt_pkg: (short flag, long flag, config key[, "HasArg"]).
# Only -j/--concurrency is marked as taking an argument.
318 arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
319 ('j',"concurrency", "%s::%s" % (options_prefix,"Concurrency"),"HasArg"),
320 ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
321 ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")),
# `args` is not used in any line visible here.
324 args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)
# The branch bodies below are not visible. Presumably Help calls usage() and
# Quiet/Verbose select the logging level -- TODO confirm.
331 if cnf.has_key("%s::%s" % (options_prefix,"Help")):
# Quiet is tested before Verbose (if/elif), so Quiet wins when both are set.
335 if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
338 elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
# Log records go to stderr.
342 logging.basicConfig( level=level,
343 format='%(asctime)s %(levelname)s %(message)s',
344 stream = sys.stderr )
# The Concurrency value is converted with int(); a non-numeric value raises
# ValueError here. The fallback used when the option is absent is not visible.
346 if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")):
347 num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")])
# All work is triggered by constructing the object; the instance is not kept.
349 ImportKnownChanges(num_threads)
# Script entry guard. The guarded statement (embedded line 355) is missing from
# this listing.
354 if __name__ == '__main__':