]> git.decadent.org.uk Git - dak.git/blobdiff - tests/db_test.py
Test suite_name in test_upload()
[dak.git] / tests / db_test.py
index d95f0cbfde4d5fda34f7837d844e9d4c526a46e3..fc9ce893d61d4c25aef3f1adc269cb6c21c7dc43 100644 (file)
@@ -3,18 +3,13 @@ from base_test import DakTestCase, fixture
 from daklib.config import Config
 from daklib.dbconn import DBConn
 
-from sqlalchemy import create_engine, __version__
+from sqlalchemy import create_engine, func, __version__
 from sqlalchemy.exc import SADeprecationWarning
 from sqlalchemy.schema import DDL
 
 import pickle
 import warnings
 
-# suppress some deprecation warnings in squeeze related to sqlalchemy
-warnings.filterwarnings('ignore', \
-    "The SQLAlchemy PostgreSQL dialect has been renamed from 'postgres' to 'postgresql'.*", \
-    SADeprecationWarning)
-
 all_tables = ['architecture', 'archive', 'bin_associations', 'bin_contents',
     'binaries', 'binary_acl', 'binary_acl_map', 'build_queue', 'build_queue_files',
     'changes', 'changes_pending_binaries', 'changes_pending_files',
@@ -44,7 +39,9 @@ class DBDakTestCase(DakTestCase):
         for table in all_tables:
             self.execute(create_trigger % (table, table))
 
-    def setUp(self):
+    metadata = None
+
+    def initialize(self):
         cnf = Config()
         if cnf["DB::Name"] in ('backports', 'obscurity', 'projectb'):
             self.fail("You have configured an invalid database name: '%s'." % \
@@ -63,14 +60,27 @@ class DBDakTestCase(DakTestCase):
 
         pickle_filename = 'db-metadata-%s.pkl' % __version__
         pickle_file = open(fixture(pickle_filename), 'r')
-        self.metadata = pickle.load(pickle_file)
+        DBDakTestCase.metadata = pickle.load(pickle_file)
         self.metadata.ddl_listeners = pickle.load(pickle_file)
         pickle_file.close()
         self.metadata.bind = create_engine(connstr)
         self.metadata.create_all()
         self.create_all_triggers()
+
+    def setUp(self):
+        if self.metadata is None:
+            self.initialize()
         self.session = DBConn().session()
 
+    def now(self):
+        "returns the current time at the db server"
+
+        # we fetch a fresh session each time to avoid caching
+        local_session = DBConn().session()
+        current_time = local_session.query(func.now()).scalar()
+        local_session.close()
+        return current_time
+
     def classes_to_clean(self):
         """
         The function classes_to_clean() returns a list of classes. All objects
@@ -84,5 +94,6 @@ class DBDakTestCase(DakTestCase):
         for class_ in self.classes_to_clean():
             self.session.query(class_).delete()
         self.session.commit()
+        # usually there is no need to drop all tables here
         #self.metadata.drop_all()