]> git.decadent.org.uk Git - dak.git/blobdiff - tests/test_multiprocessing.py
Merge remote branch 'mhy/multiproc' into merge
[dak.git] / tests / test_multiprocessing.py
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
new file mode 100755 (executable)
index 0000000..595032f
--- /dev/null
@@ -0,0 +1,61 @@
+#!/usr/bin/python
+
+from base_test import DakTestCase
+
+from daklib.dakmultiprocessing import DakProcessPool, \
+                                      PROC_STATUS_SUCCESS,   PROC_STATUS_MISCFAILURE, \
+                                      PROC_STATUS_EXCEPTION, PROC_STATUS_SIGNALRAISED
+import signal
+
+def test_function(num, num2):
+    from os import kill, getpid
+
+    if num == 1:
+        sigs = [signal.SIGTERM, signal.SIGPIPE, signal.SIGALRM, signal.SIGHUP]
+        kill(getpid(), sigs[num2])
+
+    if num2 == 3:
+        raise Exception('Test uncaught exception handling')
+
+    if num == 0 and num2 == 1:
+        return (PROC_STATUS_MISCFAILURE, 'Test custom error return')
+
+    return (PROC_STATUS_SUCCESS, 'blah, %d, %d' % (num, num2))
+
+class DakProcessPoolTestCase(DakTestCase):
+    def testPool(self):
+        def alarm_handler(signum, frame):
+            raise AssertionError('Timed out')
+
+        # Shouldn't take us more than 15 seconds to run this test
+        signal.signal(signal.SIGALRM, alarm_handler)
+        signal.alarm(15)
+
+        p = DakProcessPool()
+        for s in range(3):
+            for j in range(4):
+                p.apply_async(test_function, [s, j])
+
+        p.close()
+        p.join()
+
+        signal.alarm(0)
+        signal.signal(signal.SIGALRM, signal.SIG_DFL)
+
+        expected = [(PROC_STATUS_SUCCESS,      'blah, 0, 0'),
+                    (PROC_STATUS_MISCFAILURE,  'Test custom error return'),
+                    (PROC_STATUS_SUCCESS,      'blah, 0, 2'),
+                    (PROC_STATUS_EXCEPTION,    'Test uncaught exception handling'),
+                    (PROC_STATUS_SIGNALRAISED, 15),
+                    (PROC_STATUS_SIGNALRAISED, 13),
+                    (PROC_STATUS_SIGNALRAISED, 14),
+                    (PROC_STATUS_SIGNALRAISED, 1),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 0'),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 1'),
+                    (PROC_STATUS_SUCCESS,      'blah, 2, 2'),
+                    (PROC_STATUS_EXCEPTION,    'Test uncaught exception handling')]
+
+        self.assertEqual( len(p.results), len(expected) )
+
+        for r in range(len(p.results)):
+            self.assertEqual(p.results[r], expected[r])