]> git.decadent.org.uk Git - dak.git/blob - tests/test_daklib_fstransactions.py
Merge remote-tracking branch 'jcristau/formatone-no-tar-sig'
[dak.git] / tests / test_daklib_fstransactions.py
1 #! /usr/bin/env python
2 #
3 # Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 from base_test import DakTestCase
20 from daklib.fstransactions import FilesystemTransaction
21
22 from unittest import main
23
24 import os
25 import shutil
26 import tempfile
27
28
29 class TemporaryDirectory:
30     def __init__(self):
31         self.directory = None
32     def __str__(self):
33         return self.directory
34     def filename(self, suffix):
35         return os.path.join(self.directory, suffix)
36     def __enter__(self):
37         self.directory = tempfile.mkdtemp()
38         return self
39     def __exit__(self, *args):
40         if self.directory is not None:
41             shutil.rmtree(self.directory)
42             self.directory = None
43         return None
44
45 class FilesystemTransactionTestCase(DakTestCase):
46     def _copy_a_b(self, tmp, fs, **kwargs):
47         fs.copy(tmp.filename('a'), tmp.filename('b'), **kwargs)
48
49     def _write_to_a(self, tmp):
50         with open(tmp.filename('a'), 'w') as fh:
51             print >>fh, 'a'
52
53     def test_copy_non_existing(self):
54         def copy():
55             with TemporaryDirectory() as t:
56                 with FilesystemTransaction() as fs:
57                     self._copy_a_b(t, fs)
58
59         self.assertRaises(IOError, copy)
60
61     def test_copy_existing_and_commit(self):
62         with TemporaryDirectory() as t:
63             self._write_to_a(t)
64
65             with FilesystemTransaction() as fs:
66                 self._copy_a_b(t, fs)
67                 self.assert_(os.path.exists(t.filename('a')))
68                 self.assert_(os.path.exists(t.filename('b')))
69
70             self.assert_(os.path.exists(t.filename('a')))
71             self.assert_(os.path.exists(t.filename('b')))
72
73     def test_copy_existing_and_rollback(self):
74         with TemporaryDirectory() as t:
75             self._write_to_a(t)
76
77             class TestException(Exception):
78                 pass
79             try:
80                 with FilesystemTransaction() as fs:
81                     self._copy_a_b(t, fs)
82                     self.assert_(os.path.exists(t.filename('a')))
83                     self.assert_(os.path.exists(t.filename('b')))
84                     raise TestException()
85             except TestException:
86                 pass
87
88             self.assert_(os.path.exists(t.filename('a')))
89             self.assert_(not os.path.exists(t.filename('b')))
90
91     def test_unlink_and_commit(self):
92         with TemporaryDirectory() as t:
93             self._write_to_a(t)
94             a = t.filename('a')
95             with FilesystemTransaction() as fs:
96                 self.assert_(os.path.exists(a))
97                 fs.unlink(a)
98                 self.assert_(not os.path.exists(a))
99             self.assert_(not os.path.exists(a))
100
101     def test_unlink_and_rollback(self):
102         with TemporaryDirectory() as t:
103             self._write_to_a(t)
104             a = t.filename('a')
105             class TestException(Exception):
106                 pass
107
108             try:
109                 with FilesystemTransaction() as fs:
110                     self.assert_(os.path.exists(a))
111                     fs.unlink(a)
112                     self.assert_(not os.path.exists(a))
113                     raise TestException()
114             except TestException:
115                 pass
116             self.assert_(os.path.exists(a))
117
118 if __name__ == '__main__':
119     main()