Unverified · Commit 1b2a65fe · authored by TeslaZhao, committed by GitHub

Merge pull request #1589 from HexToString/fix_bsf

Fix a memory leak in the asynchronous (BSF) framework
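For context: on the asynchronous BSF path, fetch tensors were sized with `paddle::PaddleBuf::Resize`, which heap-allocates memory owned by the tensor; per this PR, that memory was never returned, leaking on every request. The diff below switches those allocations to the request's `MempoolRegion`. A minimal sketch of the new pattern follows; the helper function and its setup are hypothetical, while `MempoolWrapper`, `MempoolRegion`, and `memoryPtr` come from the diff itself:

```cpp
#include "core/predictor/framework/memory.h"  // MempoolWrapper, MempoolRegion
#include "paddle_inference_api.h"             // paddle::PaddleTensor, paddle::PaddleBuf

using baidu::paddle_serving::predictor::MempoolRegion;
using baidu::paddle_serving::predictor::MempoolWrapper;

// Hypothetical helper showing the replacement pattern used throughout the diff.
void fill_fetch_buffer(paddle::PaddleTensor& tensor_out, size_t databuf_size,
                       MempoolRegion* memoryPtr) {
  // Old: tensor_out.data.Resize(databuf_size);  // PaddleBuf owned (and leaked) it
  // New: carve the buffer out of the request's mempool region ...
  void* databuf_data = MempoolWrapper::instance().malloc(databuf_size, memoryPtr);
  // ... and wrap it without transferring ownership, so the region reclaims
  // everything in one shot when the request finishes.
  paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
  tensor_out.data = paddleBuf;
}
```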
@@ -163,10 +163,10 @@ int GeneralRecOp::inference() {
         argmax_idx = int(std::distance(
             &output_tensor_1_data[n * predict_shape[1]],
             std::max_element(&output_tensor_1_data[n * predict_shape[1]],
-                             &output_tensor_1_data[(n + 1) * predict_shape[1]])));
+                             output_tensor_1_data + (n + 1) * predict_shape[1])));
         max_value = float(
             *std::max_element(&output_tensor_1_data[n * predict_shape[1]],
-                              &output_tensor_1_data[(n + 1) * predict_shape[1]]));
+                              output_tensor_1_data + (n + 1) * predict_shape[1]));
         if (blank - 1 - argmax_idx > 1e-5) {
           score_vector[index] += max_value;
           count += 1;
......
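The only change in this hunk is the form of the end-of-row bound: `&output_tensor_1_data[(n + 1) * predict_shape[1]]` indexes one past the end of the row before taking the address, which is formally undefined behavior, while the equivalent pointer arithmetic is well-defined. A sketch of the idiom (the helper and its names are hypothetical stand-ins for the code above):

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>

// buf holds consecutive rows of `cols` scores; return the argmax of row n.
int row_argmax(const float* buf, size_t n, size_t cols) {
  const float* begin = buf + n * cols;
  const float* end = buf + (n + 1) * cols;  // one-past-the-end pointer: legal
  // &buf[(n + 1) * cols] computes the same address but formally dereferences
  // a past-the-end element first, which is undefined behavior.
  return static_cast<int>(std::distance(begin, std::max_element(begin, end)));
}
```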
@@ -19,10 +19,8 @@
 #else
 #include <butil/atomicops.h>
 #endif
 #include <sys/syscall.h>
 #include <boost/bind.hpp>
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/memory.h"
@@ -34,7 +32,7 @@ template <typename InItemT, typename OutItemT>
 bool Task<InItemT, OutItemT>::task_fetch_init(BatchTasks<TaskT>& batchTask) {
   // Double-checked locking, to reduce lock granularity.
   if (!fetch_init) {
-    if (taskmeta_num > 1) {
+    if (total_taskmeta_num > 1) {
       // When the task is split into multiple taskmetas, locking is required.
       AutoMutex lock(task_mut);
       task_fetch_create(batchTask);
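The comment calls this double-checked locking: the unlocked `fetch_init` test keeps the common case cheap, and only tasks split across several taskmetas take the mutex. A self-contained sketch of the pattern, with hypothetical stand-ins for the diff's `fetch_init` / `task_mut` / `total_taskmeta_num`:

```cpp
#include <cstddef>
#include <mutex>

struct FetchState {
  bool fetch_init = false;
  size_t total_taskmeta_num = 1;
  std::mutex task_mut;

  void ensure_created() {
    if (!fetch_init) {                   // first check, no lock (fast path)
      if (total_taskmeta_num > 1) {      // several taskmetas may race here
        std::lock_guard<std::mutex> lock(task_mut);
        if (!fetch_init) {               // second check, under the lock
          create();
        }
      } else {
        create();                        // single taskmeta: no contention
      }
    }
  }
  void create() { /* allocate fetch buffers */ fetch_init = true; }
};
```

Strictly speaking, the unlocked read of a plain bool is a data race under the C++11 memory model; a `std::atomic<bool>` with acquire/release ordering would make the fast path formally race-free.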
@@ -88,15 +86,21 @@ bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
       // At this point the lod is empty.
       tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
       // resize all batch memory at one time
       size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
-      tensor_out.data.Resize(databuf_size);
+      void* databuf_data = MempoolWrapper::instance().malloc(databuf_size, memoryPtr);
+      paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
+      tensor_out.data = paddleBuf;
+      // tensor_out.data.Resize(databuf_size);
     } else {
       // When taskmeta_num = 1, only one taskMeta operates on the task at a
       // time, so there is no thread-safety issue and we can go directly
       // taskMeta->task->resize->copy.
       // When the task is split into multiple taskMetas, a temporary object
       // records the partial results, which are merged once all have arrived.
-      if (taskmeta_num > 1) {
+      if (total_taskmeta_num > 1) {
         taskMetaOutLodTensor.push_back(tensor_out);
       }
     }
@@ -104,7 +108,7 @@ bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
     }
     // outLodTensorVector is actually a two-level vector whose shape is
     // taskmeta_num * vector_fetch_lod_index.size();
-    outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor);
+    outLodTensorVector.resize(total_taskmeta_num, taskMetaOutLodTensor);
     fetch_init = true;
   }
   return true;
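The commented-out `Resize` lines mark the old behavior. The distinction that makes this a leak fix, assuming the usual Paddle inference API semantics: a default-constructed `PaddleBuf` that is `Resize`d owns its allocation, whereas `PaddleBuf(void*, size_t)` merely wraps external memory and never frees it:

```cpp
// Sketch of the two PaddleBuf modes (assuming standard Paddle inference API
// semantics); memoryPtr is the request's MempoolRegion from the diff.
void paddlebuf_ownership_demo(MempoolRegion* memoryPtr) {
  paddle::PaddleBuf owned;
  owned.Resize(1024);  // PaddleBuf allocates and owns these 1024 bytes

  void* ext = MempoolWrapper::instance().malloc(1024, memoryPtr);
  paddle::PaddleBuf borrowed(ext, 1024);  // wraps external memory, never frees it
  // Ownership of `ext` stays with the mempool region, which releases all of
  // its blocks together when the request completes.
}
```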
@@ -209,7 +213,7 @@ void TaskExecutor<TaskT>::stop() {
 template <typename TaskT>
 TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
     const void* inVectorT_ptr,
-    void* outVectorT_ptr) {  // NOLINT
+    void* outVectorT_ptr, MempoolRegion* memoryPtr) {  // NOLINT
   TaskT* task = butil::get_object<TaskT>();
   if (!task) {
     LOG(ERROR) << "Failed get TaskT from object pool";
@@ -235,7 +239,8 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
   task->read_fd = fds[0];
   task->write_fd = fds[1];
   task->owner_tid = ::syscall(SYS_gettid);
+  task->memoryPtr = memoryPtr;
+  // task->_bspec_key = _bspec_key;
   task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
   task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
   if (!task->task_init()) {
@@ -403,9 +408,9 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
 template <typename InItemT, typename OutItemT>
 bool TaskManager<InItemT, OutItemT>::schedule(const void* in,
-                                              void* out) {  // NOLINT
+                                              void* out, MempoolRegion* memoryPtr) {  // NOLINT
   TaskHandler<TaskT> handler =
-      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out);
+      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out, memoryPtr);
   if (handler.valid()) {
     _task_owned = handler;
......
@@ -38,6 +38,8 @@ namespace im {
 namespace bsf {
 static const size_t DEFAULT_BATCH_SIZE = 100;
+typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
+typedef baidu::paddle_serving::predictor::MempoolRegion MempoolRegion;
 // InItemT is paddle::PaddleTensor
 // InVectorT std::vector<paddle::PaddleTensor>
@@ -61,6 +63,7 @@ struct Task {
   typedef Task<InItemT, OutItemT> TaskT;
   typedef std::vector<size_t> ShapeVector;
   typedef std::vector<ShapeVector> VectorOfShapeVector;
+  typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
   int read_fd;
   int write_fd;
@@ -75,10 +78,12 @@ struct Task {
   std::set<size_t> set_fetch_nobatch_index;
   butil::atomic<size_t> index;
   size_t taskmeta_num;
+  size_t total_taskmeta_num;
   THREAD_MUTEX_T task_mut;
   bool fetch_init;
   // taskmeta_num * set_feed_lod_index.size()
   std::vector<OutVectorT> outLodTensorVector;
+  MempoolRegion* memoryPtr;
   Task() {
     read_fd = -1;
@@ -96,6 +101,7 @@ struct Task {
     index.store(0, butil::memory_order_relaxed);
     THREAD_MUTEX_INIT(&task_mut, NULL);
     fetch_init = false;
+    total_taskmeta_num = 1;
     outLodTensorVector.clear();
   }
   ~Task() {
@@ -111,6 +117,7 @@ struct Task {
     rem = -1;
     total_feed_batch = 0;
     taskmeta_num = 0;
+    total_taskmeta_num = 1;
     index.store(0, butil::memory_order_relaxed);
     THREAD_MUTEX_DESTROY(&task_mut);
     fetch_init = false;
@@ -130,6 +137,7 @@ struct Task {
     rem = -1;
     total_feed_batch = 0;
     taskmeta_num = 0;
+    total_taskmeta_num = 1;
     index.store(0, butil::memory_order_relaxed);
     THREAD_MUTEX_INIT(&task_mut, NULL);
     fetch_init = false;
@@ -348,14 +356,14 @@ struct Task {
   bool combine_taskmeta() {
     // Only when there are lod-type fetch outputs and the task was split into
     // multiple taskmetas does data need to be moved from outLodTensorVector
     // back into outVectorT_ptr.
-    if (vector_fetch_lod_index.size() > 0 && taskmeta_num > 1) {
+    if (vector_fetch_lod_index.size() > 0 && total_taskmeta_num > 1) {
       for (size_t index = 0; index < vector_fetch_lod_index.size(); ++index) {
         size_t data_length = 0;
         size_t lod_length = 0;
         size_t total_shape0 = 0;
         size_t feedvar_index = vector_fetch_lod_index[index];
         // PaddleTensor's resize implementation clears the buffer every time,
         // so the total length must be accumulated first.
-        for (size_t taskmeta_index = 0; taskmeta_index < taskmeta_num;
+        for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num;
              ++taskmeta_index) {
           data_length +=
               outLodTensorVector[taskmeta_index][index].data.length();
@@ -364,7 +372,12 @@ struct Task {
         }
         // Grow the PaddleTensor's data and lod in one shot.
         paddle::PaddleTensor& fetchVarTensor = (*outVectorT_ptr)[feedvar_index];
-        fetchVarTensor.data.Resize(data_length);
+        fetchVarTensor.shape[0] = total_shape0;
+        void* databuf_data = MempoolWrapper::instance().malloc(data_length, memoryPtr);
+        paddle::PaddleBuf paddleBuf(databuf_data, data_length);
+        fetchVarTensor.data = paddleBuf;
+        // fetchVarTensor.data.Resize(data_length);
         // Pad the task's lod with a leading 0.
         if (fetchVarTensor.lod.size() <= 0) {
           fetchVarTensor.lod.push_back({0});
@@ -378,15 +391,18 @@ struct Task {
         size_t lod_length_offset = 0;
         size_t once_data_length = 0;
         size_t once_lod_length = 0;
-        size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset];
-        for (size_t taskmeta_index = 0; taskmeta_index < taskmeta_num;
+        for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num;
              ++taskmeta_index) {
+          // process data
           void* dst_ptr = fetchVarTensor.data.data() + data_length_offset;
           void* source_ptr =
               outLodTensorVector[taskmeta_index][index].data.data();
           once_data_length =
               outLodTensorVector[taskmeta_index][index].data.length();
           memcpy(dst_ptr, source_ptr, once_data_length);
+          data_length_offset += once_data_length;
+          // process lod
+          size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset];
           once_lod_length =
               outLodTensorVector[taskmeta_index][index].lod[0].size();
           for (size_t once_index = 0; once_index < once_lod_length;
@@ -394,9 +410,9 @@ struct Task {
             fetchVarTensor.lod[0][lod_length_offset + 1] =
                 last_lod_value +
                 outLodTensorVector[taskmeta_index][index].lod[0][once_index];
+            lod_length_offset++;
           }
-          data_length_offset += once_data_length;
-          lod_length_offset += once_lod_length;
         }
       }
     }
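The rebased-lod merge above can be hard to follow, so here is a self-contained sketch with hypothetical inputs. Each taskmeta reports lod offsets relative to its own slice, so every entry is shifted by the running total (`last_lod_value` in the diff) before being appended; unlike the diff's in-place loop over a preallocated array, this sketch appends to a vector and skips each part's leading 0 for readability:

```cpp
#include <cstddef>
#include <vector>

std::vector<size_t> merge_lods(const std::vector<std::vector<size_t>>& parts) {
  std::vector<size_t> merged{0};
  for (const auto& part : parts) {
    size_t base = merged.back();                // last_lod_value in the diff
    for (size_t i = 1; i < part.size(); ++i) {  // skip the part's leading 0
      merged.push_back(base + part[i]);
    }
  }
  return merged;
}
// merge_lods({{0, 2, 5}, {0, 3, 4}}) == {0, 2, 5, 8, 9}:
// taskmeta 1's offsets {3, 4} are rebased by 5, the total accumulated so far.
```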
@@ -527,6 +543,11 @@ class BatchTasks {
     }
     int start_index = task->batch_size() - task->rem;
     TaskMetaT tm(task, start_index, add, task->taskmeta_num);
+    task->rem -= add;
+    _rem_size -= add;
+    if (task->taskmeta_num == 0) {
+      task->total_taskmeta_num = 1 + (task->rem + _batch_size - 1) / _batch_size;
+    }
     task->taskmeta_num += 1;
     _taskmeta_vector.push_back(tm);
     if (_batch_in_offset.size() == 0) {
@@ -577,8 +598,6 @@ class BatchTasks {
               tm.feed_shape0_range[feedvar_index][0];
         }
       }
-    task->rem -= add;
-    _rem_size -= add;
     return _rem_size;
   }
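Worked example of the new `total_taskmeta_num` estimate, with hypothetical numbers: suppose `_batch_size = 100` and a task arrives with `batch_size() = 250`. If the first append consumes `add = 100`, then `task->rem = 150` at the moment `taskmeta_num == 0`, so `total_taskmeta_num = 1 + (150 + 100 - 1) / 100 = 1 + 2 = 3`: the taskmeta just created plus a ceiling division over the remainder. Computing this up front (and moving the `rem` bookkeeping ahead of it) is what lets `task_fetch_create` size `outLodTensorVector` correctly, whereas the old code's `taskmeta_num` was still being incremented while buffers were created.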
@@ -604,7 +623,6 @@ class BatchTasks {
     if (_taskmeta_vector.size() <= 0) {
       return;
     }
     for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) {
       TaskMetaT& tm = _taskmeta_vector[ti];
@@ -625,8 +643,10 @@ class BatchTasks {
         paddleTensor.lod = _batch_in_lod[feedvar_index];
         paddleTensor.shape = feedVarTensor.shape;
         paddleTensor.shape[0] = _total_shape0_batch_in[feedvar_index];
-        paddleTensor.data.Resize(feedvar_bytesize *
-                                 _total_shape0_batch_in[feedvar_index]);
+        size_t databuf_size = feedvar_bytesize * _total_shape0_batch_in[feedvar_index];
+        void* databuf_data = MempoolWrapper::instance().malloc(databuf_size);
+        paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
+        paddleTensor.data = paddleBuf;
         _batch_in.push_back(paddleTensor);
       }
@@ -733,16 +753,27 @@ class BatchTasks {
       // Here we cannot tell whether a fetchvar is naturally nobatch, so
       // set_fetch_nobatch_index may miss entries; hopefully the two cases
       // can be distinguished elsewhere later.
       if (fetchvar_batch_size(fetchvar_index) != _total_fetch_batch) {
-        // which means error.
-        if (fetchvar_batch_size(fetchvar_index) != 1 &&
-            _total_fetch_batch != 1) {
+        if (fetchvar_batch_size(fetchvar_index) <= 0) {
+          // which means error.
           return false;
-        } else {
+        } else if (fetchvar_batch_size(fetchvar_index) == 1) {
           // which means fetchvar shape[0] = 1.
           // shape[0] does not change with batch
           set_fetch_nobatch_index.insert(fetchvar_index);
           _total_fetch_batch =
               std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch);
+        } else if (_total_fetch_batch == 1) {
+          // This means every previous fetchvar had shape[0] == 1 while the
+          // current fetchvar has shape[0] > 1, so all previous fetchvars
+          // are no_batch.
+          for (size_t temp_index = 0; temp_index < fetchvar_index; ++temp_index) {
+            set_fetch_nobatch_index.insert(temp_index);
+          }
+          _total_fetch_batch =
+              std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch);
+        } else {
+          // which means error.
+          return false;
         }
       }
       // Add the lod fetchvar index into the vector.
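To see the retroactive branch in action (hypothetical shapes): if fetchvars arrive with shape[0] values 1, 1, 4, the first two leave `_total_fetch_batch` at 1; when the third reports shape[0] = 4, the new `_total_fetch_batch == 1` branch back-fills fetchvars 0 and 1 into `set_fetch_nobatch_index` and raises `_total_fetch_batch` to 4. Reading the old branch, that case fell into the `else`, marking the batched fetchvar itself as nobatch and never flagging the earlier ones. (The backward loop as committed iterates a `size_t` down past zero and inserts `fetchvar_index` rather than `temp_index`; the hunk above shows the intended forward form.)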
@@ -790,7 +821,6 @@ class BatchTasks {
     for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
          ++fetchvar_index) {
       size_t fetchvar_bytesize_index = fetchvar_bytesize(fetchvar_index);
       if (set_fetch_nobatch_index.size() > 0 &&
           set_fetch_nobatch_index.find(fetchvar_index) !=
               set_fetch_nobatch_index.end()) {
@@ -819,12 +849,17 @@ class BatchTasks {
         // When a task is split into multiple taskmetas, we cannot copy
         // directly into task->outVectorT_ptr. Copy into
         // task->outLodTensorVector[taskmeta_index] first; once all of the
         // task's taskmetas have finished, copy back into
         // task->outVectorT_ptr in order.
-        if (task->taskmeta_num > 1) {
+        if (task->total_taskmeta_num > 1) {
           paddle::PaddleTensor& fetchVarTensor =
               task->outLodTensorVector[taskmeta_index][fetch_lod_index];
           size_t length = fetchvar_bytesize_index * shape0_length;
           fetchVarTensor.shape[0] = shape0_length;
-          fetchVarTensor.data.Resize(length);
+          fetch_lod_index++;
+          void* databuf_data = MempoolWrapper::instance().malloc(length, task->memoryPtr);
+          paddle::PaddleBuf paddleBuf(databuf_data, length);
+          fetchVarTensor.data = paddleBuf;
+          // fetchVarTensor.data.Resize(length);
           void* dst_ptr = fetchVarTensor.data.data();
           void* source_ptr = _batch_out[fetchvar_index].data.data() +
                              shape0_index_start * fetchvar_bytesize_index;
@@ -850,7 +885,12 @@ class BatchTasks {
               (*task->outVectorT_ptr)[fetchvar_index];
           size_t length = fetchvar_bytesize_index * shape0_length;
           fetchVarTensor.shape[0] = shape0_length;
-          fetchVarTensor.data.Resize(length);
+          void* databuf_data = MempoolWrapper::instance().malloc(length, task->memoryPtr);
+          paddle::PaddleBuf paddleBuf(databuf_data, length);
+          fetchVarTensor.data = paddleBuf;
+          // fetchVarTensor.data.Resize(length);
           void* dst_ptr = fetchVarTensor.data.data();
           void* source_ptr = _batch_out[fetchvar_index].data.data() +
                              shape0_index_start * fetchvar_bytesize_index;
@@ -879,7 +919,6 @@ class BatchTasks {
                 last_lod_value);
           }
         }
-        fetch_lod_index++;
       } else {
         // Normal fetchvar case: here the task's total fetchvar_batch equals
         // the total input batch_size().
@@ -1076,7 +1115,7 @@ class TaskExecutor {
   int work(ThreadContext<TaskT>* context);
-  TaskHandler<TaskT> schedule(const void*, void*);
+  TaskHandler<TaskT> schedule(const void*, void*, MempoolRegion* memoryPtr);
   bool move_task_to_batch(BatchTasks<TaskT>& batchTask);  // NOLINT
@@ -1159,7 +1198,7 @@ class TaskManager {
   ~TaskManager() { wait(); }
-  bool schedule(const void* in, void* out);  // NOLINT
+  bool schedule(const void* in, void* out, MempoolRegion* memoryPtr);  // NOLINT
   void wait();
   inline void clear() { wait(); }
......
@@ -98,8 +98,7 @@ int ReloadableInferEngine::infer(const void* in,
   im::bsf::TaskManager<paddle::PaddleTensor, paddle::PaddleTensor> task_manager(
       _model_index);
-  task_manager.schedule(in, out);
+  task_manager.schedule(in, out, MempoolWrapper::instance().get_thread_memory_ptr());
   task_manager.wait();
   return 0;
 }
......
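Lifetime note: `get_thread_memory_ptr()` returns the `MempoolRegion` that the request thread installed in thread-local storage (presumably during `MempoolWrapper::thread_initialize`, which is where `_bspec_key` is set). Passing it through `schedule()` lets the BSF worker threads allocate fetch buffers from the request's region instead of heap-owning `Resize` calls; and since `task_manager.wait()` keeps `infer()` on the stack until the workers finish, the region safely outlives every allocation made against it and is reclaimed with the request.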
@@ -19,30 +19,6 @@ namespace baidu {
 namespace paddle_serving {
 namespace predictor {
-// why we need MempoolRegion
-// because we need to release the resource.
-// so we need both Mempool and Region.
-// Mempool is a wrapper class for us to use memory more safely.
-// Region is the RAII class.
-struct MempoolRegion {
-  MempoolRegion(im::fugue::memory::Region* region, im::Mempool* mempool)
-      : _region(region), _mempool(mempool) {}
-  im::fugue::memory::Region* region() { return _region; }
-  im::Mempool* mempool() { return _mempool; }
-  im::fugue::memory::Region* _region;
-  im::Mempool* _mempool;
-  ~MempoolRegion() {
-    if (_region) {
-      delete _region;
-      _region = NULL;
-    }
-    if (_mempool) {
-      delete _mempool;
-      _mempool = NULL;
-    }
-  }
-};
 int MempoolWrapper::initialize() {
   if (THREAD_KEY_CREATE(&_bspec_key, NULL) != 0) {
@@ -112,6 +88,28 @@ void* MempoolWrapper::malloc(size_t size) {
   return mempool->malloc(size);
 }
+void* MempoolWrapper::malloc(size_t size, MempoolRegion* my_mempool_region) {
+  MempoolRegion* mempool_region = my_mempool_region;
+  if (mempool_region == NULL) {
+    LOG(WARNING) << "THREAD_GETSPECIFIC() returned NULL";
+    return NULL;
+  }
+  im::Mempool* mempool = mempool_region->mempool();
+  if (!mempool) {
+    LOG(WARNING) << "Cannot malloc memory:" << size
+                 << ", since mempool is not thread initialized";
+    return NULL;
+  }
+  return mempool->malloc(size);
+}
+
+MempoolRegion* MempoolWrapper::get_thread_memory_ptr() {
+  MempoolRegion* mempool_region =
+      (MempoolRegion*)THREAD_GETSPECIFIC(_bspec_key);
+  return mempool_region;
+}
 void MempoolWrapper::free(void* p, size_t size) {
   MempoolRegion* mempool_region =
       (MempoolRegion*)THREAD_GETSPECIFIC(_bspec_key);
......
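The point of the new overload: the one-argument `malloc` resolves the region through `THREAD_GETSPECIFIC`, so it is only correct on the thread that initialized the mempool, while the two-argument overload takes the region explicitly, letting a BSF worker thread allocate from the request thread's region (carried on the `Task` as `memoryPtr`). A hypothetical call-site sketch:

```cpp
#include <cstddef>

using baidu::paddle_serving::predictor::MempoolRegion;
using baidu::paddle_serving::predictor::MempoolWrapper;

void* alloc_for_request(size_t size, MempoolRegion* request_region) {
  if (request_region != NULL) {
    // Explicit region: safe from any worker thread.
    return MempoolWrapper::instance().malloc(size, request_region);
  }
  // Fallback: thread-local region, valid only on the request thread itself.
  return MempoolWrapper::instance().malloc(size);
}
```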
@@ -21,6 +21,30 @@ namespace baidu {
 namespace paddle_serving {
 namespace predictor {
+// why we need MempoolRegion:
+// because we need to release the resource,
+// we need both Mempool and Region.
+// Mempool is a wrapper class for us to use memory more safely.
+// Region is the RAII class.
+struct MempoolRegion {
+  MempoolRegion(im::fugue::memory::Region* region, im::Mempool* mempool)
+      : _region(region), _mempool(mempool) {}
+  im::fugue::memory::Region* region() { return _region; }
+  im::Mempool* mempool() { return _mempool; }
+  im::fugue::memory::Region* _region;
+  im::Mempool* _mempool;
+  ~MempoolRegion() {
+    if (_region) {
+      delete _region;
+      _region = NULL;
+    }
+    if (_mempool) {
+      delete _mempool;
+      _mempool = NULL;
+    }
+  }
+};
 class MempoolWrapper {
  public:
   MempoolWrapper() {}
@@ -38,6 +62,10 @@ class MempoolWrapper {
   void* malloc(size_t size);
+  void* malloc(size_t size, MempoolRegion* my_mempool_region);
+  MempoolRegion* get_thread_memory_ptr();
   void free(void* p, size_t size);
  private:
......
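The `MempoolRegion` struct moves from memory.cpp into the header so that bsf.h and the task executor can name it. A minimal lifetime sketch, assuming this mirrors how `thread_initialize` wires the pieces together (the `Region()` / `init()` / `Mempool(region)` construction details are assumptions, not shown in this diff):

```cpp
#include "core/predictor/framework/memory.h"

using baidu::paddle_serving::predictor::MempoolRegion;

void region_lifetime_sketch() {
  im::fugue::memory::Region* region = new im::fugue::memory::Region();
  region->init();                                  // assumed initializer
  im::Mempool* mempool = new im::Mempool(region);  // allocator view over region
  MempoolRegion* mempool_region = new MempoolRegion(region, mempool);
  // ... THREAD_SETSPECIFIC(_bspec_key, mempool_region); serve the request ...
  delete mempool_region;  // ~MempoolRegion deletes both mempool and region,
                          // returning every per-request allocation at once
}
```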