]> git.decadent.org.uk Git - dak.git/blobdiff - tests/db_test.py
Add workaround for SQLAlchemy 0.5.
[dak.git] / tests / db_test.py
index d7f906cd34a147ad79916b283c0122be0761e34a..fc9ce893d61d4c25aef3f1adc269cb6c21c7dc43 100644 (file)
@@ -3,7 +3,7 @@ from base_test import DakTestCase, fixture
 from daklib.config import Config
 from daklib.dbconn import DBConn
 
-from sqlalchemy import create_engine, __version__
+from sqlalchemy import create_engine, func, __version__
 from sqlalchemy.exc import SADeprecationWarning
 from sqlalchemy.schema import DDL
 
@@ -39,7 +39,9 @@ class DBDakTestCase(DakTestCase):
         for table in all_tables:
             self.execute(create_trigger % (table, table))
 
-    def setUp(self):
+    metadata = None
+
+    def initialize(self):
         cnf = Config()
         if cnf["DB::Name"] in ('backports', 'obscurity', 'projectb'):
             self.fail("You have configured an invalid database name: '%s'." % \
@@ -58,14 +60,27 @@ class DBDakTestCase(DakTestCase):
 
         pickle_filename = 'db-metadata-%s.pkl' % __version__
         pickle_file = open(fixture(pickle_filename), 'r')
-        self.metadata = pickle.load(pickle_file)
+        DBDakTestCase.metadata = pickle.load(pickle_file)
         self.metadata.ddl_listeners = pickle.load(pickle_file)
         pickle_file.close()
         self.metadata.bind = create_engine(connstr)
         self.metadata.create_all()
         self.create_all_triggers()
+
+    def setUp(self):
+        if self.metadata is None:
+            self.initialize()
         self.session = DBConn().session()
 
+    def now(self):
+        "returns the current time at the db server"
+
+        # we fetch a fresh session each time to avoid caching
+        local_session = DBConn().session()
+        current_time = local_session.query(func.now()).scalar()
+        local_session.close()
+        return current_time
+
     def classes_to_clean(self):
         """
         The function classes_to_clean() returns a list of classes. All objects
@@ -79,5 +94,6 @@ class DBDakTestCase(DakTestCase):
         for class_ in self.classes_to_clean():
             self.session.query(class_).delete()
         self.session.commit()
+        # usually there is no need to drop all tables here
         #self.metadata.drop_all()