import math
import gzip
import threading
+import traceback
import Queue
import apt_pkg
+import datetime #just for debugging, can be removed
from daklib import utils
from daklib.binary import Binary
from daklib.config import Config
from daklib.dbconn import DBConn
################################################################################
+log=None
+
def usage (exit_code=0):
print """Usage: dak contents [options] command [arguments]
options_prefix = "Contents"
options_prefix = "%s::Options" % options_prefix
-log = logging.getLogger()
+#log = logging.getLogger()
################################################################################
WHERE suite = $1
AND arch = $2"""
-# ask if we already have contents associated with this binary
-olddeb_q = """PREPARE olddeb_q(int) as
- SELECT 1 FROM content_associations
- WHERE binary_pkg = $1
- LIMIT 1"""
-
# find me all of the contents for a given .deb
# NOTE(review): arch=2 is hard-coded here; it appears to be the id of the
# "all" architecture -- confirm against the architecture table.
# NOTE(review): this prepared statement no longer seems to be prepared or
# executed anywhere (ContentFile.query inlines its own SQL) -- candidate
# for removal.
contents_q = """PREPARE contents_q(int,int) as
              SELECT file, section, package
              FROM deb_contents
              WHERE suite = $1
              AND (arch = $2 or arch=2)"""
# ORDER BY file"""
# find me all of the contents for a given .udeb
# NOTE(review): $3 is declared "text" but udeb_generate passes the numeric
# section id -- confirm whether udeb_contents.section stores the name or
# the id.  Callers must supply all four parameters.
udeb_contents_q = """PREPARE udeb_contents_q(int,int,text, int) as
              SELECT file, section, package, arch
              FROM udeb_contents
              WHERE suite = $1
              AND otype = $2
              AND section = $3
              and arch = $4
              ORDER BY file"""
# clear out all of the temporarily stored content associations
# so there should no longer be anything in the queue
remove_pending_contents_cruft_q = """DELETE FROM pending_content_associations"""
class EndOfContents(object):
    """
    Sentinel queued down the pipeline to mark the end of the
    contents-file stream; each stage forwards it and shuts down.
    """
    pass
class OneAtATime(object):
    """
    A one-slot blocking hand-off channel between two pipeline threads.

    enqueue() blocks while the slot is occupied; dequeue() blocks while it
    is empty.  Together they pass work items downstream one at a time.
    """
    def __init__(self):
        self.next_in_line = None          # the single slot
        self.next_lock = threading.Condition()

    def enqueue(self, next):
        """Block until the slot is free, then deposit *next* into it."""
        with self.next_lock:
            # Compare against None rather than testing truthiness: a falsy
            # item would otherwise be overwritten / never seen by dequeue().
            while self.next_in_line is not None:
                self.next_lock.wait()

            assert self.next_in_line is None
            self.next_in_line = next
            # notify_all, not notify: with several producers/consumers
            # sharing this one condition, notify() could wake another
            # producer instead of a consumer and stall the pipeline.
            self.next_lock.notify_all()

    def dequeue(self):
        """Block until an item is available, then take it from the slot."""
        with self.next_lock:
            while self.next_in_line is None:
                self.next_lock.wait()
            result = self.next_in_line
            self.next_in_line = None
            self.next_lock.notify_all()
            return result
class ContentsWorkThread(threading.Thread):
    """
    Base class for one stage of the contents-generation pipeline.

    Repeatedly pulls an item from *upstream*, processes it via the
    subclass hook _run(), then hands it to *downstream* (when present).
    An EndOfContents sentinel is forwarded and terminates the loop.
    """
    def __init__(self, upstream, downstream):
        threading.Thread.__init__(self)
        self.upstream = upstream
        self.downstream = downstream

    def run(self):
        while True:
            try:
                contents_file = self.upstream.dequeue()
                if isinstance(contents_file, EndOfContents):
                    # Pass the sentinel on so the next stage shuts down too.
                    if self.downstream:
                        self.downstream.enqueue(contents_file)
                    break

                s = datetime.datetime.now()
                print("%s start: %s" % (self, contents_file))
                self._run(contents_file)
                print("%s finished: %s in %d seconds" % (self, contents_file, (datetime.datetime.now() - s).seconds))
                if self.downstream:
                    self.downstream.enqueue(contents_file)
            except Exception:
                # Exception, not bare except: don't swallow SystemExit or
                # KeyboardInterrupt.  NOTE(review): a failed item is not
                # forwarded, so a downstream stage waiting on it may stall.
                traceback.print_exc()
class QueryThread(ContentsWorkThread):
    """Pipeline stage that issues the database query for a ContentFile."""
    # __init__ is inherited unchanged from ContentsWorkThread.

    def __str__(self):
        return "QueryThread"

    __repr__ = __str__

    def _run(self, contents_file):
        # The ContentFile owns its cursor; just start its query.
        contents_file.query()
class IngestThread(ContentsWorkThread):
    """Pipeline stage that drains the query results into memory."""
    # __init__ is inherited unchanged from ContentsWorkThread.

    def __str__(self):
        return "IngestThread"

    __repr__ = __str__

    def _run(self, contents_file):
        # The ContentFile folds its own rows into its filename map.
        contents_file.ingest()
class SortThread(ContentsWorkThread):
    """Pipeline stage that orders the collected filenames."""
    # __init__ is inherited unchanged from ContentsWorkThread.

    def __str__(self):
        return "SortThread"

    __repr__ = __str__

    def _run(self, contents_file):
        # Iterating the dict yields its keys; sorted() copies them into
        # the ordered list that the output stage walks.
        contents_file.sorted_keys = sorted(contents_file.filenames)
class OutputThread(ContentsWorkThread):
    """Pipeline stage that writes the sorted contents to the output file."""

    def __str__(self):
        return "OutputThread"

    __repr__ = __str__

    def _run(self, contents_file):
        """Emit one "filename<TAB>packages" line per sorted filename."""
        contents_file.open_file()
        filenames = contents_file.filenames
        # writelines batches the I/O instead of one write() per line.
        contents_file.filehandle.writelines(
            "%s\t%s\n" % (fname, filenames[fname])
            for fname in contents_file.sorted_keys)
        # Close (and thereby flush) before the gzip stage compresses the
        # file; compressing with data still buffered would truncate it.
        contents_file.filehandle.close()
        contents_file.sorted_keys = None
        contents_file.filenames.clear()
class GzipThread(ContentsWorkThread):
    """Pipeline stage that compresses the finished contents file."""

    def __str__(self):
        return "GzipThread"

    __repr__ = __str__

    def _run(self, contents_file):
        """Compress the written file in place, producing <filename>.gz."""
        import subprocess
        # An argv list avoids shell interpretation of the path (os.system
        # would break on spaces/metacharacters), and check_call raises on
        # a non-zero exit instead of silently ignoring a failed gzip.
        subprocess.check_call(["gzip", "-f", contents_file.filename])
class ContentFile(object):
    """
    One Contents-$arch file travelling through the generation pipeline.

    Carries the query cursor, the accumulated filename->packages map,
    the sorted key list and the output file handle between stages.
    """
    def __init__(self,
                 filename,
                 suite_str,
                 suite_id,
                 arch_str,
                 arch_id):

        self.filename = filename   # output path, relative until open_file()
        self.filenames = {}        # filename -> comma-separated package list
        self.sorted_keys = None    # sorted filenames, filled in by SortThread
        self.suite_str = suite_str
        self.suite_id = suite_id
        self.arch_str = arch_str
        self.arch_id = arch_id
        self.cursor = None         # set by query()
        self.filehandle = None     # set by open_file()

    def __str__(self):
        return self.filename
    __repr__ = __str__

    def cleanup(self):
        """Release accumulated state and close the cursor and file handle."""
        self.filenames = None
        # Fixed: this previously assigned "self.sortedkeys" (typo), so the
        # real sorted_keys list was never released.
        self.sorted_keys = None
        self.filehandle.close()
        self.cursor.close()
+ def query(self):
+ self.cursor = DBConn().cursor();
+
+ self.cursor.execute("""SELECT file, section || '/' || package
+ FROM deb_contents
+ WHERE ( arch=2 or arch = %d) AND suite = %d
+ """ % (self.arch_id, self.suite_id))
+
+ def ingest(self):
+ while True:
+ r = self.cursor.fetchone()
+ if not r:
+ break
+ filename, package = r
+ if self.filenames.has_key(filename):
+ self.filenames[filename] += ",%s" % (package)
+ else:
+ self.filenames[filename] = "%s" % (package)
+ self.cursor.close()
- def __init__(self, filename):
- """
- @type filename: string
- @param filename: the name of the file to write to
- """
- self.queue = Queue.Queue()
- self.current_file = None
- self.first_package = True
- self.output = self.open_file(filename)
- self.thread = threading.Thread(target=self.write_thread,
- name='Contents writer')
- self.thread.start()
-
- def open_file(self, filename):
+ def open_file(self):
"""
opens a gzip stream to the contents file
"""
- filepath = Config()["Contents::Root"] + filename
- filedir = os.path.dirname(filepath)
+# filepath = Config()["Contents::Root"] + self.filename
+ self.filename = "/home/stew/contents/" + self.filename
+ filedir = os.path.dirname(self.filename)
if not os.path.isdir(filedir):
os.makedirs(filedir)
- return gzip.open(filepath, "w")
-
- def write(self, filename, section, package):
- """
- enqueue content to be written to the file on a separate thread
- """
- self.queue.put((filename,section,package))
-
- def write_thread(self):
- """
- the target of a Thread which will do the actual writing
- """
- while True:
- next = self.queue.get()
- if isinstance(next, EndOfContents):
- self.output.write('\n')
- self.output.close()
- break
-
- (filename,section,package)=next
- if next != self.current_file:
- # this is the first file, so write the header first
- if not self.current_file:
- self.output.write(self._getHeader())
-
- self.output.write('\n%s\t' % filename)
- self.first_package = True
+# self.filehandle = gzip.open(self.filename, "w")
+ self.filehandle = open(self.filename, "w")
+ self._write_header()
- self.current_file=filename
+ def _write_header(self):
+ self._get_header();
+ self.filehandle.write(ContentFile.header)
- if not self.first_package:
- self.output.write(',')
- else:
- self.first_package=False
- self.output.write('%s/%s' % (section,package))
-
- def finish(self):
- """
- enqueue the sentry object so that writers will know to terminate
- """
- self.queue.put(EndOfContents())
+ header=None
@classmethod
- def _getHeader(self):
+ def _get_header(self):
"""
Internal method to return the header for Contents.gz files
This is boilerplate which explains the contents of the file and how
it can be used.
"""
- if not GzippedContentWriter.header:
+ if not ContentFile.header:
if Config().has_key("Contents::Header"):
try:
h = open(os.path.join( Config()["Dir::Templates"],
Config()["Contents::Header"] ), "r")
- GzippedContentWriter.header = h.read()
+ ContentFile.header = h.read()
h.close()
except:
log.error( "error opening header file: %d\n%s" % (Config()["Contents::Header"],
traceback.format_exc() ))
- GzippedContentWriter.header = None
+ ContentFile.header = None
else:
- GzippedContentWriter.header = None
-
- return GzippedContentWriter.header
+ ContentFile.header = None
+ return ContentFile.header
class Contents(object):
"""
cursor = DBConn().cursor();
DBConn().prepare("debs_q",debs_q)
- DBConn().prepare("olddeb_q",olddeb_q)
DBConn().prepare("arches_q",arches_q)
suites = self._suites()
break
count += 1
cursor1 = DBConn().cursor();
- cursor1.execute( "EXECUTE olddeb_q(%d)" % (deb[0] ) )
+ cursor1.execute( "SELECT 1 FROM deb_contents WHERE binary_id = %d LIMIT 1" % (deb[0] ) )
old = cursor1.fetchone()
if old:
- log.debug( "already imported: %s" % (deb[1]) )
+ log.log( "already imported: %s" % (deb[1]) )
else:
- log.debug( "scanning: %s" % (deb[1]) )
+# log.debug( "scanning: %s" % (deb[1]) )
+ log.log( "scanning: %s" % (deb[1]) )
debfile = os.path.join( pooldir, deb[1] )
if os.path.exists( debfile ):
- Binary(debfile, self.reject).scan_package(deb[0],True)
+ Binary(debfile, self.reject).scan_package(deb[0], True)
else:
log.error("missing .deb: %s" % deb[1])
+
def generate(self):
"""
- Generate Contents-$arch.gz files for every available arch in each given suite.
+ Generate contents files for both deb and udeb
"""
- cursor = DBConn().cursor()
-
DBConn().prepare("arches_q", arches_q)
- DBConn().prepare("contents_q", contents_q)
- DBConn().prepare("udeb_contents_q", udeb_contents_q)
-
- debtype_id=DBConn().get_override_type_id("deb")
- udebtype_id=DBConn().get_override_type_id("udeb")
+ self.deb_generate()
+# self.udeb_generate()
- arch_all_id = DBConn().get_architecture_id("all")
+ def deb_generate(self):
+ """
+ Generate Contents-$arch.gz files for every available arch in each given suite.
+ """
+ cursor = DBConn().cursor()
+ debtype_id = DBConn().get_override_type_id("deb")
suites = self._suites()
-
+ inputtoquery = OneAtATime()
+ querytoingest = OneAtATime()
+ ingesttosort = OneAtATime()
+ sorttooutput = OneAtATime()
+ outputtogzip = OneAtATime()
+
+ qt = QueryThread(inputtoquery,querytoingest)
+ it = IngestThread(querytoingest,ingesttosort)
+# these actually make things worse
+# it2 = IngestThread(querytoingest,ingesttosort)
+# it3 = IngestThread(querytoingest,ingesttosort)
+# it4 = IngestThread(querytoingest,ingesttosort)
+ st = SortThread(ingesttosort,sorttooutput)
+ ot = OutputThread(sorttooutput,outputtogzip)
+ gt = GzipThread(outputtogzip, None)
+
+ qt.start()
+ it.start()
+# it2.start()
+# it3.start()
+# it2.start()
+ st.start()
+ ot.start()
+ gt.start()
+
# Get our suites, and the architectures
for suite in [i.lower() for i in suites]:
suite_id = DBConn().get_suite_id(suite)
arch_list = self._arches(cursor, suite_id)
- file_writers = {}
-
- try:
- for arch_id in arch_list:
- file_writers[arch_id[0]] = GzippedContentWriter("dists/%s/Contents-%s.gz" % (suite, arch_id[1]))
-
- cursor.execute("EXECUTE contents_q(%d,%d);" % (suite_id, debtype_id))
+ for (arch_id,arch_str) in arch_list:
+ print( "suite: %s, arch: %s time: %s" %(suite_id, arch_id, datetime.datetime.now().isoformat()) )
- while True:
- r = cursor.fetchone()
- if not r:
- break
-
- filename, section, package, arch = r
-
- if not file_writers.has_key( arch ):
- continue
+# filename = "dists/%s/Contents-%s.gz" % (suite, arch_str)
+ filename = "dists/%s/Contents-%s" % (suite, arch_str)
+ cf = ContentFile(filename, suite, suite_id, arch_str, arch_id)
+ inputtoquery.enqueue( cf )
- if arch == arch_all_id:
- ## its arch all, so all contents files get it
- for writer in file_writers.values():
- writer.write(filename, section, package)
+ inputtoquery.enqueue( EndOfContents() )
+ gt.join()
- else:
- file_writers[arch].write(filename, section, package)
+ def udeb_generate(self):
+ """
+ Generate Contents-$arch.gz files for every available arch in each given suite.
+ """
+ cursor = DBConn().cursor()
- finally:
- # close all the files
- for writer in file_writers.values():
- writer.finish()
+ DBConn().prepare("udeb_contents_q", udeb_contents_q)
+ udebtype_id=DBConn().get_override_type_id("udeb")
+ suites = self._suites()
+# for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s.gz"),
+# ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:
- # The MORE fun part. Ok, udebs need their own contents files, udeb, and udeb-nf (not-free)
- # This is HORRIBLY debian specific :-/
- for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s.gz"),
- ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s.gz")]:
+ for section, fn_pattern in [("debian-installer","dists/%s/Contents-udeb-%s"),
+ ("non-free/debian-installer", "dists/%s/Contents-udeb-nf-%s")]:
section_id = DBConn().get_section_id(section) # all udebs should be here)
if section_id != -1:
suite_id = DBConn().get_suite_id(suite)
arch_list = self._arches(cursor, suite_id)
- file_writers = {}
+ for arch_id in arch_list:
- try:
- for arch_id in arch_list:
- file_writers[arch_id[0]] = GzippedContentWriter(fn_pattern % (suite, arch_id[1]))
+ writer = GzippedContentWriter(fn_pattern % (suite, arch_id[1]))
+ try:
- cursor.execute("EXECUTE udeb_contents_q(%d,%d,%d)" % (suite_id, udebtype_id, section_id))
+ cursor.execute("EXECUTE udeb_contents_q(%d,%d,%d)" % (suite_id, udebtype_id, section_id, arch_id))
- while True:
- r = cursor.fetchone()
- if not r:
- break
+ while True:
+ r = cursor.fetchone()
+ if not r:
+ break
- filename, section, package, arch = r
-
- if not file_writers.has_key( arch ):
- continue
-
- if arch == arch_all_id:
- ## its arch all, so all contents files get it
- for writer in file_writers.values():
- writer.write(filename, section, package)
-
- else:
- file_writers[arch].write(filename, section, package)
- finally:
- # close all the files
- for writer in file_writers.values():
- writer.finish()
+ filename, section, package, arch = r
+ writer.write(filename, section, package)
+ finally:
+ writer.close()
if Config().has_key( "%s::%s" %(options_prefix,"Suite")):
suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")])
else:
- suites = Config().SubTree("Suite").List()
+ suites = [ 'unstable', 'testing' ]
+# suites = Config().SubTree("Suite").List()
return suites
def main():
cnf = Config()
-
+# log = logging.Logger(cnf, "contents")
+
arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")),
('s',"suite", "%s::%s" % (options_prefix,"Suite"),"HasArg"),
('q',"quiet", "%s::%s" % (options_prefix,"Quiet")),
if cnf.has_key("%s::%s" % (options_prefix,"Help")):
usage()
- level=logging.INFO
- if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
- level=logging.ERROR
+# level=logging.INFO
+# if cnf.has_key("%s::%s" % (options_prefix,"Quiet")):
+# level=logging.ERROR
- elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
- level=logging.DEBUG
+# elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")):
+# level=logging.DEBUG
- logging.basicConfig( level=level,
- format='%(asctime)s %(levelname)s %(message)s',
- stream = sys.stderr )
+# logging.basicConfig( level=level,
+# format='%(asctime)s %(levelname)s %(message)s',
+# stream = sys.stderr )
commands[args[0]](Contents())