提交 1c9eb21b 编写于 作者: L Lukáš Doktor

avocado.test: Add support for params with paths

This is a testing version of params with paths support. By default
it acts as the old Params, but you can use params.get(key, path=....)
to utilize new-params handling.

For now the PATH for relative paths is hardcoded to "/test/*" but it
will be specifiable on the command line. I just need to pass it to the
test which I'd like to do over "metadata" (see below) rather then adding
it temporarily to the params and risk colisions.

The code is not optimized and in order to be compatible it uses old
concepts. Some of them I'd like to get rid of in near future:

* default_params => currently used only to set default "metadata" like
  test timeout. I'd like to replace it for RW Test.metadata, which would
  be overwritten durint __init__ if params contain the keys in specific
  paths. IMO they shouldn't be shared with test params.
* test_factory's params are currently dict with some metadata like
  wether this test is executed from directory or by uri. Again, they are
  metadata and should be handled separately. Some of them might be reused
  to params if needed, but not generaly.
* create separated multiplexer plugin generate multiple variants
* reconsider the need for "objects", "object_params" and "object_counts"
* couple of others I forget to mention...

There is one real change I made, I got rid of ${key}_${type} to specify
type of the argument. Yaml supports any type we might like and by
our convention all tests should be written to work without any params.
Thus this is IMO more confusing, than beneficial. But if you insist
I can copy&paste the support for it.
Signed-off-by: NLukáš Doktor <ldoktor@redhat.com>
上级 20b3ac96
......@@ -20,6 +20,8 @@ Multiplex and create variants.
"""
import itertools
import logging
import re
from avocado.core import tree
......@@ -91,6 +93,368 @@ def multiplex_yamls(input_yamls, filter_only=None, filter_out=None,
return multiplex_pools(pools)
# TODO: Create multiplexer plugin and split these functions into multiple files
class NoMatchError(KeyError):
pass
class AvocadoParams(object):
"""
Params object used to retrieve params from given path. It supports
absolute and relative paths. For relative paths one can define multiple
paths to search for the value.
It contains compatibility wrapper to act as the original avocado Params,
but by special useage you can utilize the new API. See ``get()``
docstring for details.
It supports querying for params of given path and key and copies the
"objects", "object_params" and "object_counts" methods (not tested).
Unsafely it also supports pickling, although to work properly params would
have to be deepcopied. This is not required for the current avocado usage.
You can also iterate through all keys, but this can generate quite a lot
of duplicite entries inherited from ancestor nodes. It shouldn't produce
false values, though.
In this version each new "get()" call is logged into "avocado.test" log.
This is subject of change (separate file, perhaps)
"""
# TODO: Use "test" to log params.get()
def __init__(self, leaves, test_id, tag, mux_entry, default_params):
"""
:param leaves: List of TreeNode leaves defining current variant
:param test_id: test id
:param tag: test tag
:param mux_entry: list of entry points
:param default_params: dict of params used when no matches found
"""
self._rel_paths = []
leaves = list(leaves)
for i, path in enumerate(mux_entry):
path_leaves = self._get_matching_leaves(path, leaves)
self._rel_paths.append(AvocadoParam(path_leaves,
'%d: %s' % (i, path)))
# Don't use non-mux-entry params for relative paths
path_leaves = self._get_matching_leaves('/*', leaves)
self._abs_path = AvocadoParam(path_leaves, '*: *')
self.id = test_id
self.tag = tag
self._log = logging.getLogger("avocado.test").debug
self._cache = {} # TODO: Implement something more efficient
# TODO: Get rid of this and prepare something better
self._default_parmas = default_params
def __getstate__(self):
""" log can't be pickled """
copy = self.__dict__.copy()
del(copy['_log'])
return copy
def __setstate__(self, orig):
""" refresh log """
self.__dict__.update(orig)
self._log = logging.getLogger("avocado.test").debug
def __str__(self):
return "params {%s, %s}" % (", ".join(_.str_leaves_variant
for _ in self._rel_paths),
self._abs_path.str_leaves_variant)
def log(self, key, path, default, value):
""" Predefined format for displaying params query """
self._log("PARAMS: %-20s | %-20s | %-10s => %r"
% (key, path, default, value))
def _get_matching_leaves(self, path, leaves):
"""
Pops and returns list of matching nodes
:param path: Path (str)
:param leaves: list of TreeNode leaves
"""
path = self._greedy_path(path)
path_leaves = [leaf for leaf in leaves if path.match(leaf.path + '/')]
for leaf in path_leaves:
leaves.remove(leaf)
return path_leaves
@staticmethod
def _greedy_path(path):
"""
converts user-friendly asterisk path to python regexp and compiles it:
path = "" => ^$ only
path = "/" => / only
path = "/asdf/fdsa" => /asdf/fdsa only
path = "asdf/fdsa" => $MUX_ENTRY/?.*/asdf/fdsa
path = "/*/asdf" => /[^/]*/asdf
path = "asdf/*" => $MUX_ENTRY/?.*/asdf/.*
path = "/asdf/*" => /asdf/.*
FIXME: __QUESTION__: Should "/path/*/path" match only
/path/$anything/path or can multiple levels be present
(/path/$multiple/$levels/path). The first is complaint to BASH, the
second might be easier to use. Alternatively we can allow multiple
levels only when "/*/" is used.
"""
if not path:
return re.compile('^$')
if path[0] != '/':
prefix = '.*/'
else:
prefix = ''
if path[-1] == '*':
suffix = ''
path = path[:-1]
else:
suffix = '$'
return re.compile(prefix + path.replace('*', '[^/]*') + suffix)
@staticmethod
def _is_abspath(path):
""" Is this an absolute or relative path? """
if path.pattern and path.pattern[0] == '/':
return True
else:
return False
def __getattr__(self, attr):
"""
Compatibility to old Params
:warning: This will be removed soon. Use params.get() instead
"""
if attr == '__getnewargs__': # pickling uses this attr
raise AttributeError
elif attr in self.__dict__:
return self.__dict__[attr]
else:
msg = ("You're probably retrieving param %s via attributes "
" (self.params.$key) which is obsoleted. Use "
"self.params.get($key) instead." % attr)
self._log.error(msg)
self.get(attr)
def get(self, *args, **kwargs):
"""
Retrieve params
Old API: ``params.get(key, failobj=None)`` (any matching param)
New API: ``params.get(key, path=$MUX_ENTRY/*, default=None)``
As old and new API overlaps, you must use all 3 arguments or
explicitely use key argument "path" or "default".
Concerning params clashes this version only validates that only single
param or multiple params of the same values are retrieved. This will
be replaced with proper origin check in the future.
"""
def compatibility(args, kwargs):
"""
Be 100% compatible with old API while allow _SOME OF_ the new APIs
calls:
OLD: get(key), get(key, default), get(key, failobj=default)
NEW: get(key, path, default), get(key, path=path),
get(key, default=default)
:warning: We are unable to distinguish old get(key, default) vs.
new get(key, path), therefor if you want to use the new
API you must specify path/default using named arguments
or supply all 3 arguments:
get(key, path, default), get(key, path=path),
get(key, default=default).
This will be removed in final version.
"""
if len(args) < 1:
raise TypeError("Incorrect arguments: params.get(%s, %s)"
% (args, kwargs))
elif 'failobj' in kwargs:
return [args[0], '/*', kwargs['failobj']] # Old API
elif len(args) > 2 or 'default' in kwargs or 'path' in kwargs:
try:
if 'default' in kwargs:
default = kwargs['default']
elif len(args) > 2:
default = args[2]
else:
default = None
if 'path' in kwargs:
path = kwargs['path']
elif len(args) > 1:
path = args[1]
else:
path = None
key = args[0]
return [key, path, default]
except IndexError:
raise TypeError("Incorrect arguments: params.get(%s, %s)"
% (args, kwargs))
else: # Old API
if len(args) == 1:
return [args[0], '/*', None]
else:
return [args[0], '/*', args[1]]
key, path, default = compatibility(args, kwargs)
if path is None: # default path is any relative path
path = '*'
try:
return self._cache[(key, path, default)]
except KeyError:
value = self._get(key, path, default)
self.log(key, path, default, value)
self._cache[(key, path, default)] = value
return value
def _get(self, key, path, default):
"""
Actual params retrieval
:param key: key you're looking for
:param path: namespace
:param default: default value when not found
:raise KeyError: In case of multiple different values (params clash)
"""
path = self._greedy_path(path)
for param in self._rel_paths:
try:
return param.get_or_die(path, key)
except NoMatchError:
pass
if self._is_abspath(path):
try:
return self._abs_path.get_or_die(path, key)
except NoMatchError:
pass
return self._default_parmas.get(key, default)
def _get_leaf(self, path):
""" Get single leaf matching the path """
path = self._greedy_path(path)
for param in self._rel_paths:
try:
return param.get_leaf(path)
except NoMatchError:
pass
raise NoMatchError('No leaves matching "%s" pattern found in %s'
% (path.pattern, self))
def objects(self, key, path=None):
"""
Return the names of objects defined using a given key.
:param key: The name of the key whose value lists the objects
(e.g. 'nics').
"""
return self.get(path, key, "").split()
def iteritems(self):
"""
Very basic implementation which iterates through __ALL__ params,
which generates lots of duplicite entries due to inherited values.
"""
for param in self._rel_paths:
for pair in param.iteritems():
self.log(pair[0], '/*', None, pair[1])
yield pair
for pair in self._abs_path.iteritems():
yield pair
class AvocadoParam(object):
"""
This is a single slice params. It can contain multiple leaves and tries to
find matching results.
Currently it doesn't care about params origin, it requires single result
or failure. In future it'll get the origin from LeafParam and if it's the
same it'll proceed, otherwise raise exception (as it can't decide which
variable is desired)
"""
def __init__(self, leaves, name):
"""
:param leaves: this slice's leaves
:param name: this slice's name (identifier used in exceptions)
"""
# Basic initialization
self._leaves = leaves
# names cache (leaf.path is quite expensive)
self._leaf_names = [leaf.path + '/' for leaf in leaves]
self.name = name
@property
def str_leaves_variant(self):
""" String with identifier and all params """
return "%s (%s)" % (self.name, self._leaf_names)
def _get_leaves(self, path):
"""
Get all leaves matching the path
"""
return [self._leaves[i]
for i in xrange(len(self._leaf_names))
if path.match(self._leaf_names[i])]
def get_leaf(self, path):
"""
:param path: Desired path
:return: Single leaf containing the path
:raise NoMatchError: When no leaf matches the path
:raise KeyError: When multiple leaves matches the path
"""
leaves = self._get_leaves(path)
if len(leaves) == 1:
return leaves[0]
elif len(leaves) == 0:
raise NoMatchError('No leaves matchng "%s" pattern found in %s'
% (path.pattern, self.str_leaves_variant))
else:
raise KeyError('Multiple leaves matching "%s" found: %s'
% (path.pattern, self.str_leaves_variant))
def get(self, path, key, default=None):
"""
Returns value of key from $path path. Multiple matching path are
acceptable when only one of them contains the key.
"""
try:
self.get_or_die(path, key)
except NoMatchError:
return default
def get_or_die(self, path, key):
"""
Get a value or raise exception if not present
:raise NoMatchError: When no matches
:raise KeyError: When value is not certain (multiple matches)
"""
# TODO: Implement clash detection based on origin rather than value
leaves = self._get_leaves(path)
ret = [leaf.environment[key]
for leaf in leaves
if key in leaf.environment]
if len(ret) == 1:
return ret[0]
elif not ret:
raise NoMatchError("No matches to %s => %s in %s"
% (path.pattern, key, self.str_leaves_variant))
else:
raise ValueError("Multiple %s leaves contain the key '%s'; %s"
% (path.pattern, key,
["%s=>%s" % (leaf.name, leaf.environment[key])
for leaf in leaves
if key in leaf.environment]))
def iteritems(self):
"""
Very basic implementation which iterates through __ALL__ params,
which generates lots of duplicite entries due to inherited values.
"""
for leaf in self._leaves:
for pair in leaf.environment.iteritems():
yield pair
class Mux(object):
def __init__(self, args):
......@@ -101,6 +465,7 @@ class Mux(object):
self.pools = parse_yamls(mux_files, filter_only, filter_out)
else: # no variants
self.pools = None
self._mux_entry = getattr(args, 'mux_entry_point', ['/test/*'])
def get_number_of_tests(self, test_suite):
# Currently number of tests is symetrical
......@@ -115,12 +480,7 @@ class Mux(object):
i = None
for i, variant in enumerate(multiplex_pools(self.pools)):
test_factory = [template[0], template[1].copy()]
params = template[1]['params'].copy()
for node in variant:
params.update(node.environment)
params.update({'tag': i})
params.update({'id': template[1]['params']['id'] + str(i)})
test_factory[1]['params'] = params
test_factory[1]['params'] = variant
yield test_factory
if i is None: # No variants, use template
yield template
......
......@@ -31,14 +31,13 @@ if sys.version_info[:2] == (2, 6):
else:
import unittest
from avocado import sysinfo
from avocado import sysinfo, multiplexer
from avocado.core import data_dir
from avocado.core import exceptions
from avocado.utils import genio
from avocado.utils import path as utils_path
from avocado.utils import process
from avocado.utils import stacktrace
from avocado.utils.params import Params
from avocado.version import VERSION
......@@ -82,12 +81,8 @@ class Test(unittest.TestCase):
else:
self.name = self.__class__.__name__
if params is None:
params = {}
self.params = Params(params)
self._raw_params = params
self.tag = tag or None
self.tag = tag or self.params.get('tag')
self.job = job
basename = os.path.basename(self.name)
......@@ -135,34 +130,19 @@ class Test(unittest.TestCase):
self.stdout_log = logging.getLogger("avocado.test.stdout")
self.stderr_log = logging.getLogger("avocado.test.stderr")
self.log.info('START %s', self.tagged_name)
self.log.debug('')
self.log.debug('Test instance parameters:')
# Set the helper set_default to the params object
setattr(self.params, 'set_default', self._set_default)
# Apply what comes from the params dict
for key in sorted(self.params.keys()):
self.log.debug(' %s = %s', key, self.params.get(key))
self.log.debug('')
if isinstance(params, dict):
self.default_params = self.default_params.copy()
self.default_params.update(params)
params = []
elif params is None:
params = []
self.params = multiplexer.AvocadoParams(params, self.name, self.tag,
['/test/*'],
self.default_params)
# Apply what comes from the default_params dict
self.log.debug('Default parameters:')
for key in sorted(self.default_params.keys()):
self.log.debug(' %s = %s', key, self.default_params.get(key))
self.params.set_default(key, self.default_params[key])
self.log.debug('')
self.log.debug('Test instance params override defaults whenever available')
self.log.info('START %s', self.tagged_name)
self.log.debug('')
# If there's a timeout set, log a timeout reminder
if self.params.timeout:
self.log.info('Test timeout set. Will wait %.2f s for '
'PID %s to end',
float(self.params.timeout), os.getpid())
self.log.info('')
self.debugdir = None
self.resultsdir = None
self.status = None
......@@ -227,20 +207,13 @@ class Test(unittest.TestCase):
'tag', 'tagged_name', 'text_output', 'time_elapsed',
'traceback', 'workdir', 'whiteboard', 'time_start',
'time_end', 'running', 'paused', 'paused_msg',
'fail_class']
'fail_class', 'params']
state = dict([(key, self.__dict__.get(key)) for key in preserve_attr])
state['params'] = dict(self.__dict__['params'])
state['class_name'] = self.__class__.__name__
state['job_logdir'] = self.job.logdir
state['job_unique_id'] = self.job.unique_id
return state
def _set_default(self, key, default):
try:
self.params[key]
except Exception:
self.params[key] = default
def get_data_path(self, basename):
"""
Find a test dependency path inside the test data dir.
......
import UserDict
from threading import Lock
from avocado.core import exceptions
from avocado import settings
class ParamNotFound(exceptions.TestError):
pass
class ParamInvalidType(exceptions.TestError):
pass
class Params(UserDict.IterableUserDict):
"""
A dict-like object passed to every test.
"""
lock = Lock()
def __getitem__(self, key):
""" overrides the error messages of missing params[$key] """
try:
value = UserDict.IterableUserDict.__getitem__(self, key)
vtype = UserDict.IterableUserDict.get(self, "%s_type" % key)
if vtype is not None:
return settings.convert_value_type(value, vtype)
else:
return value
except KeyError:
raise ParamNotFound("Mandatory parameter '%s' is missing. "
"Check your cfg files for typos/mistakes" %
key)
except Exception, details:
raise ParamInvalidType("Parameter '%s' value '%r' failed to "
"convert to %s: %s" %
(key, value, vtype, details))
def __getattr__(self, attr):
try:
return UserDict.IterableUserDict.__getattr__(self, attr) # @UndefinedVariable
except AttributeError:
try:
return self.__getitem__(attr)
except ParamNotFound:
return None
def copy(self):
new_dict = {}
for key in self:
new_dict[key] = self[key]
return Params(new_dict)
def objects(self, key):
"""
Return the names of objects defined using a given key.
:param key: The name of the key whose value lists the objects
(e.g. 'nics').
"""
return self.get(key, "").split()
def object_params(self, obj_name):
"""
Return a dict-like object containing the parameters of an individual
object.
This method behaves as follows: the suffix '_' + obj_name is removed
from all key names that have it. Other key names are left unchanged.
The values of keys with the suffix overwrite the values of their
suffix-less versions.
:param obj_name: The name of the object (objects are listed by the
objects() method).
"""
suffix = "_" + obj_name
self.lock.acquire()
new_dict = self.data.copy()
self.lock.release()
for key in new_dict.keys():
if key.endswith(suffix):
new_key = key.split(suffix)[0]
new_dict[new_key] = new_dict[key]
return new_dict
def object_counts(self, count_key, base_name):
"""
This is a generator method: to give it the name of a count key and a
base_name, and it returns an iterator over all the values from params
"""
count = self.get(count_key, 1)
# Protect in case original is modified for some reason
cpy = self.copy()
for number in xrange(1, int(count) + 1):
key = "%s%s" % (base_name, number)
yield (key, cpy.get(key))
......@@ -11,14 +11,14 @@ class SleepTest(test.Test):
"""
Example test for avocado.
"""
default_params = {'sleep_length': 1.0}
def runTest(self):
"""
Sleep for length seconds.
"""
self.log.debug("Sleeping for %.2f seconds", self.params.sleep_length)
time.sleep(self.params.sleep_length)
sleep_length = self.params.get('sleep_length', default=1)
self.log.debug("Sleeping for %.2f seconds", sleep_length)
time.sleep(sleep_length)
if __name__ == "__main__":
......
!using : test
short:
sleep_length: 0.5
medium:
......
......@@ -80,16 +80,14 @@ class MultiplexTests(unittest.TestCase):
def test_run_mplex_passtest(self):
cmd_line = './scripts/avocado run --sysinfo=off passtest --multiplex examples/tests/sleeptest.py.data/sleeptest.yaml'
expected_rc = 0
# A typical pass has about 13 lines of output,
# so we expect the full job log has at least 4 times
# this value. If that is not the case, something is wrong with
# the output.
self.run_and_check(cmd_line, expected_rc, 13 * 4)
# Header is 2 lines + 5 lines per each test
self.run_and_check(cmd_line, expected_rc, 2 + 5 * 4)
def test_run_mplex_doublepass(self):
cmd_line = './scripts/avocado run --sysinfo=off passtest passtest --multiplex examples/tests/sleeptest.py.data/sleeptest.yaml'
# Should run 2-times 4 variants of pass test
self.run_and_check(cmd_line, expected_rc=0, expected_lines=2 * 4 * 13)
# Header is 2 lines + 5 lines per each test * 2 tests
self.run_and_check(cmd_line, expected_rc=0,
expected_lines=2 + 2 * 5 * 4)
def test_run_mplex_failtest(self):
cmd_line = './scripts/avocado run --sysinfo=off passtest failtest --multiplex examples/tests/sleeptest.py.data/sleeptest.yaml'
......@@ -101,11 +99,9 @@ class MultiplexTests(unittest.TestCase):
'examples/tests/sleeptest.py.data/sleeptest.yaml '
'examples/tests/sleeptest.py.data/sleeptest.yaml')
expected_rc = 0
# A typical pass has about 13 lines of output,
# so we expect the full job log has at least 4 times
# this value. If that is not the case, something is wrong with
# the output.
self.run_and_check(cmd_line, expected_rc, 13 * 4)
# Header is 2 lines + 5 lines per each test (mux files are merged thus
# only 1x4 variants are generated as in mplex_doublepass test)
self.run_and_check(cmd_line, expected_rc, 2 + 5 * 4)
def test_run_mplex_params(self):
cmd_line = ('./scripts/avocado run --sysinfo=off examples/tests/env_variables.sh '
......
import unittest
from avocado.utils import params
BASE_DICT = {
'image_boot': 'yes',
'image_boot_stg': 'no',
'image_chain': '',
'image_clone_command': 'cp --reflink=auto %s %s',
'image_format': 'qcow2',
'image_format_stg': 'qcow2',
'image_name': 'images/f18-64',
'image_name_stg': 'enospc',
'image_raw_device': 'no',
'image_remove_command': 'rm -rf %s',
'image_size': '10G',
'image_snapshot_stg': 'no',
'image_unbootable_pattern': 'Hard Disk.*not a bootable disk',
'image_verify_bootable': 'yes',
'images': 'image1 stg',
}
CORRECT_RESULT_MAPPING = {"image1": {'image_boot_stg': 'no',
'image_snapshot_stg': 'no',
'image_chain': '',
'image_unbootable_pattern': 'Hard Disk.*not a bootable disk',
'image_name': 'images/f18-64',
'image_remove_command': 'rm -rf %s',
'image_name_stg': 'enospc',
'image_clone_command': 'cp --reflink=auto %s %s',
'image_size': '10G', 'images': 'image1 stg',
'image_raw_device': 'no',
'image_format': 'qcow2',
'image_boot': 'yes',
'image_verify_bootable': 'yes',
'image_format_stg': 'qcow2'},
"stg": {'image_snapshot': 'no',
'image_boot_stg': 'no',
'image_snapshot_stg': 'no',
'image_chain': '',
'image_unbootable_pattern': 'Hard Disk.*not a bootable disk',
'image_name': 'enospc',
'image_remove_command': 'rm -rf %s',
'image_name_stg': 'enospc',
'image_clone_command': 'cp --reflink=auto %s %s',
'image_size': '10G',
'images': 'image1 stg',
'image_raw_device': 'no',
'image_format': 'qcow2',
'image_boot': 'no',
'image_verify_bootable': 'yes',
'image_format_stg': 'qcow2'}}
class TestParams(unittest.TestCase):
def setUp(self):
self.params = params.Params(BASE_DICT)
def testObjects(self):
self.assertEquals(self.params.objects("images"), ['image1', 'stg'])
def testObjectsParams(self):
for key in CORRECT_RESULT_MAPPING.keys():
self.assertEquals(self.params.object_params(key),
CORRECT_RESULT_MAPPING[key])
def testGetItemMissing(self):
try:
self.params['bogus']
raise ValueError("Did not get a ParamNotFound error when trying "
"to access a non-existing param")
# pylint: disable=E0712
except params.ParamNotFound:
pass
def testGetItem(self):
self.assertEqual(self.params['image_size'], "10G")
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册