提交 dbe906c4 编写于 作者: A Amador Pahim 提交者: GitHub

Merge pull request #1525 from clebergnu/iso9660_3

avocado.utils.iso9660: Several improvements and cleanups [v3]
......@@ -43,9 +43,8 @@ def has_userland_tool(executable):
if os.path.isabs(executable):
return os.path.exists(executable)
else:
for d in os.environ['PATH'].split(':'):
f = os.path.join(d, executable)
if os.path.exists(f):
for path in os.environ['PATH'].split(':'):
if os.path.exists(os.path.join(path, executable)):
return True
return False
......@@ -82,8 +81,9 @@ def can_mount():
:rtype: bool
"""
if os.getuid() != 0:
logging.debug('Can not use mount: current user is not "root"')
if not process.can_sudo():
logging.debug('Can not use mount: current user is not "root" and'
"sudo is not configured.")
return False
if not has_userland_tool('mount'):
......@@ -91,8 +91,10 @@ def can_mount():
return False
if 'iso9660' not in open('/proc/filesystems').read():
logging.debug('Can not use mount: lack of iso9660 kernel support')
return False
process.system("modprobe iso9660", ignore_status=True, sudo=True)
if 'iso9660' not in open('/proc/filesystems').read():
logging.debug('Can not use mount: lack of iso9660 kernel support')
return False
return True
......@@ -151,6 +153,12 @@ class BaseIso9660(object):
output.write(content)
output.close()
def mnt_dir(self):
"""
Returns a path to the browsable content of the iso
"""
raise NotImplementedError
def close(self):
"""
Cleanup and free any resources being used
......@@ -160,7 +168,38 @@ class BaseIso9660(object):
pass
class Iso9660IsoInfo(BaseIso9660):
class MixInMntDirMount(object):
"""
Mix in class which defines `mnt_dir` property and instantiates the
Iso9660Mount class to provide one. It requires `self.path` to store
path to the target iso file.
"""
_mount_instance = None
path = None
@property
def mnt_dir(self):
"""
Returns a path to the browsable content of the iso
"""
if self._mount_instance is None:
if not self.path:
raise RuntimeError("Path to iso image not available: %s"
% self.path)
self._mount_instance = Iso9660Mount(self.path)
return self._mount_instance.mnt_dir
def close(self):
"""
Cleanups and frees any resources being used
"""
super(MixInMntDirMount, self).close()
if self._mount_instance:
self._mount_instance.close()
self._mount_instance = None
class Iso9660IsoInfo(MixInMntDirMount, BaseIso9660):
"""
Represents a ISO9660 filesystem
......@@ -176,6 +215,9 @@ class Iso9660IsoInfo(BaseIso9660):
self._get_extensions(path)
def _get_extensions(self, path):
"""
Get and store the image's extensions
"""
cmd = 'isoinfo -i %s -d' % path
output = process.system_output(cmd)
......@@ -188,11 +230,17 @@ class Iso9660IsoInfo(BaseIso9660):
@staticmethod
def _normalize_path(path):
"""
Normalize the path to match isoinfo notation
"""
if not os.path.isabs(path):
path = os.path.join('/', path)
return path
def _get_filename_in_iso(self, path):
"""
Locate the path in the list of files inside the iso image
"""
cmd = 'isoinfo -i %s -f' % self.path
flist = process.system_output(cmd)
......@@ -212,7 +260,8 @@ class Iso9660IsoInfo(BaseIso9660):
else:
fname = self._get_filename_in_iso(path)
if not fname:
logging.warn("Could not find '%s' in iso '%s'", path, self.path)
logging.warn(
"Could not find '%s' in iso '%s'", path, self.path)
return ""
cmd.append("-x %s" % fname)
......@@ -220,7 +269,7 @@ class Iso9660IsoInfo(BaseIso9660):
return result.stdout
class Iso9660IsoRead(BaseIso9660):
class Iso9660IsoRead(MixInMntDirMount, BaseIso9660):
"""
Represents a ISO9660 filesystem
......@@ -243,6 +292,7 @@ class Iso9660IsoRead(BaseIso9660):
process.run(cmd)
def close(self):
super(Iso9660IsoRead, self).close()
shutil.rmtree(self.temp_dir, True)
......@@ -260,9 +310,9 @@ class Iso9660Mount(BaseIso9660):
:type path: str
"""
super(Iso9660Mount, self).__init__(path)
self.mnt_dir = tempfile.mkdtemp(prefix='avocado_' + __name__)
self._mnt_dir = tempfile.mkdtemp(prefix='avocado_' + __name__)
process.run('mount -t iso9660 -v -o loop,ro %s %s' %
(path, self.mnt_dir))
(path, self.mnt_dir), sudo=True)
def read(self, path):
"""
......@@ -293,11 +343,20 @@ class Iso9660Mount(BaseIso9660):
:rtype: None
"""
if os.path.ismount(self.mnt_dir):
process.run('fuser -k %s' % self.mnt_dir, ignore_status=True)
process.run('umount %s' % self.mnt_dir)
shutil.rmtree(self.mnt_dir)
if self._mnt_dir:
if os.path.ismount(self._mnt_dir):
process.run('fuser -k %s' % self.mnt_dir, ignore_status=True,
sudo=True)
process.run('umount %s' % self.mnt_dir, sudo=True)
shutil.rmtree(self._mnt_dir)
self._mnt_dir = None
@property
def mnt_dir(self):
if not self._mnt_dir:
raise RuntimeError("Trying to get mnt_dir of already closed iso %s"
% self.path)
return self._mnt_dir
def iso9660(path):
......
......@@ -98,6 +98,18 @@ class CmdError(Exception):
return "CmdError"
def can_sudo():
"""
:return: True when sudo is available (or is root)
"""
if os.getuid() == 0:
return True
elif system_output("id -u", ignore_status=True, sudo=True).strip() == "0":
return True
else:
return False
def pid_exists(pid):
"""
Return True if a given PID exists.
......
"""
Verifies the avocado.utils.iso9660 functionality
"""
import os
import shutil
import sys
import tempfile
from avocado.utils import iso9660, process
if sys.version_info[:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
class BaseIso9660(unittest.TestCase):
"""
Base class defining setup and tests for shared Iso9660 functionality
"""
def setUp(self):
self.iso_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.path.pardir, ".data",
"sample.iso"))
self.iso = None
self.tmpdir = tempfile.mkdtemp(prefix="avocado_" + __name__)
def basic_workflow(self):
"""
Check the basic Iso9660 workflow
:warning: Make sure to include this in per-implementation tests
due to ast loader we can't just define a base-class.
"""
self.assertEqual(self.iso.read("file"),
"file content\n")
dst = os.path.join(self.tmpdir, "file")
self.iso.copy(os.path.join("Dir", "in_dir_file"), dst)
self.assertEqual(open(dst).read(), "content of in-dir-file\n")
self.iso.close()
self.iso.close() # check that double-close won't fail
@unittest.skipIf(not process.can_sudo(),
"This test requires sudo or root")
def mnt_dir_workflow(self):
"""
Check the mnt_dir functionality
:warning: Make sure to include this in per-implementation tests
due to ast loader we can't just define a base-class.
"""
base = self.iso.mnt_dir
os.path.isdir(os.path.join(base, "Dir"))
self.assertEqual(open(os.path.join(base, "file")).read(),
"file content\n")
self.assertEqual(open(os.path.join(base, "Dir", "in_dir_file")).read(),
"content of in-dir-file\n")
self.iso.close()
self.assertFalse(os.path.exists(base), "the mnt_dir is suppose to be "
"destroyed after iso.close()")
def tearDown(self):
if self.iso is not None:
self.iso.close()
shutil.rmtree(self.tmpdir)
class IsoInfo(BaseIso9660):
"""
IsoInfo-based check
"""
@unittest.skipIf(process.system("which isoinfo", ignore_status=True),
"isoinfo not installed.")
def setUp(self):
super(IsoInfo, self).setUp()
self.iso = iso9660.Iso9660IsoInfo(self.iso_path)
def test_basic_workflow(self):
"""Call the basic workflow"""
self.basic_workflow()
def test_mnt_dir(self):
"""Use the mnt_dir property"""
self.mnt_dir_workflow()
class IsoRead(BaseIso9660):
"""
IsoRead-based check
"""
@unittest.skipIf(process.system("which iso-read", ignore_status=True),
"iso-read not installed.")
def setUp(self):
super(IsoRead, self).setUp()
self.iso = iso9660.Iso9660IsoRead(self.iso_path)
def test_basic_workflow(self):
"""Call the basic workflow"""
self.basic_workflow()
def test_mnt_dir(self):
"""Use the mnt_dir property"""
self.mnt_dir_workflow()
class IsoMount(BaseIso9660):
"""
Mount-based check
"""
@unittest.skipIf(not process.can_sudo(),
"This test requires sudo or root")
def setUp(self):
super(IsoMount, self).setUp()
self.iso = iso9660.Iso9660Mount(self.iso_path)
def test_basic_workflow(self):
"""Call the basic workflow"""
self.basic_workflow()
def test_mnt_dir(self):
"""Use the mnt_dir property"""
self.mnt_dir_workflow()
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册