from sqlalchemy import create_engine, __version__
from sqlalchemy.exc import SADeprecationWarning
+from sqlalchemy.schema import DDL
import pickle
import warnings
"The SQLAlchemy PostgreSQL dialect has been renamed from 'postgres' to 'postgresql'.*", \
SADeprecationWarning)
+all_tables = ['architecture', 'archive', 'bin_associations', 'bin_contents',
+ 'binaries', 'binary_acl', 'binary_acl_map', 'build_queue', 'build_queue_files',
+ 'changes', 'changes_pending_binaries', 'changes_pending_files',
+ 'changes_pending_files_map', 'changes_pending_source',
+ 'changes_pending_source_files', 'changes_pool_files', 'component', 'config',
+ 'dsc_files', 'files', 'fingerprint', 'keyring_acl_map', 'keyrings', 'location',
+ 'maintainer', 'new_comments', 'override', 'override_type', 'policy_queue',
+ 'priority', 'section', 'source', 'source_acl', 'src_associations',
+ 'src_format', 'src_uploaders', 'suite', 'suite_architectures',
+ 'suite_build_queue_copy', 'suite_src_formats', 'uid', 'upload_blocks']
+
+drop_plpgsql = "DROP LANGUAGE IF EXISTS plpgsql CASCADE"
+create_plpgsql = "CREATE LANGUAGE plpgsql"
+create_function = """CREATE OR REPLACE FUNCTION tfunc_set_modified() RETURNS trigger AS $$
+ BEGIN NEW.modified = now(); return NEW; END;
+ $$ LANGUAGE 'plpgsql'"""
+create_trigger = """CREATE TRIGGER modified_%s BEFORE UPDATE ON %s
+ FOR EACH ROW EXECUTE PROCEDURE tfunc_set_modified()"""
+
class DBDakTestCase(DakTestCase):
+ def execute(self, statement):
+ DDL(statement).execute(self.metadata.bind)
+
+ def create_all_triggers(self):
+ for statement in (drop_plpgsql, create_plpgsql, create_function):
+ self.execute(statement)
+ for table in all_tables:
+ self.execute(create_trigger % (table, table))
+
def setUp(self):
cnf = Config()
if cnf["DB::Name"] in ('backports', 'obscurity', 'projectb'):
pickle_file.close()
self.metadata.bind = create_engine(connstr)
self.metadata.create_all()
+ self.create_all_triggers()
self.session = DBConn().session()
+ def classes_to_clean(self):
+ """
+ The function classes_to_clean() returns a list of classes. All objects
+ of each class will be deleted from the database in tearDown(). This
+ function should be overridden in derived test cases as needed.
+ """
+ return ()
+
def tearDown(self):
- self.session.close()
+ self.session.rollback()
+ for class_ in self.classes_to_clean():
+ self.session.query(class_).delete()
+ self.session.commit()
#self.metadata.drop_all()
--- /dev/null
+#!/usr/bin/env python
+
+from db_test import DBDakTestCase
+
+from daklib.dbconn import DBConn, Uid
+
+import time
+import unittest
+
+class TimestampTestCase(DBDakTestCase):
+ """
+ TimestampTestCase checks that the timestamps created and modified are
+ working correctly.
+
+ TODO: Should we check all tables?
+ """
+
+ def now(self):
+ local_session = DBConn().session()
+ query = local_session.query('now').from_statement('select now() as now')
+ local_session.close()
+ return query.one().now
+
+ def sleep(self):
+ time.sleep(0.001)
+
+ def test_timestamps(self):
+ timestamp01 = self.now()
+ self.sleep()
+ uid = Uid(uid = 'ftp-master@debian.org')
+ self.session.add(uid)
+ self.session.commit()
+ created01 = uid.created
+ modified01 = uid.modified
+ self.sleep()
+ timestamp02 = self.now()
+ self.assertTrue(timestamp01 < created01)
+ self.assertTrue(timestamp01 < modified01)
+ self.assertTrue(created01 < timestamp02)
+ self.assertTrue(modified01 < timestamp02)
+ self.sleep()
+ uid.name = 'ftp team'
+ self.session.commit()
+ created02 = uid.created
+ modified02 = uid.modified
+ self.assertEqual(created01, created02)
+ self.assertTrue(modified01 < modified02)
+ self.sleep()
+ timestamp03 = self.now()
+ self.assertTrue(modified02 < timestamp03)
+
+ def classes_to_clean(self):
+ return (Uid,)
+
+if __name__ == '__main__':
+ unittest.main()