Unverified commit c3342ca6 authored by ShiningZhang, committed by GitHub

Merge branch 'develop' into dev-doc

@@ -70,7 +70,6 @@ bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
    // Copy each lod-type fetchvar into its own temporary buffer first,
    // then compute the total size of the temporary buffers and merge the fetchvars and lods.
    fetchvar_batch = 0;
  } else {
    // Plain fetchvar case: the Task's total fetchvar_batch equals
    // the total input batch_size().
@@ -86,14 +85,15 @@ bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
      // At this point the lod is empty.
      tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
      // resize all batch memory at one time
      size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
      void* databuf_data =
          MempoolWrapper::instance().malloc(databuf_size, memoryPtr);
      paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
      tensor_out.data = paddleBuf;
      // tensor_out.data.Resize(databuf_size);
    } else {
      // When taskmeta_num == 1, only one taskMeta touches the task at a time,
      // so there is no thread-safety concern and we can go taskMeta->task->resize->copy directly.
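The measure-then-merge pattern referred to above is worth spelling out: every lod-type fetchvar is first copied into its own temporary buffer, the total length is summed, and only then is a single destination allocated and filled. A minimal stand-alone sketch of that pattern, with plain std::vector<char> segments standing in for PaddleBuf and the mempool (both stand-ins are assumptions, not the types used above):

#include <cstring>
#include <vector>

// Merge variable-length segments into one buffer: first sum the lengths,
// then allocate once, then copy each segment at its running offset.
std::vector<char> merge_segments(const std::vector<std::vector<char>>& segs) {
  size_t total = 0;
  for (const auto& s : segs) total += s.size();  // phase 1: measure
  std::vector<char> merged(total);               // single allocation
  size_t offset = 0;
  for (const auto& s : segs) {                   // phase 2: copy
    std::memcpy(merged.data() + offset, s.data(), s.size());
    offset += s.size();
  }
  return merged;
}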
@@ -213,7 +213,8 @@ void TaskExecutor<TaskT>::stop() {
template <typename TaskT>
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
    const void* inVectorT_ptr,
    void* outVectorT_ptr,
    MempoolRegion* memoryPtr) {  // NOLINT
  TaskT* task = butil::get_object<TaskT>();
  if (!task) {
    LOG(ERROR) << "Failed get TaskT from object pool";
@@ -240,7 +241,7 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
  task->write_fd = fds[1];
  task->owner_tid = ::syscall(SYS_gettid);
  task->memoryPtr = memoryPtr;
  // task->_bspec_key = _bspec_key;
  task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
  task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
  if (!task->task_init()) {
@@ -309,7 +310,7 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
    }
    // combine_task_valid decides whether two tasks can be merged.
    // Apart from the outermost dim, the inner shapes must match, or padding
    // must be allowed, for the merge to proceed.
    // Otherwise break out of the loop and put the task into the next batchTask.
    // This guarantees that tasks passed to batch.append_task(task) share the same inner shape.
@@ -317,12 +318,15 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
    // so the feedvar values must be equal for the tasks to merge.
    // Otherwise break out of the loop and put the task into the next batchTask.
    // PaddleTensor and PaddleBuf do not overload operator==, so only raw memory can be compared.
    // TODO(HexToString): consider supporting AutoPadding here later.
    if (previous_task != nullptr) {
      if (task->combine_task_valid(previous_task) == 0) {
        break;
      }
    }
if (batchTask.padding(task) != 2) {
break;
}
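Read together, the two checks above form the merge gate: a task joins the current batch only when combine_task_valid() returns non-zero and padding() returns 2. A simplified sketch of that control flow, with a hypothetical ToyTask type and stubbed-out decisions (not the real Task/BatchTasks API):

// Hypothetical stubs: 0 = cannot merge, 1 = shapes identical,
// 2 = shapes differ but padding may still allow the merge.
struct ToyTask { int shape_class; };

int combine_task_valid(const ToyTask& a, const ToyTask& b) {
  return a.shape_class == b.shape_class ? 1 : 2;
}
// 0 = incompatible, 1 = padding too costly, 2 = merge (possibly padded).
int padding_decision(const ToyTask& /*t*/) { return 2; }

bool can_append(const ToyTask* prev, const ToyTask& next) {
  if (prev != nullptr && combine_task_valid(*prev, next) == 0) return false;
  if (padding_decision(next) != 2) return false;
  return true;  // mirrors the path into batchTask.append_task(task)
}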
    size_t rem = batchTask.append_task(task);
    previous_task = task;
    if (task->rem <= 0) {
@@ -407,10 +411,11 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
}
template <typename InItemT, typename OutItemT>
bool TaskManager<InItemT, OutItemT>::schedule(
    const void* in, void* out, MempoolRegion* memoryPtr) {  // NOLINT
  TaskHandler<TaskT> handler =
      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(
          in, out, memoryPtr);
  if (handler.valid()) {
    _task_owned = handler;
...
@@ -17,10 +17,11 @@
#include <errno.h>
#include <algorithm>
#include <cstring>
#include <functional>
#include <list>
#include <numeric>
#include <set>
#include <vector>
#ifdef BCLOUD
#include "base/atomicops.h"
#else
@@ -38,6 +39,8 @@ namespace im {
namespace bsf {
static const size_t DEFAULT_BATCH_SIZE = 100;
static const size_t ABSOLUTE_ERROR = 1024;
static const float RELATIVE_ERROR = 0.5;
typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
typedef baidu::paddle_serving::predictor::MempoolRegion MempoolRegion;
@@ -61,7 +64,7 @@ struct Task {
  typedef InItemT InType;
  typedef OutItemT OutType;
  typedef Task<InItemT, OutItemT> TaskT;
  typedef std::vector<int> ShapeVector;
  typedef std::vector<ShapeVector> VectorOfShapeVector;
  typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
@@ -124,7 +127,7 @@ struct Task {
    outLodTensorVector.clear();
  }
  void clear() {
    read_fd = -1;
    write_fd = -1;
    owner_tid = -1;
@@ -158,13 +161,18 @@ struct Task {
    return 1;
  }
  int combine_task_valid(Task* other_task) {
    // Apart from the outermost dim, the inner shapes must match, or padding
    // must be allowed, for the merge to proceed.
    // Otherwise break out of the loop and put the task into the next batchTask.
    // When the inner shapes differ, do not decide on padding here; return 2
    // and let the batchTask level decide.
    // This guarantees that tasks in batch.append_task(task) share the same inner shape.
    // return 0: shape[0] == 1 but != batch, and the two Tasks hold different
    // values, so they cannot merge.
    // return 1: the shape dims match exactly; merge directly.
    // return 2: the shape dims do not fully match; further checks decide whether to merge.
    if (other_task->feedvar_shape_nobatch() != feedvar_shape_nobatch()) {
      return 2;
    }
    // For the shape[0] == 1 but != batch case, merging would keep only one of the two values,
@@ -177,9 +185,9 @@ struct Task {
          std::memcmp((*inVectorT_ptr)[feedvar_index].data.data(),
                      (*(other_task->inVectorT_ptr))[feedvar_index].data.data(),
                      (*inVectorT_ptr)[feedvar_index].data.length());
      if (result != 0) return 0;
    }
    return 1;
  }
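Because PaddleTensor and PaddleBuf lack operator==, the equality test above falls back to std::memcmp over raw bytes. A self-contained illustration of that check, using plain float vectors in place of PaddleBuf (an assumption made for brevity):

#include <cstring>
#include <vector>

// Returns 0 when the buffers differ (cannot merge), 1 when they match,
// mirroring the 0/1 outcomes of the memcmp path in combine_task_valid.
int buffers_mergeable(const std::vector<float>& a,
                      const std::vector<float>& b) {
  if (a.size() != b.size()) return 0;
  // Byte-wise comparison, exactly what std::memcmp does above.
  return std::memcmp(a.data(), b.data(), a.size() * sizeof(float)) == 0 ? 1
                                                                        : 0;
}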
  size_t feedvar_batch_size(size_t feedvar_index) {
@@ -282,12 +290,14 @@ struct Task {
      // which means error.
      if (feedvar_batch_size(feedvar_index) != 1 && total_feed_batch != 1) {
        return false;
} else if (feedvar_batch_size(feedvar_index) != 1 &&
total_feed_batch == 1) {
for (int temp = 0; temp < feedvar_index; ++temp) {
set_feed_nobatch_index.insert(temp);
}
total_feed_batch = feedvar_batch_size(feedvar_index);
      } else {
// which means feedvar shape[0] = 1.
// shape[0] does not change with batch
        set_feed_nobatch_index.insert(feedvar_index);
      }
    }
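The classification above can be restated compactly: any feedvar whose shape[0] stays 1 while the overall batch is larger is nobatch, and the first feedvar with shape[0] > 1 fixes the batch and retroactively marks its predecessors. A sketch of the same rule over a bare list of shape[0] values (toy inputs, not taken from the source):

#include <set>
#include <vector>

// Given shape[0] of each feedvar, find the common batch and the nobatch set.
// Returns false on a genuine conflict (two different batch sizes > 1).
bool classify_feedvars(const std::vector<size_t>& shape0,
                       size_t* total_batch,
                       std::set<size_t>* nobatch) {
  *total_batch = 1;
  for (size_t i = 0; i < shape0.size(); ++i) {
    if (shape0[i] != 1 && *total_batch != 1 && shape0[i] != *total_batch) {
      return false;  // two conflicting batch sizes
    } else if (shape0[i] != 1 && *total_batch == 1) {
      for (size_t t = 0; t < i; ++t) nobatch->insert(t);  // earlier ones were nobatch
      *total_batch = shape0[i];
    } else if (shape0[i] == 1) {
      nobatch->insert(i);  // shape[0] does not change with batch
    }
  }
  return true;
}
// e.g. shape0 = {1, 4, 1, 4} -> total_batch = 4, nobatch = {0, 2}.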
    // Append the lod feedvar index to the vector.
@@ -324,6 +334,9 @@ struct Task {
    // feature_vector[0] holds the shape0_index range: two elements, the min and the max.
    // feature_vector[1] holds the lod info for the given batch.
    // feature_vector[2] holds a single element; the value 1 marks a nobatch feedvar.
    // feature_vector[3] holds the 2-level lod for the given batch.
    // The 2-level lod goes into feature_vector[3] to stay compatible with the
    // existing code with as small a change as possible.
    // The if branch handles nobatch feedvars.
    // The else-if branch handles feedvars with lod.
@@ -335,6 +348,7 @@ struct Task {
    } else if (set_feed_lod_index.size() > 0 &&
               set_feed_lod_index.find(feedvar_index) !=
                   set_feed_lod_index.end()) {
      int lod_size = (*inVectorT_ptr)[feedvar_index].lod.size();
      std::vector<size_t> feed_lod_vector(end_batch - start_batch);
      for (size_t lod_index = start_batch + 1, vector_index = 0;
           lod_index < end_batch + 1;
@@ -343,9 +357,35 @@ struct Task {
            (*inVectorT_ptr)[feedvar_index].lod[0][lod_index] -
            (*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
      }
      if (lod_size == 1) {
        size_t shape0_start =
            (*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
        size_t shape0_end = (*inVectorT_ptr)[feedvar_index].lod[0][end_batch];
        feature_vector = {{shape0_start, shape0_end}, feed_lod_vector};
} else if (lod_size == 2) {
size_t level2_lod_start_index =
(*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
size_t level2_lod_end_index =
(*inVectorT_ptr)[feedvar_index].lod[0][end_batch];
int level2_lod_size = level2_lod_end_index - level2_lod_start_index;
std::vector<size_t> feed_2level_lod_vector(level2_lod_size);
for (size_t lod2_index = level2_lod_start_index + 1, vector_index = 0;
lod2_index < level2_lod_end_index + 1;
++vector_index, ++lod2_index) {
feed_2level_lod_vector[vector_index] =
(*inVectorT_ptr)[feedvar_index].lod[1][lod2_index] -
(*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_start_index];
}
size_t shape0_start =
(*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_start_index];
size_t shape0_end =
(*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_end_index];
feature_vector = {{shape0_start, shape0_end},
feed_lod_vector,
{},
feed_2level_lod_vector};
}
      // feature_vector.push_back(feed_lod_vector);
    } else {
      feature_vector = {{start_batch, end_batch}};
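To make the lod slicing above concrete: extracting batch rows [start, end) means rebasing the level-0 offsets to lod[0][start], and for a 2-level lod the sliced level-0 range then selects and rebases a span of lod[1]. A worked stand-alone sketch with made-up numbers:

#include <vector>

// Slice a 2-level lod for batch rows [start, end).
// lod0/lod1 follow Paddle's convention: monotonically increasing offsets
// beginning at 0, e.g. lod0 = {0, 2, 5} means batch 0 covers entries 0-1.
void slice_2level_lod(const std::vector<size_t>& lod0,
                      const std::vector<size_t>& lod1,
                      size_t start, size_t end,
                      std::vector<size_t>* sliced_lod0,
                      std::vector<size_t>* sliced_lod1) {
  // Level 0: rebase to lod0[start].
  for (size_t i = start + 1; i <= end; ++i)
    sliced_lod0->push_back(lod0[i] - lod0[start]);
  // Level 1: the lod0 slice selects the lod1 range to rebase.
  size_t l1_begin = lod0[start], l1_end = lod0[end];
  for (size_t i = l1_begin + 1; i <= l1_end; ++i)
    sliced_lod1->push_back(lod1[i] - lod1[l1_begin]);
}
// With lod0 = {0, 2, 5}, lod1 = {0, 3, 4, 9, 11, 12} and [start, end) = [1, 2):
// sliced_lod0 = {3}, sliced_lod1 = {5, 7, 8} (data rows 4 through 11, 8 rows).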
@@ -360,24 +400,35 @@ struct Task {
    for (size_t index = 0; index < vector_fetch_lod_index.size(); ++index) {
      size_t data_length = 0;
      size_t lod_length = 0;
      size_t lod2_length = 0;
      size_t total_shape0 = 0;
      size_t once_lod0_length = 0;
      int lod_size = 1;
      size_t feedvar_index = vector_fetch_lod_index[index];
      // PaddleTensor's resize clears the buffer each time, so the total
      // length must be counted up front.
      for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num;
           ++taskmeta_index) {
lod_size = outLodTensorVector[taskmeta_index][index].lod.size();
        data_length +=
            outLodTensorVector[taskmeta_index][index].data.length();
        once_lod0_length =
            outLodTensorVector[taskmeta_index][index].lod[0].size();
        lod_length += once_lod0_length;
        total_shape0 += outLodTensorVector[taskmeta_index][index].shape[0];
if (lod_size == 2) {
lod2_length += outLodTensorVector[taskmeta_index][index]
.lod[0][once_lod0_length - 1];
}
      }
      // Grow the data and lod of the PaddleTensor in one shot.
      paddle::PaddleTensor& fetchVarTensor = (*outVectorT_ptr)[feedvar_index];
      fetchVarTensor.shape[0] = total_shape0;
      void* databuf_data =
          MempoolWrapper::instance().malloc(data_length, memoryPtr);
      paddle::PaddleBuf paddleBuf(databuf_data, data_length);
      fetchVarTensor.data = paddleBuf;
      // fetchVarTensor.data.Resize(data_length);
      // Zero-fill the task's lod.
      if (fetchVarTensor.lod.size() <= 0) {
        fetchVarTensor.lod.push_back({0});
@@ -385,15 +436,25 @@ struct Task {
        fetchVarTensor.lod[0].push_back(0);
      }
      fetchVarTensor.lod[0].resize(lod_length + 1, 0);
if (lod_size == 2) {
if (fetchVarTensor.lod.size() <= 1) {
fetchVarTensor.lod.push_back({0});
} else if (fetchVarTensor.lod[1].size() <= 0) {
fetchVarTensor.lod[1].push_back(0);
}
fetchVarTensor.lod[1].resize(lod2_length + 1, 0);
}
      //
      size_t data_length_offset = 0;
      size_t lod_length_offset = 0;
      size_t lod2_length_offset = 0;
      size_t once_data_length = 0;
      size_t once_lod_length = 0;
      size_t once_2lod_length = 0;
      for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num;
           ++taskmeta_index) {
        // process data
        void* dst_ptr = fetchVarTensor.data.data() + data_length_offset;
        void* source_ptr =
            outLodTensorVector[taskmeta_index][index].data.data();
@@ -401,7 +462,7 @@ struct Task {
            outLodTensorVector[taskmeta_index][index].data.length();
        memcpy(dst_ptr, source_ptr, once_data_length);
        data_length_offset += once_data_length;
        // process lod
        size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset];
        once_lod_length =
            outLodTensorVector[taskmeta_index][index].lod[0].size();
@@ -412,7 +473,18 @@ struct Task {
              outLodTensorVector[taskmeta_index][index].lod[0][once_index];
          lod_length_offset++;
        }
if (lod_size == 2) {
size_t last_2lod_value = fetchVarTensor.lod[1][lod2_length_offset];
once_2lod_length =
outLodTensorVector[taskmeta_index][index].lod[1].size();
for (size_t once_index = 0; once_index < once_2lod_length;
++once_index) {
fetchVarTensor.lod[1][lod2_length_offset + 1] =
last_2lod_value +
outLodTensorVector[taskmeta_index][index].lod[1][once_index];
            lod2_length_offset++;
}
}
      }
    }
  }
@@ -459,10 +531,16 @@ struct TaskMeta {
      feedvar_type.push_back(feature.size());
      if (feature.size() == 1) {
        feed_lod_vector.push_back({});
feed_2level_lod_vector.push_back({});
      } else if (feature.size() == 2) {
        feed_lod_vector.push_back(feature[1]);
        feed_2level_lod_vector.push_back({});
} else if (feature.size() == 3) {
        feed_lod_vector.push_back({});
feed_2level_lod_vector.push_back({});
} else if (feature.size() == 4) {
feed_lod_vector.push_back(feature[1]);
feed_2level_lod_vector.push_back(feature[3]);
      }
    }
  }
@@ -474,6 +552,7 @@ struct TaskMeta {
  size_t taskmeta_index;
  std::vector<std::vector<size_t>> feed_shape0_range;
  std::vector<std::vector<size_t>> feed_lod_vector;
  std::vector<std::vector<size_t>> feed_2level_lod_vector;
  std::vector<size_t> feedvar_type;
};
@@ -488,7 +567,7 @@ class BatchTasks {
  typedef typename TaskT::InType InType;
  typedef typename TaskT::OutType OutType;
  typedef TaskMeta<TaskT> TaskMetaT;
  typedef std::vector<int> ShapeVector;
  typedef std::vector<ShapeVector> VectorOfShapeVector;
  typedef std::vector<size_t> LodVector;
  typedef std::vector<LodVector> PaddleTensorLod;
@@ -496,11 +575,15 @@ class BatchTasks {
  explicit BatchTasks(size_t batch_size,
                      bool overrun = false,
                      bool allow_split_request = true,
bool auto_padding = true,
int padding_value = 0)
      : _batch_size(batch_size),
        _rem_size(batch_size),
        _overrun(overrun),
        _allow_split_request(allow_split_request),
_auto_padding(auto_padding),
_padding_value(padding_value) {
    _batch_in.clear();
    _batch_in_offset.clear();
    _total_shape0_batch_in.clear();
@@ -530,6 +613,71 @@ class BatchTasks {
    vector_fetch_lod_index.clear();
  }
  // return 0: the feedvar counts differ, or the shape ranks of the feedvars
  // differ, so the batches cannot be merged.
  // return 1: merging the batches is not worth it.
  // return 2: merging the batches pays off.
int padding(TaskT* task) {
const VectorOfShapeVector& task_vector_shape =
task->feedvar_shape_nobatch();
int return_value = 2;
    // On the first Task added to an empty batchTask, the BatchTask simply adopts that Task's shape.
if (vector_of_max_shape.size() == 0) {
vector_of_max_shape = task_vector_shape;
return 2;
}
if (vector_of_max_shape.size() != task_vector_shape.size()) {
return 0;
}
    // When the two shapes are identical there is nothing to update, compute, or pad.
if (vector_of_max_shape == task_vector_shape) {
return 2;
}
std::vector<size_t> multiplies_1(vector_of_max_shape.size());
std::vector<size_t> multiplies_2(vector_of_max_shape.size());
std::vector<size_t> temp_multiplies(vector_of_max_shape.size());
VectorOfShapeVector temp_vector_max_shape(vector_of_max_shape.size());
for (size_t i = 0; i < vector_of_max_shape.size(); ++i) {
if (vector_of_max_shape[i].size() != task_vector_shape[i].size())
return 0;
for (size_t j = 0; j < vector_of_max_shape[i].size(); ++j) {
temp_vector_max_shape[i].push_back(
std::max(vector_of_max_shape[i][j], task_vector_shape[i][j]));
}
temp_multiplies[i] = std::accumulate(temp_vector_max_shape[i].begin(),
temp_vector_max_shape[i].end(),
1,
std::multiplies<size_t>());
multiplies_1[i] = std::accumulate(vector_of_max_shape[i].begin(),
vector_of_max_shape[i].end(),
1,
std::multiplies<size_t>());
multiplies_2[i] = std::accumulate(task_vector_shape[i].begin(),
task_vector_shape[i].end(),
1,
std::multiplies<size_t>());
      if ((temp_multiplies[i] - multiplies_1[i] <= ABSOLUTE_ERROR &&
           temp_multiplies[i] - multiplies_2[i] <= ABSOLUTE_ERROR) ||
          (static_cast<float>(multiplies_1[i]) / temp_multiplies[i] >=
               RELATIVE_ERROR &&
           static_cast<float>(multiplies_2[i]) / temp_multiplies[i] >=
               RELATIVE_ERROR)) {
        continue;
      } else {
        return_value = 1;
      }
}
    // When the merge goes ahead, update the BatchTask's max shape;
    // the final merge of the Tasks in this BatchTask will then pad to it.
if (return_value == 2) {
vector_of_max_shape = temp_vector_max_shape;
}
return return_value;
}
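A small driver showing how the two merge criteria behave on the example shapes documented further down in this file. The mergeable() helper below is a stand-alone restatement of the heuristic for a single feedvar, not the class method itself:

#include <algorithm>
#include <cstdio>
#include <functional>
#include <numeric>
#include <vector>

static const size_t kAbsoluteError = 1024;  // mirrors ABSOLUTE_ERROR
static const float kRelativeError = 0.5f;   // mirrors RELATIVE_ERROR

bool mergeable(std::vector<size_t> a, std::vector<size_t> b) {
  std::vector<size_t> mx(a.size());
  for (size_t i = 0; i < a.size(); ++i) mx[i] = std::max(a[i], b[i]);
  auto prod = [](const std::vector<size_t>& v) {
    return std::accumulate(v.begin(), v.end(), size_t(1),
                           std::multiplies<size_t>());
  };
  size_t pm = prod(mx), pa = prod(a), pb = prod(b);
  bool absolute_ok = (pm - pa <= kAbsoluteError) && (pm - pb <= kAbsoluteError);
  bool relative_ok = static_cast<float>(pa) / pm >= kRelativeError &&
                     static_cast<float>(pb) / pm >= kRelativeError;
  return absolute_ok || relative_ok;
}

int main() {
  // 400*400 / 500*500 = 0.64 >= 0.5 -> the relative rule allows the merge.
  std::printf("%d\n", mergeable({500, 500}, {400, 400}));  // prints 1
  // 1*1 / 2*2 = 0.25 < 0.5, but 4 - 1 = 3 <= 1024 -> the absolute rule allows it.
  std::printf("%d\n", mergeable({1, 1}, {2, 2}));          // prints 1
  return 0;
}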
  // synchronized operation
  // because upper-level callers of this function have already locked.
  // Every task that reaches this function is of the same kind; that is guaranteed upstream.
@@ -545,8 +693,9 @@ class BatchTasks {
    TaskMetaT tm(task, start_index, add, task->taskmeta_num);
    task->rem -= add;
    _rem_size -= add;
    if (task->taskmeta_num == 0) {
      task->total_taskmeta_num =
          1 + (task->rem + _batch_size - 1) / _batch_size;
    }
    task->taskmeta_num += 1;
    _taskmeta_vector.push_back(tm);
@@ -569,8 +718,15 @@ class BatchTasks {
        _total_shape0_batch_in[feedvar_index] +=
            tm.feed_shape0_range[feedvar_index][1] -
            tm.feed_shape0_range[feedvar_index][0];
      } else if (tm.feedvar_type[feedvar_index] == 3) {
        // tm.feedvar_type[feedvar_index] == 3
        // nobatch feedvar:
        // do not accumulate; the value should stay 1.
        _total_shape0_batch_in[feedvar_index] =
            tm.feed_shape0_range[feedvar_index][1] -
            tm.feed_shape0_range[feedvar_index][0];
      } else {
        // lod-type feedvar; the lod may have 1 or 2 levels.
        // Accumulate shape0 to prepare for initializing the PaddleTensor later.
        _total_shape0_batch_in[feedvar_index] +=
            tm.feed_shape0_range[feedvar_index][1] -
@@ -589,13 +745,23 @@ class BatchTasks {
          _batch_in_lod[feedvar_index][0].push_back(
              last_lod_value + tm.feed_lod_vector[feedvar_index][lod_index]);
        }
        // A 2-level lod needs its second level handled separately.
        if (tm.feedvar_type[feedvar_index] == 4) {
          if (_batch_in_lod[feedvar_index].size() <= 1) {
            _batch_in_lod[feedvar_index].push_back({0});
          } else if (_batch_in_lod[feedvar_index][1].size() <= 0) {
            _batch_in_lod[feedvar_index][1].push_back(0);
          }
          size_t last_lod_value = _batch_in_lod[feedvar_index][1].back();
          for (size_t lod_index = 0;
               lod_index < tm.feed_2level_lod_vector[feedvar_index].size();
               ++lod_index) {
            _batch_in_lod[feedvar_index][1].push_back(
                last_lod_value +
                tm.feed_2level_lod_vector[feedvar_index][lod_index]);
          }
        }
      }
    }
    return _rem_size;
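The lod merging in append_task always rebases the incoming offsets by the last value already accumulated, which keeps the combined lod monotonic. A tiny stand-alone illustration of that rebasing:

#include <vector>

// Append task_lod (which starts at 0) onto batch_lod, rebasing by
// batch_lod.back(), e.g. {0, 2, 5} + {0, 3} -> {0, 2, 5, 8}.
void append_lod(std::vector<size_t>* batch_lod,
                const std::vector<size_t>& task_lod) {
  if (batch_lod->empty()) batch_lod->push_back(0);
  size_t last = batch_lod->back();
  for (size_t i = 1; i < task_lod.size(); ++i)
    batch_lod->push_back(last + task_lod[i]);
}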
@@ -631,19 +797,28 @@ class BatchTasks {
        const paddle::PaddleTensor& feedVarTensor =
            (*tm.task->inVectorT_ptr)[feedvar_index];
        size_t feedvar_bytesize = tm.task->feedvar_bytesize(feedvar_index);
const ShapeVector& feedvar_max_shape_vector =
vector_of_max_shape[feedvar_index];
size_t feedvar_max_num =
std::accumulate(feedvar_max_shape_vector.begin(),
feedvar_max_shape_vector.end(),
1,
std::multiplies<size_t>());
size_t feedvar_element_bytesize =
tm.task->feedvar_element_bytesize(feedvar_index);
size_t feedvar_max_bytes = feedvar_element_bytesize * feedvar_max_num;
        if (ti == 0) {
          // Create the entire tensor at once
          paddle::PaddleTensor paddleTensor;
          paddleTensor.dtype = feedVarTensor.dtype;
          paddleTensor.name = feedVarTensor.name;
          paddleTensor.lod = _batch_in_lod[feedvar_index];
          paddleTensor.shape = feedvar_max_shape_vector;
          paddleTensor.shape.insert(paddleTensor.shape.begin(),
                                    _total_shape0_batch_in[feedvar_index]);
          size_t databuf_size =
              feedvar_max_bytes * _total_shape0_batch_in[feedvar_index];
          void* databuf_data = MempoolWrapper::instance().malloc(databuf_size);
          paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
          paddleTensor.data = paddleBuf;
@@ -656,12 +831,243 @@ class BatchTasks {
              feedVarTensor.data.data() +
              feedvar_bytesize * tm.feed_shape0_range[feedvar_index][0];
          size_t length =
              feedvar_max_bytes * (tm.feed_shape0_range[feedvar_index][1] -
                                   tm.feed_shape0_range[feedvar_index][0]);
          // No padding needed: the memory is contiguous, so memcpy directly.
          // Comparing byte sizes here is safe because padding() above already
          // made every dimension in vector_of_max_shape the maximum.
          // E.g. with shape-1 = [8000, 20000] and shape-2 = [20000, 8000],
          // vector_of_max_shape holds [20000, 20000],
          // so feedvar_max_bytes == feedvar_bytesize implies identical shapes.
if (feedvar_max_bytes == feedvar_bytesize) {
memcpy(dst_ptr, source_ptr, length);
} else {
memset(dst_ptr, 0, length);
size_t old_index = 0;
size_t new_index = 0;
switch (feedvar_max_shape_vector.size()) {
case 5:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) {
for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) {
for (int i_4 = 0; i_4 < feedVarTensor.shape[4]; ++i_4) {
for (int i_5 = 0; i_5 < feedVarTensor.shape[5]; ++i_5) {
old_index = i_0 * feedVarTensor.shape[1] *
feedVarTensor.shape[2] *
feedVarTensor.shape[3] *
feedVarTensor.shape[4] *
feedVarTensor.shape[5] +
i_1 * feedVarTensor.shape[2] *
feedVarTensor.shape[3] *
feedVarTensor.shape[4] *
feedVarTensor.shape[5] +
i_2 * feedVarTensor.shape[3] *
feedVarTensor.shape[4] *
feedVarTensor.shape[5] +
i_3 * feedVarTensor.shape[4] *
feedVarTensor.shape[5] +
i_4 * feedVarTensor.shape[5] + i_5;
new_index = i_0 * feedvar_max_shape_vector[0] *
feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] *
feedvar_max_shape_vector[4] +
i_1 * feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] *
feedvar_max_shape_vector[4] +
i_2 * feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] *
feedvar_max_shape_vector[4] +
i_3 * feedvar_max_shape_vector[3] *
feedvar_max_shape_vector[4] +
i_4 * feedvar_max_shape_vector[4] + i_5;
if (feedVarTensor.dtype ==
paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
}
}
}
}
break;
case 4:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) {
for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) {
for (int i_4 = 0; i_4 < feedVarTensor.shape[4]; ++i_4) {
old_index = i_0 * feedVarTensor.shape[1] *
feedVarTensor.shape[2] *
feedVarTensor.shape[3] *
feedVarTensor.shape[4] +
i_1 * feedVarTensor.shape[2] *
feedVarTensor.shape[3] *
feedVarTensor.shape[4] +
i_2 * feedVarTensor.shape[3] *
feedVarTensor.shape[4] +
i_3 * feedVarTensor.shape[4] + i_4;
new_index = i_0 * feedvar_max_shape_vector[0] *
feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] +
i_1 * feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] +
i_2 * feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] +
i_3 * feedvar_max_shape_vector[3] + i_4;
if (feedVarTensor.dtype == paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
}
}
}
break;
case 3:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) {
for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) {
old_index = i_0 * feedVarTensor.shape[1] *
feedVarTensor.shape[2] *
feedVarTensor.shape[3] +
i_1 * feedVarTensor.shape[2] *
feedVarTensor.shape[3] +
i_2 * feedVarTensor.shape[3] + i_3;
new_index = i_0 * feedvar_max_shape_vector[0] *
feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] +
i_1 * feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] +
i_2 * feedvar_max_shape_vector[2] + i_3;
if (feedVarTensor.dtype == paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
}
}
break;
case 2:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) {
old_index =
i_0 * feedVarTensor.shape[1] * feedVarTensor.shape[2] +
i_1 * feedVarTensor.shape[2] + i_2;
new_index = i_0 * feedvar_max_shape_vector[0] *
feedvar_max_shape_vector[1] +
i_1 * feedvar_max_shape_vector[1] + i_2;
if (feedVarTensor.dtype == paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
}
break;
case 1:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
old_index = i_0 * feedVarTensor.shape[1] + i_1;
new_index = i_0 * feedvar_max_shape_vector[0] + i_1;
if (feedVarTensor.dtype == paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
break;
default:
break;
}
          }
          // nobatch feedvars do not advance the offset.
          if (tm.feedvar_type[feedvar_index] != 3) {
            _batch_in_offset[feedvar_index] += length;
          }
        }
      }
    }
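The switch in the copy loop above unrolls the padded copy for tensor ranks 1 through 5. The same copy can be written rank-generically with row-major strides; the sketch below is an alternative formulation under that assumption (element type fixed to float for brevity, rank >= 1), not the code this commit ships:

#include <cstddef>
#include <vector>

// Copy a row-major tensor of shape src_shape into a buffer laid out for
// dst_shape (dst_shape[d] >= src_shape[d] for every d); positions outside
// the source extent keep whatever padding value dst was pre-filled with.
void padded_copy(const float* src, const std::vector<int>& src_shape,
                 float* dst, const std::vector<int>& dst_shape) {
  const size_t rank = src_shape.size();
  std::vector<size_t> src_stride(rank, 1), dst_stride(rank, 1);
  for (size_t d = rank - 1; d > 0; --d) {
    src_stride[d - 1] = src_stride[d] * src_shape[d];
    dst_stride[d - 1] = dst_stride[d] * dst_shape[d];
  }
  std::vector<int> idx(rank, 0);  // odometer over the source index space
  while (true) {
    size_t s = 0, t = 0;
    for (size_t d = 0; d < rank; ++d) {
      s += idx[d] * src_stride[d];
      t += idx[d] * dst_stride[d];
    }
    dst[t] = src[s];
    size_t d = rank;
    while (d-- > 0) {        // increment with carry, innermost dim first
      if (++idx[d] < src_shape[d]) break;
      idx[d] = 0;
      if (d == 0) return;    // wrapped past the outermost dim: done
    }
  }
}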
@@ -753,25 +1159,23 @@ class BatchTasks {
      // Here we cannot tell whether the fetchvar is naturally nobatch, so
      // set_fetch_nobatch_index may miss entries;
      // hopefully the two cases can be told apart elsewhere later.
      if (fetchvar_batch_size(fetchvar_index) != _total_fetch_batch) {
        if (fetchvar_batch_size(fetchvar_index) <= 0) {
          // which means error.
          return false;
        } else if (fetchvar_batch_size(fetchvar_index) == 1) {
          // which means fetchvar shape[0] = 1.
          // shape[0] does not change with batch
          set_fetch_nobatch_index.insert(fetchvar_index);
        } else if (_total_fetch_batch == 1) {
          // This means every previous fetchvar had shape[0] == 1
          // while the current fetchvar has shape[0] > 1,
          // so all previous fetchvars are no_batch.
          for (size_t temp_index = 0; temp_index < fetchvar_index;
               ++temp_index) {
            set_fetch_nobatch_index.insert(temp_index);
          }
          _total_fetch_batch = fetchvar_batch_size(fetchvar_index);
        } else {
          // which means error.
          return false;
} }
@@ -846,6 +1250,14 @@ class BatchTasks {
          size_t shape0_index_end =
              _batch_out[fetchvar_index].lod[0][last_batch + add];
          size_t shape0_length = shape0_index_end - shape0_index_start;
size_t lod_size = _batch_out[fetchvar_index].lod.size();
if (lod_size == 2) {
shape0_index_start =
_batch_out[fetchvar_index].lod[1][shape0_index_start];
shape0_index_end =
_batch_out[fetchvar_index].lod[1][shape0_index_end];
shape0_length = shape0_index_end - shape0_index_start;
}
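          // Worked example (made-up numbers): with lod[0] = {0, 2, 5} and
          // lod[1] = {0, 3, 4, 9, 11, 12}, batches [1, 2) give
          // shape0_index_start = lod[0][1] = 2 and shape0_index_end =
          // lod[0][2] = 5; lod[1][2] = 4 and lod[1][5] = 12 then select
          // data rows 4 through 11, so shape0_length = 8.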
          // When a task is split into multiple taskmetas, results cannot be
          // copied straight into task->outVectorT_ptr;
          // they go into task->outLodTensorVector[taskmeta_index] first,
          // and once all taskmetas of the task finish, they are copied back
          // into task->outVectorT_ptr in order.
@@ -856,10 +1268,11 @@ class BatchTasks {
            fetchVarTensor.shape[0] = shape0_length;
            fetch_lod_index++;
            void* databuf_data =
                MempoolWrapper::instance().malloc(length, task->memoryPtr);
            paddle::PaddleBuf paddleBuf(databuf_data, length);
            fetchVarTensor.data = paddleBuf;
            // fetchVarTensor.data.Resize(length);
            void* dst_ptr = fetchVarTensor.data.data();
            void* source_ptr = _batch_out[fetchvar_index].data.data() +
                               shape0_index_start * fetchvar_bytesize_index;
@@ -878,6 +1291,24 @@ class BatchTasks {
                  (_batch_out[fetchvar_index].lod[0][lod_index] -
                   last_lod_value);
            }
if (lod_size == 2) {
if (fetchVarTensor.lod.size() <= 1) {
fetchVarTensor.lod.push_back({});
}
size_t last_lod0_value =
_batch_out[fetchvar_index].lod[0][last_batch];
size_t end_lod0_value =
_batch_out[fetchvar_index].lod[0][last_batch + add];
size_t lod1_size = end_lod0_value - last_lod0_value;
fetchVarTensor.lod[1].resize(lod1_size, 0);
for (size_t lod_index = last_lod0_value + 1, my_index = 0;
lod_index < end_lod0_value + 1;
++lod_index, ++my_index) {
fetchVarTensor.lod[1][my_index] =
_batch_out[fetchvar_index].lod[1][lod_index] -
_batch_out[fetchvar_index].lod[1][last_lod0_value];
}
}
          } else {
            // The task was not split into multiple taskmetas, so only one
            // thread's taskmeta touches it and there is no data race;
            // after the resize, write directly into task->outVectorT_ptr.
@@ -885,12 +1316,13 @@ class BatchTasks {
                (*task->outVectorT_ptr)[fetchvar_index];
            size_t length = fetchvar_bytesize_index * shape0_length;
            fetchVarTensor.shape[0] = shape0_length;
            void* databuf_data =
                MempoolWrapper::instance().malloc(length, task->memoryPtr);
            paddle::PaddleBuf paddleBuf(databuf_data, length);
            fetchVarTensor.data = paddleBuf;
            // fetchVarTensor.data.Resize(length);
            void* dst_ptr = fetchVarTensor.data.data();
            void* source_ptr = _batch_out[fetchvar_index].data.data() +
                               shape0_index_start * fetchvar_bytesize_index;
@@ -918,6 +1350,27 @@ class BatchTasks {
                  (_batch_out[fetchvar_index].lod[0][lod_index] -
                   last_lod_value);
            }
if (lod_size == 2) {
if (fetchVarTensor.lod.size() <= 1) {
fetchVarTensor.lod.push_back({});
} else if (fetchVarTensor.lod[1].size() <= 0) {
fetchVarTensor.lod[1].push_back(0);
}
size_t last_lod0_value =
_batch_out[fetchvar_index].lod[0][last_batch];
size_t end_lod0_value =
_batch_out[fetchvar_index].lod[0][last_batch + add];
size_t lod1_size = end_lod0_value - last_lod0_value;
fetchVarTensor.lod[1].resize(lod1_size + 1, 0);
for (size_t lod_index = last_lod0_value + 1, my_index = 1;
lod_index < end_lod0_value + 1;
++lod_index, ++my_index) {
fetchVarTensor.lod[1][my_index] =
_batch_out[fetchvar_index].lod[1][lod_index] -
_batch_out[fetchvar_index].lod[1][last_lod0_value];
}
}
          }
        } else {
          // Plain fetchvar case: the Task's total fetchvar_batch equals
@@ -979,10 +1432,26 @@ class BatchTasks {
  std::set<size_t> set_fetch_nobatch_index;
  std::vector<size_t> vector_fetch_lod_index;
  size_t _rem_size;
  size_t _batch_size;
  bool _overrun;
  bool _allow_split_request;
  // The current maximum shape of each FeedVar in this BatchTask.
  VectorOfShapeVector vector_of_max_shape;
  // AutoPadding compares this against the TaskMeta of the batch waiting to be
  // merged to decide whether to merge. There are two criteria; meeting either
  // one allows the merge:
  // 1. the product of the per-dimension similarities exceeds 50%;
  // 2. the absolute difference in size is under 1024 bytes.
  // E.g. Shape-1 = [batch, 500, 500], Shape-2 = [batch, 400, 400]:
  // the absolute difference is 90000 elements and the similarity is
  // 0.8*0.8 = 0.64, so criterion 1 holds and criterion 2 does not.
  // E.g. Shape-1 = [batch, 1, 1], Shape-2 = [batch, 2, 2]:
  // the absolute difference is 3 elements and the similarity is
  // 0.5*0.5 = 0.25, so criterion 2 holds and criterion 1 does not.
  // Both cases qualify for AutoPadding.
bool _auto_padding;
int _padding_value;
};
// BSF task handle
@@ -1058,7 +1527,7 @@ class TaskExecutor {
  typedef typename TaskT::OutVectorT OutVectorT;
  typedef std::vector<TaskT> TaskArrayT;
  typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
  typedef std::vector<int> ShapeVector;
  typedef std::vector<ShapeVector> VectorOfShapeVector;
  TaskExecutor()
...