]> git.decadent.org.uk Git - dak.git/blob - dak/contents.py
before I rip out pending_*
[dak.git] / dak / contents.py
1 #!/usr/bin/env python
2 """
3 Create all the contents files
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2008, 2009 Michael Casadevall <mcasadevall@debian.org>
7 @copyright: 2009 Mike O'Connor <stew@debian.org>
8 @license: GNU General Public License version 2 or later
9 """
10
11 ################################################################################
12
13 # This program is free software; you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation; either version 2 of the License, or
16 # (at your option) any later version.
17
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 # GNU General Public License for more details.
22
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
27 ################################################################################
28
29 # <Ganneff> there is the idea to slowly replace contents files
30 # <Ganneff> with a new generation of such files.
31 # <Ganneff> having more info.
32
33 # <Ganneff> of course that wont help for now where we need to generate them :)
34
35 ################################################################################
36
37 import sys
38 import os
39 import logging
40 import gzip
41 import threading
42 import Queue
43 import apt_pkg
44 from daklib import utils
45 from daklib.binary import Binary
46 from daklib.config import Config
47 from daklib.dbconn import *
48
49 ################################################################################
50
def usage(exit_code=0):
    """
    Print the command line help of 'dak contents' on stdout and exit.

    exit_code is handed to sys.exit() once the text has been printed, so
    this function never returns to its caller.
    """
    print("""Usage: dak contents [options] command [arguments]

COMMANDS
    generate
        generate Contents-$arch.gz files

    bootstrap_bin
        scan the debs in the existing pool and load contents into the bin_contents table

    cruft
        remove files/paths which are no longer referenced by a binary

OPTIONS
     -h, --help
        show this help and exit

     -v, --verbose
        show verbose information messages

     -q, --quiet
        suppress all output but errors

     -s, --suite={stable,testing,unstable,...}
        only operate on a single suite
""")
    sys.exit(exit_code)
78
79 ################################################################################
80
# Root of the dak.conf subtree under which the options of this command
# are stored.

options_prefix = "%s::Options" % "Contents"

# Root logger; main() sets its level and output format.
log = logging.getLogger()
87
88 ################################################################################
89
class EndOfContents(object):
    """
    Sentinel object marking the end of the stream of contents files
    """
95
class OneAtATime(object):
    """
    A single-slot hand-off point between threads.

    enqueue() blocks while the slot is occupied and dequeue() blocks while
    it is empty, so at most one item is waiting at any time.  None marks
    the empty slot and therefore cannot be handed over as an item.
    """
    def __init__(self):
        self.next_in_line = None
        self.next_lock = threading.Condition()

    def enqueue(self, next):
        """
        Put next into the slot, waiting until the slot is free.
        """
        # 'with' releases the lock even if the wait is interrupted
        with self.next_lock:
            # compare against None so that falsy items (0, "") count as
            # occupying the slot
            while self.next_in_line is not None:
                self.next_lock.wait()

            self.next_in_line = next
            # enqueuers and dequeuers wait on the same condition: wake all
            # of them and let each one re-check its own predicate
            self.next_lock.notify_all()

    def dequeue(self):
        """
        Remove and return the item in the slot, waiting until there is one.
        """
        with self.next_lock:
            while self.next_in_line is None:
                self.next_lock.wait()

            result = self.next_in_line
            self.next_in_line = None
            self.next_lock.notify_all()
            return result
122
class ContentsWorkThread(threading.Thread):
    """
    Base class for one stage of the contents generation pipeline.

    A stage repeatedly takes a contents file from upstream, processes it
    with _run() (which the subclass has to supply) and hands it on to
    downstream, if there is one.  An EndOfContents instance is passed on
    downstream as well and then ends the thread.
    """
    def __init__(self, upstream, downstream):
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream

    def run(self):
        # imported locally: neither module is among the explicit imports
        # at the top of this file
        import datetime
        import traceback

        while True:
            try:
                contents_file = self.upstream.dequeue()
                if isinstance(contents_file, EndOfContents):
                    if self.downstream:
                        self.downstream.enqueue(contents_file)
                    break

                s = datetime.datetime.now()
                print("%s start: %s" % (self, contents_file))
                self._run(contents_file)
                print("%s finished: %s in %d seconds" % (self, contents_file, (datetime.datetime.now()-s).seconds))
                if self.downstream:
                    self.downstream.enqueue(contents_file)
            except Exception:
                # one failing file must not end the stage: report it and go
                # on with the next one.  The failed file is not passed on.
                # SystemExit and KeyboardInterrupt are deliberately not
                # caught here.
                traceback.print_exc()
148
class QueryThread(ContentsWorkThread):
    """
    Pipeline stage which calls query() on each contents file
    """
    def __init__(self, upstream, downstream):
        super(QueryThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "QueryThread"
    __str__ = __repr__

    def _run(self, contents_file):
        contents_file.query()
159
class IngestThread(ContentsWorkThread):
    """
    Pipeline stage which calls ingest() on each contents file
    """
    def __init__(self, upstream, downstream):
        super(IngestThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "IngestThread"
    __str__ = __repr__

    def _run(self, contents_file):
        contents_file.ingest()
170
class SortThread(ContentsWorkThread):
    """
    Pipeline stage which stores the sorted file names of each contents
    file in its sorted_keys attribute
    """
    def __init__(self, upstream, downstream):
        super(SortThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "SortThread"
    __str__ = __repr__

    def _run(self, contents_file):
        names = list(contents_file.filenames.keys())
        names.sort()
        contents_file.sorted_keys = names
181
class OutputThread(ContentsWorkThread):
    """
    Pipeline stage which writes the entries of each contents file to disk,
    one "<file name>TAB<packages>" line per entry, in sorted_keys order.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "OutputThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.open_file()
        try:
            for fname in contents_file.sorted_keys:
                contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname]))
        finally:
            # the file is compressed on disk by a later stage, so the
            # buffered data has to be flushed and the handle released
            # before the contents file is handed on
            contents_file.filehandle.close()
        # everything is on disk now, release the memory
        contents_file.sorted_keys = None
        contents_file.filenames.clear()
196     
class GzipThread(ContentsWorkThread):
    """
    Pipeline stage which compresses each contents file on disk with
    'gzip -f'.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "GzipThread"
    __repr__ = __str__

    def _run(self, contents_file):
        # imported locally: subprocess is not among the explicit imports
        # at the top of this file
        import subprocess

        # an argument list keeps the file name away from the shell, so
        # spaces or shell metacharacters in it cannot change the command
        status = subprocess.call(["gzip", "-f", contents_file.filename])
        if status != 0:
            log.error("gzip -f %s failed with exit status %s" % (contents_file.filename, status))
207
class ContentFile(object):
    """
    State of one contents file while it travels through the pipeline.

    Subclasses supply query(), which has to leave an object offering
    fetchone() and close() in self.cursor; ingest() reads the
    (filename, package) rows from it.
    """

    # Header text shared by all contents files.  None until it has been
    # loaded, and also when no header is configured or it cannot be read.
    header = None

    def __init__(self,
                 filename,
                 suite_str,
                 suite_id):

        self.filename = filename
        # maps a file name to the comma separated packages shipping it
        self.filenames = {}
        self.sorted_keys = None
        self.suite_str = suite_str
        self.suite_id = suite_id
        self.cursor = None
        self.filehandle = None

    def __str__(self):
        return self.filename
    __repr__ = __str__


    def cleanup(self):
        """
        Drop the collected data and close the file handle and the cursor,
        as far as they have been opened.
        """
        self.filenames = None
        self.sorted_keys = None
        if self.filehandle is not None:
            self.filehandle.close()
        if self.cursor is not None:
            self.cursor.close()

    def ingest(self):
        """
        Read all (filename, package) rows from self.cursor into
        self.filenames and close the cursor.

        Packages sharing a file name are joined with commas, in the order
        in which the rows arrive.
        """
        while True:
            r = self.cursor.fetchone()
            if not r:
                break
            filename, package = r
            if filename in self.filenames:
                self.filenames[filename] += ",%s" % (package)
            else:
                self.filenames[filename] = "%s" % (package)
        self.cursor.close()

    def open_file(self):
        """
        Open the (uncompressed) contents file for writing, creating missing
        directories, and write the header to it.

        Note that self.filename is rewritten to the full path, so this is
        meant to be called once per contents file.
        """
        # NOTE(review): the output directory is hard-coded; an earlier,
        # disabled version used Config()["Contents::Root"] -- confirm which
        # one is wanted before this goes live
        self.filename = "/home/stew/contents/" + self.filename
        filedir = os.path.dirname(self.filename)
        if not os.path.isdir(filedir):
            os.makedirs(filedir)
        self.filehandle = open(self.filename, "w")
        self._write_header()

    def _write_header(self):
        """
        Write the header to the open file, if there is one.
        """
        header = self._get_header()
        # without a configured (or readable) header there is nothing to write
        if header:
            self.filehandle.write(header)

    @classmethod
    def _get_header(cls):
        """
        Internal method to return the header for Contents.gz files

        This is boilerplate which explains the contents of the file and how
        it can be used.  It is read from the file named by Contents::Header
        below Dir::Templates and kept in ContentFile.header; the result is
        None when the option is not set or the file cannot be read.
        """
        if not ContentFile.header:
            if Config().has_key("Contents::Header"):
                try:
                    path = os.path.join( Config()["Dir::Templates"],
                                         Config()["Contents::Header"] )
                    with open(path, "r") as h:
                        ContentFile.header = h.read()
                except Exception:
                    # imported locally: traceback is not among the explicit
                    # imports at the top of this file
                    import traceback
                    log.error( "error opening header file: %s\n%s" % (Config()["Contents::Header"],
                                                                      traceback.format_exc() ))
                    ContentFile.header = None
            else:
                ContentFile.header = None

        return ContentFile.header
287
288
class DebContentFile(ContentFile):
    """
    Contents file for the deb packages of one architecture in one suite.
    """
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 arch_str,
                 arch_id):
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )
        self.arch_str = arch_str
        self.arch_id = arch_id

    def query(self):
        """
        Run the deb_contents query for this suite and architecture and keep
        its result in self.cursor.
        """
        session = DBConn().session()

        # ingest() and cleanup() call fetchone()/close() on self.cursor, so
        # it has to be the result of the statement rather than the session
        # (bootstrap_bin uses execute() the same way)
        # NOTE(review): arch=2 is presumably the id of architecture 'all' --
        # confirm, and consider looking it up instead of hard-coding it
        self.cursor = session.execute("""SELECT file, component || section || '/' || package
        FROM deb_contents
        WHERE ( arch=2 or arch = :arch) AND suite = :suite
        """, { 'arch':self.arch_id, 'suite':self.suite_id } )
310
class UdebContentFile(ContentFile):
    """
    Contents file for the udeb packages of one suite.
    """
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 section_name,
                 section_id):
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )
        # kept for later use; the query below does not look at them yet
        self.section_name = section_name
        self.section_id = section_id

    def query(self):
        """
        Run the udeb_contents query for this suite and keep its result in
        self.cursor.
        """
        session = DBConn().session()

        # ingest() and cleanup() call fetchone()/close() on self.cursor, so
        # it has to be the result of the statement rather than the session
        # (bootstrap_bin uses execute() the same way)
        # NOTE(review): the statement filters by suite only; presumably it
        # should also restrict to this file's section -- confirm
        self.cursor = session.execute("""SELECT file, component || section || '/' || package
        FROM udeb_contents
        WHERE suite = :suite
        """ , { 'suite': self.suite_id } )
330
331 class Contents(object):
332     """
333     Class capable of generating Contents-$arch.gz files
334     """
335
336     def __init__(self):
337         self.header = None
338
339     def reject(self, message):
340         log.error("E: %s" % message)
341
342     def cruft(self):
343         """
344         remove files/paths from the DB which are no longer referenced
345         by binaries and clean the temporary table
346         """
347         s = DBConn().session()
348
349         # clear out all of the temporarily stored content associations
350         # this should be run only after p-a has run.  after a p-a
351         # run we should have either accepted or rejected every package
352         # so there should no longer be anything in the queue
353         s.query(PendingContentAssociation).delete()
354
355         # delete any filenames we are storing which have no binary associated
356         # with them
357         cafq = s.query(ContentAssociation.filename_id).distinct()
358         cfq = s.query(ContentFilename)
359         cfq = cfq.filter(~ContentFilename.cafilename_id.in_(cafq))
360         cfq.delete()
361
362         # delete any paths we are storing which have no binary associated with
363         # them
364         capq = s.query(ContentAssociation.filepath_id).distinct()
365         cpq = s.query(ContentFilepath)
366         cpq = cpq.filter(~ContentFilepath.cafilepath_id.in_(capq))
367         cpq.delete()
368
369         s.commit()
370
371
372     def bootstrap_bin(self):
373         """
374         scan the existing debs in the pool to populate the bin_contents table
375         """
376         pooldir = Config()[ 'Dir::Pool' ]
377
378         s = DBConn().session()
379
380         print( "bootstrap_bin" )
381         for binary in s.query(DBBinary).yield_per(1000):
382             print( "binary: %s" % binary.package )
383             filename = binary.poolfile.filename
384              # Check for existing contents
385             existingq = s.execute( "select 1 from bin_contents where binary_id=:id", {'id':binary.binary_id} );
386             if existingq.fetchone():
387                 log.debug( "already imported: %s" % (filename))
388             else:
389                 # We don't have existing contents so import them
390                 log.debug( "scanning: %s" % (filename) )
391
392                 debfile = os.path.join(pooldir, filename)
393                 if os.path.exists(debfile):
394                     Binary(debfile, self.reject).scan_package(binary.binary_id, True)
395                 else:
396                     log.error("missing .deb: %s" % filename)
397
398
399
400     def bootstrap(self):
401         """
402         scan the existing debs in the pool to populate the contents database tables
403         """
404         s = DBConn().session()
405
406         for override in s.query(Override).all():
407             binaries = s.execute("""SELECT b.binary_id, ba.arch
408                                     FROM binaries b
409                                     JOIN bin_associations ba ON ba.binary_id=b.binary_id
410                                     WHERE ba.suite=:suite
411                                     AND b.package=override.package""", {'suite':override.suite})
412             while True:
413                 binary = binaries.fetchone()
414                 if not binary:
415                     break
416
417                 filenames = s.execute( """SELECT file from bin_contents where binary_id=:id""", { 'id': binary.binary_id } )
418                 while True:
419                     filename = filenames.fetchone()
420                     if not binary:
421                         break
422
423                 
424
425                     if override.type == 7:
426                         s.execute( """INSERT INTO deb_contents (file,section,package,binary_id,arch,suite,component)
427                                       VALUES (:filename, :section, :package, :binary_id, :arch, :suite, :component);""",
428                                    { 'filename' : filename,
429                                      'section' : override.section,
430                                      'package' : override.package,
431                                      'binary_id' : binary.binary_id,
432                                      'arch' : binary.arch,
433                                      'suite' : override.suite,
434                                      'component' : override.component } )
435
436                     
437                     elif override.type == 9:
438                         s.execute( """INSERT INTO deb_contents (file,section,package,binary_id,arch,suite,component)
439                                       VALUES (:filename, :section, :package, :binary_id, :arch, :suite, :component);""",
440                                    { 'filename' : filename,
441                                      'section' : override.section,
442                                      'package' : override.package,
443                                      'binary_id' : binary.binary_id,
444                                      'arch' : binary.arch,
445                                      'suite' : override.suite,
446                                      'component' : override.component } )
447
448 #     def bootstrap(self):
449 #         """
450 #         scan the existing debs in the pool to populate the contents database tables
451 #         """
452 #         pooldir = Config()[ 'Dir::Pool' ]
453
454 #         s = DBConn().session()
455
456 #         for suite in s.query(Suite).all():
457 #             for arch in get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=s):
458 #                 q = s.query(BinAssociation).join(Suite)
459 #                 q = q.join(Suite).filter_by(suite_name=suite.suite_name)
460 #                 q = q.join(DBBinary).join(Architecture).filter_by(arch.arch_string)
461 #                 for ba in q:
462 #                     filename = ba.binary.poolfile.filename
463 #                     # Check for existing contents
464 #                     existingq = s.query(ContentAssociations).filter_by(binary_pkg=ba.binary_id).limit(1)
465 #                     if existingq.count() > 0:
466 #                         log.debug( "already imported: %s" % (filename))
467 #                     else:
468 #                         # We don't have existing contents so import them
469 #                         log.debug( "scanning: %s" % (filename) )
470 #                         debfile = os.path.join(pooldir, filename)
471 #                         if os.path.exists(debfile):
472 #                             Binary(debfile, self.reject).scan_package(ba.binary_id, True)
473 #                         else:
474 #                             log.error("missing .deb: %s" % filename)
475     def generate(self):
476         """
477         Generate contents files for both deb and udeb
478         """
479         DBConn().prepare("arches_q", arches_q)
480         self.deb_generate()
481 #        self.udeb_generate()
482
483     def deb_generate(self):
484         """
485         Generate Contents-$arch.gz files for every available arch in each given suite.
486         """
487         cursor = DBConn().session()
488         debtype_id = DBConn().get_override_type_id("deb")
489         suites = self._suites()
490
491         inputtoquery = OneAtATime()
492         querytoingest = OneAtATime()
493         ingesttosort = OneAtATime()
494         sorttooutput = OneAtATime()
495         outputtogzip = OneAtATime()
496
497         qt = QueryThread(inputtoquery,querytoingest)
498         it = IngestThread(querytoingest,ingesttosort)
499 # these actually make things worse
500 #        it2 = IngestThread(querytoingest,ingesttosort)
501 #        it3 = IngestThread(querytoingest,ingesttosort)
502 #        it4 = IngestThread(querytoingest,ingesttosort)
503         st = SortThread(ingesttosort,sorttooutput)
504         ot = OutputThread(sorttooutput,outputtogzip)
505         gt = GzipThread(outputtogzip, None)
506
507         qt.start()
508         it.start()
509 #        it2.start()
510 #        it3.start()
511 #        it2.start()
512         st.start()
513         ot.start()
514         gt.start()
515         
516         # Get our suites, and the architectures
517         for suite in [i.lower() for i in suites]:
518             suite_id = DBConn().get_suite_id(suite)
519             arch_list = self._arches(cursor, suite_id)
520
521             for (arch_id,arch_str) in arch_list:
522                 print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )
523
524 #                filename = "dists/%s/Contents-%s.gz" % (suite, arch_str)
525                 filename = "dists/%s/Contents-%s" % (suite, arch_str)
526                 cf = ContentFile(filename, suite, suite_id, arch_str, arch_id)
527                 inputtoquery.enqueue( cf )
528
529         inputtoquery.enqueue( EndOfContents() )
530         gt.join()
531
532     def udeb_generate(self):
533         """
534         Generate Contents-$arch.gz files for every available arch in each given suite.
535         """
536         cursor = DBConn().session()
537         udebtype_id=DBConn().get_override_type_id("udeb")
538         suites = self._suites()
539
540         inputtoquery = OneAtATime()
541         querytoingest = OneAtATime()
542         ingesttosort = OneAtATime()
543         sorttooutput = OneAtATime()
544         outputtogzip = OneAtATime()
545
546         qt = QueryThread(inputtoquery,querytoingest)
547         it = IngestThread(querytoingest,ingesttosort)
548 # these actually make things worse
549 #        it2 = IngestThread(querytoingest,ingesttosort)
550 #        it3 = IngestThread(querytoingest,ingesttosort)
551 #        it4 = IngestThread(querytoingest,ingesttosort)
552         st = SortThread(ingesttosort,sorttooutput)
553         ot = OutputThread(sorttooutput,outputtogzip)
554         gt = GzipThread(outputtogzip, None)
555
556         qt.start()
557         it.start()
558 #        it2.start()
559 #        it3.start()
560 #        it2.start()
561         st.start()
562         ot.start()
563         gt.start()
564         
565         for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s"),
566                                     ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s")]:
567
568             section_id = DBConn().get_section_id(section) # all udebs should be here)
569             if section_id != -1:
570
571                 
572
573                 # Get our suites, and the architectures
574                 for suite in [i.lower() for i in suites]:
575                     suite_id = DBConn().get_suite_id(suite)
576                     arch_list = self._arches(cursor, suite_id)
577
578                     for arch_id in arch_list:
579
580                         writer = GzippedContentWriter(fn_pattern % (suite, arch_id[1]))
581                         try:
582
583                             cursor.execute("EXECUTE udeb_contents_q(%d,%d,%d)" % (suite_id, udebtype_id, section_id, arch_id))
584
585                             while True:
586                                 r = cursor.fetchone()
587                                 if not r:
588                                     break
589
590                                 filename, section, package, arch = r
591                                 writer.write(filename, section, package)
592                         finally:
593                             writer.close()
594
595
596
597
598     def generate(self):
599         """
600         Generate Contents-$arch.gz files for every available arch in each given suite.
601         """
602         session = DBConn().session()
603
604         arch_all_id = get_architecture("all", session).arch_id
605
606         # The MORE fun part. Ok, udebs need their own contents files, udeb, and udeb-nf (not-free)
607         # This is HORRIBLY debian specific :-/
608         for dtype, section, fn_pattern in \
609               [('deb',  None,                        "dists/%s/Contents-%s.gz"),
610                ('udeb', "debian-installer",          "dists/%s/Contents-udeb-%s.gz"),
611                ('udeb', "non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:
612
613             overridetype = get_override_type(dtype, session)
614
615             # For udebs, we only look in certain sections (see the for loop above)
616             if section is not None:
617                 section = get_section(section, session)
618
619             # Get our suites
620             for suite in which_suites():
621                 # Which architectures do we need to work on
622                 arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session)
623
624                 # Set up our file writer dictionary
625                 file_writers = {}
626                 try:
627                     # One file writer per arch
628                     for arch in arch_list:
629                         file_writers[arch.arch_id] = GzippedContentWriter(fn_pattern % (suite, arch.arch_string))
630
631                     for r in get_suite_contents(suite, overridetype, section, session=session).fetchall():
632                         filename, section, package, arch_id = r
633
634                         if arch_id == arch_all_id:
635                             # It's arch all, so all contents files get it
636                             for writer in file_writers.values():
637                                 writer.write(filename, section, package)
638                         else:
639                             if file_writers.has_key(arch_id):
640                                 file_writers[arch_id].write(filename, section, package)
641
642                 finally:
643                     # close all the files
644                     for writer in file_writers.values():
645                         writer.finish()
646     def _suites(self):
647         """
648         return a list of suites to operate on
649         """
650         if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
651             suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
652         else:
653             suites = [ 'unstable', 'testing' ]
654 #            suites = Config().SubTree("Suite").List()
655
656         return suites
657
658     def _arches(self, cursor, suite):
659         """
660         return a list of archs to operate on
661         """
662         arch_list = []
663         cursor.execute("EXECUTE arches_q(%d)" % (suite))
664         while True:
665             r = cursor.fetchone()
666             if not r:
667                 break
668
669             if r[1] != "source" and r[1] != "all":
670                 arch_list.append((r[0], r[1]))
671
672         return arch_list
673
674
675 ################################################################################
676
def main():
    """
    Entry point of 'dak contents': parse the command line, set up logging
    and run the requested command on a fresh Contents instance.

    The usage text is shown (which exits) when the command is missing or
    unknown, or when help was requested.
    """
    cnf = Config()

    def option(name):
        # full configuration key of one of our command line options
        return "%s::%s" % (options_prefix,name)

    arguments = [('h',"help", option("Help")),
                 ('s',"suite", option("Suite"),"HasArg"),
                 ('q',"quiet", option("Quiet")),
                 ('v',"verbose", option("Verbose")),
                ]

    commands = {'generate' : Contents.generate,
                'bootstrap_bin' : Contents.bootstrap_bin,
                'cruft' : Contents.cruft,
                }

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)

    if not args or args[0] not in commands:
        usage()

    if cnf.has_key(option("Help")):
        usage()

    # quiet wins over verbose when both are given
    if cnf.has_key(option("Quiet")):
        level = logging.ERROR
    elif cnf.has_key(option("Verbose")):
        level = logging.DEBUG
    else:
        level = logging.INFO

    logging.basicConfig( level=level,
                         format='%(asctime)s %(levelname)s %(message)s',
                         stream = sys.stderr )

    command = commands[args[0]]
    command(Contents())
712
def which_suites(session):
    """
    Return the suites to operate on, looked up with get_suite() in the
    given session: those named by the Suite option if it is set, otherwise
    all suites listed in the configuration.
    """
    key = "%s::%s" %(options_prefix,"Suite")
    if Config().has_key(key):
        names = utils.split_args(Config()[key])
    else:
        names = Config().SubTree("Suite").List()

    return [get_suite(name.lower(), session) for name in names]
723
724
# Run the command line interface when executed as a script.
if __name__ == "__main__":
    main()