diff --git a/CMakeLists.txt b/CMakeLists.txt index 4cb661faf996bc32424f88103f238088efd08520..7991e15cbb19726fba9881bd3061de9efdc5d215 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ find_package(Threads REQUIRED) find_package(CUDA QUIET) include(simd) - +SET(CMAKE_BUILD_TYPE "Debug") # CMAKE_BUILD_TYPE if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE "RelWithDebInfo" CACHE STRING diff --git a/core/configure/proto/server_configure.proto b/core/configure/proto/server_configure.proto index 24fb62806476effdcf453cb7b4047122731106ea..5cace06420e29e1590218f63777c85bbcf504b29 100755 --- a/core/configure/proto/server_configure.proto +++ b/core/configure/proto/server_configure.proto @@ -21,11 +21,12 @@ message EngineDesc { required string reloadable_meta = 3; required string reloadable_type = 4; required string model_dir = 5; - required int32 runtime_thread_num = 6; - required int32 batch_infer_size = 7; - required int32 enable_batch_align = 8; - optional string version_file = 9; - optional string version_type = 10; + repeated int32 gpu_ids = 6; + required int32 runtime_thread_num = 7; + required int32 batch_infer_size = 8; + required int32 enable_batch_align = 9; + optional string version_file = 10; + optional string version_type = 11; /* * Sparse Parameter Service type. Valid types are: @@ -38,16 +39,17 @@ message EngineDesc { LOCAL = 1; REMOTE = 2; } - optional SparseParamServiceType sparse_param_service_type = 11; - optional string sparse_param_service_table_name = 12; - optional bool enable_memory_optimization = 13; - optional bool enable_ir_optimization = 14; - optional bool use_trt = 15; - optional bool use_lite = 16; - optional bool use_xpu = 17; - optional bool use_gpu = 18; - optional bool combined_model = 19; - optional bool encrypted_model = 20; + optional SparseParamServiceType sparse_param_service_type = 12; + optional string sparse_param_service_table_name = 13; + optional bool enable_memory_optimization = 14; + optional bool enable_ir_optimization = 15; + optional bool use_trt = 16; + optional bool use_lite = 17; + optional bool use_xpu = 18; + optional bool use_gpu = 19; + optional bool combined_model = 20; + optional bool encrypted_model = 21; + optional bool gpu_multi_stream = 22; }; // model_toolkit conf diff --git a/core/general-client/src/general_model.cpp b/core/general-client/src/general_model.cpp old mode 100644 new mode 100755 index 0ade573de6ac2da59156ba82f5ff3e04f1b7f6b2..d3dd5d9f7c72abd921c7fdf055e9f02293dfab43 --- a/core/general-client/src/general_model.cpp +++ b/core/general-client/src/general_model.cpp @@ -166,6 +166,8 @@ int PredictorClient::numpy_predict( batch_size = batch_size > string_feed_batch.size() ? batch_size : string_feed_batch.size(); VLOG(2) << "batch size: " << batch_size; + // batch_size must be 1, because the batch is already inside the Tensor. + // Consider removing the outer vector<>. predict_res_batch.clear(); Timer timeline; int64_t preprocess_start = timeline.TimeStampUS(); @@ -188,6 +190,8 @@ int PredictorClient::numpy_predict( } int vec_idx = 0; + // batch_size can only be 1, because the batch is already inside the Tensor. + // If batch_size is not 1, an error will occur on the C++ side.
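// Editor's sketch (assumed shapes, illustration only; not part of this patch):
// the batch dimension lives inside each feed tensor, not in the outer vector.
{
  std::vector<int> example_shape = {4, 3, 224, 224};   // one feed var, batch = 4
  size_t outer_vector_size = 1;                        // feed_batch.size() must stay 1
  size_t batch_from_tensor = example_shape[0];         // 4, read from the tensor itself
  (void)outer_vector_size;
  (void)batch_from_tensor;
}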
for (int bi = 0; bi < batch_size; bi++) { VLOG(2) << "prepare batch " << bi; std::vector tensor_vec; diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp old mode 100644 new mode 100755 index 3e1091dd844f0afd71c8556586f82aafc42c5097..1c40e89b86a5b28543c3a49240316401a46ea639 --- a/core/general-server/op/general_reader_op.cpp +++ b/core/general-server/op/general_reader_op.cpp @@ -93,6 +93,9 @@ int GeneralReaderOp::inference() { res->SetLogId(log_id); Timer timeline; int64_t start = timeline.TimeStampUS(); + // only get insts(0), cause batch is already in Tensor. + // req can only include 1 inst. + // var_num means the number of feed_var. int var_num = req->insts(0).tensor_array_size(); VLOG(2) << "(logid=" << log_id << ") var num: " << var_num diff --git a/core/predictor/framework/bsf-inl-tensor.h b/core/predictor/framework/bsf-inl-tensor.h deleted file mode 100644 index b7c725b443281f355addffb8f2fcb36651b6d9b6..0000000000000000000000000000000000000000 --- a/core/predictor/framework/bsf-inl-tensor.h +++ /dev/null @@ -1,373 +0,0 @@ -// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#ifdef BCLOUD -#include -#else -#include -#endif - -#include -#include -#include -#include -#include "core/predictor/common/inner_common.h" -#include "core/predictor/framework/infer_data.h" -#include "core/predictor/framework/memory.h" - -#include - -namespace im { -namespace bsf { - -template <> -struct Task { - typedef Task - TaskT; - typedef baidu::paddle_serving::predictor::Tensor Tensor; - typedef baidu::paddle_serving::predictor::Tensor InType; - typedef baidu::paddle_serving::predictor::Tensor OutType; - typedef baidu::paddle_serving::predictor::BatchTensor BatchTensor; - typedef baidu::paddle_serving::predictor::BatchTensor InArrayT; - typedef baidu::paddle_serving::predictor::BatchTensor OutArrayT; - - struct Segment { - Segment(void* p, size_t b, size_t s) : ptr(p), begin(b), size(s) {} - void* ptr; - size_t begin; - size_t size; - }; - - int read_fd; - int write_fd; - - pid_t owner_tid; - - const InArrayT* in; - OutArrayT* out; - - size_t rem; - size_t size; - - butil::atomic index; - - const BatchTensor* get(bool is_in) const { - if (is_in) { - return in; - } else { - return out; - } - } - - BatchTensor* get(bool is_in) { - if (is_in) { - return const_cast(in); - } else { - return out; - } - } - - Task() { - read_fd = -1; - write_fd = -1; - owner_tid = -1; - in = NULL; - out = NULL; - rem = -1; - size = -1; - index.store(0, butil::memory_order_relaxed); - } -}; - -template <> -class BatchTasks> { - public: - typedef baidu::paddle_serving::predictor::Tensor Tensor; - typedef baidu::paddle_serving::predictor::Tensor InType; - typedef baidu::paddle_serving::predictor::Tensor OutType; - typedef baidu::paddle_serving::predictor::DataBuf DataBuf; - typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper; - - typedef Task - TaskT; - typedef TaskMeta 
TaskMetaT; - typedef TaskT::InArrayT InArrayT; - typedef TaskT::OutArrayT OutArrayT; - - explicit BatchTasks(size_t batch_size, bool batch_align = false) - : _batch_size(batch_size), - _rem_size(batch_size), - _batch_align(batch_align) { - _batch_in.clear(); - _batch_out.clear(); - _tasks.clear(); - } - - ~BatchTasks() { - _batch_in.clear(); - _batch_out.clear(); - _tasks.clear(); - } - - static bool check_valid(const InArrayT& in, - OutArrayT& out, // NOLINT - bool align) { // NOLINT - if (align) { - if (out.count() <= 0 || out.size() <= 0) { - LOG(ERROR) << "Out tensor is empty, when aligned"; - return false; - } - - if (out.size() != in.size()) { - LOG(ERROR) << "In/Out tensor size not eq: " << out.size() - << "!=" << in.size(); - return false; - } - - for (size_t fi = 0, shape0 = 0; fi < out.count(); ++fi) { - if (!out[fi].valid()) { - LOG(ERROR) << "Out[" << fi << "] tensor not valid"; - return false; - } - - if (out.size() != out[fi].shape0()) { - LOG(ERROR) << "Shape0 not consistency, " << out.size() - << "!=" << out[fi].shape0() << ", " << fi; - return false; - } - } - } - - return true; - } - - size_t append_task(TaskT* task) { - size_t add = std::min(task->rem, _rem_size); - if (!_batch_align) { - add = task->rem; - } - TaskMetaT tm(task, task->in->size() - task->rem, add); - _tasks.push_back(tm); - - task->rem -= add; - _rem_size -= add; - return _rem_size; - } - - void merge_tasks() { - merge_input(); - merge_output(); - } - - void merge_input() { - if (_tasks.size() <= 0 || _tasks[0].task->in->count() <= 0) { - return; - } - - if (_tasks.size() == 1 && !_batch_align) { - TaskMetaT& tm = _tasks[0]; - _batch_in = *(tm.task->in); - return; - } - - merge_tensor(true); - } - - void merge_output() { - if (_batch_align) { - if (_tasks.size() <= 0 || _tasks[0].task->out->count() <= 0) { - return; - } - } - - if (_tasks.size() <= 0 || _tasks[0].task->out->count() <= 0) { - return; - } - - TaskMetaT& tm = _tasks[0]; - if (_tasks.size() == 1 && !_batch_align) { - _batch_out = *(tm.task->out); - return; - } - - if (tm.task->out->size() <= 0) { - // shape is empty - _batch_out = *(tm.task->out); - return; - } - - if ((*tm.task->out)[0].data.data() == 0 || - (*tm.task->out)[0].data.size() == 0) { - _batch_out = *(tm.task->out); - return; - } - - merge_tensor(false); - } - - void merge_tensor(bool is_in) { - // accumulate batch size from fetched tasks - size_t batch_size = 0; - for (size_t ti = 0; ti < _tasks.size(); ++ti) { - TaskMetaT& tm = _tasks[ti]; - size_t add = tm.end - tm.begin; - batch_size += add; - } - - // merge all instanses in each tensor data - size_t tensor_count = _tasks[0].task->get(is_in)->count(); - for (size_t fi = 0; fi < tensor_count; ++fi) { - const Tensor& head = (*(_tasks[0].task->get(is_in)))[fi]; - Tensor batch_tensor; - batch_tensor.name = head.name; - batch_tensor.type = head.type; - batch_tensor.shape.push_back(batch_size); - - size_t ins_ele_count = 1; - for (size_t si = 1; si < head.shape.size(); ++si) { - batch_tensor.shape.push_back(head.shape[si]); - ins_ele_count *= head.shape[si]; - } - - size_t tensor_ele_count = ins_ele_count * batch_size; - size_t ins_byte = ins_ele_count * head.ele_byte(); - - size_t tensor_byte = tensor_ele_count * head.ele_byte(); - void* data_buf = MempoolWrapper::instance().malloc(tensor_byte); - if (!data_buf) { - LOG(ERROR) << "Malloc failed, size: " << tensor_byte; - return; - } - - size_t data_byte = 0; - for (size_t ti = 0; ti < _tasks.size(); ++ti) { - TaskMetaT& tm = _tasks[ti]; - size_t acc_byte = ins_byte * (tm.end - 
tm.begin); - if (data_byte + acc_byte > tensor_byte) { - LOG(ERROR) << "Invalid bytes: " << data_byte << " + " << acc_byte - << " >= " << tensor_byte; - return; - } - - const Tensor& tensor = (*(tm.task->get(is_in)))[fi]; - memcpy( - reinterpret_cast(data_buf) + data_byte, - reinterpret_cast(tensor.data.data()) + tm.begin * ins_byte, - acc_byte); - data_byte += acc_byte; - } - - if (data_byte != tensor_byte) { - LOG(ERROR) << "Invalid tensor byte: " << data_byte - << " != " << tensor_byte; - return; - } - - batch_tensor.data = - DataBuf(reinterpret_cast(data_buf), tensor_byte); - if (is_in) { - _batch_in.push_back(batch_tensor); - } else { - _batch_out.push_back(batch_tensor); - } - } - - LOG(INFO) << "merge input(" << is_in << ") samples: " << batch_size - << " from " << _tasks.size() << " pvs"; - } - - void notify_tasks() { - if (_batch_out.size() != _batch_in.size()) { - LOG(ERROR) << "batch size not consistency: " << _batch_out.size() - << " != " << _batch_in.size(); - return; - } - - size_t tensor_count = _batch_out.count(); - size_t batch_size = _batch_out.size(); - for (size_t fi = 0; fi < tensor_count; ++fi) { - const Tensor& tensor = _batch_out[fi]; - size_t ins_byte = tensor.ele_byte(); - for (size_t si = 1; si < tensor.shape.size(); ++si) { - ins_byte *= tensor.shape[si]; - } - - for (size_t ti = 0, bi = 0, add = 0; ti < _tasks.size(); - ++ti, bi += add) { - OutArrayT* dst = _tasks[ti].task->out; - add = _tasks[ti].end - _tasks[ti].begin; - size_t offset_src = ins_byte * bi; - size_t add_byte = add * ins_byte; - - if (_batch_align) { // merge all batchs - size_t offset_dst = ins_byte * _tasks[ti].begin; - void* ptr = const_cast((*dst)[fi].data.data()); - memcpy( - reinterpret_cast(ptr) + offset_dst, - reinterpret_cast(_batch_out[fi].data.data()) + offset_src, - add_byte); - } else { // overwrite - if (dst->count() <= 0) { - dst->push_back(_batch_out[fi]); - } else { - (*dst)[fi] = _batch_out[fi]; - } - - (*dst)[fi].shape[0] = add; - (*dst)[fi].data = DataBuf( - reinterpret_cast(_batch_out[fi].data.data()) + offset_src, - add_byte); - } - } - } - - for (size_t ti = 0; ti < _tasks.size(); ++ti) { - TaskT* task = _tasks[ti].task; - size_t begin = _tasks[ti].begin; - size_t end = _tasks[ti].end; - size_t add = end - begin; - - size_t index = task->index.fetch_add(add); - if ((index + add) >= task->in->size()) { - char c = 0; - while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) { - } - butil::return_object(task); - } - } - } - - const typename TaskT::InArrayT& in() const { return _batch_in; } - - typename TaskT::OutArrayT& out() { return _batch_out; } - - size_t task_size() { return _tasks.size(); } - - private: - std::vector _tasks; - InArrayT _batch_in; - OutArrayT _batch_out; - size_t _batch_size; - size_t _rem_size; - bool _batch_align; -}; - -} // namespace bsf -} // namespace im diff --git a/core/predictor/framework/bsf-inl.h b/core/predictor/framework/bsf-inl.h index 1193ce4860e595598b738adab738c7af9664cc26..22128d53e20c926ba982c13c9a8e8dcc34216907 100644 --- a/core/predictor/framework/bsf-inl.h +++ b/core/predictor/framework/bsf-inl.h @@ -24,6 +24,7 @@ #include #include "core/predictor/common/inner_common.h" +#include "core/predictor/framework/memory.h" namespace im { namespace bsf { @@ -125,18 +126,21 @@ void TaskExecutor::stop() { } template -TaskHandler TaskExecutor::schedule(const InArrayT& in, - OutArrayT& out) { // NOLINT +TaskHandler TaskExecutor::schedule( + const void* inVectorT_ptr, + void* outVectorT_ptr) { // NOLINT TaskT* task = butil::get_object(); if 
(!task) { LOG(ERROR) << "Failed get TaskT from object pool"; return TaskHandler::valid_handle(); } + /* if (!BatchTasks::check_valid(in, out, _batch_align)) { LOG(ERROR) << "Invalid input & output"; return TaskHandler::valid_handle(); } + */ int fds[2]; int rc = pipe(fds); @@ -150,10 +154,9 @@ TaskHandler TaskExecutor::schedule(const InArrayT& in, task->write_fd = fds[1]; task->owner_tid = ::syscall(SYS_gettid); - task->in = &in; - task->out = &out; - task->rem = in.size(); - task->size = in.size(); + task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr; + task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr; + task->rem = task->batch_size(); task->index.store(0, butil::memory_order_relaxed); AutoMutex lock(_mut); @@ -163,8 +166,13 @@ TaskHandler TaskExecutor::schedule(const InArrayT& in, return TaskHandler(*task); } +// This function is accessed by multiple threads, +// so it takes AutoMutex first. +// That makes batch.append_task() thread safe: +// no extra lock is needed inside append_task(). template -bool TaskExecutor::fetch_batch(BatchTasks& batch) { // NOLINT +bool TaskExecutor::move_task_to_batch( + BatchTasks& batch) { // NOLINT AutoMutex lock(_mut); while (_task_queue.empty()) { THREAD_COND_WAIT(&_cond, &_mut); @@ -187,8 +195,30 @@ bool TaskExecutor::fetch_batch(BatchTasks& batch) { // NOLINT return true; } +// This function is accessed by multiple threads. +// move_task_to_batch() takes the lock inside the function. +// It packages 1 TaskT as one or several TaskMeta entries. +// TaskT comes from the singleton TaskExecutor's _task_queue. +// Although TaskMeta is a local variable, several TaskMeta may point to +// the same TaskT taken from the singleton TaskExecutor's _task_queue. +// The TaskMeta entries are put into the local variable BatchTasks batch. + +// batch.merge_tasks() and batch.notify_tasks() take no lock. +// BatchTasks batch itself is a local variable, so it is thread safe. +// If batch.merge_tasks() or batch.notify_tasks() does something to TaskMeta, +// you need to pay attention to that. +// Each thread deals with a different TaskMeta (because it is created as a local +// variable), +// but different TaskMeta may point to the same TaskT +// taken from the singleton TaskExecutor's _task_queue.
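// Editor's sketch (assumed, simplified types; not part of this patch): the
// locking discipline described above. The shared queue is only touched while
// holding the mutex; the BatchTasks analogue is purely local to the worker
// thread, so merging and notifying need no extra lock.
#include <algorithm>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

namespace bsf_sketch {

struct DemoTask {          // stands in for TaskT: rem = instances not yet batched
  size_t rem;
};

struct DemoBatch {         // stands in for BatchTasks
  std::vector<std::pair<DemoTask*, size_t>> metas;  // (task, count), like TaskMeta
  size_t rem_size;
  explicit DemoBatch(size_t cap) : rem_size(cap) {}
  void append_task(DemoTask* t) {         // no internal lock: caller holds the mutex
    size_t add = std::min(t->rem, rem_size);
    metas.emplace_back(t, add);
    t->rem -= add;
    rem_size -= add;
  }
};

std::mutex mut;                           // plays the role of _mut
std::queue<DemoTask*> task_queue;         // plays the role of _task_queue

void move_tasks(DemoBatch& batch) {       // analogue of move_task_to_batch()
  std::lock_guard<std::mutex> lock(mut);  // lock once here, not inside append_task
  while (!task_queue.empty() && batch.rem_size > 0) {
    DemoTask* t = task_queue.front();
    batch.append_task(t);
    if (t->rem == 0) task_queue.pop();    // fully consumed tasks leave the queue
  }
}

}  // namespace bsf_sketch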
+ template int TaskExecutor::work(ThreadContext* context) { + if (MempoolWrapper::instance().thread_initialize() != 0) { + LOG(ERROR) << "Failed thread initialize mempool"; + return -1; + } + if (_thread_init_fn != NULL) { if (_thread_init_fn(context->user_thread_context) != 0) { LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit"; @@ -207,10 +237,15 @@ int TaskExecutor::work(ThreadContext* context) { } } + if (MempoolWrapper::instance().thread_clear() != 0) { + LOG(ERROR) << "Failed thread clear mempool"; + return -1; + } + BatchTasks batch(_batch_size, _batch_align); - if (fetch_batch(batch)) { + if (move_task_to_batch(batch)) { batch.merge_tasks(); - _fn(batch.in(), batch.out()); + _fn(&batch.in(), &batch.out()); batch.notify_tasks(); } } @@ -219,8 +254,8 @@ int TaskExecutor::work(ThreadContext* context) { } template -bool TaskManager::schedule(const InArrayT& in, - OutArrayT& out) { // NOLINT +bool TaskManager::schedule(const void* in, + void* out) { // NOLINT TaskHandler handler = _executor.schedule(in, out); if (handler.valid()) { diff --git a/core/predictor/framework/bsf.h b/core/predictor/framework/bsf.h old mode 100644 new mode 100755 index 36a00c381130c191de713e5024c7247d64cb96e7..ee5ad61533fd7ca382aa466cb3f95b2a4ed6e228 --- a/core/predictor/framework/bsf.h +++ b/core/predictor/framework/bsf.h @@ -29,46 +29,184 @@ #include "boost/function.hpp" +#include "core/predictor/framework/memory.h" +#include "paddle_inference_api.h" + namespace im { namespace bsf { static const size_t DEFAULT_BATCH_SIZE = 100; +// InItemT is paddle::PaddleTensor +// InVectorT std::vector +// InVectorT means different feedvar, but not batch. +// Batch is already inside the paddle::PaddleTensor. + +// size_t `rem` records how many batch have not been put in BatchTasks. +// `rem` don`t need to be atomic, cause the operation `put` is synchronous. +// actually, the reason is that lock have been added outside the operation +// `put`. + +// size_t `index` records how many batch have been processing completed. +// `index` need to be atomic, cause the operation 'notify' is asynchronous. template struct Task { - typedef std::vector InArrayT; - typedef std::vector OutArrayT; + typedef std::vector InVectorT; + typedef std::vector OutVectorT; typedef InItemT InType; typedef OutItemT OutType; typedef Task TaskT; + typedef std::vector ShapeVector; + typedef std::vector VectorOfShapeVector; int read_fd; int write_fd; - pid_t owner_tid; - - const InArrayT* in; - OutArrayT* out; - + const InVectorT* inVectorT_ptr; + OutVectorT* outVectorT_ptr; size_t rem; - size_t size; - - size_t batch_size() { return in->size(); } - butil::atomic index; Task() { read_fd = -1; write_fd = -1; owner_tid = -1; - in = NULL; - out = NULL; + inVectorT_ptr = NULL; + outVectorT_ptr = NULL; rem = -1; - size = -1; index.store(0, butil::memory_order_relaxed); } + + bool check_feedvar_valid(int feedvar_index) { + if (feedvar_index < 0 || inVectorT_ptr->size() <= feedvar_index) { + LOG(ERROR) << "feedvar doesnt exsit or feedvar_index error"; + return 0; + } + + if ((*inVectorT_ptr)[feedvar_index].shape.size() <= 0) { + LOG(ERROR) << "feedvar[" << feedvar_index << "].shape.size()<=0,error"; + return 0; + } + + return 1; + } + + // Now, it simply assume that the first dimension of data is batch. + // so the batch is PaddleTensor.shape[0] + + // If batch information is added into feedvar.prototxt. + // we can get the information from the feedvar.prototxt instead of assume. 
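// Editor's worked example (assumed values; not part of this patch): under the
// "shape[0] is the batch" assumption, a FLOAT32 feed var with shape
// {4, 3, 224, 224} gives
//   feedvar_batch_size(i)       = shape[0]      = 4
//   feedvar_element_num(i)      = 3 * 224 * 224 = 150528 elements per instance
//   feedvar_element_bytesize(i) = sizeof(float) = 4
//   feedvar_bytesize(i)         = 150528 * 4    = 602112 bytes per instance
// and feedvar_shape_nobatch(i) returns {3, 224, 224}.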
+ size_t feedvar_batch_size(int feedvar_index) { + if (!check_feedvar_valid(feedvar_index)) { + return 0; + } + + return (*inVectorT_ptr)[feedvar_index].shape[0]; + } + + size_t feedvar_element_bytesize(int feedvar_index) { + if (!check_feedvar_valid(feedvar_index)) { + return 0; + } + int dtype = (*inVectorT_ptr)[feedvar_index].dtype; + if (dtype == paddle::PaddleDType::INT64) { + return sizeof(int64_t); + } + if (dtype == paddle::PaddleDType::FLOAT32) { + return sizeof(float); + } + if (dtype == paddle::PaddleDType::INT32) { + return sizeof(int32_t); + } + if (dtype == paddle::PaddleDType::UINT8) { + return sizeof(char); + } + return 0; + } + + // Now, the implementation of this function is based on assumption + // that shape [0] = batch_size. + size_t feedvar_element_num(int feedvar_index) { + if (!check_feedvar_valid(feedvar_index)) { + return 0; + } + int element_num = 1; + if ((*inVectorT_ptr)[feedvar_index].shape.size() == 1) { + // cause shape[0] is batch_size. + return 1; + } + // start from shape[1], cause shape[0] = batch_size. + for (int i = 1; i < (*inVectorT_ptr)[feedvar_index].shape.size(); ++i) { + element_num *= (*inVectorT_ptr)[feedvar_index].shape[i]; + } + return element_num; + } + + size_t feedvar_bytesize(int feedvar_index) { + return feedvar_element_num(feedvar_index) * + feedvar_element_bytesize(feedvar_index); + } + + ShapeVector feedvar_shape_nobatch(int feedvar_index) { + if (!check_feedvar_valid(feedvar_index)) { + return ShapeVector(); + } + return ShapeVector{(*inVectorT_ptr)[feedvar_index].shape.begin() + 1, + (*inVectorT_ptr)[feedvar_index].shape.end()}; + } + + VectorOfShapeVector feedvar_shape_nobatch() { + VectorOfShapeVector vector_of_feedvar_shape_nobatch(inVectorT_ptr->size()); + for (int index = 0; index < inVectorT_ptr->size(); ++index) { + vector_of_feedvar_shape_nobatch.push_back(feedvar_shape_nobatch(index)); + } + return vector_of_feedvar_shape_nobatch; + } + + // At present, it is considered that the batch of all feedvar is consistent. + // so for each feedvar, PaddleTensor.shape[0] should be the same. + bool check_batch_align() { + int batch_size_align = feedvar_batch_size(0); + for (int feedvar_index = 0; feedvar_index < inVectorT_ptr->size(); + ++feedvar_index) { + if (feedvar_batch_size(feedvar_index) != batch_size_align) { + return 0; + } + } + /* + for(int fetchvar_index = 0; fetchvar_index < outVectorT_ptr->size(); + ++fetchvar_index) { + if(fetchvar_batch_size(fetchvar_index) != batch_size_align) { + return 0; + } + } + */ + return 1; + } + + size_t batch_size() { + if (check_batch_align()) { + return feedvar_batch_size(0); + } + return 0; + } }; +// `Several Task` or `part of batch in Task` can be a TaskMeta. +// Task is the original Request from User. +// For example, the batch of Task is 30. There are 4 Requests. +// The batch of BatchTasks is 100, which means we can deal 100 batch 1 time. +// TaskMeta-1:{task-1,0,30} TaskMeta-2:{task-2,0,30} TaskMeta-3:{task-3,0,30} +// but the last Task will be divided to 2 TaskMeta. +// TaskMeta-4:{task-4,0,10} TaskMeta-5:{task-4,10,30}. +// TaskMeta-1 ~ TaskMeta-4 will be inside BatchTasks-1. +// TaskMeta-5 will be inside BatchTasks-2. + +// TaskMeta is necessary. +// cause we need know the the corresponding relationship between +// `batch_out`(which is in BatchTasks) and `outVectorT_ptr`(which is in Task). +// especially when 1 Task be divided into several TaskMeta and be put into +// several different BatchTasks. 
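// Editor's sketch (numbers taken from the example above; not part of this
// patch): how a task whose batch does not fit into the remaining capacity is
// split across two BatchTasks.
#include <cstdio>
#include <vector>

inline void illustrate_taskmeta_split() {
  const int capacity = 100;                               // batch capacity of one BatchTasks
  const std::vector<int> task_batch = {30, 30, 30, 30};   // 4 requests, batch 30 each
  int rem_size = capacity;                                // like BatchTasks::_rem_size
  int batch_id = 1;
  for (size_t ti = 0; ti < task_batch.size(); ++ti) {
    int rem = task_batch[ti];                             // like TaskT::rem
    while (rem > 0) {
      if (rem_size == 0) {                                // current BatchTasks is full
        rem_size = capacity;
        ++batch_id;
      }
      const int add = rem < rem_size ? rem : rem_size;
      const int begin = task_batch[ti] - rem;             // like append_task's start_index
      std::printf("BatchTasks-%d gets TaskMeta{task-%zu, %d, %d}\n",
                  batch_id, ti + 1, begin, begin + add);
      rem -= add;
      rem_size -= add;
    }
  }
  // Prints task-1..3 as {0, 30} in BatchTasks-1, then task-4 as {0, 10} in
  // BatchTasks-1 and {10, 30} in BatchTasks-2, matching the comment above.
}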
template struct TaskMeta { TaskMeta(TaskT* ptr, size_t start, size_t add) @@ -79,6 +217,10 @@ struct TaskMeta { size_t end; }; +// each TaskT is already include batch in itself +// BatchTasks need to combine several `small TaskMeta` into a new `big TaskT`. +// The only difference between the `big TaskT` and `small TaskT` is that +// the TaskT.inVectorT_ptr->[feedvar_index].shape[0] is different. template class BatchTasks { public: @@ -91,33 +233,38 @@ class BatchTasks { _rem_size(batch_size), _batch_align(batch_align) { _batch_in.clear(); + _batch_in_offset.clear(); _batch_out.clear(); - _tasks.clear(); + _batch_out_offset.clear(); + _taskmeta_vector.clear(); } ~BatchTasks() { _batch_in.clear(); + _batch_in_offset.clear(); _batch_out.clear(); - _tasks.clear(); + _batch_out_offset.clear(); + _taskmeta_vector.clear(); } // synchronized operation + // because Upper level callers of this function have already locked. size_t append_task(TaskT* task) { size_t add = std::min(task->rem, _rem_size); if (!_batch_align) { add = task->rem; } - - TaskMetaT tm(task, task->in->size() - task->rem, add); - _tasks.push_back(tm); + int start_index = task->batch_size() - task->rem; + TaskMetaT tm(task, start_index, add); + _taskmeta_vector.push_back(tm); task->rem -= add; _rem_size -= add; return _rem_size; } - static bool check_valid(const typename TaskT::InArrayT& in, - const typename TaskT::OutArrayT& out, + static bool check_valid(const typename TaskT::InVectorT& in, + const typename TaskT::OutVectorT& out, bool align) { (void)in; (void)out; @@ -125,40 +272,221 @@ class BatchTasks { return true; } + // this should be modified totally. + // maybe we don`t need to do this inside the BatchTasks. + // we can do the copy work outside the BatchTasks. + // cause maybe next time we don`t need to do the extra copy. + // directly copy the every Task into the Predictor. + + // lod is not supported. + // if lod is set, we should not allow to use the bsf task. + + // batch.merge_tasks() is thread-safe function + // cause batch is a local variable and Task is just read, not written. void merge_tasks() { - for (size_t ti = 0; ti < _tasks.size(); ++ti) { - TaskMetaT& tm = _tasks[ti]; - for (size_t vi = tm.begin; vi < tm.end; ++vi) { - _batch_in.push_back((*tm.task->in)[vi]); - _batch_out.push_back((*tm.task->out)[vi]); + if (_taskmeta_vector.size() <= 0) { + return; + } + + // Temporarily, the batch of each feedvar is consistent + // If not consistent, use feedvar_batch_size instead of task->batch_size(). + int temp_batch = 0; + for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) { + TaskMetaT& tm = _taskmeta_vector[ti]; + temp_batch += tm.task->batch_size(); + } + if (temp_batch > _batch_size) { + LOG(ERROR) << "_realNumber_batch_in >_batch_size, error."; + return; + } + + int feedvar_num = _taskmeta_vector[0].task->inVectorT_ptr->size(); + if (_batch_in_offset.size() == 0) { + _batch_in_offset.resize(feedvar_num, 0); + _realNumber_batch_in.resize(feedvar_num, temp_batch); + } + + for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) { + TaskMetaT& tm = _taskmeta_vector[ti]; + + for (int index = 0; index < feedvar_num; ++index) { + const paddle::PaddleTensor& feedVarTensor = + (*tm.task->inVectorT_ptr)[index]; + int feedvar_bytesize = tm.task->feedvar_bytesize(index); + + if (ti == 0) { + if (feedVarTensor.lod.size() > 0 && feedVarTensor.lod[0].size() > 0) { + LOG(ERROR) << "lod Tensor is not supported now."; + return; + } + // for now, we assume that every task feedvar_bytesize is the same. 
+ // which means we dont support auto embedding. + // but for different feedvar, it is different. + paddle::PaddleTensor paddleTensor; + paddleTensor.dtype = feedVarTensor.dtype; + paddleTensor.name = feedVarTensor.name; + paddleTensor.lod = feedVarTensor.lod; + paddleTensor.shape = feedVarTensor.shape; + paddleTensor.shape[0] = _realNumber_batch_in[index]; + paddleTensor.data.Resize(feedvar_bytesize * + _realNumber_batch_in[index]); + _batch_in.push_back(paddleTensor); + } + + void* dst_ptr = _batch_in[index].data.data() + + feedvar_bytesize * _batch_in_offset[index]; + void* source_ptr = + feedVarTensor.data.data() + feedvar_bytesize * tm.begin; + int length = feedvar_bytesize * (tm.end - tm.begin); + memcpy(dst_ptr, source_ptr, length); + _batch_in_offset[index] += length; } } } + bool check_fetchvar_valid(int fetchvar_index) { + if (fetchvar_index < 0 || _batch_out.size() <= fetchvar_index) { + LOG(ERROR) << "fetchvar doesnt exsit or fetchvar_index error"; + return 0; + } + + if (_batch_out[fetchvar_index].shape.size() <= 0) { + LOG(ERROR) << "fetchvar[" << fetchvar_index << "].shape.size()<=0,error"; + return 0; + } + + return 1; + } + + size_t fetchvar_batch_size(int fetchvar_index) { + if (!check_fetchvar_valid(fetchvar_index)) { + return 0; + } + + return _batch_out[fetchvar_index].shape[0]; + } + + size_t fetchvar_element_bytesize(int fetchvar_index) { + if (!check_fetchvar_valid(fetchvar_index)) { + return 0; + } + int dtype = _batch_out[fetchvar_index].dtype; + if (dtype == paddle::PaddleDType::INT64) { + return sizeof(int64_t); + } + if (dtype == paddle::PaddleDType::FLOAT32) { + return sizeof(float); + } + if (dtype == paddle::PaddleDType::INT32) { + return sizeof(int32_t); + } + if (dtype == paddle::PaddleDType::UINT8) { + return sizeof(char); + } + return 0; + } + + // Now, the implementation of this function is based on assumption + // that shape [0] = batch_size. + size_t fetchvar_element_num(int fetchvar_index) { + if (!check_fetchvar_valid(fetchvar_index)) { + return 0; + } + int element_num = 1; + if (_batch_out[fetchvar_index].shape.size() == 1) { + // cause shape[0] is batch_size. + return 1; + } + // start from shape[1], cause shape[0] = batch_size. 
+ for (int i = 1; i < _batch_out[fetchvar_index].shape.size(); ++i) { + element_num *= _batch_out[fetchvar_index].shape[i]; + } + return element_num; + } + + size_t fetchvar_bytesize(int fetchvar_index) { + return fetchvar_element_num(fetchvar_index) * + fetchvar_element_bytesize(fetchvar_index); + } + + bool check_fetchvar_batch_align() { + int batch_size_align = fetchvar_batch_size(0); + + for (int fetchvar_index = 0; fetchvar_index < _batch_out.size(); + ++fetchvar_index) { + if (fetchvar_batch_size(fetchvar_index) != batch_size_align) { + return 0; + } + } + + return 1; + } + + size_t fetchvar_batch_size() { + if (check_fetchvar_batch_align()) { + return fetchvar_batch_size(0); + } + return 0; + } + void notify_tasks() { - if (_batch_out.size() != _batch_in.size()) { - LOG(ERROR) << "batch size not consistency: " << _batch_out.size() - << " != " << _batch_in.size(); + if (_taskmeta_vector.size() <= 0) { + LOG(ERROR) << "_taskmeta_vector.size() <=0, error."; + return; + } + if (_realNumber_batch_in[0] != fetchvar_batch_size()) { + LOG(ERROR) << "_batch_out`s batch != _batch_in`s batch, error."; return; } - for (size_t ti = 0, bi = 0; ti < _tasks.size(); ++ti) { - TaskT* task = _tasks[ti].task; - size_t begin = _tasks[ti].begin; - size_t end = _tasks[ti].end; + int fetchvar_num = _batch_out.size(); + if (_batch_out_offset.size() == 0) { + _batch_out_offset.resize(fetchvar_num, 0); + } + + for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) { + TaskT* task = _taskmeta_vector[ti].task; + size_t begin = _taskmeta_vector[ti].begin; + size_t end = _taskmeta_vector[ti].end; size_t add = end - begin; - for (size_t oi = begin; oi < end; ++oi, ++bi) { - if (bi >= _batch_in.size()) { - LOG(ERROR) << "batch index overflow: " << bi << " > " - << _batch_in.size(); + for (int index = 0; index < fetchvar_num; ++index) { + // the task->outVectorT_ptr is null before core->run(). + // first time we should copy from _batch_out + // so we need init. 
+ int fetchvar_bytesize_index = fetchvar_bytesize(index); + if (task->outVectorT_ptr->size() <= index) { + paddle::PaddleTensor tensor_out; + tensor_out.name = _batch_out[index].name; + tensor_out.dtype = paddle::PaddleDType(_batch_out[index].dtype); + tensor_out.shape = _batch_out[index].shape; + tensor_out.shape[0] = task->batch_size(); + tensor_out.lod = _batch_out[index].lod; + // resize all batch memory at one time + size_t databuf_size = task->batch_size() * fetchvar_bytesize_index; + tensor_out.data.Resize(databuf_size); + task->outVectorT_ptr->push_back(tensor_out); + } + + paddle::PaddleTensor& fetchVarTensor = (*task->outVectorT_ptr)[index]; + + void* dst_ptr = + fetchVarTensor.data.data() + fetchvar_bytesize_index * begin; + int length = fetchvar_bytesize_index * add; + if (_batch_out_offset[index] + length > + fetchvar_batch_size() * fetchvar_bytesize(index)) { + LOG(ERROR) << "_batch_out is less than taskmeta, error."; return; } - (*task->out)[oi] = _batch_out[bi]; + void* source_ptr = + _batch_out[index].data.data() + _batch_out_offset[index]; + + memcpy(dst_ptr, source_ptr, length); + _batch_out_offset[index] += length; } size_t index = task->index.fetch_add(add); - if ((index + add) >= task->in->size()) { + if ((index + add) >= task->batch_size()) { char c = 0; while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) { } @@ -167,16 +495,20 @@ class BatchTasks { } } - const typename TaskT::InArrayT& in() const { return _batch_in; } + const typename TaskT::InVectorT& in() const { return _batch_in; } - typename TaskT::OutArrayT& out() { return _batch_out; } + typename TaskT::OutVectorT& out() { return _batch_out; } - size_t task_size() { return _tasks.size(); } + size_t task_size() { return _taskmeta_vector.size(); } private: - std::vector _tasks; - typename TaskT::InArrayT _batch_in; - typename TaskT::OutArrayT _batch_out; + std::vector _taskmeta_vector; + typename TaskT::InVectorT _batch_in; + std::vector _batch_in_offset; + std::vector _realNumber_batch_in; + typename TaskT::OutVectorT _batch_out; + std::vector _batch_out_offset; + std::vector _realNumber_batch_out; size_t _rem_size; size_t _batch_size; bool _batch_align; @@ -236,9 +568,10 @@ class TaskExecutor { public: typedef typename TaskT::InType InType; typedef typename TaskT::OutType OutType; - typedef typename TaskT::InArrayT InArrayT; - typedef typename TaskT::OutArrayT OutArrayT; + typedef typename TaskT::InVectorT InVectorT; + typedef typename TaskT::OutVectorT OutVectorT; typedef std::vector TaskArrayT; + typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper; TaskExecutor() : _stop(false), @@ -277,8 +610,7 @@ class TaskExecutor { _thread_reset_fn = reset_fn; } - void set_thread_callback_fn( - boost::function cb) { + void set_thread_callback_fn(boost::function cb) { _fn = cb; } @@ -293,9 +625,9 @@ class TaskExecutor { int work(ThreadContext* context); - TaskHandler schedule(const InArrayT&, OutArrayT&); + TaskHandler schedule(const void*, void*); - bool fetch_batch(BatchTasks& batch); // NOLINT + bool move_task_to_batch(BatchTasks& batch); // NOLINT bool _stop; @@ -315,15 +647,15 @@ class TaskExecutor { size_t _batch_size; bool _batch_align; - boost::function _fn; + boost::function _fn; }; template class TaskManager { public: typedef Task TaskT; - typedef typename TaskT::InArrayT InArrayT; - typedef typename TaskT::OutArrayT OutArrayT; + typedef typename TaskT::InVectorT InVectorT; + typedef typename TaskT::OutVectorT OutVectorT; explicit TaskManager(TaskExecutor& exe, size_t batch_size) // 
NOLINT : _executor(exe) {} @@ -332,7 +664,7 @@ class TaskManager { ~TaskManager() { wait(); } - bool schedule(const InArrayT& in, OutArrayT& out); // NOLINT + bool schedule(const void* in, void* out); // NOLINT void wait(); inline void clear() { wait(); } @@ -357,5 +689,5 @@ class AutoMutex { } // namespace bsf } // namespace im -#include "core/predictor/framework/bsf-inl-tensor.h" +// #include "core/predictor/framework/bsf-inl-tensor.h" #include "core/predictor/framework/bsf-inl.h" diff --git a/core/predictor/framework/infer.cpp b/core/predictor/framework/infer.cpp old mode 100644 new mode 100755 index e11861426fe3c1c1cea39811d66bb4feffdd8b9e..9b84ea35ee8ab6dd0888c40aa557a9ebcd2ab623 --- a/core/predictor/framework/infer.cpp +++ b/core/predictor/framework/infer.cpp @@ -82,9 +82,9 @@ int ReloadableInferEngine::infer(const void* in, return infer_impl(in, out, batch_size); } - im::bsf::TaskManager task_manager; - task_manager.schedule(*(reinterpret_cast(in)), - *(reinterpret_cast(out))); + im::bsf::TaskManager task_manager; + + task_manager.schedule(in, out); task_manager.wait(); return 0; } @@ -362,8 +362,8 @@ int VersionedInferEngine::infer_impl(const void* in, uint32_t batch_size) { return -1; } -int VersionedInferEngine::task_infer_impl(const BatchTensor& in, - BatchTensor& out) { // NOLINT +int VersionedInferEngine::task_infer_impl(const void* in, + void* out) { // NOLINT return -1; } diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h index 6113dc8eff60814af62ad145a334db666629f080..e7785f40462af7a18775319737ecc808881cba79 100755 --- a/core/predictor/framework/infer.h +++ b/core/predictor/framework/infer.h @@ -21,10 +21,12 @@ #include #include #include +#include #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/bsf.h" #include "core/predictor/framework/factory.h" #include "core/predictor/framework/infer_data.h" +#include "core/predictor/framework/memory.h" #include "paddle_inference_api.h" // NOLINT namespace baidu { namespace paddle_serving { @@ -86,8 +88,7 @@ class InferEngine { virtual int infer_impl(const void* in, void* out, uint32_t batch_size = -1) = 0; - virtual int task_infer_impl(const BatchTensor& in, - BatchTensor& out) = 0; // NOLINT + virtual int task_infer_impl(const void* in, void* out) = 0; // NOLINT // end: framework inner call }; @@ -104,7 +105,7 @@ class ReloadableInferEngine : public InferEngine { }; virtual int load(const configure::EngineDesc& conf) = 0; - typedef im::bsf::Task TaskT; + typedef im::bsf::Task TaskT; int proc_initialize_impl(const configure::EngineDesc& conf, bool version); @@ -179,6 +180,8 @@ struct ModelData { delete cores[1]; } + void* get() { return cores[current_idx]->get(); } + EngineCore* cores[2]; uint32_t current_idx; }; @@ -191,14 +194,20 @@ class DBReloadableInferEngine : public ReloadableInferEngine { int proc_initialize(const configure::EngineDesc& conf, bool version) { THREAD_KEY_CREATE(&_skey, NULL); THREAD_MUTEX_INIT(&_mutex, NULL); + gpu_index = 0; return ReloadableInferEngine::proc_initialize(conf, version); } + // 进程初始化会调用load,但由于未执行线程初始化,所以_reload_vec为空,不再继续执行。 + // 热加载的话会调用load,由于线程已经初始化,_reload_vec不为空,所以继续执行load_data操作加载数据。 + // 线程初始化会执行load_data操作加载数据,然后将engine加入_reload_vec中。 + // 每个模型只有一个CloneDBReloadableInferEngine对象。 + // 但一个CloneDBReloadableInferEngine对象,可以包含N个EngineCore。 virtual int load(const configure::EngineDesc& conf) { if (_reload_vec.empty()) { return 0; } - + gpu_index = 0; for (uint32_t ti = 0; ti < _reload_vec.size(); ++ti) { if 
(load_data(_reload_vec[ti], conf) != 0) { LOG(ERROR) << "Failed reload engine model: " << ti; @@ -210,7 +219,8 @@ class DBReloadableInferEngine : public ReloadableInferEngine { return 0; } - int load_data(ModelData* md, const configure::EngineDesc& conf) { + virtual int load_data(ModelData* md, + const configure::EngineDesc& conf) { uint32_t next_idx = (md->current_idx + 1) % 2; if (md->cores[next_idx]) { delete md->cores[next_idx]; @@ -219,28 +229,29 @@ class DBReloadableInferEngine : public ReloadableInferEngine { md->cores[next_idx] = new (std::nothrow) EngineCore; // params.dump(); - if (!md->cores[next_idx] || md->cores[next_idx]->create(conf) != 0) { + size_t gpu_ids_num = conf.gpu_ids_size(); + im::bsf::AutoMutex lock(_mutex); + int gpu_id = -1; + if (gpu_ids_num > 0) { + gpu_id = conf.gpu_ids(gpu_index % gpu_ids_num); + } + if (!md->cores[next_idx] || + md->cores[next_idx]->create(conf, gpu_id) != 0) { LOG(ERROR) << "Failed create model, path: " << conf.model_dir(); return -1; } + gpu_index++; md->current_idx = next_idx; return 0; } virtual int thrd_initialize_impl() { - // memory pool to be inited in non-serving-threads - if (MempoolWrapper::instance().thread_initialize() != 0) { - LOG(ERROR) << "Failed thread initialize mempool"; - return -1; - } - ModelData* md = new (std::nothrow) ModelData; if (!md || load_data(md, _conf) != 0) { LOG(ERROR) << "Failed create thread data from " << _conf.model_dir(); return -1; } - LOG(ERROR) << "THREAD_SETSPECIFIC _skey = md"; THREAD_SETSPECIFIC(_skey, md); im::bsf::AutoMutex lock(_mutex); _reload_vec.push_back(md); @@ -248,11 +259,23 @@ class DBReloadableInferEngine : public ReloadableInferEngine { return 0; } int thrd_clear_impl() { - // for non-serving-threads - if (MempoolWrapper::instance().thread_clear() != 0) { - LOG(ERROR) << "Failed thread clear mempool"; - return -1; - } + // for bsf Task threads + // Actually, there are 2 kinds of threads here: + // 1. brpc threads 2. bsf Task threads + // Each request is handled in a single brpc thread. + // IF (bsf Task threads are not used) + // every single brpc thread corresponds to all the EngineCores, + // and each request runs all models in that single brpc thread. + + // IF (bsf Task threads are used) + // there is a thread pool called bsf TaskExecutor. + // In TaskExecutor, 1 bsf thread corresponds to 1 EngineCore. + // The brpc thread only puts the data into the task_queue (which lives in + // TaskExecutor); + // EngineCore->infer() runs in a bsf Task thread. + + // MempoolWrapper::instance() is actually a thread-local mempool, + // so it belongs to a single thread.
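// Editor's sketch (simplified stand-in, standalone illustration only; not part
// of this patch or of thrd_clear_impl): "thread-local mempool" means each
// thread owns its own arena, so initialize and clear must happen on the same
// thread that runs EngineCore->infer().
{
  struct LocalArena {                      // stand-in for the per-thread mempool
    std::vector<char> buf;
    size_t cursor = 0;
    void* alloc(size_t n) {
      if (cursor + n > buf.size()) return nullptr;
      void* p = buf.data() + cursor;
      cursor += n;
      return p;
    }
    void clear() { cursor = 0; }           // analogous to thread_clear()
  };
  thread_local LocalArena arena;           // one arena per thread
  if (arena.buf.empty()) arena.buf.resize(1 << 20);
  void* p = arena.alloc(1024);             // buffers for one inference
  (void)p;
  arena.clear();                           // reset after the request, same thread
}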
return 0; } @@ -278,6 +301,7 @@ class DBReloadableInferEngine : public ReloadableInferEngine { THREAD_KEY_T _skey; THREAD_MUTEX_T _mutex; std::vector*> _reload_vec; + int gpu_index = 0; }; // 多个EngineCore共用同一份模型数据 @@ -287,88 +311,69 @@ class CloneDBReloadableInferEngine public: virtual ~CloneDBReloadableInferEngine() {} - virtual int proc_initialize(const configure::EngineDesc& conf, bool version) { - _pd = new (std::nothrow) ModelData; - if (!_pd) { - LOG(ERROR) << "Failed to allocate for ProcData"; - return -1; - } - return DBReloadableInferEngine::proc_initialize(conf, version); - } + // 进程初始化会调用load,但由于未执行线程初始化,所以_reload_vec为空,不再继续执行。 + // 热加载的话会调用load,由于线程已经初始化,_reload_vec不为空,所以继续执行load_data操作加载数据。 + // 线程初始化会执行load_data操作加载数据,然后将engine加入_reload_vec中。 + // 每个模型只有一个CloneDBReloadableInferEngine对象。 + // 但一个CloneDBReloadableInferEngine对象,可以包含N个EngineCore。 - virtual int load(const configure::EngineDesc& conf) { - // 加载进程级模型数据 - if (!_pd || - DBReloadableInferEngine::load_data(_pd, conf) != 0) { - LOG(ERROR) << "Failed to create common model from [" << conf.model_dir() - << "]."; - return -1; + virtual int load_data(ModelData* md, + const configure::EngineDesc& conf) { + uint32_t next_idx = (md->current_idx + 1) % 2; + if (md->cores[next_idx]) { + delete md->cores[next_idx]; } - LOG(WARNING) << "Succ load common model[" << _pd->cores[_pd->current_idx] - << "], path[" << conf.model_dir() << "]."; + md->cores[next_idx] = new (std::nothrow) EngineCore; - if (DBReloadableInferEngine::_reload_vec.empty()) { - return 0; + // params.dump(); + size_t gpu_ids_num = conf.gpu_ids_size(); + im::bsf::AutoMutex lock(DBReloadableInferEngine::_mutex); + int gpu_id = -1; + if (gpu_ids_num > 0) { + gpu_id = conf.gpu_ids(DBReloadableInferEngine::gpu_index % + gpu_ids_num); } - - for (uint32_t ti = 0; - ti < DBReloadableInferEngine::_reload_vec.size(); - ++ti) { - if (load_data(DBReloadableInferEngine::_reload_vec[ti], - _pd->cores[_pd->current_idx]) != 0) { - LOG(ERROR) << "Failed reload engine model: " << ti; + // gpu_index will be set to be 0, when load() or proc_initial() is called. + // gpu_index < gpu_ids_num, means there are still not create on some GPU + // card. + // so we need to create the predictor. + // gpu_index >= gpu_ids_num, means each GPU card has already create one. + // so we need to clone the predictor. + if (DBReloadableInferEngine::gpu_index < gpu_ids_num) { + if (!md->cores[next_idx] || + md->cores[next_idx]->create(conf, gpu_id) != 0) { + LOG(ERROR) << "Failed create model, path: " << conf.model_dir(); return -1; } + DBReloadableInferEngine::gpu_index++; + md->current_idx = next_idx; + if (_cloneTemplate.size() < + DBReloadableInferEngine::gpu_index) { + _cloneTemplate.push_back(md); + } else { + _cloneTemplate[DBReloadableInferEngine::gpu_index - 1] = md; + } + } else { + // when gpu_id = -1, means we use cpu, but the index should be 0. 
+ if (gpu_id == -1) gpu_id = 0; + if (!md->cores[next_idx] || + md->cores[next_idx]->clone(_cloneTemplate[gpu_id]->get()) != 0) { + LOG(ERROR) << "Failed clone model from core"; + return -1; + } + DBReloadableInferEngine::gpu_index++; + md->current_idx = next_idx; + LOG(WARNING) << "core clone model succ, cur_idx[" << md->current_idx + << "]."; } - LOG(WARNING) << "Succ load clone model, path[" << conf.model_dir() << "]"; - return 0; - } - - // 加载线程级对象,多个线程级对象共用pd_core的模型数据 - int load_data(ModelData* td, EngineCore* pd_core) { - uint32_t next_idx = (td->current_idx + 1) % 2; - if (td->cores[next_idx]) { - delete td->cores[next_idx]; - } - - td->cores[next_idx] = new (std::nothrow) EngineCore; - if (!td->cores[next_idx] || - td->cores[next_idx]->clone(pd_core->get()) != 0) { - LOG(ERROR) << "Failed clone model from pd_core[ " << pd_core << "], idx[" - << next_idx << "]"; - return -1; - } - td->current_idx = next_idx; - LOG(WARNING) << "td_core[" << td->cores[td->current_idx] - << "] clone model from pd_core[" << pd_core - << "] succ, cur_idx[" << td->current_idx << "]."; - return 0; - } - - virtual int thrd_initialize_impl() { - // memory pool to be inited in non-serving-threads - if (MempoolWrapper::instance().thread_initialize() != 0) { - LOG(ERROR) << "Failed thread initialize mempool"; - return -1; - } - - ModelData* md = new (std::nothrow) ModelData; - if (!md || load_data(md, _pd->cores[_pd->current_idx]) != 0) { - LOG(ERROR) << "Failed clone thread data, origin_core[" - << _pd->cores[_pd->current_idx] << "]."; - return -1; - } - - THREAD_SETSPECIFIC(DBReloadableInferEngine::_skey, md); - im::bsf::AutoMutex lock(DBReloadableInferEngine::_mutex); - DBReloadableInferEngine::_reload_vec.push_back(md); return 0; } protected: - ModelData* - _pd; // 进程级EngineCore,多个线程级EngineCore共用该对象的模型数据 + // 模板EngineCore,如果已创建,则多个线程级EngineCore共用该对象的模型数据 + std::vector*> + _cloneTemplate; }; template @@ -505,8 +510,8 @@ class FluidInferEngine : public CloneDBReloadableInferEngine { return 0; } - int task_infer_impl(const BatchTensor& in, BatchTensor& out) { // NOLINT - return infer_impl(&in, &out); + int task_infer_impl(const void* in, void* out) { // NOLINT + return infer_impl(in, out); } }; @@ -559,7 +564,7 @@ class VersionedInferEngine : public InferEngine { int infer_impl(const void* in, void* out, uint32_t batch_size = -1); - int task_infer_impl(const BatchTensor& in, BatchTensor& out); + int task_infer_impl(const void* in, void* out); private: boost::unordered_map _versions; diff --git a/core/predictor/framework/server.cpp b/core/predictor/framework/server.cpp old mode 100644 new mode 100755 index 25b4079509b5346277609648a44f8d361187708d..8ced6f1e9936059ada169633e21690d13bc48ae3 --- a/core/predictor/framework/server.cpp +++ b/core/predictor/framework/server.cpp @@ -91,6 +91,7 @@ int ServerManager::start_and_wait() { } } + // rpc multi-thread start from here. if (_server.Start(FLAGS_port, &_options) != 0) { LOG(ERROR) << "Failed to start Paddle Inference Server"; return -1; diff --git a/core/predictor/mempool/mempool.cpp b/core/predictor/mempool/mempool.cpp old mode 100644 new mode 100755 diff --git a/core/predictor/mempool/mempool.h b/core/predictor/mempool/mempool.h old mode 100644 new mode 100755 index a10e8f97a59a148ace77c575cd2c70d2dac79603..bc18d89f3fa28bd2da27118aac70ae36237d72e8 --- a/core/predictor/mempool/mempool.h +++ b/core/predictor/mempool/mempool.h @@ -129,7 +129,7 @@ class FreeList { to get the class Pointer for example T is the member of class Node, T data, 'data' is the name. 
- T* value is the member(pointer type) class Node + T* value is the member(pointer type) of class Node so we can get the Node* by calling container_of(value, Node, data) */ Node* node = container_of(value, Node, data); @@ -261,7 +261,11 @@ struct BlockReference { // because BlockFreeList is a threal-safe Singleton. // so we don`t release Block, it is global memory. -// total number is 32*1024 +// total number is 256*1024. +// the MAX_BLOCK_COUNT of Region(one thread one Region) is 1024. +// so BlockFreeList allow 256 Region(means 256 thread). +// the memory used by BlockFreeListType is sizeof(void*)*256*1024. +// Block(2MB) memory is created only when get() is called. class BlockFreeList { public: static const int MAX_BLOCK_COUNT = 256 * 1024; @@ -374,7 +378,8 @@ class Mempool { void* malloc(size_t size) { size = _align(size); // It does not enter the if statement the first time. - // Because the block has not been used up, it will enter. + // The if statement may enter after the block is created. + // If the block has not been used up, it will enter. if (size <= _free_size) { void* p = _free_cursor; _free_size -= size; @@ -392,7 +397,7 @@ class Mempool { return; } - // memory in Block,update the pointer. + // memory in _block,update the pointer. if (_free_cursor - size == static_cast(p)) { // for example, you need to release -(8+1)bytes // you can only release -8bytes,cause -(8+2)byte is used by other. @@ -424,9 +429,8 @@ class Mempool { } // 可能返回的是单独Region中malloc的内存。 - // 也可能是Block,例如new_size=1M, old_data原本的指针头就在1.2M处,old_size - // = - // 0.5M + // 也可能是Block,例如new_size=1M, old_data原本的指针头就在1.2M处 + // old_size = 0.5M // 此时,_free_size = 0.3M,new_size<2M,但是required = 1-0.5 >0.3 // 分配出来的就是Block,但是该Block没有并很完美的利用完全。 void* p = this->malloc_from_region(new_size); diff --git a/core/predictor/src/pdserving.cpp b/core/predictor/src/pdserving.cpp old mode 100755 new mode 100644 index e88d9b3b2aaa03ccbb7f903485bdffecfa6f7222..f637e8a9cdbeeb75b7ea079e72124f6f9873ffd9 --- a/core/predictor/src/pdserving.cpp +++ b/core/predictor/src/pdserving.cpp @@ -69,12 +69,15 @@ DEFINE_bool(V, false, "print version, bool"); DEFINE_bool(g, false, "user defined gflag path"); DECLARE_string(flagfile); +// +/* namespace bthread { extern pthread_mutex_t g_task_control_mutex; } pthread_mutex_t g_worker_start_fn_mutex = PTHREAD_MUTEX_INITIALIZER; - +*/ void pthread_worker_start_fn() { + /* while (pthread_mutex_lock(&g_worker_start_fn_mutex) != 0) { } @@ -83,15 +86,18 @@ void pthread_worker_start_fn() { if (lock_status == EBUSY || lock_status == EAGAIN) { pthread_mutex_unlock(&bthread::g_task_control_mutex); } + */ Resource::instance().thread_initialize(); // Try to avoid deadlock in bthread + /* if (lock_status == EBUSY || lock_status == EAGAIN) { while (pthread_mutex_lock(&bthread::g_task_control_mutex) != 0) { } } pthread_mutex_unlock(&g_worker_start_fn_mutex); + */ } static void g_change_server_port() { @@ -126,7 +132,7 @@ int main(int argc, char** argv) { return 0; } - //google::ParseCommandLineFlags(&argc, &argv, true); + // google::ParseCommandLineFlags(&argc, &argv, true); g_change_server_port(); @@ -202,7 +208,7 @@ int main(int argc, char** argv) { } VLOG(2) << "Succ call pthread worker start function"; - //this is not used by any code segment,which can be cancelled. + // this is not used by any code segment,which can be cancelled. 
if (Resource::instance().general_model_initialize(FLAGS_resource_path, FLAGS_resource_file) != 0) { LOG(ERROR) << "Failed to initialize general model conf: " diff --git a/paddle_inference/paddle/include/paddle_engine.h b/paddle_inference/paddle/include/paddle_engine.h index 262a0378bef5caacbfdf5a3d2b46ed6ce598cb10..d2027ed2823ace230a14e85eb6ee37fe82e5a21f 100755 --- a/paddle_inference/paddle/include/paddle_engine.h +++ b/paddle_inference/paddle/include/paddle_engine.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include "core/configure/include/configure_parser.h" #include "core/configure/inferencer_configure.pb.h" @@ -96,7 +97,7 @@ class EngineCore { return true; } - virtual int create(const configure::EngineDesc& conf) = 0; + virtual int create(const configure::EngineDesc& conf, int gpu_id) = 0; virtual int clone(void* predictor) { if (predictor == NULL) { @@ -121,7 +122,7 @@ class EngineCore { // Paddle Inference Engine class PaddleInferenceEngine : public EngineCore { public: - int create(const configure::EngineDesc& engine_conf) { + int create(const configure::EngineDesc& engine_conf, int gpu_id) { std::string model_path = engine_conf.model_dir(); if (access(model_path.c_str(), F_OK) == -1) { LOG(ERROR) << "create paddle predictor failed, path not exits: " @@ -162,7 +163,11 @@ class PaddleInferenceEngine : public EngineCore { config.SetCpuMathLibraryNumThreads(1); if (engine_conf.has_use_gpu() && engine_conf.use_gpu()) { // 2000MB GPU memory - config.EnableUseGpu(2000, FLAGS_gpuid); + config.EnableUseGpu(50, gpu_id); + if (engine_conf.has_gpu_multi_stream() && + engine_conf.gpu_multi_stream()) { + config.EnableGpuMultiStream(); + } } precision_type = GetPrecision(FLAGS_precision); @@ -174,8 +179,13 @@ class PaddleInferenceEngine : public EngineCore { } if (engine_conf.has_use_trt() && engine_conf.use_trt()) { + config.SwitchIrOptim(true); if (!engine_conf.has_use_gpu() || !engine_conf.use_gpu()) { - config.EnableUseGpu(2000, FLAGS_gpuid); + config.EnableUseGpu(50, gpu_id); + if (engine_conf.has_gpu_multi_stream() && + engine_conf.gpu_multi_stream()) { + config.EnableGpuMultiStream(); + } } config.EnableTensorRtEngine(1 << 20, max_batch, @@ -203,7 +213,7 @@ class PaddleInferenceEngine : public EngineCore { if (precision_type == PrecisionType::kInt8) { config.EnableMkldnnQuantizer(); auto quantizer_config = config.mkldnn_quantizer_config(); - // TODO: warmup data + // TODO(somebody): warmup data // quantizer_config -> SetWarmupData(); // quantizer_config -> SetWarmupBatchSize(); // quantizer_config -> SetEnabledOpTypes(4); diff --git a/python/paddle_serving_client/client.py b/python/paddle_serving_client/client.py index 0ccdbb1435fbca1a9e576d1a5ad01517d4ed352d..0c5aa23c183990bc248053c43a430cba2d2008a2 100755 --- a/python/paddle_serving_client/client.py +++ b/python/paddle_serving_client/client.py @@ -307,6 +307,8 @@ class Client(object): if isinstance(feed, dict): feed_batch.append(feed) elif isinstance(feed, list): + if len(feed) != 1: + raise ValueError("Feed only list = [dict]") feed_batch = feed else: raise ValueError("Feed only accepts dict and list of dict") @@ -326,7 +328,7 @@ class Client(object): fetch_names = [] counter = 0 - batch_size = len(feed_batch) + batch_size = len(feed_batch) # batch_size must be 1. 
for key in fetch_list: if key in self.fetch_names_: diff --git a/python/paddle_serving_server/serve.py b/python/paddle_serving_server/serve.py index 101b7b4b460fd125541ed62f28b38d7512b5bda8..b69fbca6eef1ab38fbbd330e4fee44eb4d91d82e 100755 --- a/python/paddle_serving_server/serve.py +++ b/python/paddle_serving_server/serve.py @@ -39,7 +39,16 @@ def serve_args(): "--port", type=int, default=9292, help="Port of the starting gpu") parser.add_argument( "--device", type=str, default="gpu", help="Type of device") - parser.add_argument("--gpu_ids", type=str, default="", help="gpu ids") + parser.add_argument( + "--gpu_ids", type=str, default="", nargs="+", help="gpu ids") + parser.add_argument( + "--op_num", type=int, default=0, nargs="+", help="Number of each op") + parser.add_argument( + "--op_max_batch", + type=int, + default=32, + nargs="+", + help="Max batch of each op") parser.add_argument( "--model", type=str, default="", nargs="+", help="Model for serving") parser.add_argument( @@ -99,85 +108,20 @@ def serve_args(): type=str, default=None, help="container_id for authentication") + parser.add_argument( + "--gpu_multi_stream", + default=False, + action="store_true", + help="Use gpu_multi_stream") return parser.parse_args() -def start_standard_model(serving_port): # pylint: disable=doc-string-missing - args = serve_args() - thread_num = args.thread - model = args.model - port = serving_port - workdir = args.workdir - device = args.device - mem_optim = args.mem_optim_off is False - ir_optim = args.ir_optim - max_body_size = args.max_body_size - use_mkl = args.use_mkl - use_encryption_model = args.use_encryption_model - use_multilang = args.use_multilang - - if model == "": - print("You must specify your serving model") - exit(-1) +def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-missing - for single_model_config in args.model: - if os.path.isdir(single_model_config): - pass - elif os.path.isfile(single_model_config): - raise ValueError("The input of --model should be a dir not file.") - - import paddle_serving_server as serving - op_maker = serving.OpMaker() - op_seq_maker = serving.OpSeqMaker() - - read_op = op_maker.create('general_reader') - op_seq_maker.add_op(read_op) - - for idx, single_model in enumerate(model): - infer_op_name = "general_infer" - #Temporary support for OCR model,it will be completely revised later - #If you want to use this, C++ server must compile with WITH_OPENCV option. 
- if len(model) == 2 and idx == 0 and model[0] == 'ocr_det_model': - infer_op_name = "general_detection" - general_infer_op = op_maker.create(infer_op_name) - op_seq_maker.add_op(general_infer_op) - - general_response_op = op_maker.create('general_response') - op_seq_maker.add_op(general_response_op) - - server = None - if use_multilang: - server = serving.MultiLangServer() - else: - server = serving.Server() - server.set_op_sequence(op_seq_maker.get_op_sequence()) - server.set_num_threads(thread_num) - server.set_memory_optimize(mem_optim) - server.set_ir_optimize(ir_optim) - server.use_mkl(use_mkl) - server.set_max_body_size(max_body_size) - server.set_port(port) - server.set_precision(args.precision) - server.set_use_calib(args.use_calib) - server.use_encryption_model(use_encryption_model) - if args.product_name != None: - server.set_product_name(args.product_name) - if args.container_id != None: - server.set_container_id(args.container_id) - - server.load_model_config(model) - server.prepare_server(workdir=workdir, port=port, device=device) - server.run_server() - - -def start_gpu_card_model(index, gpuid, port, args): # pylint: disable=doc-string-missing - workdir = args.workdir - gpuid = int(gpuid) device = "gpu" - if gpuid == -1: + if gpu_mode == False: device = "cpu" - elif gpuid >= 0: - port = port + index + thread_num = args.thread model = args.model mem_optim = args.mem_optim_off is False @@ -185,8 +129,7 @@ def start_gpu_card_model(index, gpuid, port, args): # pylint: disable=doc-strin use_mkl = args.use_mkl max_body_size = args.max_body_size use_multilang = args.use_multilang - if gpuid >= 0: - workdir = "{}_{}".format(args.workdir, gpuid) + workdir = "{}_{}".format(args.workdir, port) if model == "": print("You must specify your serving model") @@ -226,8 +169,19 @@ def start_gpu_card_model(index, gpuid, port, args): # pylint: disable=doc-strin server.set_memory_optimize(mem_optim) server.set_ir_optimize(ir_optim) server.set_max_body_size(max_body_size) - if args.use_trt: + + if args.use_trt and device == "gpu": server.set_trt() + server.set_ir_optimize(True) + + if args.gpu_multi_stream and device == "gpu": + server.set_gpu_multi_stream() + + if args.op_num: + server.set_op_num(args.op_num) + + if args.op_max_batch: + server.set_op_max_batch(args.op_max_batch) if args.use_lite: server.set_lite() @@ -247,48 +201,40 @@ def start_gpu_card_model(index, gpuid, port, args): # pylint: disable=doc-strin port=port, device=device, use_encryption_model=args.use_encryption_model) - if gpuid >= 0: - server.set_gpuid(gpuid) + if gpu_mode == True: + server.set_gpuid(args.gpu_ids) server.run_server() def start_multi_card(args, serving_port=None): # pylint: disable=doc-string-missing - gpus = "" + gpus = [] if serving_port == None: serving_port = args.port + if args.gpu_ids == "": gpus = [] else: - gpus = args.gpu_ids.split(",") + #check the gpu_id is valid or not. 
+ gpus = args.gpu_ids + if isinstance(gpus, str): + gpus = [gpus] if "CUDA_VISIBLE_DEVICES" in os.environ: env_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",") - for ids in gpus: - if ids not in env_gpus: - print("gpu_ids is not in CUDA_VISIBLE_DEVICES.") - exit(-1) - else: - env_gpus = [] + for op_gpus_str in gpus: + op_gpu_list = op_gpus_str.split(",") + for ids in op_gpu_list: + if ids not in env_gpus: + print("gpu_ids is not in CUDA_VISIBLE_DEVICES.") + exit(-1) + if args.use_lite: print("run using paddle-lite.") - start_gpu_card_model(-1, -1, serving_port, args) + start_gpu_card_model(False, serving_port, args) elif len(gpus) <= 0: print("gpu_ids not set, going to run cpu service.") - start_gpu_card_model(-1, -1, serving_port, args) + start_gpu_card_model(False, serving_port, args) else: - gpu_processes = [] - for i, gpu_id in enumerate(gpus): - p = Process( - target=start_gpu_card_model, - args=( - i, - gpu_id, - serving_port, - args, )) - gpu_processes.append(p) - for p in gpu_processes: - p.start() - for p in gpu_processes: - p.join() + start_gpu_card_model(True, serving_port, args) class MainService(BaseHTTPRequestHandler): @@ -395,14 +341,28 @@ if __name__ == "__main__": from .web_service import WebService web_service = WebService(name=args.name) web_service.load_model_config(args.model) - gpu_ids = args.gpu_ids - if gpu_ids == "": + + if args.gpu_ids == "": + gpus = [] + else: + #check the gpu_id is valid or not. + gpus = args.gpu_ids + if isinstance(gpus, str): + gpus = [gpus] if "CUDA_VISIBLE_DEVICES" in os.environ: - gpu_ids = os.environ["CUDA_VISIBLE_DEVICES"] - if len(gpu_ids) > 0: - web_service.set_gpus(gpu_ids) + env_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",") + for op_gpus_str in gpus: + op_gpu_list = op_gpus_str.split(",") + for ids in op_gpu_list: + if ids not in env_gpus: + print("gpu_ids is not in CUDA_VISIBLE_DEVICES.") + exit(-1) + + if len(gpus) > 0: + web_service.set_gpus(gpus) + workdir = "{}_{}".format(args.workdir, args.port) web_service.prepare_server( - workdir=args.workdir, + workdir=workdir, port=args.port, device=args.device, use_lite=args.use_lite, @@ -410,7 +370,11 @@ if __name__ == "__main__": ir_optim=args.ir_optim, thread_num=args.thread, precision=args.precision, - use_calib=args.use_calib) + use_calib=args.use_calib, + use_trt=args.use_trt, + gpu_multi_stream=args.gpu_multi_stream, + op_num=args.op_num, + op_max_batch=args.op_max_batch) web_service.run_rpc_service() app_instance = Flask(__name__) diff --git a/python/paddle_serving_server/server.py b/python/paddle_serving_server/server.py index 6d8077ad3a3a10c943201f9a945a2ef92b370df0..919d8ba01f892a5532d0bc70f00e7035c19a305a 100755 --- a/python/paddle_serving_server/server.py +++ b/python/paddle_serving_server/server.py @@ -81,8 +81,11 @@ class Server(object): self.use_local_bin = False self.mkl_flag = False self.device = "cpu" - self.gpuid = 0 + self.gpuid = [] + self.op_num = [0] + self.op_max_batch = [32] self.use_trt = False + self.gpu_multi_stream = False self.use_lite = False self.use_xpu = False self.model_config_paths = collections.OrderedDict() @@ -137,11 +140,13 @@ class Server(object): def set_ir_optimize(self, flag=False): self.ir_optimization = flag + # Multi-Server does not have this Function. def set_product_name(self, product_name=None): if product_name == None: raise ValueError("product_name can't be None.") self.product_name = product_name + # Multi-Server does not have this Function. 
def set_container_id(self, container_id): if container_id == None: raise ValueError("container_id can't be None.") @@ -163,12 +168,21 @@ class Server(object): def set_device(self, device="cpu"): self.device = device - def set_gpuid(self, gpuid=0): + def set_gpuid(self, gpuid): self.gpuid = gpuid + def set_op_num(self, op_num): + self.op_num = op_num + + def set_op_max_batch(self, op_max_batch): + self.op_max_batch = op_max_batch + def set_trt(self): self.use_trt = True + def set_gpu_multi_stream(self): + self.gpu_multi_stream = True + def set_lite(self): self.use_lite = True @@ -178,6 +192,27 @@ class Server(object): def _prepare_engine(self, model_config_paths, device, use_encryption_model): if self.model_toolkit_conf == None: self.model_toolkit_conf = [] + self.device = device + + if isinstance(self.gpuid, str): + self.gpuid = [self.gpuid] + if len(self.gpuid) == 0: + if self.device == "gpu" or self.use_trt: + self.gpuid.append("0") + else: + self.gpuid.append("-1") + + if isinstance(self.op_num, int): + self.op_num = [self.op_num] + if len(self.op_num) == 0: + self.op_num.append(0) + + if isinstance(self.op_max_batch, int): + self.op_max_batch = [self.op_max_batch] + if len(self.op_max_batch) == 0: + self.op_max_batch.append(32) + + index = 0 for engine_name, model_config_path in model_config_paths.items(): engine = server_sdk.EngineDesc() @@ -186,19 +221,28 @@ class Server(object): engine.reloadable_meta = model_config_path + "/fluid_time_file" os.system("touch {}".format(engine.reloadable_meta)) engine.reloadable_type = "timestamp_ne" - engine.runtime_thread_num = 0 - engine.batch_infer_size = 0 - engine.enable_batch_align = 0 + engine.runtime_thread_num = self.op_num[index % len(self.op_num)] + engine.batch_infer_size = self.op_max_batch[index % + len(self.op_max_batch)] + + engine.enable_batch_align = 1 engine.model_dir = model_config_path engine.enable_memory_optimization = self.memory_optimization engine.enable_ir_optimization = self.ir_optimization engine.use_trt = self.use_trt + engine.gpu_multi_stream = self.gpu_multi_stream engine.use_lite = self.use_lite engine.use_xpu = self.use_xpu engine.use_gpu = False - if self.device == "gpu": + if self.device == "gpu" or self.use_trt: engine.use_gpu = True + if len(self.gpuid) == 0: + raise ValueError("CPU: self.gpuid = -1, GPU: must set it ") + op_gpu_list = self.gpuid[index % len(self.gpuid)].split(",") + for ids in op_gpu_list: + engine.gpu_ids.extend([int(ids)]) + if os.path.exists('{}/__params__'.format(model_config_path)): engine.combined_model = True else: @@ -208,6 +252,7 @@ class Server(object): engine.type = "PADDLE_INFER" self.model_toolkit_conf.append(server_sdk.ModelToolkitConf()) self.model_toolkit_conf[-1].engines.extend([engine]) + index = index + 1 def _prepare_infer_service(self, port): if self.infer_service_conf == None: @@ -332,7 +377,11 @@ class Server(object): self.mkl_flag = flag def check_avx(self): - p = subprocess.Popen(['cat /proc/cpuinfo | grep avx 2>/dev/null'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + p = subprocess.Popen( + ['cat /proc/cpuinfo | grep avx 2>/dev/null'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True) out, err = p.communicate() if err == b'' and len(out) > 0: return True @@ -431,6 +480,7 @@ class Server(object): device="cpu", use_encryption_model=False, cube_conf=None): + self.device = device if workdir == None: workdir = "./tmp" os.system("mkdir -p {}".format(workdir)) @@ -533,7 +583,6 @@ class Server(object): "-workflow_path {} " \ "-workflow_file {} 
" \ "-bthread_concurrency {} " \ - "-gpuid {} " \ "-max_body_size {} ".format( self.bin_path, self.workdir, @@ -549,7 +598,6 @@ class Server(object): self.workdir, self.workflow_fn, self.num_threads, - self.gpuid, self.max_body_size) print("Going to Run Comand") print(command) @@ -615,9 +663,27 @@ class MultiLangServer(object): def set_ir_optimize(self, flag=False): self.bserver_.set_ir_optimize(flag) - def set_gpuid(self, gpuid=0): + def set_gpuid(self, gpuid): self.bserver_.set_gpuid(gpuid) + def set_op_num(self, op_num): + self.bserver_.set_op_num(op_num) + + def set_op_max_batch(self, op_max_batch): + self.bserver_.set_op_max_batch(op_max_batch) + + def set_trt(self): + self.bserver_.set_trt() + + def set_gpu_multi_stream(self): + self.bserver_.set_gpu_multi_stream() + + def set_lite(self): + self.bserver_.set_lite() + + def set_xpu(self): + self.bserver_.set_xpu() + def load_model_config(self, server_config_dir_paths, client_config_path=None): @@ -674,6 +740,7 @@ class MultiLangServer(object): device="cpu", use_encryption_model=False, cube_conf=None): + self.device = device if not self._port_is_available(port): raise SystemExit("Port {} is already used".format(port)) default_port = 12000 diff --git a/python/paddle_serving_server/web_service.py b/python/paddle_serving_server/web_service.py index 3db3b7bc47f497e53993b51631910828a0af8ed3..87e16dcf96247049474c844de0efa09345d564cd 100755 --- a/python/paddle_serving_server/web_service.py +++ b/python/paddle_serving_server/web_service.py @@ -105,25 +105,33 @@ class WebService(object): def set_gpus(self, gpus): print("This API will be deprecated later. Please do not use it") - self.gpus = [int(x) for x in gpus.split(",")] + self.gpus = gpus def default_rpc_service(self, - workdir="conf", + workdir, port=9292, - gpuid=0, + gpus=-1, thread_num=2, mem_optim=True, use_lite=False, use_xpu=False, ir_optim=False, precision="fp32", - use_calib=False): + use_calib=False, + use_trt=False, + gpu_multi_stream=False, + op_num=None, + op_max_batch=None): device = "gpu" - if gpuid == -1: + server = Server() + + if gpus == -1: if use_lite: device = "arm" else: device = "cpu" + else: + server.set_gpuid(gpus) op_maker = OpMaker() op_seq_maker = OpSeqMaker() @@ -142,7 +150,6 @@ class WebService(object): general_response_op = op_maker.create('general_response') op_seq_maker.add_op(general_response_op) - server = Server() server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_num_threads(thread_num) server.set_memory_optimize(mem_optim) @@ -151,6 +158,19 @@ class WebService(object): server.set_precision(precision) server.set_use_calib(use_calib) + if use_trt and device == "gpu": + server.set_trt() + server.set_ir_optimize(True) + + if gpu_multi_stream and device == "gpu": + server.set_gpu_multi_stream() + + if op_num: + server.set_op_num(op_num) + + if op_max_batch: + server.set_op_max_batch(op_max_batch) + if use_lite: server.set_lite() if use_xpu: @@ -158,8 +178,7 @@ class WebService(object): server.load_model_config(self.server_config_dir_paths ) #brpc Server support server_config_dir_paths - if gpuid >= 0: - server.set_gpuid(gpuid) + server.prepare_server(workdir=workdir, port=port, device=device) return server @@ -180,24 +199,29 @@ class WebService(object): use_xpu=self.use_xpu, ir_optim=self.ir_optim, precision=self.precision, - use_calib=self.use_calib)) + use_calib=self.use_calib, + op_num=self.op_num, + op_max_batch=self.op_max_batch)) else: - for i, gpuid in enumerate(self.gpus): - self.rpc_service_list.append( - self.default_rpc_service( - 
"{}_{}".format(self.workdir, i), - self.port_list[i], - gpuid, - thread_num=self.thread_num, - mem_optim=self.mem_optim, - use_lite=self.use_lite, - use_xpu=self.use_xpu, - ir_optim=self.ir_optim, - precision=self.precision, - use_calib=self.use_calib)) + self.rpc_service_list.append( + self.default_rpc_service( + self.workdir, + self.port_list[0], + self.gpus, + thread_num=self.thread_num, + mem_optim=self.mem_optim, + use_lite=self.use_lite, + use_xpu=self.use_xpu, + ir_optim=self.ir_optim, + precision=self.precision, + use_calib=self.use_calib, + use_trt=self.use_trt, + gpu_multi_stream=self.gpu_multi_stream, + op_num=self.op_num, + op_max_batch=self.op_max_batch)) def prepare_server(self, - workdir="", + workdir, port=9393, device="gpu", precision="fp32", @@ -205,9 +229,12 @@ class WebService(object): use_lite=False, use_xpu=False, ir_optim=False, - gpuid=0, thread_num=2, - mem_optim=True): + mem_optim=True, + use_trt=False, + gpu_multi_stream=False, + op_num=None, + op_max_batch=None): print("This API will be deprecated later. Please do not use it") self.workdir = workdir self.port = port @@ -219,25 +246,23 @@ class WebService(object): self.use_xpu = use_xpu self.ir_optim = ir_optim self.mem_optim = mem_optim - self.gpuid = gpuid self.port_list = [] + self.use_trt = use_trt + self.gpu_multi_stream = gpu_multi_stream + self.op_num = op_num + self.op_max_batch = op_max_batch + default_port = 12000 for i in range(1000): if port_is_available(default_port + i): self.port_list.append(default_port + i) - if len(self.port_list) > len(self.gpus): break def _launch_web_service(self): - gpu_num = len(self.gpus) self.client = Client() self.client.load_client_config(self.client_config_path) endpoints = "" - if gpu_num > 0: - for i in range(gpu_num): - endpoints += "127.0.0.1:{},".format(self.port_list[i]) - else: - endpoints = "127.0.0.1:{}".format(self.port_list[0]) + endpoints = "127.0.0.1:{}".format(self.port_list[0]) self.client.connect([endpoints]) def get_prediction(self, request): @@ -324,10 +349,10 @@ class WebService(object): # default self.gpus = [0]. if len(self.gpus) == 0: self.gpus.append(0) + + gpu_id = (self.gpus[0].split(","))[0] self.client.load_model_config( - self.server_config_dir_paths[0], - use_gpu=True, - gpu_id=self.gpus[0]) + self.server_config_dir_paths[0], use_gpu=True, gpu_id=gpu_id) else: self.client.load_model_config( self.server_config_dir_paths[0], use_gpu=False)