X-Git-Url: https://git.decadent.org.uk/gitweb/?a=blobdiff_plain;f=tests%2Ftest_multiprocessing.py;fp=tests%2Ftest_multiprocessing.py;h=c67e51fb36fd6625c7d3981876ed0b3ad90eb9ec;hb=3b8862ae0e21fae9fc552d4df160f45684976d7d;hp=0000000000000000000000000000000000000000;hpb=3be5bc9fca7a4721d0f60563bf2545541c685adb;p=dak.git diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py new file mode 100755 index 00000000..c67e51fb --- /dev/null +++ b/tests/test_multiprocessing.py @@ -0,0 +1,61 @@ +#!/usr/bin/python + +from base_test import DakTestCase + +from daklib.dakmultiprocessing import DakProcessPool, \ + PROC_STATUS_SUCCESS, PROC_STATUS_MISCFAILURE, \ + PROC_STATUS_EXCEPTION, PROC_STATUS_SIGNALRAISED +import signal + +def test_function(num, num2): + from os import kill, getpid + + if num == 1: + sigs = [signal.SIGTERM, signal.SIGCHLD, signal.SIGALRM, signal.SIGHUP] + kill(getpid(), sigs[num2]) + + if num2 == 3: + raise Exception('Test uncaught exception handling') + + if num == 0 and num2 == 1: + return (PROC_STATUS_MISCFAILURE, 'Test custom error return') + + return (PROC_STATUS_SUCCESS, 'blah, %d, %d' % (num, num2)) + +class DakProcessPoolTestCase(DakTestCase): + def testPool(self): + def alarm_handler(signum, frame): + raise AssertionError('Timed out') + + # Shouldn't take us more than 15 seconds to run this test + signal.signal(signal.SIGALRM, alarm_handler) + signal.alarm(15) + + p = DakProcessPool() + for s in range(3): + for j in range(4): + p.apply_async(test_function, [s, j]) + + p.close() + p.join() + + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) + + expected = [(PROC_STATUS_SUCCESS, 'blah, 0, 0'), + (PROC_STATUS_MISCFAILURE, 'Test custom error return'), + (PROC_STATUS_SUCCESS, 'blah, 0, 2'), + (PROC_STATUS_EXCEPTION, 'Test uncaught exception handling'), + (PROC_STATUS_SIGNALRAISED, 15), + (PROC_STATUS_SIGNALRAISED, 17), + (PROC_STATUS_SIGNALRAISED, 14), + (PROC_STATUS_SIGNALRAISED, 1), + (PROC_STATUS_SUCCESS, 'blah, 2, 0'), + (PROC_STATUS_SUCCESS, 'blah, 2, 1'), + (PROC_STATUS_SUCCESS, 'blah, 2, 2'), + (PROC_STATUS_EXCEPTION, 'Test uncaught exception handling')] + + self.assertEqual( len(p.results), len(expected) ) + + for r in range(len(p.results)): + self.assertEqual(p.results[r], expected[r])