Commit 464387d9 authored by H HexToString

support lod in batchTask

Parent 64102c04
@@ -22,11 +22,12 @@ message EngineDesc {
   required string reloadable_type = 4;
   required string model_dir = 5;
   repeated int32 gpu_ids = 6;
-  required int32 runtime_thread_num = 7;
-  required int32 batch_infer_size = 8;
-  required int32 enable_batch_align = 9;
-  optional string version_file = 10;
-  optional string version_type = 11;
+  optional int32 runtime_thread_num = 7 [ default = 0 ];
+  optional int32 batch_infer_size = 8 [ default = 0 ];
+  optional bool enable_batch_align = 9 [ default = true ];
+  optional bool allow_split_request = 10 [ default = true ];
+  optional string version_file = 11;
+  optional string version_type = 12;

   /*
    * Sparse Parameter Service type. Valid types are:
@@ -39,17 +40,17 @@ message EngineDesc {
     LOCAL = 1;
     REMOTE = 2;
   }
-  optional SparseParamServiceType sparse_param_service_type = 12;
-  optional string sparse_param_service_table_name = 13;
-  optional bool enable_memory_optimization = 14;
-  optional bool enable_ir_optimization = 15;
-  optional bool use_trt = 16;
-  optional bool use_lite = 17;
-  optional bool use_xpu = 18;
-  optional bool use_gpu = 19;
-  optional bool combined_model = 20;
-  optional bool encrypted_model = 21;
-  optional bool gpu_multi_stream = 22;
+  optional SparseParamServiceType sparse_param_service_type = 13;
+  optional string sparse_param_service_table_name = 14;
+  optional bool enable_memory_optimization = 15;
+  optional bool enable_ir_optimization = 16;
+  optional bool use_trt = 17;
+  optional bool use_lite = 18;
+  optional bool use_xpu = 19;
+  optional bool use_gpu = 20;
+  optional bool combined_model = 21;
+  optional bool encrypted_model = 22;
+  optional bool gpu_multi_stream = 23;
 };

 // model_toolkit conf
......
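The three scheduling knobs in `EngineDesc` interact as follows: `batch_infer_size` caps the batch capacity, `enable_batch_align` decides whether a batch may temporarily exceed that capacity, and the new `allow_split_request` decides whether one request may be split across batches. A minimal sketch of that decision rule (a simplified stand-in for `BatchTasks::append_task` / `move_task_to_batch`, not the real implementation; the function name `items_to_take` is hypothetical):

```cpp
#include <algorithm>
#include <cstddef>

// How many items of a task with `task_rem` remaining items go into a batch
// with `batch_rem` free slots, under the two flags. Simplified sketch only.
size_t items_to_take(size_t task_rem, size_t batch_rem,
                     bool batch_align, bool allow_split_request) {
  if (!batch_align) {
    // No alignment: take the whole task, even past the batch capacity.
    return task_rem;
  }
  if (!allow_split_request && task_rem > batch_rem) {
    // Splitting forbidden and the task does not fit: defer to the next batch.
    return 0;
  }
  // Default: fill the batch as far as possible, splitting the task if needed.
  return std::min(task_rem, batch_rem);
}
```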
@@ -26,9 +26,90 @@
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/memory.h"

+// this file is included by bsf.h
 namespace im {
 namespace bsf {

+template <typename InItemT, typename OutItemT>
+bool Task<InItemT, OutItemT>::task_fetch_init(BatchTasks<TaskT>& batchTask) {
+  // Double-checked lock to reduce locking granularity.
+  if (!fetch_init) {
+    if (taskmeta_num > 1) {
+      // The task is split into multiple taskmetas, so a lock is required.
+      AutoMutex lock(task_mut);
+      task_fetch_create(batchTask);
+    } else {
+      // The task has only one taskmeta, so no lock is needed.
+      task_fetch_create(batchTask);
+    }
+  }
+  return true;
+}
+template <typename InItemT, typename OutItemT>
+bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
+  if (!fetch_init) {
+    vector_fetch_lod_index = batchTask.vector_fetch_lod_index;
+    set_fetch_nobatch_index = batchTask.set_fetch_nobatch_index;
+    OutVectorT taskMetaOutLodTensor;
+    size_t fetchvar_num = batchTask._batch_out.size();
+    for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
+         ++fetchvar_index) {
+      size_t fetchvar_bytesize_index =
+          batchTask.fetchvar_bytesize(fetchvar_index);
+      size_t fetchvar_batch = 0;
+      // Case 1: nobatch fetchvar.
+      if (set_fetch_nobatch_index.size() > 0 &&
+          set_fetch_nobatch_index.find(fetchvar_index) !=
+              set_fetch_nobatch_index.end()) {
+        fetchvar_batch = 1;
+      } else if (vector_fetch_lod_index.size() > 0 &&
+                 std::find(vector_fetch_lod_index.begin(),
+                           vector_fetch_lod_index.end(),
+                           fetchvar_index) != vector_fetch_lod_index.end()) {
+        // Case 2: lod fetchvar. The total shape[0] cannot be known yet.
+        // Allocate one temporary buffer per taskmeta of this task;
+        // each lod fetchvar is copied into its own temporary buffer.
+        // At the end, the total size is computed and the fetchvar data and
+        // lod are merged.
+        fetchvar_batch = 0;
+      } else {
+        // Case 3: ordinary fetchvar. The task's total fetchvar_batch equals
+        // the total input batch_size().
+        fetchvar_batch = batch_size();
+      }
+      paddle::PaddleTensor tensor_out;
+      tensor_out.name = batchTask._batch_out[fetchvar_index].name;
+      tensor_out.dtype =
+          paddle::PaddleDType(batchTask._batch_out[fetchvar_index].dtype);
+      tensor_out.shape = batchTask._batch_out[fetchvar_index].shape;
+      tensor_out.shape[0] = fetchvar_batch;
+      if (fetchvar_batch != 0) {
+        // The lod is empty here.
+        tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
+        // resize all batch memory at one time
+        size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
+        tensor_out.data.Resize(databuf_size);
+      } else {
+        // When taskmeta_num = 1, only one taskmeta operates on the task at a
+        // time, so there is no thread-safety issue and the taskmeta can
+        // resize and copy into the task directly.
+        // When the task is split into multiple taskmetas, temporary objects
+        // record the partial results, which are merged once all taskmetas
+        // have finished.
+        if (taskmeta_num > 1) {
+          taskMetaOutLodTensor.push_back(tensor_out);
+        }
+      }
+      outVectorT_ptr->push_back(tensor_out);
+    }
+    // outLodTensorVector is in effect a two-level vector
+    // with shape taskmeta_num * vector_fetch_lod_index.size().
+    outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor);
+    fetch_init = true;
+  }
+  return true;
+}
 template <typename TaskT>
 void* TaskExecutor<TaskT>::thread_entry(void* args) {
   ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
@@ -156,9 +237,11 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
   task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
   task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
+  if (!task->task_init()) {
+    LOG(ERROR) << "task->task_init() failed";
+  }
   task->rem = task->batch_size();
   task->index.store(0, butil::memory_order_relaxed);
   AutoMutex lock(_mut);
   _task_queue.push_back(task);
   THREAD_COND_SIGNAL(&_cond);
@@ -168,11 +251,12 @@
 // this function is accessed by multi thread.
 // so AutoMutex at first.
-// so batch.append_task is thread safe.
+// so batchTask.append_task is thread safe.
 // you dont need to add extra lock in append_task()
+// task is already init.
 template <typename TaskT>
 bool TaskExecutor<TaskT>::move_task_to_batch(
-    BatchTasks<TaskT>& batch) {  // NOLINT
+    BatchTasks<TaskT>& batchTask) {  // NOLINT
   AutoMutex lock(_mut);
   while (_task_queue.empty()) {
     THREAD_COND_WAIT(&_cond, &_mut);
@@ -183,9 +267,45 @@
     return false;
   }

+  TaskT* previous_task = nullptr;
   while (!_task_queue.empty()) {
     TaskT* task = _task_queue.front();
-    size_t rem = batch.append_task(task);
+
+    // Since we cannot know whether a fetchvar is lod, a single task must not
+    // be split across multiple batchTasks; otherwise reassembling the output
+    // would be very hard.
+    // So a task cannot be split: user requests may be merged and predicted
+    // together, but one request must not be predicted as two smaller parts.
+    // The difficulty: before prediction we know how many taskmetas a task
+    // was split into, but only after prediction do we know how many
+    // fetchvars there are and how many of them are lod.
+    // So creating the taskmeta_num * lod-fetchvar-num PaddleBufs (and Lods)
+    // in the task can only happen in notify_task, via taskmeta->task, which
+    // requires locking inside the task.
+    // Atomic operations are not feasible, because multiple threads must wait
+    // until those PaddleBufs are created before continuing.
+    // For ordinary fetchvars, a lock is also needed to create the
+    // PaddleTensor before anything can be copied into it.
+    // When _batch_align is false, a whole task is always taken even if the
+    // remaining space is small, temporarily exceeding the capacity.
+    // When _allow_split_request == false, no task is ever split.
+    // The default is true, which allows splitting tasks to maximize space
+    // utilization.
+    if (!batchTask.get_allow_split_request()) {
+      if (task->batch_size() > batchTask.get_rem_size() &&
+          batchTask.get_batch_align()) {
+        break;
+      }
+    }
+
+    // combine_task_valid decides whether two tasks can be merged.
+    // Except for the outermost dimension, the inner shapes must match;
+    // otherwise we break out of the loop and the task goes into the next
+    // batchTask. This guarantees that tasks passed to
+    // batchTask.append_task(task) share the same inner shapes.
+    // For feedvars with shape[0] = 1 that do not scale with batch, merging
+    // keeps only one of the values, so such feedvars must be equal for the
+    // tasks to merge; otherwise the task goes into the next batchTask.
+    // PaddleTensor and PaddleBuf do not overload operator==, so the only
+    // option is to compare memory.
+    if (previous_task != nullptr) {
+      if (!task->combine_task_valid(previous_task)) {
+        break;
+      }
+    }
+
+    size_t rem = batchTask.append_task(task);
+    previous_task = task;
     if (task->rem <= 0) {
       _task_queue.pop_front();
     }
@@ -201,11 +321,12 @@
 // TaskT is from the SingleTon TaskExecutor`s _task_queue
 // although TaskMeta is a local variable, but several TaskMeta may points to
 // the same TaskT which is get from the SingleTon TaskExecutor`s _task_queue.
-// put TaskMeta to the local variable BatchTasks<TaskT> batch.
-// batch.merge_tasks() and batch.notify_tasks() has no lock.
-// BatchTasks<TaskT> batch itself is a local variable, it`s thread safe.
-// If batch.merge_tasks() and batch.notify_tasks() do something to TaskMeta
+// put TaskMeta to the local variable BatchTasks<TaskT> batchTask.
+// batchTask.merge_tasks() and batchTask.notify_tasks() has no lock.
+// BatchTasks<TaskT> batchTask itself is a local variable, it`s thread safe.
+// If batchTask.merge_tasks() and batchTask.notify_tasks() do something to
+// TaskMeta
 // you need to pay attention to that.
 // Multi-Thread deal with different TaskMeta(cause it`s created as local
 // variable)
@@ -242,11 +363,24 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
       return -1;
     }

-    BatchTasks<TaskT> batch(_batch_size, _batch_align);
-    if (move_task_to_batch(batch)) {
-      batch.merge_tasks();
-      _fn(&batch.in(), &batch.out());
-      batch.notify_tasks();
+    // move_task_to_batch() takes the original tasks from the `_task_queue`
+    // and puts them into its own Vector<TaskMeta>.
+    // The capacity of that Vector<TaskMeta> is decided by `_batch_size` and
+    // `_batch_align`.
+    // merge_tasks() moves the input data from its own Vector<TaskMeta> into
+    // `_batch_in`, because the predictor`s input is `_batch_in`.
+    // notify_tasks() moves the output data from `_batch_out` into every
+    // single taskmeta, because the predictor`s output is `_batch_out`.
+    BatchTasks<TaskT> batchTask(
+        _batch_size, _batch_align, _allow_split_request);
+    if (move_task_to_batch(batchTask)) {
+      batchTask.merge_tasks();
+      _fn(&batchTask.in(), &batchTask.out());
+      batchTask.notify_tasks();
     }
   }
......
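The central trick in this commit is lod rebasing and re-accumulation: each TaskMeta carries a lod slice that has been rebased to start at 0, and the batch lod is rebuilt by adding the last value of the previously appended lod. A minimal, self-contained sketch of that arithmetic (this is only the merge rule applied by `BatchTasks::append_task`, not the real implementation):

```cpp
#include <cstdio>
#include <vector>

int main() {
  // Two tasks with original lods [0,3,4] and [0,2] (2 and 1 sequences).
  // After rebasing (dropping the leading 0), their slices are:
  std::vector<std::vector<size_t>> task_lods = {{3, 4}, {2}};
  std::vector<size_t> batch_lod = {0};  // the merged lod starts with 0
  for (const auto& lod : task_lods) {
    size_t last = batch_lod.back();  // end value of the previous lod
    for (size_t v : lod) batch_lod.push_back(last + v);
  }
  // batch_lod is now [0,3,4,6]: 3 sequences, and the merged tensor's
  // shape[0] is 6 while its batch is 3.
  for (size_t v : batch_lod) std::printf("%zu ", v);
  std::printf("\n");
  return 0;
}
```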
@@ -16,7 +16,9 @@

 #include <errno.h>
 #include <algorithm>
+#include <cstring>
 #include <list>
+#include <set>
 #include <vector>

 #ifdef BCLOUD
@@ -46,7 +48,8 @@ static const size_t DEFAULT_BATCH_SIZE = 100;
 // `rem` don`t need to be atomic, cause the operation `put` is synchronous.
 // actually, the reason is that lock have been added outside the operation
 // `put`.
+template <typename TaskT>
+class BatchTasks;

 // size_t `index` records how many batch have been processing completed.
 // `index` need to be atomic, cause the operation 'notify' is asynchronous.
 template <typename InItemT, typename OutItemT>
@@ -56,7 +59,7 @@ struct Task {
   typedef InItemT InType;
   typedef OutItemT OutType;
   typedef Task<InItemT, OutItemT> TaskT;
-  typedef std::vector<int> ShapeVector;
+  typedef std::vector<size_t> ShapeVector;
   typedef std::vector<ShapeVector> VectorOfShapeVector;

   int read_fd;
@@ -65,7 +68,17 @@
   const InVectorT* inVectorT_ptr;
   OutVectorT* outVectorT_ptr;
   size_t rem;
+  size_t total_feed_batch;
+  std::set<size_t> set_feed_lod_index;
+  std::set<size_t> set_feed_nobatch_index;
+  std::vector<size_t> vector_fetch_lod_index;
+  std::set<size_t> set_fetch_nobatch_index;
   butil::atomic<size_t> index;
+  size_t taskmeta_num;
+  THREAD_MUTEX_T task_mut;
+  bool fetch_init;
+  // taskmeta_num * vector_fetch_lod_index.size()
+  std::vector<OutVectorT> outLodTensorVector;

   Task() {
     read_fd = -1;
@@ -73,11 +86,24 @@
     owner_tid = -1;
     inVectorT_ptr = NULL;
     outVectorT_ptr = NULL;
+    set_feed_lod_index.clear();
+    set_feed_nobatch_index.clear();
+    vector_fetch_lod_index.clear();
+    set_fetch_nobatch_index.clear();
     rem = -1;
+    total_feed_batch = 0;
+    taskmeta_num = 0;
     index.store(0, butil::memory_order_relaxed);
+    THREAD_MUTEX_INIT(&task_mut, NULL);
+    fetch_init = false;
+    outLodTensorVector.clear();
+  }
+
+  ~Task() {
+    THREAD_MUTEX_DESTROY(&task_mut);
+    outLodTensorVector.clear();
   }

-  bool check_feedvar_valid(int feedvar_index) {
+  bool check_feedvar_valid(size_t feedvar_index) {
     if (feedvar_index < 0 || inVectorT_ptr->size() <= feedvar_index) {
       LOG(ERROR) << "feedvar doesnt exsit or feedvar_index error";
       return 0;
@@ -91,20 +117,47 @@
     return 1;
   }
-  // Now, it simply assume that the first dimension of data is batch.
-  // so the batch is PaddleTensor.shape[0]
+  bool combine_task_valid(Task* other_task) {
+    // TODO(HexToString): auto-padding
+    // Except for the outermost dimension, the inner shapes must match for
+    // two tasks to merge; otherwise the caller breaks out of its loop and
+    // the task goes into the next batchTask.
+    // This guarantees that tasks in batchTask.append_task(task) share the
+    // same inner shapes.
+    if (other_task->feedvar_shape_nobatch() != feedvar_shape_nobatch()) {
+      return false;
+    }
+
+    // For feedvars with shape[0] = 1 that do not scale with batch, merging
+    // keeps only one of the values, so such feedvars must be equal for the
+    // tasks to merge.
+    // PaddleTensor and PaddleBuf do not overload operator==, so the only
+    // option is to compare memory.
+    for (size_t feedvar_index = 0;
+         feedvar_index < set_feed_nobatch_index.size();
+         ++feedvar_index) {
+      int result =
+          std::memcmp((*inVectorT_ptr)[feedvar_index].data.data(),
+                      (*(other_task->inVectorT_ptr))[feedvar_index].data.data(),
+                      (*inVectorT_ptr)[feedvar_index].data.length());
+      if (result != 0) return false;
+    }
+    return true;
+  }

-  // If batch information is added into feedvar.prototxt.
-  // we can get the information from the feedvar.prototxt instead of assume.
-  size_t feedvar_batch_size(int feedvar_index) {
+  size_t feedvar_batch_size(size_t feedvar_index) {
     if (!check_feedvar_valid(feedvar_index)) {
       return 0;
     }
+    // With lod, 'lod[0].size() - 1' is the batch.
+    // For PaddleTensor, lod is vector<vector<size_t>>, so lod[0] is the
+    // real lod.
+    // For example, lod = [0,3,4,6], shape = [6,340,340]: the batch is
+    // actually 3.
+    // With lod, batch < shape[0].
+    if ((*inVectorT_ptr)[feedvar_index].lod.size() > 0 &&
+        (*inVectorT_ptr)[feedvar_index].lod[0].size() > 0) {
+      return (*inVectorT_ptr)[feedvar_index].lod[0].size() - 1;
+    }
+    // Without lod, the first dimension of data `PaddleTensor.shape[0]` is
+    // the batch.
     return (*inVectorT_ptr)[feedvar_index].shape[0];
   }

-  size_t feedvar_element_bytesize(int feedvar_index) {
+  size_t feedvar_element_bytesize(size_t feedvar_index) {
     if (!check_feedvar_valid(feedvar_index)) {
       return 0;
     }
@@ -126,7 +179,7 @@
   // Now, the implementation of this function is based on assumption
   // that shape [0] = batch_size.
-  size_t feedvar_element_num(int feedvar_index) {
+  size_t feedvar_element_num(size_t feedvar_index) {
     if (!check_feedvar_valid(feedvar_index)) {
       return 0;
     }
@@ -138,18 +191,18 @@
       return 1;
     }
     // start from shape[1], cause shape[0] = batch_size.
-    for (int i = 1; i < (*inVectorT_ptr)[feedvar_index].shape.size(); ++i) {
+    for (size_t i = 1; i < (*inVectorT_ptr)[feedvar_index].shape.size(); ++i) {
       element_num *= (*inVectorT_ptr)[feedvar_index].shape[i];
     }
     return element_num;
   }

-  size_t feedvar_bytesize(int feedvar_index) {
+  size_t feedvar_bytesize(size_t feedvar_index) {
     return feedvar_element_num(feedvar_index) *
            feedvar_element_bytesize(feedvar_index);
   }

-  ShapeVector feedvar_shape_nobatch(int feedvar_index) {
+  ShapeVector feedvar_shape_nobatch(size_t feedvar_index) {
     if (!check_feedvar_valid(feedvar_index)) {
       return ShapeVector();
     }
@@ -158,40 +211,165 @@
   }

   VectorOfShapeVector feedvar_shape_nobatch() {
-    VectorOfShapeVector vector_of_feedvar_shape_nobatch(inVectorT_ptr->size());
-    for (int index = 0; index < inVectorT_ptr->size(); ++index) {
-      vector_of_feedvar_shape_nobatch.push_back(feedvar_shape_nobatch(index));
+    VectorOfShapeVector vector_of_feedvar_shape_nobatch;
+    for (size_t feedvar_index = 0; feedvar_index < inVectorT_ptr->size();
+         ++feedvar_index) {
+      vector_of_feedvar_shape_nobatch.push_back(
+          feedvar_shape_nobatch(feedvar_index));
     }
     return vector_of_feedvar_shape_nobatch;
   }
-  // At present, it is considered that the batch of all feedvar is consistent.
-  // so for each feedvar, PaddleTensor.shape[0] should be the same.
-  bool check_batch_align() {
-    int batch_size_align = feedvar_batch_size(0);
-    for (int feedvar_index = 0; feedvar_index < inVectorT_ptr->size();
-         ++feedvar_index) {
-      if (feedvar_batch_size(feedvar_index) != batch_size_align) {
-        return 0;
-      }
-    }
-    /*
-    for(int fetchvar_index = 0; fetchvar_index < outVectorT_ptr->size();
-    ++fetchvar_index) {
-      if(fetchvar_batch_size(fetchvar_index) != batch_size_align) {
-        return 0;
-      }
-    }
-    */
-    return 1;
-  }
-
-  size_t batch_size() {
-    if (check_batch_align()) {
-      return feedvar_batch_size(0);
-    }
-    return 0;
-  }
+  // For each feedvar, batch should be 1 or batch_size.
+  // e.g. feedvar-1: batch_size = 1 (always not batch).
+  //      feedvar-2: batch_size = n, batch = n.
+  // this function is not thread safe. only called when task is created.
+  bool task_init() {
+    total_feed_batch = feedvar_batch_size(0);
+    // which means error.
+    if (total_feed_batch <= 0) return false;
+
+    for (size_t feedvar_index = 0; feedvar_index < inVectorT_ptr->size();
+         ++feedvar_index) {
+      // TODO(HexToString): Distinguish between nobatch and batch = 1.
+      // When feedvar-1 carries batch with batch = 1, shape[0] = 1,
+      // while feedvar-2 carries no batch, which also leaves shape[0] = 1,
+      // we cannot tell whether it is inherently nobatch, so
+      // set_feed_nobatch_index misses it.
+      // Hopefully the two cases can be distinguished elsewhere later.
+      if (feedvar_batch_size(feedvar_index) != total_feed_batch) {
+        // which means error.
+        if (feedvar_batch_size(feedvar_index) != 1 && total_feed_batch != 1) {
+          return false;
+        } else {
+          // which means feedvar shape[0] = 1.
+          // shape[0] does not change with batch
+          set_feed_nobatch_index.insert(feedvar_index);
+          total_feed_batch =
+              std::max(feedvar_batch_size(feedvar_index), total_feed_batch);
+        }
+      }
+      // Record the lod feedvar indexes.
+      if ((*inVectorT_ptr)[feedvar_index].lod.size() > 0 &&
+          (*inVectorT_ptr)[feedvar_index].lod[0].size() > 0) {
+        set_feed_lod_index.insert(feedvar_index);
+      }
+    }
+    return true;
+  }
+
+  size_t batch_size() { return total_feed_batch; }
+
+  // start_batch range is 0~batch_size, end_batch range is 1~batch_size;
+  // start_batch is excluded, end_batch > start_batch.
+  // the returned range is (start_batch, end_batch] = [start_batch+1, end_batch]
+  // for non-lod, shape0_index = [(start_batch+1)-1, end_batch-1] =
+  // [start_batch, end_batch-1] = [start_batch, end_batch)
+  // for lod, shape0_index = [lod[start_batch], lod[end_batch]-1] =
+  // [lod[start_batch], lod[end_batch])
+  // for nobatch, shape0_index = [0, 1)
+  // Callers, once they have shape0_index, can simply loop:
+  // for (size_t myindex = shape0_index[0]; myindex < shape0_index[1]; myindex++).
+  // Example: with an original lod = [0,3,4,6] and the range
+  // (start_batch = 1, end_batch = 3], i.e. taking batches 2 and 3,
+  // the sliced lod is [3,4,6], which becomes [1,3] after rebasing.
+  // Rebased this way, lods are easy to merge: just add the last value of
+  // the previous lod.
+  std::vector<std::vector<size_t>> get_feature_by_batch(size_t feedvar_index,
+                                                        size_t start_batch,
+                                                        size_t end_batch) {
+    std::vector<std::vector<size_t>> feature_vector;
+    // feature_vector is a two-level vector, designed so that a single pass
+    // can handle every kind of feature.
+    // feature_vector[0] holds the shape0_index range: two elements, the
+    // minimum and the maximum.
+    // feature_vector[1] holds the lod of the requested batches.
+    // feature_vector[2] holds a single element; the value 1 marks a nobatch
+    // feedvar.
+    // The `if` branch handles nobatch feedvars.
+    // The `else if` branch handles lod feedvars.
+    // The `else` branch handles ordinary feedvars without lod.
+    if (set_feed_nobatch_index.size() > 0 &&
+        set_feed_nobatch_index.find(feedvar_index) !=
+            set_feed_nobatch_index.end()) {
+      feature_vector = {{0, 1}, {}, {1}};
+    } else if (set_feed_lod_index.size() > 0 &&
+               set_feed_lod_index.find(feedvar_index) !=
+                   set_feed_lod_index.end()) {
+      std::vector<size_t> feed_lod_vector(end_batch - start_batch);
+      for (size_t lod_index = start_batch + 1, vector_index = 0;
+           lod_index < end_batch + 1;
+           ++lod_index, ++vector_index) {
+        feed_lod_vector[vector_index] =
+            (*inVectorT_ptr)[feedvar_index].lod[0][lod_index] -
+            (*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
+      }
+      size_t shape0_start =
+          (*inVectorT_ptr)[feedvar_index].lod[0][start_batch];
+      size_t shape0_end = (*inVectorT_ptr)[feedvar_index].lod[0][end_batch];
+      feature_vector = {{shape0_start, shape0_end}, feed_lod_vector};
+    } else {
+      feature_vector = {{start_batch, end_batch}};
+    }
+    return feature_vector;
+  }
+
+  bool combine_taskmeta() {
+    // Only when there are lod-type fetch outputs and the task was split into
+    // multiple taskmetas does data need to be moved from outLodTensorVector
+    // back into outVectorT_ptr.
+    if (vector_fetch_lod_index.size() > 0 && taskmeta_num > 1) {
+      for (size_t index = 0; index < vector_fetch_lod_index.size(); ++index) {
+        size_t data_length = 0;
+        size_t lod_length = 0;
+        size_t fetchvar_index = vector_fetch_lod_index[index];
+        // Because PaddleTensor`s Resize clears the buffer every time, the
+        // total length must be counted first.
+        for (size_t taskmeta_index = 0; taskmeta_index < taskmeta_num;
+             ++taskmeta_index) {
+          data_length +=
+              outLodTensorVector[taskmeta_index][index].data.length();
+          lod_length += outLodTensorVector[taskmeta_index][index].lod[0].size();
+        }
+        // Grow the PaddleTensor`s data and lod in one go.
+        paddle::PaddleTensor& fetchVarTensor =
+            (*outVectorT_ptr)[fetchvar_index];
+        fetchVarTensor.data.Resize(data_length);
+        // Prepend 0 to the task`s lod.
+        if (fetchVarTensor.lod.size() <= 0) {
+          fetchVarTensor.lod.push_back({0});
+        } else if (fetchVarTensor.lod[0].size() <= 0) {
+          fetchVarTensor.lod[0].push_back(0);
+        }
+        fetchVarTensor.lod[0].resize(lod_length + 1, 0);

+        size_t data_length_offset = 0;
+        size_t lod_length_offset = 0;
+        size_t once_data_length = 0;
+        size_t once_lod_length = 0;
+        for (size_t taskmeta_index = 0; taskmeta_index < taskmeta_num;
+             ++taskmeta_index) {
+          void* dst_ptr = fetchVarTensor.data.data() + data_length_offset;
+          void* source_ptr =
+              outLodTensorVector[taskmeta_index][index].data.data();
+          once_data_length =
+              outLodTensorVector[taskmeta_index][index].data.length();
+          memcpy(dst_ptr, source_ptr, once_data_length);
+          // Each taskmeta`s lod is rebased to 0, so shift it by the last
+          // value already written into the merged lod.
+          size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset];
+          once_lod_length =
+              outLodTensorVector[taskmeta_index][index].lod[0].size();
+          for (size_t once_index = 0; once_index < once_lod_length;
+               ++once_index) {
+            fetchVarTensor.lod[0][lod_length_offset + once_index + 1] =
+                last_lod_value +
+                outLodTensorVector[taskmeta_index][index].lod[0][once_index];
+          }
+          data_length_offset += once_data_length;
+          lod_length_offset += once_lod_length;
+        }
+      }
+    }
+    return true;
+  }
+
+  bool task_fetch_init(BatchTasks<TaskT>& batchTask);
+  bool task_fetch_create(BatchTasks<TaskT>& batchTask);
 };
 // `Several Task` or `part of batch in Task` can be a TaskMeta.
@@ -206,61 +384,164 @@
 // TaskMeta is necessary.
 // cause we need know the the corresponding relationship between
-// `batch_out`(which is in BatchTasks) and `outVectorT_ptr`(which is in Task).
+// `_batch_out`(which is in BatchTasks) and `outVectorT_ptr`(which is in Task).
 // especially when 1 Task be divided into several TaskMeta and be put into
 // several different BatchTasks.
+// begin, add and end refer to batch, not shape[0].
+// if not lod, batch == shape[0]; if lod, batch != shape[0].
+// for example, lod = [0,3,4,6], shape = [6,340,340]:
+// there are actually 3 batches, add = 3, but shape[0] = 6.
 template <typename TaskT>
 struct TaskMeta {
-  TaskMeta(TaskT* ptr, size_t start, size_t add)
-      : task(ptr), begin(start), end(start + add) {}
+  TaskMeta(TaskT* ptr, size_t start, size_t add, size_t taskmeta_index)
+      : task(ptr),
+        begin(start),
+        end(start + add),
+        taskmeta_index(taskmeta_index) {
+    feedvar_num = ptr->inVectorT_ptr->size();
+    for (size_t feedvar_index = 0; feedvar_index < feedvar_num;
+         ++feedvar_index) {
+      std::vector<std::vector<size_t>> feature =
+          ptr->get_feature_by_batch(feedvar_index, start, start + add);
+      feed_shape0_range.push_back(feature[0]);
+      feedvar_type.push_back(feature.size());
+      if (feature.size() == 1) {
+        feed_lod_vector.push_back({});
+      } else if (feature.size() == 2) {
+        feed_lod_vector.push_back(feature[1]);
+      } else {
+        feed_lod_vector.push_back({});
+      }
+    }
+  }

   TaskT* task;
   size_t begin;
   size_t end;
+  size_t feedvar_num;
+  size_t taskmeta_index;
+  std::vector<std::vector<size_t>> feed_shape0_range;
+  std::vector<std::vector<size_t>> feed_lod_vector;
+  std::vector<size_t> feedvar_type;
 };
 // each TaskT is already include batch in itself
 // BatchTasks need to combine several `small TaskMeta` into a new `big TaskT`.
 // The only difference between the `big TaskT` and `small TaskT` is that
-// the TaskT.inVectorT_ptr->[feedvar_index].shape[0]
-// which is actually batch_size is different.
+// the TaskT.inVectorT_ptr->[feedvar_index].shape[0] is different:
+// the `big TaskT`.inVectorT_ptr->[feedvar_index].shape[0] is actually the
+// batch_size.
 template <typename TaskT>
 class BatchTasks {
  public:
   typedef typename TaskT::InType InType;
   typedef typename TaskT::OutType OutType;
   typedef TaskMeta<TaskT> TaskMetaT;
+  typedef std::vector<size_t> ShapeVector;
+  typedef std::vector<ShapeVector> VectorOfShapeVector;
+  typedef std::vector<size_t> LodVector;
+  typedef std::vector<LodVector> PaddleTensorLod;
+  friend TaskT;

-  explicit BatchTasks(size_t batch_size, bool batch_align = true)
+  explicit BatchTasks(size_t batch_size,
+                      bool batch_align = true,
+                      bool allow_split_request = true)
       : _batch_size(batch_size),
         _rem_size(batch_size),
-        _batch_align(batch_align) {
+        _batch_align(batch_align),
+        _allow_split_request(allow_split_request) {
     _batch_in.clear();
     _batch_in_offset.clear();
+    _total_shape0_batch_in.clear();
+    _total_feed_batch = 0;
+    _batch_in_lod.clear();
     _batch_out.clear();
     _batch_out_offset.clear();
+    _total_fetch_batch = 0;
     _taskmeta_vector.clear();
+    set_fetch_nobatch_index.clear();
+    vector_fetch_lod_index.clear();
   }

   ~BatchTasks() {
     _batch_in.clear();
     _batch_in_offset.clear();
+    _total_shape0_batch_in.clear();
+    _total_feed_batch = 0;
+    _batch_in_lod.clear();
     _batch_out.clear();
     _batch_out_offset.clear();
+    _total_fetch_batch = 0;
     _taskmeta_vector.clear();
+    set_fetch_nobatch_index.clear();
+    vector_fetch_lod_index.clear();
   }
   // synchronized operation
   // because Upper level callers of this function have already locked.
+  // Every task that reaches this function is of the same kind; that is
+  // guaranteed before this function is called.
   size_t append_task(TaskT* task) {
     size_t add = std::min(task->rem, _rem_size);
+    // when _batch_align == false, a whole task is always taken as one
+    // TaskMeta; we can temporarily break through the limit of the
+    // BatchTask`s capacity. The BatchTask`s capacity is _batch_size or
+    // _rem_size.
     if (!_batch_align) {
       add = task->rem;
     }

     int start_index = task->batch_size() - task->rem;
-    TaskMetaT tm(task, start_index, add);
+    TaskMetaT tm(task, start_index, add, task->taskmeta_num);
+    task->taskmeta_num += 1;
     _taskmeta_vector.push_back(tm);
+    if (_batch_in_offset.size() == 0) {
+      _batch_in_offset.resize(tm.feedvar_num, 0);
+    }
+    if (_total_shape0_batch_in.size() == 0) {
+      _total_shape0_batch_in.resize(tm.feedvar_num, 0);
+    }
+    if (_batch_in_lod.size() == 0) {
+      PaddleTensorLod null_lod;
+      _batch_in_lod.resize(tm.feedvar_num, null_lod);
+    }
+    _total_feed_batch += add;
+    for (size_t feedvar_index = 0; feedvar_index < tm.feedvar_num;
+         ++feedvar_index) {
+      if (tm.feedvar_type[feedvar_index] == 1) {
+        // Ordinary non-lod feedvar.
+        // Accumulate shape[0] to prepare for initializing the PaddleTensor
+        // later.
+        _total_shape0_batch_in[feedvar_index] +=
+            tm.feed_shape0_range[feedvar_index][1] -
+            tm.feed_shape0_range[feedvar_index][0];
+      } else if (tm.feedvar_type[feedvar_index] == 2) {
+        // Lod-type feedvar.
+        // Accumulate shape[0] to prepare for initializing the PaddleTensor
+        // later.
+        _total_shape0_batch_in[feedvar_index] +=
+            tm.feed_shape0_range[feedvar_index][1] -
+            tm.feed_shape0_range[feedvar_index][0];
+        // Prepend 0 to the lod.
+        if (_batch_in_lod[feedvar_index].size() <= 0) {
+          _batch_in_lod[feedvar_index].push_back({0});
+        } else if (_batch_in_lod[feedvar_index][0].size() <= 0) {
+          _batch_in_lod[feedvar_index][0].push_back(0);
+        }
+        // Shift the incoming lod by the last value of the previous lod and
+        // append, combining the lods.
+        size_t last_lod_value = _batch_in_lod[feedvar_index][0].back();
+        for (size_t lod_index = 0;
+             lod_index < tm.feed_lod_vector[feedvar_index].size();
+             ++lod_index) {
+          _batch_in_lod[feedvar_index][0].push_back(
+              last_lod_value + tm.feed_lod_vector[feedvar_index][lod_index]);
+        }
+      } else {
+        // tm.feedvar_type[feedvar_index] == 3
+        // nobatch-type feedvar.
+        // No accumulation here; the value should stay 1.
+        _total_shape0_batch_in[feedvar_index] =
+            tm.feed_shape0_range[feedvar_index][1] -
+            tm.feed_shape0_range[feedvar_index][0];
+      }
+    }
     task->rem -= add;
     _rem_size -= add;
     return _rem_size;
@@ -281,72 +562,56 @@
   // cause maybe next time we don`t need to do the extra copy.
   // directly copy the every Task into the Predictor.

-  // lod is not supported.
-  // if lod is set, we should not allow to use the bsf task.
-
   // batch.merge_tasks() is thread-safe function
   // cause batch is a local variable and Task is just read, not written.
   void merge_tasks() {
     if (_taskmeta_vector.size() <= 0) {
       return;
     }
-
-    // Temporarily, the batch of each feedvar is consistent
-    // If not consistent, use feedvar_batch_size instead of task->batch_size().
-    int temp_batch = 0;
-    for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) {
-      TaskMetaT& tm = _taskmeta_vector[ti];
-      temp_batch += tm.task->batch_size();
-    }
-    if (temp_batch > _batch_size) {
-      LOG(ERROR) << "_realNumber_batch_in >_batch_size, error.";
-      return;
-    }
-
-    int feedvar_num = _taskmeta_vector[0].task->inVectorT_ptr->size();
-    if (_batch_in_offset.size() == 0) {
-      _batch_in_offset.resize(feedvar_num, 0);
-      _realNumber_batch_in.resize(feedvar_num, temp_batch);
-    }
-
     for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) {
       TaskMetaT& tm = _taskmeta_vector[ti];
-      for (int index = 0; index < feedvar_num; ++index) {
+      for (size_t feedvar_index = 0; feedvar_index < tm.feedvar_num;
+           ++feedvar_index) {
         const paddle::PaddleTensor& feedVarTensor =
-            (*tm.task->inVectorT_ptr)[index];
-        size_t feedvar_bytesize = tm.task->feedvar_bytesize(index);
+            (*tm.task->inVectorT_ptr)[feedvar_index];
+        size_t feedvar_bytesize = tm.task->feedvar_bytesize(feedvar_index);

         if (ti == 0) {
-          if (feedVarTensor.lod.size() > 0 && feedVarTensor.lod[0].size() > 0) {
-            LOG(ERROR) << "lod Tensor is not supported now.";
-            return;
-          }
+          // Create the entire tensor at once.
           // for now, we assume that every task feedvar_bytesize is the same.
           // which means we dont support auto embedding.
           // but for different feedvar, it is different.
           paddle::PaddleTensor paddleTensor;
           paddleTensor.dtype = feedVarTensor.dtype;
           paddleTensor.name = feedVarTensor.name;
-          paddleTensor.lod = feedVarTensor.lod;
+          paddleTensor.lod = _batch_in_lod[feedvar_index];
           paddleTensor.shape = feedVarTensor.shape;
-          paddleTensor.shape[0] = _realNumber_batch_in[index];
-          paddleTensor.data.Resize(feedvar_bytesize *
-                                   _realNumber_batch_in[index]);
+          paddleTensor.shape[0] = _total_shape0_batch_in[feedvar_index];
+          paddleTensor.data.Resize(feedvar_bytesize *
+                                   _total_shape0_batch_in[feedvar_index]);
           _batch_in.push_back(paddleTensor);
         }

-        void* dst_ptr = _batch_in[index].data.data() + _batch_in_offset[index];
-        void* source_ptr =
-            feedVarTensor.data.data() + feedvar_bytesize * tm.begin;
-        size_t length = feedvar_bytesize * (tm.end - tm.begin);
-        memcpy(dst_ptr, source_ptr, length);
-        _batch_in_offset[index] += length;
+        void* dst_ptr = _batch_in[feedvar_index].data.data() +
+                        _batch_in_offset[feedvar_index];
+        void* source_ptr =
+            feedVarTensor.data.data() +
+            feedvar_bytesize * tm.feed_shape0_range[feedvar_index][0];
+        size_t length =
+            feedvar_bytesize * (tm.feed_shape0_range[feedvar_index][1] -
+                                tm.feed_shape0_range[feedvar_index][0]);
+        memcpy(dst_ptr, source_ptr, length);
+        // nobatch-type feedvars are not accumulated.
+        if (tm.feedvar_type[feedvar_index] != 3)
+          _batch_in_offset[feedvar_index] += length;
       }
     }
   }

-  bool check_fetchvar_valid(int fetchvar_index) {
+  bool check_fetchvar_valid(size_t fetchvar_index) {
     if (fetchvar_index < 0 || _batch_out.size() <= fetchvar_index) {
       LOG(ERROR) << "fetchvar doesnt exsit or fetchvar_index error";
       return 0;
@@ -360,19 +625,11 @@
     return 1;
   }

-  size_t fetchvar_batch_size(int fetchvar_index) {
-    if (!check_fetchvar_valid(fetchvar_index)) {
-      return 0;
-    }
-    return _batch_out[fetchvar_index].shape[0];
-  }
-
-  size_t fetchvar_element_bytesize(int fetchvar_index) {
+  size_t fetchvar_element_bytesize(size_t fetchvar_index) {
     if (!check_fetchvar_valid(fetchvar_index)) {
       return 0;
     }
-    int dtype = _batch_out[fetchvar_index].dtype;
+    size_t dtype = _batch_out[fetchvar_index].dtype;
     if (dtype == paddle::PaddleDType::INT64) {
       return sizeof(int64_t);
     }
@@ -390,7 +647,7 @@
   // Now, the implementation of this function is based on assumption
   // that shape [0] = batch_size.
-  size_t fetchvar_element_num(int fetchvar_index) {
+  size_t fetchvar_element_num(size_t fetchvar_index) {
     if (!check_fetchvar_valid(fetchvar_index)) {
       return 0;
     }
@@ -400,35 +657,66 @@
       return 1;
     }
     // start from shape[1], cause shape[0] = batch_size.
-    for (int i = 1; i < _batch_out[fetchvar_index].shape.size(); ++i) {
+    for (size_t i = 1; i < _batch_out[fetchvar_index].shape.size(); ++i) {
       element_num *= _batch_out[fetchvar_index].shape[i];
     }
     return element_num;
   }

-  size_t fetchvar_bytesize(int fetchvar_index) {
+  size_t fetchvar_bytesize(size_t fetchvar_index) {
     return fetchvar_element_num(fetchvar_index) *
            fetchvar_element_bytesize(fetchvar_index);
   }
-  bool check_fetchvar_batch_align() {
-    int batch_size_align = fetchvar_batch_size(0);
-    for (int fetchvar_index = 0; fetchvar_index < _batch_out.size();
-         ++fetchvar_index) {
-      if (fetchvar_batch_size(fetchvar_index) != batch_size_align) {
-        return 0;
-      }
-    }
-    return 1;
-  }
-
-  size_t fetchvar_batch_size() {
-    if (check_fetchvar_batch_align()) {
-      return fetchvar_batch_size(0);
-    }
-    return 0;
-  }
+  size_t fetchvar_batch_size(size_t fetchvar_index) {
+    if (!check_fetchvar_valid(fetchvar_index)) {
+      return 0;
+    }
+    // With lod, 'lod[0].size() - 1' is the batch.
+    // For PaddleTensor, lod is vector<vector<size_t>>, so lod[0] is the
+    // real lod.
+    // For example, lod = [0,3,4,6], shape = [6,340,340]: the batch is
+    // actually 3.
+    // With lod, batch < shape[0].
+    if (_batch_out[fetchvar_index].lod.size() > 0 &&
+        _batch_out[fetchvar_index].lod[0].size() > 0) {
+      return _batch_out[fetchvar_index].lod[0].size() - 1;
+    }
+    // Without lod, the first dimension of data `PaddleTensor.shape[0]` is
+    // the batch.
+    return _batch_out[fetchvar_index].shape[0];
+  }
+
+  size_t fetchvar_batch_size() { return _total_fetch_batch; }
+
+  bool deal_batch_out() {
+    _total_fetch_batch = fetchvar_batch_size(0);
+    if (_total_fetch_batch <= 0) return false;
+    for (size_t fetchvar_index = 0; fetchvar_index < _batch_out.size();
+         ++fetchvar_index) {
+      // TODO(HexToString): Distinguish between nobatch and batch = 1.
+      // When fetchvar-1 carries batch with batch = 1, shape[0] = 1,
+      // while fetchvar-2 carries no batch, which also leaves shape[0] = 1,
+      // we cannot tell whether it is inherently nobatch, so
+      // set_fetch_nobatch_index misses it.
+      // Hopefully the two cases can be distinguished elsewhere later.
+      if (fetchvar_batch_size(fetchvar_index) != _total_fetch_batch) {
+        // which means error.
+        if (fetchvar_batch_size(fetchvar_index) != 1 &&
+            _total_fetch_batch != 1) {
+          return false;
+        } else {
+          // which means fetchvar shape[0] = 1.
+          // shape[0] does not change with batch
+          set_fetch_nobatch_index.insert(fetchvar_index);
+          _total_fetch_batch =
+              std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch);
+        }
+      }
+      // Record the lod fetchvar indexes.
+      if (_batch_out[fetchvar_index].lod.size() > 0 &&
+          _batch_out[fetchvar_index].lod[0].size() > 0) {
+        vector_fetch_lod_index.push_back(fetchvar_index);
+      }
+    }
+    return true;
+  }
   void notify_tasks() {
@@ -436,12 +724,16 @@
       LOG(ERROR) << "_taskmeta_vector.size() <=0, error.";
       return;
     }
-    if (_realNumber_batch_in[0] != fetchvar_batch_size()) {
+    // Derive the overall output batch from _batch_out, and record the
+    // indexes of lod-type and nobatch-type fetchvars for later lookup.
+    deal_batch_out();
+    // If the output batch is neither 1 nor equal to the input batch, it is
+    // an error.
+    if (_total_feed_batch != _total_fetch_batch && _total_fetch_batch != 1) {
       LOG(ERROR) << "_batch_out`s batch != _batch_in`s batch, error.";
       return;
     }

-    int fetchvar_num = _batch_out.size();
+    size_t fetchvar_num = _batch_out.size();
     if (_batch_out_offset.size() == 0) {
       _batch_out_offset.resize(fetchvar_num, 0);
     }
@@ -451,44 +743,130 @@
       size_t begin = _taskmeta_vector[ti].begin;
       size_t end = _taskmeta_vector[ti].end;
       size_t add = end - begin;
+      size_t taskmeta_index = _taskmeta_vector[ti].taskmeta_index;
+      // Initialize the task`s outVectorT_ptr.
+      // For lod outputs with multiple taskmetas, outLodTensorVector must be
+      // initialized as well.
+      if (!task->task_fetch_init(*this)) {
+        LOG(ERROR) << " task_fetch_init error.";
+        return;
+      }
+      size_t fetch_lod_index = 0;

-      for (int index = 0; index < fetchvar_num; ++index) {
-        // the task->outVectorT_ptr is null before core->run().
-        // first time we should copy from _batch_out
-        // so we need init.
-        size_t fetchvar_bytesize_index = fetchvar_bytesize(index);
-        if (task->outVectorT_ptr->size() <= index) {
-          paddle::PaddleTensor tensor_out;
-          tensor_out.name = _batch_out[index].name;
-          tensor_out.dtype = paddle::PaddleDType(_batch_out[index].dtype);
-          tensor_out.shape = _batch_out[index].shape;
-          tensor_out.shape[0] = task->batch_size();
-          tensor_out.lod = _batch_out[index].lod;
-          // resize all batch memory at one time
-          size_t databuf_size = task->batch_size() * fetchvar_bytesize_index;
-          tensor_out.data.Resize(databuf_size);
-          task->outVectorT_ptr->push_back(tensor_out);
-        }
-
-        paddle::PaddleTensor& fetchVarTensor = (*task->outVectorT_ptr)[index];
-        void* dst_ptr =
-            fetchVarTensor.data.data() + fetchvar_bytesize_index * begin;
-        size_t length = fetchvar_bytesize_index * add;
-        if (_batch_out_offset[index] + length >
-            fetchvar_batch_size() * fetchvar_bytesize(index)) {
-          LOG(ERROR) << "_batch_out is less than taskmeta, error.";
-          return;
-        }
-        void* source_ptr =
-            _batch_out[index].data.data() + _batch_out_offset[index];
-
-        memcpy(dst_ptr, source_ptr, length);
-        _batch_out_offset[index] += length;
-      }
+      for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
+           ++fetchvar_index) {
+        size_t fetchvar_bytesize_index = fetchvar_bytesize(fetchvar_index);
+
+        if (set_fetch_nobatch_index.size() > 0 &&
+            set_fetch_nobatch_index.find(fetchvar_index) !=
+                set_fetch_nobatch_index.end()) {
+          // nobatch fetchvar:
+          // whatever the input batch is, this fetchvar always has
+          // shape[0] = 1.
+          paddle::PaddleTensor& fetchVarTensor =
+              (*task->outVectorT_ptr)[fetchvar_index];
+          void* dst_ptr = fetchVarTensor.data.data();
+          size_t length = fetchvar_bytesize_index * 1;
+          void* source_ptr = _batch_out[fetchvar_index].data.data();
+          memcpy(dst_ptr, source_ptr, length);
+        } else if (vector_fetch_lod_index.size() > 0 &&
+                   std::find(vector_fetch_lod_index.begin(),
+                             vector_fetch_lod_index.end(),
+                             fetchvar_index) != vector_fetch_lod_index.end()) {
+          // lod fetchvar: the total shape[0] cannot be known in advance.
+          size_t last_batch = _batch_out_offset[fetchvar_index];
+          size_t shape0_index_start =
+              _batch_out[fetchvar_index].lod[0][last_batch];
+          size_t shape0_index_end =
+              _batch_out[fetchvar_index].lod[0][last_batch + add];
+          size_t shape0_length = shape0_index_end - shape0_index_start;
+          // When the task is split into multiple taskmetas, we must not copy
+          // directly into task->outVectorT_ptr.
+          // Copy into task->outLodTensorVector[taskmeta_index] first; once
+          // all taskmetas of the task have finished, copy back into
+          // task->outVectorT_ptr in order.
+          if (task->taskmeta_num > 1) {
+            paddle::PaddleTensor& fetchVarTensor =
+                task->outLodTensorVector[taskmeta_index][fetch_lod_index];
+            size_t length = fetchvar_bytesize_index * shape0_length;
+            fetchVarTensor.data.Resize(length);
+            void* dst_ptr = fetchVarTensor.data.data();
+            void* source_ptr = _batch_out[fetchvar_index].data.data() +
+                               shape0_index_start * fetchvar_bytesize_index;
+            memcpy(dst_ptr, source_ptr, length);
+            // These are split partial lods, so no leading 0 is added here;
+            // it is added when the pieces are merged back into the Task`s
+            // outVectorT_ptr.
+            if (fetchVarTensor.lod.size() <= 0) {
+              fetchVarTensor.lod.push_back({});
+            }
+            fetchVarTensor.lod[0].resize(add, 0);
+            size_t last_lod_value =
+                _batch_out[fetchvar_index].lod[0][last_batch];
+            for (size_t lod_index = last_batch + 1, my_index = 0;
+                 lod_index < last_batch + add + 1;
+                 ++lod_index, ++my_index) {
+              fetchVarTensor.lod[0][my_index] =
+                  (_batch_out[fetchvar_index].lod[0][lod_index] -
+                   last_lod_value);
+            }
+          } else {
+            // The task was not split into multiple taskmetas, so only one
+            // thread`s taskmeta touches it and there is no race; resize and
+            // write straight into task->outVectorT_ptr.
+            paddle::PaddleTensor& fetchVarTensor =
+                (*task->outVectorT_ptr)[fetchvar_index];
+            size_t length = fetchvar_bytesize_index * shape0_length;
+            fetchVarTensor.data.Resize(length);
+            void* dst_ptr = fetchVarTensor.data.data();
+            void* source_ptr = _batch_out[fetchvar_index].data.data() +
+                               shape0_index_start * fetchvar_bytesize_index;
+            memcpy(dst_ptr, source_ptr, length);
+            // Prepend 0 to the task`s lod.
+            if (fetchVarTensor.lod.size() <= 0) {
+              fetchVarTensor.lod.push_back({0});
+            } else if (fetchVarTensor.lod[0].size() <= 0) {
+              fetchVarTensor.lod[0].push_back(0);
+            }
+            // Split the merged lod back to the batches of this task,
+            // removing the accumulation contributed by the preceding lods.
+            // Example: the merged lod [0,2,5; 7,10] was predicted from two
+            // merged tasks of batch = 2.
+            // When splitting, the first task subtracts 0 and gets [2,5],
+            // which with the prepended 0 becomes [0,2,5];
+            // the second task must subtract 5 to get [2,5]. Only then is
+            // the result correct.
+            fetchVarTensor.lod[0].resize(add + 1, 0);
+            size_t last_lod_value =
+                _batch_out[fetchvar_index].lod[0][last_batch];
+            for (size_t lod_index = last_batch + 1, my_index = 1;
+                 lod_index < last_batch + add + 1;
+                 ++lod_index, ++my_index) {
+              fetchVarTensor.lod[0][my_index] =
+                  (_batch_out[fetchvar_index].lod[0][lod_index] -
+                   last_lod_value);
+            }
+          }
+          fetch_lod_index++;
+        } else {
+          // Ordinary fetchvar: the task`s total fetchvar_batch equals the
+          // input`s total batch_size(), and the output batch matches the
+          // input batch one to one.
+          paddle::PaddleTensor& fetchVarTensor =
+              (*task->outVectorT_ptr)[fetchvar_index];
+          void* dst_ptr =
+              fetchVarTensor.data.data() + fetchvar_bytesize_index * begin;
+          size_t length = fetchvar_bytesize_index * add;
+          void* source_ptr =
+              _batch_out[fetchvar_index].data.data() +
+              _batch_out_offset[fetchvar_index] * fetchvar_bytesize_index;
+          memcpy(dst_ptr, source_ptr, length);
+        }
+        _batch_out_offset[fetchvar_index] += add;
+      }

+      // index is a local variable; fetch_add is atomic and returns the old
+      // value on success.
+      // Only after the last taskmeta finishes can some thread`s index + add
+      // reach task->batch_size(), so exactly one thread enters the if block
+      // and there is no multi-thread race.
       size_t index = task->index.fetch_add(add);
       if ((index + add) >= task->batch_size()) {
+        task->combine_taskmeta();
         char c = 0;
         while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) {
         }
@@ -503,17 +881,32 @@
   size_t task_size() { return _taskmeta_vector.size(); }

+  const size_t get_rem_size() { return _rem_size; }
+
+  bool get_batch_align() { return _batch_align; }
+
+  bool get_allow_split_request() { return _allow_split_request; }
+
  private:
   std::vector<TaskMetaT> _taskmeta_vector;
   typename TaskT::InVectorT _batch_in;
   std::vector<size_t> _batch_in_offset;
-  std::vector<size_t> _realNumber_batch_in;
+  std::vector<size_t> _total_shape0_batch_in;
+  size_t _total_feed_batch;
+  std::vector<PaddleTensorLod> _batch_in_lod;
   typename TaskT::OutVectorT _batch_out;
   std::vector<size_t> _batch_out_offset;
-  std::vector<size_t> _realNumber_batch_out;
+  // std::vector<size_t> _total_shape0_batch_out;
+  size_t _total_fetch_batch;
+  // std::vector<PaddleTensorLod> _batch_out_lod;
+  std::set<size_t> set_fetch_nobatch_index;
+  std::vector<size_t> vector_fetch_lod_index;
   size_t _rem_size;
   size_t _batch_size;
   bool _batch_align;
+  bool _allow_split_request;
 };

 // BSF task handle
@@ -589,6 +982,8 @@ class TaskExecutor {
   typedef typename TaskT::OutVectorT OutVectorT;
   typedef std::vector<TaskT> TaskArrayT;
   typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
+  typedef std::vector<size_t> ShapeVector;
+  typedef std::vector<ShapeVector> VectorOfShapeVector;

   TaskExecutor()
       : _stop(false),
@@ -617,7 +1012,11 @@
   void set_batch_size(size_t batch_size) { _batch_size = batch_size; }

-  void set_batch_align(size_t batch_align) { _batch_align = batch_align; }
+  void set_batch_align(bool batch_align) { _batch_align = batch_align; }
+
+  void set_allow_split_request(bool allow_split_request) {
+    _allow_split_request = allow_split_request;
+  }

   void set_thread_init_fn(boost::function<int(void*)> init_fn,
                           void** contexts = NULL) {
@@ -642,7 +1041,7 @@
   TaskHandler<TaskT> schedule(const void*, void*);

-  bool move_task_to_batch(BatchTasks<TaskT>& batch);  // NOLINT
+  bool move_task_to_batch(BatchTasks<TaskT>& batchTask);  // NOLINT

  private:
   TaskExecutor(TaskExecutor<TaskT> const& other) = delete;
@@ -670,6 +1069,7 @@
   size_t _batch_size;
   bool _batch_align;
+  bool _allow_split_request;
   boost::function<void(const void*, void*)> _fn;
 };
@@ -687,12 +1087,12 @@ class TaskExecutorVector {
   void resize(int size) { _vector_executor.resize(size); }

-  TaskExecutor<TaskT>& operator[](int index) {
-    if (_vector_executor.size() <= index || index <= -1) {
-      LOG(ERROR) << "_vector_executor.size() <= index or <= -1";
-      throw "_vector_executor.size() <= index or <= -1";
+  TaskExecutor<TaskT>& operator[](int task_index) {
+    if (_vector_executor.size() <= task_index || task_index <= -1) {
+      LOG(ERROR) << "_vector_executor.size() <= task_index or <= -1";
+      throw "_vector_executor.size() <= task_index or <= -1";
     }
-    return _vector_executor[index];
+    return _vector_executor[task_index];
   }

  private:
@@ -717,8 +1117,8 @@ class TaskManager {
   typedef typename TaskT::InVectorT InVectorT;
   typedef typename TaskT::OutVectorT OutVectorT;

-  explicit TaskManager(uint32_t index)  // NOLINT
-      : _model_index(index) {}
+  explicit TaskManager(uint32_t model_index)  // NOLINT
+      : _model_index(model_index) {}

   ~TaskManager() { wait(); }
......
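Both `Task::feedvar_batch_size` and `BatchTasks::fetchvar_batch_size` above apply the same rule for deriving the batch from a tensor. A minimal sketch of that rule on a mock tensor type (the `TensorView` struct and `batch_of` function are illustrations, not Paddle's API):

```cpp
#include <cstdio>
#include <vector>

// Mock tensor holding only the fields the batch rule looks at.
struct TensorView {
  std::vector<size_t> shape;
  std::vector<std::vector<size_t>> lod;
};

size_t batch_of(const TensorView& t) {
  if (!t.lod.empty() && !t.lod[0].empty()) {
    // e.g. lod = [0,3,4,6], shape = [6,340,340] -> the batch is 3, not 6.
    return t.lod[0].size() - 1;
  }
  // Without lod, shape[0] is the batch.
  return t.shape.empty() ? 0 : t.shape[0];
}

int main() {
  TensorView lod_tensor{{6, 340, 340}, {{0, 3, 4, 6}}};
  TensorView plain_tensor{{6, 340, 340}, {}};
  std::printf("%zu %zu\n", batch_of(lod_tensor), batch_of(plain_tensor));  // 3 6
  return 0;
}
```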
@@ -26,6 +26,7 @@ int ReloadableInferEngine::proc_initialize_impl(
   _infer_thread_num = conf.runtime_thread_num();
   _infer_batch_size = conf.batch_infer_size();
   _infer_batch_align = conf.enable_batch_align();
+  _allow_split_request = conf.allow_split_request();

   _conf = conf;
@@ -56,9 +57,6 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
   }

   // init bsf framework
-  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
-      .set_thread_init_fn(
-          boost::bind(&InferEngine::thrd_initialize_impl, this));
   im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
       .set_thread_init_fn(
           boost::bind(&InferEngine::thrd_initialize_impl, this));
@@ -71,6 +69,8 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
       _infer_batch_size);
   im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_align(
       _infer_batch_align);
+  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
+      .set_allow_split_request(_allow_split_request);
   if (im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].start(
           _infer_thread_num) != 0) {
     LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
@@ -79,7 +79,8 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
   LOG(WARNING) << "Enable batch schedule framework, thread_num:"
                << _infer_thread_num << ", batch_size:" << _infer_batch_size
-               << ", enable_batch_align:" << _infer_batch_align;
+               << ", enable_batch_align:" << _infer_batch_align
+               << ", allow_split_request:" << _allow_split_request;
   return 0;
 }
......
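The wiring above is straightforward: each `EngineDesc` knob lands in one `TaskExecutor` setter. A compilable mock of that flow (`MockExecutor` is a hypothetical stand-in for `TaskExecutor`, and the literal values stand in for the conf fields):

```cpp
#include <cstddef>
#include <iostream>

// Mock stand-in for TaskExecutor; the setter names mirror the real ones.
struct MockExecutor {
  size_t batch_size = 0;
  bool batch_align = true;
  bool allow_split_request = true;
  void set_batch_size(size_t v) { batch_size = v; }
  void set_batch_align(bool v) { batch_align = v; }
  void set_allow_split_request(bool v) { allow_split_request = v; }
};

int main() {
  MockExecutor executor;
  // Values that would come from EngineDesc in the model_toolkit conf.
  executor.set_batch_size(32);             // batch_infer_size
  executor.set_batch_align(true);          // enable_batch_align
  executor.set_allow_split_request(true);  // allow_split_request
  std::cout << "batch_size: " << executor.batch_size << "\n";
  return 0;
}
```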
@@ -165,6 +165,8 @@ class ReloadableInferEngine : public InferEngine {
   // Need to align batch_size in inferring
   bool _infer_batch_align;

+  // allow to split request in inferring
+  bool _allow_split_request;
+
   // model version
   uint64_t _version;
 };
......