--- /dev/null
+# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Transactions for filesystem actions
+"""
+
+import errno
+import os
+import shutil
+
+class _FilesystemAction(object):
+ @property
+ def temporary_name(self):
+ raise NotImplementedError()
+
+ def check_for_temporary(self):
+ try:
+ if os.path.exists(self.temporary_name):
+ raise IOError("Temporary file '{0}' already exists.".format(self.temporary_name))
+ except NotImplementedError:
+ pass
+
+class _FilesystemCopyAction(_FilesystemAction):
+ def __init__(self, source, destination, link=True, symlink=False, mode=None):
+ self.destination = destination
+ self.need_cleanup = False
+
+ self.check_for_temporary()
+ destdir = os.path.dirname(self.destination)
+ if not os.path.exists(destdir):
+ os.makedirs(destdir, 0o2775)
+ if symlink:
+ os.symlink(source, self.destination)
+ elif link:
+ try:
+ os.link(source, self.destination)
+ except OSError:
+ shutil.copy2(source, self.destination)
+ else:
+ shutil.copy2(source, self.destination)
+
+ self.need_cleanup = True
+ if mode is not None:
+ os.chmod(self.destination, mode)
+
+ @property
+ def temporary_name(self):
+ return self.destination
+
+ def commit(self):
+ pass
+
+ def rollback(self):
+ if self.need_cleanup:
+ os.unlink(self.destination)
+ self.need_cleanup = False
+
+class _FilesystemUnlinkAction(_FilesystemAction):
+ def __init__(self, path):
+ self.path = path
+ self.need_cleanup = False
+
+ self.check_for_temporary()
+ os.rename(self.path, self.temporary_name)
+ self.need_cleanup = True
+
+ @property
+ def temporary_name(self):
+ return "{0}.dak-rm".format(self.path)
+
+ def commit(self):
+ if self.need_cleanup:
+ os.unlink(self.temporary_name)
+ self.need_cleanup = False
+
+ def rollback(self):
+ if self.need_cleanup:
+ os.rename(self.temporary_name, self.path)
+ self.need_cleanup = False
+
+class _FilesystemCreateAction(_FilesystemAction):
+ def __init__(self, path):
+ self.path = path
+ self.need_cleanup = True
+
+ @property
+ def temporary_name(self):
+ return self.path
+
+ def commit(self):
+ pass
+
+ def rollback(self):
+ if self.need_cleanup:
+ os.unlink(self.path)
+ self.need_cleanup = False
+
+class FilesystemTransaction(object):
+ """transactions for filesystem actions"""
+ def __init__(self):
+ self.actions = []
+
+ def copy(self, source, destination, link=True, symlink=False, mode=None):
+ """copy `source` to `destination`
+
+ Args:
+ source (str): source file
+ destination (str): destination file
+
+ Kwargs:
+ link (bool): Try hardlinking, falling back to copying.
+ symlink (bool): Create a symlink instead
+ mode (int): Permissions to change `destination` to.
+ """
+ self.actions.append(_FilesystemCopyAction(source, destination, link=link, symlink=symlink, mode=mode))
+
+ def move(self, source, destination, mode=None):
+ """move `source` to `destination`
+
+ Args:
+ source (str): source file
+ destination (str): destination file
+
+ Kwargs:
+ mode (int): Permissions to change `destination` to.
+ """
+ self.copy(source, destination, link=True, mode=mode)
+ self.unlink(source)
+
+ def unlink(self, path):
+ """unlink `path`
+
+ Args:
+ path (str): file to unlink
+ """
+ self.actions.append(_FilesystemUnlinkAction(path))
+
+ def create(self, path, mode=None):
+ """create `filename` and return file handle
+
+ Args:
+ filename (str): file to create
+
+ Kwargs:
+ mode (int): Permissions for the new file
+
+ Returns:
+ file handle of the new file
+ """
+ destdir = os.path.dirname(path)
+ if not os.path.exists(destdir):
+ os.makedirs(destdir, 0o2775)
+ if os.path.exists(path):
+ raise IOError("File '{0}' already exists.".format(path))
+ fh = open(path, 'w')
+ self.actions.append(_FilesystemCreateAction(path))
+ if mode is not None:
+ os.chmod(path, mode)
+ return fh
+
+ def commit(self):
+ """Commit all recorded actions."""
+ try:
+ for action in self.actions:
+ action.commit()
+ except:
+ self.rollback()
+ raise
+ finally:
+ self.actions = []
+
+ def rollback(self):
+ """Undo all recorded actions."""
+ try:
+ for action in self.actions:
+ action.rollback()
+ finally:
+ self.actions = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ if type is None:
+ self.commit()
+ else:
+ self.rollback()
+ return None
--- /dev/null
+#! /usr/bin/env python
+#
+# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from base_test import DakTestCase
+from daklib.fstransactions import FilesystemTransaction
+
+from unittest import main
+
+import os
+import shutil
+import tempfile
+
+
+class TemporaryDirectory:
+ def __init__(self):
+ self.directory = None
+ def __str__(self):
+ return self.directory
+ def filename(self, suffix):
+ return os.path.join(self.directory, suffix)
+ def __enter__(self):
+ self.directory = tempfile.mkdtemp()
+ return self
+ def __exit__(self, *args):
+ if self.directory is not None:
+ shutil.rmtree(self.directory)
+ self.directory = None
+ return None
+
+class FilesystemTransactionTestCase(DakTestCase):
+ def _copy_a_b(self, tmp, fs, **kwargs):
+ fs.copy(tmp.filename('a'), tmp.filename('b'), **kwargs)
+
+ def _write_to_a(self, tmp):
+ with open(tmp.filename('a'), 'w') as fh:
+ print >>fh, 'a'
+
+ def test_copy_non_existing(self):
+ def copy():
+ with TemporaryDirectory() as t:
+ with FilesystemTransaction() as fs:
+ self._copy_a_b(t, fs)
+
+ self.assertRaises(IOError, copy)
+
+ def test_copy_existing_and_commit(self):
+ with TemporaryDirectory() as t:
+ self._write_to_a(t)
+
+ with FilesystemTransaction() as fs:
+ self._copy_a_b(t, fs)
+ self.assert_(os.path.exists(t.filename('a')))
+ self.assert_(os.path.exists(t.filename('b')))
+
+ self.assert_(os.path.exists(t.filename('a')))
+ self.assert_(os.path.exists(t.filename('b')))
+
+ def test_copy_existing_and_rollback(self):
+ with TemporaryDirectory() as t:
+ self._write_to_a(t)
+
+ class TestException(Exception):
+ pass
+ try:
+ with FilesystemTransaction() as fs:
+ self._copy_a_b(t, fs)
+ self.assert_(os.path.exists(t.filename('a')))
+ self.assert_(os.path.exists(t.filename('b')))
+ raise TestException()
+ except TestException:
+ pass
+
+ self.assert_(os.path.exists(t.filename('a')))
+ self.assert_(not os.path.exists(t.filename('b')))
+
+ def test_unlink_and_commit(self):
+ with TemporaryDirectory() as t:
+ self._write_to_a(t)
+ a = t.filename('a')
+ with FilesystemTransaction() as fs:
+ self.assert_(os.path.exists(a))
+ fs.unlink(a)
+ self.assert_(not os.path.exists(a))
+ self.assert_(not os.path.exists(a))
+
+ def test_unlink_and_rollback(self):
+ with TemporaryDirectory() as t:
+ self._write_to_a(t)
+ a = t.filename('a')
+ class TestException(Exception):
+ pass
+
+ try:
+ with FilesystemTransaction() as fs:
+ self.assert_(os.path.exists(a))
+ fs.unlink(a)
+ self.assert_(not os.path.exists(a))
+ raise TestException()
+ except TestException:
+ pass
+ self.assert_(os.path.exists(a))
+
+if __name__ == '__main__':
+ main()