未验证 提交 1831c239 编写于 作者: L Lukáš Doktor

Merging pull request 2260

Signed-off-by: NLukáš Doktor <ldoktor@redhat.com>

* https://github.com/avocado-framework/avocado:
  Python 3 port: use six.string_types on isinstance checks
  Python 3 port: adapt to range differences
  Python 3 port: use next() builtin function
  Python 3 port: fix "unclosed file" conditions in avocado.core.{safe,}loader
  selftests/unit/test_test.py: only create fake tests where used
  selftests/unit/test_utils_stacktrace.py: extract raise exception test
  avocado/core/jobdata_compat_36_to_52.py: remove unused code
  avocado/core/loader.py: simplify and improve detection of Python files
  avocado/utils/path.py: optmize is_python method
......@@ -28,6 +28,7 @@ import time
import traceback
from six import iteritems
from six.moves import xrange as range
from . import version
from . import data_dir
......@@ -310,7 +311,7 @@ class Job(object):
if not getattr(self.args, "dry_run", False):
return suite
for i in xrange(len(suite)):
for i in range(len(suite)):
suite[i] = [test.DryRunTest, suite[i][1]]
return suite
......
......@@ -30,7 +30,6 @@ class MuxTree(mux.MuxPlugin):
Excerpt of MuxTree object in order to make it compatible with 52
"""
pools = []
filters = [None, None]
def __iter__(self):
"""
......@@ -46,7 +45,7 @@ class MuxTree(mux.MuxPlugin):
while True:
# TODO: Implement 2nd level filters here
# TODO: This part takes most of the time, optimize it
yield list(itertools.chain(*pools.next()))
yield list(itertools.chain(*next(pools)))
class AvocadoParams(varianter.AvocadoParams):
......@@ -64,7 +63,6 @@ class AvocadoParams(varianter.AvocadoParams):
:param mux_path: list of entry points
:param default_params: dict of params used when no matches found
"""
del tag
super(AvocadoParams, self).__init__(leaves, test_id, mux_path,
default_params)
......
......@@ -25,13 +25,12 @@ import re
import shlex
import sys
from six import iteritems
from six import string_types, iteritems
from . import data_dir
from . import output
from . import test
from . import safeloader
from ..utils import path
from ..utils import stacktrace
from .settings import settings
from .output import LOG_UI
......@@ -322,7 +321,7 @@ class TestLoaderProxy(object):
test_path = test_parameters.pop('modulePath')
else:
test_path = None
if isinstance(test_class, str):
if isinstance(test_class, string_types):
module_name = os.path.basename(test_path).split('.')[0]
test_module_dir = os.path.abspath(os.path.dirname(test_path))
# Tests with local dir imports need this
......@@ -373,11 +372,11 @@ class TestLoader(object):
"'allowed_test_types' in the plugin."
% (self.name, self.name, self.name, types))
raise LoaderError(msg)
elif mapping.itervalues().next() != types:
elif next(mapping.itervalues()) != types:
raise LoaderError("Loader '%s' doesn't support test type '%s',"
" it supports only '%s'"
% (self.name, types,
mapping.itervalues().next()))
next(mapping.itervalues())))
if "loader_options" in extra_params:
raise LoaderError("Loader '%s' doesn't support 'loader_options', "
"please don't use --loader %s:%s"
......@@ -552,13 +551,13 @@ class FileLoader(TestLoader):
# Instrumented tests are defined as string and loaded at the
# execution time.
for tst in tests:
if not isinstance(tst[0], str):
if not isinstance(tst[0], string_types):
return None
else:
test_class = (key for key, value in iteritems(mapping)
if value == self.test_type).next()
test_class = next(key for key, value in iteritems(mapping)
if value == self.test_type)
for tst in tests:
if (isinstance(tst[0], str) or
if (isinstance(tst[0], string_types) or
not issubclass(tst[0], test_class)):
return None
return tests
......@@ -648,7 +647,8 @@ class FileLoader(TestLoader):
if os.path.isdir(path):
path = os.path.join(path, "__init__.py")
mod = ast.parse(open(path).read(), path)
with open(path) as source_file:
mod = ast.parse(source_file.read(), path)
for statement in mod.body:
# Looking for a 'from avocado import Test'
......@@ -844,7 +844,7 @@ class FileLoader(TestLoader):
if avocado_tests:
test_factories = []
for test_class, info in avocado_tests.items():
if isinstance(test_class, str):
if isinstance(test_class, string_types):
for test_method, tags in info:
name = test_name + \
':%s.%s' % (test_class, test_method)
......@@ -928,8 +928,7 @@ class FileLoader(TestLoader):
if os.access(test_path, os.R_OK) is False:
return make_broken(AccessDeniedPath, test_path, "Is not "
"readable")
path_analyzer = path.PathInspector(test_path)
if path_analyzer.is_python():
if test_path.endswith('.py'):
return self._make_existing_file_tests(test_path, make_broken,
subtests_filter)
else:
......
......@@ -25,6 +25,8 @@ import collections
import itertools
import re
from six.moves import xrange as range
from . import tree
from . import varianter
......@@ -95,7 +97,7 @@ class MuxTree(object):
pools.append([pool])
variants = itertools.product(*pools)
while True:
yield list(itertools.chain(*variants.next()))
yield list(itertools.chain(*next(variants)))
@staticmethod
def _valid_variant(variant):
......@@ -127,7 +129,7 @@ class MuxTree(object):
remove = 0
path = node.path + '/'
ppath = path.rsplit('/', 2)[0] + '/'
for i in xrange(len(filter_only)):
for i in range(len(filter_only)):
level = filter_only[i].count('/')
if level < max(keep, remove):
continue
......
......@@ -585,7 +585,7 @@ def add_log_handler(logger, klass=logging.StreamHandler, stream=sys.stdout,
logger = logging.getLogger(logger)
handler = klass(stream)
handler.setLevel(level)
if isinstance(fmt, str):
if isinstance(fmt, string_types):
fmt = logging.Formatter(fmt=fmt)
handler.setFormatter(fmt)
logger.addHandler(handler)
......
......@@ -134,7 +134,8 @@ def find_class_and_methods(path, method_pattern=None, base_class=None):
return base_class_name in base_ids
result = {}
mod = ast.parse(open(path).read(), path)
with open(path) as source_file:
mod = ast.parse(source_file.read(), path)
modules = modules_imported_as(mod)
for statement in mod.body:
......
......@@ -25,6 +25,8 @@ try:
except ImportError:
import configparser as ConfigParser
from six import string_types
from ..utils import path
if 'VIRTUAL_ENV' in os.environ:
......@@ -98,7 +100,7 @@ def convert_value_type(value, value_type):
except Exception:
sval = value
if isinstance(value_type, str):
if isinstance(value_type, string_types):
if value_type == 'str':
value_type = str
elif value_type == 'bool':
......
......@@ -38,7 +38,7 @@ import itertools
import locale
import os
from six import iterkeys, iteritems
from six import string_types, iterkeys, iteritems
from . import output
......@@ -172,7 +172,7 @@ class TreeNode(object):
def __eq__(self, other):
""" Compares node to other node or string to name of this node """
if isinstance(other, str): # Compare names
if isinstance(other, string_types): # Compare names
if self.name == other:
return True
else:
......
......@@ -23,6 +23,7 @@ import hashlib
import re
from six import iterkeys, iteritems, itervalues
from six.moves import xrange as range
from . import tree
from . import dispatcher
......@@ -278,7 +279,7 @@ class AvocadoParam(object):
Get all leaves matching the path
"""
return [self._leaves[i]
for i in xrange(len(self._leaf_names))
for i in range(len(self._leaf_names))
if path.search(self._leaf_names[i])]
def get_or_die(self, path, key):
......
......@@ -18,6 +18,8 @@ import os
import re
import sys
from six.moves import xrange as range
from avocado.core import exit_codes
from avocado.core import jobdata
from avocado.core import status
......@@ -129,7 +131,7 @@ class Replay(CLI):
# Now add _tests that were not executed
skipped_test = {"test": "UNKNOWN", "status": "INTERRUPTED"}
return [_tests[i] if i in _tests else skipped_test
for i in xrange(1, max(max_index, no_tests) + 1)]
for i in range(1, max(max_index, no_tests) + 1)]
def _create_replay_map(self, resultsdir, replay_filter):
"""
......@@ -143,7 +145,7 @@ class Replay(CLI):
with open(json_results, 'r') as json_file:
results = json.loads(json_file.read())
tests = results["tests"]
for _ in xrange(results["total"] + 1 - len(tests)):
for _ in range(results["total"] + 1 - len(tests)):
tests.append({"test": "UNKNOWN", "status": "INTERRUPTED"})
else:
# get partial results from tap
......
......@@ -22,6 +22,8 @@ import random
import string
import tempfile
from six.moves import xrange as range
_RAND_POOL = random.SystemRandom()
log = logging.getLogger('avocado.test')
......@@ -72,11 +74,11 @@ def make_dir_and_populate(basedir='/tmp'):
path = tempfile.mkdtemp(prefix='avocado_' + __name__,
dir=basedir)
n_files = _RAND_POOL.randint(100, 150)
for _ in xrange(n_files):
for _ in range(n_files):
fd, _ = tempfile.mkstemp(dir=path, text=True)
str_length = _RAND_POOL.randint(30, 50)
n_lines = _RAND_POOL.randint(5, 7)
for _ in xrange(n_lines):
for _ in range(n_lines):
os.write(fd, generate_random_string(str_length))
os.close(fd)
except OSError as details:
......
......@@ -23,6 +23,8 @@ __version__ = 'SPARK-0.7 (pre-alpha-7)'
import re
from six.moves import xrange as range
def _namelist(instance):
namelist, namedict, classlist = [], {}, [instance.__class__]
......@@ -326,7 +328,7 @@ class GenericParser:
self.states = {0: self.makeState0()}
self.makeState(0, self._BOF)
for i in xrange(len(tokens)):
for i in range(len(tokens)):
sets.append([])
if sets[i] == []:
......
......@@ -25,6 +25,9 @@ import os
import re
import shutil
import time
from six.moves import xrange as range
from . import process
......@@ -174,7 +177,7 @@ def vg_ramdisk_cleanup(ramdisk_filename=None, vg_ramdisk_dir=None,
if ramdisk_filename is not None:
ramdisk_filename = ramdisk_filename.group(1)
for _ in xrange(10):
for _ in range(10):
result = process.run("losetup -d %s" % loop_device,
ignore_status=True, sudo=True)
if "resource busy" not in result.stderr:
......@@ -194,7 +197,7 @@ def vg_ramdisk_cleanup(ramdisk_filename=None, vg_ramdisk_dir=None,
if vg_ramdisk_dir is not None:
if not process.system("mountpoint %s" % vg_ramdisk_dir,
ignore_status=True):
for _ in xrange(10):
for _ in range(10):
result = process.run("umount %s" % vg_ramdisk_dir,
ignore_status=True, sudo=True)
time.sleep(0.1)
......
......@@ -22,7 +22,6 @@ import tempfile
from . import aurl
PY_EXTENSIONS = ['.py']
SHEBANG = '#!'
......@@ -136,10 +135,8 @@ class PathInspector(object):
return False
def is_python(self):
for extension in PY_EXTENSIONS:
if self.path.endswith(extension):
return True
if self.path.endswith('.py'):
return True
return self.is_script(language='python')
......
......@@ -122,7 +122,7 @@ The following pages document the private APIs of optional Avocado plugins.
:maxdepth: 1
""")
for path in os.walk(optional_plugins_path).next()[1]:
for path in next(os.walk(optional_plugins_path))[1]:
name = "avocado_%s" % os.path.basename(path)
try:
importlib.import_module(name)
......
import time
from six.moves import xrange as range
from avocado.core.output import LOG_UI
from avocado.core.settings import settings
from avocado.core.plugin_interfaces import JobPre, JobPost
......@@ -17,7 +19,7 @@ class Sleep(JobPre, JobPost):
default=3)
def sleep(self, job):
for i in xrange(1, self.seconds + 1):
for i in range(1, self.seconds + 1):
LOG_UI.info("Sleeping %2i/%s", i, self.seconds)
time.sleep(1)
......
......@@ -2,6 +2,8 @@
import os
from six.moves import xrange as range
from avocado import Test
from avocado import main
from avocado.utils import gdb
......@@ -58,10 +60,10 @@ class GdbTest(Test):
self.log.info("Testing execution of multiple GDB instances")
process_count = 10
gdb_instances = []
for i in xrange(0, process_count):
for i in range(0, process_count):
gdb_instances.append(gdb.GDB())
for i in xrange(0, process_count):
for i in range(0, process_count):
self.assertEqual(gdb_instances[i].exit(), 0)
def test_existing_commands_raw(self):
......@@ -213,7 +215,7 @@ class GdbTest(Test):
s = gdb.GDBServer()
g = gdb.GDB()
for i in xrange(0, 100):
for i in range(0, 100):
r = g.connect(s.port)
self.assertEqual(r.result.class_, 'connected')
r = g.disconnect()
......@@ -295,7 +297,7 @@ class GdbTest(Test):
self.log.info("Testing execution of multiple GDB server instances")
process_count = 10
server_instances = []
for i in xrange(0, process_count):
for i in range(0, process_count):
s = gdb.GDBServer()
c = gdb.GDB()
c.connect(s.port)
......@@ -303,7 +305,7 @@ class GdbTest(Test):
c.disconnect()
server_instances.append(s)
for i in xrange(0, process_count):
for i in range(0, process_count):
self.assertTrue(self.is_process_alive(server_instances[i].process))
server_instances[i].exit()
self.assertFalse(self.is_process_alive(server_instances[i].process))
......
......@@ -3,6 +3,8 @@
import os
import time
from six.moves import xrange as range
from avocado import main
from avocado import Test
......@@ -27,7 +29,7 @@ class SleepTenMin(Test):
length = int(self.params.get('sleep_length', default=600))
method = self.params.get('sleep_method', default='builtin')
for _ in xrange(0, cycles):
for _ in range(0, cycles):
self.log.debug("Sleeping for %.2f seconds", length)
if method == 'builtin':
time.sleep(length)
......
......@@ -3,6 +3,8 @@
import base64
import os
from six.moves import xrange as range
from avocado import Test
from avocado import main
......@@ -36,7 +38,7 @@ class WhiteBoard(Test):
iterations = int(self.params.get('whiteboard_writes', default=1))
result = ''
for _ in xrange(0, iterations):
for _ in range(0, iterations):
result += data
self.whiteboard = base64.encodestring(result)
......
......@@ -19,6 +19,7 @@ from StringIO import StringIO
from lxml import etree
from six import iteritems
from six.moves import xrange as range
from avocado.core import exit_codes
from avocado.utils import astring
......@@ -508,7 +509,7 @@ class RunnerOperationTest(unittest.TestCase):
avocado_process = process.SubProcess(cmd_line)
avocado_process.start()
link = os.path.join(self.tmpdir, 'latest')
for trial in xrange(0, 50):
for trial in range(0, 50):
time.sleep(0.1)
if os.path.exists(link) and os.path.islink(link):
avocado_process.wait()
......@@ -529,7 +530,7 @@ class RunnerOperationTest(unittest.TestCase):
self.assertEqual(result['job_id'], u'0' * 40)
# Check if all tests were skipped
self.assertEqual(result['cancel'], 4)
for i in xrange(4):
for i in range(4):
test = result['tests'][i]
self.assertEqual(test['fail_reason'],
u'Test cancelled due to --dry-run')
......
......@@ -188,6 +188,9 @@ class LoaderTestFunctional(unittest.TestCase):
def test_pass(self):
self._test('passtest.py', AVOCADO_TEST_OK, 'INSTRUMENTED')
def test_not_python_module(self):
self._test('passtest', AVOCADO_TEST_OK, 'NOT_A_TEST')
@unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
"Skipping test that take a long time to run, are "
"resource intensive or time sensitve")
......
......@@ -4,7 +4,7 @@ avocado.utils.lv_utils selftests
:copyright: 2016 Red Hat, Inc
"""
from __future__ import print_function
from avocado.utils import process, lv_utils
import glob
import os
import sys
......@@ -12,6 +12,10 @@ import shutil
import tempfile
import unittest
from six.moves import xrange as range
from avocado.utils import process, lv_utils
class LVUtilsTest(unittest.TestCase):
......@@ -55,7 +59,7 @@ class LVUtilsTest(unittest.TestCase):
disk = disks.pop()
self.assertEqual(lv_utils.get_diskspace(disk), "8388608")
except BaseException:
for _ in xrange(10):
for _ in range(10):
res = process.run("rmmod scsi_debug", ignore_status=True,
sudo=True)
if not res.exit_status:
......@@ -63,7 +67,7 @@ class LVUtilsTest(unittest.TestCase):
break
else:
print("Fail to remove scsi_debug: %s" % res)
for _ in xrange(10):
for _ in range(10):
res = process.run("rmmod scsi_debug", ignore_status=True,
sudo=True)
if not res.exit_status:
......
......@@ -8,6 +8,8 @@ import tempfile
import time
import unittest
from six.moves import xrange as range
from avocado.utils.filelock import FileLock
from avocado.utils.stacktrace import prepare_exc_info
from avocado.utils import process
......@@ -176,7 +178,7 @@ class FileLockTest(unittest.TestCase):
def test_filelock(self):
# Calculate the timeout based on t_100_iter + 2e-5*players
start = time.time()
for _ in xrange(100):
for _ in range(100):
with FileLock(self.tmpdir):
pass
timeout = 0.02 + (time.time() - start)
......
......@@ -224,8 +224,8 @@ class TestMuxTree(unittest.TestCase):
mux2.initialize_mux(tree2, "", False)
mux1.update_defaults(tree.TreeNode())
mux2.update_defaults(tree.TreeNode())
variant1 = iter(mux1).next()
variant2 = iter(mux2).next()
variant1 = next(iter(mux1))
variant2 = next(iter(mux2))
self.assertNotEqual(variant1, variant2)
self.assertEqual(str(variant1), "{'mux_path': '', 'variant': "
"[TreeNode(name='child1'), TreeNode(name="
......@@ -292,11 +292,11 @@ class TestAvocadoParams(unittest.TestCase):
yamls = yaml_to_mux.create_from_yaml(["/:" + PATH_PREFIX +
'selftests/.data/mux-selftest-params.yaml'])
self.yamls = iter(mux.MuxTree(yamls))
self.params1 = varianter.AvocadoParams(self.yamls.next(), 'Unittest1',
self.params1 = varianter.AvocadoParams(next(self.yamls), 'Unittest1',
['/ch0/*', '/ch1/*'], {})
self.yamls.next() # Skip 2nd
self.yamls.next() # and 3rd
self.params2 = varianter.AvocadoParams(self.yamls.next(), 'Unittest2',
next(self.yamls) # Skip 2nd
next(self.yamls) # and 3rd
self.params2 = varianter.AvocadoParams(next(self.yamls), 'Unittest2',
['/ch1/*', '/ch0/*'], {})
@unittest.skipIf(not yaml_to_mux.MULTIPLEX_CAPABLE, "Not multiplex capable")
......
......@@ -176,37 +176,35 @@ class SimpleTestClassTest(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
self.pass_script = script.TemporaryScript(
self.script = None
def test_simple_test_pass_status(self):
self.script = script.TemporaryScript(
'avocado_pass.sh',
PASS_SCRIPT_CONTENTS,
'avocado_simpletest_unittest')
self.pass_script.save()
self.script.save()
tst_instance = test.SimpleTest(
name=test.TestID(1, self.script.path),
base_logdir=self.tmpdir)
tst_instance.run_avocado()
self.assertEqual(tst_instance.status, 'PASS')
self.fail_script = script.TemporaryScript(
def test_simple_test_fail_status(self):
self.script = script.TemporaryScript(
'avocado_fail.sh',
FAIL_SCRIPT_CONTENTS,
'avocado_simpletest_unittest')
self.fail_script.save()
self.tst_instance_pass = test.SimpleTest(
name=test.TestID(1, self.pass_script.path),
self.script.save()
tst_instance = test.SimpleTest(
name=test.TestID(1, self.script.path),
base_logdir=self.tmpdir)
self.tst_instance_pass.run_avocado()
self.tst_instance_fail = test.SimpleTest(
name=test.TestID(1, self.fail_script.path),
base_logdir=self.tmpdir)
self.tst_instance_fail.run_avocado()
def test_simple_test_pass_status(self):
self.assertEqual(self.tst_instance_pass.status, 'PASS')
def test_simple_test_fail_status(self):
self.assertEqual(self.tst_instance_fail.status, 'FAIL')
tst_instance.run_avocado()
self.assertEqual(tst_instance.status, 'FAIL')
def tearDown(self):
self.pass_script.remove()
self.fail_script.remove()
if self.script is not None:
self.script.remove()
shutil.rmtree(self.tmpdir)
......
......@@ -55,6 +55,10 @@ class TestUnpickableObject(unittest.TestCase):
Basic selftests for `avocado.utils.stacktrace.str_unpickable_object
"""
def test_raises(self):
self.assertRaises(ValueError, stacktrace.str_unpickable_object,
([{"foo": set([])}]))
def test_basic(self):
""" Basic usage """
def check(exps, obj):
......@@ -63,8 +67,6 @@ class TestUnpickableObject(unittest.TestCase):
for exp in exps:
if not re.search(exp, act):
self.fail("%r no match in:\n%s" % (exp, act))
self.assertRaises(ValueError, stacktrace.str_unpickable_object,
([{"foo": set([])}]))
check(["this => .*Unpickable"], Unpickable())
check([r"this\[0\]\[0\]\[foo\]\.troublemaker => .*Unpickable"],
[[{"foo": InClassUnpickable()}]])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册