diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp old mode 100755 new mode 100644 index 1c40e89b86a5b28543c3a49240316401a46ea639..8e1904f8167030a9640e20036e3414dedba71d65 --- a/core/general-server/op/general_reader_op.cpp +++ b/core/general-server/op/general_reader_op.cpp @@ -181,7 +181,10 @@ int GeneralReaderOp::inference() { VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i << "]: " << data_len; databuf_size = data_len * elem_size; - out->at(i).data.Resize(databuf_size); + void *databuf_char = MempoolWrapper::instance().malloc(databuf_size); + paddle::PaddleBuf paddleBuf(databuf_char, databuf_size); + out->at(i).data = paddleBuf; + // out->at(i).data.Resize(databuf_size); if (out->at(i).lod.size() > 0) { VLOG(2) << "(logid=" << log_id << ") var[" << i << "] has lod_tensor and len=" << out->at(i).lod[0].back(); diff --git a/core/predictor/framework/bsf-inl.h b/core/predictor/framework/bsf-inl.h index 22128d53e20c926ba982c13c9a8e8dcc34216907..4dc0baa3a2b07c48b88d366428db9c17569d9f4f 100644 --- a/core/predictor/framework/bsf-inl.h +++ b/core/predictor/framework/bsf-inl.h @@ -36,7 +36,7 @@ void* TaskExecutor::thread_entry(void* args) { static_cast*>(context->executor); executor->work(context); - return NULL; + return nullptr; } template @@ -256,7 +256,8 @@ int TaskExecutor::work(ThreadContext* context) { template bool TaskManager::schedule(const void* in, void* out) { // NOLINT - TaskHandler handler = _executor.schedule(in, out); + TaskHandler handler = + TaskExecutorVector::instance()[_model_index].schedule(in, out); if (handler.valid()) { _task_owned = handler; diff --git a/core/predictor/framework/bsf.h b/core/predictor/framework/bsf.h old mode 100755 new mode 100644 index ee5ad61533fd7ca382aa466cb3f95b2a4ed6e228..642c54d366ffb51e4c28a6ce711e1bbf5fdb5507 --- a/core/predictor/framework/bsf.h +++ b/core/predictor/framework/bsf.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include 
#ifdef BCLOUD @@ -220,7 +220,8 @@ struct TaskMeta { // each TaskT is already include batch in itself // BatchTasks need to combine several `small TaskMeta` into a new `big TaskT`. // The only difference between the `big TaskT` and `small TaskT` is that -// the TaskT.inVectorT_ptr->[feedvar_index].shape[0] is different. +// the TaskT.inVectorT_ptr->[feedvar_index].shape[0] +// which is actually batch_size is different. template class BatchTasks { public: @@ -540,9 +541,6 @@ struct TaskHandler { template class TaskExecutor; -template -class TaskManager; - template struct ThreadContext { TaskExecutor* executor; @@ -591,10 +589,18 @@ class TaskExecutor { THREAD_COND_DESTROY(&_cond); } - static TaskExecutor* instance() { - static TaskExecutor singleton; - return &singleton; + // vector.resize requires a move (or copy) constructor. A TaskExecutor's + // state (mutex, cond, task queue) must not be transferred, so delegate + // to the default constructor and leave the moved-from object untouched. + TaskExecutor(TaskExecutor&& other) noexcept + : TaskExecutor() { } + /* + static TaskExecutor* instance() { + static TaskExecutor singleton; + return &singleton; + } + */ void set_batch_size(size_t batch_size) { _batch_size = batch_size; } @@ -619,30 +625,35 @@ class TaskExecutor { static void* thread_entry(void* args); - private: - TaskExecutor(TaskExecutor const& other); - TaskExecutor* operator=(TaskExecutor const& other); - int work(ThreadContext* context); TaskHandler schedule(const void*, void*); bool move_task_to_batch(BatchTasks& batch); // NOLINT + private: + TaskExecutor(TaskExecutor const& other) = delete; + + TaskExecutor& operator=(TaskExecutor const& other) = delete; + /* + TaskExecutor(TaskExecutor && other) = delete; + + TaskExecutor& operator=(TaskExecutor && other) = delete; + */ + bool _stop; // can't use boost::mutex, because some stupid macro THREAD_MUTEX_T _mut; THREAD_COND_T _cond; - std::deque _task_queue; + std::list _task_queue; boost::function _thread_init_fn; boost::function _thread_reset_fn; void** _user_thread_contexts; std::vector*> _thread_contexts; - friend class TaskManager; size_t 
_batch_size; bool _batch_align; @@ -650,6 +661,34 @@ class TaskExecutor { boost::function _fn; }; +template +class TaskExecutorVector { + public: + static TaskExecutorVector& instance() { + static TaskExecutorVector singleton; + return singleton; + } + + void resize(int size) { _vector_executor.resize(size); } + + TaskExecutor& operator[](int index) { + if (_vector_executor.size() <= index || index <= -1) { + LOG(ERROR) << "_vector_executor.size() <= index or <= -1"; + throw "_vector_executor.size() <= index or <= -1"; + } + return _vector_executor[index]; + } + + private: + TaskExecutorVector() = default; + TaskExecutorVector(const TaskExecutorVector& other) = delete; + TaskExecutorVector& operator=(const TaskExecutorVector& other) = + delete; + TaskExecutorVector(TaskExecutorVector&& other) = delete; + TaskExecutorVector& operator=(TaskExecutorVector&& other) = delete; + std::vector> _vector_executor; +}; + template class TaskManager { public: @@ -657,10 +696,8 @@ class TaskManager { typedef typename TaskT::InVectorT InVectorT; typedef typename TaskT::OutVectorT OutVectorT; - explicit TaskManager(TaskExecutor& exe, size_t batch_size) // NOLINT - : _executor(exe) {} - - TaskManager() : _executor(*TaskExecutor::instance()) {} + explicit TaskManager(uint32_t index) // NOLINT + : _model_index(index) {} ~TaskManager() { wait(); } @@ -670,8 +707,8 @@ class TaskManager { inline void clear() { wait(); } private: - TaskExecutor& _executor; TaskHandler _task_owned; + uint32_t _model_index; }; // class TaskManager class AutoMutex { diff --git a/core/predictor/framework/infer.cpp b/core/predictor/framework/infer.cpp old mode 100755 new mode 100644 index 9b84ea35ee8ab6dd0888c40aa557a9ebcd2ab623..fd80ed639bc51075dcf79bec8f33d724503fe617 --- a/core/predictor/framework/infer.cpp +++ b/core/predictor/framework/infer.cpp @@ -56,15 +56,20 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf, } // init bsf framework - 
im::bsf::TaskExecutor::instance()->set_thread_init_fn( - boost::bind(&InferEngine::thrd_initialize_impl, this)); - im::bsf::TaskExecutor::instance()->set_thread_reset_fn( - boost::bind(&InferEngine::thrd_clear_impl, this)); - im::bsf::TaskExecutor::instance()->set_thread_callback_fn( - boost::bind(&InferEngine::task_infer_impl, this, _1, _2)); - im::bsf::TaskExecutor::instance()->set_batch_size(_infer_batch_size); - im::bsf::TaskExecutor::instance()->set_batch_align(_infer_batch_align); - if (im::bsf::TaskExecutor::instance()->start(_infer_thread_num) != 0) { + im::bsf::TaskExecutorVector::instance()[_model_index] + .set_thread_init_fn( + boost::bind(&InferEngine::thrd_initialize_impl, this)); + im::bsf::TaskExecutorVector::instance()[_model_index] + .set_thread_reset_fn(boost::bind(&InferEngine::thrd_clear_impl, this)); + im::bsf::TaskExecutorVector::instance()[_model_index] + .set_thread_callback_fn( + boost::bind(&InferEngine::task_infer_impl, this, _1, _2)); + im::bsf::TaskExecutorVector::instance()[_model_index].set_batch_size( + _infer_batch_size); + im::bsf::TaskExecutorVector::instance()[_model_index].set_batch_align( + _infer_batch_align); + if (im::bsf::TaskExecutorVector::instance()[_model_index].start( + _infer_thread_num) != 0) { LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num; return -1; } @@ -75,6 +83,11 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf, return 0; } +// Multiple threads may enter this method of the same object. +// One model corresponds to one ReloadableInferEngine object, +// which is a process-level object. +// One ReloadableInferEngine object can own several ModelData +// objects, each of which is a thread-level object. 
int ReloadableInferEngine::infer(const void* in, void* out, uint32_t batch_size) { @@ -82,7 +95,8 @@ int ReloadableInferEngine::infer(const void* in, return infer_impl(in, out, batch_size); } - im::bsf::TaskManager task_manager; + im::bsf::TaskManager task_manager( + _model_index); task_manager.schedule(in, out); task_manager.wait(); @@ -110,7 +124,7 @@ int ReloadableInferEngine::proc_finalize() { } if (_infer_thread_num > 0) { - im::bsf::TaskExecutor::instance()->stop(); + im::bsf::TaskExecutorVector::instance()[_model_index].stop(); } return 0; } @@ -191,6 +205,7 @@ int VersionedInferEngine::proc_initialize(const configure::EngineDesc& conf, std::string engine_type = conf.type(); InferEngine* engine = StaticInferFactory::instance().generate_object(engine_type); + if (engine != nullptr) { engine->set_model_index(_model_index); } if (!engine) { LOG(ERROR) << "Failed generate engine with type:" << engine_type; return -1; } @@ -373,12 +388,14 @@ int InferManager::proc_initialize(const char* path, const char* file) { LOG(ERROR) << "failed load infer config, path: " << path << "/" << file; return -1; } - size_t engine_num = model_toolkit_conf.engines_size(); - for (size_t ei = 0; ei < engine_num; ++ei) { + uint32_t engine_num = model_toolkit_conf.engines_size(); + im::bsf::TaskExecutorVector::instance().resize(engine_num); + for (uint32_t ei = 0; ei < engine_num; ++ei) { LOG(INFO) << "model_toolkit_conf.engines(" << ei << ").name: " << model_toolkit_conf.engines(ei).name(); std::string engine_name = model_toolkit_conf.engines(ei).name(); VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine(); + if (engine != nullptr) { engine->set_model_index(ei); } if (!engine) { LOG(ERROR) << "Failed generate versioned engine: " << engine_name; return -1; diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h index e7785f40462af7a18775319737ecc808881cba79..672e03b8e8372f1c66eedc0f6e97dc5b7a700851 100755 --- a/core/predictor/framework/infer.h +++ b/core/predictor/framework/infer.h @@ -17,11 
+17,11 @@ #include #include #include +#include #include #include #include #include -#include #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/bsf.h" #include "core/predictor/framework/factory.h" @@ -73,7 +73,7 @@ class InferEngine { virtual int infer(const void* in, void* out, uint32_t batch_size = -1) { return infer_impl(in, out, batch_size); } - + virtual void set_model_index(uint32_t index) { _model_index = index; } virtual int reload() = 0; virtual uint64_t version() const = 0; @@ -89,10 +89,12 @@ class InferEngine { void* out, uint32_t batch_size = -1) = 0; virtual int task_infer_impl(const void* in, void* out) = 0; // NOLINT - + + protected: + uint32_t _model_index; // end: framework inner call }; - +typedef im::bsf::Task TaskT; class ReloadableInferEngine : public InferEngine { public: virtual ~ReloadableInferEngine() {} @@ -105,7 +107,6 @@ class ReloadableInferEngine { }; virtual int load(const configure::EngineDesc& conf) = 0; - typedef im::bsf::Task TaskT; int proc_initialize_impl(const configure::EngineDesc& conf, bool version); @@ -372,8 +373,7 @@ class CloneDBReloadableInferEngine protected: // 模板EngineCore,如果已创建,则多个线程级EngineCore共用该对象的模型数据 - std::vector*> - _cloneTemplate; + std::vector*> _cloneTemplate; }; template diff --git a/core/predictor/mempool/mempool.cpp b/core/predictor/mempool/mempool.cpp index 88936687e47e9f9e3350dcf94b6eb38a93f9dc28..f1dceb809cb1b62039f242f922ac7529fa5b017b 100755 --- a/core/predictor/mempool/mempool.cpp +++ b/core/predictor/mempool/mempool.cpp @@ -24,7 +24,7 @@ namespace fugue { namespace memory { void Region::init() { - _big_mem_capacity = 64 * 1024 * 1024; // 64MB + _big_mem_capacity = 128 * 1024 * 1024; // 128MB _big_mem_start = new char[_big_mem_capacity]; } diff --git a/core/predictor/mempool/mempool.h b/core/predictor/mempool/mempool.h old mode 100755 new mode 100644 index bc18d89f3fa28bd2da27118aac70ae36237d72e8..a4143d4b52460f80f3e4471fadd12e39d2e19319 --- 
a/core/predictor/mempool/mempool.h +++ b/core/predictor/mempool/mempool.h @@ -345,9 +345,10 @@ class Region { 2 * 1024 * 1024; // 2MB,means when you need less than 2M, get memory from Block. - // 64MB,means when you need less than 64MB, get memory from BigMemory instead + // 128MB,means when you need less than 128MB, get memory from BigMemory + // instead // of BigNode - static const int BIGNODE_MEM_THRESHOLD = (64 * 1024 * 1024 + 1); + static const int BIGNODE_MEM_THRESHOLD = (128 * 1024 * 1024 + 1); static const int COUNTER_SIZE = BIGNODE_MEM_THRESHOLD / BIG_MEM_THRESHOLD + 1; // this is not used