提交 f0362b33 编写于 作者: B barrierye

catch grpc exception inside Client

上级 5e4c811c
...@@ -39,13 +39,8 @@ for ei in range(10000): ...@@ -39,13 +39,8 @@ for ei in range(10000):
feed_dict['dense_input'] = data[0][0] feed_dict['dense_input'] = data[0][0]
for i in range(1, 27): for i in range(1, 27):
feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i] feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i]
try: fetch_map = client.predict(feed=feed_dict, fetch=["prob"])
fetch_map = client.predict(feed=feed_dict, fetch=["prob"]) if fetch_map["status_code"] == 0:
except grpc.RpcError as e:
status_code = e.code()
if grpc.StatusCode.DEADLINE_EXCEEDED == status_code:
print('timeout')
else:
prob_list.append(fetch_map['prob'][0][1]) prob_list.append(fetch_map['prob'][0][1])
label_list.append(data[0][-1][0]) label_list.append(data[0][-1][0])
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from paddle_serving_client import MultiLangClient as Client from paddle_serving_client import MultiLangClient as Client
import paddle
import functools import functools
import time import time
import threading import threading
...@@ -23,34 +22,30 @@ import grpc ...@@ -23,34 +22,30 @@ import grpc
client = Client() client = Client()
client.connect(["127.0.0.1:9393"]) client.connect(["127.0.0.1:9393"])
test_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.test(), buf_size=500),
batch_size=1)
complete_task_count = [0] complete_task_count = [0]
lock = threading.Lock() lock = threading.Lock()
def call_back(call_future, data): def call_back(call_future):
fetch_map = call_future.result() try:
print("{} {}".format(fetch_map["price"][0], data[0][1][0])) fetch_map = call_future.result()
with lock: print(fetch_map)
complete_task_count[0] += 1 except grpc.RpcError as e:
print(e.code())
finally:
with lock:
complete_task_count[0] += 1
x = [
0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283,
0.4919, 0.1856, 0.0795, -0.0332
]
task_count = 0 task_count = 0
for data in test_reader(): for i in range(3):
try: future = client.predict(feed={"x": x}, fetch=["price"], asyn=True)
future = client.predict( task_count += 1
feed={"x": data[0][0]}, fetch=["price"], asyn=True) future.add_done_callback(functools.partial(call_back))
except grpc.RpcError as e:
status_code = e.code()
if grpc.StatusCode.DEADLINE_EXCEEDED == status_code:
print('timeout')
else:
task_count += 1
future.add_done_callback(functools.partial(call_back, data=data))
while complete_task_count[0] != task_count: while complete_task_count[0] != task_count:
time.sleep(0.1) time.sleep(0.1)
...@@ -12,27 +12,21 @@ ...@@ -12,27 +12,21 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from paddle_serving_client import MultiLangClient as Client from paddle_serving_client import MultiLangClient as Client
import paddle
import grpc
client = Client() client = Client()
client.connect(["127.0.0.1:9393"]) client.connect(["127.0.0.1:9393"])
batch_size = 2 batch_size = 2
test_reader = paddle.batch( x = [
paddle.reader.shuffle( 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283,
paddle.dataset.uci_housing.test(), buf_size=500), 0.4919, 0.1856, 0.0795, -0.0332
batch_size=batch_size) ]
for data in test_reader(): for i in range(3):
batch_feed = [{"x": x[0]} for x in data] batch_feed = [{"x": x} for j in range(batch_size)]
try: fetch_map = client.predict(feed=batch_feed, fetch=["price"])
fetch_map = client.predict(feed=batch_feed, fetch=["price"]) if fetch_map["status_code"] == 0:
except grpc.RpcError as e:
status_code = e.code()
if grpc.StatusCode.DEADLINE_EXCEEDED == status_code:
print('timeout')
else:
print(fetch_map) print(fetch_map)
else:
print(fetch_map["status_code"])
...@@ -14,24 +14,17 @@ ...@@ -14,24 +14,17 @@
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from paddle_serving_client import MultiLangClient as Client from paddle_serving_client import MultiLangClient as Client
import grpc
import paddle
client = Client() client = Client()
client.connect(["127.0.0.1:9393"]) client.connect(["127.0.0.1:9393"])
test_reader = paddle.batch( x = [
paddle.reader.shuffle( 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283,
paddle.dataset.uci_housing.test(), buf_size=500), 0.4919, 0.1856, 0.0795, -0.0332
batch_size=1) ]
for i in range(3):
for data in test_reader(): fetch_map = client.predict(feed={"x": x}, fetch=["price"], is_python=False)
try: if fetch_map["status_code"] == 0:
fetch_map = client.predict( print(fetch_map)
feed={"x": data[0][0]}, fetch=["price"], is_python=False)
except grpc.RpcError as e:
status_code = e.code()
if grpc.StatusCode.DEADLINE_EXCEEDED == status_code:
print('timeout')
else: else:
print("{} {}".format(fetch_map["price"][0], data[0][1][0])) print(fetch_map["status_code"])
...@@ -14,24 +14,18 @@ ...@@ -14,24 +14,18 @@
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from paddle_serving_client import MultiLangClient as Client from paddle_serving_client import MultiLangClient as Client
import grpc import numpy as np
import paddle
client = Client() client = Client()
client.connect(["127.0.0.1:9393"]) client.connect(["127.0.0.1:9393"])
test_reader = paddle.batch( x = [
paddle.reader.shuffle( 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283,
paddle.dataset.uci_housing.test(), buf_size=500), 0.4919, 0.1856, 0.0795, -0.0332
batch_size=1) ]
for i in range(3):
for data in test_reader(): fetch_map = client.predict(feed={"x": np.array(x)}, fetch=["price"])
try: if fetch_map["status_code"] == 0:
fetch_map = client.predict( print(fetch_map)
feed={"x": data[0][0].tolist()}, fetch=["price"])
except grpc.RpcError as e:
status_code = e.code()
if grpc.StatusCode.DEADLINE_EXCEEDED == status_code:
print('timeout')
else: else:
print("{} {}".format(fetch_map["price"][0], data[0][1][0])) print(fetch_map["status_code"])
...@@ -14,23 +14,17 @@ ...@@ -14,23 +14,17 @@
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from paddle_serving_client import MultiLangClient as Client from paddle_serving_client import MultiLangClient as Client
import paddle
import grpc
client = Client() client = Client()
client.connect(["127.0.0.1:9393"]) client.connect(["127.0.0.1:9393"])
test_reader = paddle.batch( x = [
paddle.reader.shuffle( 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283,
paddle.dataset.uci_housing.test(), buf_size=500), 0.4919, 0.1856, 0.0795, -0.0332
batch_size=1) ]
for i in range(3):
for data in test_reader(): fetch_map = client.predict(feed={"x": x}, fetch=["price"])
try: if fetch_map["status_code"] == 0:
fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["price"]) print(fetch_map)
except grpc.RpcError as e:
status_code = e.code()
if grpc.StatusCode.DEADLINE_EXCEEDED == status_code:
print('timeout')
else: else:
print("{} {}".format(fetch_map["price"][0], data[0][1][0])) print(fetch_map["status_code"])
...@@ -14,24 +14,21 @@ ...@@ -14,24 +14,21 @@
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from paddle_serving_client import MultiLangClient as Client from paddle_serving_client import MultiLangClient as Client
import paddle
import grpc import grpc
client = Client() client = Client()
client.connect(["127.0.0.1:9393"]) client.connect(["127.0.0.1:9393"])
client.set_rpc_timeout_ms(1) client.set_rpc_timeout_ms(1)
test_reader = paddle.batch( x = [
paddle.reader.shuffle( 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283,
paddle.dataset.uci_housing.test(), buf_size=500), 0.4919, 0.1856, 0.0795, -0.0332
batch_size=1) ]
for i in range(3):
for data in test_reader(): fetch_map = client.predict(feed={"x": x}, fetch=["price"])
try: if fetch_map["status_code"] == 0:
fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["price"]) print(fetch_map)
except grpc.RpcError as e: elif fetch_map["status_code"] == grpc.StatusCode.DEADLINE_EXCEEDED:
status_code = e.code() print('timeout')
if grpc.StatusCode.DEADLINE_EXCEEDED == status_code:
print('timeout')
else: else:
print("{} {}".format(fetch_map["price"][0], data[0][1][0])) print(fetch_map["status_code"])
...@@ -19,7 +19,6 @@ from imdb_reader import IMDBDataset ...@@ -19,7 +19,6 @@ from imdb_reader import IMDBDataset
client = MultiLangClient() client = MultiLangClient()
# If you have more than one model, make sure that the input # If you have more than one model, make sure that the input
# and output of more than one model are the same. # and output of more than one model are the same.
client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
client.connect(["127.0.0.1:9393"]) client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here # you can define any english sentence or dataset here
......
...@@ -569,6 +569,7 @@ class MultiLangClient(object): ...@@ -569,6 +569,7 @@ class MultiLangClient(object):
ret = multi_result_map.values()[0] ret = multi_result_map.values()[0]
else: else:
ret = multi_result_map ret = multi_result_map
ret["status_code"] = 0
return ret if not need_variant_tag else [ret, tag] return ret if not need_variant_tag else [ret, tag]
def _done_callback_func(self, fetch, is_python, need_variant_tag): def _done_callback_func(self, fetch, is_python, need_variant_tag):
...@@ -589,12 +590,15 @@ class MultiLangClient(object): ...@@ -589,12 +590,15 @@ class MultiLangClient(object):
is_python=True): is_python=True):
req = self._pack_inference_request(feed, fetch, is_python=is_python) req = self._pack_inference_request(feed, fetch, is_python=is_python)
if not asyn: if not asyn:
resp = self.stub_.Inference(req, timeout=self.rpc_timeout_s_) try:
return self._unpack_inference_response( resp = self.stub_.Inference(req, timeout=self.rpc_timeout_s_)
resp, return self._unpack_inference_response(
fetch, resp,
is_python=is_python, fetch,
need_variant_tag=need_variant_tag) is_python=is_python,
need_variant_tag=need_variant_tag)
except grpc.RpcError as e:
return {"status_code": e.code()}
else: else:
call_future = self.stub_.Inference.future( call_future = self.stub_.Inference.future(
req, timeout=self.rpc_timeout_s_) req, timeout=self.rpc_timeout_s_)
...@@ -612,7 +616,10 @@ class MultiLangPredictFuture(object): ...@@ -612,7 +616,10 @@ class MultiLangPredictFuture(object):
self.callback_func_ = callback_func self.callback_func_ = callback_func
def result(self): def result(self):
resp = self.call_future_.result() try:
resp = self.call_future_.result()
except grpc.RpcError as e:
return {"status_code": e.code()}
return self.callback_func_(resp) return self.callback_func_(resp)
def add_done_callback(self, fn): def add_done_callback(self, fn):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册