From 5fe069964ed5905e74bc6816b30f32316b9917a0 Mon Sep 17 00:00:00 2001
From: barriery
Date: Fri, 7 Aug 2020 05:50:59 +0000
Subject: [PATCH] update grpc-impl

---
 .../multi_lang_general_model_service.proto    |   1 +
 .../java/io/paddle/serving/client/Client.java | 131 +++++++++++++++---
 .../multi_lang_general_model_service.proto    |   1 +
 python/paddle_serving_client/__init__.py      |  11 +-
 python/paddle_serving_server/__init__.py      |  12 +-
 python/paddle_serving_server_gpu/__init__.py  |  10 +-
 python/pipeline/operator.py                   |  45 +++---
 7 files changed, 165 insertions(+), 46 deletions(-)

diff --git a/core/configure/proto/multi_lang_general_model_service.proto b/core/configure/proto/multi_lang_general_model_service.proto
index b83450ae..c23f4dbb 100644
--- a/core/configure/proto/multi_lang_general_model_service.proto
+++ b/core/configure/proto/multi_lang_general_model_service.proto
@@ -37,6 +37,7 @@ message InferenceRequest {
   repeated string feed_var_names = 2;
   repeated string fetch_var_names = 3;
   required bool is_python = 4 [ default = false ];
+  required uint64 log_id = 5 [ default = 0 ];
 };
 
 message InferenceResponse {
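Aside (not part of the patch): the new log_id field rides along on every InferenceRequest, so one request can be correlated across client, server, and pipeline logs. A minimal sketch of filling the message through the generated Python stubs; the import path and the variable names are illustrative assumptions, not prescribed by the patch:

    # Build an InferenceRequest by hand, the way MultiLangClient does internally.
    # Assumes the pb2 module generated from the proto above is importable.
    import multi_lang_general_model_service_pb2 as pb2

    req = pb2.InferenceRequest()
    req.feed_var_names.extend(["x"])       # hypothetical feed variable
    req.fetch_var_names.extend(["price"])  # hypothetical fetch variable
    req.is_python = True
    req.log_id = 10086                     # new in this patch; defaults to 0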
diff --git a/java/src/main/java/io/paddle/serving/client/Client.java b/java/src/main/java/io/paddle/serving/client/Client.java
index 1e09e0c2..742d4f91 100644
--- a/java/src/main/java/io/paddle/serving/client/Client.java
+++ b/java/src/main/java/io/paddle/serving/client/Client.java
@@ -192,14 +192,16 @@ public class Client {
 
     private InferenceRequest _packInferenceRequest(
             List<HashMap<String, INDArray>> feed_batch,
-            Iterable<String> fetch) throws IllegalArgumentException {
+            Iterable<String> fetch,
+            long log_id) throws IllegalArgumentException {
         List<String> feed_var_names = new ArrayList<String>();
         feed_var_names.addAll(feed_batch.get(0).keySet());
 
         InferenceRequest.Builder req_builder = InferenceRequest.newBuilder()
             .addAllFeedVarNames(feed_var_names)
             .addAllFetchVarNames(fetch)
-            .setIsPython(false);
+            .setIsPython(false)
+            .setLogId(log_id);
         for (HashMap<String, INDArray> feed_data: feed_batch) {
             FeedInst.Builder inst_builder = FeedInst.newBuilder();
             for (String name: feed_var_names) {
@@ -332,76 +334,151 @@ public class Client {
     public Map<String, INDArray> predict(
             HashMap<String, INDArray> feed,
             Iterable<String> fetch) {
-        return predict(feed, fetch, false);
+        return predict(feed, fetch, false, 0);
+    }
+
+    public Map<String, INDArray> predict(
+            HashMap<String, INDArray> feed,
+            Iterable<String> fetch,
+            long log_id) {
+        return predict(feed, fetch, false, log_id);
     }
 
     public Map<String, HashMap<String, INDArray>> ensemble_predict(
             HashMap<String, INDArray> feed,
             Iterable<String> fetch) {
-        return ensemble_predict(feed, fetch, false);
+        return ensemble_predict(feed, fetch, false, 0);
+    }
+
+    public Map<String, HashMap<String, INDArray>> ensemble_predict(
+            HashMap<String, INDArray> feed,
+            Iterable<String> fetch,
+            long log_id) {
+        return ensemble_predict(feed, fetch, false, log_id);
     }
 
     public PredictFuture asyn_predict(
             HashMap<String, INDArray> feed,
             Iterable<String> fetch) {
-        return asyn_predict(feed, fetch, false);
+        return asyn_predict(feed, fetch, false, 0);
+    }
+
+    public PredictFuture asyn_predict(
+            HashMap<String, INDArray> feed,
+            Iterable<String> fetch,
+            long log_id) {
+        return asyn_predict(feed, fetch, false, log_id);
     }
 
     public Map<String, INDArray> predict(
             HashMap<String, INDArray> feed,
             Iterable<String> fetch,
             Boolean need_variant_tag) {
+        return predict(feed, fetch, need_variant_tag, 0);
+    }
+
+    public Map<String, INDArray> predict(
+            HashMap<String, INDArray> feed,
+            Iterable<String> fetch,
+            Boolean need_variant_tag,
+            long log_id) {
         List<HashMap<String, INDArray>> feed_batch
             = new ArrayList<HashMap<String, INDArray>>();
         feed_batch.add(feed);
-        return predict(feed_batch, fetch, need_variant_tag);
+        return predict(feed_batch, fetch, need_variant_tag, log_id);
     }
-    
+
     public Map<String, HashMap<String, INDArray>> ensemble_predict(
             HashMap<String, INDArray> feed,
             Iterable<String> fetch,
             Boolean need_variant_tag) {
+        return ensemble_predict(feed, fetch, need_variant_tag, 0);
+    }
+
+    public Map<String, HashMap<String, INDArray>> ensemble_predict(
+            HashMap<String, INDArray> feed,
+            Iterable<String> fetch,
+            Boolean need_variant_tag,
+            long log_id) {
         List<HashMap<String, INDArray>> feed_batch
             = new ArrayList<HashMap<String, INDArray>>();
         feed_batch.add(feed);
-        return ensemble_predict(feed_batch, fetch, need_variant_tag);
+        return ensemble_predict(feed_batch, fetch, need_variant_tag, log_id);
     }
 
     public PredictFuture asyn_predict(
             HashMap<String, INDArray> feed,
             Iterable<String> fetch,
             Boolean need_variant_tag) {
+        return asyn_predict(feed, fetch, need_variant_tag, 0);
+    }
+
+    public PredictFuture asyn_predict(
+            HashMap<String, INDArray> feed,
+            Iterable<String> fetch,
+            Boolean need_variant_tag,
+            long log_id) {
         List<HashMap<String, INDArray>> feed_batch
             = new ArrayList<HashMap<String, INDArray>>();
         feed_batch.add(feed);
-        return asyn_predict(feed_batch, fetch, need_variant_tag);
+        return asyn_predict(feed_batch, fetch, need_variant_tag, log_id);
     }
 
     public Map<String, INDArray> predict(
             List<HashMap<String, INDArray>> feed_batch,
             Iterable<String> fetch) {
-        return predict(feed_batch, fetch, false);
+        return predict(feed_batch, fetch, false, 0);
+    }
+
+    public Map<String, INDArray> predict(
+            List<HashMap<String, INDArray>> feed_batch,
+            Iterable<String> fetch,
+            long log_id) {
+        return predict(feed_batch, fetch, false, log_id);
     }
 
     public Map<String, HashMap<String, INDArray>> ensemble_predict(
             List<HashMap<String, INDArray>> feed_batch,
             Iterable<String> fetch) {
-        return ensemble_predict(feed_batch, fetch, false);
+        return ensemble_predict(feed_batch, fetch, false, 0);
+    }
+
+    public Map<String, HashMap<String, INDArray>> ensemble_predict(
+            List<HashMap<String, INDArray>> feed_batch,
+            Iterable<String> fetch,
+            long log_id) {
+        return ensemble_predict(feed_batch, fetch, false, log_id);
     }
 
     public PredictFuture asyn_predict(
             List<HashMap<String, INDArray>> feed_batch,
             Iterable<String> fetch) {
-        return asyn_predict(feed_batch, fetch, false);
+        return asyn_predict(feed_batch, fetch, false, 0);
+    }
+
+    public PredictFuture asyn_predict(
+            List<HashMap<String, INDArray>> feed_batch,
+            Iterable<String> fetch,
+            long log_id) {
+        return asyn_predict(feed_batch, fetch, false, log_id);
     }
 
     public Map<String, INDArray> predict(
             List<HashMap<String, INDArray>> feed_batch,
             Iterable<String> fetch,
             Boolean need_variant_tag) {
+        return predict(feed_batch, fetch, need_variant_tag, 0);
+    }
+
+    public Map<String, INDArray> predict(
+            List<HashMap<String, INDArray>> feed_batch,
+            Iterable<String> fetch,
+            Boolean need_variant_tag,
+            long log_id) {
         try {
             profiler_.record("java_prepro_0");
-            InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
+            InferenceRequest req = _packInferenceRequest(
+                feed_batch, fetch, log_id);
             profiler_.record("java_prepro_1");
 
             profiler_.record("java_client_infer_0");
@@ -415,7 +492,7 @@ public class Client {
                 = new ArrayList<Map.Entry<String, HashMap<String, INDArray>>>(
                     ensemble_result.entrySet());
             if (list.size() != 1) {
-                System.out.format("predict failed: please use ensemble_predict impl.\n");
+                System.out.format("Failed to predict: please use ensemble_predict impl.\n");
                 return null;
             }
             profiler_.record("java_postpro_1");
@@ -423,7 +500,7 @@ public class Client {
 
             return list.get(0).getValue();
         } catch (StatusRuntimeException e) {
-            System.out.format("predict failed: %s\n", e.toString());
+            System.out.format("Failed to predict: %s\n", e.toString());
             return null;
         }
     }
@@ -432,9 +509,18 @@ public class Client {
     public Map<String, HashMap<String, INDArray>> ensemble_predict(
             List<HashMap<String, INDArray>> feed_batch,
             Iterable<String> fetch,
             Boolean need_variant_tag) {
+        return ensemble_predict(feed_batch, fetch, need_variant_tag, 0);
+    }
+
+    public Map<String, HashMap<String, INDArray>> ensemble_predict(
+            List<HashMap<String, INDArray>> feed_batch,
+            Iterable<String> fetch,
+            Boolean need_variant_tag,
+            long log_id) {
         try {
             profiler_.record("java_prepro_0");
-            InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
+            InferenceRequest req = _packInferenceRequest(
+                feed_batch, fetch, log_id);
             profiler_.record("java_prepro_1");
 
             profiler_.record("java_client_infer_0");
@@ -449,7 +535,7 @@ public class Client {
 
             return ensemble_result;
         } catch (StatusRuntimeException e) {
-            System.out.format("predict failed: %s\n", e.toString());
+            System.out.format("Failed to predict: %s\n", e.toString());
             return null;
         }
     }
@@ -458,7 +544,16 @@ public class Client {
             List<HashMap<String, INDArray>> feed_batch,
             Iterable<String> fetch,
             Boolean need_variant_tag) {
-        InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
+        return asyn_predict(feed_batch, fetch, need_variant_tag, 0);
+    }
+
+    public PredictFuture asyn_predict(
+            List<HashMap<String, INDArray>> feed_batch,
+            Iterable<String> fetch,
+            Boolean need_variant_tag,
+            long log_id) {
+        InferenceRequest req = _packInferenceRequest(
+            feed_batch, fetch, log_id);
         ListenableFuture<InferenceResponse> future = futureStub_.inference(req);
         PredictFuture predict_future = new PredictFuture(future,
             (InferenceResponse resp) -> {
diff --git a/java/src/main/proto/multi_lang_general_model_service.proto b/java/src/main/proto/multi_lang_general_model_service.proto
index b83450ae..c23f4dbb 100644
--- a/java/src/main/proto/multi_lang_general_model_service.proto
+++ b/java/src/main/proto/multi_lang_general_model_service.proto
@@ -37,6 +37,7 @@ message InferenceRequest {
   repeated string feed_var_names = 2;
   repeated string fetch_var_names = 3;
   required bool is_python = 4 [ default = false ];
+  required uint64 log_id = 5 [ default = 0 ];
 };
 
 message InferenceResponse {
diff --git a/python/paddle_serving_client/__init__.py b/python/paddle_serving_client/__init__.py
index e4f4ccee..c46e141d 100644
--- a/python/paddle_serving_client/__init__.py
+++ b/python/paddle_serving_client/__init__.py
@@ -466,10 +466,11 @@ class MultiLangClient(object):
             if var.is_lod_tensor:
                 self.lod_tensor_set_.add(var.alias_name)
 
-    def _pack_inference_request(self, feed, fetch, is_python):
+    def _pack_inference_request(self, feed, fetch, is_python, log_id):
         req = multi_lang_general_model_service_pb2.InferenceRequest()
         req.fetch_var_names.extend(fetch)
         req.is_python = is_python
+        req.log_id = log_id
         feed_batch = None
         if isinstance(feed, dict):
             feed_batch = [feed]
@@ -602,12 +603,13 @@ class MultiLangClient(object):
                 fetch,
                 need_variant_tag=False,
                 asyn=False,
-                is_python=True):
+                is_python=True,
+                log_id=0):
         if not asyn:
             try:
                 self.profile_.record('py_prepro_0')
                 req = self._pack_inference_request(
-                    feed, fetch, is_python=is_python)
+                    feed, fetch, is_python=is_python, log_id=log_id)
                 self.profile_.record('py_prepro_1')
 
                 self.profile_.record('py_client_infer_0')
@@ -626,7 +628,8 @@ class MultiLangClient(object):
             except grpc.RpcError as e:
                 return {"serving_status_code": e.code()}
         else:
-            req = self._pack_inference_request(feed, fetch, is_python=is_python)
+            req = self._pack_inference_request(
+                feed, fetch, is_python=is_python, log_id=log_id)
             call_future = self.stub_.Inference.future(
                 req, timeout=self.rpc_timeout_s_)
             return MultiLangPredictFuture(
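Aside (not part of the patch): from the caller's side the change is one optional keyword argument. A minimal sketch, assuming a multi-language server is already listening; the endpoint and the feed/fetch variable names are placeholders, and log_id still defaults to 0 when omitted:

    # Tag one request with log_id 10086 so it can be traced in server logs.
    import numpy as np
    from paddle_serving_client import MultiLangClient

    client = MultiLangClient()
    client.connect(["127.0.0.1:9393"])  # hypothetical endpoint
    fetch_map = client.predict(
        feed={"x": np.ones((13, ), dtype="float32")},  # hypothetical feed var
        fetch=["price"],                               # hypothetical fetch var
        log_id=10086)                                  # lands in InferenceRequest.log_id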
diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py
index 678c0583..1fea9ab1 100644
--- a/python/paddle_serving_server/__init__.py
+++ b/python/paddle_serving_server/__init__.py
@@ -502,6 +502,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
         feed_names = list(request.feed_var_names)
         fetch_names = list(request.fetch_var_names)
         is_python = request.is_python
+        log_id = request.log_id
         feed_batch = []
         for feed_inst in request.insts:
             feed_dict = {}
@@ -530,7 +531,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
                 data.shape = list(feed_inst.tensor_array[idx].shape)
                 feed_dict[name] = data
             feed_batch.append(feed_dict)
-        return feed_batch, fetch_names, is_python
+        return feed_batch, fetch_names, is_python, log_id
 
     def _pack_inference_response(self, ret, fetch_names, is_python):
         resp = multi_lang_general_model_service_pb2.InferenceResponse()
@@ -583,10 +584,13 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
         return resp
 
     def Inference(self, request, context):
-        feed_dict, fetch_names, is_python = self._unpack_inference_request(
-            request)
+        feed_dict, fetch_names, is_python, log_id = \
+            self._unpack_inference_request(request)
         ret = self.bclient_.predict(
-            feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
+            feed=feed_dict,
+            fetch=fetch_names,
+            need_variant_tag=True,
+            log_id=log_id)
         return self._pack_inference_response(ret, fetch_names, is_python)
 
     def GetClientConfig(self, request, context):
diff --git a/python/paddle_serving_server_gpu/__init__.py b/python/paddle_serving_server_gpu/__init__.py
index 1d94bf30..ad84d4e1 100644
--- a/python/paddle_serving_server_gpu/__init__.py
+++ b/python/paddle_serving_server_gpu/__init__.py
@@ -556,6 +556,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
         feed_names = list(request.feed_var_names)
         fetch_names = list(request.fetch_var_names)
         is_python = request.is_python
+        log_id = request.log_id
         feed_batch = []
         for feed_inst in request.insts:
             feed_dict = {}
@@ -637,10 +638,13 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
         return resp
 
     def Inference(self, request, context):
-        feed_dict, fetch_names, is_python = self._unpack_inference_request(
-            request)
+        feed_dict, fetch_names, is_python, log_id \
+            = self._unpack_inference_request(request)
         ret = self.bclient_.predict(
-            feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
+            feed=feed_dict,
+            fetch=fetch_names,
+            need_variant_tag=True,
+            log_id=log_id)
         return self._pack_inference_response(ret, fetch_names, is_python)
 
     def GetClientConfig(self, request, context):
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index 67dfc6a6..4b32d121 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -199,7 +199,7 @@ class Op(object):
         (_, input_dict), = input_dicts.items()
         return input_dict
 
-    def process(self, feed_batch):
+    def process(self, feed_batch, typical_logid):
         err, err_info = ChannelData.check_batch_npdata(feed_batch)
         if err != 0:
             _LOGGER.critical(
@@ -207,7 +207,7 @@ class Op(object):
                           "preprocess func.".format(err_info)))
             os._exit(-1)
         call_result = self.client.predict(
-            feed=feed_batch, fetch=self._fetch_names)
+            feed=feed_batch, fetch=self._fetch_names, log_id=typical_logid)
         if isinstance(self.client, MultiLangClient):
             if call_result is None or call_result["serving_status_code"] != 0:
                 return None
@@ -330,53 +330,64 @@ class Op(object):
         err_channeldata_dict = {}
         if self.with_serving:
             data_ids = preped_data_dict.keys()
+            typical_logid = data_ids[0]
+            if len(data_ids) != 1:
+                for data_id in data_ids:
+                    _LOGGER.info(
+                        "(logid={}) During access to PaddleServingService,"
+                        " we selected logid={} (batch: {}) as a representative"
+                        " for logging.".format(data_id, typical_logid,
+                                               data_ids))
             feed_batch = [preped_data_dict[data_id] for data_id in data_ids]
             midped_batch = None
             ecode = ChannelDataEcode.OK.value
             if self._timeout <= 0:
                 try:
-                    midped_batch = self.process(feed_batch)
+                    midped_batch = self.process(feed_batch, typical_logid)
                 except Exception as e:
                     ecode = ChannelDataEcode.UNKNOW.value
-                    error_info = "{} Failed to process(batch: {}): {}".format(
-                        op_info_prefix, data_ids, e)
+                    error_info = "(logid={}) {} Failed to process(batch: {}): {}".format(
+                        typical_logid, op_info_prefix, data_ids, e)
                     _LOGGER.error(error_info, exc_info=True)
             else:
                 for i in range(self._retry):
                     try:
                         midped_batch = func_timeout.func_timeout(
-                            self._timeout, self.process, args=(feed_batch, ))
+                            self._timeout,
+                            self.process,
+                            args=(feed_batch, typical_logid))
                     except func_timeout.FunctionTimedOut as e:
                         if i + 1 >= self._retry:
                             ecode = ChannelDataEcode.TIMEOUT.value
-                            error_info = "{} Failed to process(batch: {}): " \
+                            error_info = "(logid={}) {} Failed to process(batch: {}): " \
                                     "exceeded retry count.".format(
-                                        op_info_prefix, data_ids)
+                                        typical_logid, op_info_prefix, data_ids)
                             _LOGGER.error(error_info)
                         else:
                             _LOGGER.warning(
-                                "{} Failed to process(batch: {}): timeout, and retrying({}/{})"
-                                .format(op_info_prefix, data_ids, i + 1,
-                                        self._retry))
+                                "(logid={}) {} Failed to process(batch: {}): timeout,"
+                                " and retrying({}/{})...".format(
+                                    typical_logid, op_info_prefix, data_ids, i +
+                                    1, self._retry))
                     except Exception as e:
                         ecode = ChannelDataEcode.UNKNOW.value
-                        error_info = "{} Failed to process(batch: {}): {}".format(
-                            op_info_prefix, data_ids, e)
+                        error_info = "(logid={}) {} Failed to process(batch: {}): {}".format(
+                            typical_logid, op_info_prefix, data_ids, e)
                         _LOGGER.error(error_info, exc_info=True)
                         break
                     else:
                         break
             if ecode != ChannelDataEcode.OK.value:
                 for data_id in data_ids:
-                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
                     err_channeldata_dict[data_id] = ChannelData(
                         ecode=ecode, error_info=error_info, data_id=data_id)
             elif midped_batch is None:
                 # op client return None
-                error_info = "{} Failed to predict, please check if PaddleServingService" \
-                        " is working properly.".format(op_info_prefix)
+                error_info = "(logid={}) {} Failed to predict, please check if " \
+                        "PaddleServingService is working properly.".format(
+                            typical_logid, op_info_prefix)
+                _LOGGER.error(error_info)
                 for data_id in data_ids:
-                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
                     err_channeldata_dict[data_id] = ChannelData(
                         ecode=ChannelDataEcode.CLIENT_ERROR.value,
                         error_info=error_info,
--
GitLab
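Aside (not part of the patch): the pipeline side deliberately logs one representative id per batch. A self-contained sketch of that selection rule, with made-up values; note the patch indexes data_ids directly, which assumes a Python 2 list, so under Python 3 dict.keys() needs an explicit list() first:

    # Representative-logid selection, mirroring the Op change above.
    preped_data_dict = {101: "feed_a", 102: "feed_b"}  # hypothetical batch
    data_ids = list(preped_data_dict.keys())           # list() for Python 3
    typical_logid = data_ids[0]                        # 101 represents the batch
    if len(data_ids) != 1:
        for data_id in data_ids:
            print("(logid={}) represented by logid={} (batch: {})".format(
                data_id, typical_logid, data_ids))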