From b7d1b4338ec167a74cb80d60fbe827fbf985896e Mon Sep 17 00:00:00 2001 From: bjjwwang Date: Wed, 12 Jan 2022 11:12:18 +0000 Subject: [PATCH] fix bsf memory error --- core/predictor/framework/bsf-inl.h | 17 ++++++--- core/predictor/framework/bsf.h | 53 +++++++++++++++++++++++------ core/predictor/framework/infer.cpp | 3 +- core/predictor/framework/memory.cpp | 46 ++++++++++++------------- core/predictor/framework/memory.h | 28 +++++++++++++++ 5 files changed, 105 insertions(+), 42 deletions(-) diff --git a/core/predictor/framework/bsf-inl.h b/core/predictor/framework/bsf-inl.h index 401c5109..8790a6bd 100755 --- a/core/predictor/framework/bsf-inl.h +++ b/core/predictor/framework/bsf-inl.h @@ -88,8 +88,14 @@ bool Task::task_fetch_create(BatchTasks& batchTask) { // 此时 lod 为空。 tensor_out.lod = batchTask._batch_out[fetchvar_index].lod; // resize all batch memory at one time + size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index; - tensor_out.data.Resize(databuf_size); + + void* databuf_data = MempoolWrapper::instance().malloc(databuf_size,memoryPtr); + paddle::PaddleBuf paddleBuf(databuf_data, databuf_size); + tensor_out.data = paddleBuf; + + //tensor_out.data.Resize(databuf_size); } else { // 当taskmeta_num = 1时,由于同时只有一个taskMeta操作task // 不涉及线程安全问题,所以此时可以直接由taskMeta->task->resize->copy @@ -209,7 +215,7 @@ void TaskExecutor::stop() { template TaskHandler TaskExecutor::schedule( const void* inVectorT_ptr, - void* outVectorT_ptr) { // NOLINT + void* outVectorT_ptr, MempoolRegion* memoryPtr) { // NOLINT TaskT* task = butil::get_object(); if (!task) { LOG(ERROR) << "Failed get TaskT from object pool"; @@ -235,7 +241,8 @@ TaskHandler TaskExecutor::schedule( task->read_fd = fds[0]; task->write_fd = fds[1]; task->owner_tid = ::syscall(SYS_gettid); - + task->memoryPtr = memoryPtr; + //task->_bspec_key = _bspec_key; task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr; task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr; if (!task->task_init()) { @@ -403,9 +410,9 @@ int TaskExecutor::work(ThreadContext* context) { template bool TaskManager::schedule(const void* in, - void* out) { // NOLINT + void* out, MempoolRegion* memoryPtr) { // NOLINT TaskHandler handler = - TaskExecutorVector::instance()[_model_index].schedule(in, out); + TaskExecutorVector::instance()[_model_index].schedule(in, out, memoryPtr); if (handler.valid()) { _task_owned = handler; diff --git a/core/predictor/framework/bsf.h b/core/predictor/framework/bsf.h index 17f0c3d2..62162149 100755 --- a/core/predictor/framework/bsf.h +++ b/core/predictor/framework/bsf.h @@ -38,6 +38,8 @@ namespace im { namespace bsf { static const size_t DEFAULT_BATCH_SIZE = 100; +typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper; +typedef baidu::paddle_serving::predictor::MempoolRegion MempoolRegion; // InItemT is paddle::PaddleTensor // InVectorT std::vector @@ -61,6 +63,7 @@ struct Task { typedef Task TaskT; typedef std::vector ShapeVector; typedef std::vector VectorOfShapeVector; + typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper; int read_fd; int write_fd; @@ -79,6 +82,7 @@ struct Task { bool fetch_init; // taskmeta_num * set_feed_lod_index.size() std::vector outLodTensorVector; + MempoolRegion* memoryPtr; Task() { read_fd = -1; @@ -364,7 +368,12 @@ struct Task { } // 一次性扩容PaddleTensor中的data和lod paddle::PaddleTensor& fetchVarTensor = (*outVectorT_ptr)[feedvar_index]; - fetchVarTensor.data.Resize(data_length); + + void* databuf_data = MempoolWrapper::instance().malloc(data_length,memoryPtr); + paddle::PaddleBuf paddleBuf(databuf_data, data_length); + fetchVarTensor.data = paddleBuf; + + //fetchVarTensor.data.Resize(data_length); // task中的lod补0 if (fetchVarTensor.lod.size() <= 0) { fetchVarTensor.lod.push_back({0}); @@ -625,8 +634,10 @@ class BatchTasks { paddleTensor.lod = _batch_in_lod[feedvar_index]; paddleTensor.shape = feedVarTensor.shape; paddleTensor.shape[0] = _total_shape0_batch_in[feedvar_index]; - paddleTensor.data.Resize(feedvar_bytesize * - _total_shape0_batch_in[feedvar_index]); + size_t databuf_size = feedvar_bytesize * _total_shape0_batch_in[feedvar_index]; + void* databuf_data = MempoolWrapper::instance().malloc(databuf_size); + paddle::PaddleBuf paddleBuf(databuf_data, databuf_size); + paddleTensor.data = paddleBuf; _batch_in.push_back(paddleTensor); } @@ -733,16 +744,27 @@ class BatchTasks { // 此时,无法分辨是否是天然nobatch,此时set_fetch_nobatch_index会漏掉 // 后续希望在其他地方能够区分两者。 if (fetchvar_batch_size(fetchvar_index) != _total_fetch_batch) { - // which means error. - if (fetchvar_batch_size(fetchvar_index) != 1 && - _total_fetch_batch != 1) { + if(fetchvar_batch_size(fetchvar_index) <= 0){ + // which means error. return false; - } else { + }else if(fetchvar_batch_size(fetchvar_index) == 1){ // which means fetchvar shape[0] = 1. // shape[0] does not change with batch set_fetch_nobatch_index.insert(fetchvar_index); _total_fetch_batch = std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch); + }else if(_total_fetch_batch == 1){ + //这时意味着,之前的fetchvar shape[0] 全部都= 1 + //当前的fetchvar shape[0] > 1 + //所以,之前的都是no_batch + for(size_t temp_index = fetchvar_index-1; temp_index >= 0; --temp_index){ + set_fetch_nobatch_index.insert(fetchvar_index); + } + _total_fetch_batch = + std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch); + }else{ + // which means error. + return false; } } // 将lod fetchvar index加入到vector中。 @@ -824,7 +846,11 @@ class BatchTasks { task->outLodTensorVector[taskmeta_index][fetch_lod_index]; size_t length = fetchvar_bytesize_index * shape0_length; fetchVarTensor.shape[0] = shape0_length; - fetchVarTensor.data.Resize(length); + + void* databuf_data = MempoolWrapper::instance().malloc(length,task->memoryPtr); + paddle::PaddleBuf paddleBuf(databuf_data, length); + fetchVarTensor.data = paddleBuf; + //fetchVarTensor.data.Resize(length); void* dst_ptr = fetchVarTensor.data.data(); void* source_ptr = _batch_out[fetchvar_index].data.data() + shape0_index_start * fetchvar_bytesize_index; @@ -850,7 +876,12 @@ class BatchTasks { (*task->outVectorT_ptr)[fetchvar_index]; size_t length = fetchvar_bytesize_index * shape0_length; fetchVarTensor.shape[0] = shape0_length; - fetchVarTensor.data.Resize(length); + + void* databuf_data = MempoolWrapper::instance().malloc(length,task->memoryPtr); + paddle::PaddleBuf paddleBuf(databuf_data, length); + fetchVarTensor.data = paddleBuf; + + //fetchVarTensor.data.Resize(length); void* dst_ptr = fetchVarTensor.data.data(); void* source_ptr = _batch_out[fetchvar_index].data.data() + shape0_index_start * fetchvar_bytesize_index; @@ -1076,7 +1107,7 @@ class TaskExecutor { int work(ThreadContext* context); - TaskHandler schedule(const void*, void*); + TaskHandler schedule(const void*, void*, MempoolRegion* memoryPtr); bool move_task_to_batch(BatchTasks& batchTask); // NOLINT @@ -1159,7 +1190,7 @@ class TaskManager { ~TaskManager() { wait(); } - bool schedule(const void* in, void* out); // NOLINT + bool schedule(const void* in, void* out, MempoolRegion* memoryPtr); // NOLINT void wait(); inline void clear() { wait(); } diff --git a/core/predictor/framework/infer.cpp b/core/predictor/framework/infer.cpp index 02906122..69d6dac7 100644 --- a/core/predictor/framework/infer.cpp +++ b/core/predictor/framework/infer.cpp @@ -98,8 +98,7 @@ int ReloadableInferEngine::infer(const void* in, im::bsf::TaskManager task_manager( _model_index); - - task_manager.schedule(in, out); + task_manager.schedule(in, out, MempoolWrapper::instance().get_thread_memory_ptr()); task_manager.wait(); return 0; } diff --git a/core/predictor/framework/memory.cpp b/core/predictor/framework/memory.cpp index f22311e5..de6537ec 100644 --- a/core/predictor/framework/memory.cpp +++ b/core/predictor/framework/memory.cpp @@ -19,30 +19,6 @@ namespace baidu { namespace paddle_serving { namespace predictor { -// why we need MempoolRegion -// because we need to release the resource. -// so we need both Mempool and Region. -// Mempool is a wrapper class for us to use memory more safely. -// Region is the RAII class. -struct MempoolRegion { - MempoolRegion(im::fugue::memory::Region* region, im::Mempool* mempool) - : _region(region), _mempool(mempool) {} - im::fugue::memory::Region* region() { return _region; } - im::Mempool* mempool() { return _mempool; } - - im::fugue::memory::Region* _region; - im::Mempool* _mempool; - ~MempoolRegion() { - if (_region) { - delete _region; - _region = NULL; - } - if (_mempool) { - delete _mempool; - _mempool = NULL; - } - } -}; int MempoolWrapper::initialize() { if (THREAD_KEY_CREATE(&_bspec_key, NULL) != 0) { @@ -112,6 +88,28 @@ void* MempoolWrapper::malloc(size_t size) { return mempool->malloc(size); } +void* MempoolWrapper::malloc(size_t size, MempoolRegion* my_mempool_region) { + MempoolRegion* mempool_region = my_mempool_region; + if (mempool_region == NULL) { + LOG(WARNING) << "THREAD_GETSPECIFIC() returned NULL"; + return NULL; + } + + im::Mempool* mempool = mempool_region->mempool(); + if (!mempool) { + LOG(WARNING) << "Cannot malloc memory:" << size + << ", since mempool is not thread initialized"; + return NULL; + } + return mempool->malloc(size); +} + +MempoolRegion* MempoolWrapper::get_thread_memory_ptr(){ + MempoolRegion* mempool_region = + (MempoolRegion*)THREAD_GETSPECIFIC(_bspec_key); + return mempool_region; +} + void MempoolWrapper::free(void* p, size_t size) { MempoolRegion* mempool_region = (MempoolRegion*)THREAD_GETSPECIFIC(_bspec_key); diff --git a/core/predictor/framework/memory.h b/core/predictor/framework/memory.h index 74fba4eb..b0952b2a 100644 --- a/core/predictor/framework/memory.h +++ b/core/predictor/framework/memory.h @@ -21,6 +21,30 @@ namespace baidu { namespace paddle_serving { namespace predictor { +// why we need MempoolRegion +// because we need to release the resource. +// so we need both Mempool and Region. +// Mempool is a wrapper class for us to use memory more safely. +// Region is the RAII class. +struct MempoolRegion { + MempoolRegion(im::fugue::memory::Region* region, im::Mempool* mempool) + : _region(region), _mempool(mempool) {} + im::fugue::memory::Region* region() { return _region; } + im::Mempool* mempool() { return _mempool; } + + im::fugue::memory::Region* _region; + im::Mempool* _mempool; + ~MempoolRegion() { + if (_region) { + delete _region; + _region = NULL; + } + if (_mempool) { + delete _mempool; + _mempool = NULL; + } + } +}; class MempoolWrapper { public: MempoolWrapper() {} @@ -38,6 +62,10 @@ class MempoolWrapper { void* malloc(size_t size); + void* malloc(size_t size, MempoolRegion* my_mempool_region); + + MempoolRegion* get_thread_memory_ptr(); + void free(void* p, size_t size); private: -- GitLab