Commit fa153d54 authored by HexToString

save temp

Parent 8bd8bdd9
......@@ -33,7 +33,9 @@ if (WITH_PYTHON)
add_custom_target(general_model_config_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(general_model_config_py_proto general_model_config_py_proto_init)
py_grpc_proto_compile(general_model_service_py_proto SRCS proto/general_model_service.proto)
add_custom_target(general_model_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(general_model_service_py_proto general_model_service_py_proto_init)
if (CLIENT)
py_proto_compile(sdk_configure_py_proto SRCS proto/sdk_configure.proto)
......@@ -50,7 +52,12 @@ if (WITH_PYTHON)
COMMAND cp -f *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMENT "Copy generated general_model_config proto file into directory paddle_serving_client/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_command(TARGET general_model_service_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMAND cp -f *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMENT "Copy generated general_model_service proto file into directory paddle_serving_client/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif()
......@@ -77,6 +84,12 @@ if (WITH_PYTHON)
COMMAND cp -f *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
COMMENT "Copy generated general_model_config proto file into directory paddle_serving_server/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_command(TARGET general_model_service_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
COMMAND cp -f *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
COMMENT "Copy generated general_model_service proto file into directory paddle_serving_server/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif()
......
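The new POST_BUILD steps copy the generated `general_model_service_pb2*.py` modules into the client and server `proto` packages. A minimal sketch of the import they enable (package path assumed from the copy destination above):

```python
# Assumes the build copied the generated modules into paddle_serving_client/proto.
from paddle_serving_client.proto import general_model_service_pb2
from paddle_serving_client.proto import general_model_service_pb2_grpc

req = general_model_service_pb2.Request()  # message defined in general_model_service.proto
```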
......@@ -20,10 +20,16 @@ import time
client = HttpClient()
client.load_client_config(sys.argv[1])
# if you want to enable the Encrypt Module, uncomment the following line
'''
If you want to enable the Encrypt Module, uncomment the following line.
'''
# client.use_key("./key")
client.set_response_compress(True)
client.set_request_compress(True)
'''
If you want to enable compression or the proto body, uncomment the following lines.
'''
#client.set_response_compress(True)
#client.set_request_compress(True)
#client.set_http_proto(True)
fetch_list = client.get_fetch_names()
import paddle
test_reader = paddle.batch(
......@@ -34,7 +40,7 @@ test_reader = paddle.batch(
for data in test_reader():
new_data = np.zeros((1, 13)).astype("float32")
new_data[0] = data[0][0]
fetch_map = client.predict(
fetch_map = client.grpc_client_predict(
feed={"x": new_data}, fetch=fetch_list, batch=True)
print(fetch_map)
break
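For reference, a minimal sketch of the same request with the new compression and proto-body switches turned on, reusing `new_data` and `fetch_list` from the example above (all toggles illustrative):

```python
client = HttpClient()
client.load_client_config(sys.argv[1])
client.set_request_compress(True)    # gzip request bodies over 512 bytes
client.set_response_compress(True)   # ask the server for a gzip response
client.set_http_proto(True)          # send application/proto instead of JSON
fetch_map = client.predict(feed={"x": new_data}, fetch=fetch_list, batch=True)
```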
......@@ -347,68 +347,69 @@ class Client(object):
raise ValueError(
"Fetch names should not be empty or out of saved fetch list.")
feed_i = feed_batch[0]
for key in feed_i:
feed_dict = feed_batch[0]
for key in feed_dict:
if ".lod" not in key and key not in self.feed_names_:
raise ValueError("Wrong feed name: {}.".format(key))
if ".lod" in key:
continue
self.shape_check(feed_i, key)
self.shape_check(feed_dict, key)
if self.feed_types_[key] in int_type:
int_feed_names.append(key)
shape_lst = []
if batch == False:
feed_i[key] = np.expand_dims(feed_i[key], 0).repeat(
feed_dict[key] = np.expand_dims(feed_dict[key], 0).repeat(
1, axis=0)
if isinstance(feed_i[key], np.ndarray):
shape_lst.extend(list(feed_i[key].shape))
if isinstance(feed_dict[key], np.ndarray):
shape_lst.extend(list(feed_dict[key].shape))
int_shape.append(shape_lst)
else:
int_shape.append(self.feed_shapes_[key])
if "{}.lod".format(key) in feed_i:
int_lod_slot_batch.append(feed_i["{}.lod".format(key)])
if "{}.lod".format(key) in feed_dict:
int_lod_slot_batch.append(feed_dict["{}.lod".format(key)])
else:
int_lod_slot_batch.append([])
if isinstance(feed_i[key], np.ndarray):
int_slot.append(np.ascontiguousarray(feed_i[key]))
if isinstance(feed_dict[key], np.ndarray):
int_slot.append(np.ascontiguousarray(feed_dict[key]))
self.has_numpy_input = True
else:
int_slot.append(np.ascontiguousarray(feed_i[key]))
int_slot.append(np.ascontiguousarray(feed_dict[key]))
self.all_numpy_input = False
elif self.feed_types_[key] in float_type:
float_feed_names.append(key)
shape_lst = []
if batch == False:
feed_i[key] = np.expand_dims(feed_i[key], 0).repeat(
feed_dict[key] = np.expand_dims(feed_dict[key], 0).repeat(
1, axis=0)
if isinstance(feed_i[key], np.ndarray):
shape_lst.extend(list(feed_i[key].shape))
if isinstance(feed_dict[key], np.ndarray):
shape_lst.extend(list(feed_dict[key].shape))
float_shape.append(shape_lst)
else:
float_shape.append(self.feed_shapes_[key])
if "{}.lod".format(key) in feed_i:
float_lod_slot_batch.append(feed_i["{}.lod".format(key)])
if "{}.lod".format(key) in feed_dict:
float_lod_slot_batch.append(feed_dict["{}.lod".format(key)])
else:
float_lod_slot_batch.append([])
if isinstance(feed_i[key], np.ndarray):
float_slot.append(np.ascontiguousarray(feed_i[key]))
if isinstance(feed_dict[key], np.ndarray):
float_slot.append(np.ascontiguousarray(feed_dict[key]))
self.has_numpy_input = True
else:
float_slot.append(np.ascontiguousarray(feed_i[key]))
float_slot.append(np.ascontiguousarray(feed_dict[key]))
self.all_numpy_input = False
# if the input is a string, the feed is not a numpy array
elif self.feed_types_[key] in string_type:
string_feed_names.append(key)
string_shape.append(self.feed_shapes_[key])
if "{}.lod".format(key) in feed_i:
string_lod_slot_batch.append(feed_i["{}.lod".format(key)])
if "{}.lod".format(key) in feed_dict:
string_lod_slot_batch.append(feed_dict["{}.lod".format(
key)])
else:
string_lod_slot_batch.append([])
string_slot.append(feed_i[key])
string_slot.append(feed_dict[key])
self.has_numpy_input = True
self.profile_.record('py_prepro_1')
......
......@@ -21,7 +21,13 @@ import google.protobuf.text_format
import gzip
from collections import Iterable
import base64
import sys
import grpc
from .proto import general_model_service_pb2
sys.path.append(
os.path.join(os.path.abspath(os.path.dirname(__file__)), 'proto'))
from .proto import general_model_service_pb2_grpc
# param 'type' (in feed_var or fetch_var) = 0 means dataType is int64
# param 'type' (in feed_var or fetch_var) = 1 means dataType is float32
# param 'type' (in feed_var or fetch_var) = 2 means dataType is int32
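`process_json_data` and `process_tensor` below pick the tensor payload field via `proto_data_key_list[elem_type]`. A sketch of that table under the type codes documented above (exact contents and ordering assumed):

```python
# elem_type 0/1/2 map to int64/float32/int32 as the comments above state;
# bytes/string tensors are assumed to use the raw "data" field.
proto_data_key_list = ["int64_data", "float_data", "int_data", "data"]
```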
......@@ -79,6 +85,8 @@ class HttpClient(object):
self.key = None
self.try_request_gzip = False
self.try_response_gzip = False
self.total_data_number = 0
self.http_proto = False
def load_client_config(self, model_config_path_list):
if isinstance(model_config_path_list, str):
......@@ -157,6 +165,9 @@ class HttpClient(object):
def set_response_compress(self, try_response_gzip):
self.try_response_gzip = try_response_gzip
def set_http_proto(self, http_proto):
self.http_proto = http_proto
# use_key enables the encryption module.
def use_key(self, key_filename):
with open(key_filename, "rb") as f:
......@@ -192,7 +203,44 @@ class HttpClient(object):
batch=False,
need_variant_tag=False,
log_id=0):
if feed is None or fetch is None:
feed_dict = self.get_feedvar_dict(feed)
fetch_list = self.get_legal_fetch(fetch)
headers = {}
postData = ''
if self.http_proto == True:
postData = self.process_proto_data(feed_dict, fetch_list, batch,
log_id).SerializeToString()
headers["Content-Type"] = "application/proto"
else:
postData = self.process_json_data(feed_dict, fetch_list, batch,
log_id)
headers["Content-Type"] = "application/json"
web_url = "http://" + self.ip + ":" + self.server_port + self.service_name
# Compress only when the payload exceeds 512 bytes.
if self.try_request_gzip and self.total_data_number > 512:
postData = gzip.compress(bytes(postData, 'utf-8'))
headers["Content-Encoding"] = "gzip"
if self.try_response_gzip:
headers["Accept-encoding"] = "gzip"
# requests automatically detects and decompresses gzip responses
result = requests.post(url=web_url, headers=headers, data=postData)
if result is None:
return None
if result.status_code == 200:
if result.headers["Content-Type"] == 'application/proto':
response = general_model_service_pb2.Response()
response.ParseFromString(result.content)
return response
else:
return result.json()
return result
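Put together, `predict` now chooses the body format from `set_http_proto`. A usage sketch, with the client set up as in the fit_a_line example above (values illustrative):

```python
client.set_http_proto(False)   # Content-Type: application/json, result is a dict
json_result = client.predict(feed={"x": new_data}, fetch=fetch_list, batch=True)

client.set_http_proto(True)    # Content-Type: application/proto
proto_result = client.predict(feed={"x": new_data}, fetch=fetch_list, batch=True)
# on HTTP 200 this is a general_model_service_pb2.Response
```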
def get_legal_fetch(self, fetch):
if fetch is None:
raise ValueError("You should specify feed and fetch for prediction")
fetch_list = []
......@@ -203,6 +251,21 @@ class HttpClient(object):
else:
raise ValueError("Fetch only accepts string and list of string")
fetch_names = []
for key in fetch_list:
if key in self.fetch_names_:
fetch_names.append(key)
if len(fetch_names) == 0:
raise ValueError(
"Fetch names should not be empty or out of saved fetch list.")
return {}
return fetch_names
def get_feedvar_dict(self, feed):
if feed is None:
raise ValueError("You should specify feed and fetch for prediction")
feed_batch = []
if isinstance(feed, dict):
feed_batch.append(feed)
......@@ -224,116 +287,172 @@ class HttpClient(object):
if len(feed_batch) != 1:
raise ValueError("len of feed_batch can only be 1.")
fetch_names = []
for key in fetch_list:
if key in self.fetch_names_:
fetch_names.append(key)
if len(fetch_names) == 0:
raise ValueError(
"Fetch names should not be empty or out of saved fetch list.")
return {}
feed_i = feed_batch[0]
return feed_batch[0]
def process_json_data(self, feed_dict, fetch_list, batch, log_id):
Request = {}
Request["fetch_var_names"] = fetch_list
Request["log_id"] = int(log_id)
Request["tensor"] = []
index = 0
total_data_number = 0
for key in feed_i:
for key in feed_dict:
if ".lod" not in key and key not in self.feed_names_:
raise ValueError("Wrong feed name: {}.".format(key))
if ".lod" in key:
continue
Request["tensor"].append('')
Request["tensor"][index] = {}
lod = []
if "{}.lod".format(key) in feed_i:
lod = feed_i["{}.lod".format(key)]
shape = self.feed_shapes_[key].copy()
elem_type = self.feed_types_[key]
data_value = feed_i[key]
data_key = proto_data_key_list[elem_type]
# feed_i[key] may be an np.ndarray, a list, or a tuple;
# an np.ndarray must be converted to a list first.
if isinstance(feed_i[key], np.ndarray):
shape_lst = []
# a 0-dim numpy value needs an extra enclosing []
if feed_i[key].ndim == 0:
data_value = [feed_i[key].tolist()]
shape_lst.append(1)
else:
shape_lst.extend(list(feed_i[key].shape))
shape = shape_lst
data_value = feed_i[key].flatten().tolist()
# When batch is False, insert a 1 at the front of shape as the batch dim.
# When batch is True, numpy.shape already carries the batch dim.
if batch == False:
shape.insert(0, 1)
# for a list or tuple, flatten the nested structure
elif isinstance(feed_i[key], (list, tuple)):
# When batch is False, insert a 1 at the front of shape as the batch dim.
# When batch is True, a list is not as regular as numpy, so its
# shape cannot be derived; use the first dimension as the batch dim
# and insert it at the front of the feedvar shape.
if batch == False:
shape.insert(0, 1)
else:
shape.insert(0, len(feed_i[key]))
feed_i[key] = [x for x in list_flatten(feed_i[key])]
data_value = feed_i[key]
else:
# The input may be a single str or int value;
# normalize it to a one-element list first.
# Since this input is special, keep the original feedvar shape.
data_value = []
data_value.append(feed_i[key])
if isinstance(feed_i[key], str):
if self.feed_types_[key] != bytes_type:
raise ValueError(
"feedvar is not string-type,feed can`t be a single string."
)
else:
if self.feed_types_[key] == bytes_type:
raise ValueError(
"feedvar is string-type,feed, feed can`t be a single int or others."
)
# If compression is disabled, there is no need to track the data size.
if self.try_request_gzip:
total_data_number = total_data_number + data_bytes_number(
data_value)
Request["tensor"][index]["elem_type"] = elem_type
Request["tensor"][index]["shape"] = shape
Request["tensor"][index][data_key] = data_value
proto_index = self.feed_names_to_idx_[key]
Request["tensor"][index]["name"] = self.feed_real_names[proto_index]
Request["tensor"][index]["alias_name"] = key
if len(lod) > 0:
Request["tensor"][index]["lod"] = lod
index = index + 1
result = None
tensor_dict = self.process_tensor(key, feed_dict, batch)
data_key = tensor_dict["data_key"]
data_value = tensor_dict["data_value"]
tensor = {}
tensor[data_key] = data_value
tensor["shape"] = tensor_dict["shape"]
tensor["elem_type"] = tensor_dict["elem_type"]
tensor["name"] = tensor_dict["name"]
tensor["alias_name"] = tensor_dict["alias_name"]
if "lod" in tensor_dict:
tensor["lod"] = tensor_dict["lod"]
Request["tensor"].append(tensor)
# request
web_url = "http://" + self.ip + ":" + self.server_port + self.service_name
postData = json.dumps(Request)
headers = {}
# Compress only when the payload exceeds 512 bytes.
if self.try_request_gzip and total_data_number > 512:
postData = gzip.compress(bytes(postData, 'utf-8'))
headers["Content-Encoding"] = "gzip"
if self.try_response_gzip:
headers["Accept-encoding"] = "gzip"
# requests automatically detects and decompresses gzip responses
result = requests.post(url=web_url, headers=headers, data=postData)
return postData
if result is None:
return None
if result.status_code == 200:
return result.json()
return result
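For reference, the JSON body assembled by `process_json_data` has roughly this shape (all values illustrative):

```python
example_request = {
    "fetch_var_names": ["price"],   # from get_legal_fetch
    "log_id": 0,
    "tensor": [{
        "float_data": [0.1, 0.2],   # key picked via proto_data_key_list[elem_type]
        "shape": [1, 2],            # batch dim of 1 inserted when batch=False
        "elem_type": 1,
        "name": "x",                # real name from feed_real_names
        "alias_name": "x",
        # "lod": [...]              # only present when "<key>.lod" was fed
    }],
}
```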
def process_proto_data(self, feed_dict, fetch_list, batch, log_id):
req = general_model_service_pb2.Request()
req.fetch_var_names.extend(fetch_list)
req.log_id = log_id
for key in feed_dict:
tensor = general_model_service_pb2.Tensor()
if ".lod" not in key and key not in self.feed_names_:
raise ValueError("Wrong feed name: {}.".format(key))
if ".lod" in key:
continue
tensor_dict = self.process_tensor(key, feed_dict, batch)
tensor.shape.extend(tensor_dict["shape"])
tensor.name = tensor_dict["name"]
tensor.alias_name = tensor_dict["alias_name"]
tensor.elem_type = tensor_dict["elem_type"]
if "lod" in tensor_dict:
tensor.lod.extend(tensor_dict["lod"])
if tensor_dict["data_key"] == "int64_data":
tensor.int64_data.extend(tensor_dict["data_value"])
elif tensor_dict["data_key"] == "float_data":
tensor.float_data.extend(tensor_dict["data_value"])
elif tensor_dict["data_key"] == "int_data":
tensor.int_data.extend(tensor_dict["data_value"])
elif tensor_dict["data_key"] == "data":
tensor.data.extend(tensor_dict["data_value"])
else:
raise ValueError(
"tensor element_type must be one of [int64_data,float_data,int_data,data]."
)
req.tensor.append(tensor)
return req
def process_tensor(self, key, feed_dict, batch):
lod = []
if "{}.lod".format(key) in feed_dict:
lod = feed_dict["{}.lod".format(key)]
shape = self.feed_shapes_[key].copy()
elem_type = self.feed_types_[key]
data_value = feed_dict[key]
data_key = proto_data_key_list[elem_type]
proto_index = self.feed_names_to_idx_[key]
name = self.feed_real_names[proto_index]
alias_name = key
# feed_dict[key] may be an np.ndarray, a list, or a tuple;
# an np.ndarray must be converted to a list first.
if isinstance(feed_dict[key], np.ndarray):
shape_lst = []
# a 0-dim numpy value needs an extra enclosing []
if feed_dict[key].ndim == 0:
data_value = [feed_dict[key].tolist()]
shape_lst.append(1)
else:
shape_lst.extend(list(feed_dict[key].shape))
shape = shape_lst
data_value = feed_dict[key].flatten().tolist()
# When batch is False, insert a 1 at the front of shape as the batch dim.
# When batch is True, numpy.shape already carries the batch dim.
if batch == False:
shape.insert(0, 1)
# for a list or tuple, flatten the nested structure
elif isinstance(feed_dict[key], (list, tuple)):
# When batch is False, insert a 1 at the front of shape as the batch dim.
# When batch is True, a list is not as regular as numpy, so its
# shape cannot be derived; use the first dimension as the batch dim
# and insert it at the front of the feedvar shape.
if batch == False:
shape.insert(0, 1)
else:
shape.insert(0, len(feed_dict[key]))
feed_dict[key] = [x for x in list_flatten(feed_dict[key])]
data_value = feed_dict[key]
else:
# The input may be a single str or int value;
# normalize it to a one-element list first.
# Since this input is special, keep the original feedvar shape.
data_value = []
data_value.append(feed_dict[key])
if isinstance(feed_dict[key], str):
if self.feed_types_[key] != bytes_type:
raise ValueError(
"feedvar is not string-type,feed can`t be a single string."
)
else:
if self.feed_types_[key] == bytes_type:
raise ValueError(
"feedvar is string-type,feed, feed can`t be a single int or others."
)
# If compression is disabled, there is no need to track the data size.
if self.try_request_gzip:
self.total_data_number = self.total_data_number + data_bytes_number(
data_value)
tensor_dict = {}
tensor_dict["data_key"] = data_key
tensor_dict["data_value"] = data_value
tensor_dict["shape"] = shape
tensor_dict["elem_type"] = elem_type
tensor_dict["name"] = name
tensor_dict["alias_name"] = alias_name
if len(lod) > 0:
tensor_dict["lod"] = lod
return tensor_dict
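A quick sketch of the numpy branch above for an un-batched input (numbers illustrative):

```python
import numpy as np

x = np.array([0.1, 0.2, 0.3], dtype="float32")   # feedvar with shape [3]
shape = list(x.shape)                            # [3]
data_value = x.flatten().tolist()                # [0.1, 0.2, 0.3] (as float32)
shape.insert(0, 1)                               # batch=False: prepend batch dim -> [1, 3]
```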
def grpc_client_predict(self,
feed=None,
fetch=None,
batch=False,
need_variant_tag=False,
log_id=0):
feed_dict = self.get_feedvar_dict(feed)
fetch_list = self.get_legal_fetch(fetch)
postData = self.process_proto_data(feed_dict, fetch_list, batch, log_id)
print('proto data', postData)
# https://github.com/tensorflow/serving/issues/1382
# channel options: raise the message size limits and use round-robin LB
options = [('grpc.max_receive_message_length', 512 * 1024 * 1024),
           ('grpc.max_send_message_length', 512 * 1024 * 1024),
           ('grpc.lb_policy_name', 'round_robin')]
endpoints = [self.ip + ":" + self.server_port]
g_endpoint = 'ipv4:{}'.format(','.join(endpoints))
print("my endpoint is ", g_endpoint)
self.channel_ = grpc.insecure_channel(g_endpoint, options=options)
self.stub_ = general_model_service_pb2_grpc.GeneralModelServiceStub(
self.channel_)
# grpc timeouts are given in seconds, so convert from milliseconds
resp = self.stub_.inference(postData, timeout=self.http_timeout_ms / 1000.0)
return resp
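A usage sketch of the new gRPC path, with the endpoint assumed to come from the usual client setup (values illustrative):

```python
client = HttpClient()
client.load_client_config(sys.argv[1])
resp = client.grpc_client_predict(
    feed={"x": new_data}, fetch=fetch_list, batch=True)
# resp is the general_model_service_pb2.Response returned by the server stub
```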
......@@ -180,6 +180,8 @@ def serve_args():
default=False,
action="store_true",
help="Use gpu_multi_stream")
parser.add_argument(
"--grpc", default=False, action="store_true", help="Use grpc test")
return parser.parse_args()
......@@ -384,4 +386,33 @@ if __name__ == "__main__":
)
server.serve_forever()
else:
start_multi_card(args)
# this is for the grpc test
if args.grpc:
from .proto import general_model_service_pb2
sys.path.append(
os.path.join(
os.path.abspath(os.path.dirname(__file__)), 'proto'))
from .proto import general_model_service_pb2_grpc
import google.protobuf.text_format
from concurrent import futures
import grpc
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class GeneralModelService(
general_model_service_pb2_grpc.GeneralModelServiceServicer):
def inference(self, request, context):
return general_model_service_pb2.Response()
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
general_model_service_pb2_grpc.add_GeneralModelServiceServicer_to_server(
GeneralModelService(), server)
server.add_insecure_port('[::]:9393')
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
else:
start_multi_card(args)
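The servicer above only returns an empty `Response`, so the `--grpc` flag gives a smoke-test target. A sketch of a call against it, using the hard-coded port above (import path assumed):

```python
import grpc
from paddle_serving_client.proto import general_model_service_pb2
from paddle_serving_client.proto import general_model_service_pb2_grpc

channel = grpc.insecure_channel("127.0.0.1:9393")
stub = general_model_service_pb2_grpc.GeneralModelServiceStub(channel)
resp = stub.inference(general_model_service_pb2.Request())  # empty Response back
```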