From 8f5c838b7e8a4d2d6f1a7de58bb903b9012129b9 Mon Sep 17 00:00:00 2001
From: TeslaZhao
Date: Mon, 13 Sep 2021 16:26:06 +0800
Subject: [PATCH] Add bytes type in tensor proto; when data is large,
 pack/unpack performance is better than with repeated value fields

---
 core/predictor/framework/infer.cpp           |  4 ++--
 core/predictor/framework/infer.h             | 12 +++++------
 python/pipeline/gateway/proto/gateway.proto  | 15 ++++++++++----
 python/pipeline/operator.py                  | 11 ++++++++--
 python/pipeline/pipeline_client.py           | 22 ++++++++++++++------
 python/pipeline/proto/pipeline_service.proto | 15 ++++++++++----
 6 files changed, 55 insertions(+), 24 deletions(-)

diff --git a/core/predictor/framework/infer.cpp b/core/predictor/framework/infer.cpp
index e0c284df..02906122 100644
--- a/core/predictor/framework/infer.cpp
+++ b/core/predictor/framework/infer.cpp
@@ -349,7 +349,7 @@ T* VersionedInferEngine::get_core() {
 }
 
 template <typename T>
-T* VersionedInferEngine::get_core(uint64_t version) {
+T* VersionedInferEngine::get_core(const uint64_t version) {
   auto iter = _versions.find(version);
   if (iter == _versions.end()) {
     LOG(ERROR) << "Not found version engine: " << version;
@@ -539,7 +539,7 @@ int InferManager::infer(const char* model_name,
 }
 
 template <typename T>
-T* InferManager::get_core(const char* model_name, uint64_t version) {
+T* InferManager::get_core(const char* model_name, const uint64_t version) {
   auto it = _map.find(model_name);
   if (it == _map.end()) {
     LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h
index 67a7cf2f..45146e43 100644
--- a/core/predictor/framework/infer.h
+++ b/core/predictor/framework/infer.h
@@ -277,7 +277,7 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
     LOG(WARNING) << "Loading cube cache[" << next_idx << "] ...";
     std::string model_path = conf.model_dir();
     if (access(model_path.c_str(), F_OK) == 0) {
-      std::string cube_cache_path = model_path + "cube_cache";
+      std::string cube_cache_path = model_path + "/" + "cube_cache";
       int reload_cache_ret = md->caches[next_idx]->reload_data(cube_cache_path);
       LOG(WARNING) << "Loading cube cache[" << next_idx << "] done.";
     } else {
@@ -543,9 +543,9 @@ class FluidInferEngine : public CloneDBReloadableInferEngine {
         lod_tensor_in->CopyFromCpu(data);
       } else {
         LOG(ERROR) << "Inference not support type["
-                   << (*tensorVector_in_pointer)[i].dtype
-                   << "],name[" << (*tensorVector_in_pointer)[i].name
-                   << "]" << " copy into core failed!";
+                   << (*tensorVector_in_pointer)[i].dtype << "],name["
+                   << (*tensorVector_in_pointer)[i].name << "]"
+                   << " copy into core failed!";
       }
       // Paddle inference will support FP16 in next version.
      // else if ((*tensorVector_in_pointer)[i].dtype ==
@@ -724,7 +724,7 @@ class VersionedInferEngine : public InferEngine {
   int infer(const void* in, void* out, uint32_t batch_size, uint64_t version);
 
   template <typename T>
-  T* get_core(uint64_t version);
+  T* get_core(const uint64_t version);
 
   int proc_initialize_impl(const configure::EngineDesc& conf, bool);
 
@@ -789,7 +789,7 @@ class InferManager {
 
   // Versioned get engine core
   template <typename T>
-  T* get_core(const char* model_name, uint64_t version);
+  T* get_core(const char* model_name, const uint64_t version);
 
   // query model version
   int query_version(const std::string& model, uint64_t& version);
diff --git a/python/pipeline/gateway/proto/gateway.proto b/python/pipeline/gateway/proto/gateway.proto
index 68df5747..8dc27a2e 100644
--- a/python/pipeline/gateway/proto/gateway.proto
+++ b/python/pipeline/gateway/proto/gateway.proto
@@ -51,6 +51,12 @@ message Tensor {
   // VarType: STRING
   repeated string str_data = 9;
 
+  // VarType: BYTES, suitable for large data. There is no need to store the
+  // data type or dimensions separately.
+  // pack method: serialize with np.save into a BytesIO buffer.
+  // unpack method: wrap the bytes in a BytesIO and load with np.load.
+  bytes byte_data = 10;
+
   // Element types:
   // 0 => INT64
   // 1 => FP32
@@ -65,17 +71,18 @@ message Tensor {
   // 10 => COMPLEX64
   // 11 => COMPLEX128
   // 12 => STRING
-  int32 elem_type = 10;
+  // 13 => BYTES
+  int32 elem_type = 20;
 
   // Shape of the tensor, including batch dimensions.
-  repeated int32 shape = 11;
+  repeated int32 shape = 21;
 
   // Level of data(LOD), support variable length data, only for fetch tensor
   // currently.
-  repeated int32 lod = 12;
+  repeated int32 lod = 22;
 
   // Correspond to the variable 'name' in the model description prototxt.
-  string name = 13;
+  string name = 23;
 };
 
 // The structure of the service request. The input data can be repeated string
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index 2a74714f..4d4717d6 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -26,6 +26,7 @@ import collections
 import numpy as np
 import json
 from numpy import *
+from io import BytesIO
 if sys.version_info.major == 2:
     import Queue
 elif sys.version_info.major == 3:
@@ -59,7 +60,8 @@ _TENSOR_DTYPE_2_NUMPY_DATA_DTYPE = {
     9: "bool",  # VarType.BOOL
     10: "complex64",  # VarType.COMPLEX64
     11: "complex128",  # VarType.COMPLEX128
-    12: "string",  # dismatch with numpy
+    12: "string",  # load by numpy
+    13: "bytes",  # load by numpy
 }
 
 
@@ -1577,10 +1579,11 @@ class RequestOp(Op):
             UINT8
             INT8
             BOOL
+            BYTES
         Unsupported type:
+            STRING
             COMPLEX64
             COMPLEX128
-            STRING
 
         Args:
             tensor: one tensor in request.tensors.
@@ -1634,6 +1637,10 @@
             elif tensor.elem_type == 9:
                 # VarType: BOOL
                 np_data = np.array(tensor.bool_data).astype(bool).reshape(dims)
+            elif tensor.elem_type == 13:
+                # VarType: BYTES
+                byte_data = BytesIO(tensor.byte_data)
+                np_data = np.load(byte_data, allow_pickle=True)
             else:
                 _LOGGER.error("Sorry, the type {} of tensor {} is not supported.".
                               format(tensor.elem_type, tensor.name))
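The unpack branch added to RequestOp above is the inverse of the client-side pack step. A minimal standalone sketch of the round trip (np.save, np.load, and BytesIO are what the patch uses; the array and variable names here are illustrative):

    import numpy as np
    from io import BytesIO

    # Pack: np.save writes dtype and shape into the buffer header, which is
    # why the BYTES tensor needs no separate elem_type/shape bookkeeping.
    value = np.arange(6, dtype="float32").reshape(2, 3)
    buf = BytesIO()
    np.save(buf, value, allow_pickle=True)
    byte_data = buf.getvalue()  # what the client puts into Tensor.byte_data

    # Unpack: the same two calls RequestOp performs for elem_type == 13.
    np_data = np.load(BytesIO(byte_data), allow_pickle=True)
    assert np_data.dtype == value.dtype and np_data.shape == value.shape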
diff --git a/python/pipeline/pipeline_client.py b/python/pipeline/pipeline_client.py
index cf5462d6..5585463e 100644
--- a/python/pipeline/pipeline_client.py
+++ b/python/pipeline/pipeline_client.py
@@ -25,6 +25,7 @@ from .channel import ChannelDataErrcode
 from .proto import pipeline_service_pb2
 from .proto import pipeline_service_pb2_grpc
 import six
+from io import BytesIO
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -47,7 +48,8 @@ class PipelineClient(object):
         self._stub = pipeline_service_pb2_grpc.PipelineServiceStub(
             self._channel)
 
-    def _pack_request_package(self, feed_dict, pack_tensor_format, profile):
+    def _pack_request_package(self, feed_dict, pack_tensor_format,
+                              use_tensor_bytes, profile):
         req = pipeline_service_pb2.Request()
 
         logid = feed_dict.get("logid")
@@ -99,11 +101,9 @@
                 one_tensor = req.tensors.add()
                 one_tensor.name = key
 
-                if (sys.version_info.major == 2 and
-                        isinstance(value, (str, unicode)) or
-                        ((sys.version_info.major == 3) and isinstance(value, str))):
+                if isinstance(value, str):
                     one_tensor.string_data.add(value)
-                    one_tensor.elem_type = 12  #12 => string
+                    one_tensor.elem_type = 12  #12 => string in proto
                     continue
 
                 if isinstance(value, np.ndarray):
@@ -112,6 +112,14 @@
                     for one_dim in value.shape:
                         one_tensor.shape.append(one_dim)
 
+                    # packed into bytes
+                    if use_tensor_bytes is True:
+                        np_bytes = BytesIO()
+                        np.save(np_bytes, value, allow_pickle=True)
+                        one_tensor.byte_data = np_bytes.getvalue()
+                        one_tensor.elem_type = 13  #13 => bytes in proto
+                        continue  # skip the typed-field copy below; it would overwrite elem_type
+
                     flat_value = value.flatten().tolist()
                     # copy data
                     if value.dtype == "int64":
@@ -162,15 +170,17 @@ class PipelineClient(object):
                 fetch=None,
                 asyn=False,
                 pack_tensor_format=False,
+                use_tensor_bytes=False,
                 profile=False,
                 log_id=0):
         if not isinstance(feed_dict, dict):
             raise TypeError(
                 "feed must be dict type with format: {name: value}.")
         if fetch is not None and not isinstance(fetch, list):
             raise TypeError("fetch must be list type with format: [name].")
         print("PipelineClient::predict pack_data time:{}".format(time.time()))
-        req = self._pack_request_package(feed_dict, pack_tensor_format, profile)
+        req = self._pack_request_package(feed_dict, pack_tensor_format,
+                                         use_tensor_bytes, profile)
         req.logid = log_id
         if not asyn:
             print("PipelineClient::predict before time:{}".format(time.time()))
diff --git a/python/pipeline/proto/pipeline_service.proto b/python/pipeline/proto/pipeline_service.proto
index e8bf04e3..ff3eda8b 100644
--- a/python/pipeline/proto/pipeline_service.proto
+++ b/python/pipeline/proto/pipeline_service.proto
@@ -48,6 +48,12 @@ message Tensor {
   // VarType: STRING
   repeated string str_data = 9;
 
+  // VarType: BYTES, suitable for large data. There is no need to store the
+  // data type or dimensions separately.
+  // pack method: serialize with np.save into a BytesIO buffer.
+  // unpack method: wrap the bytes in a BytesIO and load with np.load.
+  bytes byte_data = 10;
+
   // Element types:
   // 0 => INT64
   // 1 => FP32
@@ -62,17 +68,18 @@ message Tensor {
   // 10 => COMPLEX64
   // 11 => COMPLEX128
   // 12 => STRING
-  int32 elem_type = 10;
+  // 13 => BYTES
+  int32 elem_type = 20;
 
   // Shape of the tensor, including batch dimensions.
-  repeated int32 shape = 11;
+  repeated int32 shape = 21;
 
   // Level of data(LOD), support variable length data, only for fetch tensor
   // currently.
-  repeated int32 lod = 12;
+  repeated int32 lod = 22;
 
   // Correspond to the variable 'name' in the model description prototxt.
-  string name = 13;
+  string name = 23;
 };
 
 // The structure of the service request. The input data can be repeated string
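With the client changes above, the bytes path is opt-in per request. A hedged usage sketch (the endpoint and the feed/fetch names are hypothetical; predict's arguments come from this patch, and the import path follows PaddleServing's pipeline examples):

    import numpy as np
    from paddle_serving_server.pipeline import PipelineClient

    client = PipelineClient()
    client.connect(["127.0.0.1:9999"])  # hypothetical endpoint

    # use_tensor_bytes=True packs each ndarray once via np.save into
    # Tensor.byte_data (elem_type 13) instead of flattening it into the
    # repeated typed fields element by element.
    image = np.random.rand(1, 3, 224, 224).astype("float32")
    ret = client.predict(
        feed_dict={"image": image},  # hypothetical feed name
        fetch=["res"],               # hypothetical fetch name
        pack_tensor_format=True,
        use_tensor_bytes=True)
    print(ret)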
--
GitLab
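The performance claim in the commit message can be sanity-checked offline. A rough sketch, not a benchmark shipped with this patch (the array size and timing method are arbitrary, and only the client-side packing cost is measured):

    import time
    import numpy as np
    from io import BytesIO

    value = np.random.rand(1, 3, 224, 224).astype("float32")

    # Repeated-value path: _pack_request_package flattens the array into a
    # Python list before extending the repeated typed field.
    t0 = time.time()
    flat_value = value.flatten().tolist()
    t_repeated = time.time() - t0

    # Bytes path: a single np.save into an in-memory buffer.
    t0 = time.time()
    buf = BytesIO()
    np.save(buf, value, allow_pickle=True)
    t_bytes = time.time() - t0

    print("flatten+tolist: {:.6f}s  np.save: {:.6f}s".format(t_repeated, t_bytes))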