提交 44451fee 编写于 作者: C Cleber Rosa

GDB: add transparent support to avocado/utils/process.py

So that any code using our process API can be run through a debugger.
Signed-off-by: NCleber Rosa <crosa@redhat.com>
上级 a98e4365
......@@ -22,8 +22,12 @@ import StringIO
import signal
import subprocess
import time
import stat
import shlex
import threading
from avocado import gdb
from avocado import runtime
from avocado.core import exceptions
log = logging.getLogger('avocado.test')
......@@ -404,6 +408,226 @@ class SubProcess(object):
return self.result
class GDBSubProcess(object):
'''
Runs a subprocess inside the GNU Debugger
'''
def __init__(self, cmd, verbose=True):
self.cmd = cmd
self.args = shlex.split(cmd)
self.binary = self.args[0]
self.binary_path = os.path.abspath(self.cmd)
self.result = CmdResult(cmd)
self.gdb_server = gdb.GDBServer()
self.gdb = gdb.GDB()
self.gdb.connect(self.gdb_server.port)
self.gdb.set_file(self.binary)
def _get_breakpoints(self):
breakpoints = []
for expr in runtime.GDB_RUN_BINARY_NAMES_EXPR:
expr_binary_name, breakpoint = split_gdb_expr(expr)
binary_name = os.path.basename(self.binary)
if expr_binary_name == binary_name:
breakpoints.append(breakpoint)
if not breakpoints:
breakpoints.append(gdb.GDB.DEFAULT_BREAK)
return breakpoints
def generate_gdb_connect_cmds(self):
current_test = runtime.CURRENT_TEST
if current_test is not None:
binary_name = os.path.basename(self.binary)
script_name = '%s.gdb.connect_commands' % binary_name
path = os.path.join(current_test.outputdir, script_name)
cmds = open(path, 'w')
cmds.write('file %s\n' % self.binary)
cmds.write('target extended-remote :%s\n' % self.gdb_server.port)
cmds.close()
return path
def generate_gdb_connect_sh(self):
cmds = self.generate_gdb_connect_cmds()
if not cmds:
return
current_test = runtime.CURRENT_TEST
if current_test is not None:
binary_name = os.path.basename(self.binary)
fifo_name = "%s.gdb.cont.fifo" % os.path.basename(binary_name)
fifo_path = os.path.join(runtime.CURRENT_TEST.workdir, fifo_name)
script_name = '%s.gdb.sh' % binary_name
script_path = os.path.join(current_test.outputdir, script_name)
script = open(script_path, 'w')
script.write("#!/bin/sh\n")
script.write("%s -x %s\n" % (gdb.GDB.GDB_PATH, cmds))
script.write("echo -n 'C' > %s\n" % fifo_path)
script.close()
os.chmod(script_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return (script_path, fifo_path)
def handle_break_hit(self, response):
self.gdb.disconnect()
script_path, fifo_path = self.generate_gdb_connect_sh()
msg = ("\n\nTEST PAUSED because of debugger breakpoint. "
"To DEBUG your application run:\n%s\n\n"
"NOTE: please use *disconnect* command in gdb before exiting, "
"or else the debugged process will be KILLED\n" % script_path)
runtime.CURRENT_TEST.paused = True
runtime.CURRENT_TEST.paused_msg = msg
runtime.CURRENT_TEST.report_state()
runtime.CURRENT_TEST.paused_msg = ''
os.mkfifo(fifo_path)
f = open(fifo_path, 'r')
c = f.read(1)
f.close()
os.unlink(fifo_path)
return c
def _is_thread_stopped(self):
result = False
thread_info_result = self.gdb.cmd("-thread-info")
thread_info_mi_result = thread_info_result.result
if hasattr(thread_info_mi_result, 'result'):
thread_info = thread_info_mi_result.result
current_thread = thread_info.current_thread_id
for thread in thread_info.threads:
if current_thread == thread.id and thread.state == "stopped":
result = True
break
return result
@staticmethod
def _get_exit_status(parsed_msg):
"""
Returns the exit code converted to an integer
"""
code = parsed_msg.result.exit_code
if (code.startswith('0x') and len(code) > 2):
return int(code[2:], 16)
elif (code.startswith('0') and len(code) > 1):
return int(code[1:], 8)
else:
return int(code)
def wait_for_exit(self):
"""
Waits until debugger receives a message about the binary exit
"""
result = False
messages = []
while True:
try:
msgs = self.gdb.read_until_break()
messages += msgs
except:
pass
try:
msg = messages.pop(0)
parsed_msg = gdb.parse_mi(msg)
if gdb.is_exit(parsed_msg):
self.result.exit_status = self._get_exit_status(parsed_msg)
result = True
break
elif gdb.is_break_hit(parsed_msg):
# waits on fifo read() until end of debug session is notified
r = self.handle_break_hit(parsed_msg)
if r == 'C':
self.gdb.connect(self.gdb_server.port)
if self._is_thread_stopped():
r = self.gdb.cli_cmd("continue")
else:
log.warn('Binary "%s" terminated inside the '
'debugger before avocado was resumed. '
'Because important information about the '
'process was lost, we will assume it '
'exited with status 0. Please let avocado'
' finish the execution of your binary to '
'have dependable results.', self.binary)
self.result.exit_status = 0
result = True
break
except IndexError:
continue
return result
def run(self, timeout=None):
for b in self._get_breakpoints():
self.gdb.set_break(b, ignore_error=True)
result = self.gdb.run(self.args[1:])
while True:
r = self.wait_for_exit()
if r:
self.gdb.disconnect()
self.gdb_server.exit()
return self.result
def split_gdb_expr(expr):
'''
Splits a GDB expr into (binary_name, breakpoint_location)
Returns :attr:`avocado.gdb.GDB.DEFAULT_BREAK` as the default breakpoint
if one is not given.
:param expr: an expression of the form <binary_name>[:<breakpoint>]
:type expr: str
:returns: a (binary_name, breakpoint_location) tuple
:rtype: tuple
'''
expr_split = expr.split(':', 1)
if len(expr_split) == 2:
r = tuple(expr_split)
else:
r = (expr_split[0], gdb.GDB.DEFAULT_BREAK)
return r
def should_run_inside_gdb(cmd):
'''
Wether the given command should be run inside the GNU debugger
:param cmd: the command arguments, from where we extract the binary name
'''
args = shlex.split(cmd)
cmd_binary_name = os.path.basename(args[0])
for expr in runtime.GDB_RUN_BINARY_NAMES_EXPR:
binary_name = os.path.basename(expr.split(':', 1)[0])
if cmd_binary_name == binary_name:
return True
return False
def get_sub_process_klass(cmd):
'''
Which sub process implementation should be used
Either the regular one, or the GNU Debugger version
:param cmd: the command arguments, from where we extract the binary name
'''
if should_run_inside_gdb(cmd):
return GDBSubProcess
else:
return SubProcess
def run(cmd, timeout=None, verbose=True, ignore_status=False):
"""
Run a subprocess, returning a CmdResult object.
......@@ -423,7 +647,8 @@ def run(cmd, timeout=None, verbose=True, ignore_status=False):
:return: An :class:`avocado.utils.process.CmdResult` object.
:raise: :class:`avocado.core.exceptions.CmdError`, if ``ignore_status=False``.
"""
sp = SubProcess(cmd=cmd, verbose=verbose)
klass = get_sub_process_klass(cmd)
sp = klass(cmd=cmd, verbose=verbose)
cmd_result = sp.run(timeout=timeout)
fail_condition = cmd_result.exit_status != 0 or cmd_result.interrupted
if fail_condition and not ignore_status:
......
#!/usr/bin/python
import unittest
from avocado import runtime
from avocado.utils import process
class TestGDBProcess(unittest.TestCase):
def setUp(self):
self.current_runtime_expr = runtime.GDB_RUN_BINARY_NAMES_EXPR[:]
def cleanUp(self):
runtime.GDB_RUN_BINARY_NAMES_EXPR = self.current_runtime_expr
def test_should_run_inside_gdb(self):
runtime.GDB_RUN_BINARY_NAMES_EXPR = ['foo']
self.assertTrue(process.should_run_inside_gdb('foo'))
self.assertTrue(process.should_run_inside_gdb('/usr/bin/foo'))
self.assertFalse(process.should_run_inside_gdb('/usr/bin/fooz'))
runtime.GDB_RUN_BINARY_NAMES_EXPR.append('foo:main')
self.assertTrue(process.should_run_inside_gdb('foo'))
self.assertFalse(process.should_run_inside_gdb('bar'))
runtime.GDB_RUN_BINARY_NAMES_EXPR.append('bar:main.c:5')
self.assertTrue(process.should_run_inside_gdb('bar'))
self.assertFalse(process.should_run_inside_gdb('baz'))
self.assertTrue(process.should_run_inside_gdb('bar 1 2 3'))
self.assertTrue(process.should_run_inside_gdb('/usr/bin/bar 1 2 3'))
def test_get_sub_process_klass(self):
runtime.GDB_RUN_BINARY_NAMES_EXPR = []
self.assertIs(process.get_sub_process_klass('/bin/true'),
process.SubProcess)
runtime.GDB_RUN_BINARY_NAMES_EXPR.append('/bin/false')
self.assertIs(process.get_sub_process_klass('/bin/false'),
process.GDBSubProcess)
self.assertIs(process.get_sub_process_klass('false'),
process.GDBSubProcess)
self.assertIs(process.get_sub_process_klass('true'),
process.SubProcess)
def test_split_gdb_expr(self):
binary, breakpoint = process.split_gdb_expr('foo:debug_print')
self.assertEqual(binary, 'foo')
self.assertEqual(breakpoint, 'debug_print')
binary, breakpoint = process.split_gdb_expr('bar')
self.assertEqual(binary, 'bar')
self.assertEqual(breakpoint, 'main')
binary, breakpoint = process.split_gdb_expr('baz:main.c:57')
self.assertEqual(binary, 'baz')
self.assertEqual(breakpoint, 'main.c:57')
self.assertIsInstance(process.split_gdb_expr('foo'), tuple)
self.assertIsInstance(process.split_gdb_expr('foo:debug_print'), tuple)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册