diff --git a/core/predictor/framework/bsf-inl.h b/core/predictor/framework/bsf-inl.h index ca5fc0f8db743a25c8e0937a9589f7d651cced19..abdba1f2e5bc9710e19804fa3f4d4c0fbce50abd 100755 --- a/core/predictor/framework/bsf-inl.h +++ b/core/predictor/framework/bsf-inl.h @@ -70,7 +70,6 @@ bool Task::task_fetch_create(BatchTasks& batchTask) { // 每个lod型的fetchvar拷贝到对应的临时空间中 // 最后再计算临时空间的总量,合并fetchvar和lod fetchvar_batch = 0; - } else { // 普通fetchvar情况,此时该Task总的fetchvar_batch = // 输入的总的batch_size() @@ -86,14 +85,15 @@ bool Task::task_fetch_create(BatchTasks& batchTask) { // 此时 lod 为空。 tensor_out.lod = batchTask._batch_out[fetchvar_index].lod; // resize all batch memory at one time - + size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index; - - void* databuf_data = MempoolWrapper::instance().malloc(databuf_size,memoryPtr); + + void* databuf_data = + MempoolWrapper::instance().malloc(databuf_size, memoryPtr); paddle::PaddleBuf paddleBuf(databuf_data, databuf_size); tensor_out.data = paddleBuf; - - //tensor_out.data.Resize(databuf_size); + + // tensor_out.data.Resize(databuf_size); } else { // 当taskmeta_num = 1时,由于同时只有一个taskMeta操作task // 不涉及线程安全问题,所以此时可以直接由taskMeta->task->resize->copy @@ -213,7 +213,8 @@ void TaskExecutor::stop() { template TaskHandler TaskExecutor::schedule( const void* inVectorT_ptr, - void* outVectorT_ptr, MempoolRegion* memoryPtr) { // NOLINT + void* outVectorT_ptr, + MempoolRegion* memoryPtr) { // NOLINT TaskT* task = butil::get_object(); if (!task) { LOG(ERROR) << "Failed get TaskT from object pool"; @@ -240,7 +241,7 @@ TaskHandler TaskExecutor::schedule( task->write_fd = fds[1]; task->owner_tid = ::syscall(SYS_gettid); task->memoryPtr = memoryPtr; - //task->_bspec_key = _bspec_key; + // task->_bspec_key = _bspec_key; task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr; task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr; if (!task->task_init()) { @@ -309,7 +310,7 @@ bool TaskExecutor::move_task_to_batch( } // combine_task_valid负责判断是否能够合并 - // 除最外层的shape外,内层shape应一致才能合并。 + // 除最外层的shape外,内层shape应一致或者允许Padding才能合并。 // 否则跳出循环,放入下一个batchTask中。 // 以此保证batch.append_task(task)中的task的内层shape相同。 @@ -317,12 +318,15 @@ bool TaskExecutor::move_task_to_batch( // 所以要求该feedvar必须相等,才能合并。 // 否则跳出循环,放入下一个batchTask中。 // 目前没有PaddleTensor和PaddleBuff没有重载==,所以只能比较内存. - // TODO(HexToString): 可以考虑后期支持AutoPadding. 
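The hunk above replaces the old boolean combine_task_valid() check with two staged return-code checks. A minimal sketch of the resulting decision flow, assuming simplified template parameters (try_merge below is illustrative glue, not part of the patch):

    // combine_task_valid(prev) -> 0: a shape[0]==1 ("nobatch") feedvar differs
    //                                between the two tasks; never merge.
    //                             1: inner shapes identical; merge directly.
    //                             2: inner shapes differ; defer to BatchTasks::padding.
    // BatchTasks::padding(task) -> 0: feedvar count or rank mismatch; cannot merge.
    //                              1: merging is possible but the padding cost is
    //                                 judged not worthwhile.
    //                              2: merge, and update the batch's running max shapes.
    template <typename BatchT, typename TaskT>
    bool try_merge(BatchT& batchTask, TaskT* task, TaskT* previous_task) {
      if (previous_task != nullptr &&
          task->combine_task_valid(previous_task) == 0) {
        return false;  // caller starts the next BatchTasks with this task
      }
      if (batchTask.padding(task) != 2) {
        return false;  // padding rejected or judged not worth the wasted bytes
      }
      batchTask.append_task(task);
      return true;
    }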
if (previous_task != nullptr) { - if (!task->combine_task_valid(previous_task)) { + if (task->combine_task_valid(previous_task) == 0) { break; } } + + if (batchTask.padding(task) != 2) { + break; + } size_t rem = batchTask.append_task(task); previous_task = task; if (task->rem <= 0) { @@ -407,10 +411,11 @@ int TaskExecutor::work(ThreadContext* context) { } template -bool TaskManager::schedule(const void* in, - void* out, MempoolRegion* memoryPtr) { // NOLINT +bool TaskManager::schedule( + const void* in, void* out, MempoolRegion* memoryPtr) { // NOLINT TaskHandler handler = - TaskExecutorVector::instance()[_model_index].schedule(in, out, memoryPtr); + TaskExecutorVector::instance()[_model_index].schedule( + in, out, memoryPtr); if (handler.valid()) { _task_owned = handler; diff --git a/core/predictor/framework/bsf.h b/core/predictor/framework/bsf.h index ed8415ccef3105f6e776a3102cae2a0c568db6d1..dff2b4c0d65d7c5cb54361203908874d1c240ca2 100755 --- a/core/predictor/framework/bsf.h +++ b/core/predictor/framework/bsf.h @@ -17,10 +17,11 @@ #include #include #include +#include #include +#include #include #include - #ifdef BCLOUD #include "base/atomicops.h" #else @@ -38,6 +39,8 @@ namespace im { namespace bsf { static const size_t DEFAULT_BATCH_SIZE = 100; +static const size_t ABSOLUTE_ERROR = 1024; +static const float RELATIVE_ERROR = 0.5; typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper; typedef baidu::paddle_serving::predictor::MempoolRegion MempoolRegion; @@ -61,7 +64,7 @@ struct Task { typedef InItemT InType; typedef OutItemT OutType; typedef Task TaskT; - typedef std::vector ShapeVector; + typedef std::vector ShapeVector; typedef std::vector VectorOfShapeVector; typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper; @@ -124,7 +127,7 @@ struct Task { outLodTensorVector.clear(); } - void clear(){ + void clear() { read_fd = -1; write_fd = -1; owner_tid = -1; @@ -158,13 +161,18 @@ struct Task { return 1; } - bool combine_task_valid(Task* other_task) { - // TODO(HexToString): auto-padding - // 除最外层的shape外,内层shape应一致才能合并。 + int combine_task_valid(Task* other_task) { + // 除最外层的shape外,内层shape应一致或者允许Padding才能合并。 // 否则跳出循环,放入下一个batchTask中。 + // 当内层shape不一致时,此时先不判断是否Padding,在batchTask层判断,返回2。 // 以此保证batch.append_task(task)中的task的内层shape相同。 + + // return 0 表示Shape[0] = 1 + // 而!=batch的情况,两个Task中的值不同,此时不能合并。 + // return 1 表示Shape维度完全一致,直接合并即可。 + // return 2 表示Shape维度不完全一致,还需要进一步的判断,是否合并。 if (other_task->feedvar_shape_nobatch() != feedvar_shape_nobatch()) { - return false; + return 2; } // 对于Shape[0] = 1 而!=batch的情况,因为合并时,取其中一个的值 @@ -177,9 +185,9 @@ struct Task { std::memcmp((*inVectorT_ptr)[feedvar_index].data.data(), (*(other_task->inVectorT_ptr))[feedvar_index].data.data(), (*inVectorT_ptr)[feedvar_index].data.length()); - if (result != 0) return false; + if (result != 0) return 0; } - return true; + return 1; } size_t feedvar_batch_size(size_t feedvar_index) { @@ -282,12 +290,14 @@ struct Task { // which means error. if (feedvar_batch_size(feedvar_index) != 1 && total_feed_batch != 1) { return false; + } else if (feedvar_batch_size(feedvar_index) != 1 && + total_feed_batch == 1) { + for (int temp = 0; temp < feedvar_index; ++temp) { + set_feed_nobatch_index.insert(temp); + } + total_feed_batch = feedvar_batch_size(feedvar_index); } else { - // which means feedvar shape[0] = 1. 
- // shape[0] does not change with batch set_feed_nobatch_index.insert(feedvar_index); - total_feed_batch = - std::max(feedvar_batch_size(feedvar_index), total_feed_batch); } } // 将lod feedvar index加入到vector中。 @@ -324,6 +334,9 @@ struct Task { // feature_vector[0]是由shape0_index的范围值组成的vector,包含两个元素最小和最大值。 // feature_vector[1]是由lod组成的vector,包含指定batch的lod信息. // feature_vector[2]是由单个元素的组成的vector,元素值为1表示是nobatch的feedvar。 + // feature_vector[3]是2维lod组成的vector,包含指定batch的2-level lod。 + // 之所以把二维lod + // 加入到feature_vector[3],是为了兼容原有代码,尽可能小的改动。 // if 为 nobatch feedvar情况。 // else if 为带lod的feedvar情况。 @@ -335,6 +348,7 @@ struct Task { } else if (set_feed_lod_index.size() > 0 && set_feed_lod_index.find(feedvar_index) != set_feed_lod_index.end()) { + int lod_size = (*inVectorT_ptr)[feedvar_index].lod.size(); std::vector feed_lod_vector(end_batch - start_batch); for (size_t lod_index = start_batch + 1, vector_index = 0; lod_index < end_batch + 1; @@ -343,9 +357,35 @@ struct Task { (*inVectorT_ptr)[feedvar_index].lod[0][lod_index] - (*inVectorT_ptr)[feedvar_index].lod[0][start_batch]; } - size_t shape0_start = (*inVectorT_ptr)[feedvar_index].lod[0][start_batch]; - size_t shape0_end = (*inVectorT_ptr)[feedvar_index].lod[0][end_batch]; - feature_vector = {{shape0_start, shape0_end}, feed_lod_vector}; + if (lod_size == 1) { + size_t shape0_start = + (*inVectorT_ptr)[feedvar_index].lod[0][start_batch]; + size_t shape0_end = (*inVectorT_ptr)[feedvar_index].lod[0][end_batch]; + feature_vector = {{shape0_start, shape0_end}, feed_lod_vector}; + } else if (lod_size == 2) { + size_t level2_lod_start_index = + (*inVectorT_ptr)[feedvar_index].lod[0][start_batch]; + size_t level2_lod_end_index = + (*inVectorT_ptr)[feedvar_index].lod[0][end_batch]; + int level2_lod_size = level2_lod_end_index - level2_lod_start_index; + std::vector feed_2level_lod_vector(level2_lod_size); + for (size_t lod2_index = level2_lod_start_index + 1, vector_index = 0; + lod2_index < level2_lod_end_index + 1; + ++vector_index, ++lod2_index) { + feed_2level_lod_vector[vector_index] = + (*inVectorT_ptr)[feedvar_index].lod[1][lod2_index] - + (*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_start_index]; + } + size_t shape0_start = + (*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_start_index]; + size_t shape0_end = + (*inVectorT_ptr)[feedvar_index].lod[1][level2_lod_end_index]; + feature_vector = {{shape0_start, shape0_end}, + feed_lod_vector, + {}, + feed_2level_lod_vector}; + } + // feature_vector.push_back(feed_lod_vector); } else { feature_vector = {{start_batch, end_batch}}; @@ -360,24 +400,35 @@ struct Task { for (size_t index = 0; index < vector_fetch_lod_index.size(); ++index) { size_t data_length = 0; size_t lod_length = 0; + size_t lod2_length = 0; size_t total_shape0 = 0; + size_t once_lod0_length = 0; + int lod_size = 1; size_t feedvar_index = vector_fetch_lod_index[index]; // 由于PaddleTensor的resize实现,是每次都会清空,所以必须先统计总长度。 for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num; ++taskmeta_index) { + lod_size = outLodTensorVector[taskmeta_index][index].lod.size(); data_length += outLodTensorVector[taskmeta_index][index].data.length(); - lod_length += outLodTensorVector[taskmeta_index][index].lod[0].size(); + once_lod0_length = + outLodTensorVector[taskmeta_index][index].lod[0].size(); + lod_length += once_lod0_length; total_shape0 += outLodTensorVector[taskmeta_index][index].shape[0]; + if (lod_size == 2) { + lod2_length += outLodTensorVector[taskmeta_index][index] + .lod[0][once_lod0_length - 1]; + } } // 
一次性扩容PaddleTensor中的data和lod paddle::PaddleTensor& fetchVarTensor = (*outVectorT_ptr)[feedvar_index]; fetchVarTensor.shape[0] = total_shape0; - void* databuf_data = MempoolWrapper::instance().malloc(data_length,memoryPtr); + void* databuf_data = + MempoolWrapper::instance().malloc(data_length, memoryPtr); paddle::PaddleBuf paddleBuf(databuf_data, data_length); fetchVarTensor.data = paddleBuf; - - //fetchVarTensor.data.Resize(data_length); + + // fetchVarTensor.data.Resize(data_length); // task中的lod补0 if (fetchVarTensor.lod.size() <= 0) { fetchVarTensor.lod.push_back({0}); @@ -385,15 +436,25 @@ struct Task { fetchVarTensor.lod[0].push_back(0); } fetchVarTensor.lod[0].resize(lod_length + 1, 0); + if (lod_size == 2) { + if (fetchVarTensor.lod.size() <= 1) { + fetchVarTensor.lod.push_back({0}); + } else if (fetchVarTensor.lod[1].size() <= 0) { + fetchVarTensor.lod[1].push_back(0); + } + fetchVarTensor.lod[1].resize(lod2_length + 1, 0); + } // size_t data_length_offset = 0; size_t lod_length_offset = 0; + size_t lod2_length_offset = 0; size_t once_data_length = 0; size_t once_lod_length = 0; + size_t once_2lod_length = 0; for (size_t taskmeta_index = 0; taskmeta_index < total_taskmeta_num; ++taskmeta_index) { - //process data + // process data void* dst_ptr = fetchVarTensor.data.data() + data_length_offset; void* source_ptr = outLodTensorVector[taskmeta_index][index].data.data(); @@ -401,7 +462,7 @@ struct Task { outLodTensorVector[taskmeta_index][index].data.length(); memcpy(dst_ptr, source_ptr, once_data_length); data_length_offset += once_data_length; - //process lod + // process lod size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset]; once_lod_length = outLodTensorVector[taskmeta_index][index].lod[0].size(); @@ -412,7 +473,18 @@ struct Task { outLodTensorVector[taskmeta_index][index].lod[0][once_index]; lod_length_offset++; } - + if (lod_size == 2) { + size_t last_2lod_value = fetchVarTensor.lod[1][lod2_length_offset]; + once_2lod_length = + outLodTensorVector[taskmeta_index][index].lod[1].size(); + for (size_t once_index = 0; once_index < once_2lod_length; + ++once_index) { + fetchVarTensor.lod[1][lod2_length_offset + 1] = + last_2lod_value + + outLodTensorVector[taskmeta_index][index].lod[1][once_index]; + lod2_length_offset ++; + } + } } } } @@ -459,10 +531,16 @@ struct TaskMeta { feedvar_type.push_back(feature.size()); if (feature.size() == 1) { feed_lod_vector.push_back({}); + feed_2level_lod_vector.push_back({}); } else if (feature.size() == 2) { feed_lod_vector.push_back(feature[1]); - } else { + feed_2level_lod_vector.push_back({}); + } else if (feature.size() == 3) { feed_lod_vector.push_back({}); + feed_2level_lod_vector.push_back({}); + } else if (feature.size() == 4) { + feed_lod_vector.push_back(feature[1]); + feed_2level_lod_vector.push_back(feature[3]); } } } @@ -474,6 +552,7 @@ struct TaskMeta { size_t taskmeta_index; std::vector> feed_shape0_range; std::vector> feed_lod_vector; + std::vector> feed_2level_lod_vector; std::vector feedvar_type; }; @@ -488,7 +567,7 @@ class BatchTasks { typedef typename TaskT::InType InType; typedef typename TaskT::OutType OutType; typedef TaskMeta TaskMetaT; - typedef std::vector ShapeVector; + typedef std::vector ShapeVector; typedef std::vector VectorOfShapeVector; typedef std::vector LodVector; typedef std::vector PaddleTensorLod; @@ -496,11 +575,15 @@ class BatchTasks { explicit BatchTasks(size_t batch_size, bool overrun = false, - bool allow_split_request = true) + bool allow_split_request = true, + bool auto_padding = true, + 
int padding_value = 0) : _batch_size(batch_size), _rem_size(batch_size), _overrun(overrun), - _allow_split_request(allow_split_request) { + _allow_split_request(allow_split_request), + _auto_padding(auto_padding), + _padding_value(padding_value) { _batch_in.clear(); _batch_in_offset.clear(); _total_shape0_batch_in.clear(); @@ -530,6 +613,71 @@ class BatchTasks { vector_fetch_lod_index.clear(); } + // return 0 + // 表示feedvar数量都不一样,或者,每个feedvar的shape维度都不同,此时不能合并batch。 + // return 1 表示合并batch不划算。 + // return 2 表示合并batch划算。 + int padding(TaskT* task) { + const VectorOfShapeVector& task_vector_shape = + task->feedvar_shape_nobatch(); + int return_value = 2; + + // 当batchTask中为空时,第一次加入Task,此时则BatchTask中即为第一个Task中的Shape. + if (vector_of_max_shape.size() == 0) { + vector_of_max_shape = task_vector_shape; + return 2; + } + + if (vector_of_max_shape.size() != task_vector_shape.size()) { + return 0; + } + + // 当两个Shape完全相同时,无须更新,无须计算,无须Padding。 + if (vector_of_max_shape == task_vector_shape) { + return 2; + } + + std::vector multiplies_1(vector_of_max_shape.size()); + std::vector multiplies_2(vector_of_max_shape.size()); + std::vector temp_multiplies(vector_of_max_shape.size()); + VectorOfShapeVector temp_vector_max_shape(vector_of_max_shape.size()); + for (size_t i = 0; i < vector_of_max_shape.size(); ++i) { + if (vector_of_max_shape[i].size() != task_vector_shape[i].size()) + return 0; + for (size_t j = 0; j < vector_of_max_shape[i].size(); ++j) { + temp_vector_max_shape[i].push_back( + std::max(vector_of_max_shape[i][j], task_vector_shape[i][j])); + } + temp_multiplies[i] = std::accumulate(temp_vector_max_shape[i].begin(), + temp_vector_max_shape[i].end(), + 1, + std::multiplies()); + multiplies_1[i] = std::accumulate(vector_of_max_shape[i].begin(), + vector_of_max_shape[i].end(), + 1, + std::multiplies()); + multiplies_2[i] = std::accumulate(task_vector_shape[i].begin(), + task_vector_shape[i].end(), + 1, + std::multiplies()); + if ((labs(temp_multiplies[i] - multiplies_1[i]) <= ABSOLUTE_ERROR && + labs(temp_multiplies[i] - multiplies_2[i]) <= ABSOLUTE_ERROR) || + (temp_multiplies[i] / multiplies_1[i] >= RELATIVE_ERROR && + temp_multiplies[i] / multiplies_2[i] >= RELATIVE_ERROR)) { + continue; + } else { + return_value = 1; + } + } + + // 当合并batch时,需要更新BatchTask中的最大Shape + // 此时,整个BatchTask到最后合并多个Task时,需要Padding + if (return_value == 2) { + vector_of_max_shape = temp_vector_max_shape; + } + return return_value; + } + // synchronized operation // because Upper level callers of this function have already locked. // 能进到此函数的task都是同类task,在该函数之前已保证了这点。 @@ -545,8 +693,9 @@ class BatchTasks { TaskMetaT tm(task, start_index, add, task->taskmeta_num); task->rem -= add; _rem_size -= add; - if(task->taskmeta_num == 0){ - task->total_taskmeta_num = 1 + (task->rem + _batch_size - 1)/_batch_size; + if (task->taskmeta_num == 0) { + task->total_taskmeta_num = + 1 + (task->rem + _batch_size - 1) / _batch_size; } task->taskmeta_num += 1; _taskmeta_vector.push_back(tm); @@ -569,8 +718,15 @@ class BatchTasks { _total_shape0_batch_in[feedvar_index] += tm.feed_shape0_range[feedvar_index][1] - tm.feed_shape0_range[feedvar_index][0]; - } else if (tm.feedvar_type[feedvar_index] == 2) { - // lod类型的feedvar + } else if (tm.feedvar_type[feedvar_index] == 3) { + // tm.feedvar_type[feedvar_index] == 3 + // nobatch类型的feedvar. 
+ // 此时不累加,且值应为1 + _total_shape0_batch_in[feedvar_index] = + tm.feed_shape0_range[feedvar_index][1] - + tm.feed_shape0_range[feedvar_index][0]; + } else { + // lod类型的feedvar 可能是1维lod 也可能是2维lod // 累计计算shape0的累加值,为后面初始化PaddleTensor做准备。 _total_shape0_batch_in[feedvar_index] += tm.feed_shape0_range[feedvar_index][1] - @@ -589,13 +745,23 @@ class BatchTasks { _batch_in_lod[feedvar_index][0].push_back( last_lod_value + tm.feed_lod_vector[feedvar_index][lod_index]); } - } else { - // tm.feedvar_type[feedvar_index] == 3 - // nobatch类型的feedvar. - // 此时不累加,且值应为1 - _total_shape0_batch_in[feedvar_index] = - tm.feed_shape0_range[feedvar_index][1] - - tm.feed_shape0_range[feedvar_index][0]; + + // 2维lod 需要额外处理2维lod信息。 + if (tm.feedvar_type[feedvar_index] == 4) { + if (_batch_in_lod[feedvar_index].size() <= 1) { + _batch_in_lod[feedvar_index].push_back({0}); + } else if (_batch_in_lod[feedvar_index][1].size() <= 0) { + _batch_in_lod[feedvar_index][1].push_back(0); + } + size_t last_lod_value = _batch_in_lod[feedvar_index][1].back(); + for (size_t lod_index = 0; + lod_index < tm.feed_2level_lod_vector[feedvar_index].size(); + ++lod_index) { + _batch_in_lod[feedvar_index][1].push_back( + last_lod_value + + tm.feed_2level_lod_vector[feedvar_index][lod_index]); + } + } } } return _rem_size; @@ -631,19 +797,28 @@ class BatchTasks { const paddle::PaddleTensor& feedVarTensor = (*tm.task->inVectorT_ptr)[feedvar_index]; size_t feedvar_bytesize = tm.task->feedvar_bytesize(feedvar_index); + const ShapeVector& feedvar_max_shape_vector = + vector_of_max_shape[feedvar_index]; + size_t feedvar_max_num = + std::accumulate(feedvar_max_shape_vector.begin(), + feedvar_max_shape_vector.end(), + 1, + std::multiplies()); + size_t feedvar_element_bytesize = + tm.task->feedvar_element_bytesize(feedvar_index); + size_t feedvar_max_bytes = feedvar_element_bytesize * feedvar_max_num; if (ti == 0) { // Create the entire tensor at once - // for now, we assume that every task feedvar_bytesize is the same. - // which means we dont support auto embedding. - // but for different feedvar, it is different. 
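Both LoD branches merged into _batch_in_lod above use the same offset arithmetic: every entry of a level is shifted by the last value already accumulated at that level. A standalone sketch of that arithmetic for the 2-level case, assuming standard LoD vectors that begin with 0 (append_two_level_lod is illustrative, not part of the patch):

    #include <cstddef>
    #include <vector>

    // lod[0]: level-1 offsets into lod[1]; lod[1]: level-2 offsets into data rows.
    using Lod = std::vector<std::vector<size_t>>;

    void append_two_level_lod(Lod& merged, const Lod& task_lod) {
      if (merged.empty()) merged.assign(2, std::vector<size_t>{0});
      const size_t base0 = merged[0].back();  // shift for level-1 entries
      const size_t base1 = merged[1].back();  // shift for level-2 entries
      // Each incoming level starts with a leading 0 that duplicates the
      // previous tail, so copying starts at index 1.
      for (size_t i = 1; i < task_lod[0].size(); ++i)
        merged[0].push_back(base0 + task_lod[0][i]);
      for (size_t i = 1; i < task_lod[1].size(); ++i)
        merged[1].push_back(base1 + task_lod[1][i]);
    }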
paddle::PaddleTensor paddleTensor; paddleTensor.dtype = feedVarTensor.dtype; paddleTensor.name = feedVarTensor.name; paddleTensor.lod = _batch_in_lod[feedvar_index]; - paddleTensor.shape = feedVarTensor.shape; - paddleTensor.shape[0] = _total_shape0_batch_in[feedvar_index]; - size_t databuf_size = feedvar_bytesize * _total_shape0_batch_in[feedvar_index]; + paddleTensor.shape = feedvar_max_shape_vector; + paddleTensor.shape.insert(paddleTensor.shape.begin(), + _total_shape0_batch_in[feedvar_index]); + size_t databuf_size = + feedvar_max_bytes * _total_shape0_batch_in[feedvar_index]; void* databuf_data = MempoolWrapper::instance().malloc(databuf_size); paddle::PaddleBuf paddleBuf(databuf_data, databuf_size); paddleTensor.data = paddleBuf; @@ -656,12 +831,243 @@ class BatchTasks { feedVarTensor.data.data() + feedvar_bytesize * tm.feed_shape0_range[feedvar_index][0]; size_t length = - feedvar_bytesize * (tm.feed_shape0_range[feedvar_index][1] - - tm.feed_shape0_range[feedvar_index][0]); - memcpy(dst_ptr, source_ptr, length); + feedvar_max_bytes * (tm.feed_shape0_range[feedvar_index][1] - + tm.feed_shape0_range[feedvar_index][0]); + + // 不需要padding,连续内存,则直接memcpy + // 这里可以直接比较内存是否一样大 + // 在于前面Padding函数中,已经保证了vector_of_max_shape中各个维度都是最大值。 + // 当shape-1 = [8000,20000] shape-2 = [20000,8000]时 + // 此时,vector_of_max_shape中的shape = [20000,20000] + // 所以feedvar_max_bytes == feedvar_bytesize时,一定是shape完全相同。 + if (feedvar_max_bytes == feedvar_bytesize) { + memcpy(dst_ptr, source_ptr, length); + } else { + memset(dst_ptr, 0, length); + size_t old_index = 0; + size_t new_index = 0; + + switch (feedvar_max_shape_vector.size()) { + case 5: + for (int i_0 = tm.feed_shape0_range[feedvar_index][0]; + i_0 < tm.feed_shape0_range[feedvar_index][1]; + ++i_0) { + for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) { + for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) { + for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) { + for (int i_4 = 0; i_4 < feedVarTensor.shape[4]; ++i_4) { + for (int i_5 = 0; i_5 < feedVarTensor.shape[5]; ++i_5) { + old_index = i_0 * feedVarTensor.shape[1] * + feedVarTensor.shape[2] * + feedVarTensor.shape[3] * + feedVarTensor.shape[4] * + feedVarTensor.shape[5] + + i_1 * feedVarTensor.shape[2] * + feedVarTensor.shape[3] * + feedVarTensor.shape[4] * + feedVarTensor.shape[5] + + i_2 * feedVarTensor.shape[3] * + feedVarTensor.shape[4] * + feedVarTensor.shape[5] + + i_3 * feedVarTensor.shape[4] * + feedVarTensor.shape[5] + + i_4 * feedVarTensor.shape[5] + i_5; + new_index = i_0 * feedvar_max_shape_vector[0] * + feedvar_max_shape_vector[1] * + feedvar_max_shape_vector[2] * + feedvar_max_shape_vector[3] * + feedvar_max_shape_vector[4] + + i_1 * feedvar_max_shape_vector[1] * + feedvar_max_shape_vector[2] * + feedvar_max_shape_vector[3] * + feedvar_max_shape_vector[4] + + i_2 * feedvar_max_shape_vector[2] * + feedvar_max_shape_vector[3] * + feedvar_max_shape_vector[4] + + i_3 * feedvar_max_shape_vector[3] * + feedvar_max_shape_vector[4] + + i_4 * feedvar_max_shape_vector[4] + i_5; + if (feedVarTensor.dtype == + paddle::PaddleDType::INT64) { + *((int64_t*)dst_ptr + new_index) = + *((int64_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::FLOAT32) { + *((float*)dst_ptr + new_index) = + *((float*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::INT32) { + *((int32_t*)dst_ptr + new_index) = + *((int32_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::UINT8) { + *((char*)dst_ptr + 
new_index) = + *((char*)source_ptr + old_index); + } + } + } + } + } + } + } + break; + case 4: + for (int i_0 = tm.feed_shape0_range[feedvar_index][0]; + i_0 < tm.feed_shape0_range[feedvar_index][1]; + ++i_0) { + for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) { + for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) { + for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) { + for (int i_4 = 0; i_4 < feedVarTensor.shape[4]; ++i_4) { + old_index = i_0 * feedVarTensor.shape[1] * + feedVarTensor.shape[2] * + feedVarTensor.shape[3] * + feedVarTensor.shape[4] + + i_1 * feedVarTensor.shape[2] * + feedVarTensor.shape[3] * + feedVarTensor.shape[4] + + i_2 * feedVarTensor.shape[3] * + feedVarTensor.shape[4] + + i_3 * feedVarTensor.shape[4] + i_4; + new_index = i_0 * feedvar_max_shape_vector[0] * + feedvar_max_shape_vector[1] * + feedvar_max_shape_vector[2] * + feedvar_max_shape_vector[3] + + i_1 * feedvar_max_shape_vector[1] * + feedvar_max_shape_vector[2] * + feedvar_max_shape_vector[3] + + i_2 * feedvar_max_shape_vector[2] * + feedvar_max_shape_vector[3] + + i_3 * feedvar_max_shape_vector[3] + i_4; + if (feedVarTensor.dtype == paddle::PaddleDType::INT64) { + *((int64_t*)dst_ptr + new_index) = + *((int64_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::FLOAT32) { + *((float*)dst_ptr + new_index) = + *((float*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::INT32) { + *((int32_t*)dst_ptr + new_index) = + *((int32_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::UINT8) { + *((char*)dst_ptr + new_index) = + *((char*)source_ptr + old_index); + } + } + } + } + } + } + break; + case 3: + for (int i_0 = tm.feed_shape0_range[feedvar_index][0]; + i_0 < tm.feed_shape0_range[feedvar_index][1]; + ++i_0) { + for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) { + for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) { + for (int i_3 = 0; i_3 < feedVarTensor.shape[3]; ++i_3) { + old_index = i_0 * feedVarTensor.shape[1] * + feedVarTensor.shape[2] * + feedVarTensor.shape[3] + + i_1 * feedVarTensor.shape[2] * + feedVarTensor.shape[3] + + i_2 * feedVarTensor.shape[3] + i_3; + new_index = i_0 * feedvar_max_shape_vector[0] * + feedvar_max_shape_vector[1] * + feedvar_max_shape_vector[2] + + i_1 * feedvar_max_shape_vector[1] * + feedvar_max_shape_vector[2] + + i_2 * feedvar_max_shape_vector[2] + i_3; + if (feedVarTensor.dtype == paddle::PaddleDType::INT64) { + *((int64_t*)dst_ptr + new_index) = + *((int64_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::FLOAT32) { + *((float*)dst_ptr + new_index) = + *((float*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::INT32) { + *((int32_t*)dst_ptr + new_index) = + *((int32_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::UINT8) { + *((char*)dst_ptr + new_index) = + *((char*)source_ptr + old_index); + } + } + } + } + } + break; + case 2: + for (int i_0 = tm.feed_shape0_range[feedvar_index][0]; + i_0 < tm.feed_shape0_range[feedvar_index][1]; + ++i_0) { + for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) { + for (int i_2 = 0; i_2 < feedVarTensor.shape[2]; ++i_2) { + old_index = + i_0 * feedVarTensor.shape[1] * feedVarTensor.shape[2] + + i_1 * feedVarTensor.shape[2] + i_2; + new_index = i_0 * feedvar_max_shape_vector[0] * + feedvar_max_shape_vector[1] + + i_1 * feedvar_max_shape_vector[1] + i_2; + if (feedVarTensor.dtype == 
paddle::PaddleDType::INT64) { + *((int64_t*)dst_ptr + new_index) = + *((int64_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::FLOAT32) { + *((float*)dst_ptr + new_index) = + *((float*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::INT32) { + *((int32_t*)dst_ptr + new_index) = + *((int32_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::UINT8) { + *((char*)dst_ptr + new_index) = + *((char*)source_ptr + old_index); + } + } + } + } + break; + case 1: + for (int i_0 = tm.feed_shape0_range[feedvar_index][0]; + i_0 < tm.feed_shape0_range[feedvar_index][1]; + ++i_0) { + for (int i_1 = 0; i_1 < feedVarTensor.shape[1]; ++i_1) { + old_index = i_0 * feedVarTensor.shape[1] + i_1; + new_index = i_0 * feedvar_max_shape_vector[0] + i_1; + if (feedVarTensor.dtype == paddle::PaddleDType::INT64) { + *((int64_t*)dst_ptr + new_index) = + *((int64_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::FLOAT32) { + *((float*)dst_ptr + new_index) = + *((float*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::INT32) { + *((int32_t*)dst_ptr + new_index) = + *((int32_t*)source_ptr + old_index); + } else if (feedVarTensor.dtype == + paddle::PaddleDType::UINT8) { + *((char*)dst_ptr + new_index) = + *((char*)source_ptr + old_index); + } + } + } + break; + default: + break; + } + } + // nobatch类型的feedvar,不叠加. - if (tm.feedvar_type[feedvar_index] != 3) + if (tm.feedvar_type[feedvar_index] != 3) { _batch_in_offset[feedvar_index] += length; + } } } } @@ -753,25 +1159,23 @@ class BatchTasks { // 此时,无法分辨是否是天然nobatch,此时set_fetch_nobatch_index会漏掉 // 后续希望在其他地方能够区分两者。 if (fetchvar_batch_size(fetchvar_index) != _total_fetch_batch) { - if(fetchvar_batch_size(fetchvar_index) <= 0){ + if (fetchvar_batch_size(fetchvar_index) <= 0) { // which means error. return false; - }else if(fetchvar_batch_size(fetchvar_index) == 1){ + } else if (fetchvar_batch_size(fetchvar_index) == 1) { // which means fetchvar shape[0] = 1. // shape[0] does not change with batch set_fetch_nobatch_index.insert(fetchvar_index); - _total_fetch_batch = - std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch); - }else if(_total_fetch_batch == 1){ - //这时意味着,之前的fetchvar shape[0] 全部都= 1 - //当前的fetchvar shape[0] > 1 - //所以,之前的都是no_batch - for(size_t temp_index = fetchvar_index-1; temp_index >= 0; --temp_index){ + } else if (_total_fetch_batch == 1) { + // 这时意味着,之前的fetchvar shape[0] 全部都= 1 + // 当前的fetchvar shape[0] > 1 + // 所以,之前的都是no_batch + for (size_t temp_index = 0; temp_index < fetchvar_index; + --temp_index) { set_fetch_nobatch_index.insert(fetchvar_index); } - _total_fetch_batch = - std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch); - }else{ + _total_fetch_batch = fetchvar_batch_size(fetchvar_index); + } else { // which means error. 
return false; } @@ -846,6 +1250,14 @@ class BatchTasks { size_t shape0_index_end = _batch_out[fetchvar_index].lod[0][last_batch + add]; size_t shape0_length = shape0_index_end - shape0_index_start; + size_t lod_size = _batch_out[fetchvar_index].lod.size(); + if (lod_size == 2) { + shape0_index_start = + _batch_out[fetchvar_index].lod[1][shape0_index_start]; + shape0_index_end = + _batch_out[fetchvar_index].lod[1][shape0_index_end]; + shape0_length = shape0_index_end - shape0_index_start; + } // task被拆分为多个taskmeta时,不能直接拷入task->outVectorT_ptr // 此时,先拷入task->outLodTensorVector[taskmeta_index] // 当task所有的taskmeta都完成时,再按照顺序进行拷贝回task->outVectorT_ptr。 @@ -856,10 +1268,11 @@ class BatchTasks { fetchVarTensor.shape[0] = shape0_length; fetch_lod_index++; - void* databuf_data = MempoolWrapper::instance().malloc(length,task->memoryPtr); + void* databuf_data = + MempoolWrapper::instance().malloc(length, task->memoryPtr); paddle::PaddleBuf paddleBuf(databuf_data, length); fetchVarTensor.data = paddleBuf; - //fetchVarTensor.data.Resize(length); + // fetchVarTensor.data.Resize(length); void* dst_ptr = fetchVarTensor.data.data(); void* source_ptr = _batch_out[fetchvar_index].data.data() + shape0_index_start * fetchvar_bytesize_index; @@ -878,6 +1291,24 @@ class BatchTasks { (_batch_out[fetchvar_index].lod[0][lod_index] - last_lod_value); } + if (lod_size == 2) { + if (fetchVarTensor.lod.size() <= 1) { + fetchVarTensor.lod.push_back({}); + } + size_t last_lod0_value = + _batch_out[fetchvar_index].lod[0][last_batch]; + size_t end_lod0_value = + _batch_out[fetchvar_index].lod[0][last_batch + add]; + size_t lod1_size = end_lod0_value - last_lod0_value; + fetchVarTensor.lod[1].resize(lod1_size, 0); + for (size_t lod_index = last_lod0_value + 1, my_index = 0; + lod_index < end_lod0_value + 1; + ++lod_index, ++my_index) { + fetchVarTensor.lod[1][my_index] = + _batch_out[fetchvar_index].lod[1][lod_index] - + _batch_out[fetchvar_index].lod[1][last_lod0_value]; + } + } } else { // task未被拆分为多个taskmeta,故只有某个线程中的taskmeta会操作task不存在多线程竞争 // 此时resize后,直接写入task->outVectorT_ptr中即可。 @@ -885,12 +1316,13 @@ class BatchTasks { (*task->outVectorT_ptr)[fetchvar_index]; size_t length = fetchvar_bytesize_index * shape0_length; fetchVarTensor.shape[0] = shape0_length; - - void* databuf_data = MempoolWrapper::instance().malloc(length,task->memoryPtr); + + void* databuf_data = + MempoolWrapper::instance().malloc(length, task->memoryPtr); paddle::PaddleBuf paddleBuf(databuf_data, length); fetchVarTensor.data = paddleBuf; - - //fetchVarTensor.data.Resize(length); + + // fetchVarTensor.data.Resize(length); void* dst_ptr = fetchVarTensor.data.data(); void* source_ptr = _batch_out[fetchvar_index].data.data() + shape0_index_start * fetchvar_bytesize_index; @@ -918,6 +1350,27 @@ class BatchTasks { (_batch_out[fetchvar_index].lod[0][lod_index] - last_lod_value); } + + if (lod_size == 2) { + if (fetchVarTensor.lod.size() <= 1) { + fetchVarTensor.lod.push_back({}); + } else if (fetchVarTensor.lod[1].size() <= 0) { + fetchVarTensor.lod[1].push_back(0); + } + size_t last_lod0_value = + _batch_out[fetchvar_index].lod[0][last_batch]; + size_t end_lod0_value = + _batch_out[fetchvar_index].lod[0][last_batch + add]; + size_t lod1_size = end_lod0_value - last_lod0_value; + fetchVarTensor.lod[1].resize(lod1_size + 1, 0); + for (size_t lod_index = last_lod0_value + 1, my_index = 1; + lod_index < end_lod0_value + 1; + ++lod_index, ++my_index) { + fetchVarTensor.lod[1][my_index] = + _batch_out[fetchvar_index].lod[1][lod_index] - + 
_batch_out[fetchvar_index].lod[1][last_lod0_value]; + } + } } } else { // 普通fetchvar情况,此时该Task总的fetchvar_batch = @@ -979,10 +1432,26 @@ class BatchTasks { std::set set_fetch_nobatch_index; std::vector vector_fetch_lod_index; + // 这个BatchTask中目前,各个FeedVar中最大的Shape集合 + size_t _rem_size; size_t _batch_size; bool _overrun; bool _allow_split_request; + + // 这个BatchTask中目前,各个FeedVar中最大的Shape集合 + VectorOfShapeVector vector_of_max_shape; + // AutoPadding功能中,用这个与新的待合并Batch的TaskMeta来计算是否合并 + // 策略有两个,满足任何一个均可合并 + // 1、当相似度的乘积大于50%时 + // 2、当绝对差的字节数小于1024字节时 + // 例如,Shape-1 = [batch, 500, 500] Shape-2 = [batch, 400, 400] + // 此时,绝对值差为90000字节,相对误差为0.8*0.8 = 0.64,满足条件1,不满足条件2 + // 例如,Shape-1 = [batch, 1, 1] Shape-2 = [batch, 2, 2] + // 此时,绝对值差为3字节,相对误差为0.5*0.5 = 0.25,满足条件2,不满足条件1 + // 上述两种情况都可以进行AutoPadding. + bool _auto_padding; + int _padding_value; }; // BSF task handle @@ -1058,7 +1527,7 @@ class TaskExecutor { typedef typename TaskT::OutVectorT OutVectorT; typedef std::vector TaskArrayT; typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper; - typedef std::vector ShapeVector; + typedef std::vector ShapeVector; typedef std::vector VectorOfShapeVector; TaskExecutor()
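For reference, the merge-cost strategy spelled out in the member comments above (merge when, per feedvar, either the absolute growth stays within ABSOLUTE_ERROR or both original/padded element ratios stay at or above RELATIVE_ERROR) can be written as a standalone predicate. A minimal sketch, assuming both shape lists exclude the batch dimension and have already been checked for equal feedvar count and rank; worth_padding, kAbsoluteError, and kRelativeError are illustrative names, not part of the patch:

    #include <algorithm>
    #include <cstddef>
    #include <functional>
    #include <numeric>
    #include <vector>

    static const long kAbsoluteError = 1024;   // mirrors ABSOLUTE_ERROR
    static const double kRelativeError = 0.5;  // mirrors RELATIVE_ERROR

    // Element counts are compared directly, as in padding(); dtype width is ignored.
    bool worth_padding(const std::vector<std::vector<int>>& batch_max_shapes,
                       const std::vector<std::vector<int>>& task_shapes) {
      auto elems = [](const std::vector<int>& s) {
        return std::accumulate(s.begin(), s.end(), 1L, std::multiplies<long>());
      };
      for (size_t i = 0; i < batch_max_shapes.size(); ++i) {
        // Per-dimension maximum is the shape the batch would be padded to.
        std::vector<int> padded(batch_max_shapes[i].size());
        for (size_t j = 0; j < padded.size(); ++j)
          padded[j] = std::max(batch_max_shapes[i][j], task_shapes[i][j]);
        const long p = elems(padded);
        const long a = elems(batch_max_shapes[i]);
        const long b = elems(task_shapes[i]);
        const bool abs_ok = (p - a) <= kAbsoluteError && (p - b) <= kAbsoluteError;
        const bool rel_ok = static_cast<double>(a) / p >= kRelativeError &&
                            static_cast<double>(b) / p >= kRelativeError;
        if (!abs_ok && !rel_ok) return false;  // padding waste too large for this feedvar
      }
      return true;
    }

With the comment's own examples: [500, 500] vs [400, 400] passes via the ratio condition (0.64 >= 0.5), while [1, 1] vs [2, 2] passes via the absolute bound (3 <= 1024).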