提交 8a9016a0 编写于 作者: C Cleber Rosa

Merge remote-tracking branch 'lmr/migrate-virt-test-from-autotest-v4'

......@@ -65,7 +65,9 @@ clean:
test -L etc/avocado/conf.d/virt.conf && rm -f etc/avocado/conf.d/virt.conf || true
test -L avocado/core/plugins/virt_test.py && rm -f avocado/core/plugins/virt_test.py || true
test -L avocado/core/plugins/virt_test_list.py && rm -f avocado/core/plugins/virt_test_list.py || true
test -L avocado/core/plugins/virt_test_bootstrap.py && rm -f avocado/core/plugins/virt_test_bootstrap.py || true
test -L etc/avocado/conf.d/virt-test.conf && rm -f etc/avocado/conf.d/virt-test.conf || true
test -L virttest && rm -f virttest || true
check: clean check_cyclical modules_boundaries
selftests/checkall
......@@ -79,15 +81,17 @@ modules_boundaries:
link: link_virt link_vt
link_virt:
test -d ../avocado-virt/avocado/virt && ln -s ../../avocado-virt/avocado/virt avocado || true
test -f ../avocado-virt/etc/avocado/conf.d/virt.conf && ln -s ../../../../avocado-virt/etc/avocado/conf.d/virt.conf etc/avocado/conf.d/ || true
test -f ../avocado-virt/avocado/core/plugins/virt.py && ln -s ../../../../avocado-virt/avocado/core/plugins/virt.py avocado/core/plugins/ || true
test -f ../avocado-virt/avocado/core/plugins/virt_bootstrap.py && ln -s ../../../../avocado-virt/avocado/core/plugins/virt_bootstrap.py avocado/core/plugins/ || true
test -d ../avocado-virt/avocado/virt && ln -sf ../../avocado-virt/avocado/virt avocado || true
test -f ../avocado-virt/etc/avocado/conf.d/virt.conf && ln -sf ../../../../avocado-virt/etc/avocado/conf.d/virt.conf etc/avocado/conf.d/ || true
test -f ../avocado-virt/avocado/core/plugins/virt.py && ln -sf ../../../../avocado-virt/avocado/core/plugins/virt.py avocado/core/plugins/ || true
test -f ../avocado-virt/avocado/core/plugins/virt_bootstrap.py && ln -sf ../../../../avocado-virt/avocado/core/plugins/virt_bootstrap.py avocado/core/plugins/ || true
link_vt:
test -f ../avocado-vt/etc/avocado/conf.d/virt-test.conf && ln -s ../../../../avocado-vt/etc/avocado/conf.d/virt-test.conf etc/avocado/conf.d/ || true
test -f ../avocado-vt/avocado/core/plugins/virt_test.py && ln -s ../../../../avocado-vt/avocado/core/plugins/virt_test.py avocado/core/plugins/ || true
test -f ../avocado-vt/avocado/core/plugins/virt_test_list.py && ln -s ../../../../avocado-vt/avocado/core/plugins/virt_test_list.py avocado/core/plugins/ || true
test -f ../avocado-vt/etc/avocado/conf.d/virt-test.conf && ln -sf ../../../../avocado-vt/etc/avocado/conf.d/virt-test.conf etc/avocado/conf.d/ || true
test -f ../avocado-vt/avocado/core/plugins/virt_test.py && ln -sf ../../../../avocado-vt/avocado/core/plugins/virt_test.py avocado/core/plugins/ || true
test -f ../avocado-vt/avocado/core/plugins/virt_test_list.py && ln -sf ../../../../avocado-vt/avocado/core/plugins/virt_test_list.py avocado/core/plugins/ || true
test -f ../avocado-vt/avocado/core/plugins/virt_test_bootstrap.py && ln -sf ../../../../avocado-vt/avocado/core/plugins/virt_test_bootstrap.py avocado/core/plugins/ || true
test -d ../avocado-vt/virttest && ln -sf ../avocado-vt/virttest . || true
man: man/avocado.1 man/avocado-rest-client.1
......
此差异已折叠。
......@@ -65,6 +65,26 @@ def string_to_bitlist(data):
return result
def shell_escape(command):
"""
Escape special characters from a command so that it can be passed
as a double quoted (" ") string in a (ba)sh command.
:param command: the command string to escape.
:return: The escaped command string. The required englobing double
quotes are NOT added and so should be added at some point by
the caller.
See also: http://www.tldp.org/LDP/abs/html/escapingsection.html
"""
command = command.replace("\\", "\\\\")
command = command.replace("$", r'\$')
command = command.replace('"', r'\"')
command = command.replace('`', r'\`')
return command
def strip_console_codes(output, custom_codes=None):
"""
Remove the Linux console escape and control sequences from the console
......
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# This code was inspired in the autotest project,
#
# client/base_utils.py
# Original author: Martin J Bligh <mbligh@google.com>
# Original author: John Admanski <jadmanski@google.com>
"""
Get information from the current's machine CPU.
"""
import re
def _list_matches(lst, pattern):
"""
True if any item in list matches the specified pattern.
"""
compiled = re.compile(pattern)
for element in lst:
match = compiled.search(element)
if match:
return 1
return 0
def _get_cpu_info():
"""
Reads /proc/cpuinfo and returns a list of file lines
:returns: `list` of lines from /proc/cpuinfo file
:rtype: `list`
"""
with open('/proc/cpuinfo', 'r') as f:
cpuinfo = f.readlines()
return cpuinfo
def cpu_has_flags(flags):
"""
Check if a list of flags are available on current CPU info
:param flags: A `list` of cpu flags that must exists on the current CPU.
:type flags: `list`
:returns: `bool` True if all the flags were found or False if not
:rtype: `list`
"""
cpu_info = _get_cpu_info()
if not isinstance(flags, list):
flags = [flags]
for flag in flags:
if not _list_matches(cpu_info, '.*%s.*' % flag):
return False
return True
def get_cpu_vendor_name():
"""
Get the current cpu vendor name
:returns: string 'intel' or 'amd' or 'power7' depending on the current CPU architecture.
:rtype: `string`
"""
vendors_map = {
'intel': ("GenuineIntel", ),
'amd': ("AMD", ),
'power7': ("POWER7", )
}
cpu_info = _get_cpu_info()
for vendor, identifiers in vendors_map.items():
for identifier in identifiers:
if _list_matches(cpu_info, identifier):
return vendor
return None
def get_cpu_arch():
"""
Work out which CPU architecture we're running on
"""
cpuinfo = _get_cpu_info()
if _list_matches(cpuinfo, '^cpu.*(RS64|POWER3|Broadband Engine)'):
return 'power'
elif _list_matches(cpuinfo, '^cpu.*POWER4'):
return 'power4'
elif _list_matches(cpuinfo, '^cpu.*POWER5'):
return 'power5'
elif _list_matches(cpuinfo, '^cpu.*POWER6'):
return 'power6'
elif _list_matches(cpuinfo, '^cpu.*POWER7'):
return 'power7'
elif _list_matches(cpuinfo, '^cpu.*POWER8'):
return 'power8'
elif _list_matches(cpuinfo, '^cpu.*PPC970'):
return 'power970'
elif _list_matches(cpuinfo, 'ARM'):
return 'arm'
elif _list_matches(cpuinfo, '^flags.*:.* lm .*'):
return 'x86_64'
else:
return 'i386'
......@@ -24,6 +24,8 @@ import shutil
import urllib2
from . import aurl
from . import output
from . import crypto
log = logging.getLogger('avocado.test')
......@@ -71,15 +73,48 @@ def url_download(url, filename, data=None, timeout=300):
src_file.close()
def get_file(src, dst, permissions=None):
def url_download_interactive(url, output_file, title='', chunk_size=102400):
"""
Get a file from src and put it in dest, returning dest path.
:param src: source path or URL. May be local or a remote file.
:param dst: destination path.
:param permissions: (optional) set access permissions.
:return: destination path.
Interactively downloads a given file url to a given output file.
:type url: string
:param url: URL for the file to be download
:type output_file: string
:param output_file: file name or absolute path on which to save the file to
:type title: string
:param title: optional title to go along the progress bar
:type chunk_size: integer
:param chunk_size: amount of data to read at a time
"""
output_dir = os.path.dirname(output_file)
output_file = open(output_file, 'w+b')
input_file = urllib2.urlopen(url)
try:
file_size = int(input_file.headers['Content-Length'])
except KeyError:
raise ValueError('Could not find file size in HTTP headers')
logging.info('Downloading %s, %s to %s', os.path.basename(url),
output.display_data_size(file_size), output_dir)
progress_bar = output.ProgressBar(maximum=file_size, title=title)
# Download the file, while interactively updating the progress
progress_bar.draw()
while True:
data = input_file.read(chunk_size)
if data:
progress_bar.append_amount(len(data))
output_file.write(data)
else:
progress_bar.update_amount(file_size)
break
output_file.close()
def _get_file(src, dst, permissions=None):
if src == dst:
return
......@@ -91,3 +126,55 @@ def get_file(src, dst, permissions=None):
if permissions:
os.chmod(dst, permissions)
return dst
def get_file(src, dst, permissions=None, hash_expected=None,
hash_algorithm="md5", download_retries=1):
"""
Gets a file from a source location, optionally using caching.
If no hash_expected is provided, simply download the file. Else,
keep trying to download the file until download_failures exceeds
download_retries or the hashes match.
If the hashes match, return dst. If download_failures exceeds
download_retries, raise an EnvironmentError.
:param src: source path or URL. May be local or a remote file.
:param dst: destination path.
:param permissions: (optional) set access permissions.
:param hash_expected: Hash string that we expect the file downloaded to
have.
:param hash_algorithm: Algorithm used to calculate the hash string
(md5, sha1).
:param download_retries: Number of times we are going to retry a failed
download.
:raise: EnvironmentError.
:return: destination path.
"""
def _verify_hash(filename):
if os.path.isfile(filename):
return crypto.hash_file(filename, algorithm=hash_algorithm)
return None
if hash_expected is None:
return _get_file(src, dst, permissions)
download_failures = 0
hash_file = _verify_hash(dst)
while not hash_file == hash_expected:
hash_file = _verify_hash(_get_file(src, dst, permissions))
if hash_file != hash_expected:
log.error("It seems that dst %s is corrupted" % dst)
download_failures += 1
if download_failures > download_retries:
raise EnvironmentError("Failed to retrieve %s. "
"Possible reasons - Network connectivity "
"problems or incorrect hash_expected "
"provided -> '%s'" %
(src, hash_expected))
else:
log.error("Retrying download of src %s", src)
return dst
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2015
# Author: Lucas Meneghel Rodrigues <lmr@redhat.com>
"""
APIs to download/update git repositories from inside python scripts.
"""
import os
import logging
from . import process
from . import astring
from . import path
__all__ = ["GitRepoHelper", "get_repo"]
class GitRepoHelper(object):
"""
Helps to deal with git repos, mostly fetching content from a repo
"""
def __init__(self, uri, branch='master', lbranch='master', commit=None,
destination_dir=None, base_uri=None):
"""
Instantiates a new GitRepoHelper
:type uri: string
:param uri: git repository url
:type branch: string
:param branch: git remote branch
:type destination_dir: string
:param destination_dir: path of a dir where to save downloaded code
:type commit: string
:param commit: specific commit to download
:type lbranch: string
:param lbranch: git local branch name, if different from remote
:type base_uri: string
:param base_uri: a closer, usually local, git repository url from where
to fetch content first
"""
self.uri = uri
self.base_uri = base_uri
self.branch = branch
self.commit = commit
if destination_dir is None:
uri_basename = uri.split("/")[-1]
self.destination_dir = os.path.join("/tmp", uri_basename)
else:
self.destination_dir = destination_dir
if lbranch is None:
self.lbranch = branch
else:
self.lbranch = lbranch
self.cmd = path.find_command('git')
def init(self):
"""
Initializes a directory for receiving a verbatim copy of git repo
This creates a directory if necessary, and either resets or inits
the repo
"""
if not os.path.exists(self.destination_dir):
logging.debug('Creating directory %s for git repo %s',
self.destination_dir, self.uri)
os.makedirs(self.destination_dir)
os.chdir(self.destination_dir)
if os.path.exists('.git'):
logging.debug('Resetting previously existing git repo at %s for '
'receiving git repo %s',
self.destination_dir, self.uri)
self.git_cmd('reset --hard')
else:
logging.debug('Initializing new git repo at %s for receiving '
'git repo %s',
self.destination_dir, self.uri)
self.git_cmd('init')
def git_cmd(self, cmd, ignore_status=False):
"""
Wraps git commands.
:param cmd: Command to be executed.
:param ignore_status: Whether we should suppress error.CmdError
exceptions if the command did return exit code !=0 (True), or
not suppress them (False).
"""
os.chdir(self.destination_dir)
return process.run(r"%s %s" % (self.cmd, astring.shell_escape(cmd)),
ignore_status=ignore_status)
def fetch(self, uri):
"""
Performs a git fetch from the remote repo
"""
logging.info("Fetching git [REP '%s' BRANCH '%s'] -> %s",
uri, self.branch, self.destination_dir)
self.git_cmd("fetch -q -f -u -t %s %s:%s" %
(uri, self.branch, self.lbranch))
def get_top_commit(self):
"""
Returns the topmost commit id for the current branch.
:return: Commit id.
"""
return self.git_cmd('log --pretty=format:%H -1').stdout.strip()
def get_top_tag(self):
"""
Returns the topmost tag for the current branch.
:return: Tag.
"""
try:
return self.git_cmd('describe').stdout.strip()
except process.CmdError:
return None
def checkout(self, branch=None, commit=None):
"""
Performs a git checkout for a given branch and start point (commit)
:param branch: Remote branch name.
:param commit: Specific commit hash.
"""
if branch is None:
branch = self.branch
logging.debug('Checking out branch %s', branch)
self.git_cmd("checkout %s" % branch)
if commit is None:
commit = self.commit
if commit is not None:
logging.debug('Checking out commit %s', self.commit)
self.git_cmd("checkout %s" % self.commit)
else:
logging.debug('Specific commit not specified')
top_commit = self.get_top_commit()
top_tag = self.get_top_tag()
if top_tag is None:
top_tag_desc = 'no tag found'
else:
top_tag_desc = 'tag %s' % top_tag
logging.info("git commit ID is %s (%s)", top_commit, top_tag_desc)
def execute(self):
"""
Performs all steps necessary to initialize and download a git repo.
This includes the init, fetch and checkout steps in one single
utility method.
"""
self.init()
if self.base_uri is not None:
self.fetch(self.base_uri)
self.fetch(self.uri)
self.checkout()
def get_repo(uri, branch='master', lbranch='master', commit=None,
destination_dir=None, base_uri=None):
"""
Utility function that retrieves a given git code repository.
:type uri: string
:param uri: git repository url
:type branch: string
:param branch: git remote branch
:type destination_dir: string
:param destination_dir: path of a dir where to save downloaded code
:type commit: string
:param commit: specific commit to download
:type lbranch: string
:param lbranch: git local branch name, if different from remote
:type base_uri: string
:param uri: a closer, usually local, git repository url from where to
fetch content first from
"""
repo = GitRepoHelper(uri, branch, lbranch, commit, destination_dir,
base_uri)
repo.execute()
return repo.destination_dir
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2015
# Author: Cleber Rosa <crosa@redhat.com>
"""
Basic ISO9660 file-system support.
This code does not attempt (so far) to implement code that knows about
ISO9660 internal structure. Instead, it uses commonly available support
either in userspace tools or on the Linux kernel itself (via mount).
"""
__all__ = ['iso9660', 'Iso9660IsoInfo', 'Iso9660IsoRead', 'Iso9660Mount']
import os
import logging
import tempfile
import shutil
import re
from . import process
def has_userland_tool(executable):
"""
Returns whether the system has a given executable
:param executable: the name of the executable
:type executable: str
:rtype: bool
"""
if os.path.isabs(executable):
return os.path.exists(executable)
else:
for d in os.environ['PATH'].split(':'):
f = os.path.join(d, executable)
if os.path.exists(f):
return True
return False
def has_isoinfo():
"""
Returns whether the system has the isoinfo executable
Maybe more checks could be added to see if isoinfo supports the needed
features
:rtype: bool
"""
return has_userland_tool('isoinfo')
def has_isoread():
"""
Returns whether the system has the iso-read executable
Maybe more checks could be added to see if iso-read supports the needed
features
:rtype: bool
"""
return has_userland_tool('iso-read')
def can_mount():
"""
Test wether the current user can perform a loop mount
AFAIK, this means being root, having mount and iso9660 kernel support
:rtype: bool
"""
if os.getuid() != 0:
logging.debug('Can not use mount: current user is not "root"')
return False
if not has_userland_tool('mount'):
logging.debug('Can not use mount: missing "mount" tool')
return False
if 'iso9660' not in open('/proc/filesystems').read():
logging.debug('Can not use mount: lack of iso9660 kernel support')
return False
return True
class BaseIso9660(object):
"""
Represents a ISO9660 filesystem
This class holds common functionality and has many abstract methods
"""
def __init__(self, path):
self.path = path
self._verify_path(path)
@staticmethod
def _verify_path(path):
"""
Verify that the current set path is accessible
:param path: the path for test
:type path: str
:raise OSError: path does not exist or path could not be read
:rtype: None
"""
if not os.path.exists(path):
raise OSError('File or device path does not exist: %s' %
path)
if not os.access(path, os.R_OK):
raise OSError('File or device path could not be read: %s' %
path)
def read(self, path):
"""
Abstract method to read data from path
:param path: path to the file
:returns: data content from the file
:rtype: str
"""
raise NotImplementedError
def copy(self, src, dst):
"""
Simplistic version of copy that relies on read()
:param src: source path
:type src: str
:param dst: destination path
:type dst: str
:rtype: None
"""
content = self.read(src)
output = open(dst, 'w+b')
output.write(content)
output.close()
def close(self):
"""
Cleanup and free any resources being used
:rtype: None
"""
pass
class Iso9660IsoInfo(BaseIso9660):
"""
Represents a ISO9660 filesystem
This implementation is based on the cdrkit's isoinfo tool
"""
def __init__(self, path):
super(Iso9660IsoInfo, self).__init__(path)
self.joliet = False
self.rock_ridge = False
self.el_torito = False
self._get_extensions(path)
def _get_extensions(self, path):
cmd = 'isoinfo -i %s -d' % path
output = process.system_output(cmd)
if re.findall("\nJoliet", output):
self.joliet = True
if re.findall("\nRock Ridge signatures", output):
self.rock_ridge = True
if re.findall("\nEl Torito", output):
self.el_torito = True
@staticmethod
def _normalize_path(path):
if not os.path.isabs(path):
path = os.path.join('/', path)
return path
def _get_filename_in_iso(self, path):
cmd = 'isoinfo -i %s -f' % self.path
flist = process.system_output(cmd)
fname = re.findall("(%s.*)" % self._normalize_path(path), flist, re.I)
if fname:
return fname[0]
return None
def read(self, path):
cmd = ['isoinfo', '-i %s' % self.path]
fname = self._normalize_path(path)
if self.joliet:
cmd.append("-J")
elif self.rock_ridge:
cmd.append("-R")
else:
fname = self._get_filename_in_iso(path)
if not fname:
logging.warn("Could not find '%s' in iso '%s'", path, self.path)
return ""
cmd.append("-x %s" % fname)
result = process.run(" ".join(cmd))
return result.stdout
class Iso9660IsoRead(BaseIso9660):
"""
Represents a ISO9660 filesystem
This implementation is based on the libcdio's iso-read tool
"""
def __init__(self, path):
super(Iso9660IsoRead, self).__init__(path)
self.temp_dir = tempfile.mkdtemp()
def read(self, path):
temp_file = os.path.join(self.temp_dir, path)
cmd = 'iso-read -i %s -e %s -o %s' % (self.path, path, temp_file)
process.run(cmd)
return open(temp_file).read()
def copy(self, src, dst):
cmd = 'iso-read -i %s -e %s -o %s' % (self.path, src, dst)
process.run(cmd)
def close(self):
shutil.rmtree(self.temp_dir, True)
class Iso9660Mount(BaseIso9660):
"""
Represents a mounted ISO9660 filesystem.
"""
def __init__(self, path):
"""
initializes a mounted ISO9660 filesystem
:param path: path to the ISO9660 file
:type path: str
"""
super(Iso9660Mount, self).__init__(path)
self.mnt_dir = tempfile.mkdtemp()
process.run('mount -t iso9660 -v -o loop,ro %s %s' %
(path, self.mnt_dir))
def read(self, path):
"""
Read data from path
:param path: path to read data
:type path: str
:return: data content
:rtype: str
"""
full_path = os.path.join(self.mnt_dir, path)
return open(full_path).read()
def copy(self, src, dst):
"""
:param src: source
:type src: str
:param dst: destination
:type dst: str
:rtype: None
"""
full_path = os.path.join(self.mnt_dir, src)
shutil.copy(full_path, dst)
def close(self):
"""
Perform umount operation on the temporary dir
:rtype: None
"""
if os.path.ismount(self.mnt_dir):
process.run('fuser -k %s' % self.mnt_dir, ignore_status=True)
process.run('umount %s' % self.mnt_dir)
shutil.rmtree(self.mnt_dir)
def iso9660(path):
"""
Checks the avaiable tools on a system and chooses class accordingly
This is a convinience function, that will pick the first avaialable
iso9660 capable tool.
:param path: path to an iso9660 image file
:type path: str
:return: an instance of any iso9660 capable tool
:rtype: :class:`Iso9660IsoInfo`, :class:`Iso9660IsoRead`,
:class:`Iso9660Mount` or None
"""
implementations = [('isoinfo', has_isoinfo, Iso9660IsoInfo),
('iso-read', has_isoread, Iso9660IsoRead),
('mount', can_mount, Iso9660Mount)]
for (name, check, klass) in implementations:
if check():
logging.debug('Automatically chosen class for iso9660: %s', name)
return klass(path)
return None
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# This code was inspired in the autotest project,
#
# client/base_utils.py
# Original author: Ross Brattain <ross.b.brattain@intel.com>
"""
APIs to list and load/unload linux kernel modules.
"""
import re
import logging
from . import process
LOG = logging.getLogger('avocado.test')
def load_module(module_name):
# Checks if a module has already been loaded
if module_is_loaded(module_name):
return False
process.system('/sbin/modprobe ' + module_name)
return True
def parse_lsmod_for_module(l_raw, module_name, escape=True):
"""
Use a regexp to parse raw lsmod output and get module information
:param l_raw: raw output of lsmod
:type l_raw: str
:param module_name: Name of module to search for
:type module_name: str
:param escape: Escape regexp tokens in module_name, default True
:type escape: bool
:return: Dictionary of module info, name, size, submodules if present
:rtype: dict
"""
# re.escape the module name for safety
if escape:
module_search = re.escape(module_name)
else:
module_search = module_name
# ^module_name spaces size spaces used optional spaces optional submodules
# use multiline regex to scan the entire output as one string without
# having to splitlines use named matches so we can extract the dictionary
# with groupdict
pattern = (r"^(?P<name>%s)\s+(?P<size>\d+)\s+(?P<used>\d+)"
"\s*(?P<submodules>\S+)?$")
lsmod = re.search(pattern % module_search, l_raw, re.M)
if lsmod:
# default to empty list if no submodules
module_info = lsmod.groupdict([])
# convert size to integer because it is an integer
module_info['size'] = int(module_info['size'])
module_info['used'] = int(module_info['used'])
if module_info['submodules']:
module_info['submodules'] = module_info['submodules'].split(',')
return module_info
else:
# return empty dict to be consistent
return {}
def loaded_module_info(module_name):
"""
Get loaded module details: Size and Submodules.
:param module_name: Name of module to search for
:type module_name: str
:return: Dictionary of module info, name, size, submodules if present
:rtype: dict
"""
l_raw = process.system_output('/sbin/lsmod')
return parse_lsmod_for_module(l_raw, module_name)
def get_submodules(module_name):
"""
Get all submodules of the module.
:param module_name: Name of module to search for
:type module_name: str
:return: List of the submodules
:rtype: list
"""
module_info = loaded_module_info(module_name)
module_list = []
try:
submodules = module_info["submodules"]
except KeyError:
LOG.info("Module %s is not loaded" % module_name)
else:
module_list = submodules
for module in submodules:
module_list += get_submodules(module)
return module_list
def unload_module(module_name):
"""
Removes a module. Handles dependencies. If even then it's not possible
to remove one of the modules, it will throw an error.CmdError exception.
:param module_name: Name of the module we want to remove.
:type module_name: str
"""
module_info = loaded_module_info(module_name)
try:
submodules = module_info['submodules']
except KeyError:
LOG.info("Module %s is already unloaded" % module_name)
else:
for module in submodules:
unload_module(module)
module_info = loaded_module_info(module_name)
try:
module_used = module_info['used']
except KeyError:
LOG.info("Module %s is already unloaded" % module_name)
return
if module_used != 0:
raise RuntimeError("Module %s is still in use. "
"Can not unload it." % module_name)
process.system("/sbin/modprobe -r %s" % module_name)
LOG.info("Module %s unloaded" % module_name)
def module_is_loaded(module_name):
"""
Is module loaded
:param module_name: Name of module to search for
:type module_name: str
:return: True is module is loaded
:rtype: bool
"""
module_name = module_name.replace('-', '_')
return bool(loaded_module_info(module_name))
def get_loaded_modules():
lsmod_output = process.system_output('/sbin/lsmod').splitlines()[1:]
return [line.split(None, 1)[0] for line in lsmod_output]
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# This code was inspired in the autotest project,
#
# client/shared/utils.py
# Original author: Cleber Rosa <crosa@redhat.com>
#
# Copyright: Red Hat Inc. 2015
# Author: Lucas Meneghel Rodrigues <lmr@redhat.com>
"""
Utility functions for user friendly display of information.
"""
import sys
def display_data_size(size):
"""
Display data size in human readable units (SI).
:param size: Data size, in Bytes.
:type size: int
:return: Human readable string with data size, using SI prefixes.
"""
prefixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
factor = float(1000)
i = 0
while size >= factor:
if i >= len(prefixes) - 1:
break
size /= factor
i += 1
return '%.2f %s' % (size, prefixes[i])
class ProgressBar(object):
"""
Displays interactively the progress of a given task
Inspired/adapted from https://gist.github.com/t0xicCode/3306295
"""
def __init__(self, minimum=0, maximum=100, width=75, title=''):
"""
Initializes a new progress bar
:type minimum: integer
:param minimum: minimum (initial) value on the progress bar
:type maximum: integer
:param maximum: maximum (final) value on the progress bar
:type width: integer
:param with: number of columns, that is screen width
"""
assert maximum > minimum
self.prog_bar = ''
self.old_prog_bar = ''
if title:
width -= len(title)
self.minimum = minimum
self.maximum = maximum
self.range = maximum - minimum
self.width = width
self.title = title
self.current_amount = minimum
self.update_amount(minimum)
def append_amount(self, amount):
"""
Increments the current amount value.
"""
self.update_amount(self.current_amount + amount)
def update_percentage(self, percentage):
"""
Updates the progress bar to the new percentage.
"""
self.update_amount((percentage * float(self.maximum)) / 100.0)
def update_amount(self, amount):
"""
Performs sanity checks and update the current amount.
"""
if amount < self.minimum:
amount = self.minimum
if amount > self.maximum:
amount = self.maximum
self.current_amount = amount
self._update_progress_bar()
self.draw()
def _update_progress_bar(self):
"""
Builds the actual progress bar text.
"""
diff = float(self.current_amount - self.minimum)
done = (diff / float(self.range)) * 100.0
done = int(round(done))
all_full = self.width - 2
hashes = (done / 100.0) * all_full
hashes = int(round(hashes))
if hashes == 0:
screen_text = "[>%s]" % (' '*(all_full-1))
elif hashes == all_full:
screen_text = "[%s]" % ('='*all_full)
else:
screen_text = "[%s>%s]" % ('='*(hashes-1), ' '*(all_full-hashes))
percent_string = str(done) + "%"
# slice the percentage into the bar
screen_text = ' '.join([screen_text, percent_string])
if self.title:
screen_text = '%s: %s' % (self.title,
screen_text)
self.prog_bar = screen_text
def draw(self):
"""
Prints the updated text to the screen.
"""
if self.prog_bar != self.old_prog_bar:
self.old_prog_bar = self.prog_bar
sys.stdout.write('\r' + self.prog_bar)
sys.stdout.flush()
def __str__(self):
"""
Returns the current progress bar.
"""
return str(self.prog_bar)
......@@ -158,7 +158,7 @@ def process_in_ptree_is_defunct(ppid):
return True
for pid in pids:
cmd = "ps --no-headers -o cmd %d" % int(pid)
proc_name = system_output(cmd, ignore_status=True)
proc_name = system_output(cmd, ignore_status=True, verbose=False)
if '<defunct>' in proc_name:
defunct = True
break
......@@ -171,7 +171,7 @@ def get_children_pids(ppid):
param ppid: parent PID
return: list of PIDs of all children/threads of ppid
"""
return system_output("ps -L --ppid=%d -o lwp" % ppid).split('\n')[1:]
return system_output("ps -L --ppid=%d -o lwp" % ppid, verbose=False).split('\n')[1:]
class CmdResult(object):
......
此差异已折叠。
......@@ -7,3 +7,5 @@ Sphinx>=1.3b1
flexmock>=0.9.7
# inspektor (static and style checks)
inspektor>=0.1.12
# mock (some unittests use it)
mock>=1.0.0
......@@ -12,3 +12,4 @@ Pillow==2.2.1
snakefood==1.4
networkx==1.9.1
pygraphviz==1.3rc2
mock==1.2.0
import unittest
import os
import sys
# simple magic for using scripts within a source tree
basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
basedir = os.path.dirname(basedir)
if os.path.isdir(os.path.join(basedir, 'avocado')):
sys.path.append(basedir)
from avocado.utils import linux_modules
class TestLsmod(unittest.TestCase):
LSMOD_OUT = """\
Module Size Used by
ccm 17773 2
ip6t_rpfilter 12546 1
ip6t_REJECT 12939 2
xt_conntrack 12760 9
ebtable_nat 12807 0
ebtable_broute 12731 0
bridge 110862 1 ebtable_broute
stp 12868 1 bridge
llc 13941 2 stp,bridge
ebtable_filter 12827 0
ebtables 30758 3 ebtable_broute,ebtable_nat,ebtable_filter
ip6table_nat 13015 1
nf_conntrack_ipv6 18738 6
nf_defrag_ipv6 34712 1 nf_conntrack_ipv6
nf_nat_ipv6 13213 1 ip6table_nat
ip6table_mangle 12700 1
ip6table_security 12710 1
ip6table_raw 12683 1
ip6table_filter 12815 1
"""
def test_parse_lsmod(self):
lsmod_info = linux_modules.parse_lsmod_for_module(
self.LSMOD_OUT, "ebtables")
submodules = ['ebtable_broute', 'ebtable_nat', 'ebtable_filter']
assert lsmod_info['submodules'] == submodules
assert lsmod_info == {
'name': "ebtables",
'size': 30758,
'used': 3,
'submodules': submodules
}
@staticmethod
def test_parse_lsmod_is_empty():
lsmod_info = linux_modules.parse_lsmod_for_module("", "ebtables")
assert lsmod_info == {}
def test_parse_lsmod_no_submodules(self):
lsmod_info = linux_modules.parse_lsmod_for_module(self.LSMOD_OUT, "ccm")
submodules = []
assert lsmod_info['submodules'] == submodules
assert lsmod_info == {
'name': "ccm",
'size': 17773,
'used': 2,
'submodules': submodules
}
def test_parse_lsmod_single_submodules(self):
lsmod_info = linux_modules.parse_lsmod_for_module(
self.LSMOD_OUT, "bridge")
submodules = ['ebtable_broute']
assert lsmod_info['submodules'] == submodules
assert lsmod_info == {
'name': "bridge",
'size': 110862,
'used': 1,
'submodules': submodules
}
if __name__ == '__main__':
unittest.main()
import unittest
import os
import sys
# simple magic for using scripts within a source tree
basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
basedir = os.path.dirname(basedir)
if os.path.isdir(os.path.join(basedir, 'avocado')):
sys.path.append(basedir)
from avocado.utils import output
class UtilsOutputTest(unittest.TestCase):
def testDisplayDataSizeFactor1024(self):
self.assertEqual(output.display_data_size(103), '103.00 B')
self.assertEqual(output.display_data_size(1024**1), '1.02 KB')
self.assertEqual(output.display_data_size(1024**2), '1.05 MB')
self.assertEqual(output.display_data_size(1024**3), '1.07 GB')
self.assertEqual(output.display_data_size(1024**4), '1.10 TB')
self.assertEqual(output.display_data_size(1024**5), '1.13 PB')
self.assertEqual(output.display_data_size(1024**6), '1152.92 PB')
def testDisplayDataSizeFactor1000(self):
self.assertEqual(output.display_data_size(1000**1), '1.00 KB')
self.assertEqual(output.display_data_size(1000**2), '1.00 MB')
self.assertEqual(output.display_data_size(1000**3), '1.00 GB')
self.assertEqual(output.display_data_size(1000**4), '1.00 TB')
self.assertEqual(output.display_data_size(1000**5), '1.00 PB')
self.assertEqual(output.display_data_size(1000**6), '1000.00 PB')
if __name__ == '__main__':
unittest.main()
# Copyright(c) 2013 Intel Corporation.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms and conditions of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
#
# This program is distributed in the hope it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
#
# The full GNU General Public License is included in this distribution in
# the file called "COPYING".
import unittest
from mock import MagicMock, patch
from avocado.utils import service
class TestSystemd(unittest.TestCase):
def setUp(self):
self.service_name = "fake_service"
init_name = "systemd"
command_generator = service._command_generators[init_name]
self.service_command_generator = service._ServiceCommandGenerator(
command_generator)
def test_all_commands(self):
for cmd in (c for c in self.service_command_generator.commands if c not in ["list", "set_target"]):
ret = getattr(
self.service_command_generator, cmd)(self.service_name)
if cmd == "is_enabled":
cmd = "is-enabled"
assert ret == ["systemctl", cmd, "%s.service" % self.service_name]
def test_set_target(self):
ret = getattr(
self.service_command_generator, "set_target")("multi-user.target")
assert ret == ["systemctl", "isolate", "multi-user.target"]
class TestSysVInit(unittest.TestCase):
def setUp(self):
self.service_name = "fake_service"
init_name = "init"
command_generator = service._command_generators[init_name]
self.service_command_generator = service._ServiceCommandGenerator(
command_generator)
def test_all_commands(self):
command_name = "service"
for cmd in (c for c in self.service_command_generator.commands if c not in ["list", "set_target"]):
ret = getattr(
self.service_command_generator, cmd)(self.service_name)
if cmd == "is_enabled":
command_name = "chkconfig"
cmd = ""
elif cmd == 'enable':
command_name = "chkconfig"
cmd = "on"
elif cmd == 'disable':
command_name = "chkconfig"
cmd = "off"
assert ret == [command_name, self.service_name, cmd]
def test_set_target(self):
ret = getattr(
self.service_command_generator, "set_target")("multi-user.target")
assert ret == ["telinit", "3"]
class TestSpecificServiceManager(unittest.TestCase):
def setUp(self):
self.run_mock = MagicMock()
self.init_name = "init"
get_name_of_init_mock = MagicMock(return_value="init")
@patch.object(service, "get_name_of_init", get_name_of_init_mock)
def patch_service_command_generator():
return service._auto_create_specific_service_command_generator()
@patch.object(service, "get_name_of_init", get_name_of_init_mock)
def patch_service_result_parser():
return service._auto_create_specific_service_result_parser()
service_command_generator = patch_service_command_generator()
service_result_parser = patch_service_result_parser()
self.service_manager = service._SpecificServiceManager(
"boot.lldpad", service_command_generator,
service_result_parser, self.run_mock)
def test_start(self):
service = "lldpad"
self.service_manager.start()
assert self.run_mock.call_args[0][
0] == "service boot.%s start" % service
def test_stop_with_args(self):
service = "lldpad"
self.service_manager.stop(ignore_status=True)
assert self.run_mock.call_args[0][
0] == "service boot.%s stop" % service
def test_list_is_not_present_in_SpecifcServiceManager(self):
assert not hasattr(self.service_manager, "list")
def test_set_target_is_not_present_in_SpecifcServiceManager(self):
assert not hasattr(self.service_manager, "set_target")
class TestServiceManager(unittest.TestCase):
@staticmethod
def get_service_manager_from_init_and_run(init_name, run_mock):
command_generator = service._command_generators[init_name]
result_parser = service._result_parsers[init_name]
service_manager = service._service_managers[init_name]
service_command_generator = service._ServiceCommandGenerator(
command_generator)
service_result_parser = service._ServiceResultParser(result_parser)
return service_manager(service_command_generator, service_result_parser, run_mock)
class TestSystemdServiceManager(TestServiceManager):
def setUp(self):
self.run_mock = MagicMock()
self.init_name = "systemd"
self.service_manager = super(TestSystemdServiceManager,
self).get_service_manager_from_init_and_run(self.init_name,
self.run_mock)
def test_start(self):
service = "lldpad"
self.service_manager.start(service)
assert self.run_mock.call_args[0][
0] == "systemctl start %s.service" % service
def test_list(self):
list_result_mock = MagicMock(exit_status=0, stdout="sshd.service enabled\n"
"vsftpd.service disabled\n"
"systemd-sysctl.service static\n")
run_mock = MagicMock(return_value=list_result_mock)
service_manager = super(TestSystemdServiceManager,
self).get_service_manager_from_init_and_run(self.init_name,
run_mock)
list_result = service_manager.list(ignore_status=False)
assert run_mock.call_args[0][
0] == "systemctl list-unit-files --type=service --no-pager --full"
assert list_result == {'sshd': "enabled",
'vsftpd': "disabled",
'systemd-sysctl': "static"}
def test_set_default_runlevel(self):
runlevel = service.convert_sysv_runlevel(3)
mktemp_mock = MagicMock(return_value="temp_filename")
symlink_mock = MagicMock()
rename_mock = MagicMock()
@patch.object(service, "mktemp", mktemp_mock)
@patch("os.symlink", symlink_mock)
@patch("os.rename", rename_mock)
def _():
self.service_manager.change_default_runlevel(runlevel)
assert mktemp_mock.called
assert symlink_mock.call_args[0][
0] == "/usr/lib/systemd/system/multi-user.target"
assert rename_mock.call_args[0][
1] == "/etc/systemd/system/default.target"
_()
def test_unknown_runlevel(self):
self.assertRaises(ValueError,
service.convert_systemd_target_to_runlevel, "unknown")
def test_runlevels(self):
assert service.convert_sysv_runlevel(0) == "poweroff.target"
assert service.convert_sysv_runlevel(1) == "rescue.target"
assert service.convert_sysv_runlevel(2) == "multi-user.target"
assert service.convert_sysv_runlevel(5) == "graphical.target"
assert service.convert_sysv_runlevel(6) == "reboot.target"
class TestSysVInitServiceManager(TestServiceManager):
def setUp(self):
self.run_mock = MagicMock()
self.init_name = "init"
self.service_manager = super(TestSysVInitServiceManager,
self).get_service_manager_from_init_and_run(self.init_name,
self.run_mock)
def test_list(self):
list_result_mock = MagicMock(exit_status=0,
stdout="sshd 0:off 1:off 2:off 3:off 4:off 5:off 6:off\n"
"vsftpd 0:off 1:off 2:off 3:off 4:off 5:on 6:off\n"
"xinetd based services:\n"
" amanda: off\n"
" chargen-dgram: on\n")
run_mock = MagicMock(return_value=list_result_mock)
service_manager = super(TestSysVInitServiceManager,
self).get_service_manager_from_init_and_run(self.init_name,
run_mock)
list_result = service_manager.list(ignore_status=False)
assert run_mock.call_args[0][
0] == "chkconfig --list"
assert list_result == {'sshd': {0: "off", 1: "off", 2: "off", 3: "off", 4: "off", 5: "off", 6: "off"},
'vsftpd': {0: "off", 1: "off", 2: "off", 3: "off", 4: "off", 5: "on", 6: "off"},
'xinetd': {'amanda': "off", 'chargen-dgram': "on"}}
def test_enable(self):
service = "lldpad"
self.service_manager.enable(service)
assert self.run_mock.call_args[0][0] == "chkconfig lldpad on"
def test_unknown_runlevel(self):
self.assertRaises(ValueError,
service.convert_sysv_runlevel, "unknown")
def test_runlevels(self):
assert service.convert_systemd_target_to_runlevel(
"poweroff.target") == '0'
assert service.convert_systemd_target_to_runlevel(
"rescue.target") == 's'
assert service.convert_systemd_target_to_runlevel(
"multi-user.target") == '3'
assert service.convert_systemd_target_to_runlevel(
"graphical.target") == '5'
assert service.convert_systemd_target_to_runlevel(
"reboot.target") == '6'
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册