# Source: dak.git (git.decadent.org.uk) -- dak/contents.py, branch "bugfixes"
1 #!/usr/bin/env python
2 """
3 Create all the contents files
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2008, 2009 Michael Casadevall <mcasadevall@debian.org>
7 @copyright: 2009 Mike O'Connor <stew@debian.org>
8 @license: GNU General Public License version 2 or later
9 """
10
11 ################################################################################
12
13 # This program is free software; you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation; either version 2 of the License, or
16 # (at your option) any later version.
17
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 # GNU General Public License for more details.
22
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
27 ################################################################################
28
29 # <Ganneff> there is the idea to slowly replace contents files
30 # <Ganneff> with a new generation of such files.
31 # <Ganneff> having more info.
32
33 # <Ganneff> of course that wont help for now where we need to generate them :)
34
35 ################################################################################
36
37 import sys
38 import os
39 import logging
40 import gzip
41 import threading
42 import traceback
43 import Queue
44 import apt_pkg
45 import datetime
46 import traceback
47 from daklib import utils
48 from daklib.binary import Binary
49 from daklib.config import Config
50 from daklib.dbconn import *
51 from daklib.contents import ContentsScanner
52 from daklib import daklog
53
# Module-level daklog.Logger handle; created in main(), used by scan_all().
Logger = None
55
56 ################################################################################
57
def usage(exit_code=0):
    """Print the command usage summary and terminate with *exit_code*."""
    sys.stdout.write("""Usage: dak contents [options] command [arguments]

COMMANDS
    generate
        generate Contents-$arch.gz files

    bootstrap_bin
        scan the debs in the existing pool and load contents into the bin_contents table

    bootstrap
        copy data from the bin_contents table into the deb_contents / udeb_contents tables

    cruft
        remove files/paths which are no longer referenced by a binary

OPTIONS
     -h, --help
        show this help and exit

     -v, --verbose
        show verbose information messages

     -q, --quiet
        supress all output but errors

     -s, --suite={stable,testing,unstable,...}
        only operate on a single suite

     -l, --limit=NUMBER
        optional package limit for bootstrap_bin
""")
    sys.stdout.write("\n")
    sys.exit(exit_code)
91
92 ################################################################################
93
94 # where in dak.conf all of our configuration will be stowed
95
# All of this command's configuration lives under Contents::Options in dak.conf.
options_prefix = "Contents::Options"

# Root logger; its level/format are configured via logging.basicConfig() in main().
log = logging.getLogger()
100
101 ################################################################################
102
class EndOfContents(object):
    """Sentinel pushed down the work queues to signal that no further
    contents files will follow, so each pipeline stage can shut down."""
108
class OneAtATime(object):
    """
    a one space queue which sits between multiple possible producers
    and multiple possible consumers
    """
    def __init__(self):
        self.next_in_line = None              # the single queue slot
        self.read_lock = threading.Condition()
        self.write_lock = threading.Condition()
        # When set, blocked producers/consumers give up instead of waiting.
        self.die = False

    def enqueue(self, next):
        """Block until the slot is free, then deposit *next* into it.

        Returns immediately (without enqueueing) once self.die is set.
        """
        self.write_lock.acquire()
        try:
            while self.next_in_line:
                if self.die:
                    # BUGFIX: the original returned here while still
                    # holding write_lock, deadlocking every other
                    # producer; the finally below guarantees release.
                    return
                self.write_lock.wait()

            assert( not self.next_in_line )
            self.next_in_line = next
        finally:
            self.write_lock.release()

        # Wake one consumer waiting for an item.
        self.read_lock.acquire()
        self.read_lock.notify()
        self.read_lock.release()

    def dequeue(self):
        """Block until an item is available, remove it and return it.

        Returns None (without dequeueing) once self.die is set.
        """
        self.read_lock.acquire()
        try:
            while not self.next_in_line:
                if self.die:
                    # BUGFIX: release read_lock on the way out (the
                    # original held it forever on this path).
                    return None
                self.read_lock.wait()

            result = self.next_in_line
            self.next_in_line = None
        finally:
            self.read_lock.release()

        # Wake one producer waiting for the slot to empty.
        self.write_lock.acquire()
        self.write_lock.notify()
        self.write_lock.release()

        return result
150
151
class ContentsWorkThread(threading.Thread):
    """One stage of the Contents generation pipeline.

    Pulls work items from *upstream*, processes them via the subclass's
    _run(), and hands them on to *downstream* (if any).  An EndOfContents
    sentinel is forwarded and terminates the stage.
    """
    def __init__(self, upstream, downstream):
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream

    def run(self):
        while True:
            try:
                job = self.upstream.dequeue()
                if isinstance(job, EndOfContents):
                    # Pass the sentinel on so the next stage shuts down too.
                    if self.downstream:
                        self.downstream.enqueue(job)
                    break

                started = datetime.datetime.now()
                print("%s start: %s" % (self, job) )
                self._run(job)
                elapsed = (datetime.datetime.now() - started).seconds
                print("%s finished: %s in %d seconds" % (self, job, elapsed ))
                if self.downstream:
                    self.downstream.enqueue(job)
            except:
                # Keep the stage alive; report and continue with the next item.
                traceback.print_exc()
177
class QueryThread(ContentsWorkThread):
    """Pipeline stage that runs the database query for a contents file."""

    def __str__(self):
        return "QueryThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.query()
188
class IngestThread(ContentsWorkThread):
    """Pipeline stage that drains query results into memory."""

    def __str__(self):
        return "IngestThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.ingest()
199
class SortThread(ContentsWorkThread):
    """Pipeline stage that sorts the collected file names."""

    def __str__(self):
        return "SortThread"
    __repr__ = __str__

    def _run(self, contents_file):
        # Stash the ordering separately so OutputThread can stream it.
        contents_file.sorted_keys = sorted(contents_file.filenames.keys())
210
class OutputThread(ContentsWorkThread):
    """Pipeline stage that writes the sorted entries to the contents file."""

    def __str__(self):
        return "OutputThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.open_file()
        entries = contents_file.filenames
        write = contents_file.filehandle.write
        for fname in contents_file.sorted_keys:
            write("%s\t%s\n" % (fname, entries[fname]))
        # Drop the (potentially large) in-memory listing once written out.
        contents_file.sorted_keys = None
        contents_file.filenames.clear()
225
class GzipThread(ContentsWorkThread):
    """Pipeline stage that compresses the finished contents file."""

    def __str__(self):
        return "GzipThread"
    __repr__ = __str__

    def _run(self, contents_file):
        # Use an argument vector instead of an interpolated shell command
        # line so filenames containing spaces or shell metacharacters
        # cannot break (or subvert) the call.  The exit status is ignored,
        # matching the original os.system() behaviour.
        import subprocess
        subprocess.call(["gzip", "-f", contents_file.filename])
236
class ContentFile(object):
    """Base class representing one Contents file to be generated.

    Instances accumulate a filename -> package-list mapping
    (self.filenames) from a database query and travel through the worker
    pipeline: query -> ingest -> sort -> output -> gzip.
    """

    # Cached boilerplate header text shared by every Contents file.
    header = None

    def __init__(self,
                 filename,
                 suite_str,
                 suite_id):

        self.filename = filename      # output path relative to Contents::Root
        self.filenames = {}           # file path -> comma separated package list
        self.sorted_keys = None       # sorted view of self.filenames, set by SortThread
        self.suite_str = suite_str
        self.suite_id = suite_id
        self.session = None           # DB session, opened by query()
        self.filehandle = None        # output file, opened by open_file()
        self.results = None           # DB result cursor, set by query()

    def __str__(self):
        return self.filename
    __repr__ = __str__

    def cleanup(self):
        """Release the in-memory listing and close file/DB handles."""
        self.filenames = None
        # BUGFIX: the original assigned "self.sortedkeys" (a typo),
        # leaving the real sorted_keys list alive.
        self.sorted_keys = None
        self.filehandle.close()
        self.session.close()

    def ingest(self):
        """Drain self.results into the self.filenames mapping."""
        while True:
            r = self.results.fetchone()
            if not r:
                break
            filename, package = r
            self.filenames[filename] = package

        self.session.close()

    def open_file(self):
        """
        opens a stream to the contents file under Contents::Root
        """
        filepath = Config()["Contents::Root"] + self.filename
        # BUGFIX: create the parent directory of, and open, the real
        # output path; the original used self.filename here and wrote
        # relative to the current working directory instead.
        filedir = os.path.dirname(filepath)
        if not os.path.isdir(filedir):
            os.makedirs(filedir)
        self.filehandle = open(filepath, "w")
        self._write_header()

    def _write_header(self):
        """Write the boilerplate header to the file, if one is configured."""
        header = ContentFile._get_header()
        # BUGFIX: the original unconditionally wrote ContentFile.header,
        # which crashes with write(None) when no header is configured.
        if header:
            self.filehandle.write(header)

    @classmethod
    def _get_header(cls):
        """
        Internal method to return the header for Contents.gz files

        This is boilerplate which explains the contents of the file and how
        it can be used.
        """
        if not ContentFile.header:
            if Config().has_key("Contents::Header"):
                try:
                    h = open(os.path.join( Config()["Dir::Templates"],
                                           Config()["Contents::Header"] ), "r")
                    try:
                        ContentFile.header = h.read()
                    finally:
                        h.close()
                except:
                    # BUGFIX: the original used "%d" on the (string) header
                    # filename, which itself raised while reporting the error.
                    log.error( "error opening header file: %s\n%s" % (Config()["Contents::Header"],
                                                                      traceback.format_exc() ))
                    ContentFile.header = None
            else:
                ContentFile.header = None

        return ContentFile.header
313
314
class DebContentFile(ContentFile):
    """Contents file for the .deb packages of one suite/architecture pair."""
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 arch_str,
                 arch_id):
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )
        self.arch_str = arch_str
        self.arch_id = arch_id

    def query(self):
        """Open a DB session and start the contents query for this arch."""
        self.session = DBConn().session();

        # NOTE(review): "arch=2" looks like a hard-coded database id for the
        # "all" architecture so arch-independent packages are always included
        # -- confirm against the architecture table.
        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
        FROM deb_contents
        WHERE ( arch=2 or arch = :arch) AND suite = :suite
        """, { 'arch':self.arch_id, 'suite':self.suite_id } )
336
class UdebContentFile(ContentFile):
    """Contents file for the .udeb (debian-installer) packages of one suite."""
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 section_name,
                 section_id):
        # NOTE(review): section_name/section_id are accepted but never stored
        # or used -- confirm whether per-section filtering was intended here.
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )

    def query(self):
        """Open a DB session and start the udeb contents query for this suite."""
        self.session = DBConn().session();

        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
        FROM udeb_contents
        WHERE suite = :suite
        group by filename
        """ , { 'suite': self.suite_id } )
357
class Contents(object):
    """
    Class capable of generating Contents-$arch.gz files
    """
    def __init__(self):
        self.header = None

    def reject(self, message):
        """Callback handed to the Binary scanner to report a rejection."""
        log.error("E: %s" % message)

    def cruft(self):
        """
        remove files/paths from the DB which are no longer referenced
        by binaries and clean the temporary table
        """
        s = DBConn().session()

        # clear out all of the temporarily stored content associations
        # this should be run only after p-a has run.  after a p-a
        # run we should have either accepted or rejected every package
        # so there should no longer be anything in the queue
        s.query(PendingContentAssociation).delete()

        # delete any filenames we are storing which have no binary associated
        # with them
        cafq = s.query(ContentAssociation.filename_id).distinct()
        cfq = s.query(ContentFilename)
        cfq = cfq.filter(~ContentFilename.cafilename_id.in_(cafq))
        cfq.delete()

        # delete any paths we are storing which have no binary associated with
        # them
        capq = s.query(ContentAssociation.filepath_id).distinct()
        cpq = s.query(ContentFilepath)
        cpq = cpq.filter(~ContentFilepath.cafilepath_id.in_(capq))
        cpq.delete()

        s.commit()

    def bootstrap_bin(self):
        """
        scan the existing debs in the pool to populate the bin_contents table
        """
        pooldir = Config()[ 'Dir::Pool' ]

        s = DBConn().session()

        for binary in s.query(DBBinary).yield_per(100):
            print( "binary: %s" % binary.package )
            filename = binary.poolfile.filename
            # Check for existing contents
            existingq = s.execute( "select 1 from bin_contents where binary_id=:id", {'id':binary.binary_id} )
            if existingq.fetchone():
                log.debug( "already imported: %s" % (filename))
            else:
                # We don't have existing contents so import them
                log.debug( "scanning: %s" % (filename) )

                debfile = os.path.join(pooldir, filename)
                if os.path.exists(debfile):
                    Binary(debfile, self.reject).scan_package(binary.binary_id, True)
                else:
                    log.error("missing .deb: %s" % filename)

    def bootstrap(self):
        """
        copy data from the bin_contents table into the deb_contents /
        udeb_contents tables, driven by the override entries
        """
        s = DBConn().session()

        # get a mapping of all the override types we care about (right now .deb and .udeb)
        override_type_map = {}
        for override_type in s.query(OverrideType).all():
            if override_type.overridetype.endswith('deb'):
                override_type_map[override_type.overridetype_id] = override_type.overridetype

        for override in s.query(Override).yield_per(100):
            if override.overridetype_id not in override_type_map:
                # this isn't an override we care about
                continue

            binaries = s.execute("""SELECT b.id, b.architecture
                                    FROM binaries b
                                    JOIN bin_associations ba ON ba.bin=b.id
                                    WHERE ba.suite=:suite
                                    AND b.package=:package""", {'suite':override.suite_id, 'package':override.package})
            while True:
                binary = binaries.fetchone()
                if not binary:
                    break

                exists = s.execute("SELECT 1 FROM %s_contents WHERE binary_id=:id limit 1" % override_type_map[override.overridetype_id], {'id':binary.id})

                if exists.fetchone():
                    # progress marker: contents already copied for this binary
                    sys.stdout.write('.')
                    continue
                else:
                    sys.stdout.write('+')

                s.execute( """INSERT INTO %s_contents (filename,section,package,binary_id,arch,suite)
                              SELECT file, :section, :package, :binary_id, :arch, :suite
                              FROM bin_contents
                              WHERE binary_id=:binary_id;""" % override_type_map[override.overridetype_id],
                           { 'section' : override.section_id,
                             'package' : override.package,
                             'binary_id' : binary.id,
                             'arch' : binary.architecture,
                             'suite' : override.suite_id } )
                s.commit()

    def deb_generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()
        suites = self._suites()

        # One single-slot queue between each pair of pipeline stages.
        inputtoquery = OneAtATime()
        querytoingest = OneAtATime()
        ingesttosort = OneAtATime()
        sorttooutput = OneAtATime()
        outputtogzip = OneAtATime()

        qt = QueryThread(inputtoquery,querytoingest)
        it = IngestThread(querytoingest,ingesttosort)
        st = SortThread(ingesttosort,sorttooutput)
        ot = OutputThread(sorttooutput,outputtogzip)
        gt = GzipThread(outputtogzip, None)

        qt.start()
        it.start()
        st.start()
        ot.start()
        gt.start()

        # Feed one work item per (suite, architecture) into the pipeline.
        for suite in [i.lower() for i in suites]:
            suite_id = get_suite(suite, session).suite_id
            print( "got suite_id: %s for suite: %s" % (suite_id, suite ) )
            arch_list = self._arches(suite_id, session)

            for (arch_id,arch_str) in arch_list:
                print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )

                filename = "dists/%s/Contents-%s" % (suite, arch_str)
                cf = DebContentFile(filename, suite, suite_id, arch_str, arch_id)
                inputtoquery.enqueue( cf )

        # The sentinel shuts down each stage in turn; wait for the last one.
        inputtoquery.enqueue( EndOfContents() )
        gt.join()

    def udeb_generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()

        inputtoquery = OneAtATime()
        querytoingest = OneAtATime()
        ingesttosort = OneAtATime()
        sorttooutput = OneAtATime()
        outputtogzip = OneAtATime()

        qt = QueryThread(inputtoquery,querytoingest)
        it = IngestThread(querytoingest,ingesttosort)
        st = SortThread(ingesttosort,sorttooutput)
        ot = OutputThread(sorttooutput,outputtogzip)
        gt = GzipThread(outputtogzip, None)

        qt.start()
        it.start()
        st.start()
        ot.start()
        gt.start()

        # NOTE(review): no UdebContentFile work items are ever enqueued here,
        # so this method produces no output -- it looks unfinished.  At least
        # shut the pipeline down so the process can exit; the original never
        # did, leaving five non-daemon threads blocked forever.
        inputtoquery.enqueue( EndOfContents() )
        gt.join()

    # NOTE(review): the original class defined generate() twice; the first
    # definition (which called deb_generate()/udeb_generate()) was silently
    # shadowed by this one, so only this implementation ever ran.  The dead
    # duplicate has been removed.
    def generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()

        arch_all_id = get_architecture("all", session).arch_id

        # The MORE fun part. Ok, udebs need their own contents files, udeb, and udeb-nf (not-free)
        # This is HORRIBLY debian specific :-/
        for dtype, section, fn_pattern in \
              [('deb',  None,                        "dists/%s/Contents-%s.gz"),
               ('udeb', "debian-installer",          "dists/%s/Contents-udeb-%s.gz"),
               ('udeb', "non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:

            overridetype = get_override_type(dtype, session)

            # For udebs, we only look in certain sections (see the for loop above)
            if section is not None:
                section = get_section(section, session)

            # Get our suites
            for suite in which_suites(session):
                # Which architectures do we need to work on
                arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session)

                # Set up our file writer dictionary
                file_writers = {}
                try:
                    # One file writer per arch
                    for arch in arch_list:
                        file_writers[arch.arch_id] = GzippedContentWriter(fn_pattern % (suite, arch.arch_string))

                    for r in get_suite_contents(suite, overridetype, section, session=session).fetchall():
                        # BUGFIX: unpack into "section_name" -- the original
                        # unpacked into "section", clobbering the Section
                        # object passed to get_suite_contents() for every
                        # suite after the first.
                        filename, section_name, package, arch_id = r

                        if arch_id == arch_all_id:
                            # It's arch all, so all contents files get it
                            for writer in file_writers.values():
                                writer.write(filename, section_name, package)
                        else:
                            if arch_id in file_writers:
                                file_writers[arch_id].write(filename, section_name, package)

                finally:
                    # close all the files
                    for writer in file_writers.values():
                        writer.finish()

    def _suites(self):
        """
        return a list of suites to operate on
        """
        if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
            suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
        else:
            suites = Config().SubTree("Suite").List()

        return suites

    def _arches(self, suite, session):
        """
        return a list of (arch_id, arch_string) tuples to operate on,
        excluding "source" and "all"
        """
        arch_list = []
        arches = session.execute(
            """SELECT s.architecture, a.arch_string
            FROM suite_architectures s
            JOIN architecture a ON (s.architecture=a.id)
            WHERE suite = :suite_id""",
            {'suite_id':suite } )

        while True:
            r = arches.fetchone()
            if not r:
                break

            if r[1] != "source" and r[1] != "all":
                arch_list.append((r[0], r[1]))

        return arch_list
629
630
def scan_all(limit):
    """Scan pool .debs into bin_contents and log progress counts."""
    stats = ContentsScanner.scan_all(limit)
    Logger.log(['%(processed)d packages processed' % stats,
                '%(remaining)d packages remaining' % stats])
636
637 ################################################################################
638
def main():
    """Entry point for "dak contents": parse options and dispatch the command."""
    cnf = Config()

    # apt_pkg option table: (short, long, config key[, "HasArg"]).
    arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
                 ('s',"suite", "%s::%s" % (options_prefix,"Suite"),"HasArg"),
                 ('l',"limit", "%s::%s" % (options_prefix,"Limit"),"HasArg"),
                 ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
                 ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")),
                ]

    # Command name -> unbound Contents method.  Note the 'bootstrap_bin'
    # entry is never actually invoked: that command is special-cased to
    # scan_all() below.
    commands = {'generate' : Contents.generate,
                'bootstrap_bin' : Contents.bootstrap_bin,
                'bootstrap' : Contents.bootstrap,
                'cruft' : Contents.cruft,
                }

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)

    # usage() terminates the process.
    if (len(args) < 1) or not commands.has_key(args[0]):
        usage()

    if cnf.has_key("%s::%s" % (options_prefix,"Help")):
        usage()

    # Default to INFO; --quiet and --verbose adjust the threshold.
    level=logging.INFO
    if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
        level=logging.ERROR

    elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
        level=logging.DEBUG


    logging.basicConfig( level=level,
                         format='%(asctime)s %(levelname)s %(message)s',
                         stream = sys.stderr )

    global Logger
    Logger = daklog.Logger(cnf.Cnf, 'contents')

    # NOTE(review): the config value is passed through as a string --
    # confirm ContentsScanner.scan_all() accepts that, or convert to int.
    limit = None
    if cnf.has_key("%s::%s" % (options_prefix,"Limit")):
        limit = cnf["%s::%s" % (options_prefix,"Limit")]

    if args[0] == 'bootstrap_bin':
        scan_all(limit)
    else:
        # Instantiate Contents and invoke the selected unbound method on it.
        commands[args[0]](Contents())

    Logger.close()
688
def which_suites(session):
    """
    return a list of suites to operate on
    """
    config_key = "%s::%s" % (options_prefix, "Suite")
    if Config().has_key(config_key):
        suite_names = utils.split_args(Config()[config_key])
    else:
        suite_names = Config().SubTree("Suite").List()

    return [get_suite(name.lower(), session) for name in suite_names]
699
700
# Script entry point.
if __name__ == '__main__':
    main()