diff --git a/core/general-server/op/general_rec_op.cpp b/core/general-server/op/general_rec_op.cpp
index 3b77510a6ed1759511791ebc2bb6f038be71a497..da8ed0f6162f4cb78fee1835e4d62cb87cc3a117 100644
--- a/core/general-server/op/general_rec_op.cpp
+++ b/core/general-server/op/general_rec_op.cpp
@@ -163,10 +163,10 @@ int GeneralRecOp::inference() {
       argmax_idx = int(std::distance(
           &output_tensor_1_data[n * predict_shape[1]],
           std::max_element(&output_tensor_1_data[n * predict_shape[1]],
-                           &output_tensor_1_data[(n + 1) * predict_shape[1]])));
+                           output_tensor_1_data + (n + 1) * predict_shape[1])));
       max_value = float(
           *std::max_element(&output_tensor_1_data[n * predict_shape[1]],
-                            &output_tensor_1_data[(n + 1) * predict_shape[1]]));
+                            output_tensor_1_data + (n + 1) * predict_shape[1]));
       if (blank - 1 - argmax_idx > 1e-5) {
         score_vector[index] += max_value;
         count += 1;
diff --git a/core/predictor/framework/bsf-inl.h b/core/predictor/framework/bsf-inl.h
index 401c51091ad858f48ad154d083c841a0e6a20010..ca5fc0f8db743a25c8e0937a9589f7d651cced19 100755
--- a/core/predictor/framework/bsf-inl.h
+++ b/core/predictor/framework/bsf-inl.h
@@ -19,10 +19,8 @@
 #else
 #include <butil/atomicops.h>
 #endif
-
 #include <sys/syscall.h>
 #include <boost/bind.hpp>
-
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/memory.h"

@@ -34,7 +32,7 @@ template <typename InItemT, typename OutItemT>
 bool Task<InItemT, OutItemT>::task_fetch_init(BatchTasks<TaskT>& batchTask) {
   // Double-checked locking to reduce the locking granularity.
   if (!fetch_init) {
-    if (taskmeta_num > 1) {
+    if (total_taskmeta_num > 1) {
       // When the task is split into multiple taskmetas, locking is required.
       AutoMutex lock(task_mut);
       task_fetch_create(batchTask);
@@ -88,15 +86,21 @@ bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
         // At this point the lod is empty.
         tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
         // resize all batch memory at one time
         size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
-        tensor_out.data.Resize(databuf_size);
+        void* databuf_data =
+            MempoolWrapper::instance().malloc(databuf_size, memoryPtr);
+        paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
+        tensor_out.data = paddleBuf;
       } else {
         // When taskmeta_num = 1, only one taskMeta operates on the task at a
         // time, so there is no thread-safety issue and the data can go
         // directly through taskMeta->task->resize->copy.
         // When the task is split into multiple taskMetas, a temporary object
         // records the partial outputs, which are merged once all of them
        // have arrived.
-        if (taskmeta_num > 1) {
+        if (total_taskmeta_num > 1) {
           taskMetaOutLodTensor.push_back(tensor_out);
         }
       }
@@ -104,7 +108,7 @@ bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
     }
     // outLodTensorVector is actually a two-level vector whose shape is
     // taskmeta_num * vector_fetch_lod_index.size();
-    outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor);
+    outLodTensorVector.resize(total_taskmeta_num, taskMetaOutLodTensor);
     fetch_init = true;
   }
   return true;
 }
@@ -209,7 +213,7 @@ void TaskExecutor<TaskT>::stop() {
 template <typename TaskT>
 TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
     const void* inVectorT_ptr,
-    void* outVectorT_ptr) {  // NOLINT
+    void* outVectorT_ptr, MempoolRegion* memoryPtr) {  // NOLINT
   TaskT* task = butil::get_object<TaskT>();
   if (!task) {
     LOG(ERROR) << "Failed get TaskT from object pool";
@@ -235,7 +239,8 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
   task->read_fd = fds[0];
   task->write_fd = fds[1];
   task->owner_tid = ::syscall(SYS_gettid);
-
+  task->memoryPtr = memoryPtr;
   task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
   task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
   if (!task->task_init()) {
@@ -403,9 +408,9 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
 template <typename InItemT, typename OutItemT>
 bool TaskManager<InItemT, OutItemT>::schedule(const void* in,
-                                              void* out) {  // NOLINT
+                                              void* out,
+                                              MempoolRegion* memoryPtr) {  // NOLINT
   TaskHandler<TaskT> handler =
-      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out);
+      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(
+          in, out, memoryPtr);
   if (handler.valid()) {
     _task_owned = handler;
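Note on the general_rec_op.cpp hunk: the end bound of the per-row search changes from `&output_tensor_1_data[(n + 1) * predict_shape[1]]` to the equivalent pointer expression, because `&arr[i]` first forms the lvalue `arr[i]`, and for the last row that indexes one element past the end of the buffer. A standalone sketch of the corrected pattern (names are illustrative, not project code):

#include <algorithm>
#include <cstddef>
#include <iterator>

// Argmax over row n of a flattened [rows x width] score matrix.
// The end iterator is formed with pointer arithmetic only, so the
// one-past-the-end position is never dereferenced.
static int row_argmax(const float* data, size_t n, size_t width) {
  const float* begin = data + n * width;
  const float* end = data + (n + 1) * width;  // valid past-the-end pointer
  return static_cast<int>(std::distance(begin, std::max_element(begin, end)));
}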
diff --git a/core/predictor/framework/bsf.h b/core/predictor/framework/bsf.h
index 17f0c3d2ace16d50c223692b91f5dd30b3764cd0..ed8415ccef3105f6e776a3102cae2a0c568db6d1 100755
--- a/core/predictor/framework/bsf.h
+++ b/core/predictor/framework/bsf.h
@@ -38,6 +38,8 @@ namespace im {
 namespace bsf {

 static const size_t DEFAULT_BATCH_SIZE = 100;
+typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
+typedef baidu::paddle_serving::predictor::MempoolRegion MempoolRegion;

 // InItemT is paddle::PaddleTensor
 // InVectorT is std::vector<paddle::PaddleTensor>
@@ -61,6 +63,7 @@ struct Task {
   typedef Task<InItemT, OutItemT> TaskT;
   typedef std::vector<int> ShapeVector;
   typedef std::vector<ShapeVector> VectorOfShapeVector;
+  typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;

   int read_fd;
   int write_fd;
@@ -75,10 +78,12 @@ struct Task {
   std::set<size_t> set_fetch_nobatch_index;
   butil::atomic<size_t> index;
   size_t taskmeta_num;
+  size_t total_taskmeta_num;
   THREAD_MUTEX_T task_mut;
   bool fetch_init;
   // taskmeta_num * set_feed_lod_index.size()
   std::vector<OutVectorT> outLodTensorVector;
+  MempoolRegion* memoryPtr;

   Task() {
     read_fd = -1;
@@ -96,6 +101,7 @@ struct Task {
     index.store(0, butil::memory_order_relaxed);
     THREAD_MUTEX_INIT(&task_mut, NULL);
     fetch_init = false;
+    total_taskmeta_num = 1;
     outLodTensorVector.clear();
   }
   ~Task() {
@@ -111,6 +117,7 @@ struct Task {
     rem = -1;
     total_feed_batch = 0;
     taskmeta_num = 0;
+    total_taskmeta_num = 1;
     index.store(0, butil::memory_order_relaxed);
     THREAD_MUTEX_DESTROY(&task_mut);
     fetch_init = false;
@@ -130,6 +137,7 @@ struct Task {
     rem = -1;
     total_feed_batch = 0;
     taskmeta_num = 0;
+    total_taskmeta_num = 1;
     index.store(0, butil::memory_order_relaxed);
     THREAD_MUTEX_INIT(&task_mut, NULL);
     fetch_init = false;
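The new `total_taskmeta_num` field records up front how many taskmetas the whole task will be split into, unlike `taskmeta_num`, which only counts the pieces carved out so far. It is computed once, when the first taskmeta is created (see the `@@ -527` hunk of this file further down), as 1 plus a ceiling division over the remaining batch. A minimal sketch of that arithmetic, with illustrative numbers:

#include <cassert>
#include <cstddef>

// After the first taskmeta takes its slice, `rem` items remain and are cut
// into full batches of `batch_size`, so the task yields
// 1 + ceil(rem / batch_size) taskmetas in total.
static size_t total_taskmeta_count(size_t rem, size_t batch_size) {
  return 1 + (rem + batch_size - 1) / batch_size;
}

int main() {
  assert(total_taskmeta_count(0, 100) == 1);    // task fits in one piece
  assert(total_taskmeta_count(150, 100) == 3);  // first slice, then 100, then 50
  return 0;
}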
@@ -348,14 +356,14 @@ struct Task {
   bool combine_taskmeta() {
     // Only when there are lod-type fetch outputs AND the task was split into
     // multiple taskmetas does data need to be moved from outLodTensorVector
     // to outVectorT_ptr.
-    if (vector_fetch_lod_index.size() > 0 && taskmeta_num > 1) {
+    if (vector_fetch_lod_index.size() > 0 && total_taskmeta_num > 1) {
       for (size_t index = 0; index < vector_fetch_lod_index.size(); ++index) {
         size_t data_length = 0;
         size_t lod_length = 0;
         size_t total_shape0 = 0;
         size_t feedvar_index = vector_fetch_lod_index[index];
         // PaddleTensor's resize implementation clears the buffer every time,
         // so the total length must be counted first.
-        for (size_t taskmeta_index = 0; taskmeta_index < taskmeta_num;
+        for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num;
              ++taskmeta_index) {
           data_length +=
               outLodTensorVector[taskmeta_index][index].data.length();
@@ -364,7 +372,12 @@ struct Task {
         }
         // Grow the PaddleTensor's data and lod in one shot.
         paddle::PaddleTensor& fetchVarTensor = (*outVectorT_ptr)[feedvar_index];
-        fetchVarTensor.data.Resize(data_length);
+        fetchVarTensor.shape[0] = total_shape0;
+        void* databuf_data =
+            MempoolWrapper::instance().malloc(data_length, memoryPtr);
+        paddle::PaddleBuf paddleBuf(databuf_data, data_length);
+        fetchVarTensor.data = paddleBuf;
         // Pad the task lod with a leading 0.
         if (fetchVarTensor.lod.size() <= 0) {
           fetchVarTensor.lod.push_back({0});
@@ -378,15 +391,18 @@ struct Task {
         size_t data_length_offset = 0;
         size_t lod_length_offset = 0;
         size_t once_data_length = 0;
         size_t once_lod_length = 0;
-        size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset];
-        for (size_t taskmeta_index = 0; taskmeta_index < taskmeta_num;
+        for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num;
              ++taskmeta_index) {
+          // process data
           void* dst_ptr = fetchVarTensor.data.data() + data_length_offset;
           void* source_ptr =
               outLodTensorVector[taskmeta_index][index].data.data();
           once_data_length =
               outLodTensorVector[taskmeta_index][index].data.length();
           memcpy(dst_ptr, source_ptr, once_data_length);
+          data_length_offset += once_data_length;
+          // process lod
+          size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset];
           once_lod_length =
               outLodTensorVector[taskmeta_index][index].lod[0].size();
           for (size_t once_index = 0; once_index < once_lod_length;
@@ -394,9 +410,9 @@ struct Task {
             fetchVarTensor.lod[0][lod_length_offset + 1] =
                 last_lod_value +
                 outLodTensorVector[taskmeta_index][index].lod[0][once_index];
+            lod_length_offset++;
           }
-          data_length_offset += once_data_length;
-          lod_length_offset += once_lod_length;
+
         }
       }
     }
@@ -527,6 +543,11 @@ class BatchTasks {
     }
     int start_index = task->batch_size() - task->rem;
     TaskMetaT tm(task, start_index, add, task->taskmeta_num);
+    task->rem -= add;
+    _rem_size -= add;
+    if (task->taskmeta_num == 0) {
+      task->total_taskmeta_num = 1 + (task->rem + _batch_size - 1) / _batch_size;
+    }
     task->taskmeta_num += 1;
     _taskmeta_vector.push_back(tm);
     if (_batch_in_offset.size() == 0) {
@@ -577,8 +598,6 @@ class BatchTasks {
             tm.feed_shape0_range[feedvar_index][0];
       }
     }
-    task->rem -= add;
-    _rem_size -= add;
     return _rem_size;
   }
@@ -604,7 +623,6 @@ class BatchTasks {
     if (_taskmeta_vector.size() <= 0) {
       return;
     }
-
    for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) {
       TaskMetaT& tm = _taskmeta_vector[ti];
@@ -625,8 +643,10 @@ class BatchTasks {
         paddleTensor.lod = _batch_in_lod[feedvar_index];
         paddleTensor.shape = feedVarTensor.shape;
         paddleTensor.shape[0] = _total_shape0_batch_in[feedvar_index];
-        paddleTensor.data.Resize(feedvar_bytesize *
-                                 _total_shape0_batch_in[feedvar_index]);
+        size_t databuf_size =
+            feedvar_bytesize * _total_shape0_batch_in[feedvar_index];
+        void* databuf_data = MempoolWrapper::instance().malloc(databuf_size);
+        paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
+        paddleTensor.data = paddleBuf;
         _batch_in.push_back(paddleTensor);
       }
@@ -733,16 +753,27 @@ class BatchTasks {
       // At this point it is impossible to tell whether the fetchvar is
       // naturally nobatch, so set_fetch_nobatch_index may miss entries here;
       // hopefully the two cases can be distinguished elsewhere later on.
       if (fetchvar_batch_size(fetchvar_index) != _total_fetch_batch) {
-        // which means error.
-        if (fetchvar_batch_size(fetchvar_index) != 1 &&
-            _total_fetch_batch != 1) {
+        if (fetchvar_batch_size(fetchvar_index) <= 0) {
+          // which means error.
           return false;
-        } else {
+        } else if (fetchvar_batch_size(fetchvar_index) == 1) {
           // which means fetchvar shape[0] = 1.
           // shape[0] does not change with batch
           set_fetch_nobatch_index.insert(fetchvar_index);
           _total_fetch_batch =
               std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch);
+        } else if (_total_fetch_batch == 1) {
+          // Here every previous fetchvar had shape[0] == 1 while the current
+          // one has shape[0] > 1, so all previous fetchvars are no_batch.
+          for (size_t temp_index = 0; temp_index < fetchvar_index;
+               ++temp_index) {
+            set_fetch_nobatch_index.insert(temp_index);
+          }
+          _total_fetch_batch =
+              std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch);
+        } else {
+          // which means error.
+          return false;
         }
       }
       // Add the lod fetchvar index to the vector.
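The reworked `combine_taskmeta` loop above rebases each taskmeta's cumulative lod on the last value already merged, recomputing `last_lod_value` per taskmeta instead of hoisting it once outside the loop. A simplified, self-contained sketch of that rebasing (it uses push_back rather than the preallocated `lod[0]` buffer and explicit offsets of the real code):

#include <cassert>
#include <cstddef>
#include <vector>

// Append one taskmeta's cumulative lod (e.g. {0, 2, 5}) onto the merged lod,
// rebasing every entry by the last boundary already merged. The leading 0 of
// each piece collapses into the previous boundary and is skipped.
static void append_lod(std::vector<size_t>& merged,
                       const std::vector<size_t>& piece) {
  size_t last = merged.back();  // offset used to rebase the incoming piece
  for (size_t i = 1; i < piece.size(); ++i) {
    merged.push_back(last + piece[i]);
  }
}

int main() {
  std::vector<size_t> merged = {0};
  append_lod(merged, {0, 2, 5});  // first taskmeta: 5 rows in 2 sequences
  append_lod(merged, {0, 3});     // second taskmeta: 3 more rows
  assert((merged == std::vector<size_t>{0, 2, 5, 8}));
  return 0;
}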
@@ -790,7 +821,6 @@ class BatchTasks {
     for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
          ++fetchvar_index) {
       size_t fetchvar_bytesize_index = fetchvar_bytesize(fetchvar_index);
-
       if (set_fetch_nobatch_index.size() > 0 &&
           set_fetch_nobatch_index.find(fetchvar_index) !=
               set_fetch_nobatch_index.end()) {
@@ -819,12 +849,17 @@ class BatchTasks {
         // When the task is split into multiple taskmetas, data cannot be
         // copied into task->outVectorT_ptr directly. It is first copied into
         // task->outLodTensorVector[taskmeta_index]; once all taskmetas of the
         // task are finished, the pieces are copied back into
         // task->outVectorT_ptr in order.
-        if (task->taskmeta_num > 1) {
+        if (task->total_taskmeta_num > 1) {
           paddle::PaddleTensor& fetchVarTensor =
               task->outLodTensorVector[taskmeta_index][fetch_lod_index];
           size_t length = fetchvar_bytesize_index * shape0_length;
           fetchVarTensor.shape[0] = shape0_length;
-          fetchVarTensor.data.Resize(length);
+          fetch_lod_index++;
+
+          void* databuf_data =
+              MempoolWrapper::instance().malloc(length, task->memoryPtr);
+          paddle::PaddleBuf paddleBuf(databuf_data, length);
+          fetchVarTensor.data = paddleBuf;
           void* dst_ptr = fetchVarTensor.data.data();
           void* source_ptr = _batch_out[fetchvar_index].data.data() +
                              shape0_index_start * fetchvar_bytesize_index;
@@ -850,7 +885,12 @@ class BatchTasks {
               (*task->outVectorT_ptr)[fetchvar_index];
           size_t length = fetchvar_bytesize_index * shape0_length;
           fetchVarTensor.shape[0] = shape0_length;
-          fetchVarTensor.data.Resize(length);
+          void* databuf_data =
+              MempoolWrapper::instance().malloc(length, task->memoryPtr);
+          paddle::PaddleBuf paddleBuf(databuf_data, length);
+          fetchVarTensor.data = paddleBuf;
           void* dst_ptr = fetchVarTensor.data.data();
           void* source_ptr = _batch_out[fetchvar_index].data.data() +
                              shape0_index_start * fetchvar_bytesize_index;
@@ -879,7 +919,6 @@ class BatchTasks {
                                  last_lod_value);
           }
         }
-        fetch_lod_index++;
       } else {
         // Ordinary fetchvar case: the task's total fetchvar_batch equals
         // the total input batch_size().
@@ -1076,7 +1115,7 @@ class TaskExecutor {

   int work(ThreadContext<TaskT>* context);

-  TaskHandler<TaskT> schedule(const void*, void*);
+  TaskHandler<TaskT> schedule(const void*, void*, MempoolRegion* memoryPtr);

   bool move_task_to_batch(BatchTasks<TaskT>& batchTask);  // NOLINT
@@ -1159,7 +1198,7 @@ class TaskManager {

   ~TaskManager() { wait(); }

-  bool schedule(const void* in, void* out);  // NOLINT
+  bool schedule(const void* in, void* out, MempoolRegion* memoryPtr);  // NOLINT

   void wait();
   inline void clear() { wait(); }
diff --git a/core/predictor/framework/infer.cpp b/core/predictor/framework/infer.cpp
index 0290612287de7c5d63626fb28ebc092f03dd4d15..69d6dac75a4a664df2fec47c581f499c6dbe2fbf 100644
--- a/core/predictor/framework/infer.cpp
+++ b/core/predictor/framework/infer.cpp
@@ -98,8 +98,7 @@ int ReloadableInferEngine::infer(const void* in,
   im::bsf::TaskManager<paddle::PaddleTensor, paddle::PaddleTensor> task_manager(
       _model_index);
-
-  task_manager.schedule(in, out);
+  task_manager.schedule(
+      in, out, MempoolWrapper::instance().get_thread_memory_ptr());
   task_manager.wait();
   return 0;
 }
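For callers outside `ReloadableInferEngine::infer`, the new contract is the same: fetch the current thread's `MempoolRegion` and pass it through `schedule`, so the batching threads carve fetch buffers from the requesting thread's pool. A hedged call-site sketch (it assumes the thread-local region was installed beforehand by the mempool's thread initialization; the wrapper function itself is illustrative, not part of the diff):

#include <cstdint>
#include <vector>

#include "core/predictor/framework/bsf.h"
#include "core/predictor/framework/memory.h"

using baidu::paddle_serving::predictor::MempoolRegion;
using baidu::paddle_serving::predictor::MempoolWrapper;

// Illustrative wrapper: schedule one request on model `model_index` and wait.
int run_batched_infer(const std::vector<paddle::PaddleTensor>& in,
                      std::vector<paddle::PaddleTensor>* out,
                      uint32_t model_index) {
  im::bsf::TaskManager<paddle::PaddleTensor, paddle::PaddleTensor>
      task_manager(model_index);
  // The region pointer travels with the task so that batching threads
  // allocate this request's fetch buffers from this thread's pool.
  MempoolRegion* region = MempoolWrapper::instance().get_thread_memory_ptr();
  if (!task_manager.schedule(&in, out, region)) {
    return -1;
  }
  task_manager.wait();
  return 0;
}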
diff --git a/core/predictor/framework/memory.cpp b/core/predictor/framework/memory.cpp
index f22311e54388be3062b1bd8597ed4d0f9fefa416..de6537ec067cec0fba8221669d27a86581cbf3a6 100644
--- a/core/predictor/framework/memory.cpp
+++ b/core/predictor/framework/memory.cpp
@@ -19,30 +19,6 @@ namespace baidu {
 namespace paddle_serving {
 namespace predictor {

-// why we need MempoolRegion
-// because we need to release the resource.
-// so we need both Mempool and Region.
-// Mempool is a wrapper class for us to use memory more safely.
-// Region is the RAII class.
-struct MempoolRegion {
-  MempoolRegion(im::fugue::memory::Region* region, im::Mempool* mempool)
-      : _region(region), _mempool(mempool) {}
-  im::fugue::memory::Region* region() { return _region; }
-  im::Mempool* mempool() { return _mempool; }
-
-  im::fugue::memory::Region* _region;
-  im::Mempool* _mempool;
-  ~MempoolRegion() {
-    if (_region) {
-      delete _region;
-      _region = NULL;
-    }
-    if (_mempool) {
-      delete _mempool;
-      _mempool = NULL;
-    }
-  }
-};

 int MempoolWrapper::initialize() {
   if (THREAD_KEY_CREATE(&_bspec_key, NULL) != 0) {
@@ -112,6 +88,28 @@ void* MempoolWrapper::malloc(size_t size) {
   return mempool->malloc(size);
 }

+void* MempoolWrapper::malloc(size_t size, MempoolRegion* my_mempool_region) {
+  MempoolRegion* mempool_region = my_mempool_region;
+  if (mempool_region == NULL) {
+    LOG(WARNING) << "Cannot malloc memory:" << size
+                 << ", since the given mempool region is NULL";
+    return NULL;
+  }
+
+  im::Mempool* mempool = mempool_region->mempool();
+  if (!mempool) {
+    LOG(WARNING) << "Cannot malloc memory:" << size
+                 << ", since mempool is not thread initialized";
+    return NULL;
+  }
+  return mempool->malloc(size);
+}
+
+MempoolRegion* MempoolWrapper::get_thread_memory_ptr() {
+  MempoolRegion* mempool_region =
+      (MempoolRegion*)THREAD_GETSPECIFIC(_bspec_key);
+  return mempool_region;
+}
+
 void MempoolWrapper::free(void* p, size_t size) {
   MempoolRegion* mempool_region =
       (MempoolRegion*)THREAD_GETSPECIFIC(_bspec_key);
diff --git a/core/predictor/framework/memory.h b/core/predictor/framework/memory.h
index 74fba4ebb34c337490684daf6b55e87fc9c33177..b0952b2a8c86e9aa17df0fe231dd9f4ad7f357c7 100644
--- a/core/predictor/framework/memory.h
+++ b/core/predictor/framework/memory.h
@@ -21,6 +21,30 @@ namespace baidu {
 namespace paddle_serving {
 namespace predictor {

+// Why do we need MempoolRegion? Because the resource must eventually be
+// released, we need both Mempool and Region:
+// Mempool is a wrapper class that lets us use memory more safely, and
+// Region is the RAII class that owns and frees both.
+struct MempoolRegion {
+  MempoolRegion(im::fugue::memory::Region* region, im::Mempool* mempool)
+      : _region(region), _mempool(mempool) {}
+  im::fugue::memory::Region* region() { return _region; }
+  im::Mempool* mempool() { return _mempool; }
+
+  im::fugue::memory::Region* _region;
+  im::Mempool* _mempool;
+  ~MempoolRegion() {
+    if (_region) {
+      delete _region;
+      _region = NULL;
+    }
+    if (_mempool) {
+      delete _mempool;
+      _mempool = NULL;
+    }
+  }
+};
+
 class MempoolWrapper {
  public:
   MempoolWrapper() {}
@@ -38,6 +62,10 @@ class MempoolWrapper {

   void* malloc(size_t size);

+  void* malloc(size_t size, MempoolRegion* my_mempool_region);
+
+  MempoolRegion* get_thread_memory_ptr();
+
   void free(void* p, size_t size);

  private:
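Taken together, the memory.h/memory.cpp changes expose the allocation pattern the rest of the diff adopts: replace `PaddleBuf::Resize` (heap memory owned by the tensor) with a buffer carved from an explicit `MempoolRegion` and wrapped in a non-owning `PaddleBuf`, so the data lives exactly as long as the request's pool. A hedged sketch of that pattern as a helper (the helper itself is illustrative, not part of the diff):

#include <cstddef>

#include "core/predictor/framework/memory.h"
#include "paddle_inference_api.h"  // paddle::PaddleTensor / paddle::PaddleBuf

using baidu::paddle_serving::predictor::MempoolRegion;
using baidu::paddle_serving::predictor::MempoolWrapper;

// Bind a pooled buffer of `byte_size` bytes to `tensor`. The
// PaddleBuf(void*, size_t) constructor wraps external memory without taking
// ownership, so nothing is freed when the tensor goes away; the memory is
// reclaimed when `region` (the request's pool) is destroyed, which must
// therefore outlive the tensor.
static bool bind_pooled_buffer(paddle::PaddleTensor& tensor,
                               size_t byte_size,
                               MempoolRegion* region) {
  void* data = MempoolWrapper::instance().malloc(byte_size, region);
  if (data == NULL) {
    return false;  // region missing or pool not initialized
  }
  tensor.data = paddle::PaddleBuf(data, byte_size);
  return true;
}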