]> git.decadent.org.uk Git - dak.git/blob - dak/contents.py
get rid of some w h i t spa c e; and fix the comment in update28
[dak.git] / dak / contents.py
1 #!/usr/bin/env python
2 """
3 Create all the contents files
4
5 @contact: Debian FTPMaster <ftpmaster@debian.org>
6 @copyright: 2008, 2009 Michael Casadevall <mcasadevall@debian.org>
7 @copyright: 2009 Mike O'Connor <stew@debian.org>
8 @license: GNU General Public License version 2 or later
9 """
10
11 ################################################################################
12
13 # This program is free software; you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation; either version 2 of the License, or
16 # (at your option) any later version.
17
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 # GNU General Public License for more details.
22
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
27 ################################################################################
28
29 # <Ganneff> there is the idea to slowly replace contents files
30 # <Ganneff> with a new generation of such files.
31 # <Ganneff> having more info.
32
33 # <Ganneff> of course that wont help for now where we need to generate them :)
34
35 ################################################################################
36
37 import sys
38 import os
39 import logging
40 import gzip
41 import threading
42 import traceback
43 import Queue
44 import apt_pkg
45 import datetime
46 import traceback
47 from daklib import utils
48 from daklib.binary import Binary
49 from daklib.config import Config
50 from daklib.dbconn import *
51
52 ################################################################################
53
def usage(exit_code=0):
    """
    Print the command line help of C{dak contents} and terminate.

    @type exit_code: int
    @param exit_code: status handed to C{sys.exit} once the text is printed
    """
    # a single parenthesised argument prints the same on python 2 and 3
    print("""Usage: dak contents [options] command [arguments]

COMMANDS
    generate
        generate Contents-$arch.gz files

    bootstrap_bin
        scan the debs in the existing pool and load contents into the bin_contents table

    bootstrap
        copy data from the bin_contents table into the deb_contents / udeb_contents tables

    cruft
        remove files/paths which are no longer referenced by a binary

OPTIONS
     -h, --help
        show this help and exit

     -v, --verbose
        show verbose information messages

     -q, --quiet
        suppress all output but errors

     -s, --suite={stable,testing,unstable,...}
        only operate on a single suite
""")
    sys.exit(exit_code)
84
85 ################################################################################
86
# dak.conf subtree under which every option of this command is stored
options_prefix = "Contents::Options"

# the root logger; main() sets its level and output stream
log = logging.getLogger()
93
94 ################################################################################
95
class EndOfContents(object):
    """
    Sentinel marking the end of the stream of contents files; an instance
    is queued after the last real item.
    """
101
class OneAtATime(object):
    """
    A one-slot queue which sits between multiple possible producers
    and multiple possible consumers.

    C{enqueue} waits while the slot is occupied, C{dequeue} waits while it
    is empty.  An empty slot is represented by C{None}, so C{None} itself
    cannot be queued.  When C{die} is set, a caller that would have to
    wait returns instead.
    """
    def __init__(self):
        self.next_in_line = None
        self.read_lock = threading.Condition()
        self.write_lock = threading.Condition()
        self.die = False

    def enqueue(self, next):
        """
        Put C{next} into the slot, waiting until the slot is free.

        Returns without storing anything if the slot is occupied and
        C{die} is set.
        """
        self.write_lock.acquire()
        try:
            # compare against None so that falsy items (0, "", ...) count
            # as "slot occupied" too
            while self.next_in_line is not None:
                if self.die:
                    return
                self.write_lock.wait()
            self.next_in_line = next
        finally:
            # released on every path, including the early return above
            self.write_lock.release()

        # wake one consumer that may be waiting for an item
        self.read_lock.acquire()
        try:
            self.read_lock.notify()
        finally:
            self.read_lock.release()

    def dequeue(self):
        """
        Take the item out of the slot, waiting until there is one.

        @return: the queued item, or C{None} if the slot is empty and
                 C{die} is set
        """
        self.read_lock.acquire()
        try:
            while self.next_in_line is None:
                if self.die:
                    return None
                self.read_lock.wait()
            result = self.next_in_line
            self.next_in_line = None
        finally:
            self.read_lock.release()

        # wake one producer that may be waiting for the slot
        self.write_lock.acquire()
        try:
            self.write_lock.notify()
        finally:
            self.write_lock.release()

        return result
143
144
class ContentsWorkThread(threading.Thread):
    """
    Base class for one stage of the contents pipeline.

    Every item taken from C{upstream} is handed to C{_run} (provided by
    the subclass) and then passed on to C{downstream} if there is one.
    An L{EndOfContents} item is forwarded downstream and ends the thread.
    """
    def __init__(self, upstream, downstream):
        """
        @param upstream: queue to take work from (needs C{dequeue()})
        @param downstream: queue to hand finished work to (needs
                           C{enqueue()}), or C{None} for the last stage
        """
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream

    def run(self):
        while True:
            try:
                contents_file = self.upstream.dequeue()
                if contents_file is None:
                    # OneAtATime.dequeue() only returns None once the queue
                    # was told to die; looping on would spin forever
                    break

                if isinstance(contents_file,EndOfContents):
                    if self.downstream:
                        self.downstream.enqueue(contents_file)
                    break

                s = datetime.datetime.now()
                print("%s start: %s" % (self,contents_file) )
                self._run(contents_file)
                print("%s finished: %s in %d seconds" % (self, contents_file, (datetime.datetime.now()-s).seconds ))
                if self.downstream:
                    self.downstream.enqueue(contents_file)
            except Exception:
                # a failing item is reported and dropped; the stage keeps
                # running so that the following items are still processed
                traceback.print_exc()
170
class QueryThread(ContentsWorkThread):
    """
    Pipeline stage which calls C{query()} on each contents file.
    """
    def __init__(self, upstream, downstream):
        super(QueryThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "QueryThread"
    __str__ = __repr__

    def _run(self, contents_file):
        contents_file.query()
181
class IngestThread(ContentsWorkThread):
    """
    Pipeline stage which calls C{ingest()} on each contents file.
    """
    def __init__(self, upstream, downstream):
        super(IngestThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "IngestThread"
    __str__ = __repr__

    def _run(self, contents_file):
        contents_file.ingest()
192
class SortThread(ContentsWorkThread):
    """
    Pipeline stage which stores the sorted keys of C{filenames} in
    C{sorted_keys} of each contents file.
    """
    def __init__(self, upstream, downstream):
        super(SortThread, self).__init__(upstream, downstream)

    def __repr__(self):
        return "SortThread"
    __str__ = __repr__

    def _run(self, contents_file):
        ordered = list(contents_file.filenames)
        ordered.sort()
        contents_file.sorted_keys = ordered
203
class OutputThread(ContentsWorkThread):
    """
    Pipeline stage which writes one "filename<TAB>packages" line per entry
    of C{sorted_keys} to the contents file and then drops the in-memory
    data.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "OutputThread"
    __repr__ = __str__

    def _run(self, contents_file):
        try:
            contents_file.open_file()
            for fname in contents_file.sorted_keys:
                contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname]))
        finally:
            # close (and thereby flush) the file before the item moves on;
            # GzipThread runs gzip on the path and would otherwise see a
            # file whose tail is still sitting in the write buffer
            if contents_file.filehandle is not None:
                contents_file.filehandle.close()
        contents_file.sorted_keys = None
        contents_file.filenames.clear()
218
class GzipThread(ContentsWorkThread):
    """
    Pipeline stage which compresses the written contents file in place
    with C{gzip -f}.
    """
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "GzipThread"
    __repr__ = __str__

    def _run(self, contents_file):
        # imported here because the module header does not import it
        import subprocess

        # argument list instead of a shell string: the path is passed to
        # gzip verbatim (spaces, shell metacharacters), and a non-zero exit
        # status raises CalledProcessError instead of being ignored
        subprocess.check_call(["gzip", "-f", contents_file.filename])
229
class ContentFile(object):
    """
    State of one contents file on its way through the pipeline: the query
    results, the filename -> packages mapping built from them and the
    handle of the output file.

    Subclasses provide C{query()}, which has to set C{session} and
    C{results}.
    """

    # header text shared by all contents files; loaded on first use
    header = None

    # directory prefix put in front of the relative contents filename
    # NOTE(review): developer-specific path; the commented-out code it
    # replaced read Config()["Contents::Root"] -- confirm and switch over
    output_root = "/home/stew/contents/"

    def __init__(self,
                 filename,
                 suite_str,
                 suite_id):
        """
        @param filename: output path, relative to C{output_root}
        @param suite_str: name of the suite
        @param suite_id: database id of the suite
        """
        self.filename = filename
        self.filenames = {}
        self.sorted_keys = None
        self.suite_str = suite_str
        self.suite_id = suite_id
        self.session = None
        self.filehandle = None
        self.results = None

    def __str__(self):
        return self.filename
    __repr__ = __str__

    def cleanup(self):
        """
        Drop the in-memory data and close the output file and the database
        session, as far as they have been opened.
        """
        self.filenames = None
        self.sorted_keys = None
        if self.filehandle is not None:
            self.filehandle.close()
        if self.session is not None:
            self.session.close()

    def ingest(self):
        """
        Read all (filename, package) rows from C{results} into the
        C{filenames} mapping, then close the session.
        """
        try:
            while True:
                r = self.results.fetchone()
                if not r:
                    break
                filename, package = r
                self.filenames[filename] = package
        finally:
            # close the session even if fetching fails half way
            self.session.close()

    def open_file(self):
        """
        Open the (uncompressed) contents file for writing, creating its
        directory if necessary, and write the header to it.

        C{filename} is rewritten to the full path below C{output_root}.
        """
        self.filename = self.output_root + self.filename
        filedir = os.path.dirname(self.filename)
        if not os.path.isdir(filedir):
            os.makedirs(filedir)
        self.filehandle = open(self.filename, "w")
        self._write_header()

    def _write_header(self):
        """
        Write the header to the output file, if there is one.
        """
        header = self._get_header()
        # no header configured or readable: writing None would raise
        if header:
            self.filehandle.write(header)

    @classmethod
    def _get_header(cls):
        """
        Internal method to return the header for Contents.gz files

        This is boilerplate which explains the contents of the file and how
        it can be used.  It is read from the file named by
        C{Contents::Header} below C{Dir::Templates} and cached in
        C{ContentFile.header}.

        @return: the header text, or C{None} if it is not configured or
                 could not be read
        """
        if not ContentFile.header:
            cnf = Config()
            if cnf.has_key("Contents::Header"):
                try:
                    h = open(os.path.join( cnf["Dir::Templates"],
                                           cnf["Contents::Header"] ), "r")
                    try:
                        ContentFile.header = h.read()
                    finally:
                        h.close()
                except Exception:
                    # the header name is a string, so it needs %s
                    log.error( "error opening header file: %s\n%s" % (cnf["Contents::Header"],
                                                                      traceback.format_exc() ))
                    ContentFile.header = None
            else:
                ContentFile.header = None

        return ContentFile.header
308
309
class DebContentFile(ContentFile):
    """
    Contents file for the .deb packages of one suite and architecture.
    """
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 arch_str,
                 arch_id):
        """
        @param filename: output path of the contents file
        @param suite_str: name of the suite
        @param suite_id: database id of the suite
        @param arch_str: name of the architecture
        @param arch_id: database id of the architecture
        """
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )
        self.arch_str = arch_str
        self.arch_id = arch_id

    def query(self):
        """
        Open a session and start the query for the files of this suite and
        architecture; the cursor is stored in C{results}.
        """
        self.session = DBConn().session()

        # NOTE(review): arch=2 is hard-coded -- presumably the id of
        # architecture "all"; confirm against the architecture table.
        # GROUP BY added to match the udeb query: comma_separated_list()
        # is combined with the plain filename column.
        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
        FROM deb_contents
        WHERE ( arch=2 or arch = :arch) AND suite = :suite
        GROUP BY filename
        """, { 'arch':self.arch_id, 'suite':self.suite_id } )
331
class UdebContentFile(ContentFile):
    """
    Contents file for the .udeb packages of one suite.
    """
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 section_name,
                 section_id):
        """
        @param filename: output path of the contents file
        @param suite_str: name of the suite
        @param suite_id: database id of the suite
        @param section_name: name of the section
        @param section_id: database id of the section
        """
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )
        # kept on the instance like the arch fields of DebContentFile;
        # the query below does not filter on the section
        self.section_name = section_name
        self.section_id = section_id

    def query(self):
        """
        Open a session and start the query for the udeb files of this
        suite; the cursor is stored in C{results}.
        """
        self.session = DBConn().session()

        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
        FROM udeb_contents
        WHERE suite = :suite
        group by filename
        """ , { 'suite': self.suite_id } )
352
353 class Contents(object):
354     """
355     Class capable of generating Contents-$arch.gz files
356     """
357     def __init__(self):
358         self.header = None
359
360     def reject(self, message):
361         log.error("E: %s" % message)
362
363     def cruft(self):
364         """
365         remove files/paths from the DB which are no longer referenced
366         by binaries and clean the temporary table
367         """
368         s = DBConn().session()
369
370         # clear out all of the temporarily stored content associations
371         # this should be run only after p-a has run.  after a p-a
372         # run we should have either accepted or rejected every package
373         # so there should no longer be anything in the queue
374         s.query(PendingContentAssociation).delete()
375
376         # delete any filenames we are storing which have no binary associated
377         # with them
378         cafq = s.query(ContentAssociation.filename_id).distinct()
379         cfq = s.query(ContentFilename)
380         cfq = cfq.filter(~ContentFilename.cafilename_id.in_(cafq))
381         cfq.delete()
382
383         # delete any paths we are storing which have no binary associated with
384         # them
385         capq = s.query(ContentAssociation.filepath_id).distinct()
386         cpq = s.query(ContentFilepath)
387         cpq = cpq.filter(~ContentFilepath.cafilepath_id.in_(capq))
388         cpq.delete()
389
390         s.commit()
391
392
393     def bootstrap_bin(self):
394         """
395         scan the existing debs in the pool to populate the bin_contents table
396         """
397         pooldir = Config()[ 'Dir::Pool' ]
398
399         s = DBConn().session()
400
401         for binary in s.query(DBBinary).yield_per(100):
402             print( "binary: %s" % binary.package )
403             filename = binary.poolfile.filename
404              # Check for existing contents
405             existingq = s.execute( "select 1 from bin_contents where binary_id=:id", {'id':binary.binary_id} );
406             if existingq.fetchone():
407                 log.debug( "already imported: %s" % (filename))
408             else:
409                 # We don't have existing contents so import them
410                 log.debug( "scanning: %s" % (filename) )
411
412                 debfile = os.path.join(pooldir, filename)
413                 if os.path.exists(debfile):
414                     Binary(debfile, self.reject).scan_package(binary.binary_id, True)
415                 else:
416                     log.error("missing .deb: %s" % filename)
417
418
419
420     def bootstrap(self):
421         """
422         scan the existing debs in the pool to populate the contents database tables
423         """
424         s = DBConn().session()
425
426
427         # get a mapping of all the override types we care about (right now .deb an .udeb)
428         override_type_map = {};
429         for override_type in s.query(OverrideType).all():
430             if override_type.overridetype.endswith('deb' ):
431                 override_type_map[override_type.overridetype_id] = override_type.overridetype;
432
433         for override in s.query(Override).yield_per(100):
434             if not override_type_map.has_key(override.overridetype_id):
435                 #this isn't an override we care about
436                 continue
437
438             binaries = s.execute("""SELECT b.id, b.architecture
439                                     FROM binaries b
440                                     JOIN bin_associations ba ON ba.bin=b.id
441                                     WHERE ba.suite=:suite
442                                     AND b.package=:package""", {'suite':override.suite_id, 'package':override.package})
443             while True:
444                 binary = binaries.fetchone()
445                 if not binary:
446                     break
447
448                 exists = s.execute("SELECT 1 FROM %s_contents WHERE binary_id=:id limit 1" % override_type_map[override.overridetype_id], {'id':binary.id})
449
450
451                 if exists.fetchone():
452                     print '.',
453                     continue
454                 else:
455                     print '+',
456
457                 s.execute( """INSERT INTO %s_contents (filename,section,package,binary_id,arch,suite)
458                               SELECT file, :section, :package, :binary_id, :arch, :suite
459                               FROM bin_contents
460                               WHERE binary_id=:binary_id;""" % override_type_map[override.overridetype_id],
461                            { 'section' : override.section_id,
462                              'package' : override.package,
463                              'binary_id' : binary.id,
464                              'arch' : binary.architecture,
465                              'suite' : override.suite_id } )
466                 s.commit()
467
    def generate(self):
        """
        Generate contents files for both deb and udeb

        Only the deb half is active; the udeb call below is commented out.

        NOTE(review): this class defines a second C{generate} further
        down.  Being the later definition, that one is bound to the name
        and this version is never reached -- confirm which one is wanted.
        """
        self.deb_generate()
#        self.udeb_generate()
474
475     def deb_generate(self):
476         """
477         Generate Contents-$arch.gz files for every available arch in each given suite.
478         """
479         session = DBConn().session()
480         debtype_id = get_override_type("deb", session)
481         suites = self._suites()
482
483         inputtoquery = OneAtATime()
484         querytoingest = OneAtATime()
485         ingesttosort = OneAtATime()
486         sorttooutput = OneAtATime()
487         outputtogzip = OneAtATime()
488
489         qt = QueryThread(inputtoquery,querytoingest)
490         it = IngestThread(querytoingest,ingesttosort)
491         st = SortThread(ingesttosort,sorttooutput)
492         ot = OutputThread(sorttooutput,outputtogzip)
493         gt = GzipThread(outputtogzip, None)
494
495         qt.start()
496         it.start()
497         st.start()
498         ot.start()
499         gt.start()
500
501         # Get our suites, and the architectures
502         for suite in [i.lower() for i in suites]:
503             suite_id = get_suite(suite, session).suite_id
504             print( "got suite_id: %s for suite: %s" % (suite_id, suite ) )
505             arch_list = self._arches(suite_id, session)
506
507             for (arch_id,arch_str) in arch_list:
508                 print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )
509
510                 filename = "dists/%s/Contents-%s" % (suite, arch_str)
511                 cf = DebContentFile(filename, suite, suite_id, arch_str, arch_id)
512                 inputtoquery.enqueue( cf )
513
514         inputtoquery.enqueue( EndOfContents() )
515         gt.join()
516
    def udeb_generate(self):
        """
        Set up and start the worker thread pipeline for udeb contents.

        NOTE(review): this looks unfinished.  Nothing is ever put into
        C{inputtoquery} -- neither contents files nor the EndOfContents
        marker -- so the five threads started here stay blocked in
        C{dequeue()} and are never joined.  C{session}, C{udebtype_id}
        and C{suites} are assigned but not used.  The only reference to
        this method visible in this file is commented out.
        """
        session = DBConn().session()
        # NOTE(review): deb_generate uses get_override_type("deb", session)
        # for the same purpose -- confirm this DBConn method exists
        udebtype_id=DBConn().get_override_type_id("udeb")
        suites = self._suites()

        # one single-slot queue between each pair of neighbouring stages
        inputtoquery = OneAtATime()
        querytoingest = OneAtATime()
        ingesttosort = OneAtATime()
        sorttooutput = OneAtATime()
        outputtogzip = OneAtATime()

        # query -> ingest -> sort -> output -> gzip; the last stage has no
        # downstream queue
        qt = QueryThread(inputtoquery,querytoingest)
        it = IngestThread(querytoingest,ingesttosort)
        st = SortThread(ingesttosort,sorttooutput)
        ot = OutputThread(sorttooutput,outputtogzip)
        gt = GzipThread(outputtogzip, None)

        qt.start()
        it.start()
        st.start()
        ot.start()
        gt.start()
542
543
544     def generate(self):
545         """
546         Generate Contents-$arch.gz files for every available arch in each given suite.
547         """
548         session = DBConn().session()
549
550         arch_all_id = get_architecture("all", session).arch_id
551
552         # The MORE fun part. Ok, udebs need their own contents files, udeb, and udeb-nf (not-free)
553         # This is HORRIBLY debian specific :-/
554         for dtype, section, fn_pattern in \
555               [('deb',  None,                        "dists/%s/Contents-%s.gz"),
556                ('udeb', "debian-installer",          "dists/%s/Contents-udeb-%s.gz"),
557                ('udeb', "non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:
558
559             overridetype = get_override_type(dtype, session)
560
561             # For udebs, we only look in certain sections (see the for loop above)
562             if section is not None:
563                 section = get_section(section, session)
564
565             # Get our suites
566             for suite in which_suites(session):
567                 # Which architectures do we need to work on
568                 arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session)
569
570                 # Set up our file writer dictionary
571                 file_writers = {}
572                 try:
573                     # One file writer per arch
574                     for arch in arch_list:
575                         file_writers[arch.arch_id] = GzippedContentWriter(fn_pattern % (suite, arch.arch_string))
576
577                     for r in get_suite_contents(suite, overridetype, section, session=session).fetchall():
578                         filename, section, package, arch_id = r
579
580                         if arch_id == arch_all_id:
581                             # It's arch all, so all contents files get it
582                             for writer in file_writers.values():
583                                 writer.write(filename, section, package)
584                         else:
585                             if file_writers.has_key(arch_id):
586                                 file_writers[arch_id].write(filename, section, package)
587
588                 finally:
589                     # close all the files
590                     for writer in file_writers.values():
591                         writer.finish()
592     def _suites(self):
593         """
594         return a list of suites to operate on
595         """
596         if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
597             suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
598         else:
599             suites = [ 'unstable', 'testing' ]
600 #            suites = Config().SubTree("Suite").List()
601
602         return suites
603
604     def _arches(self, suite, session):
605         """
606         return a list of archs to operate on
607         """
608         arch_list = []
609         arches = session.execute(
610             """SELECT s.architecture, a.arch_string
611             FROM suite_architectures s
612             JOIN architecture a ON (s.architecture=a.id)
613             WHERE suite = :suite_id""",
614             {'suite_id':suite } )
615
616         while True:
617             r = arches.fetchone()
618             if not r:
619                 break
620
621             if r[1] != "source" and r[1] != "all":
622                 arch_list.append((r[0], r[1]))
623
624         return arch_list
625
626
627 ################################################################################
628
def main():
    """
    Entry point of C{dak contents}: parse the command line, set up
    logging and run the requested command on a L{Contents} instance.

    Prints the usage text and exits with status 0 if help was requested,
    and with status 1 if the command is missing or unknown.
    """
    cnf = Config()

    arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
                 ('s',"suite", "%s::%s" % (options_prefix,"Suite"),"HasArg"),
                 ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
                 ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")),
                ]

    # command name -> method of Contents implementing it
    commands = {'generate' : Contents.deb_generate,
                'bootstrap_bin' : Contents.bootstrap_bin,
                'bootstrap' : Contents.bootstrap,
                'cruft' : Contents.cruft,
                }

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)

    # an explicit request for help is not an error
    if cnf.has_key("%s::%s" % (options_prefix,"Help")):
        usage()

    # a missing or unknown command is an error, so report it to the caller
    if (len(args) < 1) or args[0] not in commands:
        usage(1)

    # quiet wins over verbose if both are given
    level=logging.INFO
    if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
        level=logging.ERROR

    elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
        level=logging.DEBUG

    logging.basicConfig( level=level,
                         format='%(asctime)s %(levelname)s %(message)s',
                         stream = sys.stderr )

    commands[args[0]](Contents())
665
def which_suites(session):
    """
    Return the suites to work on, as returned by C{get_suite} for each
    lower-cased suite name: the suites given in the Suite option if it is
    set, otherwise every entry of the "Suite" configuration subtree.
    """
    option = "%s::%s" %(options_prefix,"Suite")
    if Config().has_key(option):
        names = utils.split_args(Config()[option])
    else:
        names = Config().SubTree("Suite").List()

    found = []
    for name in names:
        found.append(get_suite(name.lower(), session))
    return found
676
677
# run the command line interface only when executed as a script
if __name__ == '__main__':
    main()