from daklib.dbconn import *
from daklib.config import Config
from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
import apt_pkg, os, stat, sys
from daklib.lists import getSources, getBinaries, getArchAll
file.write(filename + '\n')
session.rollback()
file.close()
- return message
+ return (PROC_STATUS_SUCCESS, message)
def writeAllList(suite_id, component_id, architecture_id, type, incremental_mode):
session = DBConn().session()
file.write(filename + '\n')
session.rollback()
file.close()
- return message
+ return (PROC_STATUS_SUCCESS, message)
def writeBinaryList(suite_id, component_id, architecture_id, type, incremental_mode):
session = DBConn().session()
file.write(filename + '\n')
session.rollback()
file.close()
- return message
+ return (PROC_STATUS_SUCCESS, message)
def usage():
print """Usage: dak generate_filelist [OPTIONS]
Options = cnf.SubTree("Filelist::Options")
if Options['Help']:
usage()
- #pool = Pool()
+ pool = DakProcessPool()
query_suites = query_suites. \
filter(Suite.suite_name.in_(utils.split_args(Options['Suite'])))
query_components = query_components. \
query_architectures = query_architectures. \
filter(Architecture.arch_string.in_(utils.split_args(Options['Architecture'])))
- def log(message):
- Logger.log([message])
+ def parse_results(message):
+ # Split out into (code, msg)
+ code, msg = message
+ if code == PROC_STATUS_SUCCESS:
+ Logger.log([msg])
+ elif code == PROC_STATUS_SIGNALRAISED:
+            Logger.log(['E: Subprocess received signal ', msg])
+ else:
+ Logger.log(['E: ', msg])
for suite in query_suites:
suite_id = suite.suite_id
if architecture not in suite.architectures:
pass
elif architecture.arch_string == 'source':
- Logger.log([writeSourceList(suite_id, component_id, Options['Incremental'])])
- #pool.apply_async(writeSourceList,
- # (suite_id, component_id, Options['Incremental']), callback=log)
+ pool.apply_async(writeSourceList,
+ (suite_id, component_id, Options['Incremental']), callback=parse_results)
elif architecture.arch_string == 'all':
- Logger.log([writeAllList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
- #pool.apply_async(writeAllList,
- # (suite_id, component_id, architecture_id, 'deb',
- # Options['Incremental']), callback=log)
- Logger.log([writeAllList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
- #pool.apply_async(writeAllList,
- # (suite_id, component_id, architecture_id, 'udeb',
- # Options['Incremental']), callback=log)
+ pool.apply_async(writeAllList,
+ (suite_id, component_id, architecture_id, 'deb',
+ Options['Incremental']), callback=parse_results)
+ pool.apply_async(writeAllList,
+ (suite_id, component_id, architecture_id, 'udeb',
+ Options['Incremental']), callback=parse_results)
else: # arch any
- Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'deb', Options['Incremental'])])
- #pool.apply_async(writeBinaryList,
- # (suite_id, component_id, architecture_id, 'deb',
- # Options['Incremental']), callback=log)
- Logger.log([writeBinaryList(suite_id, component_id, architecture_id, 'udeb', Options['Incremental'])])
- #pool.apply_async(writeBinaryList,
- # (suite_id, component_id, architecture_id, 'udeb',
- # Options['Incremental']), callback=log)
- #pool.close()
- #pool.join()
+ pool.apply_async(writeBinaryList,
+ (suite_id, component_id, architecture_id, 'deb',
+ Options['Incremental']), callback=parse_results)
+ pool.apply_async(writeBinaryList,
+ (suite_id, component_id, architecture_id, 'udeb',
+ Options['Incremental']), callback=parse_results)
+ pool.close()
+ pool.join()
+
# this script doesn't change the database
session.close()
Logger.close()
+ sys.exit(pool.overall_status())
+
if __name__ == '__main__':
main()
from daklib.dbconn import *
from daklib.config import Config
from daklib import utils, daklog
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS, PROC_STATUS_SIGNALRAISED
from daklib.filewriter import PackagesFileWriter, SourcesFileWriter
import apt_pkg, os, stat, sys
message = ["generate sources", suite.suite_name, component.component_name]
session.rollback()
- return message
+ return (PROC_STATUS_SUCCESS, message)
#############################################################################
message = ["generate-packages", suite.suite_name, component.component_name, architecture.arch_string]
session.rollback()
- return message
+ return (PROC_STATUS_SUCCESS, message)
#############################################################################
component_ids = [ c.component_id for c in session.query(Component).all() ]
- def log(details):
- logger.log(details)
-
- #pool = Pool()
+ def parse_results(message):
+ # Split out into (code, msg)
+ code, msg = message
+ if code == PROC_STATUS_SUCCESS:
+ logger.log([msg])
+ elif code == PROC_STATUS_SIGNALRAISED:
+            logger.log(['E: Subprocess received signal ', msg])
+ else:
+ logger.log(['E: ', msg])
+
+ pool = DakProcessPool()
for s in suites:
if s.untouchable and not force:
utils.fubar("Refusing to touch %s (untouchable and not forced)" % s.suite_name)
for c in component_ids:
- logger.log(generate_sources(s.suite_id, c))
- #pool.apply_async(generate_sources, [s.suite_id, c], callback=log)
+ pool.apply_async(generate_sources, [s.suite_id, c], callback=parse_results)
for a in s.architectures:
- logger.log(generate_packages(s.suite_id, c, a.arch_id, 'deb'))
- #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=log)
- logger.log(generate_packages(s.suite_id, c, a.arch_id, 'udeb'))
- #pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=log)
+ pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'deb'], callback=parse_results)
+ pool.apply_async(generate_packages, [s.suite_id, c, a.arch_id, 'udeb'], callback=parse_results)
+
+ pool.close()
+ pool.join()
- #pool.close()
- #pool.join()
# this script doesn't change the database
session.close()
logger.close()
+ sys.exit(pool.overall_status())
+
if __name__ == '__main__':
main()
import apt_pkg
from tempfile import mkstemp, mkdtemp
import commands
-from multiprocessing import Pool, TimeoutError
from sqlalchemy.orm import object_session
from daklib import utils, daklog
from daklib.dak_exceptions import *
from daklib.dbconn import *
from daklib.config import Config
+from daklib.dakmultiprocessing import DakProcessPool, PROC_STATUS_SUCCESS
################################################################################
Logger = None #: Our logging object
-results = [] #: Results of the subprocesses
################################################################################
########################################################################
-def get_result(arg):
- global results
- if arg:
- results.append(arg)
-
def sign_release_dir(suite, dirname):
cnf = Config()
def main ():
- global Logger, results
+ global Logger
cnf = Config()
suites = session.query(Suite).filter(Suite.untouchable == False).all()
broken=[]
- # For each given suite, run one process
- results = []
- pool = Pool()
+ pool = DakProcessPool()
for s in suites:
# Setup a multiprocessing Pool. As many workers as we have CPU cores.
print "Processing %s" % s.suite_name
Logger.log(['Processing release file for Suite: %s' % (s.suite_name)])
- pool.apply_async(generate_helper, (s.suite_id, ), callback=get_result)
+ pool.apply_async(generate_helper, (s.suite_id, ))
# No more work will be added to our pool, close it and then wait for all to finish
pool.close()
pool.join()
- retcode = 0
+ retcode = pool.overall_status()
- if len(results) > 0:
- Logger.log(['Release file generation broken: %s' % (results)])
- print "Release file generation broken:\n", '\n'.join(results)
- retcode = 1
+ if retcode > 0:
+ # TODO: CENTRAL FUNCTION FOR THIS / IMPROVE LOGGING
+ Logger.log(['Release file generation broken: %s' % (','.join([str(x[1]) for x in pool.results]))])
Logger.close()
'''
session = DBConn().session()
suite = Suite.get(suite_id, session)
- try:
- rw = ReleaseWriter(suite)
- rw.generate_release_files()
- except Exception, e:
- return str(e)
- return
+ # We allow the process handler to catch and deal with any exceptions
+ rw = ReleaseWriter(suite)
+ rw.generate_release_files()
+
+ return (PROC_STATUS_SUCCESS, 'Release file written for %s' % suite.suite_name)
#######################################################################################
from daklib.config import Config
from daklib import daklog
from daklib.changesutils import *
-from daklib.dakmultiprocessing import Pool
+from daklib.dakmultiprocessing import DakProcessPool
# Globals
Cnf = None
os.stat(htmlfile).st_mtime > os.stat(origchanges).st_mtime:
sources.add(htmlname)
session.close()
- return
+ return (PROC_STATUS_SUCCESS, '%s already up-to-date' % htmlfile)
# Now we'll load the fingerprint
(u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changes_file, session=session)
outfile.close()
session.close()
+ return (PROC_STATUS_SUCCESS, '%s already updated' % htmlfile)
+
################################################################################
def usage (exit_code=0):
examine_package.use_html=1
- #pool = Pool(processes=1)
+ pool = DakProcessPool()
for changes_file in changes_files:
changes_file = utils.validate_changes_file_arg(changes_file, 0)
if not changes_file:
continue
print "\n" + changes_file
- #pool.apply_async(do_pkg, (changes_file,))
+ pool.apply_async(do_pkg, (changes_file,))
do_pkg(changes_file)
- #pool.close()
- #pool.join()
+ pool.close()
+ pool.join()
files = set(os.listdir(cnf["Show-New::HTMLPath"]))
to_delete = filter(lambda x: x.endswith(".html"), files.difference(sources))
################################################################################
-class Logger:
+class Logger(object):
"Logger object"
- Cnf = None
- logfile = None
- program = None
+ __shared_state = {}
- def __init__ (self, Cnf, program, debug=0, print_starting=True):
+ def __init__(self, *args, **kwargs):
+ self.__dict__ = self.__shared_state
+
+ if not getattr(self, 'initialised', False):
+ from daklib.config import Config
+ self.initialised = True
+
+            # To be backwards compatible, dump the first argument if it's a
+ # Config object. TODO: Fix up all callers and remove this
+ if len(args) > 0 and isinstance(args[0], Config):
+ args = list(args)
+ args.pop(0)
+
+ self.__setup(*args, **kwargs)
+
+
+ def __setup(self, program='unknown', debug=False, print_starting=True, include_pid=False):
"Initialize a new Logger object"
- self.Cnf = Cnf
self.program = program
+ self.debug = debug
+ self.include_pid = include_pid
+
# Create the log directory if it doesn't exist
- logdir = Cnf["Dir::Log"]
+ from daklib.config import Config
+ logdir = Config()["Dir::Log"]
if not os.path.exists(logdir):
umask = os.umask(00000)
os.makedirs(logdir, 02775)
os.umask(umask)
+
# Open the logfile
logfilename = "%s/%s" % (logdir, time.strftime("%Y-%m"))
logfile = None
+
if debug:
logfile = sys.stderr
else:
umask = os.umask(00002)
logfile = utils.open_file(logfilename, 'a')
os.umask(umask)
+
self.logfile = logfile
+
if print_starting:
self.log(["program start"])
###############################################################################
-import multiprocessing
+from multiprocessing.pool import Pool
+from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
+
import sqlalchemy.orm.session
+__all__ = []
+
+PROC_STATUS_SUCCESS = 0 # Everything ok
+PROC_STATUS_EXCEPTION = 1 # An exception was caught
+PROC_STATUS_SIGNALRAISED = 2 # A signal was generated
+PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message
+
+__all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION',
+ 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE'])
+
+class SignalException(Exception):
+ def __init__(self, signum):
+ self.signum = signum
+
+ def __str__(self):
+ return "<SignalException: %d>" % self.signum
+
+__all__.append('SignalException')
+
+def signal_handler(signum, info):
+ raise SignalException(signum)
+
def _func_wrapper(func, *args, **kwds):
+ # We need to handle signals to avoid hanging
+ signal(SIGHUP, signal_handler)
+ signal(SIGTERM, signal_handler)
+ signal(SIGPIPE, signal_handler)
+ signal(SIGALRM, signal_handler)
+
+ # We expect our callback function to return:
+ # (status, messages)
+ # Where:
+ # status is one of PROC_STATUS_*
+ # messages is a string used for logging
try:
- return func(*args, **kwds)
+ return (func(*args, **kwds))
+ except SignalException, e:
+ return (PROC_STATUS_SIGNALRAISED, e.signum)
+ except Exception, e:
+ return (PROC_STATUS_EXCEPTION, str(e))
finally:
# Make sure connections are closed. We might die otherwise.
sqlalchemy.orm.session.Session.close_all()
-class Pool():
+
+class DakProcessPool(Pool):
def __init__(self, *args, **kwds):
- self.pool = multiprocessing.Pool(*args, **kwds)
+ Pool.__init__(self, *args, **kwds)
self.results = []
+ self.int_results = []
def apply_async(self, func, args=(), kwds={}, callback=None):
wrapper_args = list(args)
wrapper_args.insert(0, func)
- self.results.append(self.pool.apply_async(_func_wrapper, wrapper_args, kwds, callback))
-
- def close(self):
- self.pool.close()
+ self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback))
def join(self):
- self.pool.join()
- #for r in self.results:
- # # return values were already handled in the callbacks, but asking
- # # for them might raise exceptions which would otherwise be lost
- # r.get()
+ Pool.join(self)
+ for r in self.int_results:
+ # return values were already handled in the callbacks, but asking
+ # for them might raise exceptions which would otherwise be lost
+ self.results.append(r.get())
+
+ def overall_status(self):
+ # Return the highest of our status results
+ # This basically allows us to do sys.exit(overall_status()) and have us
+ # exit 0 if everything was good and non-zero if not
+ status = 0
+ for r in self.results:
+ if r[0] > status:
+ status = r[0]
+ return status
+
+__all__.append('DakProcessPool')
--- /dev/null
+#!/usr/bin/python
+
+from base_test import DakTestCase
+
+from daklib.dakmultiprocessing import DakProcessPool, \
+ PROC_STATUS_SUCCESS, PROC_STATUS_MISCFAILURE, \
+ PROC_STATUS_EXCEPTION, PROC_STATUS_SIGNALRAISED
+import signal
+
+def test_function(num, num2):
+ from os import kill, getpid
+
+ if num == 1:
+ sigs = [signal.SIGTERM, signal.SIGPIPE, signal.SIGALRM, signal.SIGHUP]
+ kill(getpid(), sigs[num2])
+
+ if num2 == 3:
+ raise Exception('Test uncaught exception handling')
+
+ if num == 0 and num2 == 1:
+ return (PROC_STATUS_MISCFAILURE, 'Test custom error return')
+
+ return (PROC_STATUS_SUCCESS, 'blah, %d, %d' % (num, num2))
+
+class DakProcessPoolTestCase(DakTestCase):
+ def testPool(self):
+ def alarm_handler(signum, frame):
+ raise AssertionError('Timed out')
+
+ # Shouldn't take us more than 15 seconds to run this test
+ signal.signal(signal.SIGALRM, alarm_handler)
+ signal.alarm(15)
+
+ p = DakProcessPool()
+ for s in range(3):
+ for j in range(4):
+ p.apply_async(test_function, [s, j])
+
+ p.close()
+ p.join()
+
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, signal.SIG_DFL)
+
+ expected = [(PROC_STATUS_SUCCESS, 'blah, 0, 0'),
+ (PROC_STATUS_MISCFAILURE, 'Test custom error return'),
+ (PROC_STATUS_SUCCESS, 'blah, 0, 2'),
+ (PROC_STATUS_EXCEPTION, 'Test uncaught exception handling'),
+ (PROC_STATUS_SIGNALRAISED, 15),
+ (PROC_STATUS_SIGNALRAISED, 13),
+ (PROC_STATUS_SIGNALRAISED, 14),
+ (PROC_STATUS_SIGNALRAISED, 1),
+ (PROC_STATUS_SUCCESS, 'blah, 2, 0'),
+ (PROC_STATUS_SUCCESS, 'blah, 2, 1'),
+ (PROC_STATUS_SUCCESS, 'blah, 2, 2'),
+ (PROC_STATUS_EXCEPTION, 'Test uncaught exception handling')]
+
+ self.assertEqual( len(p.results), len(expected) )
+
+ for r in range(len(p.results)):
+ self.assertEqual(p.results[r], expected[r])