import sys
import os
import logging
-import math
import gzip
import threading
import traceback
import Queue
import apt_pkg
-import datetime #just for debugging, can be removed
+import datetime
+import traceback
from daklib import utils
from daklib.binary import Binary
from daklib.config import Config
-from daklib.dbconn import DBConn
-################################################################################
+from daklib.dbconn import *
-log=None
+################################################################################
def usage (exit_code=0):
print """Usage: dak contents [options] command [arguments]
generate
generate Contents-$arch.gz files
+ bootstrap_bin
+ scan the debs in the existing pool and load contents into the bin_contents table
+
bootstrap
- scan the debs in the existing pool and load contents in the the database
+ copy data from the bin_contents table into the deb_contents / udeb_contents tables
cruft
remove files/paths which are no longer referenced by a binary
options_prefix = "Contents"
options_prefix = "%s::Options" % options_prefix
-#log = logging.getLogger()
+log = logging.getLogger()
################################################################################
-# get all the arches delivered for a given suite
-# this should probably exist somehere common
-arches_q = """PREPARE arches_q(int) as
- SELECT s.architecture, a.arch_string
- FROM suite_architectures s
- JOIN architecture a ON (s.architecture=a.id)
- WHERE suite = $1"""
-
-# find me the .deb for a given binary id
-debs_q = """PREPARE debs_q(int, int) as
- SELECT b.id, f.filename FROM bin_assoc_by_arch baa
- JOIN binaries b ON baa.bin=b.id
- JOIN files f ON b.file=f.id
- WHERE suite = $1
- AND arch = $2"""
-
-# find me all of the contents for a given .deb
-contents_q = """PREPARE contents_q(int,int) as
- SELECT file, section, package
- FROM deb_contents
- WHERE suite = $1
- AND (arch = $2 or arch=2)"""
-# ORDER BY file"""
-
-# find me all of the contents for a given .udeb
-udeb_contents_q = """PREPARE udeb_contents_q(int,int,text, int) as
- SELECT file, section, package, arch
- FROM udeb_contents
- WHERE suite = $1
- AND otype = $2
- AND section = $3
- and arch = $4
- ORDER BY file"""
-
-
-# clear out all of the temporarily stored content associations
-# this should be run only after p-a has run. after a p-a
-# run we should have either accepted or rejected every package
-# so there should no longer be anything in the queue
-remove_pending_contents_cruft_q = """DELETE FROM pending_content_associations"""
-
class EndOfContents(object):
+ """
+    A sentinel object for the end of the filename stream
+ """
pass
class OneAtATime(object):
"""
+    a one-slot queue which sits between multiple possible producers
+ and multiple possible consumers
"""
def __init__(self):
self.next_in_line = None
- self.next_lock = threading.Condition()
+ self.read_lock = threading.Condition()
+ self.write_lock = threading.Condition()
+ self.die = False
def enqueue(self, next):
- self.next_lock.acquire()
+ self.write_lock.acquire()
while self.next_in_line:
- self.next_lock.wait()
-
+ if self.die:
+ return
+ self.write_lock.wait()
+
assert( not self.next_in_line )
self.next_in_line = next
- self.next_lock.notify()
- self.next_lock.release()
+ self.write_lock.release()
+ self.read_lock.acquire()
+ self.read_lock.notify()
+ self.read_lock.release()
def dequeue(self):
- self.next_lock.acquire()
+ self.read_lock.acquire()
while not self.next_in_line:
- self.next_lock.wait()
+ if self.die:
+ return
+ self.read_lock.wait()
+
result = self.next_in_line
+
self.next_in_line = None
- self.next_lock.notify()
- self.next_lock.release()
+ self.read_lock.release()
+ self.write_lock.acquire()
+ self.write_lock.notify()
+ self.write_lock.release()
+
return result
-
+
class ContentsWorkThread(threading.Thread):
"""
contents_file.filehandle.write("%s\t%s\n" % (fname,contents_file.filenames[fname]))
contents_file.sorted_keys = None
contents_file.filenames.clear()
-
+
class GzipThread(ContentsWorkThread):
def __init__(self, upstream, downstream):
ContentsWorkThread.__init__(self, upstream, downstream)
def _run(self, contents_file):
os.system("gzip -f %s" % contents_file.filename)
-
+
class ContentFile(object):
def __init__(self,
filename,
suite_str,
- suite_id,
- arch_str,
- arch_id):
+ suite_id):
self.filename = filename
self.filenames = {}
self.sorted_keys = None
self.suite_str = suite_str
self.suite_id = suite_id
- self.arch_str = arch_str
- self.arch_id = arch_id
- self.cursor = None
+ self.session = None
self.filehandle = None
+ self.results = None
def __str__(self):
return self.filename
self.filenames = None
self.sortedkeys = None
self.filehandle.close()
- self.cursor.close()
-
- def query(self):
- self.cursor = DBConn().cursor();
-
- self.cursor.execute("""SELECT file, section || '/' || package
- FROM deb_contents
- WHERE ( arch=2 or arch = %d) AND suite = %d
- """ % (self.arch_id, self.suite_id))
+ self.session.close()
def ingest(self):
while True:
- r = self.cursor.fetchone()
+ r = self.results.fetchone()
if not r:
break
filename, package = r
- if self.filenames.has_key(filename):
- self.filenames[filename] += ",%s" % (package)
- else:
- self.filenames[filename] = "%s" % (package)
- self.cursor.close()
+ self.filenames[filename]=package
+
+ self.session.close()
def open_file(self):
"""
opens a gzip stream to the contents file
"""
-# filepath = Config()["Contents::Root"] + self.filename
- self.filename = "/home/stew/contents/" + self.filename
+ filepath = Config()["Contents::Root"] + self.filename
filedir = os.path.dirname(self.filename)
if not os.path.isdir(filedir):
os.makedirs(filedir)
-# self.filehandle = gzip.open(self.filename, "w")
self.filehandle = open(self.filename, "w")
self._write_header()
return ContentFile.header
+
+class DebContentFile(ContentFile):
+ def __init__(self,
+ filename,
+ suite_str,
+ suite_id,
+ arch_str,
+ arch_id):
+ ContentFile.__init__(self,
+ filename,
+ suite_str,
+ suite_id )
+ self.arch_str = arch_str
+ self.arch_id = arch_id
+
+ def query(self):
+ self.session = DBConn().session();
+
+ self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
+ FROM deb_contents
+ WHERE ( arch=2 or arch = :arch) AND suite = :suite
+ """, { 'arch':self.arch_id, 'suite':self.suite_id } )
+
+class UdebContentFile(ContentFile):
+ def __init__(self,
+ filename,
+ suite_str,
+ suite_id,
+ section_name,
+ section_id):
+ ContentFile.__init__(self,
+ filename,
+ suite_str,
+ suite_id )
+
+ def query(self):
+ self.session = DBConn().session();
+
+ self.results = self.session.execute("""SELECT filename, comma_separated_list(section || '/' || package)
+ FROM udeb_contents
+ WHERE suite = :suite
+ group by filename
+ """ , { 'suite': self.suite_id } )
+
class Contents(object):
"""
Class capable of generating Contents-$arch.gz files
-
- Usage GenerateContents().generateContents( ["main","contrib","non-free"] )
"""
-
def __init__(self):
self.header = None
def reject(self, message):
log.error("E: %s" % message)
- # goal column for section column
- _goal_column = 54
-
def cruft(self):
"""
remove files/paths from the DB which are no longer referenced
by binaries and clean the temporary table
"""
- cursor = DBConn().cursor();
- cursor.execute( "BEGIN WORK" )
- cursor.execute( remove_pending_contents_cruft_q )
- cursor.execute( remove_filename_cruft_q )
- cursor.execute( remove_filepath_cruft_q )
- cursor.execute( "COMMIT" )
+ s = DBConn().session()
+
+ # clear out all of the temporarily stored content associations
+ # this should be run only after p-a has run. after a p-a
+ # run we should have either accepted or rejected every package
+ # so there should no longer be anything in the queue
+ s.query(PendingContentAssociation).delete()
+
+ # delete any filenames we are storing which have no binary associated
+ # with them
+ cafq = s.query(ContentAssociation.filename_id).distinct()
+ cfq = s.query(ContentFilename)
+ cfq = cfq.filter(~ContentFilename.cafilename_id.in_(cafq))
+ cfq.delete()
+
+ # delete any paths we are storing which have no binary associated with
+ # them
+ capq = s.query(ContentAssociation.filepath_id).distinct()
+ cpq = s.query(ContentFilepath)
+ cpq = cpq.filter(~ContentFilepath.cafilepath_id.in_(capq))
+ cpq.delete()
+
+ s.commit()
+
+
+ def bootstrap_bin(self):
+ """
+ scan the existing debs in the pool to populate the bin_contents table
+ """
+ pooldir = Config()[ 'Dir::Pool' ]
+
+ s = DBConn().session()
+
+ for binary in s.query(DBBinary).yield_per(100):
+ print( "binary: %s" % binary.package )
+ filename = binary.poolfile.filename
+ # Check for existing contents
+ existingq = s.execute( "select 1 from bin_contents where binary_id=:id", {'id':binary.binary_id} );
+ if existingq.fetchone():
+ log.debug( "already imported: %s" % (filename))
+ else:
+ # We don't have existing contents so import them
+ log.debug( "scanning: %s" % (filename) )
+
+ debfile = os.path.join(pooldir, filename)
+ if os.path.exists(debfile):
+ Binary(debfile, self.reject).scan_package(binary.binary_id, True)
+ else:
+ log.error("missing .deb: %s" % filename)
+
def bootstrap(self):
"""
scan the existing debs in the pool to populate the contents database tables
"""
- pooldir = Config()[ 'Dir::Pool' ]
+ s = DBConn().session()
+
+
+        # get a mapping of all the override types we care about (right now .deb and .udeb)
+ override_type_map = {};
+ for override_type in s.query(OverrideType).all():
+ if override_type.overridetype.endswith('deb' ):
+ override_type_map[override_type.overridetype_id] = override_type.overridetype;
+
+ for override in s.query(Override).yield_per(100):
+ if not override_type_map.has_key(override.overridetype_id):
+ #this isn't an override we care about
+ continue
+
+ binaries = s.execute("""SELECT b.id, b.architecture
+ FROM binaries b
+ JOIN bin_associations ba ON ba.bin=b.id
+ WHERE ba.suite=:suite
+ AND b.package=:package""", {'suite':override.suite_id, 'package':override.package})
+ while True:
+ binary = binaries.fetchone()
+ if not binary:
+ break
- cursor = DBConn().cursor();
- DBConn().prepare("debs_q",debs_q)
- DBConn().prepare("arches_q",arches_q)
+ exists = s.execute("SELECT 1 FROM %s_contents WHERE binary_id=:id limit 1" % override_type_map[override.overridetype_id], {'id':binary.id})
- suites = self._suites()
- for suite in [i.lower() for i in suites]:
- suite_id = DBConn().get_suite_id(suite)
-
- arch_list = self._arches(cursor, suite_id)
- arch_all_id = DBConn().get_architecture_id("all")
- for arch_id in arch_list:
- cursor.execute( "EXECUTE debs_q(%d, %d)" % ( suite_id, arch_id[0] ) )
-
- count = 0
- while True:
- deb = cursor.fetchone()
- if not deb:
- break
- count += 1
- cursor1 = DBConn().cursor();
- cursor1.execute( "SELECT 1 FROM deb_contents WHERE binary_id = %d LIMIT 1" % (deb[0] ) )
- old = cursor1.fetchone()
- if old:
- log.log( "already imported: %s" % (deb[1]) )
- else:
-# log.debug( "scanning: %s" % (deb[1]) )
- log.log( "scanning: %s" % (deb[1]) )
- debfile = os.path.join( pooldir, deb[1] )
- if os.path.exists( debfile ):
- Binary(debfile, self.reject).scan_package(deb[0], True)
- else:
- log.error("missing .deb: %s" % deb[1])
+ if exists.fetchone():
+ print '.',
+ continue
+ else:
+ print '+',
+
+ s.execute( """INSERT INTO %s_contents (filename,section,package,binary_id,arch,suite)
+ SELECT file, :section, :package, :binary_id, :arch, :suite
+ FROM bin_contents
+ WHERE binary_id=:binary_id;""" % override_type_map[override.overridetype_id],
+ { 'section' : override.section_id,
+ 'package' : override.package,
+ 'binary_id' : binary.id,
+ 'arch' : binary.architecture,
+ 'suite' : override.suite_id } )
+ s.commit()
def generate(self):
"""
Generate contents files for both deb and udeb
"""
- DBConn().prepare("arches_q", arches_q)
self.deb_generate()
-# self.udeb_generate()
+ self.udeb_generate()
def deb_generate(self):
"""
Generate Contents-$arch.gz files for every available arch in each given suite.
"""
- cursor = DBConn().cursor()
- debtype_id = DBConn().get_override_type_id("deb")
+ session = DBConn().session()
+ debtype_id = get_override_type("deb", session)
suites = self._suites()
inputtoquery = OneAtATime()
qt = QueryThread(inputtoquery,querytoingest)
it = IngestThread(querytoingest,ingesttosort)
-# these actually make things worse
-# it2 = IngestThread(querytoingest,ingesttosort)
-# it3 = IngestThread(querytoingest,ingesttosort)
-# it4 = IngestThread(querytoingest,ingesttosort)
st = SortThread(ingesttosort,sorttooutput)
ot = OutputThread(sorttooutput,outputtogzip)
gt = GzipThread(outputtogzip, None)
qt.start()
it.start()
-# it2.start()
-# it3.start()
-# it2.start()
st.start()
ot.start()
gt.start()
-
+
# Get our suites, and the architectures
for suite in [i.lower() for i in suites]:
- suite_id = DBConn().get_suite_id(suite)
- arch_list = self._arches(cursor, suite_id)
+ suite_id = get_suite(suite, session).suite_id
+ print( "got suite_id: %s for suite: %s" % (suite_id, suite ) )
+ arch_list = self._arches(suite_id, session)
for (arch_id,arch_str) in arch_list:
print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )
-# filename = "dists/%s/Contents-%s.gz" % (suite, arch_str)
filename = "dists/%s/Contents-%s" % (suite, arch_str)
- cf = ContentFile(filename, suite, suite_id, arch_str, arch_id)
+ cf = DebContentFile(filename, suite, suite_id, arch_str, arch_id)
inputtoquery.enqueue( cf )
inputtoquery.enqueue( EndOfContents() )
"""
Generate Contents-$arch.gz files for every available arch in each given suite.
"""
- cursor = DBConn().cursor()
-
- DBConn().prepare("udeb_contents_q", udeb_contents_q)
+ session = DBConn().session()
udebtype_id=DBConn().get_override_type_id("udeb")
suites = self._suites()
-# for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s.gz"),
-# ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:
+ inputtoquery = OneAtATime()
+ querytoingest = OneAtATime()
+ ingesttosort = OneAtATime()
+ sorttooutput = OneAtATime()
+ outputtogzip = OneAtATime()
- for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s"),
- ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s")]:
+ qt = QueryThread(inputtoquery,querytoingest)
+ it = IngestThread(querytoingest,ingesttosort)
+ st = SortThread(ingesttosort,sorttooutput)
+ ot = OutputThread(sorttooutput,outputtogzip)
+ gt = GzipThread(outputtogzip, None)
- section_id = DBConn().get_section_id(section) # all udebs should be here)
- if section_id != -1:
+ qt.start()
+ it.start()
+ st.start()
+ ot.start()
+ gt.start()
- # Get our suites, and the architectures
- for suite in [i.lower() for i in suites]:
- suite_id = DBConn().get_suite_id(suite)
- arch_list = self._arches(cursor, suite_id)
- for arch_id in arch_list:
+ def generate(self):
+ """
+ Generate Contents-$arch.gz files for every available arch in each given suite.
+ """
+ session = DBConn().session()
- writer = GzippedContentWriter(fn_pattern % (suite, arch_id[1]))
- try:
+ arch_all_id = get_architecture("all", session).arch_id
- cursor.execute("EXECUTE udeb_contents_q(%d,%d,%d)" % (suite_id, udebtype_id, section_id, arch_id))
+        # The MORE fun part. Ok, udebs need their own contents files, udeb, and udeb-nf (non-free)
+ # This is HORRIBLY debian specific :-/
+ for dtype, section, fn_pattern in \
+ [('deb', None, "dists/%s/Contents-%s.gz"),
+ ('udeb', "debian-installer", "dists/%s/Contents-udeb-%s.gz"),
+ ('udeb', "non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:
- while True:
- r = cursor.fetchone()
- if not r:
- break
+ overridetype = get_override_type(dtype, session)
- filename, section, package, arch = r
- writer.write(filename, section, package)
- finally:
- writer.close()
+ # For udebs, we only look in certain sections (see the for loop above)
+ if section is not None:
+ section = get_section(section, session)
+
+ # Get our suites
+ for suite in which_suites(session):
+ # Which architectures do we need to work on
+ arch_list = get_suite_architectures(suite.suite_name, skipsrc=True, skipall=True, session=session)
+ # Set up our file writer dictionary
+ file_writers = {}
+ try:
+ # One file writer per arch
+ for arch in arch_list:
+ file_writers[arch.arch_id] = GzippedContentWriter(fn_pattern % (suite, arch.arch_string))
+ for r in get_suite_contents(suite, overridetype, section, session=session).fetchall():
+ filename, section, package, arch_id = r
-################################################################################
+ if arch_id == arch_all_id:
+ # It's arch all, so all contents files get it
+ for writer in file_writers.values():
+ writer.write(filename, section, package)
+ else:
+ if file_writers.has_key(arch_id):
+ file_writers[arch_id].write(filename, section, package)
+ finally:
+ # close all the files
+ for writer in file_writers.values():
+ writer.finish()
def _suites(self):
"""
return a list of suites to operate on
if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
else:
- suites = [ 'unstable', 'testing' ]
-# suites = Config().SubTree("Suite").List()
+ suites = Config().SubTree("Suite").List()
return suites
- def _arches(self, cursor, suite):
+ def _arches(self, suite, session):
"""
return a list of archs to operate on
"""
arch_list = []
- cursor.execute("EXECUTE arches_q(%d)" % (suite))
+ arches = session.execute(
+ """SELECT s.architecture, a.arch_string
+ FROM suite_architectures s
+ JOIN architecture a ON (s.architecture=a.id)
+ WHERE suite = :suite_id""",
+ {'suite_id':suite } )
+
while True:
- r = cursor.fetchone()
+ r = arches.fetchone()
if not r:
break
return arch_list
-################################################################################
+################################################################################
def main():
cnf = Config()
-# log = logging.Logger(cnf, "contents")
-
+
arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
('s',"suite", "%s::%s" % (options_prefix,"Suite"),"HasArg"),
('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
]
commands = {'generate' : Contents.generate,
+ 'bootstrap_bin' : Contents.bootstrap_bin,
'bootstrap' : Contents.bootstrap,
'cruft' : Contents.cruft,
}
if cnf.has_key("%s::%s" % (options_prefix,"Help")):
usage()
-# level=logging.INFO
-# if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
-# level=logging.ERROR
+ level=logging.INFO
+ if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
+ level=logging.ERROR
-# elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
-# level=logging.DEBUG
+ elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
+ level=logging.DEBUG
-# logging.basicConfig( level=level,
-# format='%(asctime)s %(levelname)s %(message)s',
-# stream = sys.stderr )
+ logging.basicConfig( level=level,
+ format='%(asctime)s %(levelname)s %(message)s',
+ stream = sys.stderr )
commands[args[0]](Contents())
+def which_suites(session):
+ """
+ return a list of suites to operate on
+ """
+ if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
+ suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
+ else:
+ suites = Config().SubTree("Suite").List()
+
+ return [get_suite(s.lower(), session) for s in suites]
+
+
if __name__ == '__main__':
main()