未验证 提交 888a7ab3 编写于 作者: C Caio Carrara

Merge remote-tracking branch 'clebergnu/iso9660_capabilities_create_write_v2'

Signed-off-by: NCaio Carrara <ccarrara@redhat.com>
......@@ -29,6 +29,7 @@ import logging
import os
import re
import shutil
import string
import sys
import tempfile
......@@ -391,14 +392,67 @@ class ISO9660PyCDLib(BaseIso9660):
This implementation is based on the pycdlib library
"""
#: Default flags used when creating a new ISO image
DEFAULT_CREATE_FLAGS = {"interchange_level": 3,
"joliet": 3}
def __init__(self, path):
if not has_pycdlib():
raise RuntimeError('This class requires the pycdlib library')
self._iso = pycdlib.PyCdlib()
self._iso.open(path)
self._iso_closed = False
self._path = path
self._iso = None
self._iso_opened_for_create = False
def _open_for_read(self):
if self._iso is None:
self._iso = pycdlib.PyCdlib()
self._iso.open(self._path)
def create(self, flags=None):
"""
Creates a new ISO image
:param flags: the flags used when creating a new image
:type flags: dict
"""
if self._iso is None:
self._iso = pycdlib.PyCdlib()
if flags is None:
flags = self.DEFAULT_CREATE_FLAGS
self._iso.new(**flags)
self._iso_opened_for_create = True
@staticmethod
def _get_iso_path(path):
iso_path = "".join([c for c in path
if c in (string.ascii_letters + string.digits)])
iso_path = iso_path[:7].upper() + ";"
if not os.path.isabs(iso_path):
iso_path = '/' + iso_path[:6] + ";"
return iso_path
@staticmethod
def _get_abs_path(path):
if not os.path.isabs(path):
path = '/' + path
return path
def write(self, path, content):
"""
Writes a new file into the ISO image
:param path: the path of the new file inside the ISO image
:type path: str
:param content: the content of the new file
:type path: bytes
"""
self.create()
self._iso.add_fp(io.BytesIO(content), len(content),
iso_path=self._get_iso_path(path),
joliet_path=self._get_abs_path(path))
def read(self, path):
self._open_for_read()
if not os.path.isabs(path):
path = '/' + path
data = io.BytesIO()
......@@ -406,17 +460,20 @@ class ISO9660PyCDLib(BaseIso9660):
return data.getvalue()
def copy(self, src, dst):
self._open_for_read()
if not os.path.isabs(src):
src = '/' + src
self._iso.get_file_from_iso(dst, joliet_path=src)
def close(self):
if not self._iso_closed:
if self._iso:
if self._iso_opened_for_create:
self._iso.write(self._path)
self._iso.close()
self._iso_closed = True
self._iso = None
def iso9660(path):
def iso9660(path, capabilities=None):
"""
Checks the available tools on a system and chooses class accordingly
......@@ -425,16 +482,32 @@ def iso9660(path):
:param path: path to an iso9660 image file
:type path: str
:param capabilities: list of specific capabilities that are
required for the selected implementation,
such as "read", "copy" and "mnt_dir".
:type capabilities: list
:return: an instance of any iso9660 capable tool
:rtype: :class:`Iso9660IsoInfo`, :class:`Iso9660IsoRead`,
:class:`Iso9660Mount`, :class:`ISO9660PyCDLib` or None
"""
implementations = [('pycdlib', has_pycdlib, ISO9660PyCDLib),
('isoinfo', has_isoinfo, Iso9660IsoInfo),
('iso-read', has_isoread, Iso9660IsoRead),
('mount', can_mount, Iso9660Mount)]
# all implementations so far have these base capabilities
common_capabilities = ["read", "copy", "mnt_dir"]
implementations = [('pycdlib', has_pycdlib, ISO9660PyCDLib,
common_capabilities + ["create", "write"]),
('isoinfo', has_isoinfo, Iso9660IsoInfo,
common_capabilities),
('iso-read', has_isoread, Iso9660IsoRead,
common_capabilities),
('mount', can_mount, Iso9660Mount,
common_capabilities)]
for (name, check, klass) in implementations:
for (name, check, klass, cap) in implementations:
if capabilities is not None and not set(capabilities).issubset(cap):
continue
if check():
logging.debug('Automatically chosen class for iso9660: %s', name)
return klass(path)
......
......@@ -6,9 +6,40 @@ import shutil
import tempfile
import unittest
try:
from unittest import mock
except ImportError:
import mock
from avocado.utils import iso9660, process
class Capabilities(unittest.TestCase):
def setUp(self):
self.iso_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.path.pardir, ".data",
"sample.iso"))
@mock.patch('avocado.utils.iso9660.has_pycdlib', return_value=True)
def test_capabilities_pycdlib(self, has_pycdlib_mocked):
instance = iso9660.iso9660(self.iso_path, ['read', 'create', 'write'])
self.assertIsInstance(instance, iso9660.ISO9660PyCDLib)
@mock.patch('avocado.utils.iso9660.has_pycdlib', return_value=False)
@mock.patch('avocado.utils.iso9660.has_isoinfo', return_value=False)
@mock.patch('avocado.utils.iso9660.has_isoread', return_value=False)
@mock.patch('avocado.utils.iso9660.can_mount', return_value=False)
def test_capabilities_nobackend(self, has_pycdlib_mocked, has_isoinfo_mocked,
has_isoread_mocked, can_mount_mocked):
self.assertIsNone(iso9660.iso9660(self.iso_path, ['read']))
def test_non_existing_capabilities(self):
self.assertIsNone(iso9660.iso9660(self.iso_path,
['non-existing', 'capabilities']))
class BaseIso9660(unittest.TestCase):
"""
......@@ -142,6 +173,18 @@ class PyCDLib(BaseIso9660):
"""Call the basic workflow"""
self.basic_workflow()
def test_create_write(self):
new_iso_path = os.path.join(self.tmpdir, 'new.iso')
new_iso = iso9660.ISO9660PyCDLib(new_iso_path)
new_iso.create()
content = b"AVOCADO"
for path in ("README", "/readme", "readme.txt", "quite-long-readme.txt"):
new_iso.write(path, content)
new_iso.close()
read_iso = iso9660.ISO9660PyCDLib(new_iso_path)
self.assertEqual(read_iso.read(path), content)
self.assertTrue(os.path.isfile(new_iso_path))
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册