提交 6829588e 编写于 作者: Z zhangjun

remove paddle_serving_server_gpu

上级 12e4ed33
...@@ -7,13 +7,8 @@ if (CLIENT) ...@@ -7,13 +7,8 @@ if (CLIENT)
endif() endif()
if (SERVER) if (SERVER)
if (NOT WITH_GPU AND NOT WITH_LITE)
file(INSTALL pipeline DESTINATION paddle_serving_server) file(INSTALL pipeline DESTINATION paddle_serving_server)
file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server/*.py) file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server/*.py)
else()
file(INSTALL pipeline DESTINATION paddle_serving_server_gpu)
file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server_gpu/*.py)
endif()
set(PY_FILES ${SERVING_SERVER_PY_FILES}) set(PY_FILES ${SERVING_SERVER_PY_FILES})
SET(PACKAGE_NAME "serving_server") SET(PACKAGE_NAME "serving_server")
set(SETUP_LOG_FILE "setup.py.server.log") set(SETUP_LOG_FILE "setup.py.server.log")
...@@ -22,25 +17,20 @@ endif() ...@@ -22,25 +17,20 @@ endif()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/util.py configure_file(${CMAKE_CURRENT_SOURCE_DIR}/util.py
${CMAKE_CURRENT_BINARY_DIR}/util.py) ${CMAKE_CURRENT_BINARY_DIR}/util.py)
if (CLIENT) if (CLIENT)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.client.in configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.client.in
${CMAKE_CURRENT_BINARY_DIR}/setup.py) ${CMAKE_CURRENT_BINARY_DIR}/setup.py)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../tools/python_tag.py configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../tools/python_tag.py
${CMAKE_CURRENT_BINARY_DIR}/python_tag.py) ${CMAKE_CURRENT_BINARY_DIR}/python_tag.py)
endif() endif()
if (APP) if (APP)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.app.in configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.app.in
${CMAKE_CURRENT_BINARY_DIR}/setup.py) ${CMAKE_CURRENT_BINARY_DIR}/setup.py)
endif() endif()
if (SERVER) if (SERVER)
if (NOT WITH_GPU AND NOT WITH_LITE)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.server.in configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.server.in
${CMAKE_CURRENT_BINARY_DIR}/setup.py) ${CMAKE_CURRENT_BINARY_DIR}/setup.py)
else()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.server_gpu.in
${CMAKE_CURRENT_BINARY_DIR}/setup.py)
endif()
endif() endif()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/gen_version.py configure_file(${CMAKE_CURRENT_SOURCE_DIR}/gen_version.py
...@@ -50,17 +40,17 @@ set (SERVING_CLIENT_CORE ${PADDLE_SERVING_BINARY_DIR}/core/general-client/*.so) ...@@ -50,17 +40,17 @@ set (SERVING_CLIENT_CORE ${PADDLE_SERVING_BINARY_DIR}/core/general-client/*.so)
message("python env: " ${py_env}) message("python env: " ${py_env})
if (APP) if (APP)
add_custom_command( add_custom_command(
OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_app/ ${PADDLE_SERVING_BINARY_DIR}/python/ COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_app/ ${PADDLE_SERVING_BINARY_DIR}/python/
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "app" COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "app"
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
DEPENDS ${SERVING_APP_CORE} general_model_config_py_proto ${PY_FILES}) DEPENDS ${SERVING_APP_CORE} general_model_config_py_proto ${PY_FILES})
add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp) add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
endif() endif()
if (CLIENT) if (CLIENT)
add_custom_command( add_custom_command(
OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_client/ ${PADDLE_SERVING_BINARY_DIR}/python/ COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_client/ ${PADDLE_SERVING_BINARY_DIR}/python/
COMMAND ${CMAKE_COMMAND} -E copy ${SERVING_CLIENT_CORE} ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/serving_client.so COMMAND ${CMAKE_COMMAND} -E copy ${SERVING_CLIENT_CORE} ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/serving_client.so
...@@ -68,90 +58,48 @@ add_custom_command( ...@@ -68,90 +58,48 @@ add_custom_command(
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "client" COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "client"
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
DEPENDS ${SERVING_CLIENT_CORE} sdk_configure_py_proto ${PY_FILES}) DEPENDS ${SERVING_CLIENT_CORE} sdk_configure_py_proto ${PY_FILES})
add_custom_target(paddle_python ALL DEPENDS serving_client ${PADDLE_SERVING_BINARY_DIR}/.timestamp) add_custom_target(paddle_python ALL DEPENDS serving_client ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
endif() endif()
if (SERVER) if (SERVER)
if(NOT WITH_GPU AND NOT WITH_LITE)
add_custom_command(
OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server/ ${PADDLE_SERVING_BINARY_DIR}/python/
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "server"
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES})
add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
elseif(WITH_TRT)
if(CUDA_VERSION EQUAL 10.1) if(CUDA_VERSION EQUAL 10.1)
set(SUFFIX 101) set(SUFFIX 101)
elseif(CUDA_VERSION EQUAL 10.2) elseif(CUDA_VERSION EQUAL 10.2)
set(SUFFIX 102) set(SUFFIX 102)
elseif(CUDA_VERSION EQUAL 11.0) elseif(CUDA_VERSION EQUAL 11.0)
set(SUFFIX 11) set(SUFFIX 11)
endif()
add_custom_command(
OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
COMMAND cp -r
${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server_gpu/ ${PADDLE_SERVING_BINARY_DIR}/python/
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py
"server_gpu" ${SUFFIX}
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES})
add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
elseif(WITH_LITE)
if(WITH_XPU)
add_custom_command(
OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
COMMAND cp -r
${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server_gpu/ ${PADDLE_SERVING_BINARY_DIR}/python/
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py
"server_gpu" arm-xpu
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES})
add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
else()
add_custom_command(
OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
COMMAND cp -r
${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server_gpu/ ${PADDLE_SERVING_BINARY_DIR}/python/
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py
"server_gpu" arm
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES})
add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
endif() endif()
else()
add_custom_command( add_custom_command(
OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
COMMAND cp -r COMMAND cp -r
${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server_gpu/ ${PADDLE_SERVING_BINARY_DIR}/python/ ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server/ ${PADDLE_SERVING_BINARY_DIR}/python/
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py
"server_gpu" ${CUDA_VERSION_MAJOR} "server" ${VERSION_SUFFIX}
COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES}) DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES})
add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp) add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
endif()
endif() endif()
set(SERVING_CLIENT_PYTHON_PACKAGE_DIR ${CMAKE_CURRENT_BINARY_DIR}/dist/) set(SERVING_CLIENT_PYTHON_PACKAGE_DIR ${CMAKE_CURRENT_BINARY_DIR}/dist/)
set(SERVING_SERVER_PYTHON_PACKAGE_DIR ${CMAKE_CURRENT_BINARY_DIR}/dist/) set(SERVING_SERVER_PYTHON_PACKAGE_DIR ${CMAKE_CURRENT_BINARY_DIR}/dist/)
if (CLIENT) if (CLIENT)
install(DIRECTORY ${SERVING_CLIENT_PYTHON_PACKAGE_DIR} install(DIRECTORY ${SERVING_CLIENT_PYTHON_PACKAGE_DIR}
DESTINATION opt/serving_client/share/wheels DESTINATION opt/serving_client/share/wheels
) )
endif() endif()
if (SERVER) if (SERVER)
install(DIRECTORY ${SERVING_SERVER_PYTHON_PACKAGE_DIR} install(DIRECTORY ${SERVING_SERVER_PYTHON_PACKAGE_DIR}
DESTINATION opt/serving_server/share/wheels DESTINATION opt/serving_server/share/wheels
) )
endif() endif()
if (CLIENT OR SERVER) if (CLIENT OR SERVER)
find_program(PATCHELF_EXECUTABLE patchelf) find_program(PATCHELF_EXECUTABLE patchelf)
if (NOT PATCHELF_EXECUTABLE) if (NOT PATCHELF_EXECUTABLE)
message(FATAL_ERROR "patchelf not found, please install it.\n" message(FATAL_ERROR "patchelf not found, please install it.\n"
"For Ubuntu, the command is: apt-get install -y patchelf.") "For Ubuntu, the command is: apt-get install -y patchelf.")
endif() endif()
endif() endif()
...@@ -35,7 +35,7 @@ def update_info(file_name, feature, info): ...@@ -35,7 +35,7 @@ def update_info(file_name, feature, info):
if len(sys.argv) > 2: if len(sys.argv) > 2:
update_info("paddle_serving_server_gpu/version.py", "cuda_version", update_info("paddle_serving_server/version.py", "cuda_version",
sys.argv[2]) sys.argv[2])
path = "paddle_serving_" + sys.argv[1] path = "paddle_serving_" + sys.argv[1]
......
...@@ -13,737 +13,8 @@ ...@@ -13,737 +13,8 @@
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
import os SERVER_VERSION = "0.0.0"
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
import tarfile
import socket
import paddle_serving_server as paddle_serving_server
from .version import serving_server_version
from contextlib import closing
import collections
import shutil
import numpy as np
import grpc
from .proto import multi_lang_general_model_service_pb2
import sys
if sys.platform.startswith('win') is False:
import fcntl
sys.path.append(
os.path.join(os.path.abspath(os.path.dirname(__file__)), 'proto'))
from .proto import multi_lang_general_model_service_pb2_grpc
from multiprocessing import Pool, Process
from concurrent import futures
__version__ = SERVER_VERSION
class OpMaker(object): cuda_version = "9"
def __init__(self): commit_id = ""
self.op_dict = { \ No newline at end of file
"general_infer": "GeneralInferOp",
"general_reader": "GeneralReaderOp",
"general_response": "GeneralResponseOp",
"general_text_reader": "GeneralTextReaderOp",
"general_text_response": "GeneralTextResponseOp",
"general_single_kv": "GeneralSingleKVOp",
"general_dist_kv_infer": "GeneralDistKVInferOp",
"general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp",
"general_copy": "GeneralCopyOp"
}
self.node_name_suffix_ = collections.defaultdict(int)
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
if node_type not in self.op_dict:
raise Exception("Op type {} is not supported right now".format(
node_type))
node = server_sdk.DAGNode()
# node.name will be used as the infer engine name
if engine_name:
node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
# overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1:
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name
dep.mode = "RO"
node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'
.format(node.dependencies[0].name, self.workflow.nodes[
-1].name))
self.workflow.nodes.extend([node])
def get_op_sequence(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class Server(object):
def __init__(self):
self.server_handle_ = None
self.infer_service_conf = None
self.model_toolkit_conf = None
self.resource_conf = None
self.memory_optimization = False
self.ir_optimization = False
self.model_conf = None
self.workflow_fn = "workflow.prototxt"
self.resource_fn = "resource.prototxt"
self.infer_service_fn = "infer_service.prototxt"
self.model_toolkit_fn = "model_toolkit.prototxt"
self.general_model_config_fn = "general_model.prototxt"
self.cube_config_fn = "cube.conf"
self.workdir = ""
self.max_concurrency = 0
self.num_threads = 4
self.port = 8080
self.reload_interval_s = 10
self.max_body_size = 64 * 1024 * 1024
self.module_path = os.path.dirname(paddle_serving_server.__file__)
self.cur_path = os.getcwd()
self.use_local_bin = False
self.mkl_flag = False
self.encryption_model = False
self.product_name = None
self.container_id = None
self.model_config_paths = None # for multi-model in a workflow
def get_fetch_list(self):
fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
return fetch_names
def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency
def set_num_threads(self, threads):
self.num_threads = threads
def set_max_body_size(self, body_size):
if body_size >= self.max_body_size:
self.max_body_size = body_size
else:
print(
"max_body_size is less than default value, will use default value in service."
)
def set_port(self, port):
self.port = port
def set_reload_interval(self, interval):
self.reload_interval_s = interval
def set_op_sequence(self, op_seq):
self.workflow_conf = op_seq
def set_op_graph(self, op_graph):
self.workflow_conf = op_graph
def set_memory_optimize(self, flag=False):
self.memory_optimization = flag
def set_ir_optimize(self, flag=False):
self.ir_optimization = flag
def use_encryption_model(self, flag=False):
self.encryption_model = flag
def set_product_name(self, product_name=None):
if product_name == None:
raise ValueError("product_name can't be None.")
self.product_name = product_name
def set_container_id(self, container_id):
if container_id == None:
raise ValueError("container_id can't be None.")
self.container_id = container_id
def check_local_bin(self):
if "SERVING_BIN" in os.environ:
self.use_local_bin = True
self.bin_path = os.environ["SERVING_BIN"]
def _prepare_engine(self, model_config_paths, device):
if self.model_toolkit_conf == None:
self.model_toolkit_conf = server_sdk.ModelToolkitConf()
for engine_name, model_config_path in model_config_paths.items():
engine = server_sdk.EngineDesc()
engine.name = engine_name
engine.reloadable_meta = model_config_path + "/fluid_time_file"
os.system("touch {}".format(engine.reloadable_meta))
engine.reloadable_type = "timestamp_ne"
engine.runtime_thread_num = 0
engine.batch_infer_size = 0
engine.enable_batch_align = 0
engine.model_data_path = model_config_path
engine.enable_memory_optimization = self.memory_optimization
engine.enable_ir_optimization = self.ir_optimization
engine.static_optimization = False
engine.force_update_static_cache = False
if os.path.exists('{}/__params__'.format(model_config_path)):
suffix = ""
else:
suffix = "_DIR"
if device == "cpu":
if self.encryption_model:
engine.type = "FLUID_CPU_ANALYSIS_ENCRYPT"
else:
engine.type = "FLUID_CPU_ANALYSIS" + suffix
elif device == "gpu":
if self.encryption_model:
engine.type = "FLUID_GPU_ANALYSIS_ENCRYPT"
else:
engine.type = "FLUID_GPU_ANALYSIS" + suffix
self.model_toolkit_conf.engines.extend([engine])
def _prepare_infer_service(self, port):
if self.infer_service_conf == None:
self.infer_service_conf = server_sdk.InferServiceConf()
self.infer_service_conf.port = port
infer_service = server_sdk.InferService()
infer_service.name = "GeneralModelService"
infer_service.workflows.extend(["workflow1"])
self.infer_service_conf.services.extend([infer_service])
def _prepare_resource(self, workdir, cube_conf):
self.workdir = workdir
if self.resource_conf == None:
with open("{}/{}".format(workdir, self.general_model_config_fn),
"w") as fout:
fout.write(str(self.model_conf))
self.resource_conf = server_sdk.ResourceConf()
for workflow in self.workflow_conf.workflows:
for node in workflow.nodes:
if "dist_kv" in node.name:
self.resource_conf.cube_config_path = workdir
self.resource_conf.cube_config_file = self.cube_config_fn
if cube_conf == None:
raise ValueError(
"Please set the path of cube.conf while use dist_kv op."
)
shutil.copy(cube_conf, workdir)
if "quant" in node.name:
self.resource_conf.cube_quant_bits = 8
self.resource_conf.model_toolkit_path = workdir
self.resource_conf.model_toolkit_file = self.model_toolkit_fn
self.resource_conf.general_model_path = workdir
self.resource_conf.general_model_file = self.general_model_config_fn
if self.product_name != None:
self.resource_conf.auth_product_name = self.product_name
if self.container_id != None:
self.resource_conf.auth_container_id = self.container_id
def _write_pb_str(self, filepath, pb_obj):
with open(filepath, "w") as fout:
fout.write(str(pb_obj))
def load_model_config(self, model_config_paths):
# At present, Serving needs to configure the model path in
# the resource.prototxt file to determine the input and output
# format of the workflow. To ensure that the input and output
# of multiple models are the same.
workflow_oi_config_path = None
if isinstance(model_config_paths, str):
# If there is only one model path, use the default infer_op.
# Because there are several infer_op type, we need to find
# it from workflow_conf.
default_engine_names = [
'general_infer_0', 'general_dist_kv_infer_0',
'general_dist_kv_quant_infer_0'
]
engine_name = None
for node in self.workflow_conf.workflows[0].nodes:
if node.name in default_engine_names:
engine_name = node.name
break
if engine_name is None:
raise Exception(
"You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
)
self.model_config_paths = {engine_name: model_config_paths}
workflow_oi_config_path = self.model_config_paths[engine_name]
elif isinstance(model_config_paths, dict):
self.model_config_paths = {}
for node_str, path in model_config_paths.items():
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.model_config_paths[node.name] = path
print("You have specified multiple model paths, please ensure "
"that the input and output of multiple models are the same.")
workflow_oi_config_path = list(self.model_config_paths.items())[0][
1]
else:
raise Exception("The type of model_config_paths must be str or "
"dict({op: model_path}), not {}.".format(
type(model_config_paths)))
self.model_conf = m_config.GeneralModelConfig()
f = open(
"{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
'r')
self.model_conf = google.protobuf.text_format.Merge(
str(f.read()), self.model_conf)
# check config here
# print config here
def use_mkl(self, flag):
self.mkl_flag = flag
def get_device_version(self):
avx_flag = False
mkl_flag = self.mkl_flag
openblas_flag = False
r = os.system("cat /proc/cpuinfo | grep avx > /dev/null 2>&1")
if r == 0:
avx_flag = True
if avx_flag:
if mkl_flag:
device_version = "serving-cpu-avx-mkl-"
else:
device_version = "serving-cpu-avx-openblas-"
else:
if mkl_flag:
print(
"Your CPU does not support AVX, server will running with noavx-openblas mode."
)
device_version = "serving-cpu-noavx-openblas-"
return device_version
def download_bin(self):
os.chdir(self.module_path)
need_download = False
device_version = self.get_device_version()
folder_name = device_version + serving_server_version
tar_name = folder_name + ".tar.gz"
bin_url = "https://paddle-serving.bj.bcebos.com/bin/" + tar_name
self.server_path = os.path.join(self.module_path, folder_name)
#acquire lock
version_file = open("{}/version.py".format(self.module_path), "r")
fcntl.flock(version_file, fcntl.LOCK_EX)
if not os.path.exists(self.server_path):
print('Frist time run, downloading PaddleServing components ...')
r = os.system('wget ' + bin_url + ' --no-check-certificate')
if r != 0:
if os.path.exists(tar_name):
os.remove(tar_name)
raise SystemExit(
'Download failed, please check your network or permission of {}.'
.format(self.module_path))
else:
try:
print('Decompressing files ..')
tar = tarfile.open(tar_name)
tar.extractall()
tar.close()
except:
if os.path.exists(exe_path):
os.remove(exe_path)
raise SystemExit(
'Decompressing failed, please check your permission of {} or disk space left.'
.format(self.module_path))
finally:
os.remove(tar_name)
#release lock
version_file.close()
os.chdir(self.cur_path)
self.bin_path = self.server_path + "/serving"
def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if workdir == None:
workdir = "./tmp"
os.system("mkdir {}".format(workdir))
else:
os.system("mkdir {}".format(workdir))
os.system("touch {}/fluid_time_file".format(workdir))
if not self.port_is_available(port):
raise SystemExit("Port {} is already used".format(port))
self.set_port(port)
self._prepare_resource(workdir, cube_conf)
self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port)
self.workdir = workdir
infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
resource_fn = "{}/{}".format(workdir, self.resource_fn)
model_toolkit_fn = "{}/{}".format(workdir, self.model_toolkit_fn)
self._write_pb_str(infer_service_fn, self.infer_service_conf)
self._write_pb_str(workflow_fn, self.workflow_conf)
self._write_pb_str(resource_fn, self.resource_conf)
self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf)
def port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def run_server(self):
# just run server with system command
# currently we do not load cube
self.check_local_bin()
if not self.use_local_bin:
self.download_bin()
else:
print("Use local bin : {}".format(self.bin_path))
command = "{} " \
"-enable_model_toolkit " \
"-inferservice_path {} " \
"-inferservice_file {} " \
"-max_concurrency {} " \
"-num_threads {} " \
"-port {} " \
"-reload_interval_s {} " \
"-resource_path {} " \
"-resource_file {} " \
"-workflow_path {} " \
"-workflow_file {} " \
"-bthread_concurrency {} " \
"-max_body_size {} ".format(
self.bin_path,
self.workdir,
self.infer_service_fn,
self.max_concurrency,
self.num_threads,
self.port,
self.reload_interval_s,
self.workdir,
self.resource_fn,
self.workdir,
self.workflow_fn,
self.num_threads,
self.max_body_size)
print("Going to Run Command")
print(command)
os.system(command)
class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
MultiLangGeneralModelServiceServicer):
def __init__(self, model_config_path, is_multi_model, endpoints):
self.is_multi_model_ = is_multi_model
self.model_config_path_ = model_config_path
self.endpoints_ = endpoints
with open(self.model_config_path_) as f:
self.model_config_str_ = str(f.read())
self._parse_model_config(self.model_config_str_)
self._init_bclient(self.model_config_path_, self.endpoints_)
def _init_bclient(self, model_config_path, endpoints, timeout_ms=None):
from paddle_serving_client import Client
self.bclient_ = Client()
if timeout_ms is not None:
self.bclient_.set_rpc_timeout_ms(timeout_ms)
self.bclient_.load_client_config(model_config_path)
self.bclient_.connect(endpoints)
def _parse_model_config(self, model_config_str):
model_conf = m_config.GeneralModelConfig()
model_conf = google.protobuf.text_format.Merge(model_config_str,
model_conf)
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.feed_types_ = {}
self.feed_shapes_ = {}
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.fetch_types_ = {}
self.lod_tensor_set_ = set()
for i, var in enumerate(model_conf.feed_var):
self.feed_types_[var.alias_name] = var.feed_type
self.feed_shapes_[var.alias_name] = var.shape
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
for i, var in enumerate(model_conf.fetch_var):
self.fetch_types_[var.alias_name] = var.fetch_type
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
def _flatten_list(self, nested_list):
for item in nested_list:
if isinstance(item, (list, tuple)):
for sub_item in self._flatten_list(item):
yield sub_item
else:
yield item
def _unpack_inference_request(self, request):
feed_names = list(request.feed_var_names)
fetch_names = list(request.fetch_var_names)
is_python = request.is_python
log_id = request.log_id
feed_batch = []
for feed_inst in request.insts:
feed_dict = {}
for idx, name in enumerate(feed_names):
var = feed_inst.tensor_array[idx]
v_type = self.feed_types_[name]
data = None
if is_python:
if v_type == 0: # int64
data = np.frombuffer(var.data, dtype="int64")
elif v_type == 1: # float32
data = np.frombuffer(var.data, dtype="float32")
elif v_type == 2: # int32
data = np.frombuffer(var.data, dtype="int32")
else:
raise Exception("error type.")
else:
if v_type == 0: # int64
data = np.array(list(var.int64_data), dtype="int64")
elif v_type == 1: # float32
data = np.array(list(var.float_data), dtype="float32")
elif v_type == 2: # int32
data = np.array(list(var.int_data), dtype="int32")
else:
raise Exception("error type.")
data.shape = list(feed_inst.tensor_array[idx].shape)
feed_dict[name] = data
if len(var.lod) > 0:
feed_dict["{}.lod".format(name)] = var.lod
feed_batch.append(feed_dict)
return feed_batch, fetch_names, is_python, log_id
def _pack_inference_response(self, ret, fetch_names, is_python):
resp = multi_lang_general_model_service_pb2.InferenceResponse()
if ret is None:
resp.err_code = 1
return resp
results, tag = ret
resp.tag = tag
resp.err_code = 0
if not self.is_multi_model_:
results = {'general_infer_0': results}
for model_name, model_result in results.items():
model_output = multi_lang_general_model_service_pb2.ModelOutput()
inst = multi_lang_general_model_service_pb2.FetchInst()
for idx, name in enumerate(fetch_names):
tensor = multi_lang_general_model_service_pb2.Tensor()
v_type = self.fetch_types_[name]
if is_python:
tensor.data = model_result[name].tobytes()
else:
if v_type == 0: # int64
tensor.int64_data.extend(model_result[name].reshape(-1)
.tolist())
elif v_type == 1: # float32
tensor.float_data.extend(model_result[name].reshape(-1)
.tolist())
elif v_type == 2: # int32
tensor.int_data.extend(model_result[name].reshape(-1)
.tolist())
else:
raise Exception("error type.")
tensor.shape.extend(list(model_result[name].shape))
if "{}.lod".format(name) in model_result:
tensor.lod.extend(model_result["{}.lod".format(name)]
.tolist())
inst.tensor_array.append(tensor)
model_output.insts.append(inst)
model_output.engine_name = model_name
resp.outputs.append(model_output)
return resp
def SetTimeout(self, request, context):
# This porcess and Inference process cannot be operate at the same time.
# For performance reasons, do not add thread lock temporarily.
timeout_ms = request.timeout_ms
self._init_bclient(self.model_config_path_, self.endpoints_, timeout_ms)
resp = multi_lang_general_model_service_pb2.SimpleResponse()
resp.err_code = 0
return resp
def Inference(self, request, context):
feed_batch, fetch_names, is_python, log_id = \
self._unpack_inference_request(request)
ret = self.bclient_.predict(
feed=feed_batch,
fetch=fetch_names,
batch=True,
need_variant_tag=True,
log_id=log_id)
return self._pack_inference_response(ret, fetch_names, is_python)
def GetClientConfig(self, request, context):
resp = multi_lang_general_model_service_pb2.GetClientConfigResponse()
resp.client_config_str = self.model_config_str_
return resp
class MultiLangServer(object):
def __init__(self):
self.bserver_ = Server()
self.worker_num_ = 4
self.body_size_ = 64 * 1024 * 1024
self.concurrency_ = 100000
self.is_multi_model_ = False # for model ensemble
def set_max_concurrency(self, concurrency):
self.concurrency_ = concurrency
self.bserver_.set_max_concurrency(concurrency)
def set_num_threads(self, threads):
self.worker_num_ = threads
self.bserver_.set_num_threads(threads)
def set_max_body_size(self, body_size):
self.bserver_.set_max_body_size(body_size)
if body_size >= self.body_size_:
self.body_size_ = body_size
else:
print(
"max_body_size is less than default value, will use default value in service."
)
def use_encryption_model(self, flag=False):
self.encryption_model = flag
def set_port(self, port):
self.gport_ = port
def set_reload_interval(self, interval):
self.bserver_.set_reload_interval(interval)
def set_op_sequence(self, op_seq):
self.bserver_.set_op_sequence(op_seq)
def set_op_graph(self, op_graph):
self.bserver_.set_op_graph(op_graph)
def set_memory_optimize(self, flag=False):
self.bserver_.set_memory_optimize(flag)
def set_ir_optimize(self, flag=False):
self.bserver_.set_ir_optimize(flag)
def set_op_sequence(self, op_seq):
self.bserver_.set_op_sequence(op_seq)
def use_mkl(self, flag):
self.bserver_.use_mkl(flag)
def load_model_config(self, server_config_paths, client_config_path=None):
self.bserver_.load_model_config(server_config_paths)
if client_config_path is None:
if isinstance(server_config_paths, dict):
self.is_multi_model_ = True
client_config_path = '{}/serving_server_conf.prototxt'.format(
list(server_config_paths.items())[0][1])
else:
client_config_path = '{}/serving_server_conf.prototxt'.format(
server_config_paths)
self.bclient_config_path_ = client_config_path
def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if not self._port_is_available(port):
raise SystemExit("Prot {} is already used".format(port))
default_port = 12000
self.port_list_ = []
for i in range(1000):
if default_port + i != port and self._port_is_available(default_port
+ i):
self.port_list_.append(default_port + i)
break
self.bserver_.prepare_server(
workdir=workdir,
port=self.port_list_[0],
device=device,
cube_conf=cube_conf)
self.set_port(port)
def _launch_brpc_service(self, bserver):
bserver.run_server()
def _port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def run_server(self):
p_bserver = Process(
target=self._launch_brpc_service, args=(self.bserver_, ))
p_bserver.start()
options = [('grpc.max_send_message_length', self.body_size_),
('grpc.max_receive_message_length', self.body_size_)]
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self.worker_num_),
options=options,
maximum_concurrent_rpcs=self.concurrency_)
multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
MultiLangServerServiceServicer(
self.bclient_config_path_, self.is_multi_model_,
["0.0.0.0:{}".format(self.port_list_[0])]), server)
server.add_insecure_port('[::]:{}'.format(self.gport_))
server.start()
p_bserver.join()
server.wait_for_termination()
from .proto import server_configure_pb2 as server_sdk
import google.protobuf.text_format
import collections
class OpMaker(object):
def __init__(self):
self.op_dict = {
"general_infer": "GeneralInferOp",
"general_reader": "GeneralReaderOp",
"general_response": "GeneralResponseOp",
"general_text_reader": "GeneralTextReaderOp",
"general_text_response": "GeneralTextResponseOp",
"general_single_kv": "GeneralSingleKVOp",
"general_dist_kv_infer": "GeneralDistKVInferOp",
"general_dist_kv": "GeneralDistKVOp"
}
self.node_name_suffix_ = collections.defaultdict(int)
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
if node_type not in self.op_dict:
raise Exception("Op type {} is not supported right now".format(
node_type))
node = server_sdk.DAGNode()
# node.name will be used as the infer engine name
if engine_name:
node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
# overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1:
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name
dep.mode = "RO"
node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'
.format(node.dependencies[0].name, self.workflow.nodes[
-1].name))
self.workflow.nodes.extend([node])
def get_op_sequence(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
...@@ -28,7 +28,6 @@ import logging ...@@ -28,7 +28,6 @@ import logging
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class Monitor(object): class Monitor(object):
''' '''
Monitor base class. It is used to monitor the remote model, pull and update the local model. Monitor base class. It is used to monitor the remote model, pull and update the local model.
......
import sys
import os
import google.protobuf.text_format
from .proto import general_model_config_pb2 as m_config
from .proto import multi_lang_general_model_service_pb2
sys.path.append(
os.path.join(os.path.abspath(os.path.dirname(__file__)), 'proto'))
from .proto import multi_lang_general_model_service_pb2_grpc
class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
MultiLangGeneralModelServiceServicer):
def __init__(self, model_config_path, is_multi_model, endpoints):
self.is_multi_model_ = is_multi_model
self.model_config_path_ = model_config_path
self.endpoints_ = endpoints
with open(self.model_config_path_) as f:
self.model_config_str_ = str(f.read())
self._parse_model_config(self.model_config_str_)
self._init_bclient(self.model_config_path_, self.endpoints_)
def _init_bclient(self, model_config_path, endpoints, timeout_ms=None):
from paddle_serving_client import Client
self.bclient_ = Client()
if timeout_ms is not None:
self.bclient_.set_rpc_timeout_ms(timeout_ms)
self.bclient_.load_client_config(model_config_path)
self.bclient_.connect(endpoints)
def _parse_model_config(self, model_config_str):
model_conf = m_config.GeneralModelConfig()
model_conf = google.protobuf.text_format.Merge(model_config_str,
model_conf)
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.feed_types_ = {}
self.feed_shapes_ = {}
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.fetch_types_ = {}
self.lod_tensor_set_ = set()
for i, var in enumerate(model_conf.feed_var):
self.feed_types_[var.alias_name] = var.feed_type
self.feed_shapes_[var.alias_name] = var.shape
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
for i, var in enumerate(model_conf.fetch_var):
self.fetch_types_[var.alias_name] = var.fetch_type
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
def _flatten_list(self, nested_list):
for item in nested_list:
if isinstance(item, (list, tuple)):
for sub_item in self._flatten_list(item):
yield sub_item
else:
yield item
def _unpack_inference_request(self, request):
feed_names = list(request.feed_var_names)
fetch_names = list(request.fetch_var_names)
is_python = request.is_python
log_id = request.log_id
feed_batch = []
for feed_inst in request.insts:
feed_dict = {}
for idx, name in enumerate(feed_names):
var = feed_inst.tensor_array[idx]
v_type = self.feed_types_[name]
data = None
if is_python:
if v_type == 0:
data = np.frombuffer(var.data, dtype="int64")
elif v_type == 1:
data = np.frombuffer(var.data, dtype="float32")
elif v_type == 2:
data = np.frombuffer(var.data, dtype="int32")
else:
raise Exception("error type.")
else:
if v_type == 0: # int64
data = np.array(list(var.int64_data), dtype="int64")
elif v_type == 1: # float32
data = np.array(list(var.float_data), dtype="float32")
elif v_type == 2:
data = np.array(list(var.int_data), dtype="int32")
else:
raise Exception("error type.")
data.shape = list(feed_inst.tensor_array[idx].shape)
feed_dict[name] = data
if len(var.lod) > 0:
feed_dict["{}.lod".format(name)] = var.lod
feed_batch.append(feed_dict)
return feed_batch, fetch_names, is_python, log_id
def _pack_inference_response(self, ret, fetch_names, is_python):
resp = multi_lang_general_model_service_pb2.InferenceResponse()
if ret is None:
resp.err_code = 1
return resp
results, tag = ret
resp.tag = tag
resp.err_code = 0
if not self.is_multi_model_:
results = {'general_infer_0': results}
for model_name, model_result in results.items():
model_output = multi_lang_general_model_service_pb2.ModelOutput()
inst = multi_lang_general_model_service_pb2.FetchInst()
for idx, name in enumerate(fetch_names):
tensor = multi_lang_general_model_service_pb2.Tensor()
v_type = self.fetch_types_[name]
if is_python:
tensor.data = model_result[name].tobytes()
else:
if v_type == 0: # int64
tensor.int64_data.extend(model_result[name].reshape(-1)
.tolist())
elif v_type == 1: # float32
tensor.float_data.extend(model_result[name].reshape(-1)
.tolist())
elif v_type == 2: # int32
tensor.int_data.extend(model_result[name].reshape(-1)
.tolist())
else:
raise Exception("error type.")
tensor.shape.extend(list(model_result[name].shape))
if "{}.lod".format(name) in model_result:
tensor.lod.extend(model_result["{}.lod".format(name)]
.tolist())
inst.tensor_array.append(tensor)
model_output.insts.append(inst)
model_output.engine_name = model_name
resp.outputs.append(model_output)
return resp
def SetTimeout(self, request, context):
# This porcess and Inference process cannot be operate at the same time.
# For performance reasons, do not add thread lock temporarily.
timeout_ms = request.timeout_ms
self._init_bclient(self.model_config_path_, self.endpoints_, timeout_ms)
resp = multi_lang_general_model_service_pb2.SimpleResponse()
resp.err_code = 0
return resp
def Inference(self, request, context):
feed_batch, fetch_names, is_python, log_id \
= self._unpack_inference_request(request)
ret = self.bclient_.predict(
feed=feed_batch,
fetch=fetch_names,
batch=True,
need_variant_tag=True,
log_id=log_id)
return self._pack_inference_response(ret, fetch_names, is_python)
def GetClientConfig(self, request, context):
resp = multi_lang_general_model_service_pb2.GetClientConfigResponse()
resp.client_config_str = self.model_config_str_
return resp
\ No newline at end of file
...@@ -18,12 +18,12 @@ Usage: ...@@ -18,12 +18,12 @@ Usage:
python -m paddle_serving_server.serve --model ./serving_server_model --port 9292 python -m paddle_serving_server.serve --model ./serving_server_model --port 9292
""" """
import argparse import argparse
import sys import os
import json import json
import base64 import base64
import time import time
from multiprocessing import Process from multiprocessing import Pool, Process
from .web_service import WebService, port_is_available from paddle_serving_server_gpu import serve_args
from flask import Flask, request from flask import Flask, request
import sys import sys
if sys.version_info.major == 2: if sys.version_info.major == 2:
...@@ -31,24 +31,26 @@ if sys.version_info.major == 2: ...@@ -31,24 +31,26 @@ if sys.version_info.major == 2:
elif sys.version_info.major == 3: elif sys.version_info.major == 3:
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
def serve_args():
def parse_args(): # pylint: disable=doc-string-missing
parser = argparse.ArgumentParser("serve") parser = argparse.ArgumentParser("serve")
parser.add_argument( parser.add_argument(
"--thread", type=int, default=10, help="Concurrency of server") "--thread", type=int, default=2, help="Concurrency of server")
parser.add_argument( parser.add_argument(
"--model", type=str, default="", help="Model for serving") "--port", type=int, default=9292, help="Port of the starting gpu")
parser.add_argument( parser.add_argument(
"--port", type=int, default=9292, help="Port the server") "--device", type=str, default="gpu", help="Type of device")
parser.add_argument("--gpu_ids", type=str, default="", help="gpu ids")
parser.add_argument( parser.add_argument(
"--name", type=str, default="None", help="Web service name") "--model", type=str, default="", help="Model for serving")
parser.add_argument( parser.add_argument(
"--workdir", "--workdir",
type=str, type=str,
default="workdir", default="workdir",
help="Working dir of current service") help="Working dir of current service")
parser.add_argument( parser.add_argument(
"--device", type=str, default="cpu", help="Type of device") "--name", type=str, default="None", help="Default service name")
parser.add_argument(
"--use_mkl", default=False, action="store_true", help="Use MKL")
parser.add_argument( parser.add_argument(
"--mem_optim_off", "--mem_optim_off",
default=False, default=False,
...@@ -56,8 +58,6 @@ def parse_args(): # pylint: disable=doc-string-missing ...@@ -56,8 +58,6 @@ def parse_args(): # pylint: disable=doc-string-missing
help="Memory optimize") help="Memory optimize")
parser.add_argument( parser.add_argument(
"--ir_optim", default=False, action="store_true", help="Graph optimize") "--ir_optim", default=False, action="store_true", help="Graph optimize")
parser.add_argument(
"--use_mkl", default=False, action="store_true", help="Use MKL")
parser.add_argument( parser.add_argument(
"--max_body_size", "--max_body_size",
type=int, type=int,
...@@ -73,6 +73,12 @@ def parse_args(): # pylint: disable=doc-string-missing ...@@ -73,6 +73,12 @@ def parse_args(): # pylint: disable=doc-string-missing
default=False, default=False,
action="store_true", action="store_true",
help="Use Multi-language-service") help="Use Multi-language-service")
parser.add_argument(
"--use_trt", default=False, action="store_true", help="Use TensorRT")
parser.add_argument(
"--use_lite", default=False, action="store_true", help="Use PaddleLite")
parser.add_argument(
"--use_xpu", default=False, action="store_true", help="Use XPU")
parser.add_argument( parser.add_argument(
"--product_name", "--product_name",
type=str, type=str,
...@@ -85,26 +91,29 @@ def parse_args(): # pylint: disable=doc-string-missing ...@@ -85,26 +91,29 @@ def parse_args(): # pylint: disable=doc-string-missing
help="container_id for authentication") help="container_id for authentication")
return parser.parse_args() return parser.parse_args()
def start_gpu_card_model(port, args, index = 0, gpuid): # pylint: disable=doc-string-missing
def start_standard_model(serving_port): # pylint: disable=doc-string-missing workdir = args.workdir
args = parse_args() gpuid = int(gpuid)
device = "gpu"
if gpuid == -1:
device = "cpu"
elif gpuid >= 0:
port = port + index
thread_num = args.thread thread_num = args.thread
model = args.model model = args.model
port = serving_port
workdir = args.workdir
device = args.device
mem_optim = args.mem_optim_off is False mem_optim = args.mem_optim_off is False
ir_optim = args.ir_optim ir_optim = args.ir_optim
max_body_size = args.max_body_size
use_mkl = args.use_mkl use_mkl = args.use_mkl
use_encryption_model = args.use_encryption_model max_body_size = args.max_body_size
use_multilang = args.use_multilang use_multilang = args.use_multilang
if gpuid >= 0:
workdir = "{}_{}".format(args.workdir, gpuid)
if model == "": if model == "":
print("You must specify your serving model") print("You must specify your serving model")
exit(-1) exit(-1)
import paddle_serving_server as serving import paddle_serving_server_gpu as serving
op_maker = serving.OpMaker() op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader') read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer') general_infer_op = op_maker.create('general_infer')
...@@ -115,29 +124,84 @@ def start_standard_model(serving_port): # pylint: disable=doc-string-missing ...@@ -115,29 +124,84 @@ def start_standard_model(serving_port): # pylint: disable=doc-string-missing
op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op) op_seq_maker.add_op(general_response_op)
server = None
if use_multilang: if use_multilang:
server = serving.MultiLangServer() server = serving.MultiLangServer()
else: else:
server = serving.Server() server = serving.Server()
server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num) server.set_num_threads(thread_num)
server.use_mkl(use_mkl)
server.set_memory_optimize(mem_optim) server.set_memory_optimize(mem_optim)
server.set_ir_optimize(ir_optim) server.set_ir_optimize(ir_optim)
server.use_mkl(use_mkl)
server.set_max_body_size(max_body_size) server.set_max_body_size(max_body_size)
server.set_port(port) if args.use_trt:
server.use_encryption_model(use_encryption_model) server.set_trt()
if args.use_lite:
server.set_lite()
device = "arm"
server.set_device(device)
if args.use_xpu:
server.set_xpu()
if args.product_name != None: if args.product_name != None:
server.set_product_name(args.product_name) server.set_product_name(args.product_name)
if args.container_id != None: if args.container_id != None:
server.set_container_id(args.container_id) server.set_container_id(args.container_id)
server.load_model_config(model) server.load_model_config(model)
server.prepare_server(workdir=workdir, port=port, device=device) server.prepare_server(
workdir=workdir,
port=port,
device=device,
use_encryption_model=args.use_encryption_model)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.run_server() server.run_server()
def start_multi_card(args, serving_port=None): # pylint: disable=doc-string-missing
gpus = ""
if serving_port == None:
serving_port = args.port
if args.gpu_ids == "":
gpus = []
else:
gpus = args.gpu_ids.split(",")
if "CUDA_VISIBLE_DEVICES" in os.environ:
env_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",")
for ids in gpus:
if int(ids) >= len(env_gpus):
print(
" Max index of gpu_ids out of range, the number of CUDA_VISIBLE_DEVICES is {}."
.format(len(env_gpus)))
exit(-1)
else:
env_gpus = []
if args.use_lite:
print("run arm server.")
start_gpu_card_model(-1, -1, args)
elif len(gpus) <= 0:
print("gpu_ids not set, going to run cpu service.")
start_gpu_card_model(-1, -1, serving_port, args)
else:
gpu_processes = []
for i, gpu_id in enumerate(gpus):
p = Process(
target=start_gpu_card_model,
args=(
i,
gpu_id,
serving_port,
args, ))
gpu_processes.append(p)
for p in gpu_processes:
p.start()
for p in gpu_processes:
p.join()
class MainService(BaseHTTPRequestHandler): class MainService(BaseHTTPRequestHandler):
def get_available_port(self): def get_available_port(self):
default_port = 12000 default_port = 12000
...@@ -146,7 +210,7 @@ class MainService(BaseHTTPRequestHandler): ...@@ -146,7 +210,7 @@ class MainService(BaseHTTPRequestHandler):
return default_port + i return default_port + i
def start_serving(self): def start_serving(self):
start_standard_model(serving_port) start_multi_card(args, serving_port)
def get_key(self, post_data): def get_key(self, post_data):
if "key" not in post_data: if "key" not in post_data:
...@@ -207,9 +271,9 @@ class MainService(BaseHTTPRequestHandler): ...@@ -207,9 +271,9 @@ class MainService(BaseHTTPRequestHandler):
if __name__ == "__main__": if __name__ == "__main__":
args = serve_args()
args = parse_args()
if args.name == "None": if args.name == "None":
from .web_service import port_is_available
if args.use_encryption_model: if args.use_encryption_model:
p_flag = False p_flag = False
p = None p = None
...@@ -220,27 +284,39 @@ if __name__ == "__main__": ...@@ -220,27 +284,39 @@ if __name__ == "__main__":
) )
server.serve_forever() server.serve_forever()
else: else:
start_standard_model(args.port) start_multi_card(args)
else: else:
service = WebService(name=args.name) from .web_service import WebService
service.load_model_config(args.model) web_service = WebService(name=args.name)
service.prepare_server( web_service.load_model_config(args.model)
workdir=args.workdir, port=args.port, device=args.device) gpu_ids = args.gpu_ids
service.run_rpc_service() if gpu_ids == "":
if "CUDA_VISIBLE_DEVICES" in os.environ:
gpu_ids = os.environ["CUDA_VISIBLE_DEVICES"]
if len(gpu_ids) > 0:
web_service.set_gpus(gpu_ids)
web_service.prepare_server(
workdir=args.workdir,
port=args.port,
device=args.device,
use_lite=args.use_lite,
use_xpu=args.use_xpu,
ir_optim=args.ir_optim)
web_service.run_rpc_service()
app_instance = Flask(__name__) app_instance = Flask(__name__)
@app_instance.before_first_request @app_instance.before_first_request
def init(): def init():
service._launch_web_service() web_service._launch_web_service()
service_name = "/" + service.name + "/prediction" service_name = "/" + web_service.name + "/prediction"
@app_instance.route(service_name, methods=["POST"]) @app_instance.route(service_name, methods=["POST"])
def run(): def run():
return service.get_prediction(request) return web_service.get_prediction(request)
app_instance.run(host="0.0.0.0", app_instance.run(host="0.0.0.0",
port=service.port, port=web_service.port,
threaded=False, threaded=False,
processes=4) processes=4)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import os import os
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
import tarfile import tarfile
import socket import socket
import paddle_serving_server_gpu as paddle_serving_server import paddle_serving_server_gpu as paddle_serving_server
...@@ -24,175 +7,18 @@ import time ...@@ -24,175 +7,18 @@ import time
from .version import serving_server_version from .version import serving_server_version
from contextlib import closing from contextlib import closing
import argparse import argparse
import collections
import sys import sys
if sys.platform.startswith('win') is False: if sys.platform.startswith('win') is False:
import fcntl import fcntl
import shutil import shutil
import numpy as np import numpy as np
import grpc import grpc
from .proto import multi_lang_general_model_service_pb2
import sys import sys
sys.path.append(
os.path.join(os.path.abspath(os.path.dirname(__file__)), 'proto'))
from .proto import multi_lang_general_model_service_pb2_grpc
from multiprocessing import Pool, Process from multiprocessing import Pool, Process
from concurrent import futures from concurrent import futures
def serve_args():
parser = argparse.ArgumentParser("serve")
parser.add_argument(
"--thread", type=int, default=2, help="Concurrency of server")
parser.add_argument(
"--model", type=str, default="", help="Model for serving")
parser.add_argument(
"--port", type=int, default=9292, help="Port of the starting gpu")
parser.add_argument(
"--workdir",
type=str,
default="workdir",
help="Working dir of current service")
parser.add_argument(
"--device", type=str, default="gpu", help="Type of device")
parser.add_argument("--gpu_ids", type=str, default="", help="gpu ids")
parser.add_argument(
"--name", type=str, default="None", help="Default service name")
parser.add_argument(
"--mem_optim_off",
default=False,
action="store_true",
help="Memory optimize")
parser.add_argument(
"--ir_optim", default=False, action="store_true", help="Graph optimize")
parser.add_argument(
"--max_body_size",
type=int,
default=512 * 1024 * 1024,
help="Limit sizes of messages")
parser.add_argument(
"--use_encryption_model",
default=False,
action="store_true",
help="Use encryption model")
parser.add_argument(
"--use_multilang",
default=False,
action="store_true",
help="Use Multi-language-service")
parser.add_argument(
"--use_trt", default=False, action="store_true", help="Use TensorRT")
parser.add_argument(
"--use_lite", default=False, action="store_true", help="Use PaddleLite")
parser.add_argument(
"--use_xpu", default=False, action="store_true", help="Use XPU")
parser.add_argument(
"--product_name",
type=str,
default=None,
help="product_name for authentication")
parser.add_argument(
"--container_id",
type=str,
default=None,
help="container_id for authentication")
return parser.parse_args()
class OpMaker(object):
def __init__(self):
self.op_dict = {
"general_infer": "GeneralInferOp",
"general_reader": "GeneralReaderOp",
"general_response": "GeneralResponseOp",
"general_text_reader": "GeneralTextReaderOp",
"general_text_response": "GeneralTextResponseOp",
"general_single_kv": "GeneralSingleKVOp",
"general_dist_kv_infer": "GeneralDistKVInferOp",
"general_dist_kv": "GeneralDistKVOp"
}
self.node_name_suffix_ = collections.defaultdict(int)
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
if node_type not in self.op_dict:
raise Exception("Op type {} is not supported right now".format(
node_type))
node = server_sdk.DAGNode()
# node.name will be used as the infer engine name
if engine_name:
node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
# overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1:
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name
dep.mode = "RO"
node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'
.format(node.dependencies[0].name, self.workflow.nodes[
-1].name))
self.workflow.nodes.extend([node])
def get_op_sequence(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class Server(object): class Server(object):
def __init__(self): def __init__(self):
self.server_handle_ = None self.server_handle_ = None
...@@ -217,6 +43,7 @@ class Server(object): ...@@ -217,6 +43,7 @@ class Server(object):
self.module_path = os.path.dirname(paddle_serving_server.__file__) self.module_path = os.path.dirname(paddle_serving_server.__file__)
self.cur_path = os.getcwd() self.cur_path = os.getcwd()
self.use_local_bin = False self.use_local_bin = False
self.mkl_flag = False
self.device = "cpu" self.device = "cpu"
self.gpuid = 0 self.gpuid = 0
self.use_trt = False self.use_trt = False
...@@ -432,6 +259,29 @@ class Server(object): ...@@ -432,6 +259,29 @@ class Server(object):
# check config here # check config here
# print config here # print config here
def use_mkl(self, flag):
self.mkl_flag = flag
def get_device_version(self):
avx_flag = False
mkl_flag = self.mkl_flag
openblas_flag = False
r = os.system("cat /proc/cpuinfo | grep avx > /dev/null 2>&1")
if r == 0:
avx_flag = True
if avx_flag:
if mkl_flag:
device_version = "serving-cpu-avx-mkl-"
else:
device_version = "serving-cpu-avx-openblas-"
else:
if mkl_flag:
print(
"Your CPU does not support AVX, server will running with noavx-openblas mode."
)
device_version = "serving-cpu-noavx-openblas-"
return device_version
def download_bin(self): def download_bin(self):
os.chdir(self.module_path) os.chdir(self.module_path)
need_download = False need_download = False
...@@ -495,6 +345,7 @@ class Server(object): ...@@ -495,6 +345,7 @@ class Server(object):
os.chdir(self.cur_path) os.chdir(self.cur_path)
self.bin_path = self.server_path + "/serving" self.bin_path = self.server_path + "/serving"
def prepare_server(self, def prepare_server(self,
workdir=None, workdir=None,
port=9292, port=9292,
...@@ -613,158 +464,6 @@ class Server(object): ...@@ -613,158 +464,6 @@ class Server(object):
os.system(command) os.system(command)
class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
MultiLangGeneralModelServiceServicer):
def __init__(self, model_config_path, is_multi_model, endpoints):
self.is_multi_model_ = is_multi_model
self.model_config_path_ = model_config_path
self.endpoints_ = endpoints
with open(self.model_config_path_) as f:
self.model_config_str_ = str(f.read())
self._parse_model_config(self.model_config_str_)
self._init_bclient(self.model_config_path_, self.endpoints_)
def _init_bclient(self, model_config_path, endpoints, timeout_ms=None):
from paddle_serving_client import Client
self.bclient_ = Client()
if timeout_ms is not None:
self.bclient_.set_rpc_timeout_ms(timeout_ms)
self.bclient_.load_client_config(model_config_path)
self.bclient_.connect(endpoints)
def _parse_model_config(self, model_config_str):
model_conf = m_config.GeneralModelConfig()
model_conf = google.protobuf.text_format.Merge(model_config_str,
model_conf)
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.feed_types_ = {}
self.feed_shapes_ = {}
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.fetch_types_ = {}
self.lod_tensor_set_ = set()
for i, var in enumerate(model_conf.feed_var):
self.feed_types_[var.alias_name] = var.feed_type
self.feed_shapes_[var.alias_name] = var.shape
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
for i, var in enumerate(model_conf.fetch_var):
self.fetch_types_[var.alias_name] = var.fetch_type
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
def _flatten_list(self, nested_list):
for item in nested_list:
if isinstance(item, (list, tuple)):
for sub_item in self._flatten_list(item):
yield sub_item
else:
yield item
def _unpack_inference_request(self, request):
feed_names = list(request.feed_var_names)
fetch_names = list(request.fetch_var_names)
is_python = request.is_python
log_id = request.log_id
feed_batch = []
for feed_inst in request.insts:
feed_dict = {}
for idx, name in enumerate(feed_names):
var = feed_inst.tensor_array[idx]
v_type = self.feed_types_[name]
data = None
if is_python:
if v_type == 0:
data = np.frombuffer(var.data, dtype="int64")
elif v_type == 1:
data = np.frombuffer(var.data, dtype="float32")
elif v_type == 2:
data = np.frombuffer(var.data, dtype="int32")
else:
raise Exception("error type.")
else:
if v_type == 0: # int64
data = np.array(list(var.int64_data), dtype="int64")
elif v_type == 1: # float32
data = np.array(list(var.float_data), dtype="float32")
elif v_type == 2:
data = np.array(list(var.int_data), dtype="int32")
else:
raise Exception("error type.")
data.shape = list(feed_inst.tensor_array[idx].shape)
feed_dict[name] = data
if len(var.lod) > 0:
feed_dict["{}.lod".format(name)] = var.lod
feed_batch.append(feed_dict)
return feed_batch, fetch_names, is_python, log_id
def _pack_inference_response(self, ret, fetch_names, is_python):
resp = multi_lang_general_model_service_pb2.InferenceResponse()
if ret is None:
resp.err_code = 1
return resp
results, tag = ret
resp.tag = tag
resp.err_code = 0
if not self.is_multi_model_:
results = {'general_infer_0': results}
for model_name, model_result in results.items():
model_output = multi_lang_general_model_service_pb2.ModelOutput()
inst = multi_lang_general_model_service_pb2.FetchInst()
for idx, name in enumerate(fetch_names):
tensor = multi_lang_general_model_service_pb2.Tensor()
v_type = self.fetch_types_[name]
if is_python:
tensor.data = model_result[name].tobytes()
else:
if v_type == 0: # int64
tensor.int64_data.extend(model_result[name].reshape(-1)
.tolist())
elif v_type == 1: # float32
tensor.float_data.extend(model_result[name].reshape(-1)
.tolist())
elif v_type == 2: # int32
tensor.int_data.extend(model_result[name].reshape(-1)
.tolist())
else:
raise Exception("error type.")
tensor.shape.extend(list(model_result[name].shape))
if "{}.lod".format(name) in model_result:
tensor.lod.extend(model_result["{}.lod".format(name)]
.tolist())
inst.tensor_array.append(tensor)
model_output.insts.append(inst)
model_output.engine_name = model_name
resp.outputs.append(model_output)
return resp
def SetTimeout(self, request, context):
# This porcess and Inference process cannot be operate at the same time.
# For performance reasons, do not add thread lock temporarily.
timeout_ms = request.timeout_ms
self._init_bclient(self.model_config_path_, self.endpoints_, timeout_ms)
resp = multi_lang_general_model_service_pb2.SimpleResponse()
resp.err_code = 0
return resp
def Inference(self, request, context):
feed_batch, fetch_names, is_python, log_id \
= self._unpack_inference_request(request)
ret = self.bclient_.predict(
feed=feed_batch,
fetch=fetch_names,
batch=True,
need_variant_tag=True,
log_id=log_id)
return self._pack_inference_response(ret, fetch_names, is_python)
def GetClientConfig(self, request, context):
resp = multi_lang_general_model_service_pb2.GetClientConfigResponse()
resp.client_config_str = self.model_config_str_
return resp
class MultiLangServer(object): class MultiLangServer(object):
def __init__(self): def __init__(self):
self.bserver_ = Server() self.bserver_ = Server()
...@@ -808,6 +507,9 @@ class MultiLangServer(object): ...@@ -808,6 +507,9 @@ class MultiLangServer(object):
def set_op_graph(self, op_graph): def set_op_graph(self, op_graph):
self.bserver_.set_op_graph(op_graph) self.bserver_.set_op_graph(op_graph)
def use_mkl(self, flag):
self.bserver_.use_mkl(flag)
def set_memory_optimize(self, flag=False): def set_memory_optimize(self, flag=False):
self.bserver_.set_memory_optimize(flag) self.bserver_.set_memory_optimize(flag)
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Paddle Serving Client version string """
serving_client_version = "0.0.0"
serving_server_version = "0.0.0"
module_proto_version = "0.0.0"
commit_id = ""
...@@ -15,15 +15,18 @@ ...@@ -15,15 +15,18 @@
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from flask import Flask, request, abort from flask import Flask, request, abort
from multiprocessing import Pool, Process
from paddle_serving_server import OpMaker, OpSeqMaker, Server
from paddle_serving_client import Client
from contextlib import closing from contextlib import closing
from multiprocessing import Pool, Process, Queue
from paddle_serving_client import Client
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
from paddle_serving_server_gpu.serve import start_multi_card
import socket import socket
import sys
import numpy as np import numpy as np
from paddle_serving_server import pipeline import paddle_serving_server_gpu as serving
from paddle_serving_server.pipeline import Op
from paddle_serving_server_gpu import pipeline
from paddle_serving_server_gpu.pipeline import Op
def port_is_available(port): def port_is_available(port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
...@@ -34,13 +37,15 @@ def port_is_available(port): ...@@ -34,13 +37,15 @@ def port_is_available(port):
else: else:
return False return False
class WebService(object): class WebService(object):
def __init__(self, name="default_service"): def __init__(self, name="default_service"):
self.name = name self.name = name
# pipeline # pipeline
self._server = pipeline.PipelineServer(self.name) self._server = pipeline.PipelineServer(self.name)
self.gpus = [] # deprecated
self.rpc_service_list = [] # deprecated
def get_pipeline_response(self, read_op): def get_pipeline_response(self, read_op):
return None return None
...@@ -77,58 +82,115 @@ class WebService(object): ...@@ -77,58 +82,115 @@ class WebService(object):
self.feed_vars = {var.name: var for var in model_conf.feed_var} self.feed_vars = {var.name: var for var in model_conf.feed_var}
self.fetch_vars = {var.name: var for var in model_conf.fetch_var} self.fetch_vars = {var.name: var for var in model_conf.fetch_var}
def _launch_rpc_service(self): def set_gpus(self, gpus):
op_maker = OpMaker() print("This API will be deprecated later. Please do not use it")
self.gpus = [int(x) for x in gpus.split(",")]
def default_rpc_service(self,
workdir="conf",
port=9292,
gpuid=0,
thread_num=2,
mem_optim=True,
use_lite=False,
use_xpu=False,
ir_optim=False):
device = "gpu"
if gpuid == -1:
if use_lite:
device = "arm"
else:
device = "cpu"
op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader') read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer') general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response') general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker() op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op) op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op) op_seq_maker.add_op(general_response_op)
server = Server() server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(16) server.set_num_threads(thread_num)
server.set_memory_optimize(self.mem_optim) server.set_memory_optimize(mem_optim)
server.set_ir_optimize(self.ir_optim) server.set_ir_optimize(ir_optim)
server.set_device(device)
if use_lite:
server.set_lite()
if use_xpu:
server.set_xpu()
server.load_model_config(self.model_config) server.load_model_config(self.model_config)
server.prepare_server( if gpuid >= 0:
workdir=self.workdir, port=self.port_list[0], device=self.device) server.set_gpuid(gpuid)
server.run_server() server.prepare_server(workdir=workdir, port=port, device=device)
return server
def port_is_available(self, port): def _launch_rpc_service(self, service_idx):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: self.rpc_service_list[service_idx].run_server()
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def prepare_server(self, def prepare_server(self,
workdir="", workdir="",
port=9393, port=9393,
device="cpu", device="gpu",
mem_optim=True, use_lite=False,
ir_optim=False): use_xpu=False,
ir_optim=False,
gpuid=0,
mem_optim=True):
print("This API will be deprecated later. Please do not use it") print("This API will be deprecated later. Please do not use it")
self.workdir = workdir self.workdir = workdir
self.port = port self.port = port
self.device = device self.device = device
default_port = 12000 self.gpuid = gpuid
self.port_list = [] self.port_list = []
self.mem_optim = mem_optim default_port = 12000
self.ir_optim = ir_optim
for i in range(1000): for i in range(1000):
if port_is_available(default_port + i): if port_is_available(default_port + i):
self.port_list.append(default_port + i) self.port_list.append(default_port + i)
if len(self.port_list) > len(self.gpus):
break break
if len(self.gpus) == 0:
# init cpu service
self.rpc_service_list.append(
self.default_rpc_service(
self.workdir,
self.port_list[0],
-1,
thread_num=2,
mem_optim=mem_optim,
use_lite=use_lite,
use_xpu=use_xpu,
ir_optim=ir_optim))
else:
for i, gpuid in enumerate(self.gpus):
self.rpc_service_list.append(
self.default_rpc_service(
"{}_{}".format(self.workdir, i),
self.port_list[i],
gpuid,
thread_num=2,
mem_optim=mem_optim,
use_lite=use_lite,
use_xpu=use_xpu,
ir_optim=ir_optim))
def _launch_web_service(self): def _launch_web_service(self):
gpu_num = len(self.gpus)
self.client = Client() self.client = Client()
self.client.load_client_config("{}/serving_server_conf.prototxt".format( self.client.load_client_config("{}/serving_server_conf.prototxt".format(
self.model_config)) self.model_config))
self.client.connect(["0.0.0.0:{}".format(self.port_list[0])]) endpoints = ""
if gpu_num > 0:
for i in range(gpu_num):
endpoints += "127.0.0.1:{},".format(self.port_list[i])
else:
endpoints = "127.0.0.1:{}".format(self.port_list[0])
self.client.connect([endpoints])
def get_prediction(self, request): def get_prediction(self, request):
if not request.json: if not request.json:
...@@ -158,8 +220,12 @@ class WebService(object): ...@@ -158,8 +220,12 @@ class WebService(object):
print("web service address:") print("web service address:")
print("http://{}:{}/{}/prediction".format(localIP, self.port, print("http://{}:{}/{}/prediction".format(localIP, self.port,
self.name)) self.name))
p_rpc = Process(target=self._launch_rpc_service) server_pros = []
p_rpc.start() for i, service in enumerate(self.rpc_service_list):
p = Process(target=self._launch_rpc_service, args=(i, ))
server_pros.append(p)
for p in server_pros:
p.start()
app_instance = Flask(__name__) app_instance = Flask(__name__)
...@@ -175,7 +241,9 @@ class WebService(object): ...@@ -175,7 +241,9 @@ class WebService(object):
self.app_instance = app_instance self.app_instance = app_instance
def run_debugger_service(self): # TODO: maybe change another API name: maybe run_local_predictor?
def run_debugger_service(self, gpu=False):
print("This API will be deprecated later. Please do not use it")
import socket import socket
localIP = socket.gethostbyname(socket.gethostname()) localIP = socket.gethostbyname(socket.gethostname())
print("web service address:") print("web service address:")
...@@ -185,7 +253,7 @@ class WebService(object): ...@@ -185,7 +253,7 @@ class WebService(object):
@app_instance.before_first_request @app_instance.before_first_request
def init(): def init():
self._launch_local_predictor() self._launch_local_predictor(gpu)
service_name = "/" + self.name + "/prediction" service_name = "/" + self.name + "/prediction"
...@@ -195,11 +263,11 @@ class WebService(object): ...@@ -195,11 +263,11 @@ class WebService(object):
self.app_instance = app_instance self.app_instance = app_instance
def _launch_local_predictor(self): def _launch_local_predictor(self, gpu):
from paddle_serving_app.local_predict import LocalPredictor from paddle_serving_app.local_predict import LocalPredictor
self.client = LocalPredictor() self.client = LocalPredictor()
self.client.load_model_config( self.client.load_model_config(
"{}".format(self.model_config), use_gpu=False) "{}".format(self.model_config), use_gpu=True, gpu_id=self.gpus[0])
def run_web_service(self): def run_web_service(self):
print("This API will be deprecated later. Please do not use it") print("This API will be deprecated later. Please do not use it")
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Usage:
Start monitor with one line command
Example:
python -m paddle_serving_server.monitor
"""
import os
import time
import argparse
import subprocess
import datetime
import shutil
import tarfile
import logging
_LOGGER = logging.getLogger(__name__)
class Monitor(object):
'''
Monitor base class. It is used to monitor the remote model, pull and update the local model.
'''
def __init__(self, interval):
self._remote_path = None
self._remote_model_name = None
self._remote_donefile_name = None
self._local_path = None
self._local_model_name = None
self._local_timestamp_file = None
self._interval = interval
self._remote_donefile_timestamp = None
self._local_tmp_path = None
self._unpacked_filename = None
def set_remote_path(self, remote_path):
self._remote_path = remote_path
def set_remote_model_name(self, model_name):
self._remote_model_name = model_name
def set_remote_donefile_name(self, donefile_name):
self._remote_donefile_name = donefile_name
def set_local_path(self, local_path):
self._local_path = local_path
def set_local_model_name(self, model_name):
self._local_model_name = model_name
def set_local_timestamp_file(self, timestamp_file):
self._local_timestamp_file = timestamp_file
def set_local_tmp_path(self, tmp_path):
self._local_tmp_path = tmp_path
def set_unpacked_filename(self, unpacked_filename):
self._unpacked_filename = unpacked_filename
def _check_param_help(self, param_name, param_value):
return "Please check the {}({}) parameter.".format(param_name,
param_value)
def _check_params(self, params):
for param in params:
if getattr(self, param, None) is None:
raise Exception('{} not set.'.format(param))
def _print_params(self, params_name):
self._check_params(params_name)
for name in params_name:
_LOGGER.info('{}: {}'.format(name, getattr(self, name)))
def _decompress_model_file(self, local_tmp_path, model_name,
unpacked_filename):
if unpacked_filename is None:
_LOGGER.debug('remote file({}) is already unpacked.'.format(
model_name))
return model_name
tar_model_path = os.path.join(local_tmp_path, model_name)
_LOGGER.info("try to unpack remote file({})".format(tar_model_path))
if not tarfile.is_tarfile(tar_model_path):
raise Exception('not a tar packaged file type. {}'.format(
self._check_param_help('remote_model_name', model_name)))
try:
_LOGGER.info('unpack remote file({}).'.format(model_name))
tar = tarfile.open(tar_model_path)
tar.extractall(local_tmp_path)
tar.close()
except:
raise Exception(
'Decompressing failed, maybe no disk space left. {}'.foemat(
self._check_param_help('local_tmp_path', local_tmp_path)))
finally:
os.remove(tar_model_path)
_LOGGER.debug('remove packed file({}).'.format(tar_model_path))
_LOGGER.info('using unpacked filename: {}.'.format(
unpacked_filename))
if not os.path.exists(
os.path.join(local_tmp_path, unpacked_filename)):
raise Exception('file not exist. {}'.format(
self._check_param_help('unpacked_filename',
unpacked_filename)))
return unpacked_filename
def run(self):
'''
Monitor the remote model by polling and update the local model.
'''
params = [
'_remote_path', '_remote_model_name', '_remote_donefile_name',
'_local_model_name', '_local_path', '_local_timestamp_file',
'_local_tmp_path', '_interval'
]
self._print_params(params)
local_tmp_path = os.path.join(self._local_path, self._local_tmp_path)
_LOGGER.info('local_tmp_path: {}'.format(local_tmp_path))
if not os.path.exists(local_tmp_path):
_LOGGER.info('mkdir: {}'.format(local_tmp_path))
os.makedirs(local_tmp_path)
while True:
[flag, timestamp] = self._exist_remote_file(
self._remote_path, self._remote_donefile_name, local_tmp_path)
if flag:
if self._remote_donefile_timestamp is None or \
timestamp != self._remote_donefile_timestamp:
_LOGGER.info('doneilfe({}) changed.'.format(
self._remote_donefile_name))
self._remote_donefile_timestamp = timestamp
self._pull_remote_dir(self._remote_path,
self._remote_model_name,
local_tmp_path)
_LOGGER.info('pull remote model({}).'.format(
self._remote_model_name))
unpacked_filename = self._decompress_model_file(
local_tmp_path, self._remote_model_name,
self._unpacked_filename)
self._update_local_model(local_tmp_path, unpacked_filename,
self._local_path,
self._local_model_name)
_LOGGER.info('update local model({}).'.format(
self._local_model_name))
self._update_local_donefile(self._local_path,
self._local_model_name,
self._local_timestamp_file)
_LOGGER.info('update model timestamp({}).'.format(
self._local_timestamp_file))
else:
_LOGGER.info('remote({}) has no donefile.'.format(
self._remote_path))
_LOGGER.info('sleep {}s.'.format(self._interval))
time.sleep(self._interval)
def _exist_remote_file(self, path, filename, local_tmp_path):
raise Exception('This function must be inherited.')
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
raise Exception('This function must be inherited.')
def _update_local_model(self, local_tmp_path, remote_model_name, local_path,
local_model_name):
tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
local_model_path = os.path.join(local_path, local_model_name)
cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path)
_LOGGER.debug('update model cmd: {}'.format(cmd))
if os.system(cmd) != 0:
raise Exception('update local model failed.')
def _update_local_donefile(self, local_path, local_model_name,
local_timestamp_file):
donefile_path = os.path.join(local_path, local_model_name,
local_timestamp_file)
cmd = 'touch {}'.format(donefile_path)
_LOGGER.debug('update timestamp cmd: {}'.format(cmd))
if os.system(cmd) != 0:
raise Exception('update local donefile failed.')
class HadoopMonitor(Monitor):
''' Monitor HDFS or AFS by Hadoop-client. '''
def __init__(self, hadoop_bin, fs_name='', fs_ugi='', interval=10):
super(HadoopMonitor, self).__init__(interval)
self._hadoop_bin = hadoop_bin
self._fs_name = fs_name
self._fs_ugi = fs_ugi
self._print_params(['_hadoop_bin', '_fs_name', '_fs_ugi'])
self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
if self._fs_name:
self._cmd_prefix += '-D fs.default.name={} '.format(self._fs_name)
if self._fs_ugi:
self._cmd_prefix += '-D hadoop.job.ugi={} '.format(self._fs_ugi)
_LOGGER.info('Hadoop prefix cmd: {}'.format(self._cmd_prefix))
def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename)
cmd = '{} -ls {} 2>/dev/null'.format(self._cmd_prefix, remote_filepath)
_LOGGER.debug('check cmd: {}'.format(cmd))
[status, output] = subprocess.getstatusoutput(cmd)
_LOGGER.debug('resp: {}'.format(output))
if status == 0:
[_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1].split()
timestr = mdate + mtime
return [True, timestr]
else:
return [False, None]
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
# remove old file before pull remote dir
local_dirpath = os.path.join(local_tmp_path, dirname)
if os.path.exists(local_dirpath):
_LOGGER.info('remove old temporary model file({}).'.format(dirname))
if self._unpacked_filename is None:
# the remote file is model folder.
shutil.rmtree(local_dirpath)
else:
# the remote file is a packed model file
os.remove(local_dirpath)
remote_dirpath = os.path.join(remote_path, dirname)
cmd = '{} -get {} {} 2>/dev/null'.format(self._cmd_prefix,
remote_dirpath, local_dirpath)
_LOGGER.debug('pull cmd: {}'.format(cmd))
if os.system(cmd) != 0:
raise Exception('pull remote dir failed. {}'.format(
self._check_param_help('remote_model_name', dirname)))
class FTPMonitor(Monitor):
''' FTP Monitor. '''
def __init__(self, host, port, username="", password="", interval=10):
super(FTPMonitor, self).__init__(interval)
import ftplib
self._ftp = ftplib.FTP()
self._ftp_host = host
self._ftp_port = port
self._ftp_username = username
self._ftp_password = password
self._ftp.connect(self._ftp_host, self._ftp_port)
self._ftp.login(self._ftp_username, self._ftp_password)
self._print_params(
['_ftp_host', '_ftp_port', '_ftp_username', '_ftp_password'])
def _exist_remote_file(self, path, filename, local_tmp_path):
import ftplib
try:
_LOGGER.debug('cwd: {}'.format(path))
self._ftp.cwd(path)
timestamp = self._ftp.voidcmd('MDTM {}'.format(filename))[4:].strip(
)
return [True, timestamp]
except ftplib.error_perm:
_LOGGER.debug('remote file({}) not exist.'.format(filename))
return [False, None]
def _download_remote_file(self,
remote_path,
remote_filename,
local_tmp_path,
overwrite=True):
local_fullpath = os.path.join(local_tmp_path, remote_filename)
if not overwrite and os.path.isfile(fullpath):
return
else:
with open(local_fullpath, 'wb') as f:
_LOGGER.debug('cwd: {}'.format(remote_path))
self._ftp.cwd(remote_path)
_LOGGER.debug('download remote file({})'.format(
remote_filename))
self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write)
def _download_remote_files(self,
remote_path,
remote_dirname,
local_tmp_path,
overwrite=True):
import ftplib
remote_dirpath = os.path.join(remote_path, remote_dirname)
# Check whether remote_dirpath is a file or a folder
try:
_LOGGER.debug('cwd: {}'.format(remote_dirpath))
self._ftp.cwd(remote_dirpath)
_LOGGER.debug('{} is folder.'.format(remote_dirname))
local_dirpath = os.path.join(local_tmp_path, remote_dirname)
if not os.path.exists(local_dirpath):
_LOGGER.info('mkdir: {}'.format(local_dirpath))
os.mkdir(local_dirpath)
output = []
self._ftp.dir(output.append)
for line in output:
[attr, _, _, _, _, _, _, _, name] = line.split()
if attr[0] == 'd':
self._download_remote_files(
os.path.join(remote_path, remote_dirname), name,
os.path.join(local_tmp_path, remote_dirname), overwrite)
else:
self._download_remote_file(remote_dirpath, name,
local_dirpath, overwrite)
except ftplib.error_perm:
_LOGGER.debug('{} is file.'.format(remote_dirname))
self._download_remote_file(remote_path, remote_dirname,
local_tmp_path, overwrite)
return
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
self._download_remote_files(
remote_path, dirname, local_tmp_path, overwrite=True)
class GeneralMonitor(Monitor):
''' General Monitor. '''
def __init__(self, host, interval=10):
super(GeneralMonitor, self).__init__(interval)
self._general_host = host
self._print_params(['_general_host'])
def _get_local_file_timestamp(self, filename):
return os.path.getmtime(filename)
def _exist_remote_file(self, remote_path, filename, local_tmp_path):
remote_filepath = os.path.join(remote_path, filename)
url = '{}/{}'.format(self._general_host, remote_filepath)
_LOGGER.debug('remote file url: {}'.format(url))
# only for check donefile, which is not a folder.
cmd = 'wget -nd -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
_LOGGER.debug('wget cmd: {}'.format(cmd))
if os.system(cmd) != 0:
_LOGGER.debug('remote file({}) not exist.'.format(remote_filepath))
return [False, None]
else:
timestamp = self._get_local_file_timestamp(
os.path.join(local_tmp_path, filename))
return [True, timestamp]
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
remote_dirpath = os.path.join(remote_path, dirname)
url = '{}/{}'.format(self._general_host, remote_dirpath)
_LOGGER.debug('remote file url: {}'.format(url))
if self._unpacked_filename is None:
# the remote file is model folder.
cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(
os.path.join(local_tmp_path, dirname), url)
else:
# the remote file is a packed model file
cmd = 'wget -nd -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
_LOGGER.debug('wget cmd: {}'.format(cmd))
if os.system(cmd) != 0:
raise Exception('pull remote dir failed. {}'.format(
self._check_param_help('remote_model_name', dirname)))
def parse_args():
""" parse args.
Returns:
parser.parse_args().
"""
parser = argparse.ArgumentParser(description="Monitor")
parser.add_argument(
"--type", type=str, default='general', help="Type of remote server")
parser.add_argument(
"--remote_path",
type=str,
required=True,
help="The base path for the remote")
parser.add_argument(
"--remote_model_name",
type=str,
required=True,
help="The model name to be pulled from the remote")
parser.add_argument(
"--remote_donefile_name",
type=str,
required=True,
help="The donefile name that marks the completion of the remote model update"
)
parser.add_argument(
"--local_path", type=str, required=True, help="Local work path")
parser.add_argument(
"--local_model_name", type=str, required=True, help="Local model name")
parser.add_argument(
"--local_timestamp_file",
type=str,
default='fluid_time_file',
help="The timestamp file used locally for hot loading, The file is considered to be placed in the `local_path/local_model_name` folder."
)
parser.add_argument(
"--local_tmp_path",
type=str,
default='_serving_monitor_tmp',
help="The path of the folder where temporary files are stored locally. If it does not exist, it will be created automatically"
)
parser.add_argument(
"--unpacked_filename",
type=str,
default=None,
help="If the model of the remote production is a packaged file, the unpacked file name should be set. Currently, only tar packaging format is supported."
)
parser.add_argument(
"--interval",
type=int,
default=10,
help="The polling interval in seconds")
parser.add_argument(
"--debug", action='store_true', help="If set, output more details")
parser.set_defaults(debug=False)
# general monitor
parser.add_argument("--general_host", type=str, help="General remote host")
# ftp monitor
parser.add_argument("--ftp_host", type=str, help="FTP remote host")
parser.add_argument("--ftp_port", type=int, help="FTP remote port")
parser.add_argument(
"--ftp_username",
type=str,
default='',
help="FTP username. Not used if anonymous access.")
parser.add_argument(
"--ftp_password",
type=str,
default='',
help="FTP password. Not used if anonymous access")
# afs/hdfs monitor
parser.add_argument(
"--hadoop_bin", type=str, help="Path of Hadoop binary file")
parser.add_argument(
"--fs_name",
type=str,
default='',
help="AFS/HDFS fs_name. Not used if set in Hadoop-client.")
parser.add_argument(
"--fs_ugi",
type=str,
default='',
help="AFS/HDFS fs_ugi, Not used if set in Hadoop-client")
return parser.parse_args()
def get_monitor(mtype):
""" generator monitor instance.
Args:
mtype: type of monitor
Returns:
monitor instance.
"""
if mtype == 'ftp':
return FTPMonitor(
args.ftp_host,
args.ftp_port,
username=args.ftp_username,
password=args.ftp_password,
interval=args.interval)
elif mtype == 'general':
return GeneralMonitor(args.general_host, interval=args.interval)
elif mtype == 'afs' or mtype == 'hdfs':
return HadoopMonitor(
args.hadoop_bin, args.fs_name, args.fs_ugi, interval=args.interval)
else:
raise Exception('unsupport type.')
def start_monitor(monitor, args):
monitor.set_remote_path(args.remote_path)
monitor.set_remote_model_name(args.remote_model_name)
monitor.set_remote_donefile_name(args.remote_donefile_name)
monitor.set_local_path(args.local_path)
monitor.set_local_model_name(args.local_model_name)
monitor.set_local_timestamp_file(args.local_timestamp_file)
monitor.set_local_tmp_path(args.local_tmp_path)
monitor.set_unpacked_filename(args.unpacked_filename)
monitor.run()
if __name__ == "__main__":
args = parse_args()
if args.debug:
logging.basicConfig(
format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d %H:%M',
level=logging.DEBUG)
else:
logging.basicConfig(
format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d %H:%M',
level=logging.INFO)
monitor = get_monitor(args.type)
start_monitor(monitor, args)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Usage:
Host a trained paddle model with one line command
Example:
python -m paddle_serving_server.serve --model ./serving_server_model --port 9292
"""
import argparse
import os
import json
import base64
import time
from multiprocessing import Pool, Process
from paddle_serving_server_gpu import serve_args
from flask import Flask, request
import sys
if sys.version_info.major == 2:
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
elif sys.version_info.major == 3:
from http.server import BaseHTTPRequestHandler, HTTPServer
def start_gpu_card_model(index, gpuid, port, args): # pylint: disable=doc-string-missing
gpuid = int(gpuid)
device = "gpu"
if gpuid == -1:
device = "cpu"
elif gpuid >= 0:
port = port + index
thread_num = args.thread
model = args.model
mem_optim = args.mem_optim_off is False
ir_optim = args.ir_optim
max_body_size = args.max_body_size
use_multilang = args.use_multilang
workdir = args.workdir
if gpuid >= 0:
workdir = "{}_{}".format(args.workdir, gpuid)
if model == "":
print("You must specify your serving model")
exit(-1)
import paddle_serving_server_gpu as serving
op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = serving.OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
if use_multilang:
server = serving.MultiLangServer()
else:
server = serving.Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_ir_optimize(ir_optim)
server.set_max_body_size(max_body_size)
if args.use_trt:
server.set_trt()
if args.use_lite:
server.set_lite()
device = "arm"
server.set_device(device)
if args.use_xpu:
server.set_xpu()
if args.product_name != None:
server.set_product_name(args.product_name)
if args.container_id != None:
server.set_container_id(args.container_id)
server.load_model_config(model)
server.prepare_server(
workdir=workdir,
port=port,
device=device,
use_encryption_model=args.use_encryption_model)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.run_server()
def start_multi_card(args, serving_port=None): # pylint: disable=doc-string-missing
gpus = ""
if serving_port == None:
serving_port = args.port
if args.gpu_ids == "":
gpus = []
else:
gpus = args.gpu_ids.split(",")
if "CUDA_VISIBLE_DEVICES" in os.environ:
env_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",")
for ids in gpus:
if int(ids) >= len(env_gpus):
print(
" Max index of gpu_ids out of range, the number of CUDA_VISIBLE_DEVICES is {}."
.format(len(env_gpus)))
exit(-1)
else:
env_gpus = []
if args.use_lite:
print("run arm server.")
start_gpu_card_model(-1, -1, args)
elif len(gpus) <= 0:
print("gpu_ids not set, going to run cpu service.")
start_gpu_card_model(-1, -1, serving_port, args)
else:
gpu_processes = []
for i, gpu_id in enumerate(gpus):
p = Process(
target=start_gpu_card_model,
args=(
i,
gpu_id,
serving_port,
args, ))
gpu_processes.append(p)
for p in gpu_processes:
p.start()
for p in gpu_processes:
p.join()
class MainService(BaseHTTPRequestHandler):
def get_available_port(self):
default_port = 12000
for i in range(1000):
if port_is_available(default_port + i):
return default_port + i
def start_serving(self):
start_multi_card(args, serving_port)
def get_key(self, post_data):
if "key" not in post_data:
return False
else:
key = base64.b64decode(post_data["key"].encode())
with open(args.model + "/key", "wb") as f:
f.write(key)
return True
def check_key(self, post_data):
if "key" not in post_data:
return False
else:
key = base64.b64decode(post_data["key"].encode())
with open(args.model + "/key", "rb") as f:
cur_key = f.read()
return (key == cur_key)
def start(self, post_data):
post_data = json.loads(post_data)
global p_flag
if not p_flag:
if args.use_encryption_model:
print("waiting key for model")
if not self.get_key(post_data):
print("not found key in request")
return False
global serving_port
global p
serving_port = self.get_available_port()
p = Process(target=self.start_serving)
p.start()
time.sleep(3)
if p.is_alive():
p_flag = True
else:
return False
else:
if p.is_alive():
if not self.check_key(post_data):
return False
else:
return False
return True
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
if self.start(post_data):
response = {"endpoint_list": [serving_port]}
else:
response = {"message": "start serving failed"}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
if __name__ == "__main__":
args = serve_args()
if args.name == "None":
from .web_service import port_is_available
if args.use_encryption_model:
p_flag = False
p = None
serving_port = 0
server = HTTPServer(('localhost', int(args.port)), MainService)
print(
'Starting encryption server, waiting for key from client, use <Ctrl-C> to stop'
)
server.serve_forever()
else:
start_multi_card(args)
else:
from .web_service import WebService
web_service = WebService(name=args.name)
web_service.load_model_config(args.model)
gpu_ids = args.gpu_ids
if gpu_ids == "":
if "CUDA_VISIBLE_DEVICES" in os.environ:
gpu_ids = os.environ["CUDA_VISIBLE_DEVICES"]
if len(gpu_ids) > 0:
web_service.set_gpus(gpu_ids)
web_service.prepare_server(
workdir=args.workdir,
port=args.port,
device=args.device,
use_lite=args.use_lite,
use_xpu=args.use_xpu,
ir_optim=args.ir_optim)
web_service.run_rpc_service()
app_instance = Flask(__name__)
@app_instance.before_first_request
def init():
web_service._launch_web_service()
service_name = "/" + web_service.name + "/prediction"
@app_instance.route(service_name, methods=["POST"])
def run():
return web_service.get_prediction(request)
app_instance.run(host="0.0.0.0",
port=web_service.port,
threaded=False,
processes=4)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Paddle Serving Client version string """
serving_client_version = "0.0.0"
serving_server_version = "0.0.0"
module_proto_version = "0.0.0"
cuda_version = "9"
commit_id = ""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!flask/bin/python
# pylint: disable=doc-string-missing
from flask import Flask, request, abort
from contextlib import closing
from multiprocessing import Pool, Process, Queue
from paddle_serving_client import Client
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
from paddle_serving_server_gpu.serve import start_multi_card
import socket
import sys
import numpy as np
import paddle_serving_server_gpu as serving
from paddle_serving_server_gpu import pipeline
from paddle_serving_server_gpu.pipeline import Op
def port_is_available(port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
class WebService(object):
def __init__(self, name="default_service"):
self.name = name
# pipeline
self._server = pipeline.PipelineServer(self.name)
self.gpus = [] # deprecated
self.rpc_service_list = [] # deprecated
def get_pipeline_response(self, read_op):
return None
def prepare_pipeline_config(self, yaml_file):
# build dag
read_op = pipeline.RequestOp()
last_op = self.get_pipeline_response(read_op)
if not isinstance(last_op, Op):
raise ValueError("The return value type of `get_pipeline_response` "
"function is not Op type, please check function "
"`get_pipeline_response`.")
response_op = pipeline.ResponseOp(input_ops=[last_op])
self._server.set_response_op(response_op)
self._server.prepare_server(yaml_file)
def run_service(self):
self._server.run_server()
def load_model_config(self, model_config):
print("This API will be deprecated later. Please do not use it")
self.model_config = model_config
import os
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
if os.path.isdir(model_config):
client_config = "{}/serving_server_conf.prototxt".format(
model_config)
elif os.path.isfile(model_config):
client_config = model_config
model_conf = m_config.GeneralModelConfig()
f = open(client_config, 'r')
model_conf = google.protobuf.text_format.Merge(
str(f.read()), model_conf)
self.feed_vars = {var.name: var for var in model_conf.feed_var}
self.fetch_vars = {var.name: var for var in model_conf.fetch_var}
def set_gpus(self, gpus):
print("This API will be deprecated later. Please do not use it")
self.gpus = [int(x) for x in gpus.split(",")]
def default_rpc_service(self,
workdir="conf",
port=9292,
gpuid=0,
thread_num=2,
mem_optim=True,
use_lite=False,
use_xpu=False,
ir_optim=False):
device = "gpu"
if gpuid == -1:
if use_lite:
device = "arm"
else:
device = "cpu"
op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_ir_optimize(ir_optim)
server.set_device(device)
if use_lite:
server.set_lite()
if use_xpu:
server.set_xpu()
server.load_model_config(self.model_config)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.prepare_server(workdir=workdir, port=port, device=device)
return server
def _launch_rpc_service(self, service_idx):
self.rpc_service_list[service_idx].run_server()
def port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def prepare_server(self,
workdir="",
port=9393,
device="gpu",
use_lite=False,
use_xpu=False,
ir_optim=False,
gpuid=0,
mem_optim=True):
print("This API will be deprecated later. Please do not use it")
self.workdir = workdir
self.port = port
self.device = device
self.gpuid = gpuid
self.port_list = []
default_port = 12000
for i in range(1000):
if port_is_available(default_port + i):
self.port_list.append(default_port + i)
if len(self.port_list) > len(self.gpus):
break
if len(self.gpus) == 0:
# init cpu service
self.rpc_service_list.append(
self.default_rpc_service(
self.workdir,
self.port_list[0],
-1,
thread_num=2,
mem_optim=mem_optim,
use_lite=use_lite,
use_xpu=use_xpu,
ir_optim=ir_optim))
else:
for i, gpuid in enumerate(self.gpus):
self.rpc_service_list.append(
self.default_rpc_service(
"{}_{}".format(self.workdir, i),
self.port_list[i],
gpuid,
thread_num=2,
mem_optim=mem_optim,
use_lite=use_lite,
use_xpu=use_xpu,
ir_optim=ir_optim))
def _launch_web_service(self):
gpu_num = len(self.gpus)
self.client = Client()
self.client.load_client_config("{}/serving_server_conf.prototxt".format(
self.model_config))
endpoints = ""
if gpu_num > 0:
for i in range(gpu_num):
endpoints += "127.0.0.1:{},".format(self.port_list[i])
else:
endpoints = "127.0.0.1:{}".format(self.port_list[0])
self.client.connect([endpoints])
def get_prediction(self, request):
if not request.json:
abort(400)
if "fetch" not in request.json:
abort(400)
try:
feed, fetch, is_batch = self.preprocess(request.json["feed"],
request.json["fetch"])
if isinstance(feed, dict) and "fetch" in feed:
del feed["fetch"]
if len(feed) == 0:
raise ValueError("empty input")
fetch_map = self.client.predict(
feed=feed, fetch=fetch, batch=is_batch)
result = self.postprocess(
feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
result = {"result": result}
except ValueError as err:
result = {"result": str(err)}
return result
def run_rpc_service(self):
print("This API will be deprecated later. Please do not use it")
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address:")
print("http://{}:{}/{}/prediction".format(localIP, self.port,
self.name))
server_pros = []
for i, service in enumerate(self.rpc_service_list):
p = Process(target=self._launch_rpc_service, args=(i, ))
server_pros.append(p)
for p in server_pros:
p.start()
app_instance = Flask(__name__)
@app_instance.before_first_request
def init():
self._launch_web_service()
service_name = "/" + self.name + "/prediction"
@app_instance.route(service_name, methods=["POST"])
def run():
return self.get_prediction(request)
self.app_instance = app_instance
# TODO: maybe change another API name: maybe run_local_predictor?
def run_debugger_service(self, gpu=False):
print("This API will be deprecated later. Please do not use it")
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address:")
print("http://{}:{}/{}/prediction".format(localIP, self.port,
self.name))
app_instance = Flask(__name__)
@app_instance.before_first_request
def init():
self._launch_local_predictor(gpu)
service_name = "/" + self.name + "/prediction"
@app_instance.route(service_name, methods=["POST"])
def run():
return self.get_prediction(request)
self.app_instance = app_instance
def _launch_local_predictor(self, gpu):
from paddle_serving_app.local_predict import LocalPredictor
self.client = LocalPredictor()
self.client.load_model_config(
"{}".format(self.model_config), use_gpu=True, gpu_id=self.gpus[0])
def run_web_service(self):
print("This API will be deprecated later. Please do not use it")
self.app_instance.run(host="0.0.0.0", port=self.port, threaded=True)
def get_app_instance(self):
return self.app_instance
def preprocess(self, feed=[], fetch=[]):
print("This API will be deprecated later. Please do not use it")
is_batch = True
feed_dict = {}
for var_name in self.feed_vars.keys():
feed_dict[var_name] = []
for feed_ins in feed:
for key in feed_ins:
feed_dict[key].append(
np.array(feed_ins[key]).reshape(
list(self.feed_vars[key].shape))[np.newaxis, :])
feed = {}
for key in feed_dict:
feed[key] = np.concatenate(feed_dict[key], axis=0)
return feed, fetch, is_batch
def postprocess(self, feed=[], fetch=[], fetch_map=None):
print("This API will be deprecated later. Please do not use it")
for key in fetch_map:
fetch_map[key] = fetch_map[key].tolist()
return fetch_map
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册