3 from db_test import DBDakTestCase
5 from daklib.dbconn import DBConn
7 from multiprocessing import Pool
13 session = DBConn().session()
14 result = session.query('foo').from_statement('select 7 as foo').scalar()
19 class MultiProcTestCase(DBDakTestCase):
21 This TestCase checks that DBConn works with multiprocessing. A fresh
22 subprocess needs to call reset() on DBConn(). See function read_number()
26 def save_result(self, result):
31 Test apply_async() with a database session.
35 pool.apply_async(read_number, (), callback = self.save_result)
36 pool.apply_async(read_number, (), callback = self.save_result)
37 pool.apply_async(read_number, (), callback = self.save_result)
38 pool.apply_async(read_number, (), callback = self.save_result)
39 pool.apply_async(read_number, (), callback = self.save_result)
42 self.assertEqual(5 * 7, self.result)
44 if __name__ == '__main__':