Commit d71a430a, authored by Lukáš Doktor

selftests..remote,vm: Rework the unittests

Use flexmock to supplement the remote machine and check both
vm and remote plugins more deeply.
Signed-off-by: Lukáš Doktor <ldoktor@redhat.com>
Parent commit: 2c8122e5
import argparse
import json
import os
import sys
import unittest

from flexmock import flexmock, flexmock_teardown
# Simple magic for using scripts within a source tree: walk up from this
# file and, if the grandparent-of-parent directory contains the 'avocado'
# package, prepend it to sys.path so the in-tree avocado is importable.
# (The scrape had stripped the indentation of the `if` body — restored.)
basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
basedir = os.path.dirname(basedir)
if os.path.isdir(os.path.join(basedir, 'avocado')):
    sys.path.append(basedir)
from avocado.core import status
from avocado.core import job_id
from avocado.plugins import remote
class _Stream(object):

    """Dummy avocado output stream used by the result-class unittests.

    Every logging/notification hook is a no-op; only ``job_unique_id``
    carries a real value so the result classes under test can read it.
    (The diff view had interleaved unrelated import lines into this class
    body; the six no-op methods are restored here as one definition.)
    """

    # Fresh unique job id, shared by all instances (class attribute).
    job_unique_id = job_id.create_unique_job_id()

    def start_file_logging(self, param1, param2):
        """No-op stand-in for the stream's file-logging setup."""
        pass

    def stop_file_logging(self):
        """No-op stand-in for the stream's file-logging teardown."""
        pass

    def set_tests_info(self, info):
        """No-op; the real stream records overall tests info."""
        pass

    def notify(self, event, msg):
        """No-op; the real stream emits user-visible messages."""
        pass

    def add_test(self, state):
        """No-op; the real stream registers a started test."""
        pass

    def set_test_status(self, status, state):
        """No-op; the real stream records a finished test's status."""
        pass
# Simulated stdout of a remote `avocado run --json -` invocation: one
# valid JSON results line surrounded by unrelated noise the consumer
# must skip over.
JSON_RESULTS = '\n'.join((
    'Something other than json',
    '{"tests": [{"test": "sleeptest.1", "url": "sleeptest", '
    '"status": "PASS", "time": 1.23, "start": 0, "end": 1.23}],'
    '"debuglog": "/home/user/avocado/logs/run-2014-05-26-15.45.'
    '37/debug.log", "errors": 0, "skip": 0, "time": 1.4, '
    '"start": 0, "end": 1.4, "pass": 1, "failures": 0, "total": '
    '1}',
    'Additional stuff other than json'))
class Args(list):

    """Minimal stand-in for the parsed avocado CLI arguments.

    Subclasses ``list`` so the object is iterable like the real args;
    the instance starts empty and only carries the ``url`` attribute.
    """

    def __init__(self):
        # Chain to list.__init__ (the original skipped this) so the
        # list part of the instance is initialized explicitly.
        super(Args, self).__init__()
        self.url = 'sleeptest'
class RemoteResultTest(unittest.TestCase):

    """Tests remote.RemoteTestResult bookkeeping against parsed results.

    NOTE(review): reconstructed from a scrambled diff view — presumably
    the pre-rework version of this test; confirm against repo history.
    """

    def setUp(self):
        # A dummy stream and args are enough: only result bookkeeping
        # is exercised, no remote connection is made.
        stream = _Stream()
        stream.logfile = 'debug.log'
        self.test_result = remote.RemoteTestResult(stream, Args())
        # Canned JSON results of a single passing sleeptest run.
        j = '''{"tests": [{"test": "sleeptest.1", "url": "sleeptest", "status": "PASS",
"time": 1.23, "start": 0, "end": 1.23}],
"debuglog": "/home/user/avocado/logs/run-2014-05-26-15.45.37/debug.log",
"errors": 0, "skip": 0, "time": 1.4, "start": 0, "end": 1.4,
"pass": 1, "failures": 0, "total": 1}'''
        self.results = json.loads(j)

    def test_check(self):
        """Feed one passing test through the result; check the totals."""
        failures = []
        self.test_result.start_tests()
        for tst in self.results['tests']:
            test = remote.RemoteTest(name=tst['test'],
                                     time=tst['time'],
                                     start=tst['start'],
                                     end=tst['end'],
                                     status=tst['status'])
            self.test_result.start_test(test.get_state())
            self.test_result.check_test(test.get_state())
            if not status.mapping[test.status]:
                failures.append(test.tagged_name)
        self.test_result.end_tests()
        self.assertEqual(self.test_result.tests_total, 1)
        self.assertEqual(len(self.test_result.passed), 1)
        self.assertEqual(len(self.test_result.failed), 0)
        self.assertEqual(len(failures), 0)


class RemoteTestRunnerTest(unittest.TestCase):

    """ Tests RemoteTestRunner """

    def setUp(self):
        # Neutralize __init__ so a runner can be built without a job.
        flexmock(remote.RemoteTestRunner).should_receive('__init__')
        self.remote = remote.RemoteTestRunner(None, None)
        test_results = flexmock(stdout=JSON_RESULTS)
        stream = flexmock(job_unique_id='sleeptest.1',
                          debuglog='/local/path/dirname')
        Remote = flexmock()
        # Exact remote command run_suite() is expected to execute once.
        args = ("cd ~/avocado/tests; avocado run --force-job-id sleeptest.1 "
                "--json - --archive sleeptest")
        (Remote.should_receive('run').with_args(args, ignore_status=True)
         .once().and_return(test_results))
        Results = flexmock(remote=Remote, urls=['sleeptest'],
                           stream=stream)
        Results.should_receive('setup').once().ordered()
        Results.should_receive('start_tests').once().ordered()
        # Test state expected to be forwarded for the single PASS result.
        args = {'status': u'PASS', 'whiteboard': '', 'time_start': 0,
                'name': u'sleeptest.1', 'class_name': 'RemoteTest',
                'traceback': 'Not supported yet',
                'text_output': 'Not supported yet', 'time_end': 1.23,
                'tagged_name': u'sleeptest.1', 'time_elapsed': 1.23,
                'fail_class': 'Not supported yet', 'job_unique_id': '',
                'fail_reason': 'Not supported yet'}
        Results.should_receive('start_test').once().with_args(args).ordered()
        Results.should_receive('check_test').once().with_args(args).ordered()
        # The results archive is fetched, extracted locally and removed.
        (Remote.should_receive('receive_files')
         .with_args('/local/path', '/home/user/avocado/logs/run-2014-05-26-'
                    '15.45.37.zip')).once().ordered()
        (flexmock(remote.archive).should_receive('uncompress')
         .with_args('/local/path/run-2014-05-26-15.45.37.zip', '/local/path')
         .once().ordered())
        (flexmock(remote.os).should_receive('remove')
         .with_args('/local/path/run-2014-05-26-15.45.37.zip').once()
         .ordered())
        Results.should_receive('end_tests').once().ordered()
        Results.should_receive('tear_down').once().ordered()
        self.remote.result = Results

    def tearDown(self):
        # Clears the flexmock expectations registered in setUp.
        flexmock_teardown()

    def test_run_suite(self):
        """ Test RemoteTestRunner.run_suite() """
        self.remote.run_suite(None)
        flexmock_teardown()  # Checks the expectations
class RemoteTestResultTest(unittest.TestCase):

    """ Tests the RemoteTestResult """

    def setUp(self):
        # Mocked remote machine and output stream handed to the result.
        Remote = flexmock()
        Stream = flexmock()
        # RemoteTestResult reads the local working directory on init.
        (flexmock(remote.os).should_receive('getcwd')
         .and_return('/current/directory').ordered())
        Stream.should_receive('notify').once().ordered()
        # setup() must open the connection with exactly the credentials
        # taken from the Args mock defined below.
        remote_remote = flexmock(remote.remote)
        (remote_remote.should_receive('Remote')
         .with_args('hostname', 'username', 'password', 22, quiet=True)
         .once().ordered()
         .and_return(Remote))
        (Remote.should_receive('makedir').with_args('~/avocado/tests')
         .once().ordered())
        # The first two urls exist locally, the third does not —
        # presumably triggering the default-tests-dir fallback; confirm
        # against remote.RemoteTestResult.setup().
        (flexmock(remote.os.path).should_receive('exists')
         .with_args('/tests/sleeptest').once().and_return(True).ordered())
        (flexmock(remote.os.path).should_receive('exists')
         .with_args('/tests/other/test').once().and_return(True).ordered())
        (flexmock(remote.os.path).should_receive('exists')
         .with_args('passtest').once().and_return(False).ordered())
        (flexmock(remote.data_dir).should_receive('get_test_dir').once()
         .and_return('/path/to/default/tests/location').ordered())
        # Default tests location is recreated and copied to the remote.
        (Remote.should_receive('makedir')
         .with_args("~/avocado/tests/path/to/default/tests/location")
         .once().ordered())
        (Remote.should_receive('send_files')
         .with_args("/path/to/default/tests/location",
                    "~/avocado/tests/path/to/default/tests").once().ordered())
        # Same for the directory holding the absolute-path urls.
        (Remote.should_receive('makedir')
         .with_args("~/avocado/tests/tests")
         .once().ordered())
        (Remote.should_receive('send_files')
         .with_args("/tests", "~/avocado/tests").once().ordered())
        Args = flexmock(test_result_total=1,
                        url=['/tests/sleeptest', '/tests/other/test',
                             'passtest'],
                        remote_username='username',
                        remote_hostname='hostname',
                        remote_port=22,
                        remote_password='password',
                        remote_no_copy=False)
        self.remote = remote.RemoteTestResult(Stream, Args)

    def tearDown(self):
        # Clears the flexmock expectations registered in setUp.
        flexmock_teardown()

    def test_setup(self):
        """ Tests RemoteTestResult.test_setup() """
        self.remote.setup()
        flexmock_teardown()  # Checks the expectations
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()
#!/usr/bin/env python
import unittest
import os
import sys
import json
# Simple magic for using scripts within a source tree: walk up from this
# file and, if the grandparent-of-parent directory contains the 'avocado'
# package, prepend it to sys.path so the in-tree avocado is importable.
# (The scrape had stripped the indentation of the `if` body — restored.)
basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
basedir = os.path.dirname(basedir)
if os.path.isdir(os.path.join(basedir, 'avocado')):
    sys.path.append(basedir)
from flexmock import flexmock, flexmock_teardown
from avocado.core import status
from avocado.core import job_id
from avocado.plugins import vm, remote
class _Stream(object):

    """Dummy avocado output stream used by the VM result unittests.

    Every hook is a no-op; only ``job_unique_id`` carries a real value.
    NOTE(review): ``add_test``/``set_test_status`` had been scrambled
    below JSON_RESULTS in the diff view; restored into the class here —
    confirm against repository history.
    """

    # Fresh unique job id, shared by all instances (class attribute).
    job_unique_id = job_id.create_unique_job_id()

    def start_file_logging(self, param1, param2):
        """No-op stand-in for the stream's file-logging setup."""
        pass

    def stop_file_logging(self):
        """No-op stand-in for the stream's file-logging teardown."""
        pass

    def set_tests_info(self, info):
        """No-op; the real stream records overall tests info."""
        pass

    def notify(self, event, msg):
        """No-op; the real stream emits user-visible messages."""
        pass

    def add_test(self, state):
        """No-op; the real stream registers a started test."""
        pass

    def set_test_status(self, status, state):
        """No-op; the real stream records a finished test's status."""
        pass


# Simulated stdout of a remote `avocado run --json -` invocation: one
# valid JSON results line surrounded by noise the consumer must skip.
JSON_RESULTS = ('Something other than json\n'
                '{"tests": [{"test": "sleeptest.1", "url": "sleeptest", '
                '"status": "PASS", "time": 1.23, "start": 0, "end": 1.23}],'
                '"debuglog": "/home/user/avocado/logs/run-2014-05-26-15.45.'
                '37/debug.log", "errors": 0, "skip": 0, "time": 1.4, '
                '"start": 0, "end": 1.4, "pass": 1, "failures": 0, "total": '
                '1}\nAdditional stuff other than json')
class Args(list):

    """Minimal stand-in for the parsed avocado CLI arguments."""

    def __init__(self):
        # Chain to list.__init__ so the list part is initialized.
        super(Args, self).__init__()
        self.url = 'sleeptest'


class VMResultTest(unittest.TestCase):

    """Tests vm.VMTestResult bookkeeping against parsed JSON results.

    NOTE(review): reconstructed from a scrambled diff view — presumably
    the pre-rework version of this test; confirm against repo history.
    """

    def setUp(self):
        # A dummy stream and args are enough: only result bookkeeping
        # is exercised, no VM is contacted.
        stream = _Stream()
        stream.logfile = 'debug.log'
        self.test_result = vm.VMTestResult(stream, Args())
        # Canned JSON results of a single passing sleeptest run.
        j = '''{"tests": [{"test": "sleeptest.1", "url": "sleeptest", "status": "PASS",
"time": 1.23, "start": 0.0, "end": 1.23}],
"debuglog": "/home/user/avocado/logs/run-2014-05-26-15.45.37/debug.log",
"errors": 0, "skip": 0, "time": 1.4,
"pass": 1, "failures": 0, "total": 1}'''
        self.results = json.loads(j)

    def test_check(self):
        """Feed one passing test through the result; check the totals."""
        failures = []
        self.test_result.start_tests()
        for tst in self.results['tests']:
            test = remote.RemoteTest(name=tst['test'],
                                     time=tst['time'],
                                     start=tst['start'],
                                     end=tst['end'],
                                     status=tst['status'])
            self.test_result.start_test(test.get_state())
            self.test_result.check_test(test.get_state())
            if not status.mapping[test.status]:
                failures.append(test.tagged_name)
        self.test_result.end_tests()
        self.assertEqual(self.test_result.tests_total, 1)
        self.assertEqual(len(self.test_result.passed), 1)
        self.assertEqual(len(self.test_result.failed), 0)
        self.assertEqual(len(failures), 0)


class VMTestResultTest(unittest.TestCase):

    """ Tests the VMTestResult """

    def setUp(self):
        # remote.RemoteTestResult.__init__() reads the local cwd.
        Stream = flexmock()
        (flexmock(remote.os).should_receive('getcwd')
         .and_return('/current/directory').once().ordered())
        # vm.VMTestResult.setup(): the domain is announced, the VM is
        # connected, started and snapshotted, then the remote setup is
        # delegated to RemoteTestResult.setup().
        (Stream.should_receive('notify')
         .with_args(msg="VM DOMAIN : domain", event="message"))
        mock_vm = flexmock(snapshot=True,
                           domain=flexmock(isActive=lambda: True))
        virt = flexmock(vm.virt)
        virt.should_receive('vm_connect').and_return(mock_vm).once().ordered()
        mock_vm.should_receive('start').and_return(True).once().ordered()
        mock_vm.should_receive('create_snapshot').once().ordered()
        RemoteTestResult = flexmock(remote.RemoteTestResult)
        RemoteTestResult.should_receive('setup').once().ordered()
        # vm.VMTestResult() construction arguments.
        Args = flexmock(test_result_total=1,
                        url=['/tests/sleeptest', '/tests/other/test',
                             'passtest'],
                        vm_domain='domain',
                        vm_username='username',
                        vm_hostname='hostname',
                        vm_port=22,
                        vm_password='password',
                        vm_cleanup=True,
                        vm_no_copy=False,
                        vm_hypervisor_uri='my_hypervisor_uri')
        self.remote = vm.VMTestResult(Stream, Args)
        # vm.VMTestResult.tear_down(): remote teardown + snapshot restore.
        RemoteTestResult.should_receive('tear_down').once().ordered()
        mock_vm.should_receive('restore_snapshot').once().ordered()

    def tearDown(self):
        # Clears the flexmock expectations registered in setUp.
        flexmock_teardown()

    def test_setup(self):
        """ Tests VMTestResult.test_setup() """
        self.remote.setup()
        self.remote.tear_down()
        flexmock_teardown()  # Checks the expectations
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register.