#!/usr/bin/env python
"""
Create all the contents files

@contact: Debian FTPMaster <ftpmaster@debian.org>
@copyright: 2008, 2009 Michael Casadevall <mcasadevall@debian.org>
@copyright: 2009 Mike O'Connor <stew@debian.org>
@license: GNU General Public License version 2 or later
"""

################################################################################

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

################################################################################

# <Ganneff> there is the idea to slowly replace contents files
# <Ganneff> with a new generation of such files.
# <Ganneff> having more info.

# <Ganneff> of course that wont help for now where we need to generate them :)

################################################################################
import sys
import os
import logging
import gzip
import threading
import traceback
import Queue
import apt_pkg
import datetime
from daklib import utils
from daklib.binary import Binary
from daklib.config import Config
from daklib.dbconn import *

################################################################################

def usage(exit_code=0):
    print """Usage: dak contents [options] command [arguments]

COMMANDS
    generate
        generate Contents-$arch.gz files

    bootstrap_bin
        scan the debs in the existing pool and load contents into the bin_contents table

    bootstrap
        copy data from the bin_contents table into the deb_contents / udeb_contents tables

    cruft
        remove files/paths which are no longer referenced by a binary

OPTIONS
     -h, --help
        show this help and exit

     -v, --verbose
        show verbose information messages

     -q, --quiet
        suppress all output but errors

     -s, --suite={stable,testing,unstable,...}
        only operate on a single suite
"""
    sys.exit(exit_code)

################################################################################

# where in dak.conf all of our configuration will be stowed

options_prefix = "Contents"
options_prefix = "%s::Options" % options_prefix

log = logging.getLogger()

################################################################################

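# The "generate" path below is organised as a small thread pipeline: a
# QueryThread runs the SQL, an IngestThread pulls the rows into memory, a
# SortThread orders the filenames, an OutputThread writes the Contents file
# and a GzipThread compresses it.  Each pair of adjacent stages is connected
# by a OneAtATime slot, so at most one ContentFile is in flight between any
# two stages at a time.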
class EndOfContents(object):
    """
    A sentinel object marking the end of the ContentFile stream
    """
    pass

class OneAtATime(object):
    """
    a one-slot queue which sits between multiple possible producers
    and multiple possible consumers
    """
    def __init__(self):
        self.next_in_line = None
        self.read_lock = threading.Condition()
        self.write_lock = threading.Condition()
        self.die = False

    def enqueue(self, item):
        self.write_lock.acquire()
        while self.next_in_line:
            if self.die:
                self.write_lock.release()
                return
            self.write_lock.wait()

        assert( not self.next_in_line )
        self.next_in_line = item
        self.write_lock.release()
        self.read_lock.acquire()
        self.read_lock.notify()
        self.read_lock.release()

    def dequeue(self):
        self.read_lock.acquire()
        while not self.next_in_line:
            if self.die:
                self.read_lock.release()
                return
            self.read_lock.wait()

        result = self.next_in_line

        self.next_in_line = None
        self.read_lock.release()
        self.write_lock.acquire()
        self.write_lock.notify()
        self.write_lock.release()

        return result


class ContentsWorkThread(threading.Thread):
    """
    Base class for the pipeline worker threads: dequeue a ContentFile from
    upstream, process it via _run(), then pass it downstream.  An
    EndOfContents sentinel is forwarded unchanged and terminates the thread.
    """
    def __init__(self, upstream, downstream):
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream

    def run(self):
        while True:
            try:
                contents_file = self.upstream.dequeue()
                if isinstance(contents_file, EndOfContents):
                    if self.downstream:
                        self.downstream.enqueue(contents_file)
                    break

                s = datetime.datetime.now()
                print("%s start: %s" % (self, contents_file))
                self._run(contents_file)
                print("%s finished: %s in %d seconds" % (self, contents_file, (datetime.datetime.now() - s).seconds))
                if self.downstream:
                    self.downstream.enqueue(contents_file)
            except:
                traceback.print_exc()

class QueryThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "QueryThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.query()

class IngestThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "IngestThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.ingest()

class SortThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "SortThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.sorted_keys = sorted(contents_file.filenames.keys())

class OutputThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "OutputThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.open_file()
        for fname in contents_file.sorted_keys:
            contents_file.filehandle.write("%s\t%s\n" % (fname, contents_file.filenames[fname]))
        contents_file.sorted_keys = None
        contents_file.filenames.clear()

class GzipThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "GzipThread"
    __repr__ = __str__

    def _run(self, contents_file):
        os.system("gzip -f %s" % contents_file.filename)

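# ContentFile holds the state for one Contents file as it moves through the
# pipeline: the output path, the SQL result set and the in-memory
# filename -> package mapping that eventually gets written out.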
class ContentFile(object):
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id):

        self.filename = filename
        self.filenames = {}
        self.sorted_keys = None
        self.suite_str = suite_str
        self.suite_id = suite_id
        self.session = None
        self.filehandle = None
        self.results = None

    def __str__(self):
        return self.filename
    __repr__ = __str__


    def cleanup(self):
        self.filenames = None
        self.sorted_keys = None
        self.filehandle.close()
        self.session.close()

    def ingest(self):
        while True:
            r = self.results.fetchone()
            if not r:
                break
            filename, package = r
            self.filenames[filename] = package

        self.session.close()

#     def ingest(self):
#         while True:
#             r = self.results.fetchone()
#             if not r:
#                 break
#             filename, package = r
#             if self.filenames.has_key(filename):
#                 self.filenames[filename] += ",%s" % (package)
#             else:
#                 self.filenames[filename] = "%s" % (package)
#         self.session.close()

    def open_file(self):
        """
        opens the output file for the contents listing (compression is
        handled later by GzipThread)
        """
#        filepath = Config()["Contents::Root"] + self.filename
        # NOTE: the output root is hardcoded here; the commented-out line
        # above reads it from Config()["Contents::Root"] instead.
        self.filename = "/home/stew/contents/" + self.filename
        filedir = os.path.dirname(self.filename)
        if not os.path.isdir(filedir):
            os.makedirs(filedir)
#        self.filehandle = gzip.open(self.filename, "w")
        self.filehandle = open(self.filename, "w")
        self._write_header()

    def _write_header(self):
        header = self._get_header()
        if header:
            self.filehandle.write(header)

    header = None

    @classmethod
    def _get_header(cls):
        """
        Internal method to return the header for Contents.gz files

        This is boilerplate which explains the contents of the file and how
        it can be used.
        """
        if not ContentFile.header:
            if Config().has_key("Contents::Header"):
                try:
                    h = open(os.path.join(Config()["Dir::Templates"],
                                          Config()["Contents::Header"]), "r")
                    ContentFile.header = h.read()
                    h.close()
                except:
                    log.error("error opening header file: %s\n%s" % (Config()["Contents::Header"],
                                                                     traceback.format_exc()))
                    ContentFile.header = None
            else:
                ContentFile.header = None

        return ContentFile.header


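# DebContentFile runs the per-architecture query against deb_contents.  The
# literal arch id 2 in the WHERE clause appears to be the "all" architecture,
# so arch-independent packages end up in every per-arch Contents file.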
class DebContentFile(ContentFile):
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 arch_str,
                 arch_id):
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )
        self.arch_str = arch_str
        self.arch_id = arch_id

    def query(self):
        self.session = DBConn().session()

        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
        FROM deb_contents
        WHERE ( arch=2 or arch = :arch) AND suite = :suite
        """, { 'arch':self.arch_id, 'suite':self.suite_id } )

class UdebContentFile(ContentFile):
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 section_name,
                 section_id):
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )

    def query(self):
        self.session = DBConn().session()

        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
        FROM udeb_contents
        WHERE suite = :suite
        group by filename
        """ , { 'suite': self.suite_id } )

class Contents(object):
    """
    Class capable of generating Contents-$arch.gz files
    """
    def __init__(self):
        self.header = None

    def reject(self, message):
        log.error("E: %s" % message)

    def cruft(self):
        """
        remove files/paths from the DB which are no longer referenced
        by binaries and clean the temporary table
        """
        s = DBConn().session()

        # clear out all of the temporarily stored content associations
        # this should be run only after p-a has run.  after a p-a
        # run we should have either accepted or rejected every package
        # so there should no longer be anything in the queue
        s.query(PendingContentAssociation).delete()

        # delete any filenames we are storing which have no binary associated
        # with them
        cafq = s.query(ContentAssociation.filename_id).distinct()
        cfq = s.query(ContentFilename)
        cfq = cfq.filter(~ContentFilename.cafilename_id.in_(cafq))
        cfq.delete()

        # delete any paths we are storing which have no binary associated with
        # them
        capq = s.query(ContentAssociation.filepath_id).distinct()
        cpq = s.query(ContentFilepath)
        cpq = cpq.filter(~ContentFilepath.cafilepath_id.in_(capq))
        cpq.delete()

        s.commit()


    def bootstrap_bin(self):
        """
        scan the existing debs in the pool to populate the bin_contents table
        """
        pooldir = Config()['Dir::Pool']

        s = DBConn().session()

        for binary in s.query(DBBinary).yield_per(100):
            print("binary: %s" % binary.package)
            filename = binary.poolfile.filename
            # Check for existing contents
            existingq = s.execute("select 1 from bin_contents where binary_id=:id", {'id': binary.binary_id})
            if existingq.fetchone():
                log.debug("already imported: %s" % (filename))
            else:
                # We don't have existing contents so import them
                log.debug("scanning: %s" % (filename))

                debfile = os.path.join(pooldir, filename)
                if os.path.exists(debfile):
                    Binary(debfile, self.reject).scan_package(binary.binary_id, True)
                else:
                    log.error("missing .deb: %s" % filename)


    def bootstrap(self):
        """
        copy data from the bin_contents table into the per-type
        deb_contents / udeb_contents tables
        """
        s = DBConn().session()

        # get a mapping of all the override types we care about (right now .deb and .udeb)
        override_type_map = {}
        for override_type in s.query(OverrideType).all():
            if override_type.overridetype.endswith('deb'):
                override_type_map[override_type.overridetype_id] = override_type.overridetype

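        # For every override row of a deb/udeb type, find the matching
        # binaries in that suite and copy their bin_contents rows into the
        # corresponding <type>_contents table, skipping binaries whose
        # contents were already copied.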
        for override in s.query(Override).yield_per(100):
            if not override_type_map.has_key(override.overridetype_id):
                # this isn't an override we care about
                continue

            binaries = s.execute("""SELECT b.id, b.architecture
                                    FROM binaries b
                                    JOIN bin_associations ba ON ba.bin=b.id
                                    WHERE ba.suite=:suite
                                    AND b.package=:package""", {'suite':override.suite_id, 'package':override.package})
            while True:
                binary = binaries.fetchone()
                if not binary:
                    break

                exists = s.execute("SELECT 1 FROM %s_contents WHERE binary_id=:id limit 1" % override_type_map[override.overridetype_id], {'id':binary.id})

                if exists.fetchone():
                    print '.',
                    continue
                else:
                    print '+',

                s.execute( """INSERT INTO %s_contents (filename,section,package,binary_id,arch,suite)
                              SELECT file, :section, :package, :binary_id, :arch, :suite
                              FROM bin_contents
                              WHERE binary_id=:binary_id;""" % override_type_map[override.overridetype_id],
                           { 'section' : override.section_id,
                             'package' : override.package,
                             'binary_id' : binary.id,
                             'arch' : binary.architecture,
                             'suite' : override.suite_id } )
                s.commit()

    def generate(self):
        """
        Generate contents files for both deb and udeb
        """
        self.deb_generate()
#        self.udeb_generate()

    def deb_generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()
        debtype_id = get_override_type("deb", session)
        suites = self._suites()

        inputtoquery = OneAtATime()
        querytoingest = OneAtATime()
        ingesttosort = OneAtATime()
        sorttooutput = OneAtATime()
        outputtogzip = OneAtATime()

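        # Wire the pipeline: each OneAtATime slot hands a ContentFile from
        # one stage to the next (query -> ingest -> sort -> output -> gzip).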
        qt = QueryThread(inputtoquery, querytoingest)
        it = IngestThread(querytoingest, ingesttosort)
# these actually make things worse
#        it2 = IngestThread(querytoingest,ingesttosort)
#        it3 = IngestThread(querytoingest,ingesttosort)
#        it4 = IngestThread(querytoingest,ingesttosort)
        st = SortThread(ingesttosort, sorttooutput)
        ot = OutputThread(sorttooutput, outputtogzip)
        gt = GzipThread(outputtogzip, None)

        qt.start()
        it.start()
#        it2.start()
#        it3.start()
#        it2.start()
        st.start()
        ot.start()
        gt.start()

        # Get our suites, and the architectures
        for suite in [i.lower() for i in suites]:
            suite_id = get_suite(suite, session).suite_id
            print("got suite_id: %s for suite: %s" % (suite_id, suite))
            arch_list = self._arches(suite_id, session)

            for (arch_id, arch_str) in arch_list:
                print("suite: %s, arch: %s time: %s" % (suite_id, arch_id, datetime.datetime.now().isoformat()))

#                filename = "dists/%s/Contents-%s.gz" % (suite, arch_str)
                filename = "dists/%s/Contents-%s" % (suite, arch_str)
                cf = DebContentFile(filename, suite, suite_id, arch_str, arch_id)
                inputtoquery.enqueue(cf)

        inputtoquery.enqueue(EndOfContents())
        gt.join()

    def udeb_generate(self):
        """
        Generate Contents-udeb-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()
        udebtype_id = DBConn().get_override_type_id("udeb")
        suites = self._suites()

        inputtoquery = OneAtATime()
        querytoingest = OneAtATime()
        ingesttosort = OneAtATime()
        sorttooutput = OneAtATime()
        outputtogzip = OneAtATime()

        qt = QueryThread(inputtoquery, querytoingest)
        it = IngestThread(querytoingest, ingesttosort)
# these actually make things worse
#        it2 = IngestThread(querytoingest,ingesttosort)
#        it3 = IngestThread(querytoingest,ingesttosort)
#        it4 = IngestThread(querytoingest,ingesttosort)
        st = SortThread(ingesttosort, sorttooutput)
        ot = OutputThread(sorttooutput, outputtogzip)
        gt = GzipThread(outputtogzip, None)

        qt.start()
        it.start()
#        it2.start()
#        it3.start()
#        it2.start()
        st.start()
        ot.start()
        gt.start()

#        for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s"),
#                                     ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s")]:

#             section_id = DBConn().get_section_id(section) # all udebs should be here)
#             if section_id != -1:



#                 # Get our suites, and the architectures
#                 for suite in [i.lower() for i in suites]:
#                     suite_id = DBConn().get_suite_id(suite)
#                     arch_list = self._arches(suite_id, session)

#                     for arch_id in arch_list:

#                         writer = GzippedContentWriter(fn_pattern % (suite, arch_id[1]))
#                         try:

#                             results = session.execute("EXECUTE udeb_contents_q(%d,%d,%d)" % (suite_id, udebtype_id, section_id, arch_id))

#                             while True:
#                                 r = cursor.fetchone()
#                                 if not r:
#                                     break

#                                 filename, section, package, arch = r
#                                 writer.write(filename, section, package)
#                         finally:
#                             writer.close()


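    # Note: this second definition of generate() rebinds the name and
    # replaces the pipeline-based version defined earlier in the class
    # (Python keeps the last binding).  main() currently maps the
    # "generate" command to deb_generate, so this method must be invoked
    # explicitly.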
    def generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()

        arch_all_id = get_architecture("all", session).arch_id

        # The MORE fun part. Ok, udebs need their own contents files, udeb, and udeb-nf (non-free)
        # This is HORRIBLY debian specific :-/
        for dtype, section, fn_pattern in \
              [('deb',  None,                        "dists/%s/Contents-%s.gz"),
               ('udeb', "debian-installer",          "dists/%s/Contents-udeb-%s.gz"),
               ('udeb', "non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:

            overridetype = get_override_type(dtype, session)

            # For udebs, we only look in certain sections (see the for loop above)
            if section is not None:
                section = get_section(section, session)

            # Get our suites
            for suite in which_suites(session):
                # Which architectures do we need to work on
                arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session)

                # Set up our file writer dictionary
                file_writers = {}
                try:
                    # One file writer per arch
                    for arch in arch_list:
                        file_writers[arch.arch_id] = GzippedContentWriter(fn_pattern % (suite, arch.arch_string))

                    for r in get_suite_contents(suite, overridetype, section, session=session).fetchall():
                        filename, section, package, arch_id = r

                        if arch_id == arch_all_id:
                            # It's arch all, so all contents files get it
                            for writer in file_writers.values():
                                writer.write(filename, section, package)
                        else:
                            if file_writers.has_key(arch_id):
                                file_writers[arch_id].write(filename, section, package)

                finally:
                    # close all the files
                    for writer in file_writers.values():
                        writer.finish()

    def _suites(self):
        """
        return a list of suites to operate on
        """
        if Config().has_key("%s::%s" % (options_prefix, "Suite")):
            suites = utils.split_args(Config()["%s::%s" % (options_prefix, "Suite")])
        else:
            suites = ['unstable', 'testing']
#            suites = Config().SubTree("Suite").List()

        return suites

    def _arches(self, suite, session):
        """
        return a list of archs to operate on
        """
        arch_list = []
        arches = session.execute(
            """SELECT s.architecture, a.arch_string
            FROM suite_architectures s
            JOIN architecture a ON (s.architecture=a.id)
            WHERE suite = :suite_id""",
            {'suite_id': suite})

        while True:
            r = arches.fetchone()
            if not r:
                break

            if r[1] != "source" and r[1] != "all":
                arch_list.append((r[0], r[1]))

        return arch_list


################################################################################

def main():
    cnf = Config()

    arguments = [('h', "help", "%s::%s" % (options_prefix, "Help")),
                 ('s', "suite", "%s::%s" % (options_prefix, "Suite"), "HasArg"),
                 ('q', "quiet", "%s::%s" % (options_prefix, "Quiet")),
                 ('v', "verbose", "%s::%s" % (options_prefix, "Verbose")),
                ]

    commands = {'generate' : Contents.deb_generate,
                'bootstrap_bin' : Contents.bootstrap_bin,
                'bootstrap' : Contents.bootstrap,
                'cruft' : Contents.cruft,
                }

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments, sys.argv)

    if (len(args) < 1) or not commands.has_key(args[0]):
        usage()

    if cnf.has_key("%s::%s" % (options_prefix, "Help")):
        usage()

    level = logging.INFO
    if cnf.has_key("%s::%s" % (options_prefix, "Quiet")):
        level = logging.ERROR

    elif cnf.has_key("%s::%s" % (options_prefix, "Verbose")):
        level = logging.DEBUG

    logging.basicConfig(level=level,
                        format='%(asctime)s %(levelname)s %(message)s',
                        stream=sys.stderr)

    commands[args[0]](Contents())

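# which_suites mirrors Contents._suites but defaults to the full Suite
# subtree from the configuration and returns suite objects rather than plain
# names; it is used by the newer generate() method.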
def which_suites(session):
    """
    return a list of suites to operate on
    """
    if Config().has_key("%s::%s" % (options_prefix, "Suite")):
        suites = utils.split_args(Config()["%s::%s" % (options_prefix, "Suite")])
    else:
        suites = Config().SubTree("Suite").List()

    return [get_suite(s.lower(), session) for s in suites]


if __name__ == '__main__':
    main()