From ce3b0df8b7c5b6d6213a32e597151ae1bf58fa55 Mon Sep 17 00:00:00 2001
From: HexToString <506181616@qq.com>
Date: Fri, 27 Aug 2021 08:40:42 +0000
Subject: [PATCH] update comment

---
 core/configure/proto/server_configure.proto |  4 +--
 core/predictor/framework/bsf-inl.h          | 34 +++++++++++++--------
 core/predictor/framework/bsf.h              | 18 +++++------
 core/predictor/framework/infer.cpp          |  8 ++---
 core/predictor/framework/infer.h            |  2 +-
 5 files changed, 38 insertions(+), 28 deletions(-)
 mode change 100644 => 100755 core/predictor/framework/bsf-inl.h
 mode change 100644 => 100755 core/predictor/framework/bsf.h
 mode change 100644 => 100755 core/predictor/framework/infer.cpp

diff --git a/core/configure/proto/server_configure.proto b/core/configure/proto/server_configure.proto
index 0a452402..e78d68a3 100755
--- a/core/configure/proto/server_configure.proto
+++ b/core/configure/proto/server_configure.proto
@@ -23,8 +23,8 @@ message EngineDesc {
   required string model_dir = 5;
   repeated int32 gpu_ids = 6;
   optional int32 runtime_thread_num = 7 [ default = 0 ];
-  optional int32 batch_infer_size = 8 [ default = 0 ];
-  optional bool enable_batch_align = 9 [ default = true ];
+  optional int32 batch_infer_size = 8 [ default = 32 ];
+  optional bool enable_overrun = 9 [ default = false ];
   optional bool allow_split_request = 10 [ default = true ];
   optional string version_file = 11;
   optional string version_type = 12;
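Taken together, batch_infer_size, enable_overrun and allow_split_request decide how many instances of the next queued task a partially filled batch will accept. Note the inverted polarity of the renamed flag: the old enable_batch_align = true corresponds to the new enable_overrun = false, so the new default preserves the old default behavior. The sketch below is an illustrative distillation of the admission rule; the helper name admitted is not from the patch, and the real logic lives in the bsf-inl.h and bsf.h changes further down:

    #include <algorithm>
    #include <cstddef>

    // How many instances of a task with `task_rem` instances left does a
    // batch with `rem_size` free slots accept? (Illustrative summary only.)
    std::size_t admitted(std::size_t task_rem, std::size_t rem_size,
                         bool overrun, bool allow_split) {
      if (overrun) {
        return task_rem;  // take the whole task, even past capacity
      }
      if (allow_split) {
        return std::min(task_rem, rem_size);  // fill the batch; the rest stays queued
      }
      return task_rem <= rem_size ? task_rem : 0;  // whole task or wait for the next batch
    }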
diff --git a/core/predictor/framework/bsf-inl.h b/core/predictor/framework/bsf-inl.h
old mode 100644
new mode 100755
index da502d0d..88858582
--- a/core/predictor/framework/bsf-inl.h
+++ b/core/predictor/framework/bsf-inl.h
@@ -217,7 +217,7 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
   }
   /*
-  if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
+  if (!BatchTasks<TaskT>::check_valid(in, out, _overrun)) {
     LOG(ERROR) << "Invalid input & output";
     return TaskHandler<TaskT>::valid_handle();
   }
   */
@@ -271,21 +271,30 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
   while (!_task_queue.empty()) {
     TaskT* task = _task_queue.front();
-    // Since we cannot tell whether a fetchVar is lod, a single task must not be split across multiple batchTasks, otherwise reassembling the results afterwards is very hard.
-    // So a task cannot be split: user requests may be merged and predicted together, but a request must not be split into two smaller pieces for prediction.
-    // The difficulty: before prediction we know how many taskmetas a task was split into, but only after prediction do we know how many fetchvars there are and how many of them are lod.
-    // So creating the taskmeta_num * lod-fetchvar-num PaddleBuf (and Lod) objects inside the task
-    // can only be done in notify_task via taskmeta->task, which requires taking a lock on the task.
-    // Atomics are not enough, because the other threads must wait until the PaddleBuf objects above have been created.
+    // Since we cannot tell whether a fetchVar is lod (even if the input is non-lod, the output may still be lod),
+    // the simple approach is: never split a task, i.e. user requests may be merged and predicted together, but a request must not be split into two smaller pieces for prediction.
+    // That only requires setting the engine attribute allow_split_request = false.
+    // The complex approach is to allow splitting a Task, whether or not it contains lod.
+    // The difficulty: before prediction we know how many taskmetas a task was split into, but only after prediction do we know how many fetchvars there are and how many of them are lod.
+    // So the task must first create taskmeta_num * fetchvar_num (lod-typed) temporary PaddleTensors (holding data and Lod).
+    // Since the unit of multi-threaded scheduling is the taskmeta, they can only be created in notify_task via taskmeta->task.
+    // At that point several taskmetas map to one task, so threads race and a lock must be taken on the task.
+    // Atomics are not enough, because the other threads must wait until the PaddleTensors above have been created.
     // Ordinary (non-lod) fetches also need the lock to create their PaddleTensor before results can be copied into it.
-    // When _batch_align is false, even if little space remains, a whole Task is still put in, temporarily exceeding the limit.
-    // When _allow_split_request == false, a task is never split.
+    // _overrun controls whether the asynchronous BatchTasks may temporarily exceed its limit in a single round.
+    // When _overrun is true, even if only 1-batch of space remains in the BatchTasks, a whole Task is still put in, temporarily exceeding the limit.
+    // When _overrun is false, this is not allowed.
+    // If the model itself has a maximum batch limit, this should be set to false; the default is false.
+    // If the model has no maximum batch limit but you set a maximum batch for BatchTasks yourself, consider setting it to true.
+
+    // When _allow_split_request == true, tasks may be split: if 1-batch of space remains in the BatchTasks, 1-batch is split off from the next Task.
+    // When _allow_split_request == false, a task is never split, and the remaining 1-batch of space in the BatchTasks is wasted.
     // The default is true: splitting tasks maximizes space utilization.
     if (!batchTask.get_allow_split_request()) {
       if (task->batch_size() > batchTask.get_rem_size() &&
-          batchTask.get_batch_align()) {
+          !batchTask.get_overrun()) {
         break;
       }
     }
@@ -299,6 +308,7 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
     // So the feedvars are required to be equal before two tasks can be merged.
     // Otherwise, break out of the loop and put the task into the next batchTask.
     // For now PaddleTensor and PaddleBuf do not overload operator==, so we can only compare memory.
+    // TODO(HexToString): consider supporting AutoPadding later.
     if (previous_task != nullptr) {
       if (!task->combine_task_valid(previous_task)) {
         break;
       }
     }
@@ -366,7 +376,7 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
     // move_task_to_batch() takes the original task from the `_task_queue`
    // and puts it into its own Vector<taskmeta>
     // the capacity of its own Vector<taskmeta> is decided by `_batch_size` or
-    // `_batch_align`
+    // `_overrun`
 
     // merge_tasks() moves the input data into `_batch_in` from its own
     // Vector<taskmeta>.
@@ -376,7 +386,7 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
     // `_batch_out`.
     // because the predictor's output is the `_batch_out`
     BatchTasks<TaskT> batchTask(
-        _batch_size, _batch_align, _allow_split_request);
+        _batch_size, _overrun, _allow_split_request);
     if (move_task_to_batch(batchTask)) {
       batchTask.merge_tasks();
       _fn(&batchTask.in(), &batchTask.out());
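The new comment block in move_task_to_batch above reasons that the per-task output tensors can only be sized after inference, inside notify_task, and that an atomic flag cannot replace the lock because later worker threads must block until the tensors exist. A minimal sketch of that pattern, assuming stand-in types (StubTensor, TaskOutput and ensure_created are illustrative names, not code from this patch):

    #include <cstddef>
    #include <mutex>
    #include <vector>

    struct StubTensor {};  // stand-in for paddle::PaddleTensor (data + lod)

    // Shared by all taskmetas that were split from one task.
    struct TaskOutput {
      std::mutex mu;
      bool created = false;
      std::vector<StubTensor> fetchvars;

      // Called from notify_task by whichever worker thread arrives first;
      // the others block on the mutex rather than spinning on an atomic,
      // because they must not copy results until the tensors exist.
      void ensure_created(std::size_t fetchvar_num) {
        std::lock_guard<std::mutex> lock(mu);
        if (!created) {
          fetchvars.resize(fetchvar_num);  // fetchvar_num is only known post-inference
          created = true;
        }
      }
    };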
diff --git a/core/predictor/framework/bsf.h b/core/predictor/framework/bsf.h
old mode 100644
new mode 100755
index 633b9524..4b1b2212
--- a/core/predictor/framework/bsf.h
+++ b/core/predictor/framework/bsf.h
@@ -444,11 +444,11 @@ class BatchTasks {
   friend TaskT;
 
   explicit BatchTasks(size_t batch_size,
-                      bool batch_align = true,
+                      bool overrun = false,
                       bool allow_split_request = true)
       : _batch_size(batch_size),
         _rem_size(batch_size),
-        _batch_align(batch_align),
+        _overrun(overrun),
         _allow_split_request(allow_split_request) {
     _batch_in.clear();
     _batch_in_offset.clear();
@@ -484,10 +484,10 @@ class BatchTasks {
   // All tasks that reach this function are of the same kind; that is guaranteed before the call.
   size_t append_task(TaskT* task) {
     size_t add = std::min(task->rem, _rem_size);
-    // when _batch_align == false, it means always take a whole task as TaskMeta
+    // when _overrun == true, it means always take a whole task as TaskMeta
     // we can temporarily break through the limit of BatchTask's capacity
     // BatchTask's capacity is _batch_size or _rem_size
-    if (!_batch_align) {
+    if (_overrun) {
       add = task->rem;
     }
     int start_index = task->batch_size() - task->rem;
@@ -883,7 +883,7 @@ class BatchTasks {
 
   const size_t get_rem_size() { return _rem_size; }
 
-  bool get_batch_align() { return _batch_align; }
+  bool get_overrun() { return _overrun; }
 
   bool get_allow_split_request() { return _allow_split_request; }
@@ -905,7 +905,7 @@ class BatchTasks {
   size_t _rem_size;
   size_t _batch_size;
-  bool _batch_align;
+  bool _overrun;
   bool _allow_split_request;
 };
@@ -991,7 +991,7 @@ class TaskExecutor {
         _thread_reset_fn(NULL),
         _user_thread_contexts(NULL),
         _batch_size(DEFAULT_BATCH_SIZE),
-        _batch_align(false),
+        _overrun(false),
         _fn(NULL) {
     THREAD_MUTEX_INIT(&_mut, NULL);
     THREAD_COND_INIT(&_cond, NULL);
@@ -1012,7 +1012,7 @@ class TaskExecutor {
 
   void set_batch_size(size_t batch_size) { _batch_size = batch_size; }
 
-  void set_batch_align(bool batch_align) { _batch_align = batch_align; }
+  void set_overrun(bool overrun) { _overrun = overrun; }
 
   void set_allow_split_request(bool allow_split_request) {
     _allow_split_request = allow_split_request;
@@ -1068,7 +1068,7 @@ class TaskExecutor {
   std::vector<ThreadContext<TaskT>*> _thread_contexts;
 
   size_t _batch_size;
-  bool _batch_align;
+  bool _overrun;
   bool _allow_split_request;
 
   boost::function<void(const void*, void*)> _fn;
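To make the capacity math in append_task above concrete, here is a small standalone reproduction with worked numbers. compute_add is an illustrative stand-in; the real method additionally records the TaskMeta range and decrements task->rem:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>

    std::size_t compute_add(std::size_t task_rem, std::size_t rem_size,
                            bool overrun) {
      std::size_t add = std::min(task_rem, rem_size);  // normal case: split to fit
      if (overrun) {
        add = task_rem;  // overrun: always take the whole task as one TaskMeta
      }
      return add;
    }

    int main() {
      // _batch_size = 32, 12 slots left, incoming task still has 20 instances.
      assert(compute_add(20, 12, false) == 12);  // split: 12 now, 8 for the next batch
      assert(compute_add(20, 12, true) == 20);   // whole task; the batch temporarily holds 40
      return 0;
    }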
diff --git a/core/predictor/framework/infer.cpp b/core/predictor/framework/infer.cpp
old mode 100644
new mode 100755
index 47ba975a..a01f3a58
--- a/core/predictor/framework/infer.cpp
+++ b/core/predictor/framework/infer.cpp
@@ -25,7 +25,7 @@ int ReloadableInferEngine::proc_initialize_impl(
   _model_dir = conf.model_dir();
   _infer_thread_num = conf.runtime_thread_num();
   _infer_batch_size = conf.batch_infer_size();
-  _infer_batch_align = conf.enable_batch_align();
+  _infer_overrun = conf.enable_overrun();
   _allow_split_request = conf.allow_split_request();
 
   _conf = conf;
@@ -67,8 +67,8 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
       boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
   im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_size(
       _infer_batch_size);
-  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_align(
-      _infer_batch_align);
+  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_overrun(
+      _infer_overrun);
   im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
       .set_allow_split_request(_allow_split_request);
   if (im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].start(
           _infer_thread_num) != 0) {
@@ -79,7 +79,7 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
   LOG(WARNING) << "Enable batch schedule framework, thread_num:"
                << _infer_thread_num << ", batch_size:" << _infer_batch_size
-               << ", enable_batch_align:" << _infer_batch_align
+               << ", enable_overrun:" << _infer_overrun
                << ", allow_split_request:" << _allow_split_request;
   return 0;
 }
diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h
index 66040fa3..71fdb824 100755
--- a/core/predictor/framework/infer.h
+++ b/core/predictor/framework/infer.h
@@ -163,7 +163,7 @@ class ReloadableInferEngine : public InferEngine {
   uint32_t _infer_batch_size;
 
-  // Need to align batch_size in inferring
-  bool _infer_batch_align;
+  // Whether a batch may temporarily exceed batch_size in inferring
+  bool _infer_overrun;
 
   // allow to split request in inferring
   bool _allow_split_request;
--
GitLab
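For completeness, the wiring this patch leaves in ReloadableInferEngine::proc_initialize can be condensed into the following sketch. Executor and Conf are stand-in template parameters rather than types from the patch; the calls mirror the renamed setters above:

    // Mirrors the calls in ReloadableInferEngine::proc_initialize;
    // returns the executor's start() result (0 on success).
    template <typename Executor, typename Conf>
    int init_batch_schedule(Executor& executor, const Conf& conf) {
      executor.set_batch_size(conf.batch_infer_size());    // BatchTasks capacity (default now 32)
      executor.set_overrun(conf.enable_overrun());         // may one batch exceed capacity? (default false)
      executor.set_allow_split_request(
          conf.allow_split_request());                     // may a task be split? (default true)
      return executor.start(conf.runtime_thread_num());    // spin up worker threads
    }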