提交 17fb3fcf 编写于 作者: R Rick Chao 提交者: TensorFlower Gardener

MultiProcessRunner: Add UnexpectedSubprocessExitError to be raised if the exit...

MultiProcessRunner: Add UnexpectedSubprocessExitError to be raised if the exit code from a subprocess in unexpected. This results in subprocess having segfault failing the test, which would not have before this change.

PiperOrigin-RevId: 317734369
Change-Id: I008f9bd4dd5b7a68e8e4a7f17b7e5d490a30c09a
上级 a3dc11ea
......@@ -463,14 +463,28 @@ class MultiProcessRunner(object):
process_statuses = self._queue_to_list(self._process_status_queue)
if not self._all_forced_terminated and len(
process_statuses) != self._outstanding_subprocess_count:
raise RuntimeError(
'missing statuses from %d subproceses.' %
(self._outstanding_subprocess_count - len(process_statuses)))
raise UnexpectedSubprocessExitError(
'Missing status(es) from %d subprocess(es). See logs for details.' %
(self._outstanding_subprocess_count - len(process_statuses)),
self._get_mpr_result(process_statuses))
for process_status in process_statuses:
assert isinstance(process_status, _ProcessStatusInfo)
if not process_status.is_successful:
six.reraise(*process_status.exc_info)
# Checking all the processes that are expected to exit properly.
for (task_type, task_id), p in self._processes.items():
if self._dependence_on_chief and task_type != 'chief':
# If _dependence_on_chief, other processes may have been
# forced-terminated, which is expected.
continue
# Successfully exiting process has exit code 0.
if p.exitcode > 0:
raise UnexpectedSubprocessExitError(
'Subprocess %s-%d exited with exit code %d. See logs for details.' %
(task_type, task_id, p.exitcode),
self._get_mpr_result(process_statuses))
logging.info('Joining log reading threads.')
for thread in self._reading_threads:
thread.join()
......@@ -506,6 +520,8 @@ class MultiProcessRunner(object):
for (task_type, task_id), p in self._processes.items():
try:
os.kill(p.pid, sig)
logging.info('%s-%d terminated with signal %r.', task_type, task_id,
sig)
except ProcessLookupError:
logging.info('Attempting to kill %s-%d but it does not exist.',
task_type, task_id)
......@@ -658,6 +674,9 @@ class _ProcFunc(object):
self._resources.process_status_queue.put(info)
self._close_streaming()
# Exit with code 0 as it's considered successful exit at this point.
sys.exit(0)
class SubprocessTimeoutError(RuntimeError):
"""An error that indicates there is at least one subprocess timing out.
......@@ -672,6 +691,19 @@ class SubprocessTimeoutError(RuntimeError):
self.mpr_result = mpr_result
class UnexpectedSubprocessExitError(RuntimeError):
"""An error indicating there is at least one subprocess with unexpected exit.
When this is raised, a `MultiProcessRunnerResult` object can be retrieved by
`UnexpectedSubprocessExitError`'s mpr_result attribute. See
`MultiProcessRunner.join()` for more information.
"""
def __init__(self, msg, mpr_result):
super(UnexpectedSubprocessExitError, self).__init__(msg)
self.mpr_result = mpr_result
def _set_tf_config(task_type, task_id, cluster_spec, rpc_layer=None):
"""Set TF_CONFIG environment variable."""
tf_config_dict = {
......
......@@ -18,6 +18,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ctypes
import json
import os
import threading
......@@ -324,6 +325,40 @@ class MultiProcessRunnerTest(test.TestCase):
any('(logging) {}-0, i: {}'.format(job, iteration) in line
for line in list_to_assert))
def test_seg_fault_raises_error(self):
def proc_func_expected_to_seg_fault():
ctypes.string_at(0) # Intentionally made seg fault.
with self.assertRaises(
multi_process_runner.UnexpectedSubprocessExitError) as cm:
multi_process_runner.run(
proc_func_expected_to_seg_fault,
multi_worker_test_base.create_cluster_spec(num_workers=1),
list_stdout=True)
self.assertIn('Missing status(es) from 1 subprocess(es).',
str(cm.exception))
list_to_assert = cm.exception.mpr_result.stdout
self.assertTrue(any('SIGSEGV' in line for line in list_to_assert))
def test_seg_fault_in_chief_raises_error(self):
def proc_func_expected_to_seg_fault():
if multi_worker_test_base.get_task_type() == 'worker':
time.sleep(10000)
ctypes.string_at(0) # Intentionally made seg fault.
with self.assertRaises(
multi_process_runner.UnexpectedSubprocessExitError) as cm:
multi_process_runner.run(
proc_func_expected_to_seg_fault,
multi_worker_test_base.create_cluster_spec(
has_chief=True, num_workers=1),
list_stdout=True)
self.assertIn('Subprocess chief-0 exited with exit code',
str(cm.exception))
list_to_assert = cm.exception.mpr_result.stdout
self.assertTrue(any('SIGSEGV' in line for line in list_to_assert))
if __name__ == '__main__':
multi_process_runner.test_main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册