提交 25ca1469 编写于 作者: T TeslaZhao

Distribute Serving for Foundation Models

上级 7b0e6ab1
......@@ -30,7 +30,7 @@ message( "WITH_GPU = ${WITH_GPU}")
# Paddle Version should be one of:
# latest: latest develop build
# version number like 1.5.2
SET(PADDLE_VERSION "2.2.2")
SET(PADDLE_VERSION "2.3.0-rc0")
if (WITH_GPU)
message("CUDA: ${CUDA_VERSION}, CUDNN_MAJOR_VERSION: ${CUDNN_MAJOR_VERSION}")
# cuda 11.0 is not supported, 11.2 would be added.
......@@ -177,7 +177,7 @@ if (NOT WITH_MKLML)
endif()
ADD_LIBRARY(paddle_inference STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.a)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.so)
if (WITH_ASCEND_CL)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.so)
endif()
......
......@@ -65,6 +65,24 @@ message EngineDesc {
optional int32 batch_infer_size = 31 [ default = 32 ];
optional bool enable_overrun = 32 [ default = false ];
optional bool allow_split_request = 33 [ default = true ];
/*
* Distributed inference params
* "enable_dist_model": enable distributed model, false default.
* "carrier_id": mark carrier
* "dist_cfg_file": file name of distributed configure.
* "dist_nranks": number of distributed nodes.
* "dist_endpoints": all endpoints(ip:port) of distributed nodes.
* "dist_subgraph_index": distributed subgraph index, auto increment from 0.
* It is
* used to select the endpoint of the current shard in distribute model.
*/
optional bool enable_dist_model = 40 [ default = false ];
optional string dist_carrier_id = 41 [ default = "inference" ];
optional string dist_cfg_file = 42;
optional int32 dist_nranks = 43 [ default = 0 ];
repeated string dist_endpoints = 44;
optional int32 dist_subgraph_index = 45 [ default = 0 ];
};
// model_toolkit conf
......@@ -96,7 +114,8 @@ message DAGNodeDependency {
message DAGNode {
required string name = 1;
required string type = 2;
repeated DAGNodeDependency dependencies = 3;
repeated string address = 3;
repeated DAGNodeDependency dependencies = 4;
};
// workflow entry
......
......@@ -80,8 +80,8 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") var num: " << var_num
<< ") start to call load general model_conf op";
if (var_num < 1) {
LOG(ERROR) << "(logid=" << log_id << ") Failed get feed_var, var_num="
<< var_num;
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get feed_var, var_num=" << var_num;
return -1;
}
......@@ -98,7 +98,7 @@ int GeneralReaderOp::inference() {
int64_t elem_type = 0;
int64_t elem_size = 0;
int64_t databuf_size = 0;
const void* src_ptr = nullptr;
const void *src_ptr = nullptr;
for (int i = 0; i < var_num; ++i) {
paddle::PaddleTensor paddleTensor;
const Tensor &tensor = req->tensor(i);
......@@ -107,7 +107,7 @@ int GeneralReaderOp::inference() {
elem_size = 0;
databuf_size = 0;
elem_type = tensor.elem_type();
src_ptr = nullptr ;
src_ptr = nullptr;
if (elem_type == P_INT64) { // int64
elem_size = sizeof(int64_t);
paddleTensor.dtype = paddle::PaddleDType::INT64;
......@@ -157,8 +157,8 @@ int GeneralReaderOp::inference() {
<< "dtype=" << paddleTensor.dtype << ";"
<< "data_len=" << data_len;
if (src_ptr == nullptr) {
LOG(ERROR) << "Not support var[" << i << "] with elem_type["
<< elem_type << "]";
LOG(ERROR) << "Not support var[" << i << "] with elem_type[" << elem_type
<< "]";
continue;
}
// implement lod tensor here
......@@ -166,9 +166,15 @@ int GeneralReaderOp::inference() {
// TODO(HexToString): support 2-D lod
if (tensor.lod_size() > 0) {
VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
paddleTensor.lod.resize(1);
int lod_index = -1;
for (int k = 0; k < tensor.lod_size(); ++k) {
paddleTensor.lod[0].push_back(tensor.lod(k));
if (tensor.lod(k) == 0) {
lod_index++;
paddleTensor.lod.resize(lod_index + 1);
}
paddleTensor.lod[lod_index].push_back(tensor.lod(k));
VLOG(2) << "(logid=" << log_id << ") lod[" << lod_index
<< "]=" << tensor.lod(k);
}
}
......@@ -191,7 +197,7 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] has lod_tensor and len=" << out->at(i).lod[0].back();
}
void* dst_ptr = out->at(i).data.data();
void *dst_ptr = out->at(i).data.data();
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
......
......@@ -92,11 +92,13 @@ message Request {
message Response {
repeated ModelOutput outputs = 1;
repeated int64 profile_time = 2;
bool profile_server = 3;
uint64 log_id = 4;
// Error code
int32 err_no = 3;
int32 err_no = 5;
// Error messages
string err_msg = 4;
string err_msg = 6;
};
message ModelOutput {
......
......@@ -129,6 +129,10 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) {
node->id = i + 1; // 0 is reserved for begginer-op
node->name = conf.nodes(i).name();
node->type = conf.nodes(i).type();
for (int add_index = 0; add_index < conf.nodes(i).address_size();
++add_index) {
node->address.push_back(conf.nodes(i).address(add_index));
}
uint32_t depend_size = conf.nodes(i).dependencies_size();
for (uint32_t j = 0; j < depend_size; j++) {
const configure::DAGNodeDependency& depend =
......@@ -159,7 +163,8 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) {
for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
DagNode* node = _index_nodes[nid];
LOG(INFO) << "OP-" << node->id << "-" << node->name << "-" << node->type
<< " depends: " << node->depends.size();
<< " depends: " << node->depends.size()
<< " address: " << node->address.size();
boost::unordered_map<std::string, EdgeMode>::iterator it;
for (it = node->depends.begin(); it != node->depends.end(); it++) {
......
......@@ -29,6 +29,7 @@ struct DagNode {
std::string name; // opname
std::string full_name; // workflow_stageindex_opname
std::string type;
std::vector<std::string> address;
void* conf;
boost::unordered_map<std::string, EdgeMode> depends;
};
......
......@@ -90,6 +90,7 @@ int DagView::init(Dag* dag,
node->name,
node->type,
node->conf,
node->address,
log_id) != 0) {
LOG(WARNING) << "(logid=" << log_id
<< ") Failed init op, type:" << node->type;
......
......@@ -32,7 +32,8 @@
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/predictor_metric.h"
#include "paddle_inference_api.h" // NOLINT
#include "experimental/float16.h"
//#include "experimental/float16.h"
#include "experimental/phi/common/float16.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
......@@ -548,9 +549,9 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
int8_t* data = static_cast<int8_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else if ((*tensorVector_in_pointer)[i].dtype ==
paddle::PaddleDType::FLOAT16) {
paddle::platform::float16* data =
static_cast<paddle::platform::float16*>(origin_data);
paddle::PaddleDType::FLOAT16) {
phi::dtype::float16* data =
static_cast<phi::dtype::float16*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else {
LOG(ERROR) << "Inference not support type["
......@@ -646,14 +647,14 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
} else if (dataType == paddle::PaddleDType::FLOAT16) {
databuf_size = out_num * sizeof(paddle::platform::float16);
databuf_size = out_num * sizeof(phi::dtype::float16);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
paddle::platform::float16* data_out =
reinterpret_cast<paddle::platform::float16*>(databuf_data);
phi::dtype::float16* data_out =
reinterpret_cast<phi::dtype::float16*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
}
......
......@@ -36,12 +36,14 @@ int Op::init(Bus* bus,
const std::string& name,
const std::string& type,
void* conf,
const std::vector<std::string>& address,
const uint64_t log_id) {
_bus = bus;
_dag = dag;
_id = id;
_name = name;
_type = type;
_address = address;
set_config(conf);
_timer = butil::get_object<TimerFlow>();
......@@ -110,11 +112,13 @@ int Op::process(const uint64_t log_id, bool debug) {
return ERR_INTERNAL_FAILURE;
}
/*
if (_has_calc) {
LOG(INFO) << "(logid=" << log_id << ") Op: " << _name
<< " already processed before";
return ERR_OK;
}
*/
// 1. dependency inference
/*
......@@ -147,8 +151,10 @@ int Op::process(const uint64_t log_id, bool debug) {
}
// 3. share output to bus
Channel* channel = mutable_channel();
channel->share_to_bus(_bus, log_id);
if (!_has_calc) {
Channel* channel = mutable_channel();
channel->share_to_bus(_bus, log_id);
}
// 4. mark has calculated
_has_calc = true;
......
......@@ -114,6 +114,7 @@ class Op {
const std::string& name,
const std::string& type,
void* conf,
const std::vector<std::string>& address,
const uint64_t log_id);
int deinit();
......@@ -135,6 +136,8 @@ class Op {
const std::string& full_name() const { return _full_name; }
const std::vector<std::string>& address() const { return _address; }
const std::vector<std::string>& pre_names() const { return _pre_node_names; }
void set_full_name(const std::string full_name) { _full_name = full_name; }
......@@ -206,6 +209,7 @@ class Op {
std::string _name;
std::string _full_name; // service_workflow_stageindex_opname
std::string _type;
std::vector<std::string> _address;
bool _has_calc;
bool _has_init;
TimerFlow* _timer;
......
......@@ -92,11 +92,13 @@ message Request {
message Response {
repeated ModelOutput outputs = 1;
repeated int64 profile_time = 2;
bool profile_server = 3;
uint64 log_id = 4;
// Error code
int32 err_no = 3;
int32 err_no = 5;
// Error messages
string err_msg = 4;
string err_msg = 6;
};
message ModelOutput {
......
......@@ -37,6 +37,7 @@ using paddle_infer::PrecisionType;
using paddle_infer::Predictor;
using paddle_infer::Tensor;
using paddle_infer::CreatePredictor;
using paddle_infer::DistConfig;
DECLARE_int32(gpuid);
DECLARE_string(precision);
......@@ -206,6 +207,39 @@ class PaddleInferenceEngine : public EngineCore {
config.SetModel(model_path);
}
// Enable distributed model inferencing
DistConfig distCfg;
if (engine_conf.has_enable_dist_model() &&
engine_conf.enable_dist_model()) {
int ep_size = engine_conf.dist_endpoints_size();
int cur_index = engine_conf.dist_subgraph_index();
if (ep_size <= cur_index) {
LOG(ERROR) << "create paddle predictor failed, Distributed model error."
<< " dist_endpoints_size=" << ep_size
<< " is not bigger than dist_subgraph_index=" << cur_index;
return -1;
}
std::vector<std::string> vec_eps;
for (int i = 0; i < ep_size; ++i) {
vec_eps.emplace_back(engine_conf.dist_endpoints(i));
}
distCfg.EnableDistModel(true);
distCfg.SetCarrierId(engine_conf.dist_carrier_id());
distCfg.SetRanks(engine_conf.dist_nranks(), cur_index);
distCfg.SetEndpoints(vec_eps, engine_conf.dist_endpoints(cur_index));
distCfg.SetCommInitConfig(engine_conf.dist_cfg_file());
config.SetDistConfig(distCfg);
LOG(INFO) << "Create Distributed predictor! dist_carrier_id="
<< engine_conf.dist_carrier_id()
<< ", Ranks=" << engine_conf.dist_nranks()
<< ", current index of ranks=" << cur_index
<< ", current endpoint="
<< engine_conf.dist_endpoints(cur_index)
<< ", communicate init config file="
<< engine_conf.dist_cfg_file();
}
config.SwitchSpecifyInputNames(true);
config.SetCpuMathLibraryNumThreads(1);
if (engine_conf.has_use_gpu() && engine_conf.use_gpu()) {
......
......@@ -30,10 +30,16 @@ class OpMaker(object):
"GeneralDistKVOp",
"GeneralCopyOp",
"GeneralDetectionOp",
"GeneralRemoteOp",
]
self.node_name_suffix_ = collections.defaultdict(int)
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
def create(self,
node_type,
engine_name=None,
inputs=[],
outputs=[],
addresses=[]):
if node_type not in self.op_list:
raise Exception("Op type {} is not supported right now".format(
node_type))
......@@ -55,6 +61,11 @@ class OpMaker(object):
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# for general_remote op.
if addresses:
node.address.extend(addresses)
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
......
......@@ -21,21 +21,27 @@ Usage: export PYTHON_EXECUTABLE=/usr/local/bin/python3.6
python3.6 -m paddle_serving_server.serve check
'''
import sys
import os
import pytest
inference_test_cases = ["test_fit_a_line.py::TestFitALine::test_inference"]
cpp_test_cases = ["test_fit_a_line.py::TestFitALine::test_cpu", "test_fit_a_line.py::TestFitALine::test_gpu"]
pipeline_test_cases = ["test_uci_pipeline.py::TestUCIPipeline::test_cpu", "test_uci_pipeline.py::TestUCIPipeline::test_gpu"]
cpp_test_cases = [
"test_fit_a_line.py::TestFitALine::test_cpu",
"test_fit_a_line.py::TestFitALine::test_gpu"
]
pipeline_test_cases = [
"test_uci_pipeline.py::TestUCIPipeline::test_cpu",
"test_uci_pipeline.py::TestUCIPipeline::test_gpu"
]
log_files = ["PipelineServingLogs", "log", "stderr.log", "stdout.log"]
def set_serving_log_path():
if 'SERVING_LOG_PATH' not in os.environ:
serving_log_path = os.path.expanduser(os.getcwd()) + '/'
os.environ['SERVING_LOG_PATH']=serving_log_path
os.environ['SERVING_LOG_PATH'] = serving_log_path
def mv_log_to_new_dir(dir_path):
import shutil
......@@ -46,8 +52,8 @@ def mv_log_to_new_dir(dir_path):
file_path = os.path.join(serving_log_path, file_name)
dir_path_temp = os.path.join(dir_path, file_name)
if os.path.exists(file_path):
shutil.move(file_path, dir_path_temp)
shutil.move(file_path, dir_path_temp)
def run_test_cases(cases_list, case_type, is_open_std):
old_stdout, old_stderr = sys.stdout, sys.stderr
......@@ -66,33 +72,41 @@ def run_test_cases(cases_list, case_type, is_open_std):
new_dir_path = os.path.join(serving_log_path, dir_name)
mv_log_to_new_dir(new_dir_path)
if res == 0:
print("{} {} environment running success".format(case_type, case_name))
print("{} {} environment running success".format(case_type,
case_name))
elif res == 1:
if case_name == "inference":
print("{} {} environment running failure. Please refer to https://www.paddlepaddle.org.cn/install/quick?docurl=/documentation/docs/zh/install/pip/linux-pip.html to configure environment".format(case_type, case_name))
print(
"{} {} environment running failure. Please refer to https://www.paddlepaddle.org.cn/install/quick?docurl=/documentation/docs/zh/install/pip/linux-pip.html to configure environment".
format(case_type, case_name))
os._exit(0)
else:
print("{} {} environment running failure, if you need this environment, please refer to https://github.com/PaddlePaddle/Serving/blob/develop/doc/Install_CN.md".format(case_type, case_name))
print(
"{} {} environment running failure, if you need this environment, please refer to https://github.com/PaddlePaddle/Serving/blob/develop/doc/Install_CN.md".
format(case_type, case_name))
def unset_env(key):
del os.environ[key]
def check_env(mode):
set_serving_log_path()
if 'https_proxy' in os.environ or 'http_proxy' in os.environ:
unset_env("https_proxy")
unset_env("http_proxy")
unset_env("https_proxy")
unset_env("http_proxy")
if 'GREP_OPTIONS' in os.environ:
unset_env("GREP_OPTIONS")
is_open_std = False
if mode is "debug":
unset_env("GREP_OPTIONS")
is_open_std = False
if mode == "debug":
is_open_std = True
if mode is "all" or mode is "inference" or mode is "debug":
if mode == "all" or mode == "inference" or mode == "debug":
run_test_cases(inference_test_cases, "PaddlePaddle", is_open_std)
if mode is "all" or mode is "cpp" or mode is "debug":
if mode == "all" or mode == "cpp" or mode == "debug":
run_test_cases(cpp_test_cases, "C++", is_open_std)
if mode is "all" or mode is "pipeline" or mode is "debug":
if mode == "all" or mode == "pipeline" or mode == "debug":
run_test_cases(pipeline_test_cases, "Pipeline", is_open_std)
if __name__ == '__main__':
check_env("debug")
......@@ -37,12 +37,15 @@ from paddle_serving_server.util import *
from paddle_serving_server.env_check.run import check_env
import cmd
def signal_handler(signal, frame):
print('Process stopped')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
# web_service.py is still used by Pipeline.
def port_is_available(port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
......@@ -185,10 +188,10 @@ def serve_args():
action="store_true",
help="Use encryption model")
parser.add_argument(
"--encryption_rpc_port",
type=int,
required=False,
default=12000,
"--encryption_rpc_port",
type=int,
required=False,
default=12000,
help="Port of encryption model, only valid for arg.use_encryption_model")
parser.add_argument(
"--use_trt", default=False, action="store_true", help="Use TensorRT")
......@@ -217,11 +220,60 @@ def serve_args():
action="store_true",
help="Use gpu_multi_stream")
parser.add_argument(
"--enable_prometheus", default=False, action="store_true", help="Use Prometheus")
"--enable_prometheus",
default=False,
action="store_true",
help="Use Prometheus")
parser.add_argument(
"--prometheus_port",
type=int,
default=19393,
help="Port of the Prometheus")
parser.add_argument(
"--request_cache_size",
type=int,
default=0,
help="Port of the Prometheus")
parser.add_argument(
"--use_dist_model",
default=False,
action="store_true",
help="Use distributed model")
parser.add_argument(
"--dist_carrier_id",
type=str,
default="",
help="carrier id of distributed model")
parser.add_argument(
"--dist_cfg_file",
type=str,
default="",
help="config file of distributed model")
parser.add_argument(
"--dist_endpoints",
type=str,
default="+",
help="endpoints of distributed model. splited by comma")
parser.add_argument(
"--dist_nranks",
type=int,
default=0,
help="nranks of distributed model")
parser.add_argument(
"--dist_subgraph_index",
type=int,
default=-1,
help="index of distributed model")
parser.add_argument(
"--prometheus_port", type=int, default=19393, help="Port of the Prometheus")
"--dist_worker_serving_endpoints",
type=str,
default=None,
help="endpoints of worker serving endpoints")
parser.add_argument(
"--request_cache_size", type=int, default=0, help="Port of the Prometheus")
"--dist_master_serving",
default=False,
action="store_true",
help="The master serving of distributed inference")
return parser.parse_args()
......@@ -245,7 +297,7 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi
workdir = "{}_{}".format(args.workdir, port)
dag_list_op = []
if model == "":
if model == "" and not args.dist_master_serving:
print("You must specify your serving model")
exit(-1)
for single_model_config in args.model:
......@@ -270,30 +322,51 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi
dag_list_op.append(temp_str_list[0])
read_op = op_maker.create('GeneralReaderOp')
op_seq_maker.add_op(read_op)
#如果dag_list_op不是空,那么证明通过--op 传入了自定义OP或自定义的DAG串联关系。
#此时,根据--op 传入的顺序去组DAG串联关系
if len(dag_list_op) > 0:
for single_op in dag_list_op:
op_seq_maker.add_op(op_maker.create(single_op))
#否则,仍然按照原有方式根虎--model去串联。
else:
for idx, single_model in enumerate(model):
infer_op_name = "GeneralInferOp"
# 目前由于ocr的节点Det模型依赖于opencv的第三方库
# 只有使用ocr的时候,才会加入opencv的第三方库并编译GeneralDetectionOp
# 故此处做特殊处理,当不满足下述情况时,所添加的op默认为GeneralInferOp
# 以后可能考虑不用python脚本来生成配置
if len(model) == 2 and idx == 0 and single_model == "ocr_det_model":
infer_op_name = "GeneralDetectionOp"
else:
# The workflows of master serving in distributed model is different from
# worker servings. The workflow of worker servings is same to non-distributed
# model, but workerflow of master serving needs to add IP address of other
# worker serving in the machine.
if not args.dist_master_serving:
read_op = op_maker.create('GeneralReaderOp')
op_seq_maker.add_op(read_op)
#如果dag_list_op不是空,那么证明通过--op 传入了自定义OP或自定义的DAG串联关系。
#此时,根据--op 传入的顺序去组DAG串联关系
if len(dag_list_op) > 0:
for single_op in dag_list_op:
op_seq_maker.add_op(op_maker.create(single_op))
#否则,仍然按照原有方式根虎--model去串联。
else:
for idx, single_model in enumerate(model):
infer_op_name = "GeneralInferOp"
general_infer_op = op_maker.create(infer_op_name)
op_seq_maker.add_op(general_infer_op)
# 目前由于ocr的节点Det模型依赖于opencv的第三方库
# 只有使用ocr的时候,才会加入opencv的第三方库并编译GeneralDetectionOp
# 故此处做特殊处理,当不满足下述情况时,所添加的op默认为GeneralInferOp
# 以后可能考虑不用python脚本来生成配置
if len(model
) == 2 and idx == 0 and single_model == "ocr_det_model":
infer_op_name = "GeneralDetectionOp"
else:
infer_op_name = "GeneralInferOp"
general_infer_op = op_maker.create(infer_op_name)
op_seq_maker.add_op(general_infer_op)
general_response_op = op_maker.create('GeneralResponseOp')
op_seq_maker.add_op(general_response_op)
general_response_op = op_maker.create('GeneralResponseOp')
op_seq_maker.add_op(general_response_op)
else:
# for the master serving of distributed model only add one general_remote op.
if args.dist_worker_serving_endpoints is None:
raise ValueError(
"Params Error!. dist_worker_serving_endpoints is empty when dist_master_serving is set"
)
worker_serving_endpoints = args.dist_worker_serving_endpoints.split(",")
if len(worker_serving_endpoints) == 0:
raise ValueError(
"Params Error!. dist_worker_serving_endpoints is empty when dist_master_serving is set"
)
general_remote_op = op_maker.create(
'GeneralRemoteOp', None, [], [], addresses=worker_serving_endpoints)
op_seq_maker.add_op(general_remote_op, )
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
......@@ -306,6 +379,12 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi
server.set_enable_prometheus(args.enable_prometheus)
server.set_prometheus_port(args.prometheus_port)
server.set_request_cache_size(args.request_cache_size)
server.set_enable_dist_model(args.use_dist_model)
server.set_dist_carrier_id(args.dist_carrier_id)
server.set_dist_cfg_file(args.dist_cfg_file)
server.set_dist_nranks(args.dist_nranks)
server.set_dist_endpoints(args.dist_endpoints.split(","))
server.set_dist_subgraph_index(args.dist_subgraph_index)
if args.use_trt and device == "gpu":
server.set_trt()
......@@ -489,8 +568,10 @@ def stop_serving(command: str, port: int=None):
os.remove(filepath)
return True
class Check_Env_Shell(cmd.Cmd):
intro = "Welcome to the check env shell.Type help to list commands.\n"
# ----- basic commands -----
def do_help(self, arg):
print("\nCommand list\t\tDescription\n"\
......@@ -507,23 +588,23 @@ class Check_Env_Shell(cmd.Cmd):
def do_check_all(self, arg):
"Check Environment of Paddle Inference, Pipeline Serving, C++ Serving"
check_env("all")
check_env("all")
def do_check_pipeline(self, arg):
"Check Environment of Pipeline Serving"
check_env("pipeline")
check_env("pipeline")
def do_check_cpp(self, arg):
"Check Environment of C++ Serving"
check_env("cpp")
check_env("cpp")
def do_check_inference(self, arg):
"Check Environment of Paddle Inference"
check_env("inference")
check_env("inference")
def do_debug(self, arg):
"Open pytest log to debug"
check_env("debug")
check_env("debug")
def do_exit(self, arg):
"Exit Check Env Shell"
......@@ -531,6 +612,7 @@ class Check_Env_Shell(cmd.Cmd):
os._exit(0)
return True
if __name__ == "__main__":
# args.device is not used at all.
# just keep the interface.
......@@ -547,7 +629,7 @@ if __name__ == "__main__":
else:
os._exit(-1)
elif args.server == "check":
Check_Env_Shell().cmdloop()
Check_Env_Shell().cmdloop()
for single_model_config in args.model:
if os.path.isdir(single_model_config):
pass
......
......@@ -53,6 +53,14 @@ class Server(object):
self.general_model_config_fn:'list'=[] # ["GeneralInferOp_0/general_model.prototxt"]The quantity is equal to the InferOp quantity,Feed and Fetch --OP
self.subdirectory:'list'=[] # The quantity is equal to the InferOp quantity, and name = node.name = engine.name
self.model_config_paths:'collections.OrderedDict()' # Save the serving_server_conf.prototxt path (feed and fetch information) this is a map for multi-model in a workflow
self.enable_dist_model: bool, enable distributed model, false default
self.dist_carrier_id: string, mark distributed model carrier name, "" default.
self.dist_cfg_file: string, file name of distributed configure, "" default.
self.dist_nranks: int, number of distributed nodes, 0 default.
self.dist_endpoints: list of string, all endpoints(ip:port) of distributed nodes, [] default.
self.dist_subgraph_index: index of distributed subgraph model, -1 default. It is used to select the endpoint of the current shard in distribute model. -1 default.
self.dist_worker_serving_endpoints: all endpoints of worker serving in the same machine. [] default.
self.dist_master_serving: the master serving is used for receiving client requests, only in pp0 of pipeline parallel, False default.
"""
self.server_handle_ = None
self.infer_service_conf = None
......@@ -101,6 +109,14 @@ class Server(object):
self.enable_prometheus = False
self.prometheus_port = 19393
self.request_cache_size = 0
self.enable_dist_model = False
self.dist_carrier_id = ""
self.dist_cfg_file = ""
self.dist_nranks = 0
self.dist_endpoints = []
self.dist_subgraph_index = -1
self.dist_worker_serving_endpoints = []
self.dist_master_serving = False
def get_fetch_list(self, infer_node_idx=-1):
fetch_names = [
......@@ -211,6 +227,55 @@ class Server(object):
def set_request_cache_size(self, request_cache_size):
self.request_cache_size = request_cache_size
def set_enable_dist_model(self, status):
self.enable_dist_model = status
def set_dist_carrier_id(self, carrier_id):
if isinstance(carrier_id, int):
carrier_id = str(carrier_id)
self.dist_carrier_id = carrier_id
def set_dist_cfg_file(self, dist_cfg_file):
self.dist_cfg_file = dist_cfg_file
def set_dist_nranks(self, nranks):
if isinstance(nranks, str):
nranks = int(nranks)
elif not isinstance(nranks, int):
raise ValueError("dist_nranks type error! must be int or string")
self.dist_nranks = nranks
def set_dist_endpoints(self, endpoints):
if isinstance(endpoints, list):
self.dist_endpoints = endpoints
elif isinstance(endpoints, str):
self.dist_endpoints = [endpoints]
else:
raise ValueError(
"dist_endpoints type error! must be list or string")
def set_dist_subgraph_index(self, subgraph_index):
if isinstance(subgraph_index, str):
subgraph_index = int(subgraph_index)
elif not isinstance(subgraph_index, int):
raise ValueError("subgraph type error! must be int or string")
self.dist_subgraph_index = subgraph_index
def set_dist_worker_serving_endpoint(self, serving_endpoints):
if isinstance(serving_endpoints, list):
self.dist_worker_serving_endpoint = serving_endpoints
elif not isinstance(serving_endpoints, str):
self.dist_worker_serving_endpoint = [serving_endpoints]
else:
raise ValueError(
"dist_worker_serving_endpoint type error! must be list or string"
)
def set_dist_master_serving(self, is_master):
self.dist_master_serving = is_master
def _prepare_engine(self, model_config_paths, device, use_encryption_model):
self.device = device
if self.model_toolkit_conf == None:
......@@ -265,6 +330,15 @@ class Server(object):
engine.use_ascend_cl = self.use_ascend_cl
engine.use_gpu = False
# use distributed model.
if self.dist_subgraph_index >= 0:
engine.enable_dist_model = True
engine.dist_carrier_id = self.dist_carrier_id
engine.dist_cfg_file = self.dist_cfg_file
engine.dist_nranks = self.dist_nranks
engine.dist_endpoints.extend(self.dist_endpoints)
engine.dist_subgraph_index = self.dist_subgraph_index
if len(self.gpuid) == 0:
raise ValueError("CPU: self.gpuid = -1, GPU: must set it ")
op_gpu_list = self.gpuid[index % len(self.gpuid)].split(",")
......@@ -592,7 +666,7 @@ class Server(object):
"-num_threads {} " \
"-port {} " \
"-precision {} " \
"-use_calib={} " \
"-use_calib {} " \
"-reload_interval_s {} " \
"-resource_path {} " \
"-resource_file {} " \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册