1 # Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 """Transactions for filesystem actions
class _FilesystemAction(object):
    """Base class for one undoable filesystem action.

    Subclasses may expose a `temporary_name` property: the path the action
    occupies on disk while the transaction is still open.
    """

    @property
    def temporary_name(self):
        """Path occupied by this action until it is committed or rolled back.

        Raises:
           NotImplementedError: the action does not define a temporary name
        """
        raise NotImplementedError()

    def check_for_temporary(self):
        """Refuse to continue if something already exists at `temporary_name`.

        Actions without a temporary name are accepted silently.

        Raises:
           IOError: a file (or symlink) already exists at `temporary_name`
        """
        try:
            # lexists() also reports dangling symlinks; exists() would follow
            # the link, report False and let the action clobber the link or
            # write through it.
            if os.path.lexists(self.temporary_name):
                raise IOError("Temporary file '{0}' already exists.".format(self.temporary_name))
        except NotImplementedError:
            # No temporary name defined for this action: nothing to check.
            pass
class _FilesystemCopyAction(_FilesystemAction):
    """Create `destination` from `source`; undone by removing `destination`.

    The file is put in place by the constructor. Committing is a no-op;
    rolling back deletes the created file again.
    """

    def __init__(self, source, destination, link=True, symlink=False, mode=None):
        """Create `destination` immediately.

        Args:
           source (str): source file
           destination (str): destination file; must not exist yet

        Keyword Args:
           link (bool): Try hardlinking, falling back to copying.
           symlink (bool): Create a symlink instead (checked before `link`).
           mode (int): Permissions to change `destination` to.

        Raises:
           IOError: `destination` already exists
        """
        self.destination = destination
        self.need_cleanup = False

        self.check_for_temporary()
        destdir = os.path.dirname(self.destination)
        # dirname is '' for a bare filename; os.makedirs('') would raise.
        if destdir and not os.path.exists(destdir):
            os.makedirs(destdir, 0o2775)

        if symlink:
            os.symlink(source, self.destination)
        elif link:
            try:
                os.link(source, self.destination)
            except OSError:
                # Hardlinking failed: fall back to a real copy.
                shutil.copy2(source, self.destination)
        else:
            shutil.copy2(source, self.destination)

        # From here on the destination exists and is ours to remove.
        self.need_cleanup = True
        if mode is not None:
            try:
                os.chmod(self.destination, mode)
            except BaseException:
                # A raising constructor never hands this object to the
                # caller, so nobody else could roll the new file back.
                self.rollback()
                raise

    @property
    def temporary_name(self):
        """The destination itself: it is created before the commit."""
        return self.destination

    def commit(self):
        """Nothing to do; the file is already in its final place."""
        pass

    def rollback(self):
        """Remove the created destination, at most once."""
        if self.need_cleanup:
            os.unlink(self.destination)
            self.need_cleanup = False
class _FilesystemUnlinkAction(_FilesystemAction):
    """Remove `path`, keeping it under a temporary name until commit.

    The constructor renames `path` to `<path>.dak-rm`. Committing deletes
    the renamed file; rolling back renames it back to `path`.
    """

    def __init__(self, path):
        """Move `path` out of the way.

        Args:
           path (str): file to unlink

        Raises:
           IOError: the temporary name `<path>.dak-rm` already exists
        """
        self.path = path
        self.need_cleanup = False

        self.check_for_temporary()
        os.rename(self.path, self.temporary_name)
        # Only after a successful rename is there anything to finish or undo.
        self.need_cleanup = True

    @property
    def temporary_name(self):
        """Name the file is parked under until the transaction ends."""
        return "{0}.dak-rm".format(self.path)

    def commit(self):
        """Delete the parked file for good, at most once."""
        if self.need_cleanup:
            os.unlink(self.temporary_name)
            self.need_cleanup = False

    def rollback(self):
        """Put the parked file back under its original name, at most once."""
        if self.need_cleanup:
            os.rename(self.temporary_name, self.path)
            self.need_cleanup = False
class _FilesystemCreateAction(_FilesystemAction):
    """Track a newly created file so that a rollback can remove it.

    The file itself is created by the caller; this action only records
    `path`. Committing is a no-op.
    """

    def __init__(self, path):
        """Record `path` as created by the current transaction.

        Args:
           path (str): file that was created
        """
        self.path = path
        self.need_cleanup = True

    @property
    def temporary_name(self):
        """The created file itself."""
        return self.path

    def commit(self):
        """Nothing to do; the file stays where it is."""
        pass

    def rollback(self):
        """Remove the created file, at most once."""
        if self.need_cleanup:
            os.unlink(self.path)
            self.need_cleanup = False
111 class FilesystemTransaction(object):
112 """transactions for filesystem actions"""
def copy(self, source, destination, link=True, symlink=False, mode=None):
    """copy `source` to `destination`

    The copy is performed immediately and recorded so that it can be
    undone when the transaction is rolled back.

    Args:
       source (str): source file
       destination (str): destination file

    Keyword Args:
       link (bool): Try hardlinking, falling back to copying.
       symlink (bool): Create a symlink instead
       mode (int): Permissions to change `destination` to.
    """
    self.actions.append(_FilesystemCopyAction(source, destination, link=link, symlink=symlink, mode=mode))
def move(self, source, destination, mode=None):
    """move `source` to `destination`

    Implemented as a (hardlinking) copy followed by an unlink of `source`,
    so both halves can be undone.

    Args:
       source (str): source file
       destination (str): destination file

    Keyword Args:
       mode (int): Permissions to change `destination` to.
    """
    self.copy(source, destination, link=True, mode=mode)
    self.unlink(source)
def unlink(self, path):
    """unlink `path`

    Args:
       path (str): file to unlink
    """
    self.actions.append(_FilesystemUnlinkAction(path))
def create(self, path, mode=None):
    """create `path` and return file handle

    Missing parent directories are created. The new file is recorded so
    that it is removed again when the transaction is rolled back.

    Args:
       path (str): file to create

    Keyword Args:
       mode (int): Permissions for the new file

    Returns:
       file handle of the new file

    Raises:
       IOError: `path` already exists
    """
    destdir = os.path.dirname(path)
    # dirname is '' for a bare filename; os.makedirs('') would raise.
    if destdir and not os.path.exists(destdir):
        os.makedirs(destdir, 0o2775)
    if os.path.exists(path):
        raise IOError("File '{0}' already exists.".format(path))
    # NOTE(review): the open() call is elided in the listing; plain text
    # mode 'w' is assumed here -- confirm against the real file.
    fh = open(path, 'w')
    self.actions.append(_FilesystemCreateAction(path))
    if mode is not None:
        try:
            os.chmod(path, mode)
        except BaseException:
            # The caller never receives the handle on failure; close it here.
            # The file stays registered, so a rollback still removes it.
            fh.close()
            raise
    return fh
175 """Commit all recorded actions."""
177 for action in self.actions:
186 """Undo all recorded actions."""
188 for action in self.actions:
196 def __exit__(self, type, value, traceback):