提交 4cd1052e 编写于 作者: B barrierye

add timeout in op

上级 45d47d4a
...@@ -267,7 +267,8 @@ class Op(object): ...@@ -267,7 +267,8 @@ class Op(object):
server_name=None, server_name=None,
fetch_names=None, fetch_names=None,
concurrency=1, concurrency=1,
timeout=-1): timeout=-1,
retry=2):
self._run = False self._run = False
# TODO: globally unique check # TODO: globally unique check
self._name = name # to identify the type of OP, it must be globally unique self._name = name # to identify the type of OP, it must be globally unique
...@@ -285,6 +286,7 @@ class Op(object): ...@@ -285,6 +286,7 @@ class Op(object):
self._server_port = server_port self._server_port = server_port
self._device = device self._device = device
self._timeout = timeout self._timeout = timeout
self._retry = retry
def set_client(self, client_config, server_name, fetch_names): def set_client(self, client_config, server_name, fetch_names):
self._client = Client() self._client = Client()
...@@ -387,24 +389,33 @@ class Op(object): ...@@ -387,24 +389,33 @@ class Op(object):
error_info = None error_info = None
if self.with_serving(): if self.with_serving():
_profiler.record("{}{}-midp_0".format(self._name, for i in range(self._retry):
concurrency_idx)) _profiler.record("{}{}-midp_0".format(self._name,
if self._time > 0: concurrency_idx))
try: if self._time > 0:
data = func_timeout.func_timeout( try:
self._time, self.midprocess, args=(data, )) middata = func_timeout.func_timeout(
except func_timeout.FunctionTimedOut: self._time, self.midprocess, args=(data, ))
logging.error("error: timeout") except func_timeout.FunctionTimedOut:
error_info = "{}({}): timeout".format( logging.error("error: timeout")
self._name, concurrency_idx) error_info = "{}({}): timeout".format(
except Exception as e: self._name, concurrency_idx)
logging.error("error: {}".format(e)) except Exception as e:
error_info = "{}({}): {}".format(self._name, logging.error("error: {}".format(e))
concurrency_idx, e) error_info = "{}({}): {}".format(
else: self._name, concurrency_idx, e)
data = self.midprocess(data) else:
_profiler.record("{}{}-midp_1".format(self._name, middata = self.midprocess(data)
concurrency_idx)) _profiler.record("{}{}-midp_1".format(self._name,
concurrency_idx))
if error_info is None:
data = middata
break
if i + 1 < self._retry:
error_info = None
logging.warn(
self._log("warn: timeout, retry({})".format(i +
1)))
_profiler.record("{}{}-postp_0".format(self._name, _profiler.record("{}{}-postp_0".format(self._name,
concurrency_idx)) concurrency_idx))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册