Commit ce3b0df8 authored by H HexToString

update comment

Parent 5a9c03ce
@@ -23,8 +23,8 @@ message EngineDesc {
   required string model_dir = 5;
   repeated int32 gpu_ids = 6;
   optional int32 runtime_thread_num = 7 [ default = 0 ];
-  optional int32 batch_infer_size = 8 [ default = 0 ];
-  optional bool enable_batch_align = 9 [ default = true ];
+  optional int32 batch_infer_size = 8 [ default = 32 ];
+  optional bool enable_overrun = 9 [ default = false ];
   optional bool allow_split_request = 10 [ default = true ];
   optional string version_file = 11;
   optional string version_type = 12;
......
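For orientation, here is a hedged sketch of how the renamed options could appear in a text-format EngineDesc entry; only runtime_thread_num, batch_infer_size, enable_overrun, allow_split_request, and model_dir appear in this diff, while the `engines` wrapper, the `name` field, and all concrete values are placeholders assumed for illustration:

```prototxt
engines {
  name: "general_model"              # placeholder
  model_dir: "serving_server_model"  # placeholder
  runtime_thread_num: 4              # > 0 enables the async batch scheduler
  batch_infer_size: 32               # capacity of one BatchTasks (new default)
  enable_overrun: false              # never exceed batch_infer_size temporarily
  allow_split_request: true          # a request may be split across BatchTasks
}
```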
@@ -217,7 +217,7 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
   }
   /*
-  if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
+  if (!BatchTasks<TaskT>::check_valid(in, out, _overrun)) {
     LOG(ERROR) << "Invalid input & output";
     return TaskHandler<TaskT>::valid_handle();
   }
@@ -271,21 +271,30 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
   while (!_task_queue.empty()) {
     TaskT* task = _task_queue.front();
-    // Since we cannot tell whether a fetchVar is lod, a single task must not be split across multiple batchTasks, otherwise reassembly afterwards becomes very hard.
-    // So a task cannot be split: user requests may be merged and predicted together, but one request must not be split into two smaller parts for prediction.
-    // Difficulty: before prediction we know how many taskmetas a task was split into, but only after prediction do we know how many fetchvars there are and how many of them are lod.
-    // So creating taskmeta_num * (number of lod fetchvars) PaddleBuf objects (plus Lod) inside the task
-    // can only be done in notify_task via taskmeta->task, which requires locking inside the task.
-    // Atomic operations are not feasible, because every thread must wait until the above PaddleBuf objects have been created before continuing.
+    // Since we cannot tell whether a fetchVar is lod (even if the input is non-lod, the output may still be lod),
+    // the simple approach is: a task cannot be split, i.e. user requests may be merged and predicted together, but one request must not be split into two smaller parts for prediction.
+    // Just set the engine attribute allow_split_request = false.
+    // The complex approach is to allow splitting a Task, whether or not it contains lod.
+    // Difficulty: before prediction we know how many taskmetas a task was split into, but only after prediction do we know how many fetchvars there are and how many of them are lod.
+    // So the task first has to create taskmeta_num * (number of lod-type fetchvars) temporary PaddleTensor objects (holding data and Lod).
+    // Because the unit of multi-threaded scheduling is the taskmeta, they can only be created in notify_task via taskmeta->task.
+    // Since several taskmetas map to one task, threads compete, so a lock inside the task is required.
+    // Atomic operations are not feasible, because every thread must wait until the above PaddleTensor objects have been created before continuing.
     // For ordinary fetch vars, a lock is also needed to create the PaddleTensor before data can be copied into it.
-    // When _batch_align is false, a whole Task is still put in even if the remaining space is small, temporarily exceeding the limit.
-    // When _allow_split_request == false, no task will be split.
+    // _overrun indicates whether the asynchronous BatchTasks may temporarily exceed its limit for a single round.
+    // When _overrun is true, a whole Task is still put in even if the BatchTasks has only 1 batch slot left, temporarily exceeding the limit.
+    // When _overrun is false, this is not allowed.
+    // If the model itself has a maximum batch limit, this should be set to false; the default is false.
+    // If the model has no maximum batch limit but you set a maximum batch for BatchTasks yourself, consider setting it to true.
+    // When _allow_split_request == true, a task may be split: if the BatchTasks has 1 batch slot left, 1 batch is split off from the next Task.
+    // When _allow_split_request == false, no task will be split, and the remaining 1 batch slot in the BatchTasks is wasted.
     // The default is true, allowing tasks to be split so that space utilization is maximized.
     if (!batchTask.get_allow_split_request()) {
       if (task->batch_size() > batchTask.get_rem_size() &&
-          batchTask.get_batch_align()) {
+          !batchTask.get_overrun()) {
         break;
       }
     }
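To make the interaction of the two flags concrete, below is a minimal, self-contained C++ sketch of the admission decision described in the comments above. The helper function is hypothetical and only mirrors the `get_rem_size`/`get_overrun`/`get_allow_split_request` accessors used in this diff; it is not the real scheduler code.

```cpp
#include <cstddef>

// Decide whether a whole task may start filling the current BatchTasks.
// rem_size: free slots left in the BatchTasks; overrun / allow_split mirror
// the _overrun and _allow_split_request semantics explained above.
bool admit_whole_task(size_t task_batch_size,
                      size_t rem_size,
                      bool overrun,
                      bool allow_split) {
  if (allow_split) {
    // The task may be split, so it can always start filling the remaining slots.
    return true;
  }
  if (task_batch_size <= rem_size) {
    // Fits completely without splitting.
    return true;
  }
  // Too large to fit and splitting is forbidden: only admitted when the
  // BatchTasks may temporarily exceed its capacity (_overrun == true).
  return overrun;
}
```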
@@ -299,6 +308,7 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
     // So that feedvar must be identical before tasks can be merged.
     // Otherwise, break out of the loop and put the task into the next batchTask.
     // PaddleTensor and PaddleBuf currently do not overload ==, so we can only compare memory.
+    // TODO(HexToString): AutoPadding support could be considered later.
     if (previous_task != nullptr) {
       if (!task->combine_task_valid(previous_task)) {
         break;
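Since PaddleTensor and PaddleBuf provide no operator==, the comment above says mergeability is checked by comparing memory. The snippet below is only a hedged illustration of that idea with a hypothetical, simplified FeedVar struct; it is not the actual combine_task_valid implementation.

```cpp
#include <cstring>
#include <vector>

// Hypothetical, simplified view of a feed variable: shape plus raw bytes.
struct FeedVar {
  std::vector<int> shape;
  std::vector<char> data;
};

// Two tasks may be merged only if the shared feedvar is bitwise identical,
// mirroring the "compare memory" approach mentioned in the comment.
bool feedvar_equal(const FeedVar& a, const FeedVar& b) {
  if (a.shape != b.shape || a.data.size() != b.data.size()) return false;
  return std::memcmp(a.data.data(), b.data.data(), a.data.size()) == 0;
}
```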
@@ -366,7 +376,7 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
     // move_task_to_batch() take the original task from the `_task_queue`
     // put the original task into its own Vector<taskmeta>
     // the capacity of its own Vector<taskmeta> is decided by `_batch_size` or
-    // `_batch_align`
+    // `_overrun`
     // merge_tasks() move the imput-data into `_batch_in` from its own
     // Vector<taskmeta>.
@@ -376,7 +386,7 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
     // `_batch_out`.
     // because the predictor`s output is the `_batch_out`
     BatchTasks<TaskT> batchTask(
-        _batch_size, _batch_align, _allow_split_request);
+        _batch_size, _overrun, _allow_split_request);
     if (move_task_to_batch(batchTask)) {
       batchTask.merge_tasks();
       _fn(&batchTask.in(), &batchTask.out());
......
@@ -444,11 +444,11 @@ class BatchTasks {
   friend TaskT;
   explicit BatchTasks(size_t batch_size,
-                      bool batch_align = true,
+                      bool overrun = false,
                       bool allow_split_request = true)
       : _batch_size(batch_size),
         _rem_size(batch_size),
-        _batch_align(batch_align),
+        _overrun(overrun),
         _allow_split_request(allow_split_request) {
     _batch_in.clear();
     _batch_in_offset.clear();
@@ -484,10 +484,10 @@ class BatchTasks {
   // All tasks that reach this function are of the same kind; this has been guaranteed before this function is called.
   size_t append_task(TaskT* task) {
     size_t add = std::min(task->rem, _rem_size);
-    // when _batch_align == false, it means always take a whole task as TaskMeta
+    // when _overrun == true, it means always take a whole task as TaskMeta
     // we can temporary breakthrough the limit of BatchTask`s capacity
     // BatchTask`s capacity is _batch_size or _rem_size
-    if (!_batch_align) {
+    if (_overrun) {
       add = task->rem;
     }
     int start_index = task->batch_size() - task->rem;
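The capacity arithmetic above is easy to check with numbers: normally only min(task->rem, _rem_size) rows are taken, but with _overrun the whole remaining task is taken even past the capacity. Below is a small, self-contained C++ example of that rule; `rows_to_take` is a standalone toy function, not part of the real class.

```cpp
#include <algorithm>
#include <cstdio>

// How many rows append_task would take from a task, per the logic above.
size_t rows_to_take(size_t task_rem, size_t batch_rem, bool overrun) {
  size_t add = std::min(task_rem, batch_rem);
  if (overrun) {
    // Take the whole remaining task even if it exceeds the batch capacity.
    add = task_rem;
  }
  return add;
}

int main() {
  // The batch has 8 slots left, the task still has 20 rows pending.
  std::printf("%zu\n", rows_to_take(20, 8, false));  // 8: split or wait
  std::printf("%zu\n", rows_to_take(20, 8, true));   // 20: temporary overrun
  return 0;
}
```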
@@ -883,7 +883,7 @@ class BatchTasks {
   const size_t get_rem_size() { return _rem_size; }
-  bool get_batch_align() { return _batch_align; }
+  bool get_overrun() { return _overrun; }
   bool get_allow_split_request() { return _allow_split_request; }
@@ -905,7 +905,7 @@ class BatchTasks {
   size_t _rem_size;
   size_t _batch_size;
-  bool _batch_align;
+  bool _overrun;
   bool _allow_split_request;
 };
@@ -991,7 +991,7 @@ class TaskExecutor {
         _thread_reset_fn(NULL),
         _user_thread_contexts(NULL),
         _batch_size(DEFAULT_BATCH_SIZE),
-        _batch_align(false),
+        _overrun(false),
         _fn(NULL) {
     THREAD_MUTEX_INIT(&_mut, NULL);
     THREAD_COND_INIT(&_cond, NULL);
@@ -1012,7 +1012,7 @@ class TaskExecutor {
   void set_batch_size(size_t batch_size) { _batch_size = batch_size; }
-  void set_batch_align(bool batch_align) { _batch_align = batch_align; }
+  void set_overrun(bool overrun) { _overrun = overrun; }
   void set_allow_split_request(bool allow_split_request) {
     _allow_split_request = allow_split_request;
@@ -1068,7 +1068,7 @@ class TaskExecutor {
   std::vector<ThreadContext<TaskT>*> _thread_contexts;
   size_t _batch_size;
-  bool _batch_align;
+  bool _overrun;
   bool _allow_split_request;
   boost::function<void(const void*, void*)> _fn;
......
@@ -25,7 +25,7 @@ int ReloadableInferEngine::proc_initialize_impl(
   _model_dir = conf.model_dir();
   _infer_thread_num = conf.runtime_thread_num();
   _infer_batch_size = conf.batch_infer_size();
-  _infer_batch_align = conf.enable_batch_align();
+  _infer_overrun = conf.enable_overrun();
   _allow_split_request = conf.allow_split_request();
   _conf = conf;
@@ -67,8 +67,8 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
       boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
   im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_size(
       _infer_batch_size);
-  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_align(
-      _infer_batch_align);
+  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_overrun(
+      _infer_overrun);
   im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
       .set_allow_split_request(_allow_split_request);
   if (im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].start(
@@ -79,7 +79,7 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
   LOG(WARNING) << "Enable batch schedule framework, thread_num:"
                << _infer_thread_num << ", batch_size:" << _infer_batch_size
-               << ", enable_batch_align:" << _infer_batch_align
+               << ", enable_overrun:" << _infer_overrun
                << ", allow_split_request:" << _allow_split_request;
   return 0;
 }
......
@@ -163,7 +163,7 @@ class ReloadableInferEngine : public InferEngine {
   uint32_t _infer_batch_size;
   // Need to align batch_size in inferring
-  bool _infer_batch_align;
+  bool _infer_overrun;
   // allow to split request in inferring
   bool _allow_split_request;
......