Unverified commit 75f59f4e authored by Thomas Young, committed by GitHub

Merge pull request #1713 from HexToString/add_2_lod_and_padding_new

Add 2 lod and padding new
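This change teaches the BSF batch scheduler to handle feed/fetch variables that carry a 2-level LoD and to auto-pad tasks whose inner shapes differ so that they can still share a batch. As a quick illustration of the data layout involved (a sketch following the usual Paddle offset convention, not code taken from this diff):

#include <cstddef>
#include <vector>

// Illustrative only: a 2-level LoD for a batch of 2 sequences; the first splits
// into 2 sub-sequences, the second into 1; the sub-sequences hold 3, 1 and 2
// elements respectively.
static const std::vector<std::vector<size_t>> kExampleLod = {
    {0, 2, 3},      // level 0: sequence -> sub-sequence offsets
    {0, 3, 4, 6}};  // level 1: sub-sequence -> element offsets
// shape[0] of the corresponding tensor data is kExampleLod.back().back() == 6.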
......@@ -70,7 +70,6 @@ bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
// Each lod-type fetchvar is first copied into its own temporary buffer.
// The total size of the temporary buffers is computed at the end, when the fetchvars and lods are merged.
fetchvar_batch = 0;
} else {
// Ordinary (non-lod) fetchvar: the Task's total fetchvar_batch equals
// the total batch_size() of the input.
......@@ -86,14 +85,15 @@ bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
// In this case the lod is empty.
tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
// resize all batch memory at one time
size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
void* databuf_data = MempoolWrapper::instance().malloc(databuf_size,memoryPtr);
void* databuf_data =
MempoolWrapper::instance().malloc(databuf_size, memoryPtr);
paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
tensor_out.data = paddleBuf;
//tensor_out.data.Resize(databuf_size);
// tensor_out.data.Resize(databuf_size);
} else {
// When taskmeta_num = 1, only one taskMeta operates on the task at a time,
// so there is no thread-safety concern and taskMeta->task->resize->copy can be done directly.
......@@ -213,7 +213,8 @@ void TaskExecutor<TaskT>::stop() {
template <typename TaskT>
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
const void* inVectorT_ptr,
void* outVectorT_ptr, MempoolRegion* memoryPtr) { // NOLINT
void* outVectorT_ptr,
MempoolRegion* memoryPtr) { // NOLINT
TaskT* task = butil::get_object<TaskT>();
if (!task) {
LOG(ERROR) << "Failed get TaskT from object pool";
......@@ -240,7 +241,7 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
task->write_fd = fds[1];
task->owner_tid = ::syscall(SYS_gettid);
task->memoryPtr = memoryPtr;
//task->_bspec_key = _bspec_key;
// task->_bspec_key = _bspec_key;
task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
if (!task->task_init()) {
......@@ -309,7 +310,7 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
}
// combine_task_valid decides whether the tasks can be merged.
// Apart from the outermost dimension, the inner shapes must match for tasks to be merged.
// Apart from the outermost dimension, the inner shapes must match, or padding must be allowed, for tasks to be merged.
// Otherwise break out of the loop and put the task into the next batchTask.
// This guarantees that the tasks passed to batch.append_task(task) share the same inner shape.
......@@ -317,12 +318,15 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
// so that feedvar must be identical for the tasks to be merged.
// Otherwise break out of the loop and put the task into the next batchTask.
// PaddleTensor and PaddleBuf currently do not overload ==, so the raw memory has to be compared.
// TODO(HexToString): consider supporting AutoPadding later.
if (previous_task != nullptr) {
if (!task->combine_task_valid(previous_task)) {
if (task->combine_task_valid(previous_task) == 0) {
break;
}
}
if (batchTask.padding(task) != 2) {
break;
}
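// Illustrative note (not part of this diff) on how the two checks interact,
// assuming two queued tasks A and B for the same model:
//   A and B have identical inner shapes
//     -> combine_task_valid == 1, padding() == 2, B joins the current batch.
//   A and B have the same number of dims but different inner shapes
//     -> combine_task_valid == 2, padding() decides: 2 = merge and pad later,
//        anything else = start a new batch.
//   A and B hold different data in a shape[0] == 1 (nobatch) feedvar
//     -> combine_task_valid == 0, the loop breaks and the task goes into the next batch.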
size_t rem = batchTask.append_task(task);
previous_task = task;
if (task->rem <= 0) {
......@@ -407,10 +411,11 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
}
template <typename InItemT, typename OutItemT>
bool TaskManager<InItemT, OutItemT>::schedule(const void* in,
void* out, MempoolRegion* memoryPtr) { // NOLINT
bool TaskManager<InItemT, OutItemT>::schedule(
const void* in, void* out, MempoolRegion* memoryPtr) { // NOLINT
TaskHandler<TaskT> handler =
TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out, memoryPtr);
TaskExecutorVector<TaskT>::instance()[_model_index].schedule(
in, out, memoryPtr);
if (handler.valid()) {
_task_owned = handler;
......
......@@ -17,10 +17,11 @@
#include <errno.h>
#include <algorithm>
#include <cstring>
#include <functional>
#include <list>
#include <numeric>
#include <set>
#include <vector>
#ifdef BCLOUD
#include "base/atomicops.h"
#else
......@@ -38,6 +39,8 @@ namespace im {
namespace bsf {
static const size_t DEFAULT_BATCH_SIZE = 100;
static const size_t ABSOLUTE_ERROR = 1024;
static const float RELATIVE_ERROR = 0.5;
typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
typedef baidu::paddle_serving::predictor::MempoolRegion MempoolRegion;
......@@ -61,7 +64,7 @@ struct Task {
typedef InItemT InType;
typedef OutItemT OutType;
typedef Task<InItemT, OutItemT> TaskT;
typedef std::vector<size_t> ShapeVector;
typedef std::vector<int> ShapeVector;
typedef std::vector<ShapeVector> VectorOfShapeVector;
typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
......@@ -124,7 +127,7 @@ struct Task {
outLodTensorVector.clear();
}
void clear(){
void clear() {
read_fd = -1;
write_fd = -1;
owner_tid = -1;
......@@ -158,13 +161,18 @@ struct Task {
return 1;
}
bool combine_task_valid(Task* other_task) {
// TODO(HexToString): auto-padding
// Apart from the outermost dimension, the inner shapes must match for tasks to be merged.
int combine_task_valid(Task* other_task) {
// Apart from the outermost dimension, the inner shapes must match, or padding must be allowed, for tasks to be merged.
// Otherwise break out of the loop and put the task into the next batchTask.
// When the inner shapes differ, whether padding applies is not decided here; return 2 and let the batchTask level decide.
// This guarantees that the tasks passed to batch.append_task(task) share the same inner shape.
// return 0: a feedvar with shape[0] = 1 (not equal to batch)
// holds different values in the two Tasks, so they cannot be merged.
// return 1: the shapes match exactly; merge directly.
// return 2: the shapes do not fully match; a further check is needed to decide whether to merge.
if (other_task->feedvar_shape_nobatch() != feedvar_shape_nobatch()) {
return false;
return 2;
}
// For the case shape[0] = 1 and != batch: since merging keeps only one of the values,
......@@ -177,9 +185,9 @@ struct Task {
std::memcmp((*inVectorT_ptr)[feedvar_index].data.data(),
(*(other_task->inVectorT_ptr))[feedvar_index].data.data(),
(*inVectorT_ptr)[feedvar_index].data.length());
if (result != 0) return false;
if (result != 0) return 0;
}
return true;
return 1;
}
size_t feedvar_batch_size(size_t feedvar_index) {
......@@ -282,12 +290,14 @@ struct Task {
// which means error.
if (feedvar_batch_size(feedvar_index) != 1 && total_feed_batch != 1) {
return false;
} else if (feedvar_batch_size(feedvar_index) != 1 &&
total_feed_batch == 1) {
for (int temp = 0; temp < feedvar_index; ++temp) {
set_feed_nobatch_index.insert(temp);
}
total_feed_batch = feedvar_batch_size(feedvar_index);
} else {
// which means feedvar shape[0] = 1.
// shape[0] does not change with batch
set_feed_nobatch_index.insert(feedvar_index);
total_feed_batch =
std::max(feedvar_batch_size(feedvar_index), total_feed_batch);
}
}
// Add the lod feedvar index to the vector.
......@@ -324,6 +334,9 @@ struct Task {
// feature_vector[0] holds the shape0_index range: two elements, the minimum and the maximum.
// feature_vector[1] holds the lod of the specified batch range.
// feature_vector[2] is a single-element vector; a value of 1 marks a nobatch feedvar.
// feature_vector[3] holds the second lod level (2-level lod) of the specified batch range.
// The 2-level lod is put into feature_vector[3] to stay compatible with the existing code
// and keep the change as small as possible.
// The if branch handles the nobatch feedvar case.
// The else-if branch handles feedvars that carry a lod.
......@@ -335,6 +348,7 @@ struct Task {
} else if (set_feed_lod_index.size() > 0 &&
set_feed_lod_index.find(feedvar_index) !=
set_feed_lod_index.end()) {
int lod_size = (*inVectorT_ptr)[feedvar_index].lod.size();
std::vector<size_t> feed_lod_vector(end_batch - start_batch);
for (size_t lod_index = start_batch + 1, vector_index = 0;
lod_index < end_batch + 1;
......@@ -343,9 +357,35 @@ struct Task {
(*inVectorT_ptr)[feedvar_index].lod[0][lod_index] -
(*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
}
size_t shape0_start = (*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
size_t shape0_end = (*inVectorT_ptr)[feedvar_index].lod[0][end_batch];
feature_vector = {{shape0_start, shape0_end}, feed_lod_vector};
if (lod_size == 1) {
size_t shape0_start =
(*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
size_t shape0_end = (*inVectorT_ptr)[feedvar_index].lod[0][end_batch];
feature_vector = {{shape0_start, shape0_end}, feed_lod_vector};
} else if (lod_size == 2) {
size_t level2_lod_start_index =
(*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
size_t level2_lod_end_index =
(*inVectorT_ptr)[feedvar_index].lod[0][end_batch];
int level2_lod_size = level2_lod_end_index - level2_lod_start_index;
std::vector<size_t> feed_2level_lod_vector(level2_lod_size);
for (size_t lod2_index = level2_lod_start_index + 1, vector_index = 0;
lod2_index < level2_lod_end_index + 1;
++vector_index, ++lod2_index) {
feed_2level_lod_vector[vector_index] =
(*inVectorT_ptr)[feedvar_index].lod[1][lod2_index] -
(*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_start_index];
}
size_t shape0_start =
(*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_start_index];
size_t shape0_end =
(*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_end_index];
feature_vector = {{shape0_start, shape0_end},
feed_lod_vector,
{},
feed_2level_lod_vector};
}
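// Worked example (illustrative, not part of this diff): suppose a feedvar has
// the 2-level lod {{0, 2, 3}, {0, 3, 4, 6}} and this taskmeta covers
// start_batch = 0, end_batch = 2. The branch above then produces
//   feature_vector = { {0, 6},       // shape0 range, taken from lod[1]
//                      {2, 3},       // level-0 lod rebased to start_batch
//                      {},           // placeholder so index 3 can hold level 1
//                      {3, 4, 6} };  // level-1 lod rebased to its start index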
// feature_vector.push_back(feed_lod_vector);
} else {
feature_vector = {{start_batch, end_batch}};
......@@ -360,24 +400,35 @@ struct Task {
for (size_t index = 0; index < vector_fetch_lod_index.size(); ++index) {
size_t data_length = 0;
size_t lod_length = 0;
size_t lod2_length = 0;
size_t total_shape0 = 0;
size_t once_lod0_length = 0;
int lod_size = 1;
size_t feedvar_index = vector_fetch_lod_index[index];
// PaddleTensor's resize clears the buffer each time, so the total length must be computed first.
for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num;
++taskmeta_index) {
lod_size = outLodTensorVector[taskmeta_index][index].lod.size();
data_length +=
outLodTensorVector[taskmeta_index][index].data.length();
lod_length += outLodTensorVector[taskmeta_index][index].lod[0].size();
once_lod0_length =
outLodTensorVector[taskmeta_index][index].lod[0].size();
lod_length += once_lod0_length;
total_shape0 += outLodTensorVector[taskmeta_index][index].shape[0];
if (lod_size == 2) {
lod2_length += outLodTensorVector[taskmeta_index][index]
.lod[0][once_lod0_length - 1];
}
}
// Grow the data and lod of the PaddleTensor in one go.
paddle::PaddleTensor& fetchVarTensor = (*outVectorT_ptr)[feedvar_index];
fetchVarTensor.shape[0] = total_shape0;
void* databuf_data = MempoolWrapper::instance().malloc(data_length,memoryPtr);
void* databuf_data =
MempoolWrapper::instance().malloc(data_length, memoryPtr);
paddle::PaddleBuf paddleBuf(databuf_data, data_length);
fetchVarTensor.data = paddleBuf;
//fetchVarTensor.data.Resize(data_length);
// fetchVarTensor.data.Resize(data_length);
// Pad the task's lod with zeros.
if (fetchVarTensor.lod.size() <= 0) {
fetchVarTensor.lod.push_back({0});
......@@ -385,15 +436,25 @@ struct Task {
fetchVarTensor.lod[0].push_back(0);
}
fetchVarTensor.lod[0].resize(lod_length + 1, 0);
if (lod_size == 2) {
if (fetchVarTensor.lod.size() <= 1) {
fetchVarTensor.lod.push_back({0});
} else if (fetchVarTensor.lod[1].size() <= 0) {
fetchVarTensor.lod[1].push_back(0);
}
fetchVarTensor.lod[1].resize(lod2_length + 1, 0);
}
//
size_t data_length_offset = 0;
size_t lod_length_offset = 0;
size_t lod2_length_offset = 0;
size_t once_data_length = 0;
size_t once_lod_length = 0;
size_t once_2lod_length = 0;
for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num;
++taskmeta_index) {
//process data
// process data
void* dst_ptr = fetchVarTensor.data.data() + data_length_offset;
void* source_ptr =
outLodTensorVector[taskmeta_index][index].data.data();
......@@ -401,7 +462,7 @@ struct Task {
outLodTensorVector[taskmeta_index][index].data.length();
memcpy(dst_ptr, source_ptr, once_data_length);
data_length_offset += once_data_length;
//process lod
// process lod
size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset];
once_lod_length =
outLodTensorVector[taskmeta_index][index].lod[0].size();
......@@ -412,7 +473,18 @@ struct Task {
outLodTensorVector[taskmeta_index][index].lod[0][once_index];
lod_length_offset++;
}
if (lod_size == 2) {
size_t last_2lod_value = fetchVarTensor.lod[1][lod2_length_offset];
once_2lod_length =
outLodTensorVector[taskmeta_index][index].lod[1].size();
for (size_t once_index = 0; once_index < once_2lod_length;
++once_index) {
fetchVarTensor.lod[1][lod2_length_offset + 1] =
last_2lod_value +
outLodTensorVector[taskmeta_index][index].lod[1][once_index];
lod2_length_offset++;
}
}
}
}
}
......@@ -459,10 +531,16 @@ struct TaskMeta {
feedvar_type.push_back(feature.size());
if (feature.size() == 1) {
feed_lod_vector.push_back({});
feed_2level_lod_vector.push_back({});
} else if (feature.size() == 2) {
feed_lod_vector.push_back(feature[1]);
} else {
feed_2level_lod_vector.push_back({});
} else if (feature.size() == 3) {
feed_lod_vector.push_back({});
feed_2level_lod_vector.push_back({});
} else if (feature.size() == 4) {
feed_lod_vector.push_back(feature[1]);
feed_2level_lod_vector.push_back(feature[3]);
}
}
}
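// Illustrative summary (not part of this diff) of how feature.size() maps to
// the feedvar_type codes used later in append_task():
//   feature.size() == 1 -> type 1: ordinary feedvar (batch range only)
//   feature.size() == 2 -> type 2: feedvar with a 1-level lod
//   feature.size() == 3 -> type 3: nobatch feedvar (shape[0] == 1)
//   feature.size() == 4 -> type 4: feedvar with a 2-level lod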
......@@ -474,6 +552,7 @@ struct TaskMeta {
size_t taskmeta_index;
std::vector<std::vector<size_t>> feed_shape0_range;
std::vector<std::vector<size_t>> feed_lod_vector;
std::vector<std::vector<size_t>> feed_2level_lod_vector;
std::vector<size_t> feedvar_type;
};
......@@ -488,7 +567,7 @@ class BatchTasks {
typedef typename TaskT::InType InType;
typedef typename TaskT::OutType OutType;
typedef TaskMeta<TaskT> TaskMetaT;
typedef std::vector<size_t> ShapeVector;
typedef std::vector<int> ShapeVector;
typedef std::vector<ShapeVector> VectorOfShapeVector;
typedef std::vector<size_t> LodVector;
typedef std::vector<LodVector> PaddleTensorLod;
......@@ -496,11 +575,15 @@ class BatchTasks {
explicit BatchTasks(size_t batch_size,
bool overrun = false,
bool allow_split_request = true)
bool allow_split_request = true,
bool auto_padding = true,
int padding_value = 0)
: _batch_size(batch_size),
_rem_size(batch_size),
_overrun(overrun),
_allow_split_request(allow_split_request) {
_allow_split_request(allow_split_request),
_auto_padding(auto_padding),
_padding_value(padding_value) {
_batch_in.clear();
_batch_in_offset.clear();
_total_shape0_batch_in.clear();
......@@ -530,6 +613,71 @@ class BatchTasks {
vector_fetch_lod_index.clear();
}
// return 0: the number of feedvars differs, or some feedvar has a different number of shape dimensions; the batches cannot be merged.
// return 1: merging the batches is not worthwhile.
// return 2: merging the batches is worthwhile.
int padding(TaskT* task) {
const VectorOfShapeVector& task_vector_shape =
task->feedvar_shape_nobatch();
int return_value = 2;
// When the batchTask is empty, this is the first Task being added, so the BatchTask's max shape is simply the first Task's shape.
if (vector_of_max_shape.size() == 0) {
vector_of_max_shape = task_vector_shape;
return 2;
}
if (vector_of_max_shape.size() != task_vector_shape.size()) {
return 0;
}
// When the two shapes are identical there is nothing to update, compute, or pad.
if (vector_of_max_shape == task_vector_shape) {
return 2;
}
std::vector<size_t> multiplies_1(vector_of_max_shape.size());
std::vector<size_t> multiplies_2(vector_of_max_shape.size());
std::vector<size_t> temp_multiplies(vector_of_max_shape.size());
VectorOfShapeVector temp_vector_max_shape(vector_of_max_shape.size());
for (size_t i = 0; i < vector_of_max_shape.size(); ++i) {
if (vector_of_max_shape[i].size() != task_vector_shape[i].size())
return 0;
for (size_t j = 0; j < vector_of_max_shape[i].size(); ++j) {
temp_vector_max_shape[i].push_back(
std::max(vector_of_max_shape[i][j], task_vector_shape[i][j]));
}
temp_multiplies[i] = std::accumulate(temp_vector_max_shape[i].begin(),
temp_vector_max_shape[i].end(),
1,
std::multiplies<size_t>());
multiplies_1[i] = std::accumulate(vector_of_max_shape[i].begin(),
vector_of_max_shape[i].end(),
1,
std::multiplies<size_t>());
multiplies_2[i] = std::accumulate(task_vector_shape[i].begin(),
task_vector_shape[i].end(),
1,
std::multiplies<size_t>());
if ((labs(temp_multiplies[i] - multiplies_1[i]) <= ABSOLUTE_ERROR &&
labs(temp_multiplies[i] - multiplies_2[i]) <= ABSOLUTE_ERROR) ||
(static_cast<float>(multiplies_1[i]) / temp_multiplies[i] >= RELATIVE_ERROR &&
static_cast<float>(multiplies_2[i]) / temp_multiplies[i] >= RELATIVE_ERROR)) {
continue;
} else {
return_value = 1;
}
}
// When the batches are merged, the BatchTask's max shape must be updated;
// later, when the BatchTask finally combines its Tasks, padding is applied.
if (return_value == 2) {
vector_of_max_shape = temp_vector_max_shape;
}
return return_value;
}
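// Worked example (illustrative, not part of this diff) of the merge heuristic,
// using the same numbers as the comments on vector_of_max_shape further below:
//   current max shape [500, 500] vs incoming [400, 400]
//     element-wise max [500, 500]; products 250000 / 160000 / 250000
//     absolute gap 250000 - 160000 = 90000  >  ABSOLUTE_ERROR (1024)
//     ratio        160000 / 250000 = 0.64   >= RELATIVE_ERROR (0.5)  -> merge (2)
//   current max shape [1, 1] vs incoming [2, 2]
//     element-wise max [2, 2]; products 4 / 1 / 4
//     absolute gap 4 - 1 = 3                <= ABSOLUTE_ERROR (1024) -> merge (2)
//     ratio        1 / 4 = 0.25             <  RELATIVE_ERROR (0.5)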
// synchronized operation
// because Upper level callers of this function have already locked.
// Every task that reaches this function is of the same kind; this is guaranteed before the call.
......@@ -545,8 +693,9 @@ class BatchTasks {
TaskMetaT tm(task, start_index, add, task->taskmeta_num);
task->rem -= add;
_rem_size -= add;
if(task->taskmeta_num == 0){
task->total_taskmeta_num = 1 + (task->rem + _batch_size - 1)/_batch_size;
if (task->taskmeta_num == 0) {
task->total_taskmeta_num =
1 + (task->rem + _batch_size - 1) / _batch_size;
}
task->taskmeta_num += 1;
_taskmeta_vector.push_back(tm);
......@@ -569,8 +718,15 @@ class BatchTasks {
_total_shape0_batch_in[feedvar_index] +=
tm.feed_shape0_range[feedvar_index][1] -
tm.feed_shape0_range[feedvar_index][0];
} else if (tm.feedvar_type[feedvar_index] == 2) {
// lod-type feedvar
} else if (tm.feedvar_type[feedvar_index] == 3) {
// tm.feedvar_type[feedvar_index] == 3
// nobatch-type feedvar.
// Not accumulated here; the value should stay 1.
_total_shape0_batch_in[feedvar_index] =
tm.feed_shape0_range[feedvar_index][1] -
tm.feed_shape0_range[feedvar_index][0];
} else {
// lod-type feedvar: it may carry a 1-level or a 2-level lod.
// Accumulate shape0 so the PaddleTensor can be initialized later.
_total_shape0_batch_in[feedvar_index] +=
tm.feed_shape0_range[feedvar_index][1] -
......@@ -589,13 +745,23 @@ class BatchTasks {
_batch_in_lod[feedvar_index][0].push_back(
last_lod_value + tm.feed_lod_vector[feedvar_index][lod_index]);
}
} else {
// tm.feedvar_type[feedvar_index] == 3
// nobatch-type feedvar.
// Not accumulated here; the value should stay 1.
_total_shape0_batch_in[feedvar_index] =
tm.feed_shape0_range[feedvar_index][1] -
tm.feed_shape0_range[feedvar_index][0];
// A 2-level lod needs its second level handled as well.
if (tm.feedvar_type[feedvar_index] == 4) {
if (_batch_in_lod[feedvar_index].size() <= 1) {
_batch_in_lod[feedvar_index].push_back({0});
} else if (_batch_in_lod[feedvar_index][1].size() <= 0) {
_batch_in_lod[feedvar_index][1].push_back(0);
}
size_t last_lod_value = _batch_in_lod[feedvar_index][1].back();
for (size_t lod_index = 0;
lod_index < tm.feed_2level_lod_vector[feedvar_index].size();
++lod_index) {
_batch_in_lod[feedvar_index][1].push_back(
last_lod_value +
tm.feed_2level_lod_vector[feedvar_index][lod_index]);
}
}
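// Worked example (illustrative, not part of this diff): the batch already holds
// one task whose lod was {{0, 2, 3}, {0, 3, 4, 6}}, so _batch_in_lod starts out
// the same. Appending a taskmeta whose rebased lods are feed_lod_vector = {1, 3}
// and feed_2level_lod_vector = {2, 5, 7} (its own task lod was
// {{0, 1, 3}, {0, 2, 5, 7}}) offsets each entry by the current last value
// (3 for level 0, 6 for level 1), giving
//   _batch_in_lod = {{0, 2, 3, 4, 6}, {0, 3, 4, 6, 8, 11, 13}}.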
}
}
return _rem_size;
......@@ -631,19 +797,28 @@ class BatchTasks {
const paddle::PaddleTensor& feedVarTensor =
(*tm.task->inVectorT_ptr)[feedvar_index];
size_t feedvar_bytesize = tm.task->feedvar_bytesize(feedvar_index);
const ShapeVector& feedvar_max_shape_vector =
vector_of_max_shape[feedvar_index];
size_t feedvar_max_num =
std::accumulate(feedvar_max_shape_vector.begin(),
feedvar_max_shape_vector.end(),
1,
std::multiplies<size_t>());
size_t feedvar_element_bytesize =
tm.task->feedvar_element_bytesize(feedvar_index);
size_t feedvar_max_bytes = feedvar_element_bytesize * feedvar_max_num;
if (ti == 0) {
// Create the entire tensor at once
// for now, we assume that every task feedvar_bytesize is the same.
// which means we dont support auto embedding.
// but for different feedvar, it is different.
paddle::PaddleTensor paddleTensor;
paddleTensor.dtype = feedVarTensor.dtype;
paddleTensor.name = feedVarTensor.name;
paddleTensor.lod = _batch_in_lod[feedvar_index];
paddleTensor.shape = feedVarTensor.shape;
paddleTensor.shape[0] = _total_shape0_batch_in[feedvar_index];
size_t databuf_size = feedvar_bytesize * _total_shape0_batch_in[feedvar_index];
paddleTensor.shape = feedvar_max_shape_vector;
paddleTensor.shape.insert(paddleTensor.shape.begin(),
_total_shape0_batch_in[feedvar_index]);
size_t databuf_size =
feedvar_max_bytes * _total_shape0_batch_in[feedvar_index];
void* databuf_data = MempoolWrapper::instance().malloc(databuf_size);
paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
paddleTensor.data = paddleBuf;
......@@ -656,12 +831,243 @@ class BatchTasks {
feedVarTensor.data.data() +
feedvar_bytesize * tm.feed_shape0_range[feedvar_index][0];
size_t length =
feedvar_bytesize * (tm.feed_shape0_range[feedvar_index][1] -
tm.feed_shape0_range[feedvar_index][0]);
memcpy(dst_ptr, source_ptr, length);
feedvar_max_bytes * (tm.feed_shape0_range[feedvar_index][1] -
tm.feed_shape0_range[feedvar_index][0]);
// No padding needed: the memory is contiguous, so memcpy directly.
// Comparing the byte sizes here is sufficient, because the padding()
// function above already guarantees that every dimension in
// vector_of_max_shape is the element-wise maximum.
// E.g. with shape-1 = [8000, 20000] and shape-2 = [20000, 8000],
// the shape stored in vector_of_max_shape is [20000, 20000],
// so feedvar_max_bytes == feedvar_bytesize implies the shapes are exactly the same.
if (feedvar_max_bytes == feedvar_bytesize) {
memcpy(dst_ptr, source_ptr, length);
} else {
memset(dst_ptr, 0, length);
size_t old_index = 0;
size_t new_index = 0;
switch (feedvar_max_shape_vector.size()) {
case 5:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) {
for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) {
for (int i_4 = 0; i_4 < feedVarTensor.shape[4]; ++i_4) {
for (int i_5 = 0; i_5 < feedVarTensor.shape[5]; ++i_5) {
old_index = i_0 * feedVarTensor.shape[1] *
feedVarTensor.shape[2] *
feedVarTensor.shape[3] *
feedVarTensor.shape[4] *
feedVarTensor.shape[5] +
i_1 * feedVarTensor.shape[2] *
feedVarTensor.shape[3] *
feedVarTensor.shape[4] *
feedVarTensor.shape[5] +
i_2 * feedVarTensor.shape[3] *
feedVarTensor.shape[4] *
feedVarTensor.shape[5] +
i_3 * feedVarTensor.shape[4] *
feedVarTensor.shape[5] +
i_4 * feedVarTensor.shape[5] + i_5;
new_index = i_0 * feedvar_max_shape_vector[0] *
feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] *
feedvar_max_shape_vector[4] +
i_1 * feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] *
feedvar_max_shape_vector[4] +
i_2 * feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] *
feedvar_max_shape_vector[4] +
i_3 * feedvar_max_shape_vector[3] *
feedvar_max_shape_vector[4] +
i_4 * feedvar_max_shape_vector[4] + i_5;
if (feedVarTensor.dtype ==
paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
}
}
}
}
break;
case 4:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) {
for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) {
for (int i_4 = 0; i_4 < feedVarTensor.shape[4]; ++i_4) {
old_index = i_0 * feedVarTensor.shape[1] *
feedVarTensor.shape[2] *
feedVarTensor.shape[3] *
feedVarTensor.shape[4] +
i_1 * feedVarTensor.shape[2] *
feedVarTensor.shape[3] *
feedVarTensor.shape[4] +
i_2 * feedVarTensor.shape[3] *
feedVarTensor.shape[4] +
i_3 * feedVarTensor.shape[4] + i_4;
new_index = i_0 * feedvar_max_shape_vector[0] *
feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] +
i_1 * feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] +
i_2 * feedvar_max_shape_vector[2] *
feedvar_max_shape_vector[3] +
i_3 * feedvar_max_shape_vector[3] + i_4;
if (feedVarTensor.dtype == paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
}
}
}
break;
case 3:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) {
for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) {
old_index = i_0 * feedVarTensor.shape[1] *
feedVarTensor.shape[2] *
feedVarTensor.shape[3] +
i_1 * feedVarTensor.shape[2] *
feedVarTensor.shape[3] +
i_2 * feedVarTensor.shape[3] + i_3;
new_index = i_0 * feedvar_max_shape_vector[0] *
feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] +
i_1 * feedvar_max_shape_vector[1] *
feedvar_max_shape_vector[2] +
i_2 * feedvar_max_shape_vector[2] + i_3;
if (feedVarTensor.dtype == paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
}
}
break;
case 2:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) {
old_index =
i_0 * feedVarTensor.shape[1] * feedVarTensor.shape[2] +
i_1 * feedVarTensor.shape[2] + i_2;
new_index = i_0 * feedvar_max_shape_vector[0] *
feedvar_max_shape_vector[1] +
i_1 * feedvar_max_shape_vector[1] + i_2;
if (feedVarTensor.dtype == paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
}
break;
case 1:
for (int i_0 = tm.feed_shape0_range[feedvar_index][0];
i_0 < tm.feed_shape0_range[feedvar_index][1];
++i_0) {
for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) {
old_index = i_0 * feedVarTensor.shape[1] + i_1;
new_index = i_0 * feedvar_max_shape_vector[0] + i_1;
if (feedVarTensor.dtype == paddle::PaddleDType::INT64) {
*((int64_t*)dst_ptr + new_index) =
*((int64_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::FLOAT32) {
*((float*)dst_ptr + new_index) =
*((float*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::INT32) {
*((int32_t*)dst_ptr + new_index) =
*((int32_t*)source_ptr + old_index);
} else if (feedVarTensor.dtype ==
paddle::PaddleDType::UINT8) {
*((char*)dst_ptr + new_index) =
*((char*)source_ptr + old_index);
}
}
}
break;
default:
break;
}
}
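// Worked example (illustrative, not part of this diff) of the padded copy
// above, for the 1-D inner-shape case: a task with shape [2, 3] merged into a
// batch whose max inner shape is [5]. Row i_0 of the source (3 elements) is
// copied to offset i_0 * 5 in the destination; the remaining 2 slots per row
// keep the memset value 0:
//   source:      a b c | d e f
//   destination: a b c 0 0 | d e f 0 0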
// nobatch-type feedvar: do not advance the offset.
if (tm.feedvar_type[feedvar_index] != 3)
if (tm.feedvar_type[feedvar_index] != 3) {
_batch_in_offset[feedvar_index] += length;
}
}
}
}
......@@ -753,25 +1159,23 @@ class BatchTasks {
// At this point it is impossible to tell whether the fetchvar is genuinely nobatch, so set_fetch_nobatch_index may miss entries.
// Distinguishing the two cases elsewhere is left for later.
if (fetchvar_batch_size(fetchvar_index) != _total_fetch_batch) {
if(fetchvar_batch_size(fetchvar_index) <= 0){
if (fetchvar_batch_size(fetchvar_index) <= 0) {
// which means error.
return false;
}else if(fetchvar_batch_size(fetchvar_index) == 1){
} else if (fetchvar_batch_size(fetchvar_index) == 1) {
// which means fetchvar shape[0] = 1.
// shape[0] does not change with batch
set_fetch_nobatch_index.insert(fetchvar_index);
_total_fetch_batch =
std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch);
}else if(_total_fetch_batch == 1){
// This means every previous fetchvar had shape[0] == 1,
// while the current fetchvar has shape[0] > 1,
// so all the previous ones are no_batch.
for(size_t temp_index = fetchvar_index-1; temp_index >= 0; --temp_index){
} else if (_total_fetch_batch == 1) {
// This means every previous fetchvar had shape[0] == 1,
// while the current fetchvar has shape[0] > 1,
// so all the previous ones are no_batch.
for (size_t temp_index = 0; temp_index < fetchvar_index;
++temp_index) {
set_fetch_nobatch_index.insert(temp_index);
}
_total_fetch_batch =
std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch);
}else{
_total_fetch_batch = fetchvar_batch_size(fetchvar_index);
} else {
// which means error.
return false;
}
......@@ -846,6 +1250,14 @@ class BatchTasks {
size_t shape0_index_end =
_batch_out[fetchvar_index].lod[0][last_batch + add];
size_t shape0_length = shape0_index_end - shape0_index_start;
size_t lod_size = _batch_out[fetchvar_index].lod.size();
if (lod_size == 2) {
shape0_index_start =
_batch_out[fetchvar_index].lod[1][shape0_index_start];
shape0_index_end =
_batch_out[fetchvar_index].lod[1][shape0_index_end];
shape0_length = shape0_index_end - shape0_index_start;
}
// When the task was split into several taskmetas, the result cannot be copied straight into task->outVectorT_ptr.
// Instead, copy into task->outLodTensorVector[taskmeta_index] first,
// then copy back into task->outVectorT_ptr in order once all of the task's taskmetas have finished.
......@@ -856,10 +1268,11 @@ class BatchTasks {
fetchVarTensor.shape[0] = shape0_length;
fetch_lod_index++;
void* databuf_data = MempoolWrapper::instance().malloc(length,task->memoryPtr);
void* databuf_data =
MempoolWrapper::instance().malloc(length, task->memoryPtr);
paddle::PaddleBuf paddleBuf(databuf_data, length);
fetchVarTensor.data = paddleBuf;
//fetchVarTensor.data.Resize(length);
// fetchVarTensor.data.Resize(length);
void* dst_ptr = fetchVarTensor.data.data();
void* source_ptr = _batch_out[fetchvar_index].data.data() +
shape0_index_start * fetchvar_bytesize_index;
......@@ -878,6 +1291,24 @@ class BatchTasks {
(_batch_out[fetchvar_index].lod[0][lod_index] -
last_lod_value);
}
if (lod_size == 2) {
if (fetchVarTensor.lod.size() <= 1) {
fetchVarTensor.lod.push_back({});
}
size_t last_lod0_value =
_batch_out[fetchvar_index].lod[0][last_batch];
size_t end_lod0_value =
_batch_out[fetchvar_index].lod[0][last_batch + add];
size_t lod1_size = end_lod0_value - last_lod0_value;
fetchVarTensor.lod[1].resize(lod1_size, 0);
for (size_t lod_index = last_lod0_value + 1, my_index = 0;
lod_index < end_lod0_value + 1;
++lod_index, ++my_index) {
fetchVarTensor.lod[1][my_index] =
_batch_out[fetchvar_index].lod[1][lod_index] -
_batch_out[fetchvar_index].lod[1][last_lod0_value];
}
}
} else {
// The task was not split into several taskmetas, so only one thread's taskmeta touches it and there is no multi-thread race.
// After resizing, write directly into task->outVectorT_ptr.
......@@ -885,12 +1316,13 @@ class BatchTasks {
(*task->outVectorT_ptr)[fetchvar_index];
size_t length = fetchvar_bytesize_index * shape0_length;
fetchVarTensor.shape[0] = shape0_length;
void* databuf_data = MempoolWrapper::instance().malloc(length,task->memoryPtr);
void* databuf_data =
MempoolWrapper::instance().malloc(length, task->memoryPtr);
paddle::PaddleBuf paddleBuf(databuf_data, length);
fetchVarTensor.data = paddleBuf;
//fetchVarTensor.data.Resize(length);
// fetchVarTensor.data.Resize(length);
void* dst_ptr = fetchVarTensor.data.data();
void* source_ptr = _batch_out[fetchvar_index].data.data() +
shape0_index_start * fetchvar_bytesize_index;
......@@ -918,6 +1350,27 @@ class BatchTasks {
(_batch_out[fetchvar_index].lod[0][lod_index] -
last_lod_value);
}
if (lod_size == 2) {
if (fetchVarTensor.lod.size() <= 1) {
fetchVarTensor.lod.push_back({});
} else if (fetchVarTensor.lod[1].size() <= 0) {
fetchVarTensor.lod[1].push_back(0);
}
size_t last_lod0_value =
_batch_out[fetchvar_index].lod[0][last_batch];
size_t end_lod0_value =
_batch_out[fetchvar_index].lod[0][last_batch + add];
size_t lod1_size = end_lod0_value - last_lod0_value;
fetchVarTensor.lod[1].resize(lod1_size + 1, 0);
for (size_t lod_index = last_lod0_value + 1, my_index = 1;
lod_index < end_lod0_value + 1;
++lod_index, ++my_index) {
fetchVarTensor.lod[1][my_index] =
_batch_out[fetchvar_index].lod[1][lod_index] -
_batch_out[fetchvar_index].lod[1][last_lod0_value];
}
}
}
} else {
// Ordinary (non-lod) fetchvar: the Task's total fetchvar_batch =
......@@ -979,10 +1432,26 @@ class BatchTasks {
std::set<size_t> set_fetch_nobatch_index;
std::vector<size_t> vector_fetch_lod_index;
// The current maximum shape of each FeedVar in this BatchTask.
size_t _rem_size;
size_t _batch_size;
bool _overrun;
bool _allow_split_request;
// The current maximum shape of each FeedVar in this BatchTask.
VectorOfShapeVector vector_of_max_shape;
// In the AutoPadding feature, this is compared against the TaskMeta of the new batch waiting to be merged to decide whether to merge.
// There are two criteria; meeting either one allows the merge:
// 1. the product of the per-dimension similarity ratios is greater than 50%;
// 2. the absolute difference is smaller than 1024 bytes.
// For example, Shape-1 = [batch, 500, 500] and Shape-2 = [batch, 400, 400]:
// the absolute difference is 90000 bytes and the relative ratio is 0.8 * 0.8 = 0.64, so criterion 1 is met but criterion 2 is not.
// For example, Shape-1 = [batch, 1, 1] and Shape-2 = [batch, 2, 2]:
// the absolute difference is 3 bytes and the relative ratio is 0.5 * 0.5 = 0.25, so criterion 2 is met but criterion 1 is not.
// AutoPadding can be applied in both cases.
bool _auto_padding;
int _padding_value;
};
// BSF task handle
......@@ -1058,7 +1527,7 @@ class TaskExecutor {
typedef typename TaskT::OutVectorT OutVectorT;
typedef std::vector<TaskT> TaskArrayT;
typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
typedef std::vector<size_t> ShapeVector;
typedef std::vector<int> ShapeVector;
typedef std::vector<ShapeVector> VectorOfShapeVector;
TaskExecutor()
......