#!/usr/bin/env python
"""
Create all the contents files

@contact: Debian FTPMaster <ftpmaster@debian.org>
@copyright: 2008, 2009 Michael Casadevall <mcasadevall@debian.org>
@copyright: 2009 Mike O'Connor <stew@debian.org>
@license: GNU General Public License version 2 or later
"""

################################################################################

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

################################################################################

# <Ganneff> there is the idea to slowly replace contents files
# <Ganneff> with a new generation of such files.
# <Ganneff> having more info.

# <Ganneff> of course that wont help for now where we need to generate them :)

################################################################################

import sys
import os
import logging
import gzip
import threading
import subprocess
import Queue
import apt_pkg
import datetime
import traceback
from daklib import utils
from daklib.binary import Binary
from daklib.config import Config
from daklib.dbconn import *

################################################################################

def usage (exit_code=0):
    print """Usage: dak contents [options] command [arguments]

COMMANDS
    generate
        generate Contents-$arch.gz files

    bootstrap_bin
        scan the debs in the existing pool and load contents into the bin_contents table

    bootstrap
        copy data from the bin_contents table into the deb_contents / udeb_contents tables

    cruft
        remove files/paths which are no longer referenced by a binary

OPTIONS
     -h, --help
        show this help and exit

     -v, --verbose
        show verbose information messages

     -q, --quiet
        suppress all output but errors

     -s, --suite={stable,testing,unstable,...}
        only operate on a single suite
"""
    sys.exit(exit_code)

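# Example invocation (illustrative only; assumes a configured dak installation):
#
#     dak contents -s unstable generate
#
# which would regenerate the Contents-$arch files for the unstable suite only.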
################################################################################

# where in dak.conf all of our configuration will be stowed

options_prefix = "Contents"
options_prefix = "%s::Options" % options_prefix

log = logging.getLogger()

################################################################################

class EndOfContents(object):
    """
    A sentinel object marking the end of the filename stream
    """
    pass

class OneAtATime(object):
    """
    a single-slot hand-off queue which sits between multiple possible
    producers and multiple possible consumers
    """
    def __init__(self):
        self.next_in_line = None
        self.read_lock = threading.Condition()
        self.write_lock = threading.Condition()
        self.die = False

    def enqueue(self, item):
        self.write_lock.acquire()
        while self.next_in_line:
            if self.die:
                # bail out without leaving the lock held
                self.write_lock.release()
                return
            self.write_lock.wait()

        assert( not self.next_in_line )
        self.next_in_line = item
        self.write_lock.release()
        self.read_lock.acquire()
        self.read_lock.notify()
        self.read_lock.release()

    def dequeue(self):
        self.read_lock.acquire()
        while not self.next_in_line:
            if self.die:
                # bail out without leaving the lock held
                self.read_lock.release()
                return
            self.read_lock.wait()

        result = self.next_in_line

        self.next_in_line = None
        self.read_lock.release()
        self.write_lock.acquire()
        self.write_lock.notify()
        self.write_lock.release()

        return result

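# Illustrative sketch (not part of the tool itself): how OneAtATime hands a
# stream of work items, terminated by an EndOfContents sentinel, from one
# thread to another.  The producer/consumer names below are hypothetical.
#
#     handoff = OneAtATime()
#
#     def producer():
#         for item in ["a", "b", "c"]:
#             handoff.enqueue(item)          # blocks until the slot is free
#         handoff.enqueue(EndOfContents())   # tell the consumer we are done
#
#     def consumer():
#         while True:
#             item = handoff.dequeue()       # blocks until an item arrives
#             if isinstance(item, EndOfContents):
#                 break
#             print(item)
#
#     threading.Thread(target=producer).start()
#     threading.Thread(target=consumer).start()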
class ContentsWorkThread(threading.Thread):
    """
    Base class for the pipeline worker threads: each thread repeatedly takes a
    ContentFile from its upstream queue, processes it via _run() and passes it
    on downstream.  An EndOfContents sentinel is forwarded downstream and then
    terminates the thread.
    """
    def __init__(self, upstream, downstream):
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream

    def run(self):
        while True:
            try:
                contents_file = self.upstream.dequeue()
                if isinstance(contents_file,EndOfContents):
                    if self.downstream:
                        self.downstream.enqueue(contents_file)
                    break

                s = datetime.datetime.now()
                print("%s start: %s" % (self,contents_file) )
                self._run(contents_file)
                print("%s finished: %s in %d seconds" % (self, contents_file, (datetime.datetime.now()-s).seconds ))
                if self.downstream:
                    self.downstream.enqueue(contents_file)
            except Exception:
                traceback.print_exc()

class QueryThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "QueryThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.query()

class IngestThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "IngestThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.ingest()

class SortThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "SortThread"
    __repr__ = __str__

    def _run(self, contents_file):
        contents_file.sorted_keys = sorted(contents_file.filenames.keys())

class OutputThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "OutputThread"
    __repr__ = __str__

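    # Each line written below maps a path to its "section/package" entries as
    # returned by the database query, e.g. (illustrative values only):
    #
    #     usr/bin/dak                 utils/dak
    #     usr/share/doc/dak/README    utils/dak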
    def _run(self, contents_file):
        contents_file.open_file()
        for fname in contents_file.sorted_keys:
            contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname]))
        contents_file.sorted_keys = None
        contents_file.filenames.clear()

class GzipThread(ContentsWorkThread):
    def __init__(self, upstream, downstream):
        ContentsWorkThread.__init__(self, upstream, downstream)

    def __str__(self):
        return "GzipThread"
    __repr__ = __str__

    def _run(self, contents_file):
        # compress the finished file in place; avoid the shell so that odd
        # characters in the filename cannot break the command
        subprocess.check_call(["gzip", "-f", contents_file.filename])

class ContentFile(object):
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id):

        self.filename = filename
        self.filenames = {}
        self.sorted_keys = None
        self.suite_str = suite_str
        self.suite_id = suite_id
        self.session = None
        self.filehandle = None
        self.results = None

    def __str__(self):
        return self.filename
    __repr__ = __str__


    def cleanup(self):
        self.filenames = None
        self.sorted_keys = None
        self.filehandle.close()
        self.session.close()

    def ingest(self):
        while True:
            r = self.results.fetchone()
            if not r:
                break
            filename, package = r
            self.filenames[filename]=package

        self.session.close()

#     def ingest(self):
#         while True:
#             r = self.results.fetchone()
#             if not r:
#                 break
#             filename, package = r
#             if self.filenames.has_key(filename):
#                 self.filenames[filename] += ",%s" % (package)
#             else:
#                 self.filenames[filename] = "%s" % (package)
#         self.session.close()

    def open_file(self):
        """
        opens the output file for this contents listing (gzip compression is
        applied later by GzipThread)
        """
        self.filename = Config()["Contents::Root"] + self.filename
        filedir = os.path.dirname(self.filename)
        if not os.path.isdir(filedir):
            os.makedirs(filedir)
#        self.filehandle = gzip.open(self.filename, "w")
        self.filehandle = open(self.filename, "w")
        self._write_header()

    def _write_header(self):
        header = self._get_header()
        if header:
            self.filehandle.write(header)

    header = None

    @classmethod
    def _get_header(cls):
        """
        Internal method to return the header for Contents.gz files

        This is boilerplate which explains the contents of the file and how
        it can be used.
        """
        if not ContentFile.header:
            if Config().has_key("Contents::Header"):
                try:
                    h = open(os.path.join( Config()["Dir::Templates"],
                                           Config()["Contents::Header"] ), "r")
                    ContentFile.header = h.read()
                    h.close()
                except Exception:
                    log.error( "error opening header file: %s\n%s" % (Config()["Contents::Header"],
                                                                      traceback.format_exc() ))
                    ContentFile.header = None
            else:
                ContentFile.header = None

        return ContentFile.header

class DebContentFile(ContentFile):
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 arch_str,
                 arch_id):
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )
        self.arch_str = arch_str
        self.arch_id = arch_id

    def query(self):
        self.session = DBConn().session()

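        # comma_separated_list is presumably a custom aggregate defined in the
        # dak database schema, joining each file's "section/package" values
        # into one comma-separated string.  The hard-coded arch = 2 is
        # presumably the id of the "all" architecture, so arch:all packages
        # appear in every per-architecture Contents file.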
        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
        FROM deb_contents
        WHERE ( arch=2 or arch = :arch) AND suite = :suite
        GROUP BY filename
        """, { 'arch':self.arch_id, 'suite':self.suite_id } )

class UdebContentFile(ContentFile):
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 section_name,
                 section_id):
        ContentFile.__init__(self,
                             filename,
                             suite_str,
                             suite_id )
        self.section_name = section_name
        self.section_id = section_id

    def query(self):
        self.session = DBConn().session()

        self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
        FROM udeb_contents
        WHERE suite = :suite
        GROUP BY filename
        """ , { 'suite': self.suite_id } )

class Contents(object):
    """
    Class capable of generating Contents-$arch.gz files
    """
    def __init__(self):
        self.header = None

    def reject(self, message):
        log.error("E: %s" % message)

    def cruft(self):
        """
        remove files/paths from the DB which are no longer referenced
        by binaries and clean the temporary table
        """
        s = DBConn().session()

        # clear out all of the temporarily stored content associations.
        # this should be run only after p-a (process-accepted) has run; after
        # a p-a run we should have either accepted or rejected every package,
        # so there should no longer be anything in the queue
        s.query(PendingContentAssociation).delete()

        # delete any filenames we are storing which have no binary associated
        # with them
        cafq = s.query(ContentAssociation.filename_id).distinct()
        cfq = s.query(ContentFilename)
        cfq = cfq.filter(~ContentFilename.cafilename_id.in_(cafq))
        cfq.delete()

        # delete any paths we are storing which have no binary associated with
        # them
        capq = s.query(ContentAssociation.filepath_id).distinct()
        cpq = s.query(ContentFilepath)
        cpq = cpq.filter(~ContentFilepath.cafilepath_id.in_(capq))
        cpq.delete()

        s.commit()


    def bootstrap_bin(self):
        """
        scan the existing debs in the pool to populate the bin_contents table
        """
        pooldir = Config()[ 'Dir::Pool' ]

        s = DBConn().session()

        for binary in s.query(DBBinary).yield_per(100):
            print( "binary: %s" % binary.package )
            filename = binary.poolfile.filename
            # Check for existing contents
            existingq = s.execute( "select 1 from bin_contents where binary_id=:id", {'id':binary.binary_id} )
            if existingq.fetchone():
                log.debug( "already imported: %s" % (filename))
            else:
                # We don't have existing contents so import them
                log.debug( "scanning: %s" % (filename) )

                debfile = os.path.join(pooldir, filename)
                if os.path.exists(debfile):
                    Binary(debfile, self.reject).scan_package(binary.binary_id, True)
                else:
                    log.error("missing .deb: %s" % filename)



    def bootstrap(self):
        """
        copy data from the bin_contents table into the deb_contents /
        udeb_contents tables
        """
        s = DBConn().session()


        # get a mapping of all the override types we care about (right now .deb and .udeb)
        override_type_map = {}
        for override_type in s.query(OverrideType).all():
            if override_type.overridetype.endswith('deb' ):
                override_type_map[override_type.overridetype_id] = override_type.overridetype

        for override in s.query(Override).yield_per(100):
            if not override_type_map.has_key(override.overridetype_id):
                #this isn't an override we care about
                continue

            binaries = s.execute("""SELECT b.id, b.architecture
                                    FROM binaries b
                                    JOIN bin_associations ba ON ba.bin=b.id
                                    WHERE ba.suite=:suite
                                    AND b.package=:package""", {'suite':override.suite_id, 'package':override.package})
            while True:
                binary = binaries.fetchone()
                if not binary:
                    break

                exists = s.execute("SELECT 1 FROM %s_contents WHERE binary_id=:id LIMIT 1" % override_type_map[override.overridetype_id], {'id':binary.id})


                if exists.fetchone():
                    print '.',
                    continue
                else:
                    print '+',

                # copy this binary's rows from bin_contents into the matching
                # deb_contents / udeb_contents table, attaching the override's
                # section, package, arch and suite to each filename
                s.execute( """INSERT INTO %s_contents (filename,section,package,binary_id,arch,suite)
                              SELECT file, :section, :package, :binary_id, :arch, :suite
                              FROM bin_contents
                              WHERE binary_id=:binary_id;""" % override_type_map[override.overridetype_id],
                           { 'section' : override.section_id,
                             'package' : override.package,
                             'binary_id' : binary.id,
                             'arch' : binary.architecture,
                             'suite' : override.suite_id } )
                s.commit()

    # NOTE: this definition is shadowed by the second generate() defined
    # further below; the 'generate' command in main() maps to deb_generate().
    def generate(self):
        """
        Generate contents files for both deb and udeb
        """
        self.deb_generate()
#        self.udeb_generate()

    def deb_generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()
        debtype_id = get_override_type("deb", session)
        suites = self._suites()

        inputtoquery = OneAtATime()
        querytoingest = OneAtATime()
        ingesttosort = OneAtATime()
        sorttooutput = OneAtATime()
        outputtogzip = OneAtATime()

        qt = QueryThread(inputtoquery,querytoingest)
        it = IngestThread(querytoingest,ingesttosort)
# these actually make things worse
#        it2 = IngestThread(querytoingest,ingesttosort)
#        it3 = IngestThread(querytoingest,ingesttosort)
#        it4 = IngestThread(querytoingest,ingesttosort)
        st = SortThread(ingesttosort,sorttooutput)
        ot = OutputThread(sorttooutput,outputtogzip)
        gt = GzipThread(outputtogzip, None)

        qt.start()
        it.start()
#        it2.start()
#        it3.start()
#        it2.start()
        st.start()
        ot.start()
        gt.start()

        # Get our suites, and the architectures
        for suite in [i.lower() for i in suites]:
            suite_id = get_suite(suite, session).suite_id
            print( "got suite_id: %s for suite: %s" % (suite_id, suite ) )
            arch_list = self._arches(suite_id, session)

            for (arch_id,arch_str) in arch_list:
                print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )

#                filename = "dists/%s/Contents-%s.gz" % (suite, arch_str)
                filename = "dists/%s/Contents-%s" % (suite, arch_str)
                cf = DebContentFile(filename, suite, suite_id, arch_str, arch_id)
                inputtoquery.enqueue( cf )

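        # The EndOfContents sentinel is forwarded from stage to stage (see
        # ContentsWorkThread.run), shutting each worker down in turn; waiting
        # on the final gzip stage therefore waits for the whole pipeline.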
        inputtoquery.enqueue( EndOfContents() )
        gt.join()

    def udeb_generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()
        udebtype_id = DBConn().get_override_type_id("udeb")
        suites = self._suites()

        inputtoquery = OneAtATime()
        querytoingest = OneAtATime()
        ingesttosort = OneAtATime()
        sorttooutput = OneAtATime()
        outputtogzip = OneAtATime()

        qt = QueryThread(inputtoquery,querytoingest)
        it = IngestThread(querytoingest,ingesttosort)
# these actually make things worse
#        it2 = IngestThread(querytoingest,ingesttosort)
#        it3 = IngestThread(querytoingest,ingesttosort)
#        it4 = IngestThread(querytoingest,ingesttosort)
        st = SortThread(ingesttosort,sorttooutput)
        ot = OutputThread(sorttooutput,outputtogzip)
        gt = GzipThread(outputtogzip, None)

        qt.start()
        it.start()
#        it2.start()
#        it3.start()
#        it2.start()
        st.start()
        ot.start()
        gt.start()

#        for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s"),
#                                     ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s")]:

#             section_id = DBConn().get_section_id(section) # all udebs should be here)
#             if section_id != -1:



#                 # Get our suites, and the architectures
#                 for suite in [i.lower() for i in suites]:
#                     suite_id = DBConn().get_suite_id(suite)
#                     arch_list = self._arches(suite_id, session)

#                     for arch_id in arch_list:

#                         writer = GzippedContentWriter(fn_pattern % (suite, arch_id[1]))
#                         try:

#                             results = session.execute("EXECUTE udeb_contents_q(%d,%d,%d)" % (suite_id, udebtype_id, section_id, arch_id))

#                             while True:
#                                 r = cursor.fetchone()
#                                 if not r:
#                                     break

#                                 filename, section, package, arch = r
#                                 writer.write(filename, section, package)
#                         finally:
#                             writer.close()




    def generate(self):
        """
        Generate Contents-$arch.gz files for every available arch in each given suite.
        """
        session = DBConn().session()

        arch_all_id = get_architecture("all", session).arch_id

        # The MORE fun part. Ok, udebs need their own contents files, udeb, and udeb-nf (non-free)
        # This is HORRIBLY debian specific :-/
        for dtype, section, fn_pattern in \
              [('deb',  None,                        "dists/%s/Contents-%s.gz"),
               ('udeb', "debian-installer",          "dists/%s/Contents-udeb-%s.gz"),
               ('udeb', "non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:

            overridetype = get_override_type(dtype, session)

            # For udebs, we only look in certain sections (see the for loop above)
            if section is not None:
                section = get_section(section, session)

            # Get our suites
            for suite in which_suites(session):
                # Which architectures do we need to work on
                arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session)

                # Set up our file writer dictionary
                file_writers = {}
                try:
                    # One file writer per arch
                    for arch in arch_list:
                        file_writers[arch.arch_id] = GzippedContentWriter(fn_pattern % (suite.suite_name, arch.arch_string))

                    for r in get_suite_contents(suite, overridetype, section, session=session).fetchall():
                        # use a separate name for the per-file section so we
                        # don't clobber the section used in the query above
                        filename, file_section, package, arch_id = r

                        if arch_id == arch_all_id:
                            # It's arch all, so all contents files get it
                            for writer in file_writers.values():
                                writer.write(filename, file_section, package)
                        else:
                            if file_writers.has_key(arch_id):
                                file_writers[arch_id].write(filename, file_section, package)

                finally:
                    # close all the files
                    for writer in file_writers.values():
                        writer.finish()

    def _suites(self):
        """
        return a list of suites to operate on
        """
        if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
            suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
        else:
            suites = [ 'unstable', 'testing' ]
#            suites = Config().SubTree("Suite").List()

        return suites

    def _arches(self, suite, session):
        """
        return a list of archs to operate on
        """
        arch_list = []
        arches = session.execute(
            """SELECT s.architecture, a.arch_string
            FROM suite_architectures s
            JOIN architecture a ON (s.architecture=a.id)
            WHERE suite = :suite_id""",
            {'suite_id':suite } )

        while True:
            r = arches.fetchone()
            if not r:
                break

            if r[1] != "source" and r[1] != "all":
                arch_list.append((r[0], r[1]))

        return arch_list


################################################################################

def main():
    cnf = Config()

    arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
                 ('s',"suite", "%s::%s" % (options_prefix,"Suite"),"HasArg"),
                 ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
                 ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")),
                ]

    commands = {'generate' : Contents.deb_generate,
                'bootstrap_bin' : Contents.bootstrap_bin,
                'bootstrap' : Contents.bootstrap,
                'cruft' : Contents.cruft,
                }

    args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv)

    if (len(args) < 1) or not commands.has_key(args[0]):
        usage()

    if cnf.has_key("%s::%s" % (options_prefix,"Help")):
        usage()

    level=logging.INFO
    if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
        level=logging.ERROR

    elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
        level=logging.DEBUG


    logging.basicConfig( level=level,
                         format='%(asctime)s %(levelname)s %(message)s',
                         stream = sys.stderr )

    commands[args[0]](Contents())

def which_suites(session):
    """
    return a list of suites to operate on
    """
    if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
        suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
    else:
        suites = Config().SubTree("Suite").List()

    return [get_suite(s.lower(), session) for s in suites]


if __name__ == '__main__':
    main()