提交 9874c0b5 编写于 作者: R Rudá Moura

Merge pull request #252 from lmr/process_init_on_demand_v5

[V5] Process init on demand
......@@ -245,9 +245,10 @@ class RpmBackend(BaseBackend):
cmd_format = "rpm -qa --qf '%s' | sort"
query_format = "%s\n" % self.SOFTWARE_COMPONENT_QRY
cmd_format = cmd_format % query_format
cmd_result = process.run(cmd_format, verbose=False)
cmd_result = process.run(cmd_format, verbose=False, shell=True)
else:
cmd_result = process.run('rpm -qa | sort', verbose=False)
cmd_result = process.run('rpm -qa | sort', verbose=False,
shell=True)
out = cmd_result.stdout.strip()
installed_packages = out.splitlines()
......@@ -357,7 +358,7 @@ class YumBackend(RpmBackend):
self.cfgparser.read(self.repo_file_path)
y_cmd = executable + ' --version | head -1'
cmd_result = process.run(y_cmd, ignore_status=True,
verbose=False)
verbose=False, shell=True)
out = cmd_result.stdout.strip()
try:
ver = re.findall('\d*.\d*.\d*', out)[0]
......@@ -627,7 +628,8 @@ class AptBackend(DpkgBackend):
self.repo_file_path = '/etc/apt/sources.list.d/autotest'
cmd_result = process.run('apt-get -v | head -1',
ignore_status=True,
verbose=False)
verbose=False,
shell=True)
out = cmd_result.stdout.strip()
try:
ver = re.findall('\d\S*', out)[0]
......
......@@ -222,7 +222,8 @@ class SubProcess(object):
Run a subprocess in the background, collecting stdout/stderr streams.
"""
def __init__(self, cmd, verbose=True, allow_output_check='all'):
def __init__(self, cmd, verbose=True, allow_output_check='all',
shell=False):
"""
Creates the subprocess object, stdout/err, reader threads and locks.
......@@ -239,45 +240,67 @@ class SubProcess(object):
(default), and 'none', to allow
none to be recorded.
:type allow_output_check: str
:param shell: Whether to run the subprocess in a subshell.
:type shell: bool
"""
self.cmd = cmd
self.verbose = verbose
if self.verbose:
log.info("Running '%s'", cmd)
self.sp = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
self.allow_output_check = allow_output_check
self.start_time = time.time()
self.result = CmdResult(cmd)
self.stdout_file = StringIO.StringIO()
self.stderr_file = StringIO.StringIO()
self.stdout_lock = threading.Lock()
self.stdout_thread = threading.Thread(target=self._fd_drainer,
name="%s-stdout" % cmd,
args=[self.sp.stdout])
self.stdout_thread.daemon = True
self.stderr_lock = threading.Lock()
self.stderr_thread = threading.Thread(target=self._fd_drainer,
name="%s-stderr" % cmd,
args=[self.sp.stderr])
self.stderr_thread.daemon = True
self.stdout_thread.start()
self.stderr_thread.start()
def signal_handler(signum, frame):
self.result.interrupted = True
self.wait()
self.result = CmdResult(self.cmd)
self.shell = shell
self._popen = None
signal.signal(signal.SIGINT, signal_handler)
def __repr__(self):
if self._popen is None:
rc = '(not started)'
elif self.result.exit_status is None:
rc = '(running)'
else:
rc = self.result.exit_status
return '%s(cmd=%r, rc=%r)' % (self.__class__.__name__, self.cmd, rc)
def __str__(self):
if self.result.exit_status is None:
rc = '(still running)'
if self._popen is None:
rc = '(not started)'
elif self.result.exit_status is None:
rc = '(running)'
else:
rc = self.result.exit_status
return 'SubProcess(cmd="%s", rc="%s")' % (self.cmd, rc)
rc = '(finished with exit status=%d)' % self.result.exit_status
return '%s %s' % (self.cmd, rc)
def _init_subprocess(self):
if self._popen is None:
if self.verbose:
log.info("Running '%s'", self.cmd)
if self.shell is False:
cmd = shlex.split(self.cmd)
else:
cmd = self.cmd
self._popen = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=self.shell)
self.start_time = time.time()
self.stdout_file = StringIO.StringIO()
self.stderr_file = StringIO.StringIO()
self.stdout_lock = threading.Lock()
self.stdout_thread = threading.Thread(target=self._fd_drainer,
name="%s-stdout" % self.cmd,
args=[self._popen.stdout])
self.stdout_thread.daemon = True
self.stderr_lock = threading.Lock()
self.stderr_thread = threading.Thread(target=self._fd_drainer,
name="%s-stderr" % self.cmd,
args=[self._popen.stderr])
self.stderr_thread.daemon = True
self.stdout_thread.start()
self.stderr_thread.start()
def signal_handler(signum, frame):
self.result.interrupted = True
self.wait()
signal.signal(signal.SIGINT, signal_handler)
def _fd_drainer(self, input_pipe):
"""
......@@ -286,7 +309,7 @@ class SubProcess(object):
:param input_pipe: File like object to the stream.
"""
stream_prefix = "%s"
if input_pipe == self.sp.stdout:
if input_pipe == self._popen.stdout:
prefix = '[stdout] %s'
if self.allow_output_check in ['none', 'stderr']:
stream_logger = None
......@@ -294,7 +317,7 @@ class SubProcess(object):
stream_logger = stdout_log
output_file = self.stdout_file
lock = self.stdout_lock
elif input_pipe == self.sp.stderr:
elif input_pipe == self._popen.stderr:
prefix = '[stderr] %s'
if self.allow_output_check in ['none', 'stdout']:
stream_logger = None
......@@ -325,6 +348,7 @@ class SubProcess(object):
lock.release()
def _fill_results(self, rc):
self._init_subprocess()
self.result.exit_status = rc
if self.result.duration == 0:
self.result.duration = time.time() - self.start_time
......@@ -338,11 +362,24 @@ class SubProcess(object):
self.stdout_thread.join()
self.stderr_thread.join()
# Clean subprocess pipes and populate stdout/err
self.sp.stdout.close()
self.sp.stderr.close()
self._popen.stdout.close()
self._popen.stderr.close()
self.result.stdout = self.get_stdout()
self.result.stderr = self.get_stderr()
def start(self):
"""
Start running the subprocess.
This method is particularly useful for background processes, since
you can start the subprocess and not block your test flow.
:return: Subprocess PID.
:rtype: int
"""
self._init_subprocess()
return self._popen.pid
def get_stdout(self):
"""
Get the full stdout of the subprocess so far.
......@@ -350,6 +387,7 @@ class SubProcess(object):
:return: Standard output of the process.
:rtype: str
"""
self._init_subprocess()
self.stdout_lock.acquire()
stdout = self.stdout_file.getvalue()
self.stdout_lock.release()
......@@ -362,6 +400,7 @@ class SubProcess(object):
:return: Standard error of the process.
:rtype: str
"""
self._init_subprocess()
self.stderr_lock.acquire()
stderr = self.stderr_file.getvalue()
self.stderr_lock.release()
......@@ -371,12 +410,14 @@ class SubProcess(object):
"""
Send a :attr:`signal.SIGTERM` to the process.
"""
self._init_subprocess()
self.send_signal(signal.SIGTERM)
def kill(self):
"""
Send a :attr:`signal.SIGKILL` to the process.
"""
self._init_subprocess()
self.send_signal(signal.SIGKILL)
def send_signal(self, sig):
......@@ -385,13 +426,15 @@ class SubProcess(object):
:param sig: Signal to send.
"""
self.sp.send_signal(sig)
self._init_subprocess()
self._popen.send_signal(sig)
def poll(self):
"""
Call the subprocess poll() method, fill results if rc is not None.
"""
rc = self.sp.poll()
self._init_subprocess()
rc = self._popen.poll()
if rc is not None:
self._fill_results(rc)
return rc
......@@ -400,22 +443,42 @@ class SubProcess(object):
"""
Call the subprocess poll() method, fill results if rc is not None.
"""
rc = self.sp.wait()
self._init_subprocess()
rc = self._popen.wait()
if rc is not None:
self._fill_results(rc)
return rc
def stop(self):
"""
Stop background subprocess.
Call this method to terminate the background subprocess and
wait for it results.
"""
self._init_subprocess()
if self.result.exit_status is None:
self.terminate()
return self.wait()
def run(self, timeout=None, sig=signal.SIGTERM):
"""
Wait for the process to end, filling and returning the result attr.
Start a process and wait for it to end, returning the result attr.
If the process was already started using .start(), this will simply
wait for it to end.
:param timeout: Time (seconds) we'll wait until the process is
finished. If it's not, we'll try to terminate it
and get a status.
:type timeout: float
:param sig: Signal to send to the process in case it did not end after
the specified timeout.
:type sig: int
:returns: The command result object.
:rtype: A :class:`avocado.utils.process.CmdResult` instance.
"""
self._init_subprocess()
start_time = time.time()
if timeout is None:
......@@ -440,7 +503,7 @@ class SubProcess(object):
self.poll()
# If all this work fails, we're dealing with a zombie process.
e_msg = 'Zombie Process %s' % self.sp.pid
e_msg = 'Zombie Process %s' % self._popen.pid
assert self.result.exit_status is not None, e_msg
return self.result
......@@ -452,7 +515,7 @@ class GDBSubProcess(object):
Runs a subprocess inside the GNU Debugger
'''
def __init__(self, cmd, verbose=True, allow_output_check='all'):
def __init__(self, cmd, verbose=True, allow_output_check='all', shell=False):
"""
Creates the subprocess object, stdout/err, reader threads and locks.
......@@ -757,7 +820,8 @@ def get_sub_process_klass(cmd):
return SubProcess
def run(cmd, timeout=None, verbose=True, ignore_status=False, allow_output_check='all'):
def run(cmd, timeout=None, verbose=True, ignore_status=False,
allow_output_check='all', shell=False):
"""
Run a subprocess, returning a CmdResult object.
......@@ -782,13 +846,15 @@ def run(cmd, timeout=None, verbose=True, ignore_status=False, allow_output_check
(default), and 'none', to allow
none to be recorded.
:type allow_output_check: str
:param shell: Whether to run the command on a subshell
:type shell: bool
:return: An :class:`avocado.utils.process.CmdResult` object.
:raise: :class:`avocado.core.exceptions.CmdError`, if ``ignore_status=False``.
"""
klass = get_sub_process_klass(cmd)
sp = klass(cmd=cmd, verbose=verbose,
allow_output_check=allow_output_check)
allow_output_check=allow_output_check, shell=shell)
cmd_result = sp.run(timeout=timeout)
fail_condition = cmd_result.exit_status != 0 or cmd_result.interrupted
if fail_condition and not ignore_status:
......@@ -797,7 +863,7 @@ def run(cmd, timeout=None, verbose=True, ignore_status=False, allow_output_check
def system(cmd, timeout=None, verbose=True, ignore_status=False,
allow_output_check='all'):
allow_output_check='all', shell=False):
"""
Run a subprocess, returning its exit code.
......@@ -822,16 +888,20 @@ def system(cmd, timeout=None, verbose=True, ignore_status=False,
(default), and 'none', to allow
none to be recorded.
:type allow_output_check: str
:param shell: Whether to run the command on a subshell
:type shell: bool
:return: Exit code.
:rtype: int
:raise: :class:`avocado.core.exceptions.CmdError`, if ``ignore_status=False``.
"""
cmd_result = run(cmd=cmd, timeout=timeout, verbose=verbose, ignore_status=ignore_status,
allow_output_check=allow_output_check)
allow_output_check=allow_output_check, shell=shell)
return cmd_result.exit_status
def system_output(cmd, timeout=None, verbose=True, ignore_status=False, allow_output_check='all'):
def system_output(cmd, timeout=None, verbose=True, ignore_status=False,
allow_output_check='all', shell=False):
"""
Run a subprocess, returning its output.
......@@ -855,10 +925,13 @@ def system_output(cmd, timeout=None, verbose=True, ignore_status=False, allow_ou
(default), and 'none', to allow
none to be recorded.
:type allow_output_check: str
:param shell: Whether to run the command on a subshell
:type shell: bool
:return: Command output.
:rtype: str
:raise: :class:`avocado.core.exceptions.CmdError`, if ``ignore_status=False``.
"""
cmd_result = run(cmd=cmd, timeout=timeout, verbose=verbose, ignore_status=ignore_status,
allow_output_check=allow_output_check)
allow_output_check=allow_output_check, shell=shell)
return cmd_result.stdout
#!/usr/bin/env python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2014
# Author: Ruda Moura <rmoura@redhat.com>
import os
import sys
import unittest
import time
import tempfile
# simple magic for using scripts within a source tree
basedir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..', '..', '..')
basedir = os.path.abspath(basedir)
if os.path.isdir(os.path.join(basedir, 'avocado')):
sys.path.append(basedir)
from avocado.utils import process
FAKE_VMSTAT_CONTENTS = """#!/usr/bin/python
import time
import random
import signal
import sys
class FakeVMStat(object):
def __init__(self, interval):
self.interval = interval
self._sysrand = random.SystemRandom()
def interrupt_handler(signum, frame):
sys.exit(0)
signal.signal(signal.SIGINT, interrupt_handler)
signal.signal(signal.SIGTERM, interrupt_handler)
def get_r(self):
return self._sysrand.randint(0, 2)
def get_b(self):
return 0
def get_swpd(self):
return 0
def get_free(self):
return self._sysrand.randint(1500000, 1600000)
def get_buff(self):
return self._sysrand.randint(290000, 300000)
def get_cache(self):
return self._sysrand.randint(2900000, 3000000)
def get_si(self):
return 0
def get_so(self):
return 0
def get_bi(self):
return self._sysrand.randint(0, 50)
def get_bo(self):
return self._sysrand.randint(0, 500)
def get_in(self):
return self._sysrand.randint(200, 3000)
def get_cs(self):
return self._sysrand.randint(1000, 4000)
def get_us(self):
return self._sysrand.randint(0, 40)
def get_sy(self):
return self._sysrand.randint(1, 5)
def get_id(self):
return self._sysrand.randint(50, 100)
def get_wa(self):
return 0
def get_st(self):
return 0
def start(self):
print "procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----"
print " r b swpd free buff cache si so bi bo in cs us sy id wa st"
while True:
r = self.get_r()
b = self.get_b()
swpd = self.get_swpd()
free = self.get_free()
buff = self.get_buff()
cache = self.get_cache()
si = self.get_si()
so = self.get_so()
bi = self.get_bi()
bo = self.get_bo()
m_in = self.get_in()
cs = self.get_cs()
us = self.get_us()
sy = self.get_sy()
m_id = self.get_id()
wa = self.get_wa()
st = self.get_st()
print ("%2d %2d %2d %7d %6d %7d %1d %1d %2d %3d %4d %2d %2d %1d %3d %1d %1d" %
(r, b, swpd, free, buff, cache, si, so, bi, bo, m_in, cs,
us, sy, m_id, wa, st))
time.sleep(self.interval)
if __name__ == '__main__':
vmstat = FakeVMStat(interval=float(sys.argv[1]))
vmstat.start()
"""
FAKE_UPTIME_CONTENTS = """#!/usr/bin/python
if __name__ == '__main__':
print "17:56:34 up 8:06, 7 users, load average: 0.26, 0.20, 0.21"
"""
class ProcessTest(unittest.TestCase):
def setUp(self):
self.base_logdir = tempfile.mkdtemp(prefix='avocado_process_functional')
self.fake_vmstat = os.path.join(self.base_logdir, 'vmstat')
with open(self.fake_vmstat, 'w') as fake_vmstat_obj:
fake_vmstat_obj.write(FAKE_VMSTAT_CONTENTS)
os.chmod(self.fake_vmstat, 0775)
self.fake_uptime = os.path.join(self.base_logdir, 'uptime')
with open(self.fake_uptime, 'w') as fake_uptime_obj:
fake_uptime_obj.write(FAKE_UPTIME_CONTENTS)
os.chmod(self.fake_uptime, 0775)
def test_process_start(self):
proc = process.SubProcess('%s 1' % self.fake_vmstat)
proc.start()
time.sleep(3)
proc.terminate()
proc.wait()
stdout = proc.get_stdout()
self.assertIn('memory', stdout, 'result: %s' % stdout)
self.assertRegexpMatches(stdout, '[0-9]+')
def test_process_run(self):
proc = process.SubProcess(self.fake_uptime)
result = proc.run()
self.assertEqual(result.exit_status, 0, 'result: %s' % result)
self.assertIn('load average', result.stdout)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册