From 25ca1469afdf8f5188f40cca48150f769e603604 Mon Sep 17 00:00:00 2001 From: TeslaZhao Date: Thu, 21 Apr 2022 19:54:19 +0800 Subject: [PATCH] Distribute Serving for Foundation Models --- cmake/paddlepaddle.cmake | 4 +- core/configure/proto/server_configure.proto | 21 ++- core/general-server/op/general_reader_op.cpp | 24 ++- .../proto/general_model_service.proto | 6 +- core/predictor/framework/dag.cpp | 7 +- core/predictor/framework/dag.h | 1 + core/predictor/framework/dag_view.cpp | 1 + core/predictor/framework/infer.h | 15 +- core/predictor/op/op.cpp | 10 +- core/predictor/op/op.h | 4 + .../sdk-cpp/proto/general_model_service.proto | 6 +- .../paddle/include/paddle_engine.h | 34 ++++ python/paddle_serving_server/dag.py | 13 +- python/paddle_serving_server/env_check/run.py | 50 ++++-- python/paddle_serving_server/serve.py | 160 +++++++++++++----- python/paddle_serving_server/server.py | 76 ++++++++- 16 files changed, 347 insertions(+), 85 deletions(-) mode change 100755 => 100644 core/configure/proto/server_configure.proto diff --git a/cmake/paddlepaddle.cmake b/cmake/paddlepaddle.cmake index ad8df0f1..599f6d47 100644 --- a/cmake/paddlepaddle.cmake +++ b/cmake/paddlepaddle.cmake @@ -30,7 +30,7 @@ message( "WITH_GPU = ${WITH_GPU}") # Paddle Version should be one of: # latest: latest develop build # version number like 1.5.2 -SET(PADDLE_VERSION "2.2.2") +SET(PADDLE_VERSION "2.3.0-rc0") if (WITH_GPU) message("CUDA: ${CUDA_VERSION}, CUDNN_MAJOR_VERSION: ${CUDNN_MAJOR_VERSION}") # cuda 11.0 is not supported, 11.2 would be added. @@ -177,7 +177,7 @@ if (NOT WITH_MKLML) endif() ADD_LIBRARY(paddle_inference STATIC IMPORTED GLOBAL) -SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.a) +SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.so) if (WITH_ASCEND_CL) SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.so) endif() diff --git a/core/configure/proto/server_configure.proto b/core/configure/proto/server_configure.proto old mode 100755 new mode 100644 index c974f010..5848f8e4 --- a/core/configure/proto/server_configure.proto +++ b/core/configure/proto/server_configure.proto @@ -65,6 +65,24 @@ message EngineDesc { optional int32 batch_infer_size = 31 [ default = 32 ]; optional bool enable_overrun = 32 [ default = false ]; optional bool allow_split_request = 33 [ default = true ]; + + /* + * Distributed inference params + * "enable_dist_model": enable distributed model, false default. + * "carrier_id": mark carrier + * "dist_cfg_file": file name of distributed configure. + * "dist_nranks": number of distributed nodes. + * "dist_endpoints": all endpoints(ip:port) of distributed nodes. + * "dist_subgraph_index": distributed subgraph index, auto increment from 0. + * It is + * used to select the endpoint of the current shard in distribute model. 
+ */ + optional bool enable_dist_model = 40 [ default = false ]; + optional string dist_carrier_id = 41 [ default = "inference" ]; + optional string dist_cfg_file = 42; + optional int32 dist_nranks = 43 [ default = 0 ]; + repeated string dist_endpoints = 44; + optional int32 dist_subgraph_index = 45 [ default = 0 ]; }; // model_toolkit conf @@ -96,7 +114,8 @@ message DAGNodeDependency { message DAGNode { required string name = 1; required string type = 2; - repeated DAGNodeDependency dependencies = 3; + repeated string address = 3; + repeated DAGNodeDependency dependencies = 4; }; // workflow entry diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp index 7b88f22a..21f96e6c 100644 --- a/core/general-server/op/general_reader_op.cpp +++ b/core/general-server/op/general_reader_op.cpp @@ -80,8 +80,8 @@ int GeneralReaderOp::inference() { VLOG(2) << "(logid=" << log_id << ") var num: " << var_num << ") start to call load general model_conf op"; if (var_num < 1) { - LOG(ERROR) << "(logid=" << log_id << ") Failed get feed_var, var_num=" - << var_num; + LOG(ERROR) << "(logid=" << log_id + << ") Failed get feed_var, var_num=" << var_num; return -1; } @@ -98,7 +98,7 @@ int GeneralReaderOp::inference() { int64_t elem_type = 0; int64_t elem_size = 0; int64_t databuf_size = 0; - const void* src_ptr = nullptr; + const void *src_ptr = nullptr; for (int i = 0; i < var_num; ++i) { paddle::PaddleTensor paddleTensor; const Tensor &tensor = req->tensor(i); @@ -107,7 +107,7 @@ int GeneralReaderOp::inference() { elem_size = 0; databuf_size = 0; elem_type = tensor.elem_type(); - src_ptr = nullptr ; + src_ptr = nullptr; if (elem_type == P_INT64) { // int64 elem_size = sizeof(int64_t); paddleTensor.dtype = paddle::PaddleDType::INT64; @@ -157,8 +157,8 @@ int GeneralReaderOp::inference() { << "dtype=" << paddleTensor.dtype << ";" << "data_len=" << data_len; if (src_ptr == nullptr) { - LOG(ERROR) << "Not support var[" << i << "] with elem_type[" - << elem_type << "]"; + LOG(ERROR) << "Not support var[" << i << "] with elem_type[" << elem_type + << "]"; continue; } // implement lod tensor here @@ -166,9 +166,15 @@ int GeneralReaderOp::inference() { // TODO(HexToString): support 2-D lod if (tensor.lod_size() > 0) { VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor"; - paddleTensor.lod.resize(1); + int lod_index = -1; for (int k = 0; k < tensor.lod_size(); ++k) { - paddleTensor.lod[0].push_back(tensor.lod(k)); + if (tensor.lod(k) == 0) { + lod_index++; + paddleTensor.lod.resize(lod_index + 1); + } + paddleTensor.lod[lod_index].push_back(tensor.lod(k)); + VLOG(2) << "(logid=" << log_id << ") lod[" << lod_index + << "]=" << tensor.lod(k); } } @@ -191,7 +197,7 @@ int GeneralReaderOp::inference() { VLOG(2) << "(logid=" << log_id << ") var[" << i << "] has lod_tensor and len=" << out->at(i).lod[0].back(); } - void* dst_ptr = out->at(i).data.data(); + void *dst_ptr = out->at(i).data.data(); if (!dst_ptr) { LOG(ERROR) << "dst_ptr is nullptr"; return -1; diff --git a/core/general-server/proto/general_model_service.proto b/core/general-server/proto/general_model_service.proto index 4b628263..904ffb97 100755 --- a/core/general-server/proto/general_model_service.proto +++ b/core/general-server/proto/general_model_service.proto @@ -92,11 +92,13 @@ message Request { message Response { repeated ModelOutput outputs = 1; repeated int64 profile_time = 2; + bool profile_server = 3; + uint64 log_id = 4; // Error code - int32 err_no = 3; + int32 err_no = 5; // Error 
messages - string err_msg = 4; + string err_msg = 6; }; message ModelOutput { diff --git a/core/predictor/framework/dag.cpp b/core/predictor/framework/dag.cpp index c45952f8..629e3b09 100644 --- a/core/predictor/framework/dag.cpp +++ b/core/predictor/framework/dag.cpp @@ -129,6 +129,10 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) { node->id = i + 1; // 0 is reserved for begginer-op node->name = conf.nodes(i).name(); node->type = conf.nodes(i).type(); + for (int add_index = 0; add_index < conf.nodes(i).address_size(); + ++add_index) { + node->address.push_back(conf.nodes(i).address(add_index)); + } uint32_t depend_size = conf.nodes(i).dependencies_size(); for (uint32_t j = 0; j < depend_size; j++) { const configure::DAGNodeDependency& depend = @@ -159,7 +163,8 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) { for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) { DagNode* node = _index_nodes[nid]; LOG(INFO) << "OP-" << node->id << "-" << node->name << "-" << node->type - << " depends: " << node->depends.size(); + << " depends: " << node->depends.size() + << " address: " << node->address.size(); boost::unordered_map::iterator it; for (it = node->depends.begin(); it != node->depends.end(); it++) { diff --git a/core/predictor/framework/dag.h b/core/predictor/framework/dag.h index 1145c9a9..c072c5e3 100644 --- a/core/predictor/framework/dag.h +++ b/core/predictor/framework/dag.h @@ -29,6 +29,7 @@ struct DagNode { std::string name; // opname std::string full_name; // workflow_stageindex_opname std::string type; + std::vector address; void* conf; boost::unordered_map depends; }; diff --git a/core/predictor/framework/dag_view.cpp b/core/predictor/framework/dag_view.cpp index 64383514..16d2c647 100644 --- a/core/predictor/framework/dag_view.cpp +++ b/core/predictor/framework/dag_view.cpp @@ -90,6 +90,7 @@ int DagView::init(Dag* dag, node->name, node->type, node->conf, + node->address, log_id) != 0) { LOG(WARNING) << "(logid=" << log_id << ") Failed init op, type:" << node->type; diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h index 5c5ef873..171b3bfa 100644 --- a/core/predictor/framework/infer.h +++ b/core/predictor/framework/infer.h @@ -32,7 +32,8 @@ #include "core/predictor/framework/memory.h" #include "core/predictor/framework/predictor_metric.h" #include "paddle_inference_api.h" // NOLINT -#include "experimental/float16.h" +//#include "experimental/float16.h" +#include "experimental/phi/common/float16.h" namespace baidu { namespace paddle_serving { namespace predictor { @@ -548,9 +549,9 @@ class FluidInferEngine : public CloneDBReloadableInferEngine { int8_t* data = static_cast(origin_data); lod_tensor_in->CopyFromCpu(data); } else if ((*tensorVector_in_pointer)[i].dtype == - paddle::PaddleDType::FLOAT16) { - paddle::platform::float16* data = - static_cast(origin_data); + paddle::PaddleDType::FLOAT16) { + phi::dtype::float16* data = + static_cast(origin_data); lod_tensor_in->CopyFromCpu(data); } else { LOG(ERROR) << "Inference not support type[" @@ -646,14 +647,14 @@ class FluidInferEngine : public CloneDBReloadableInferEngine { lod_tensor_out->CopyToCpu(data_out); databuf_char = reinterpret_cast(data_out); } else if (dataType == paddle::PaddleDType::FLOAT16) { - databuf_size = out_num * sizeof(paddle::platform::float16); + databuf_size = out_num * sizeof(phi::dtype::float16); databuf_data = MempoolWrapper::instance().malloc(databuf_size); if (!databuf_data) { LOG(ERROR) << "Malloc failed, size: " << 
databuf_size; return -1; } - paddle::platform::float16* data_out = - reinterpret_cast(databuf_data); + phi::dtype::float16* data_out = + reinterpret_cast(databuf_data); lod_tensor_out->CopyToCpu(data_out); databuf_char = reinterpret_cast(data_out); } diff --git a/core/predictor/op/op.cpp b/core/predictor/op/op.cpp index 33dba2b5..a7848ede 100644 --- a/core/predictor/op/op.cpp +++ b/core/predictor/op/op.cpp @@ -36,12 +36,14 @@ int Op::init(Bus* bus, const std::string& name, const std::string& type, void* conf, + const std::vector& address, const uint64_t log_id) { _bus = bus; _dag = dag; _id = id; _name = name; _type = type; + _address = address; set_config(conf); _timer = butil::get_object(); @@ -110,11 +112,13 @@ int Op::process(const uint64_t log_id, bool debug) { return ERR_INTERNAL_FAILURE; } + /* if (_has_calc) { LOG(INFO) << "(logid=" << log_id << ") Op: " << _name << " already processed before"; return ERR_OK; } + */ // 1. dependency inference /* @@ -147,8 +151,10 @@ int Op::process(const uint64_t log_id, bool debug) { } // 3. share output to bus - Channel* channel = mutable_channel(); - channel->share_to_bus(_bus, log_id); + if (!_has_calc) { + Channel* channel = mutable_channel(); + channel->share_to_bus(_bus, log_id); + } // 4. mark has calculated _has_calc = true; diff --git a/core/predictor/op/op.h b/core/predictor/op/op.h index ea700cce..f14d74b4 100644 --- a/core/predictor/op/op.h +++ b/core/predictor/op/op.h @@ -114,6 +114,7 @@ class Op { const std::string& name, const std::string& type, void* conf, + const std::vector& address, const uint64_t log_id); int deinit(); @@ -135,6 +136,8 @@ class Op { const std::string& full_name() const { return _full_name; } + const std::vector& address() const { return _address; } + const std::vector& pre_names() const { return _pre_node_names; } void set_full_name(const std::string full_name) { _full_name = full_name; } @@ -206,6 +209,7 @@ class Op { std::string _name; std::string _full_name; // service_workflow_stageindex_opname std::string _type; + std::vector _address; bool _has_calc; bool _has_init; TimerFlow* _timer; diff --git a/core/sdk-cpp/proto/general_model_service.proto b/core/sdk-cpp/proto/general_model_service.proto index 5340f422..de951625 100755 --- a/core/sdk-cpp/proto/general_model_service.proto +++ b/core/sdk-cpp/proto/general_model_service.proto @@ -92,11 +92,13 @@ message Request { message Response { repeated ModelOutput outputs = 1; repeated int64 profile_time = 2; + bool profile_server = 3; + uint64 log_id = 4; // Error code - int32 err_no = 3; + int32 err_no = 5; // Error messages - string err_msg = 4; + string err_msg = 6; }; message ModelOutput { diff --git a/paddle_inference/paddle/include/paddle_engine.h b/paddle_inference/paddle/include/paddle_engine.h index bf8c98ed..1bdb6265 100644 --- a/paddle_inference/paddle/include/paddle_engine.h +++ b/paddle_inference/paddle/include/paddle_engine.h @@ -37,6 +37,7 @@ using paddle_infer::PrecisionType; using paddle_infer::Predictor; using paddle_infer::Tensor; using paddle_infer::CreatePredictor; +using paddle_infer::DistConfig; DECLARE_int32(gpuid); DECLARE_string(precision); @@ -206,6 +207,39 @@ class PaddleInferenceEngine : public EngineCore { config.SetModel(model_path); } + // Enable distributed model inferencing + DistConfig distCfg; + if (engine_conf.has_enable_dist_model() && + engine_conf.enable_dist_model()) { + int ep_size = engine_conf.dist_endpoints_size(); + int cur_index = engine_conf.dist_subgraph_index(); + if (ep_size <= cur_index) { + LOG(ERROR) << "create 
paddle predictor failed, Distributed model error." + << " dist_endpoints_size=" << ep_size + << " is not bigger than dist_subgraph_index=" << cur_index; + return -1; + } + std::vector vec_eps; + for (int i = 0; i < ep_size; ++i) { + vec_eps.emplace_back(engine_conf.dist_endpoints(i)); + } + distCfg.EnableDistModel(true); + distCfg.SetCarrierId(engine_conf.dist_carrier_id()); + distCfg.SetRanks(engine_conf.dist_nranks(), cur_index); + distCfg.SetEndpoints(vec_eps, engine_conf.dist_endpoints(cur_index)); + distCfg.SetCommInitConfig(engine_conf.dist_cfg_file()); + + config.SetDistConfig(distCfg); + LOG(INFO) << "Create Distributed predictor! dist_carrier_id=" + << engine_conf.dist_carrier_id() + << ", Ranks=" << engine_conf.dist_nranks() + << ", current index of ranks=" << cur_index + << ", current endpoint=" + << engine_conf.dist_endpoints(cur_index) + << ", communicate init config file=" + << engine_conf.dist_cfg_file(); + } + config.SwitchSpecifyInputNames(true); config.SetCpuMathLibraryNumThreads(1); if (engine_conf.has_use_gpu() && engine_conf.use_gpu()) { diff --git a/python/paddle_serving_server/dag.py b/python/paddle_serving_server/dag.py index 50a92fa1..91c9014e 100755 --- a/python/paddle_serving_server/dag.py +++ b/python/paddle_serving_server/dag.py @@ -30,10 +30,16 @@ class OpMaker(object): "GeneralDistKVOp", "GeneralCopyOp", "GeneralDetectionOp", + "GeneralRemoteOp", ] self.node_name_suffix_ = collections.defaultdict(int) - def create(self, node_type, engine_name=None, inputs=[], outputs=[]): + def create(self, + node_type, + engine_name=None, + inputs=[], + outputs=[], + addresses=[]): if node_type not in self.op_list: raise Exception("Op type {} is not supported right now".format( node_type)) @@ -55,6 +61,11 @@ class OpMaker(object): dep.name = dep_node.name dep.mode = "RO" node.dependencies.extend([dep]) + + # for general_remote op. + if addresses: + node.address.extend(addresses) + # Because the return value will be used as the key value of the # dict, and the proto object is variable which cannot be hashed, # so it is processed into a string. 
This has little effect on diff --git a/python/paddle_serving_server/env_check/run.py b/python/paddle_serving_server/env_check/run.py index 2c4216b2..0c29426a 100644 --- a/python/paddle_serving_server/env_check/run.py +++ b/python/paddle_serving_server/env_check/run.py @@ -21,21 +21,27 @@ Usage: export PYTHON_EXECUTABLE=/usr/local/bin/python3.6 python3.6 -m paddle_serving_server.serve check ''' - import sys import os import pytest - inference_test_cases = ["test_fit_a_line.py::TestFitALine::test_inference"] -cpp_test_cases = ["test_fit_a_line.py::TestFitALine::test_cpu", "test_fit_a_line.py::TestFitALine::test_gpu"] -pipeline_test_cases = ["test_uci_pipeline.py::TestUCIPipeline::test_cpu", "test_uci_pipeline.py::TestUCIPipeline::test_gpu"] +cpp_test_cases = [ + "test_fit_a_line.py::TestFitALine::test_cpu", + "test_fit_a_line.py::TestFitALine::test_gpu" +] +pipeline_test_cases = [ + "test_uci_pipeline.py::TestUCIPipeline::test_cpu", + "test_uci_pipeline.py::TestUCIPipeline::test_gpu" +] log_files = ["PipelineServingLogs", "log", "stderr.log", "stdout.log"] + def set_serving_log_path(): if 'SERVING_LOG_PATH' not in os.environ: serving_log_path = os.path.expanduser(os.getcwd()) + '/' - os.environ['SERVING_LOG_PATH']=serving_log_path + os.environ['SERVING_LOG_PATH'] = serving_log_path + def mv_log_to_new_dir(dir_path): import shutil @@ -46,8 +52,8 @@ def mv_log_to_new_dir(dir_path): file_path = os.path.join(serving_log_path, file_name) dir_path_temp = os.path.join(dir_path, file_name) if os.path.exists(file_path): - shutil.move(file_path, dir_path_temp) - + shutil.move(file_path, dir_path_temp) + def run_test_cases(cases_list, case_type, is_open_std): old_stdout, old_stderr = sys.stdout, sys.stderr @@ -66,33 +72,41 @@ def run_test_cases(cases_list, case_type, is_open_std): new_dir_path = os.path.join(serving_log_path, dir_name) mv_log_to_new_dir(new_dir_path) if res == 0: - print("{} {} environment running success".format(case_type, case_name)) + print("{} {} environment running success".format(case_type, + case_name)) elif res == 1: if case_name == "inference": - print("{} {} environment running failure. Please refer to https://www.paddlepaddle.org.cn/install/quick?docurl=/documentation/docs/zh/install/pip/linux-pip.html to configure environment".format(case_type, case_name)) + print( + "{} {} environment running failure. Please refer to https://www.paddlepaddle.org.cn/install/quick?docurl=/documentation/docs/zh/install/pip/linux-pip.html to configure environment". + format(case_type, case_name)) os._exit(0) else: - print("{} {} environment running failure, if you need this environment, please refer to https://github.com/PaddlePaddle/Serving/blob/develop/doc/Install_CN.md".format(case_type, case_name)) + print( + "{} {} environment running failure, if you need this environment, please refer to https://github.com/PaddlePaddle/Serving/blob/develop/doc/Install_CN.md". 
+ format(case_type, case_name)) + def unset_env(key): del os.environ[key] + def check_env(mode): set_serving_log_path() if 'https_proxy' in os.environ or 'http_proxy' in os.environ: - unset_env("https_proxy") - unset_env("http_proxy") + unset_env("https_proxy") + unset_env("http_proxy") if 'GREP_OPTIONS' in os.environ: - unset_env("GREP_OPTIONS") - is_open_std = False - if mode is "debug": + unset_env("GREP_OPTIONS") + is_open_std = False + if mode == "debug": is_open_std = True - if mode is "all" or mode is "inference" or mode is "debug": + if mode == "all" or mode == "inference" or mode == "debug": run_test_cases(inference_test_cases, "PaddlePaddle", is_open_std) - if mode is "all" or mode is "cpp" or mode is "debug": + if mode == "all" or mode == "cpp" or mode == "debug": run_test_cases(cpp_test_cases, "C++", is_open_std) - if mode is "all" or mode is "pipeline" or mode is "debug": + if mode == "all" or mode == "pipeline" or mode == "debug": run_test_cases(pipeline_test_cases, "Pipeline", is_open_std) + if __name__ == '__main__': check_env("debug") diff --git a/python/paddle_serving_server/serve.py b/python/paddle_serving_server/serve.py index 09931dad..3369bd05 100755 --- a/python/paddle_serving_server/serve.py +++ b/python/paddle_serving_server/serve.py @@ -37,12 +37,15 @@ from paddle_serving_server.util import * from paddle_serving_server.env_check.run import check_env import cmd + def signal_handler(signal, frame): print('Process stopped') sys.exit(0) + signal.signal(signal.SIGINT, signal_handler) + # web_service.py is still used by Pipeline. def port_is_available(port): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: @@ -185,10 +188,10 @@ def serve_args(): action="store_true", help="Use encryption model") parser.add_argument( - "--encryption_rpc_port", - type=int, - required=False, - default=12000, + "--encryption_rpc_port", + type=int, + required=False, + default=12000, help="Port of encryption model, only valid for arg.use_encryption_model") parser.add_argument( "--use_trt", default=False, action="store_true", help="Use TensorRT") @@ -217,11 +220,60 @@ def serve_args(): action="store_true", help="Use gpu_multi_stream") parser.add_argument( - "--enable_prometheus", default=False, action="store_true", help="Use Prometheus") + "--enable_prometheus", + default=False, + action="store_true", + help="Use Prometheus") + parser.add_argument( + "--prometheus_port", + type=int, + default=19393, + help="Port of the Prometheus") + parser.add_argument( + "--request_cache_size", + type=int, + default=0, + help="Port of the Prometheus") + parser.add_argument( + "--use_dist_model", + default=False, + action="store_true", + help="Use distributed model") + parser.add_argument( + "--dist_carrier_id", + type=str, + default="", + help="carrier id of distributed model") + parser.add_argument( + "--dist_cfg_file", + type=str, + default="", + help="config file of distributed model") + parser.add_argument( + "--dist_endpoints", + type=str, + default="+", + help="endpoints of distributed model. 
splited by comma") + parser.add_argument( + "--dist_nranks", + type=int, + default=0, + help="nranks of distributed model") + parser.add_argument( + "--dist_subgraph_index", + type=int, + default=-1, + help="index of distributed model") parser.add_argument( - "--prometheus_port", type=int, default=19393, help="Port of the Prometheus") + "--dist_worker_serving_endpoints", + type=str, + default=None, + help="endpoints of worker serving endpoints") parser.add_argument( - "--request_cache_size", type=int, default=0, help="Port of the Prometheus") + "--dist_master_serving", + default=False, + action="store_true", + help="The master serving of distributed inference") return parser.parse_args() @@ -245,7 +297,7 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi workdir = "{}_{}".format(args.workdir, port) dag_list_op = [] - if model == "": + if model == "" and not args.dist_master_serving: print("You must specify your serving model") exit(-1) for single_model_config in args.model: @@ -270,30 +322,51 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi dag_list_op.append(temp_str_list[0]) - read_op = op_maker.create('GeneralReaderOp') - op_seq_maker.add_op(read_op) - #如果dag_list_op不是空,那么证明通过--op 传入了自定义OP或自定义的DAG串联关系。 - #此时,根据--op 传入的顺序去组DAG串联关系 - if len(dag_list_op) > 0: - for single_op in dag_list_op: - op_seq_maker.add_op(op_maker.create(single_op)) - #否则,仍然按照原有方式根虎--model去串联。 - else: - for idx, single_model in enumerate(model): - infer_op_name = "GeneralInferOp" - # 目前由于ocr的节点Det模型依赖于opencv的第三方库 - # 只有使用ocr的时候,才会加入opencv的第三方库并编译GeneralDetectionOp - # 故此处做特殊处理,当不满足下述情况时,所添加的op默认为GeneralInferOp - # 以后可能考虑不用python脚本来生成配置 - if len(model) == 2 and idx == 0 and single_model == "ocr_det_model": - infer_op_name = "GeneralDetectionOp" - else: + # The workflows of master serving in distributed model is different from + # worker servings. The workflow of worker servings is same to non-distributed + # model, but workerflow of master serving needs to add IP address of other + # worker serving in the machine. + if not args.dist_master_serving: + read_op = op_maker.create('GeneralReaderOp') + op_seq_maker.add_op(read_op) + #如果dag_list_op不是空,那么证明通过--op 传入了自定义OP或自定义的DAG串联关系。 + #此时,根据--op 传入的顺序去组DAG串联关系 + if len(dag_list_op) > 0: + for single_op in dag_list_op: + op_seq_maker.add_op(op_maker.create(single_op)) + #否则,仍然按照原有方式根虎--model去串联。 + else: + for idx, single_model in enumerate(model): infer_op_name = "GeneralInferOp" - general_infer_op = op_maker.create(infer_op_name) - op_seq_maker.add_op(general_infer_op) + # 目前由于ocr的节点Det模型依赖于opencv的第三方库 + # 只有使用ocr的时候,才会加入opencv的第三方库并编译GeneralDetectionOp + # 故此处做特殊处理,当不满足下述情况时,所添加的op默认为GeneralInferOp + # 以后可能考虑不用python脚本来生成配置 + if len(model + ) == 2 and idx == 0 and single_model == "ocr_det_model": + infer_op_name = "GeneralDetectionOp" + else: + infer_op_name = "GeneralInferOp" + general_infer_op = op_maker.create(infer_op_name) + op_seq_maker.add_op(general_infer_op) - general_response_op = op_maker.create('GeneralResponseOp') - op_seq_maker.add_op(general_response_op) + general_response_op = op_maker.create('GeneralResponseOp') + op_seq_maker.add_op(general_response_op) + else: + # for the master serving of distributed model only add one general_remote op. + if args.dist_worker_serving_endpoints is None: + raise ValueError( + "Params Error!. 
dist_worker_serving_endpoints is empty when dist_master_serving is set" + ) + worker_serving_endpoints = args.dist_worker_serving_endpoints.split(",") + if len(worker_serving_endpoints) == 0: + raise ValueError( + "Params Error!. dist_worker_serving_endpoints is empty when dist_master_serving is set" + ) + + general_remote_op = op_maker.create( + 'GeneralRemoteOp', None, [], [], addresses=worker_serving_endpoints) + op_seq_maker.add_op(general_remote_op, ) server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_num_threads(thread_num) @@ -306,6 +379,12 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi server.set_enable_prometheus(args.enable_prometheus) server.set_prometheus_port(args.prometheus_port) server.set_request_cache_size(args.request_cache_size) + server.set_enable_dist_model(args.use_dist_model) + server.set_dist_carrier_id(args.dist_carrier_id) + server.set_dist_cfg_file(args.dist_cfg_file) + server.set_dist_nranks(args.dist_nranks) + server.set_dist_endpoints(args.dist_endpoints.split(",")) + server.set_dist_subgraph_index(args.dist_subgraph_index) if args.use_trt and device == "gpu": server.set_trt() @@ -489,8 +568,10 @@ def stop_serving(command: str, port: int=None): os.remove(filepath) return True + class Check_Env_Shell(cmd.Cmd): intro = "Welcome to the check env shell.Type help to list commands.\n" + # ----- basic commands ----- def do_help(self, arg): print("\nCommand list\t\tDescription\n"\ @@ -507,23 +588,23 @@ class Check_Env_Shell(cmd.Cmd): def do_check_all(self, arg): "Check Environment of Paddle Inference, Pipeline Serving, C++ Serving" - check_env("all") - + check_env("all") + def do_check_pipeline(self, arg): "Check Environment of Pipeline Serving" - check_env("pipeline") - + check_env("pipeline") + def do_check_cpp(self, arg): "Check Environment of C++ Serving" - check_env("cpp") + check_env("cpp") def do_check_inference(self, arg): "Check Environment of Paddle Inference" - check_env("inference") - + check_env("inference") + def do_debug(self, arg): "Open pytest log to debug" - check_env("debug") + check_env("debug") def do_exit(self, arg): "Exit Check Env Shell" @@ -531,6 +612,7 @@ class Check_Env_Shell(cmd.Cmd): os._exit(0) return True + if __name__ == "__main__": # args.device is not used at all. # just keep the interface. @@ -547,7 +629,7 @@ if __name__ == "__main__": else: os._exit(-1) elif args.server == "check": - Check_Env_Shell().cmdloop() + Check_Env_Shell().cmdloop() for single_model_config in args.model: if os.path.isdir(single_model_config): pass diff --git a/python/paddle_serving_server/server.py b/python/paddle_serving_server/server.py index a1ed1c1c..0784cb33 100755 --- a/python/paddle_serving_server/server.py +++ b/python/paddle_serving_server/server.py @@ -53,6 +53,14 @@ class Server(object): self.general_model_config_fn:'list'=[] # ["GeneralInferOp_0/general_model.prototxt"]The quantity is equal to the InferOp quantity,Feed and Fetch --OP self.subdirectory:'list'=[] # The quantity is equal to the InferOp quantity, and name = node.name = engine.name self.model_config_paths:'collections.OrderedDict()' # Save the serving_server_conf.prototxt path (feed and fetch information) this is a map for multi-model in a workflow + self.enable_dist_model: bool, enable distributed model, false default + self.dist_carrier_id: string, mark distributed model carrier name, "" default. + self.dist_cfg_file: string, file name of distributed configure, "" default. 
+        self.dist_nranks: int, number of distributed nodes, 0 default.
+        self.dist_endpoints: list of string, all endpoints (ip:port) of distributed nodes, [] default.
+        self.dist_subgraph_index: int, index of the distributed subgraph (shard), used to select the endpoint of the current shard in the distributed model, -1 default.
+        self.dist_worker_serving_endpoints: list of string, endpoints of all worker servings on the same machine, [] default.
+        self.dist_master_serving: bool, whether this serving is the master serving that receives client requests (only in pp0 of pipeline parallelism), False default.
         """
         self.server_handle_ = None
         self.infer_service_conf = None
@@ -101,6 +109,14 @@ class Server(object):
         self.enable_prometheus = False
         self.prometheus_port = 19393
         self.request_cache_size = 0
+        self.enable_dist_model = False
+        self.dist_carrier_id = ""
+        self.dist_cfg_file = ""
+        self.dist_nranks = 0
+        self.dist_endpoints = []
+        self.dist_subgraph_index = -1
+        self.dist_worker_serving_endpoints = []
+        self.dist_master_serving = False
 
     def get_fetch_list(self, infer_node_idx=-1):
         fetch_names = [
@@ -211,6 +227,55 @@ class Server(object):
     def set_request_cache_size(self, request_cache_size):
         self.request_cache_size = request_cache_size
 
+    def set_enable_dist_model(self, status):
+        self.enable_dist_model = status
+
+    def set_dist_carrier_id(self, carrier_id):
+        if isinstance(carrier_id, int):
+            carrier_id = str(carrier_id)
+        self.dist_carrier_id = carrier_id
+
+    def set_dist_cfg_file(self, dist_cfg_file):
+        self.dist_cfg_file = dist_cfg_file
+
+    def set_dist_nranks(self, nranks):
+        if isinstance(nranks, str):
+            nranks = int(nranks)
+        elif not isinstance(nranks, int):
+            raise ValueError("dist_nranks type error! must be int or string")
+
+        self.dist_nranks = nranks
+
+    def set_dist_endpoints(self, endpoints):
+        if isinstance(endpoints, list):
+            self.dist_endpoints = endpoints
+        elif isinstance(endpoints, str):
+            self.dist_endpoints = [endpoints]
+        else:
+            raise ValueError(
+                "dist_endpoints type error! must be list or string")
+
+    def set_dist_subgraph_index(self, subgraph_index):
+        if isinstance(subgraph_index, str):
+            subgraph_index = int(subgraph_index)
+        elif not isinstance(subgraph_index, int):
+            raise ValueError(
+                "dist_subgraph_index type error! must be int or string")
+
+        self.dist_subgraph_index = subgraph_index
+
+    def set_dist_worker_serving_endpoint(self, serving_endpoints):
+        if isinstance(serving_endpoints, list):
+            self.dist_worker_serving_endpoints = serving_endpoints
+        elif isinstance(serving_endpoints, str):
+            self.dist_worker_serving_endpoints = [serving_endpoints]
+        else:
+            raise ValueError(
+                "dist_worker_serving_endpoint type error! must be list or string"
+            )
+
+    def set_dist_master_serving(self, is_master):
+        self.dist_master_serving = is_master
+
     def _prepare_engine(self, model_config_paths, device, use_encryption_model):
         self.device = device
         if self.model_toolkit_conf == None:
@@ -265,6 +330,15 @@ class Server(object):
             engine.use_ascend_cl = self.use_ascend_cl
             engine.use_gpu = False
 
+            # use distributed model
+ if self.dist_subgraph_index >= 0: + engine.enable_dist_model = True + engine.dist_carrier_id = self.dist_carrier_id + engine.dist_cfg_file = self.dist_cfg_file + engine.dist_nranks = self.dist_nranks + engine.dist_endpoints.extend(self.dist_endpoints) + engine.dist_subgraph_index = self.dist_subgraph_index + if len(self.gpuid) == 0: raise ValueError("CPU: self.gpuid = -1, GPU: must set it ") op_gpu_list = self.gpuid[index % len(self.gpuid)].split(",") @@ -592,7 +666,7 @@ class Server(object): "-num_threads {} " \ "-port {} " \ "-precision {} " \ - "-use_calib={} " \ + "-use_calib {} " \ "-reload_interval_s {} " \ "-resource_path {} " \ "-resource_file {} " \ -- GitLab
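The sketch below is a usage illustration of the pieces this patch adds, not part of the change itself: it wires the new Server.set_dist_* setters and the GeneralRemoteOp/addresses path from dag.py into a small launch script. It assumes the conventional "from paddle_serving_server import OpMaker, OpSeqMaker, Server" imports and the pre-existing load_model_config/prepare_server/run_server flow; the model directory, workdir, ports, endpoints and the dist_cfg file name are placeholders rather than values taken from the patch.

# Hedged sketch: one worker serving holding shard 0 of a 2-rank model,
# plus the op sequence a master serving would use to front the workers.
from paddle_serving_server import OpMaker, OpSeqMaker, Server

def worker_op_sequence():
    # Same op chain serve.py builds for a non-master serving.
    op_maker = OpMaker()
    seq = OpSeqMaker()
    seq.add_op(op_maker.create('GeneralReaderOp'))
    seq.add_op(op_maker.create('GeneralInferOp'))
    seq.add_op(op_maker.create('GeneralResponseOp'))
    return seq

def master_op_sequence(worker_endpoints):
    # The master serving only forwards requests: a single GeneralRemoteOp
    # whose addresses come from --dist_worker_serving_endpoints.
    op_maker = OpMaker()
    seq = OpSeqMaker()
    seq.add_op(op_maker.create('GeneralRemoteOp', None, [], [],
                               addresses=worker_endpoints))
    return seq

# Worker for shard 0 (dist_subgraph_index=0) of a 2-rank deployment.
# All values below are placeholders.
server = Server()
server.set_op_sequence(worker_op_sequence().get_op_sequence())
server.set_enable_dist_model(True)                   # EngineDesc.enable_dist_model
server.set_dist_carrier_id("inference")              # EngineDesc.dist_carrier_id
server.set_dist_cfg_file("dist_cfg.csv")             # EngineDesc.dist_cfg_file (placeholder)
server.set_dist_nranks(2)                            # EngineDesc.dist_nranks
server.set_dist_endpoints(["10.0.0.1:8300", "10.0.0.2:8300"])  # EngineDesc.dist_endpoints
server.set_dist_subgraph_index(0)                    # selects this shard's endpoint
server.load_model_config("serving_server_shard0")    # placeholder model dir
server.prepare_server(workdir="workdir_shard0", port=9393, device="gpu")
server.run_server()

A master serving would instead take master_op_sequence(worker_endpoints); that is what the new --dist_master_serving branch in serve.py builds, which requires --dist_worker_serving_endpoints and skips the reader/infer/response chain entirely.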