]> git.decadent.org.uk Git - dak.git/commitdiff
Merge remote branch 'ftpmaster/master' into multiproc
authorMark Hymers <mhy@debian.org>
Sat, 9 Apr 2011 08:42:22 +0000 (09:42 +0100)
committerMark Hymers <mhy@debian.org>
Sat, 9 Apr 2011 08:42:22 +0000 (09:42 +0100)
Conflicts:
dak/show_new.py

Signed-off-by: Mark Hymers <mhy@debian.org>
1  2 
dak/generate_packages_sources2.py
dak/generate_releases.py
dak/show_new.py

index 2b792f3c9ad7ecc098dc426fb89761bc000b65fe,fbae045fd1676ce2afc323f34c7404b9b3dbd01c..b157fcbe5fc989d0bb1adce55b9237c1d11313e1
@@@ -31,7 -31,7 +31,7 @@@ Generate Packages/Sources file
  from daklib.dbconn import *
  from daklib.config import Config
  from daklib import utils, daklog
 -from daklib.dakmultiprocessing import Pool
 +from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
  from daklib.filewriter import PackagesFileWriter, SourcesFileWriter
  
  import apt_pkg, os, stat, sys
@@@ -119,7 -119,7 +119,7 @@@ def generate_sources(suite_id, componen
  
      message = ["generate sources", suite.suite_name, component.component_name]
      session.rollback()
 -    return message
 +    return (PROC_STATUS_SUCCESS, message)
  
  #############################################################################
  
@@@ -165,7 -165,9 +165,9 @@@ SELEC
    || COALESCE(E'\n' || (SELECT
       STRING_AGG(key || '\: ' || value, E'\n' ORDER BY key)
     FROM external_overrides eo
-    WHERE eo.package = tmp.package
+    WHERE
+      eo.package = tmp.package
+      AND eo.suite = :overridesuite AND eo.component = :component
    ), '')
    || E'\nSection\: ' || sec.section
    || E'\nPriority\: ' || pri.priority
@@@ -223,7 -225,7 +225,7 @@@ def generate_packages(suite_id, compone
  
      message = ["generate-packages", suite.suite_name, component.component_name, architecture.arch_string]
      session.rollback()
 -    return message
 +    return (PROC_STATUS_SUCCESS, message)
  
  #############################################################################
  
@@@ -263,35 -265,28 +265,35 @@@ def main()
  
      component_ids = [ c.component_id for c in session.query(Component).all() ]
  
 -    def log(details):
 -        logger.log(details)
 -
 -    #pool = Pool()
 +    def parse_results(message):
 +        # Split out into (code, msg)
 +        code, msg = message
 +        if code == PROC_STATUS_SUCCESS:
 +            logger.log([msg])
 +        elif code == PROC_STATUS_SIGNALRAISED:
 +            logger.log(['E: Subprocess recieved signal ', msg])
 +        else:
 +            logger.log(['E: ', msg])
 +
 +    pool = DakProcessPool()
      for s in suites:
          if s.untouchable and not force:
              utils.fubar("Refusing to touch %s (untouchable and not forced)" % s.suite_name)
          for c in component_ids:
 -            logger.log(generate_sources(s.suite_id, c))
 -            #pool.apply_async(generate_sources, [s.suite_id, c], callback=log)
 +            pool.apply_async(generate_sources, [s.suite_id, c], callback=parse_results)
              for a in s.architectures:
 -                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'deb'))
 -                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
 -                logger.log(generate_packages(s.suite_id, c, a.arch_id, 'udeb'))
 -                #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
 +                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=parse_results)
 +                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=parse_results)
 +
 +    pool.close()
 +    pool.join()
  
 -    #pool.close()
 -    #pool.join()
      # this script doesn't change the database
      session.close()
  
      logger.close()
  
 +    sys.exit(pool.overall_status())
 +
  if __name__ == '__main__':
      main()
diff --combined dak/generate_releases.py
index 8befbc995a1bd601409ca475c77ff201a40b1657,39a76203967de8fbab9bd379e7a130cdab2ea685..3006364602517c17466c075afa82efc0ee7fed56
@@@ -40,6 -40,7 +40,6 @@@ import bz
  import apt_pkg
  from tempfile import mkstemp, mkdtemp
  import commands
 -from multiprocessing import Pool, TimeoutError
  from sqlalchemy.orm import object_session
  
  from daklib import utils, daklog
@@@ -47,10 -48,10 +47,10 @@@ from daklib.regexes import re_gensubrel
  from daklib.dak_exceptions import *
  from daklib.dbconn import *
  from daklib.config import Config
 +from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS
  
  ################################################################################
  Logger = None                  #: Our logging object
 -results = []                   #: Results of the subprocesses
  
  ################################################################################
  
@@@ -73,6 -74,11 +73,6 @@@ SUITE can be a space seperated list, e.
  
  ########################################################################
  
 -def get_result(arg):
 -    global results
 -    if arg:
 -        results.append(arg)
 -
  def sign_release_dir(suite, dirname):
      cnf = Config()
  
@@@ -152,7 -158,11 +152,11 @@@ class ReleaseWriter(object)
  
          for key, dbfield in attribs:
              if getattr(suite, dbfield) is not None:
-                 out.write("%s: %s\n" % (key, getattr(suite, dbfield)))
+                 # TEMPORARY HACK HACK HACK until we change the way we store the suite names etc
+                 if key == 'Suite' and getattr(suite, dbfield) == 'squeeze-updates':
+                     out.write("Suite: stable-updates\n")
+                 else:
+                     out.write("%s: %s\n" % (key, getattr(suite, dbfield)))
  
          out.write("Date: %s\n" % (time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime(time.time()))))
  
  
  
  def main ():
 -    global Logger, results
 +    global Logger
  
      cnf = Config()
  
          suites = session.query(Suite).filter(Suite.untouchable == False).all()
  
      broken=[]
 -    # For each given suite, run one process
 -    results = []
  
 -    pool = Pool()
 +    pool = DakProcessPool()
  
      for s in suites:
          # Setup a multiprocessing Pool. As many workers as we have CPU cores.
  
          print "Processing %s" % s.suite_name
          Logger.log(['Processing release file for Suite: %s' % (s.suite_name)])
 -        pool.apply_async(generate_helper, (s.suite_id, ), callback=get_result)
 +        pool.apply_async(generate_helper, (s.suite_id, ))
  
      # No more work will be added to our pool, close it and then wait for all to finish
      pool.close()
      pool.join()
  
 -    retcode = 0
 +    retcode = pool.overall_status()
  
 -    if len(results) > 0:
 -        Logger.log(['Release file generation broken: %s' % (results)])
 -        print "Release file generation broken:\n", '\n'.join(results)
 -        retcode = 1
 +    if retcode > 0:
 +        # TODO: CENTRAL FUNCTION FOR THIS / IMPROVE LOGGING
 +        Logger.log(['Release file generation broken: %s' % (','.join([str(x[1]) for x in pool.results]))])
  
      Logger.close()
  
@@@ -352,12 -365,13 +356,12 @@@ def generate_helper(suite_id)
      '''
      session = DBConn().session()
      suite = Suite.get(suite_id, session)
 -    try:
 -        rw = ReleaseWriter(suite)
 -        rw.generate_release_files()
 -    except Exception, e:
 -        return str(e)
  
 -    return
 +    # We allow the process handler to catch and deal with any exceptions
 +    rw = ReleaseWriter(suite)
 +    rw.generate_release_files()
 +
 +    return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite.suite_name)
  
  #######################################################################################
  
diff --combined dak/show_new.py
index 3b052c18307be406181eef8d2c5e308101998339,f50b7853def339f89466fa63da9a1049aebbd590..513129f9c5d34f91623878ce8e305410bb38a89d
@@@ -37,7 -37,7 +37,7 @@@ from daklib.regexes import re_source_ex
  from daklib.config import Config
  from daklib import daklog
  from daklib.changesutils import *
 -from daklib.dakmultiprocessing import Pool
 +from daklib.dakmultiprocessing import DakProcessPool
  
  # Globals
  Cnf = None
@@@ -154,23 -154,39 +154,39 @@@ def do_pkg(changes_file)
      session = DBConn().session()
      u = Upload()
      u.pkg.changes_file = changes_file
-     (u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changes_file)
+     # We can afoord not to check the signature before loading the changes file
+     # as we've validated it already (otherwise it couldn't be in new)
+     # and we can more quickly skip over already processed files this way
      u.load_changes(changes_file)
-     new_queue = get_policy_queue('new', session );
-     u.pkg.directory = new_queue.path
-     u.update_subst()
      origchanges = os.path.abspath(u.pkg.changes_file)
-     files = u.pkg.files
-     changes = u.pkg.changes
-     htmlname = changes["source"] + "_" + changes["version"] + ".html"
-     sources.add(htmlname)
  
-     htmlfile = os.path.join(cnf["Show-New::HTMLPath"], htmlname)
+     # Still be cautious in case paring the changes file went badly
+     if u.pkg.changes.has_key('source') and u.pkg.changes.has_key('version'):
+         htmlname = u.pkg.changes["source"] + "_" + u.pkg.changes["version"] + ".html"
+         htmlfile = os.path.join(cnf["Show-New::HTMLPath"], htmlname)
+     else:
+         # Changes file was bad
+         print "Changes file %s missing source or version field" % changes_file
+         session.close()
+         return
+     # Have we already processed this?
      if os.path.exists(htmlfile) and \
          os.stat(htmlfile).st_mtime > os.stat(origchanges).st_mtime:
+             sources.add(htmlname)
              session.close()
 -            return
 +            return (PROC_STATUS_SUCCESS, '%s already up-to-date' % htmlfile)
  
+     # Now we'll load the fingerprint
+     (u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changes_file, session=session)
+     new_queue = get_policy_queue('new', session );
+     u.pkg.directory = new_queue.path
+     u.update_subst()
+     files = u.pkg.files
+     changes = u.pkg.changes
+     sources.add(htmlname)
      for deb_filename, f in files.items():
          if deb_filename.endswith(".udeb") or deb_filename.endswith(".deb"):
              u.binary_file_checks(deb_filename, session)
      outfile.close()
      session.close()
  
 +    return (PROC_STATUS_SUCCESS, '%s already updated' % htmlfile)
 +
  ################################################################################
  
  def usage (exit_code=0):
@@@ -252,15 -266,16 +268,16 @@@ def main()
  
      examine_package.use_html=1
  
 -    #pool = Pool(processes=1)
 +    pool = DakProcessPool()
      for changes_file in changes_files:
          changes_file = utils.validate_changes_file_arg(changes_file, 0)
          if not changes_file:
              continue
          print "\n" + changes_file
 -        #pool.apply_async(do_pkg, (changes_file,))
 +        pool.apply_async(do_pkg, (changes_file,))
+         do_pkg(changes_file)
 -    #pool.close()
 -    #pool.join()
 +    pool.close()
 +    pool.join()
  
      files = set(os.listdir(cnf["Show-New::HTMLPath"]))
      to_delete = filter(lambda x: x.endswith(".html"), files.difference(sources))