]> git.decadent.org.uk Git - dak.git/blob - dak/contents.py
merge from master with sqla
[dak.git] / dak / contents.py
1 #!/usr/bin/env python
2 """
3 Create all the contents files
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2008, 2009 Michael Casadevall <mcasadevall@debian.org>
7 @copyright: 2009 Mike O'Connor <stew@debian.org>
8 @license: GNU General Public License version 2 or later
9 """
10
11 ################################################################################
12
13 # This program is free software; you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation; either version 2 of the License, or
16 # (at your option) any later version.
17
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 # GNU General Public License for more details.
22
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
27 ################################################################################
28
29 # <Ganneff> there is the idea to slowly replace contents files
30 # <Ganneff> with a new generation of such files.
31 # <Ganneff> having more info.
32
33 # <Ganneff> of course that wont help for now where we need to generate them :)
34
35 ################################################################################
36
37 import sys
38 import os
39 import logging
40 import math
41 import gzip
42 import threading
43 import traceback
44 import Queue
45 import apt_pkg
46 import datetime #just for debugging, can be removed
47 from daklib import utils
48 from daklib.binary import Binary
49 from daklib.config import Config
50 from daklib.dbconn import DBConn
51 ################################################################################
52
# Module-level logger handle shared by the functions and classes below;
# it starts out unset.
log = None
54
def usage (exit_code=0):
    """
    Print the command line help for "dak contents" and terminate the process.

    @type exit_code: int
    @param exit_code: status passed to sys.exit() after the help is printed

    @raise SystemExit: always, carrying exit_code
    """
    # A single parenthesised string prints the same text whether print is a
    # statement or a function.
    print("""Usage: dak contents [options] command [arguments]

COMMANDS
    generate
        generate Contents-$arch.gz files

    bootstrap
        scan the debs in the existing pool and load contents into the database

    cruft
        remove files/paths which are no longer referenced by a binary

OPTIONS
     -h, --help
        show this help and exit

     -v, --verbose
        show verbose information messages

     -q, --quiet
        suppress all output but errors

     -s, --suite={stable,testing,unstable,...}
        only operate on a single suite
""")
    sys.exit(exit_code)
82
83 ################################################################################
84
# Configuration key prefix under which dak.conf holds the options for this
# command; individual options are looked up as "<options_prefix>::<Name>".

options_prefix = "%s::Options" % "Contents"
89
90 #log = logging.getLogger()
91
92 ################################################################################
93
# The strings below are SQL statement texts.  The PREPARE ones are registered
# through DBConn().prepare() by the code further down and then invoked with
# "EXECUTE <name>(...)".

# get all the arches delivered for a given suite
# this should probably exist somewhere common
# $1 is the suite id; each row is (architecture id, architecture name).
arches_q = """PREPARE arches_q(int) as
              SELECT s.architecture, a.arch_string
              FROM suite_architectures s
              JOIN architecture a ON (s.architecture=a.id)
                  WHERE suite = $1"""

# find me the .debs for a given suite ($1) and architecture ($2);
# each row is (binary id, filename).
debs_q = """PREPARE debs_q(int, int) as
              SELECT b.id, f.filename FROM bin_assoc_by_arch baa
              JOIN binaries b ON baa.bin=b.id
              JOIN files f ON b.file=f.id
              WHERE suite = $1
                  AND arch = $2"""

# find me all of the deb contents for a given suite ($1) and architecture ($2)
# NOTE(review): this statement is not referenced elsewhere in this file.
# NOTE(review): "arch=2" is a hard-coded architecture id - presumably the id
# of the "all" architecture; confirm against the architecture table.
contents_q = """PREPARE contents_q(int,int) as
                SELECT file, section, package
                FROM deb_contents
                WHERE suite = $1
                AND (arch = $2 or arch=2)"""
#                ORDER BY file"""
                
# find me all of the udeb contents for a given suite ($1), override type ($2),
# section ($3, declared as text) and architecture ($4), ordered by file
udeb_contents_q = """PREPARE udeb_contents_q(int,int,text, int) as
                SELECT file, section, package, arch
                FROM udeb_contents
                WHERE suite = $1
                AND otype = $2
                AND section = $3
                and arch = $4
                ORDER BY file"""


# clear out all of the temporarily stored content associations
# this should be run only after p-a has run.  after a p-a
# run we should have either accepted or rejected every package
# so there should no longer be anything in the queue
remove_pending_contents_cruft_q = """DELETE FROM pending_content_associations"""
134
class EndOfContents(object):
    """
    Marker object sent through the worker pipeline after the last contents
    file; a worker that receives one forwards it downstream and stops.
    """
137
class OneAtATime(object):
    """
    Single-slot hand-off between threads.

    enqueue() blocks while the slot is occupied and dequeue() blocks while it
    is empty, so at most one item is ever in flight.  None marks the empty
    slot and therefore cannot itself be passed through.
    """
    def __init__(self):
        self.next_in_line = None
        self.next_lock = threading.Condition()

    def enqueue(self, next):
        """
        Put an item into the slot, waiting until the slot is free.

        @param next: the item to hand over; must not be None
        """
        # "with" releases the lock even if the wait is interrupted
        with self.next_lock:
            # compare against None so that falsy items (0, "") still count
            # as "slot occupied"
            while self.next_in_line is not None:
                self.next_lock.wait()

            self.next_in_line = next
            # producers and consumers wait on the same condition, so wake
            # everybody and let each waiter re-check its own predicate
            self.next_lock.notify_all()

    def dequeue(self):
        """
        Take the item out of the slot, waiting until one is available.

        @return: the item most recently stored by enqueue()
        """
        with self.next_lock:
            while self.next_in_line is None:
                self.next_lock.wait()
            result = self.next_in_line
            self.next_in_line = None
            self.next_lock.notify_all()
            return result
164         
165
class ContentsWorkThread(threading.Thread):
    """
    Base class for one stage of the contents pipeline.

    A stage repeatedly takes an item from its upstream queue, hands it to the
    subclass-provided _run() and passes it on to the downstream queue (when
    there is one).  An EndOfContents item is forwarded and ends the thread.
    """
    def __init__(self, upstream, downstream):
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream

    def run(self):
        while True:
            try:
                job = self.upstream.dequeue()
                if not isinstance(job, EndOfContents):
                    began = datetime.datetime.now()
                    print("%s start: %s" % (self, job))
                    self._run(job)
                    elapsed = datetime.datetime.now() - began
                    print("%s finished: %s in %d seconds" % (self, job, elapsed.seconds))
                    if self.downstream:
                        self.downstream.enqueue(job)
                    continue

                # end marker: pass it along so later stages stop too
                if self.downstream:
                    self.downstream.enqueue(job)
                break
            except:
                # a failing item is reported and dropped; the stage keeps going
                traceback.print_exc()
191
class QueryThread(ContentsWorkThread):
    """
    Pipeline stage that calls query() on every item it receives.
    """
    def __init__(self, upstream, downstream):
        super(QueryThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "QueryThread"
    __str__ = __repr__

    def _run(self, contents_file):
        contents_file.query()
202
class IngestThread(ContentsWorkThread):
    """
    Pipeline stage that calls ingest() on every item it receives.
    """
    def __init__(self, upstream, downstream):
        super(IngestThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "IngestThread"
    __str__ = __repr__

    def _run(self, contents_file):
        contents_file.ingest()
213
class SortThread(ContentsWorkThread):
    """
    Pipeline stage that stores the sorted keys of an item's filenames
    mapping in its sorted_keys attribute.
    """
    def __init__(self, upstream, downstream):
        super(SortThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "SortThread"
    __str__ = __repr__

    def _run(self, contents_file):
        # iterating a dict yields its keys
        ordered = sorted(contents_file.filenames)
        contents_file.sorted_keys = ordered
224
class OutputThread(ContentsWorkThread):
    """
    Pipeline stage that writes an item's entries to its output file, one
    "<file>TAB<packages>" line per key in sorted_keys order.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "OutputThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.open_file()
        try:
            for fname in contents_file.sorted_keys:
                contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname]))
        finally:
            # Close (and thereby flush) the file before the item moves on:
            # the next stage compresses the file on disk and would otherwise
            # miss whatever is still sitting in the write buffer.
            contents_file.filehandle.close()
        # release the per-file data now that it has been written
        contents_file.sorted_keys = None
        contents_file.filenames.clear()
239     
class GzipThread(ContentsWorkThread):
    """
    Pipeline stage that compresses an item's output file with "gzip -f".
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "GzipThread"
    __repr__ = __str__

    def _run(self, contents_file):
        import subprocess

        # An argument list bypasses the shell, so file names containing
        # spaces or shell metacharacters are passed through unchanged.
        # check_call raises CalledProcessError on a non-zero exit status,
        # which the worker loop reports instead of silently ignoring it.
        subprocess.check_call(["gzip", "-f", contents_file.filename])
250     
class ContentFile(object):
    """
    One contents file on its way through the generation pipeline.

    The instance carries the state the pipeline stages share: the database
    cursor opened by query(), the filename -> "section/package[,...]" mapping
    filled by ingest(), the sorted key list and the output file handle.
    """

    # Header text shared by all instances; None until it has been loaded, and
    # also when no header is configured or it could not be read.
    header = None

    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 arch_str,
                 arch_id):
        """
        @param filename: path of the output file; open_file() later prefixes
                         it with the output root
        @param suite_str: suite name
        @param suite_id: database id of the suite
        @param arch_str: architecture name
        @param arch_id: database id of the architecture
        """
        self.filename = filename
        self.filenames = {}
        self.sorted_keys = None
        self.suite_str = suite_str
        self.suite_id = suite_id
        self.arch_str = arch_str
        self.arch_id = arch_id
        self.cursor = None
        self.filehandle = None

    def __str__(self):
        return self.filename
    __repr__ = __str__


    def cleanup(self):
        """
        Drop the collected data and close the file handle and cursor, if any.
        """
        self.filenames = None
        self.sorted_keys = None
        # either handle is still None when the matching stage never ran
        if self.filehandle is not None:
            self.filehandle.close()
        if self.cursor is not None:
            self.cursor.close()

    def query(self):
        """
        Open a cursor and select every (file, "section/package") row of
        deb_contents for this suite and architecture.
        """
        self.cursor = DBConn().cursor()

        # NOTE(review): "arch=2" is a hard-coded architecture id - presumably
        # the id of the "all" architecture; confirm.
        self.cursor.execute("""SELECT file, section || '/' || package
        FROM deb_contents
        WHERE ( arch=2 or arch = %d) AND suite = %d
        """ % (self.arch_id, self.suite_id))

    def ingest(self):
        """
        Read all rows from the cursor into self.filenames, joining the
        packages that ship the same file with commas, then close the cursor.
        """
        while True:
            r = self.cursor.fetchone()
            if not r:
                break
            filename, package = r
            if filename in self.filenames:
                self.filenames[filename] += ",%s" % (package)
            else:
                self.filenames[filename] = "%s" % (package)
        self.cursor.close()

    def open_file(self):
        """
        Open the (uncompressed) output file for writing, creating its
        directory if necessary, and write the header.

        self.filename is rewritten to the full output path.
        """
        # NOTE(review): the output root is hard-coded here; the configured
        # alternative was Config()["Contents::Root"] + self.filename.
        self.filename = "/home/stew/contents/" + self.filename
        filedir = os.path.dirname(self.filename)
        if not os.path.isdir(filedir):
            os.makedirs(filedir)
        self.filehandle = open(self.filename, "w")
        self._write_header()

    def _write_header(self):
        """
        Write the contents header to the output file, if there is one.
        """
        header = self._get_header()
        # no header configured (or it could not be read): write nothing
        # instead of passing None to write()
        if header:
            self.filehandle.write(header)

    @classmethod
    def _get_header(cls):
        """
        Internal method to return the header for Contents.gz files

        This is boilerplate which explains the contents of the file and how
        it can be used.

        The text is read once from the file named by Contents::Header below
        Dir::Templates and cached on ContentFile.header.

        @return: the header text, or None when no header is configured or the
                 file could not be read
        """
        if not ContentFile.header:
            if Config().has_key("Contents::Header"):
                try:
                    h = open(os.path.join( Config()["Dir::Templates"],
                                           Config()["Contents::Header"] ), "r")
                    try:
                        ContentFile.header = h.read()
                    finally:
                        h.close()
                except Exception:
                    # the configured value is a file name, hence %s
                    log.error( "error opening header file: %s\n%s" % (Config()["Contents::Header"],
                                                                      traceback.format_exc() ))
                    ContentFile.header = None
            else:
                ContentFile.header = None

        return ContentFile.header
342
class Contents(object):
    """
    Class capable of generating Contents-$arch.gz files

    Implements the "dak contents" commands: generate, bootstrap and cruft.

    Usage: Contents().generate()

    The methods report through the module-level "log" object, which has to be
    set to a logger before they are called.
    """

    def __init__(self):
        self.header = None

    def reject(self, message):
        """
        Error callback handed to Binary: log the message as an error.
        """
        log.error("E: %s" % message)

    # goal column for section column
    _goal_column = 54

    def cruft(self):
        """
        remove files/paths from the DB which are no longer referenced
        by binaries and clean the temporary table
        """
        cursor = DBConn().cursor()
        cursor.execute( "BEGIN WORK" )
        cursor.execute( remove_pending_contents_cruft_q )
        # NOTE(review): remove_filename_cruft_q and remove_filepath_cruft_q
        # are not defined anywhere in this file, so these two statements
        # raise NameError unless the names are provided elsewhere.
        cursor.execute( remove_filename_cruft_q )
        cursor.execute( remove_filepath_cruft_q )
        cursor.execute( "COMMIT" )


    def bootstrap(self):
        """
        scan the existing debs in the pool to populate the contents database tables
        """
        pooldir = Config()[ 'Dir::Pool' ]

        cursor = DBConn().cursor()
        DBConn().prepare("debs_q",debs_q)
        DBConn().prepare("arches_q",arches_q)

        suites = self._suites()
        for suite in [i.lower() for i in suites]:
            suite_id = DBConn().get_suite_id(suite)

            arch_list = self._arches(cursor, suite_id)
            for (arch_id, arch_str) in arch_list:
                cursor.execute( "EXECUTE debs_q(%d, %d)" % ( suite_id, arch_id ) )

                while True:
                    deb = cursor.fetchone()
                    if not deb:
                        break
                    binary_id, deb_path = deb[0], deb[1]

                    # has this binary been imported before?
                    cursor1 = DBConn().cursor()
                    try:
                        cursor1.execute( "SELECT 1 FROM deb_contents WHERE binary_id = %d LIMIT 1" % (binary_id) )
                        old = cursor1.fetchone()
                    finally:
                        # one lookup cursor per .deb: close it right away
                        cursor1.close()

                    if old:
                        log.info( "already imported: %s" % (deb_path) )
                    else:
                        log.info( "scanning: %s" % (deb_path) )
                        debfile = os.path.join( pooldir, deb_path )
                        if os.path.exists( debfile ):
                            Binary(debfile, self.reject).scan_package(binary_id, True)
                        else:
                            log.error("missing .deb: %s" % deb_path)


    def generate(self):
        """
        Generate contents files for debs.

        udeb generation (udeb_generate) is currently not invoked.
        """
        DBConn().prepare("arches_q", arches_q)
        self.deb_generate()

    def deb_generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.

        The work is pipelined over five threads (query, ingest, sort, output,
        gzip) connected by single-slot queues; one ContentFile per suite and
        architecture is fed into the first stage.
        """
        cursor = DBConn().cursor()
        suites = self._suites()

        inputtoquery = OneAtATime()
        querytoingest = OneAtATime()
        ingesttosort = OneAtATime()
        sorttooutput = OneAtATime()
        outputtogzip = OneAtATime()

        # A single ingest stage is used; running several of them in parallel
        # was noted to make things worse.
        stages = [ QueryThread(inputtoquery,querytoingest),
                   IngestThread(querytoingest,ingesttosort),
                   SortThread(ingesttosort,sorttooutput),
                   OutputThread(sorttooutput,outputtogzip),
                   GzipThread(outputtogzip, None),
                 ]

        for stage in stages:
            stage.start()

        try:
            # Get our suites, and the architectures
            for suite in [i.lower() for i in suites]:
                suite_id = DBConn().get_suite_id(suite)
                arch_list = self._arches(cursor, suite_id)

                for (arch_id,arch_str) in arch_list:
                    print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )

                    filename = "dists/%s/Contents-%s" % (suite, arch_str)
                    cf = ContentFile(filename, suite, suite_id, arch_str, arch_id)
                    inputtoquery.enqueue( cf )
        finally:
            # Always send the end marker, even when enumerating the work
            # failed: otherwise the worker threads would wait forever and
            # keep the process alive.
            inputtoquery.enqueue( EndOfContents() )
            for stage in stages:
                stage.join()

    def udeb_generate(self):
        """
        Generate Contents-udeb[-nf]-$arch files for every available arch in each given suite.
        """
        cursor = DBConn().cursor()

        DBConn().prepare("udeb_contents_q", udeb_contents_q)
        udebtype_id=DBConn().get_override_type_id("udeb")
        suites = self._suites()

        for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s"),
                                    ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s")]:

            section_id = DBConn().get_section_id(section) # all udebs should be here)
            if section_id == -1:
                continue

            # Get our suites, and the architectures
            for suite in [i.lower() for i in suites]:
                suite_id = DBConn().get_suite_id(suite)
                arch_list = self._arches(cursor, suite_id)

                for (arch_id, arch_str) in arch_list:

                    # NOTE(review): GzippedContentWriter is not defined in
                    # this file; it has to be provided before this method
                    # can run.
                    writer = GzippedContentWriter(fn_pattern % (suite, arch_str))
                    try:
                        # The statement takes four parameters, so pass four
                        # values, using the architecture id rather than the
                        # whole (id, name) tuple.
                        # NOTE(review): the statement declares $3 as text
                        # while section_id is formatted as an integer here -
                        # confirm which is intended.
                        cursor.execute("EXECUTE udeb_contents_q(%d,%d,%d,%d)" % (suite_id, udebtype_id, section_id, arch_id))

                        while True:
                            r = cursor.fetchone()
                            if not r:
                                break

                            filename, section, package, arch = r
                            writer.write(filename, section, package)
                    finally:
                        writer.close()



################################################################################

    def _suites(self):
        """
        return a list of suites to operate on

        Taken from the Suite option when it is set, otherwise unstable and
        testing.
        """
        if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
            suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
        else:
            suites = [ 'unstable', 'testing' ]

        return suites

    def _arches(self, cursor, suite):
        """
        return a list of archs to operate on

        @param cursor: database cursor on which arches_q has been prepared
        @param suite: database id of the suite

        @return: list of (architecture id, architecture name) tuples for the
                 suite, leaving out "source" and "all"
        """
        arch_list = []
        cursor.execute("EXECUTE arches_q(%d)" % (suite))
        while True:
            r = cursor.fetchone()
            if not r:
                break

            if r[1] != "source" and r[1] != "all":
                arch_list.append((r[0], r[1]))

        return arch_list
541
542 ################################################################################
543
544
def main():
    """
    Entry point for "dak contents": parse the command line, set up logging
    and run the requested command on a fresh Contents instance.

    Prints the usage text and exits with status 0 when help is requested,
    and with status 1 when the command is missing or unknown.
    """
    global log

    cnf = Config()

    arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
                 ('s',"suite", "%s::%s" % (options_prefix,"Suite"),"HasArg"),
                 ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
                 ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")),
                ]

    commands = {'generate' : Contents.generate,
                'bootstrap' : Contents.bootstrap,
                'cruft' : Contents.cruft,
                }

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)

    if cnf.has_key("%s::%s" % (options_prefix,"Help")):
        usage()

    if (len(args) < 1) or args[0] not in commands:
        # a missing or unknown command is a usage error, not a success
        usage(1)

    level = logging.INFO
    if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
        level = logging.ERROR
    elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
        level = logging.DEBUG

    logging.basicConfig( level=level,
                         format='%(asctime)s %(levelname)s %(message)s',
                         stream = sys.stderr )

    # the commands report through the module-level logger, so it has to
    # exist before any of them runs
    log = logging.getLogger()

    commands[args[0]](Contents())
581
# Run the command line interface only when executed as a script.
if "__main__" == __name__:
    main()