提交 4d8b174b 编写于 作者: A Ansgar Burchardt

add module for filesystem transactions

daklib.fstransactions allows to copy, remove, and move files around in
transactions, that is all operations will succeed or be reverted.
上级 04c332f3
# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Transactions for filesystem actions
"""
import errno
import os
import shutil
class _FilesystemAction(object):
@property
def temporary_name(self):
raise NotImplementedError()
def check_for_temporary(self):
try:
if os.path.exists(self.temporary_name):
raise IOError("Temporary file '{0}' already exists.".format(self.temporary_name))
except NotImplementedError:
pass
class _FilesystemCopyAction(_FilesystemAction):
def __init__(self, source, destination, link=True, symlink=False, mode=None):
self.destination = destination
self.need_cleanup = False
self.check_for_temporary()
destdir = os.path.dirname(self.destination)
if not os.path.exists(destdir):
os.makedirs(destdir, 0o2775)
if symlink:
os.symlink(source, self.destination)
elif link:
try:
os.link(source, self.destination)
except OSError:
shutil.copy2(source, self.destination)
else:
shutil.copy2(source, self.destination)
self.need_cleanup = True
if mode is not None:
os.chmod(self.destination, mode)
@property
def temporary_name(self):
return self.destination
def commit(self):
pass
def rollback(self):
if self.need_cleanup:
os.unlink(self.destination)
self.need_cleanup = False
class _FilesystemUnlinkAction(_FilesystemAction):
def __init__(self, path):
self.path = path
self.need_cleanup = False
self.check_for_temporary()
os.rename(self.path, self.temporary_name)
self.need_cleanup = True
@property
def temporary_name(self):
return "{0}.dak-rm".format(self.path)
def commit(self):
if self.need_cleanup:
os.unlink(self.temporary_name)
self.need_cleanup = False
def rollback(self):
if self.need_cleanup:
os.rename(self.temporary_name, self.path)
self.need_cleanup = False
class _FilesystemCreateAction(_FilesystemAction):
def __init__(self, path):
self.path = path
self.need_cleanup = True
@property
def temporary_name(self):
return self.path
def commit(self):
pass
def rollback(self):
if self.need_cleanup:
os.unlink(self.path)
self.need_cleanup = False
class FilesystemTransaction(object):
"""transactions for filesystem actions"""
def __init__(self):
self.actions = []
def copy(self, source, destination, link=True, symlink=False, mode=None):
"""copy `source` to `destination`
Args:
source (str): source file
destination (str): destination file
Kwargs:
link (bool): Try hardlinking, falling back to copying.
symlink (bool): Create a symlink instead
mode (int): Permissions to change `destination` to.
"""
self.actions.append(_FilesystemCopyAction(source, destination, link=link, symlink=symlink, mode=mode))
def move(self, source, destination, mode=None):
"""move `source` to `destination`
Args:
source (str): source file
destination (str): destination file
Kwargs:
mode (int): Permissions to change `destination` to.
"""
self.copy(source, destination, link=True, mode=mode)
self.unlink(source)
def unlink(self, path):
"""unlink `path`
Args:
path (str): file to unlink
"""
self.actions.append(_FilesystemUnlinkAction(path))
def create(self, path, mode=None):
"""create `filename` and return file handle
Args:
filename (str): file to create
Kwargs:
mode (int): Permissions for the new file
Returns:
file handle of the new file
"""
destdir = os.path.dirname(path)
if not os.path.exists(destdir):
os.makedirs(destdir, 0o2775)
if os.path.exists(path):
raise IOError("File '{0}' already exists.".format(path))
fh = open(path, 'w')
self.actions.append(_FilesystemCreateAction(path))
if mode is not None:
os.chmod(path, mode)
return fh
def commit(self):
"""Commit all recorded actions."""
try:
for action in self.actions:
action.commit()
except:
self.rollback()
raise
finally:
self.actions = []
def rollback(self):
"""Undo all recorded actions."""
try:
for action in self.actions:
action.rollback()
finally:
self.actions = []
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
if type is None:
self.commit()
else:
self.rollback()
return None
#! /usr/bin/env python
#
# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from base_test import DakTestCase
from daklib.fstransactions import FilesystemTransaction
from unittest import main
import os
import shutil
import tempfile
class TemporaryDirectory:
def __init__(self):
self.directory = None
def __str__(self):
return self.directory
def filename(self, suffix):
return os.path.join(self.directory, suffix)
def __enter__(self):
self.directory = tempfile.mkdtemp()
return self
def __exit__(self, *args):
if self.directory is not None:
shutil.rmtree(self.directory)
self.directory = None
return None
class FilesystemTransactionTestCase(DakTestCase):
def _copy_a_b(self, tmp, fs, **kwargs):
fs.copy(tmp.filename('a'), tmp.filename('b'), **kwargs)
def _write_to_a(self, tmp):
with open(tmp.filename('a'), 'w') as fh:
print >>fh, 'a'
def test_copy_non_existing(self):
def copy():
with TemporaryDirectory() as t:
with FilesystemTransaction() as fs:
self._copy_a_b(t, fs)
self.assertRaises(IOError, copy)
def test_copy_existing_and_commit(self):
with TemporaryDirectory() as t:
self._write_to_a(t)
with FilesystemTransaction() as fs:
self._copy_a_b(t, fs)
self.assert_(os.path.exists(t.filename('a')))
self.assert_(os.path.exists(t.filename('b')))
self.assert_(os.path.exists(t.filename('a')))
self.assert_(os.path.exists(t.filename('b')))
def test_copy_existing_and_rollback(self):
with TemporaryDirectory() as t:
self._write_to_a(t)
class TestException(Exception):
pass
try:
with FilesystemTransaction() as fs:
self._copy_a_b(t, fs)
self.assert_(os.path.exists(t.filename('a')))
self.assert_(os.path.exists(t.filename('b')))
raise TestException()
except TestException:
pass
self.assert_(os.path.exists(t.filename('a')))
self.assert_(not os.path.exists(t.filename('b')))
def test_unlink_and_commit(self):
with TemporaryDirectory() as t:
self._write_to_a(t)
a = t.filename('a')
with FilesystemTransaction() as fs:
self.assert_(os.path.exists(a))
fs.unlink(a)
self.assert_(not os.path.exists(a))
self.assert_(not os.path.exists(a))
def test_unlink_and_rollback(self):
with TemporaryDirectory() as t:
self._write_to_a(t)
a = t.filename('a')
class TestException(Exception):
pass
try:
with FilesystemTransaction() as fs:
self.assert_(os.path.exists(a))
fs.unlink(a)
self.assert_(not os.path.exists(a))
raise TestException()
except TestException:
pass
self.assert_(os.path.exists(a))
if __name__ == '__main__':
main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册