from daklib.dbconn import *
from daklib.config import Config
from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
import apt_pkg, os, stat, sys
from daklib.lists import getSources, getBinaries, getArchAll
+EXIT_STATUS = 0
+
def listPath(suite, component, architecture = None, type = None,
incremental_mode = False):
"""returns full path to the list file"""
Options = cnf.SubTree("Filelist::Options")
if Options['Help']:
usage()
- #pool = Pool()
+ pool = DakProcessPool()
query_suites = query_suites. \
filter(Suite.suite_name.in_(utils.split_args(Options['Suite'])))
query_components = query_components. \
query_architectures = query_architectures. \
filter(Architecture.arch_string.in_(utils.split_args(Options['Architecture'])))
- def log(message):
- Logger.log([message])
+ def parse_results(message):
+ # Split out into (code, msg)
+ code, msg = message
+ if code == PROC_STATUS_SUCCESS:
+ Logger.log([msg])
+ elif code == PROC_STATUS_SIGNALRAISED:
+            Logger.log(['E: Subprocess received signal ', msg])
+ else:
+ Logger.log(['E: ', msg])
for suite in query_suites:
suite_id = suite.suite_id
if architecture not in suite.architectures:
pass
elif architecture.arch_string == 'source':
- Logger.log([writeSourceList(suite_id, component_id, Options['Incremental'])])
- #pool.apply_async(writeSourceList,
- # (suite_id, component_id, Options['Incremental']), callback=log)
+ pool.apply_async(writeSourceList,
+ (suite_id, component_id, Options['Incremental']), callback=parse_results)
elif architecture.arch_string == 'all':
- Logger.log([writeAllList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
- #pool.apply_async(writeAllList,
- # (suite_id, component_id, architecture_id, 'deb',
- # Options['Incremental']), callback=log)
- Logger.log([writeAllList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
- #pool.apply_async(writeAllList,
- # (suite_id, component_id, architecture_id, 'udeb',
- # Options['Incremental']), callback=log)
+ pool.apply_async(writeAllList,
+ (suite_id, component_id, architecture_id, 'deb',
+ Options['Incremental']), callback=parse_results)
+ pool.apply_async(writeAllList,
+ (suite_id, component_id, architecture_id, 'udeb',
+ Options['Incremental']), callback=parse_results)
else: # arch any
- Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
- #pool.apply_async(writeBinaryList,
- # (suite_id, component_id, architecture_id, 'deb',
- # Options['Incremental']), callback=log)
- Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
- #pool.apply_async(writeBinaryList,
- # (suite_id, component_id, architecture_id, 'udeb',
- # Options['Incremental']), callback=log)
- #pool.close()
- #pool.join()
+ pool.apply_async(writeBinaryList,
+ (suite_id, component_id, architecture_id, 'deb',
+ Options['Incremental']), callback=parse_results)
+ pool.apply_async(writeBinaryList,
+ (suite_id, component_id, architecture_id, 'udeb',
+ Options['Incremental']), callback=parse_results)
+ pool.close()
+ pool.join()
+
# this script doesn't change the database
session.close()
Logger.close()
+ sys.exit(pool.overall_status())
+
if __name__ == '__main__':
main()
from daklib.dbconn import *
from daklib.config import Config
from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
from daklib.filewriter import PackagesFileWriter, SourcesFileWriter
import apt_pkg, os, stat, sys
+EXIT_STATUS = 0
+
def usage():
print """Usage: dak generate-packages-sources2 [OPTIONS]
Generate the Packages/Sources files
component_ids = [ c.component_id for c in session.query(Component).all() ]
- def log(details):
- logger.log(details)
-
- #pool = Pool()
+ def parse_results(message):
+ # Split out into (code, msg)
+ code, msg = message
+ if code == PROC_STATUS_SUCCESS:
+            logger.log([msg])
+        elif code == PROC_STATUS_SIGNALRAISED:
+            logger.log(['E: Subprocess received signal ', msg])
+        else:
+            logger.log(['E: ', msg])
+
+ pool = DakProcessPool()
for s in suites:
if s.untouchable and not force:
utils.fubar("Refusing to touch %s (untouchable and not forced)" % s.suite_name)
for c in component_ids:
- logger.log(generate_sources(s.suite_id, c))
- #pool.apply_async(generate_sources, [s.suite_id, c], callback=log)
+            pool.apply_async(generate_sources, [s.suite_id, c], callback=parse_results)
for a in s.architectures:
- logger.log(generate_packages(s.suite_id, c, a.arch_id, 'deb'))
- #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
- logger.log(generate_packages(s.suite_id, c, a.arch_id, 'udeb'))
- #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=parse_results)
+                pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=parse_results)
-    #pool.close()
-    #pool.join()
+
+    pool.close()
+    pool.join()
# this script doesn't change the database
session.close()
logger.close()
+    sys.exit(pool.overall_status())
+
if __name__ == '__main__':
main()
import apt_pkg
from tempfile import mkstemp, mkdtemp
import commands
-from multiprocessing import Pool, TimeoutError
from sqlalchemy.orm import object_session
from daklib import utils, daklog
from daklib.dak_exceptions import *
from daklib.dbconn import *
from daklib.config import Config
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS
################################################################################
Logger = None #: Our logging object
-results = [] #: Results of the subprocesses
################################################################################
########################################################################
-def get_result(arg):
- global results
- if arg:
- results.append(arg)
-
def sign_release_dir(suite, dirname):
cnf = Config()
def main ():
- global Logger, results
+ global Logger
cnf = Config()
suites = session.query(Suite).filter(Suite.untouchable == False).all()
broken=[]
- # For each given suite, run one process
- results = []
- pool = Pool()
+ pool = DakProcessPool()
for s in suites:
# Setup a multiprocessing Pool. As many workers as we have CPU cores.
pool.close()
pool.join()
-    retcode = 0
-    if len(results) > 0:
-        Logger.log(['Release file generation broken: %s' % (results)])
-        print "Release file generation broken:\n", '\n'.join(results)
-        retcode = 1
+    retcode = pool.overall_status()
+    if retcode > 0:
+        Logger.log(['Release file generation broken: %s' % (pool.results)])
Logger.close()
'''
session = DBConn().session()
suite = Suite.get(suite_id, session)
- try:
- rw = ReleaseWriter(suite)
- rw.generate_release_files()
- except Exception, e:
- return str(e)
- return
+ # We allow the process handler to catch and deal with any exceptions
+ rw = ReleaseWriter(suite)
+ rw.generate_release_files()
+
+ return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite.suite_name)
#######################################################################################
from daklib.config import Config
from daklib import daklog
from daklib.changesutils import *
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool
# Globals
Cnf = None
examine_package.use_html=1
- pool = Pool()
+ pool = DakProcessPool()
for changes_file in changes_files:
changes_file = utils.validate_changes_file_arg(changes_file, 0)
if not changes_file:
###############################################################################
-import multiprocessing
+from multiprocessing.pool import Pool
+from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGCHLD, SIGALRM
+
import sqlalchemy.orm.session
+__all__ = []
+
+PROC_STATUS_SUCCESS = 0 # Everything ok
+PROC_STATUS_EXCEPTION = 1 # An exception was caught
+PROC_STATUS_SIGNALRAISED = 2 # A signal was generated
+PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message
+
+__all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION',
+ 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
+
+class SignalException(Exception):
+ def __init__(self, signum):
+ self.signum = signum
+
+ def __str__(self):
+ return "<SignalException: %d>" % self.signum
+
+__all__.append('SignalException')
+
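+# Convert asynchronous signals into a SignalException so that _func_wrapper
+# below can catch them and report PROC_STATUS_SIGNALRAISED instead of the
+# worker process dying silently.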
+def signal_handler(signum, frame):
+ raise SignalException(signum)
+
def _func_wrapper(func, *args, **kwds):
+ # We need to handle signals to avoid hanging
+ signal(SIGHUP, signal_handler)
+ signal(SIGTERM, signal_handler)
+ signal(SIGPIPE, signal_handler)
+ signal(SIGCHLD, signal_handler)
+ signal(SIGALRM, signal_handler)
+
+    # We expect the wrapped function to return:
+ # (status, messages)
+ # Where:
+ # status is one of PROC_STATUS_*
+ # messages is a string used for logging
try:
- return func(*args, **kwds)
+ return (func(*args, **kwds))
+ except SignalException, e:
+ return (PROC_STATUS_SIGNALRAISED, e.signum)
+ except Exception, e:
+ return (PROC_STATUS_EXCEPTION, str(e))
finally:
# Make sure connections are closed. We might die otherwise.
sqlalchemy.orm.session.Session.close_all()
-class Pool():
+
+class DakProcessPool(Pool):
def __init__(self, *args, **kwds):
- self.pool = multiprocessing.Pool(*args, **kwds)
+ Pool.__init__(self, *args, **kwds)
self.results = []
+ self.int_results = []
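+        # int_results holds the AsyncResult handles returned by apply_async;
+        # join() drains them into self.results as (status, message) tuples.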
def apply_async(self, func, args=(), kwds={}, callback=None):
wrapper_args = list(args)
wrapper_args.insert(0, func)
- self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback))
-
- def close(self):
- self.pool.close()
+ self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
def join(self):
- self.pool.join()
- #for r in self.results:
- # # return values were already handled in the callbacks, but asking
- # # for them might raise exceptions which would otherwise be lost
- # r.get()
+ Pool.join(self)
+ for r in self.int_results:
+ # return values were already handled in the callbacks, but asking
+ # for them might raise exceptions which would otherwise be lost
+ self.results.append(r.get())
+
+ def overall_status(self):
+        # Return the highest status seen across all results, so that
+        # sys.exit(pool.overall_status()) exits 0 when everything succeeded
+        # and non-zero otherwise.
+ status = 0
+ for r in self.results:
+ if r[0] > status:
+ status = r[0]
+ return status
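+
+    # Typical usage, mirroring the callers above:
+    #   pool = DakProcessPool()
+    #   pool.apply_async(func, args, callback=parse_results)
+    #   pool.close()
+    #   pool.join()
+    #   sys.exit(pool.overall_status())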
+
+__all__.append('DakProcessPool')
--- /dev/null
+#!/usr/bin/python
+
+from base_test import DakTestCase
+
+from daklib.dakmultiprocessing import DakProcessPool, \
+ PROC_STATUS_SUCCESS, PROC_STATUS_MISCFAILURE, \
+ PROC_STATUS_EXCEPTION, PROC_STATUS_SIGNALRAISED
+import signal
+
+def test_function(num, num2):
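+    # Worker helper: (num, num2) select which status path to exercise,
+    # namely signal delivery, an uncaught exception, a custom failure
+    # tuple, or plain success.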
+ from os import kill, getpid
+
+ if num == 1:
+ sigs = [signal.SIGTERM, signal.SIGCHLD, signal.SIGALRM, signal.SIGHUP]
+ kill(getpid(), sigs[num2])
+
+ if num2 == 3:
+ raise Exception('Test uncaught exception handling')
+
+ if num == 0 and num2 == 1:
+ return (PROC_STATUS_MISCFAILURE, 'Test custom error return')
+
+ return (PROC_STATUS_SUCCESS, 'blah, %d, %d' % (num, num2))
+
+class DakProcessPoolTestCase(DakTestCase):
+ def testPool(self):
+ def alarm_handler(signum, frame):
+ raise AssertionError('Timed out')
+
+ # Shouldn't take us more than 15 seconds to run this test
+ signal.signal(signal.SIGALRM, alarm_handler)
+ signal.alarm(15)
+
+ p = DakProcessPool()
+ for s in range(3):
+ for j in range(4):
+ p.apply_async(test_function, [s, j])
+
+ p.close()
+ p.join()
+
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, signal.SIG_DFL)
+
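+        # Note: the numeric signal values below assume Linux defaults
+        # (SIGTERM=15, SIGCHLD=17, SIGALRM=14, SIGHUP=1).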
+ expected = [(PROC_STATUS_SUCCESS, 'blah, 0, 0'),
+ (PROC_STATUS_MISCFAILURE, 'Test custom error return'),
+ (PROC_STATUS_SUCCESS, 'blah, 0, 2'),
+ (PROC_STATUS_EXCEPTION, 'Test uncaught exception handling'),
+ (PROC_STATUS_SIGNALRAISED, 15),
+ (PROC_STATUS_SIGNALRAISED, 17),
+ (PROC_STATUS_SIGNALRAISED, 14),
+ (PROC_STATUS_SIGNALRAISED, 1),
+ (PROC_STATUS_SUCCESS, 'blah, 2, 0'),
+ (PROC_STATUS_SUCCESS, 'blah, 2, 1'),
+ (PROC_STATUS_SUCCESS, 'blah, 2, 2'),
+ (PROC_STATUS_EXCEPTION, 'Test uncaught exception handling')]
+
+        self.assertEqual(len(p.results), len(expected))
+
+ for r in range(len(p.results)):
+ self.assertEqual(p.results[r], expected[r])