#!/usr/bin/env python3 import argparse import base64 import collections import inspect import io import json import multiprocessing import os import socket import subprocess import sys import tempfile import time import unittest try: import pkg_resources PKG_RESOURCES_AVAILABLE = True except ImportError: PKG_RESOURCES_AVAILABLE = False #: The amount of time (in seconds) between each internal status check RUNNER_RUN_CHECK_INTERVAL = 0.01 #: The amount of time (in seconds) between a status report from a #: runner that performs its work asynchronously RUNNER_RUN_STATUS_INTERVAL = 0.5 #: All known runner commands, capable of being used by a #: SpawnMethod.STANDALONE_EXECUTABLE compatible spawners RUNNERS_REGISTRY_STANDALONE_EXECUTABLE = {} #: All known runner Python classes. This is a dictionary keyed by a #: runnable kind, and value is a class that inherits from #: :class:`BaseRunner`. Suitable for spawners compatible with #: SpawnMethod.PYTHON_CLASS RUNNERS_REGISTRY_PYTHON_CLASS = {} def check_tasks_requirements(tasks, runners_registry=None): """ Checks if tasks have runner requirements fulfilled :param tasks: the tasks whose runner requirements will be checked :type tasks: list of :class:`Task` :param runners_registry: a registry with previously found (and not found) runners keyed by a task's runnable kind. Defaults to :attr:`RUNNERS_REGISTRY_STANDALONE_EXECUTABLE` :type runners_registry: dict :return: two list of tasks in a tuple, with the first being the tasks that pass the requirements check and the second the tasks that fail the requirements check :rtype: tuple of (list, list) """ if runners_registry is None: runners_registry = RUNNERS_REGISTRY_STANDALONE_EXECUTABLE ok = [] missing = [] for task in tasks: runner = task.runnable.pick_runner_command(runners_registry) if runner: ok.append(task) else: missing.append(task) return (ok, missing) class Runnable: """ Describes an entity that be executed in the context of a task A instance of :class:`BaseRunner` is the entity that will actually execute a runnable. """ def __init__(self, kind, uri, *args, **kwargs): self.kind = kind self.uri = uri self.args = args self.tags = kwargs.pop('tags', None) self.requirements = kwargs.pop('requirements', None) self.kwargs = kwargs def __repr__(self): fmt = ('') return fmt.format(self.kind, self.uri, self.args, self.kwargs, self.tags, self.requirements) @classmethod def from_args(cls, args): """Returns a runnable from arguments""" decoded_args = [_arg_decode_base64(arg) for arg in args.get('arg', ())] return cls(args.get('kind'), args.get('uri'), *decoded_args, **_key_val_args_to_kwargs(args.get('kwargs', []))) @classmethod def from_recipe(cls, recipe_path): """ Returns a runnable from a runnable recipe file :param recipe_path: Path to a recipe file :rtype: instance of :class:`Runnable` """ with open(recipe_path) as recipe_file: recipe = json.load(recipe_file) return cls(recipe.get('kind'), recipe.get('uri'), *recipe.get('args', ()), **recipe.get('kwargs', {})) def get_command_args(self): """ Returns the command arguments that adhere to the runner interface This is useful for building 'runnable-run' and 'task-run' commands that can be executed on a command line interface. :returns: the arguments that can be used on an avocado-runner command :rtype: list """ args = ['-k', self.kind] if self.uri is not None: args.append('-u') args.append(self.uri) for arg in self.args: args.append('-a') if arg.startswith('-'): arg = 'base64:%s' % base64.b64encode(arg.encode()).decode('ascii') args.append(arg) if self.tags is not None: args.append('tags=json:%s' % json.dumps(self.get_serializable_tags())) for key, val in self.kwargs.items(): if not isinstance(val, str) or isinstance(val, int): val = "json:%s" % json.dumps(val) args.append('%s=%s' % (key, val)) return args def get_dict(self): """ Returns a dictionary representation for the current runnable This is usually the format that will be converted to a format that can be serialized to disk, such as JSON. :rtype: :class:`collections.OrderedDict` """ recipe = collections.OrderedDict(kind=self.kind) if self.uri is not None: recipe['uri'] = self.uri if self.args is not None: recipe['args'] = self.args kwargs = self.kwargs.copy() if self.tags is not None: kwargs['tags'] = self.get_serializable_tags() if kwargs: recipe['kwargs'] = kwargs return recipe def get_json(self): """ Returns a JSON representation :rtype: str """ return json.dumps(self.get_dict()) def get_serializable_tags(self): tags = {} # sets are not serializable in json for key, val in self.tags.items(): if isinstance(val, set): val = list(val) tags[key] = val return tags def write_json(self, recipe_path): """ Writes a file with a JSON representation (also known as a recipe) """ with open(recipe_path, 'w') as recipe_file: recipe_file.write(self.get_json()) def is_kind_supported_by_runner_command(self, runner_command): """Checks if a runner command that seems a good fit declares support.""" cmd = runner_command + ['capabilities'] try: process = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) except (FileNotFoundError, PermissionError): return False out, _ = process.communicate() try: capabilities = json.loads(out.decode()) except json.decoder.JSONDecodeError: return False return self.kind in capabilities.get('runnables', []) def pick_runner_command(self, runners_registry=None): """Selects a runner command based on the runner. And when finding a suitable runner, keeps found runners in registry. This utility function will look at the given task and try to find a matching runner. The matching runner probe results are kept in a registry (that is modified by this function) so that further executions take advantage of previous probes. This is related to the :data:`SpawnMethod.STANDALONE_EXECUTABLE` :param runners_registry: a registry with previously found (and not found) runners keyed by runnable kind :param runners_registry: dict :returns: command line arguments to execute the runner :rtype: list of str or None """ if runners_registry is None: runners_registry = RUNNERS_REGISTRY_STANDALONE_EXECUTABLE runner_cmd = runners_registry.get(self.kind) if runner_cmd is False: return None if runner_cmd is not None: return runner_cmd standalone_executable_cmd = ['avocado-runner-%s' % self.kind] if self.is_kind_supported_by_runner_command(standalone_executable_cmd): runners_registry[self.kind] = standalone_executable_cmd return standalone_executable_cmd # attempt to find Python module files that are named after the # runner convention within the avocado.core namespace dir. # Looking for the file only avoids an attempt to load the module # and should be a lot faster core_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) module_name = self.kind.replace('-', '_') module_filename = 'nrunner_%s.py' % module_name if os.path.exists(os.path.join(core_dir, module_filename)): full_module_name = 'avocado.core.%s' % module_name candidate_cmd = [sys.executable, '-m', full_module_name] if self.is_kind_supported_by_runner_command(candidate_cmd): runners_registry[self.kind] = candidate_cmd return candidate_cmd # exhausted probes, let's save the negative on the cache and avoid # future similar problems runners_registry[self.kind] = False def pick_runner_class_from_entry_point(self): """Selects a runner class from entry points based on kind. This is related to the :data:`SpawnMethod.PYTHON_CLASS`. This completements the :data:`RUNNERS_REGISTRY_PYTHON_CLASS` on systems that have setuptools available. :returns: a class that inherits from :class:`BaseRunner` or None """ if not PKG_RESOURCES_AVAILABLE: return namespace = 'avocado.plugins.runnable.runner' for ep in pkg_resources.iter_entry_points(namespace): if ep.name == self.kind: try: obj = ep.load() return obj except ImportError: return def pick_runner_class(self, runners_registry=None): """Selects a runner class from the registry based on kind. This is related to the :data:`SpawnMethod.PYTHON_CLASS` :param runners_registry: a registry with previously registered runner classes, keyed by runnable kind :param runners_registry: dict :returns: a class that inherits from :class:`BaseRunner` :raises: ValueError if kind there's no runner from kind of runnable """ if runners_registry is None: runners_registry = RUNNERS_REGISTRY_PYTHON_CLASS runner = runners_registry.get(self.kind, None) if runner is None: runner = self.pick_runner_class_from_entry_point() if runner is not None: return runner raise ValueError('Unsupported kind of runnable: %s' % self.kind) class BaseRunner: """ Base interface for a Runner """ def __init__(self, runnable): self.runnable = runnable def prepare_status(self, status_type, additional_info=None): """Prepare a status dict with some basic information. This will add the keyword 'status' and 'time' to all status. :param: status_type: The type of event ('started', 'running', 'finished') :param: addional_info: Any additional information that you would like to add to the dict. This must be a dict. :rtype: dict """ status = {'status': status_type, 'time': time.time()} if isinstance(additional_info, dict): status.update(additional_info) return status def run(self): yield {} class NoOpRunner(BaseRunner): """ Sample runner that performs no action before reporting FINISHED status Runnable attributes usage: * uri: not used * args: not used """ def run(self): yield self.prepare_status('started') yield self.prepare_status('finished', {'result': 'pass'}) RUNNERS_REGISTRY_PYTHON_CLASS['noop'] = NoOpRunner class ExecRunner(BaseRunner): """ Runner for standalone executables with or without arguments Runnable attributes usage: * uri: path to a binary to be executed as another process * args: arguments to be given on the command line to the binary given by path * kwargs: key=val to be set as environment variables to the process """ def run(self): env = None if self.runnable.kwargs: current = dict(os.environ) current.update(self.runnable.kwargs) env = current process = subprocess.Popen( [self.runnable.uri] + list(self.runnable.args), stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) yield self.prepare_status('started') most_current_execution_state_time = None while process.poll() is None: time.sleep(RUNNER_RUN_CHECK_INTERVAL) now = time.time() if most_current_execution_state_time is not None: next_execution_state_mark = (most_current_execution_state_time + RUNNER_RUN_STATUS_INTERVAL) if (most_current_execution_state_time is None or now > next_execution_state_mark): most_current_execution_state_time = now yield self.prepare_status('running') stdout = process.stdout.read() process.stdout.close() stderr = process.stderr.read() process.stderr.close() return_code = process.returncode yield self.prepare_status('finished', {'returncode': return_code, 'stdout': stdout, 'stderr': stderr}) RUNNERS_REGISTRY_PYTHON_CLASS['exec'] = ExecRunner class ExecTestRunner(ExecRunner): """ Runner for standalone executables treated as tests This is similar in concept to the Avocado "SIMPLE" test type, in which an executable returning 0 means that a test passed, and anything else means that a test failed. Runnable attributes usage is identical to :class:`ExecRunner` """ def run(self): for most_current_execution_state in super(ExecTestRunner, self).run(): if 'returncode' in most_current_execution_state: if most_current_execution_state['returncode'] == 0: most_current_execution_state['result'] = 'pass' else: most_current_execution_state['result'] = 'fail' yield most_current_execution_state RUNNERS_REGISTRY_PYTHON_CLASS['exec-test'] = ExecTestRunner class PythonUnittestRunner(BaseRunner): """ Runner for Python unittests The runnable uri is used as the test name that the native unittest TestLoader will use to find the test. A native unittest test runner (TextTestRunner) will be used to execute the test. Runnable attributes usage: * uri: a "dotted name" that can be given to Python standard library's :meth:`unittest.TestLoader.loadTestsFromName` method. While it's not enforced, it's highly recommended that this is "a test method within a test case class" within a test module. Example is: "module.Class.test_method". * args: not used * kwargs: not used """ @staticmethod def _uri_to_unittest_name(uri): if ':' in uri: module, class_method = uri.rsplit(':', 1) else: module = uri class_method = None if module.endswith('.py'): module = module[:-3] if module.startswith(os.path.curdir): module = module[1:] if module.startswith(os.path.sep): module = module[1:] module = module.replace(os.path.sep, ".") if class_method: return '%s.%s' % (module, class_method) return module @staticmethod def _run_unittest(uri, queue): sys.path.insert(0, ".") stream = io.StringIO() unittest_name = PythonUnittestRunner._uri_to_unittest_name(uri) suite = unittest.TestLoader().loadTestsFromName(unittest_name) runner = unittest.TextTestRunner(stream=stream, verbosity=0) unittest_result = runner.run(suite) time_end = time.time() if len(unittest_result.errors) > 0: result = 'error' elif len(unittest_result.failures) > 0: result = 'fail' elif len(unittest_result.skipped) > 0: result = 'skip' else: result = 'pass' stream.seek(0) output = {'status': 'finished', 'result': result, 'output': stream.read(), 'time': time_end} stream.close() queue.put(output) def run(self): if not self.runnable.uri: error_msg = 'uri is required but was not given' yield self.prepare_status('finished', {'result': 'error', 'output': error_msg}) return queue = multiprocessing.SimpleQueue() process = multiprocessing.Process(target=self._run_unittest, args=(self.runnable.uri, queue)) process.start() yield self.prepare_status('started') most_current_execution_state_time = None while queue.empty(): time.sleep(RUNNER_RUN_CHECK_INTERVAL) now = time.time() if most_current_execution_state_time is not None: next_execution_state_mark = (most_current_execution_state_time + RUNNER_RUN_STATUS_INTERVAL) if (most_current_execution_state_time is None or now > next_execution_state_mark): most_current_execution_state_time = now yield self.prepare_status('running') yield queue.get() RUNNERS_REGISTRY_PYTHON_CLASS['python-unittest'] = PythonUnittestRunner def _parse_key_val(argument): key_value = argument.split('=', 1) if len(key_value) < 2: msg = ('Invalid keyword parameter: "%s". Valid option must ' 'be a "KEY=VALUE" like expression' % argument) raise argparse.ArgumentTypeError(msg) return tuple(key_value) def _arg_decode_base64(arg): """ Decode arguments possibly encoded as base64 :param arg: the possibly encoded argument :type arg: str :returns: the decoded argument :rtype: str """ prefix = 'base64:' if arg.startswith(prefix): content = arg[len(prefix):] return base64.decodebytes(content.encode()).decode() return arg def _kwarg_decode_json(value): """ Decode arguments possibly encoded as base64 :param value: the possibly encoded argument :type value: str :returns: the decoded keyword argument as Python object """ prefix = 'json:' if value.startswith(prefix): content = value[len(prefix):] return json.loads(content) return value def _key_val_args_to_kwargs(kwargs): result = {} for key, val in kwargs: result[key] = _kwarg_decode_json(val) return result class StatusEncoder(json.JSONEncoder): # pylint: disable=E0202 def default(self, o): if isinstance(o, bytes): return {'__base64_encoded__': base64.b64encode(o).decode('ascii')} return json.JSONEncoder.default(self, o) def json_dumps(data): return json.dumps(data, ensure_ascii=True, cls=StatusEncoder) class TaskStatusService: """ Implementation of interface that a task can use to post status updates TODO: make the interface generic and this just one of the implementations """ def __init__(self, uri): self.uri = uri self.connection = None def post(self, status): host, port = self.uri.split(':') port = int(port) if self.connection is None: self.connection = socket.create_connection((host, port)) data = json_dumps(status) self.connection.send(data.encode('ascii') + "\n".encode('ascii')) def close(self): if self.connection is not None: self.connection.close() def __repr__(self): return ''.format(self.uri) class Task: """ Wraps the execution of a runnable While a runnable describes what to be run, and gets run by a runner, a task should be a unique entity to track its state, that is, whether it is pending, is running or has finished. :param identifier: :param runnable: """ def __init__(self, identifier, runnable, status_uris=None, known_runners=None): self.identifier = identifier self.runnable = runnable self.status_services = [] if status_uris is not None: for status_uri in status_uris: self.status_services.append(TaskStatusService(status_uri)) if known_runners is None: known_runners = {} self.known_runners = known_runners self.spawn_handle = None self.output_dir = None def __repr__(self): fmt = '