diff --git a/core/configure/proto/server_configure.proto b/core/configure/proto/server_configure.proto old mode 100644 new mode 100755 index 45ad2b84e41196232dd65546ca16da104f5a33c6..13b9d39553b9219f0ab7f494f58ab0b7cfd3b7e8 --- a/core/configure/proto/server_configure.proto +++ b/core/configure/proto/server_configure.proto @@ -22,11 +22,8 @@ message EngineDesc { required string reloadable_type = 4; required string model_dir = 5; repeated int32 gpu_ids = 6; - required int32 runtime_thread_num = 7; - required int32 batch_infer_size = 8; - required int32 enable_batch_align = 9; - optional string version_file = 10; - optional string version_type = 11; + optional string version_file = 7; + optional string version_type = 8; /* * Sparse Parameter Service type. Valid types are: @@ -39,17 +36,34 @@ message EngineDesc { LOCAL = 1; REMOTE = 2; } - optional SparseParamServiceType sparse_param_service_type = 12; - optional string sparse_param_service_table_name = 13; - optional bool enable_memory_optimization = 14; - optional bool enable_ir_optimization = 15; - optional bool use_trt = 16; - optional bool use_lite = 17; - optional bool use_xpu = 18; - optional bool use_gpu = 19; - optional bool combined_model = 20; - optional bool encrypted_model = 21; - optional bool gpu_multi_stream = 22; + optional SparseParamServiceType sparse_param_service_type = 10; + optional string sparse_param_service_table_name = 11; + optional bool enable_memory_optimization = 12; + optional bool enable_ir_optimization = 13; + optional bool use_trt = 14; + optional bool use_lite = 15; + optional bool use_xpu = 16; + optional bool use_gpu = 17; + optional bool combined_model = 18; + optional bool encrypted_model = 19; + optional bool gpu_multi_stream = 20; + + /* + * "runtime_thread_num": n == 0 means don`t use Asynchronous task scheduling + * mode. + * n > 0 means how many Predictor for this engine in Asynchronous task + * scheduling mode. + * "batch_infer_size": the max batch for this engine in Asynchronous task + * scheduling mode. + * "enable_overrun": always put a whole task into the TaskQueue even if the + * total batch is bigger than "batch_infer_size". + * "allow_split_request": allow to split task(which is corresponding to + * request). 
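+ * Illustrative model_toolkit engines entry using these options (field
+ * values below are hypothetical, shown only to document typical usage):
+ *   engines {
+ *     name: "general_infer_0"
+ *     type: "PADDLE_INFER"
+ *     reloadable_meta: "uci_housing_model/fluid_time_file"
+ *     reloadable_type: "timestamp_ne"
+ *     model_dir: "uci_housing_model"
+ *     runtime_thread_num: 2        # enable asynchronous task scheduling
+ *     batch_infer_size: 32         # max merged batch per inference
+ *     enable_overrun: false        # never exceed batch_infer_size
+ *     allow_split_request: true    # a request may be split across batches
+ *   }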
+ */ + optional int32 runtime_thread_num = 30 [ default = 0 ]; + optional int32 batch_infer_size = 31 [ default = 32 ]; + optional bool enable_overrun = 32 [ default = false ]; + optional bool allow_split_request = 33 [ default = true ]; }; // model_toolkit conf diff --git a/core/predictor/framework/bsf-inl.h b/core/predictor/framework/bsf-inl.h old mode 100644 new mode 100755 index 1f5d272d2875ee878f09ac2882364afe9fd899fb..7cd8f149b258e453cd62db256a23288e834844de --- a/core/predictor/framework/bsf-inl.h +++ b/core/predictor/framework/bsf-inl.h @@ -26,9 +26,90 @@ #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/memory.h" +// this file is included by bsf.h namespace im { namespace bsf { +template +bool Task::task_fetch_init(BatchTasks& batchTask) { + // 双检锁,减少加锁的粒度 + if (!fetch_init) { + if (taskmeta_num > 1) { + // 对于task被拆分为多个taskmeta,需要加锁。 + AutoMutex lock(task_mut); + task_fetch_create(batchTask); + } else { + // 对于task只有1个taskmeta,不需要加锁。 + task_fetch_create(batchTask); + } + } + return true; +} + +template +bool Task::task_fetch_create(BatchTasks& batchTask) { + if (!fetch_init) { + vector_fetch_lod_index = batchTask.vector_fetch_lod_index; + set_fetch_nobatch_index = batchTask.set_fetch_nobatch_index; + OutVectorT taskMetaOutLodTensor; + size_t fetchvar_num = batchTask._batch_out.size(); + for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num; + ++fetchvar_index) { + size_t fetchvar_bytesize_index = + batchTask.fetchvar_bytesize(fetchvar_index); + size_t fetchvar_batch = 0; + // 1. nobatch fetchvar情况 + if (set_fetch_nobatch_index.size() > 0 && + set_fetch_nobatch_index.find(fetchvar_index) != + set_fetch_nobatch_index.end()) { + fetchvar_batch = 1; + } else if (vector_fetch_lod_index.size() > 0 && + std::find(vector_fetch_lod_index.begin(), + vector_fetch_lod_index.end(), + fetchvar_index) != vector_fetch_lod_index.end()) { + // lod fetchvar情况,此时无法确定总的shape[0] + // 根据task中的task_num总数开辟task_num个临时空间 + // 每个lod型的fetchvar拷贝到对应的临时空间中 + // 最后再计算临时空间的总量,合并fetchvar和lod + fetchvar_batch = 0; + + } else { + // 普通fetchvar情况,此时该Task总的fetchvar_batch = + // 输入的总的batch_size() + fetchvar_batch = batch_size(); + } + paddle::PaddleTensor tensor_out; + tensor_out.name = batchTask._batch_out[fetchvar_index].name; + tensor_out.dtype = + paddle::PaddleDType(batchTask._batch_out[fetchvar_index].dtype); + tensor_out.shape = batchTask._batch_out[fetchvar_index].shape; + tensor_out.shape[0] = fetchvar_batch; + if (fetchvar_batch != 0) { + // 此时 lod 为空。 + tensor_out.lod = batchTask._batch_out[fetchvar_index].lod; + // resize all batch memory at one time + size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index; + tensor_out.data.Resize(databuf_size); + } else { + // 当taskmeta_num = 1时,由于同时只有一个taskMeta操作task + // 不涉及线程安全问题,所以此时可以直接由taskMeta->task->resize->copy + + // 当task被分为多个taskMeta时,需要临时对象记录 + // 收齐后再一起合并 + if (taskmeta_num > 1) { + taskMetaOutLodTensor.push_back(tensor_out); + } + } + outVectorT_ptr->push_back(tensor_out); + } + // outLodTensorVector实际是一个双层vector + // shape为taskmeta_num * vector_fetch_lod_index.size(); + outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor); + fetch_init = true; + } + return true; +} + template void* TaskExecutor::thread_entry(void* args) { ThreadContext* context = static_cast*>(args); @@ -136,7 +217,7 @@ TaskHandler TaskExecutor::schedule( } /* - if (!BatchTasks::check_valid(in, out, _batch_align)) { + if (!BatchTasks::check_valid(in, out, _overrun)) { LOG(ERROR) << "Invalid input & output"; return 
TaskHandler::valid_handle(); } @@ -156,9 +237,11 @@ TaskHandler TaskExecutor::schedule( task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr; task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr; + if (!task->task_init()) { + LOG(ERROR) << "task->init() failed"; + } task->rem = task->batch_size(); task->index.store(0, butil::memory_order_relaxed); - AutoMutex lock(_mut); _task_queue.push_back(task); THREAD_COND_SIGNAL(&_cond); @@ -168,11 +251,12 @@ TaskHandler TaskExecutor::schedule( // this function is accessed by multi thread. // so AutoMutex at first. -// so batch.append_task is thread safe. +// so batchTask.append_task is thread safe. // you dont need to add extra lock in append_task() +// task is already init. template bool TaskExecutor::move_task_to_batch( - BatchTasks& batch) { // NOLINT + BatchTasks& batchTask) { // NOLINT AutoMutex lock(_mut); while (_task_queue.empty()) { THREAD_COND_WAIT(&_cond, &_mut); @@ -183,15 +267,65 @@ bool TaskExecutor::move_task_to_batch( return false; } + TaskT* previous_task = nullptr; while (!_task_queue.empty()) { TaskT* task = _task_queue.front(); - size_t rem = batch.append_task(task); + + // 由于无法确定fetchVar是否为lod(即使输入是非lod,输出也可能是lod) + // 简单的处理方法是:task不能被拆分,即用户的请求可以合并一起预测,但不能拆分两个小部分去预测。 + // 只需要设置engine的属性allow_split_request = false即可。 + + // 复杂的处理方法是允许拆分Task,无论是否包含lod. + // 难点:预测前,能够知道被拆成了几个taskmeta,但只有预测后,才知道有多少个fetchvar,多少个lod的fetchvar + // 所以,task中先要创建taskmeta_num* fetchvar + // num(lod类型的)个临时PaddleTensor(存储data及Lod) + // 由于多线程调度的单位是taskmeta,故只能在notify_task中,用taskmeta->task去创建 + // 此时由于多个taskmeta对应一个task,存在多线程竞争,所以需要在task中加锁。 + // 原子操作不可行,因为多个线程必须等待创建好上述的PaddleTensor后才能继续。 + // 对于普通的fetch,也需要加锁去创建PaddleTensor,后续才能往里拷贝。 + + // _overrun表示,异步BatchTasks是否允许单次临时超过限制。 + // _overrun为true时,即使BatchTasks剩下1-batch,也会全放入一个完整的Task,允许临时超限。 + // _overrun为false时,不允许。 + // 对于模型本身有最大Batch限制的情况,应将该值设为false,默认为false。 + // 对于模型本身无最大Batch限制,但自己设置了BatchTasks的最大Batch,可以考虑设置为True。 + + // _allow_split_request == + // true,则允许拆分task.BatchTasks剩下1-batch,则会从下一个Task中拆出1-Batch + // _allow_split_request == + // false,则每个task不会被拆分。BatchTasks剩下1-batch会被浪费 + // 默认为true,允许拆分task从而使得空间利用率最大。 + if (!batchTask.get_allow_split_request()) { + if (task->batch_size() > batchTask.get_rem_size() && + !batchTask.get_overrun()) { + break; + } + } + + // combine_task_valid负责判断是否能够合并 + // 除最外层的shape外,内层shape应一致才能合并。 + // 否则跳出循环,放入下一个batchTask中。 + // 以此保证batch.append_task(task)中的task的内层shape相同。 + + // 对于Shape[0] = 1 而!=batch的情况,因为合并时,取其中一个的值 + // 所以要求该feedvar必须相等,才能合并。 + // 否则跳出循环,放入下一个batchTask中。 + // 目前没有PaddleTensor和PaddleBuff没有重载==,所以只能比较内存. + // TODO(HexToString): 可以考虑后期支持AutoPadding. + if (previous_task != nullptr) { + if (!task->combine_task_valid(previous_task)) { + break; + } + } + size_t rem = batchTask.append_task(task); + previous_task = task; if (task->rem <= 0) { _task_queue.pop_front(); } if (rem <= 0) break; } - + LOG(INFO) << "Number of tasks remaining in _task_queue is" + << _task_queue.size(); return true; } @@ -201,11 +335,12 @@ bool TaskExecutor::move_task_to_batch( // TaskT is from the SingleTon TaskExecutor`s _task_queue // although TaskMeta is a local variable, but several TaskMeta may points to // the same TaskT which is get from the SingleTon TaskExecutor`s _task_queue. -// put TaskMeta to the local variable BatchTasks batch. +// put TaskMeta to the local variable BatchTasks batchTask. -// batch.merge_tasks() and batch.notify_tasks() has no lock. -// BatchTasks batch itself is a local variable, it`s thread safe. 
-// If batch.merge_tasks() and batch.notify_tasks() do something to TaskMeta +// batchTask.merge_tasks() and batchTask.notify_tasks() has no lock. +// BatchTasks batchTask itself is a local variable, it`s thread safe. +// If batchTask.merge_tasks() and batchTask.notify_tasks() do something to +// TaskMeta // you need to pay attention to that. // Multi-Thread deal with different TaskMeta(cause it`s created as local // variable) @@ -242,11 +377,23 @@ int TaskExecutor::work(ThreadContext* context) { return -1; } - BatchTasks batch(_batch_size, _batch_align); - if (move_task_to_batch(batch)) { - batch.merge_tasks(); - _fn(&batch.in(), &batch.out()); - batch.notify_tasks(); + // move_task_to_batch() take the original task from the `_task_queue` + // put the original task into its own Vector + // the capacity of its own Vector is decided by `_batch_size` or + // `_overrun` + + // merge_tasks() move the imput-data into `_batch_in` from its own + // Vector. + // because the predictor`s input is the `_batch_in` + + // notify_tasks() move the output-data into every single taskmeta from + // `_batch_out`. + // because the predictor`s output is the `_batch_out` + BatchTasks batchTask(_batch_size, _overrun, _allow_split_request); + if (move_task_to_batch(batchTask)) { + batchTask.merge_tasks(); + _fn(&batchTask.in(), &batchTask.out()); + batchTask.notify_tasks(); } } diff --git a/core/predictor/framework/bsf.h b/core/predictor/framework/bsf.h old mode 100644 new mode 100755 index 7a8629e75b87aec889a1cce98b6392dddad32ce0..aa3aab5603012ce5d2149774d63f4c7d14655adf --- a/core/predictor/framework/bsf.h +++ b/core/predictor/framework/bsf.h @@ -16,7 +16,9 @@ #include #include +#include #include +#include #include #ifdef BCLOUD @@ -46,7 +48,8 @@ static const size_t DEFAULT_BATCH_SIZE = 100; // `rem` don`t need to be atomic, cause the operation `put` is synchronous. // actually, the reason is that lock have been added outside the operation // `put`. - +template +class BatchTasks; // size_t `index` records how many batch have been processing completed. // `index` need to be atomic, cause the operation 'notify' is asynchronous. 
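// A minimal, standalone sketch (illustrative only, not the bsf API) of the
// completion-counting pattern the comment above describes: several workers
// each finish a slice of one task's batch, bump an atomic counter by the
// slice size, and only the worker whose update reaches the full batch
// performs the notify.
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

struct DemoTask {
  size_t batch_size = 8;
  std::atomic<size_t> index{0};  // how many batch items are already finished
};

static void finish_slice(DemoTask* task, size_t add) {
  // a real worker would copy this slice's results back to the task here
  size_t old_index = task->index.fetch_add(add);  // atomic, like Task::index
  if (old_index + add >= task->batch_size) {
    // exactly one worker observes completion and wakes the caller
    std::printf("task complete, notify caller\n");
  }
}

int main() {
  DemoTask task;
  std::vector<std::thread> workers;
  // two workers each complete half of the batch, in either order
  workers.emplace_back(finish_slice, &task, task.batch_size / 2);
  workers.emplace_back(finish_slice, &task, task.batch_size / 2);
  for (auto& w : workers) w.join();
  return 0;
}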
template @@ -56,7 +59,7 @@ struct Task { typedef InItemT InType; typedef OutItemT OutType; typedef Task TaskT; - typedef std::vector ShapeVector; + typedef std::vector ShapeVector; typedef std::vector VectorOfShapeVector; int read_fd; @@ -65,7 +68,17 @@ struct Task { const InVectorT* inVectorT_ptr; OutVectorT* outVectorT_ptr; size_t rem; + size_t total_feed_batch; + std::set set_feed_lod_index; + std::set set_feed_nobatch_index; + std::vector vector_fetch_lod_index; + std::set set_fetch_nobatch_index; butil::atomic index; + size_t taskmeta_num; + THREAD_MUTEX_T task_mut; + bool fetch_init; + // taskmeta_num * set_feed_lod_index.size() + std::vector outLodTensorVector; Task() { read_fd = -1; @@ -73,11 +86,24 @@ struct Task { owner_tid = -1; inVectorT_ptr = NULL; outVectorT_ptr = NULL; + set_feed_lod_index.clear(); + set_feed_nobatch_index.clear(); + vector_fetch_lod_index.clear(); + set_fetch_nobatch_index.clear(); rem = -1; + total_feed_batch = 0; + taskmeta_num = 0; index.store(0, butil::memory_order_relaxed); + THREAD_MUTEX_INIT(&task_mut, NULL); + fetch_init = false; + outLodTensorVector.clear(); + } + ~Task() { + THREAD_MUTEX_DESTROY(&task_mut); + outLodTensorVector.clear(); } - bool check_feedvar_valid(int feedvar_index) { + bool check_feedvar_valid(size_t feedvar_index) { if (feedvar_index < 0 || inVectorT_ptr->size() <= feedvar_index) { LOG(ERROR) << "feedvar doesnt exsit or feedvar_index error"; return 0; @@ -91,20 +117,47 @@ struct Task { return 1; } - // Now, it simply assume that the first dimension of data is batch. - // so the batch is PaddleTensor.shape[0] + bool combine_task_valid(Task* other_task) { + // TODO(HexToString): auto-padding + // 除最外层的shape外,内层shape应一致才能合并。 + // 否则跳出循环,放入下一个batchTask中。 + // 以此保证batch.append_task(task)中的task的内层shape相同。 + if (other_task->feedvar_shape_nobatch() != feedvar_shape_nobatch()) { + return false; + } - // If batch information is added into feedvar.prototxt. - // we can get the information from the feedvar.prototxt instead of assume. - size_t feedvar_batch_size(int feedvar_index) { + // 对于Shape[0] = 1 而!=batch的情况,因为合并时,取其中一个的值 + // 所以要求该feedvar必须相等,才能合并。 + // 目前没有PaddleTensor和PaddleBuff没有重载==,所以只能比较内存. + for (size_t feedvar_index = 0; + feedvar_index < set_feed_nobatch_index.size(); + ++feedvar_index) { + int result = + std::memcmp((*inVectorT_ptr)[feedvar_index].data.data(), + (*(other_task->inVectorT_ptr))[feedvar_index].data.data(), + (*inVectorT_ptr)[feedvar_index].data.length()); + if (result != 0) return false; + } + return true; + } + + size_t feedvar_batch_size(size_t feedvar_index) { if (!check_feedvar_valid(feedvar_index)) { return 0; } - + // if lod, 'lod[0].size()-1' is batch. + // for PaddleTensor lod is vector>, so lod[0] is real lod. + // for example, lod = [0,3,4,6], shape = [6,340,340], batch is 3 actually. + // for lod, the batch < shape[0]. + if ((*inVectorT_ptr)[feedvar_index].lod.size() > 0 && + (*inVectorT_ptr)[feedvar_index].lod[0].size() > 0) { + return (*inVectorT_ptr)[feedvar_index].lod[0].size() - 1; + } + // if not lod, the first dimension of data `PaddleTensor.shape[0]` is batch. return (*inVectorT_ptr)[feedvar_index].shape[0]; } - size_t feedvar_element_bytesize(int feedvar_index) { + size_t feedvar_element_bytesize(size_t feedvar_index) { if (!check_feedvar_valid(feedvar_index)) { return 0; } @@ -126,7 +179,7 @@ struct Task { // Now, the implementation of this function is based on assumption // that shape [0] = batch_size. 
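// A minimal, standalone sketch (illustrative only, not the bsf API) of the
// batch-size rule used by feedvar_batch_size(): for a lod feedvar the batch
// is lod[0].size() - 1, otherwise it is shape[0]. With lod[0] = {0,3,4,6}
// and shape = {6,340,340} the batch is 3, not 6.
#include <cstdio>
#include <vector>

struct DemoTensor {
  std::vector<std::vector<size_t>> lod;  // lod[0] is the real lod, if any
  std::vector<int> shape;
};

static size_t demo_batch_size(const DemoTensor& t) {
  if (!t.lod.empty() && !t.lod[0].empty()) {
    return t.lod[0].size() - 1;  // lod case: number of lod segments
  }
  return static_cast<size_t>(t.shape[0]);  // non-lod case: shape[0] is batch
}

int main() {
  DemoTensor lod_var{{{0, 3, 4, 6}}, {6, 340, 340}};
  DemoTensor dense_var{{}, {4, 13}};
  std::printf("lod feedvar batch = %zu\n", demo_batch_size(lod_var));      // 3
  std::printf("dense feedvar batch = %zu\n", demo_batch_size(dense_var));  // 4
  return 0;
}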
- size_t feedvar_element_num(int feedvar_index) { + size_t feedvar_element_num(size_t feedvar_index) { if (!check_feedvar_valid(feedvar_index)) { return 0; } @@ -138,18 +191,18 @@ struct Task { return 1; } // start from shape[1], cause shape[0] = batch_size. - for (int i = 1; i < (*inVectorT_ptr)[feedvar_index].shape.size(); ++i) { + for (size_t i = 1; i < (*inVectorT_ptr)[feedvar_index].shape.size(); ++i) { element_num *= (*inVectorT_ptr)[feedvar_index].shape[i]; } return element_num; } - size_t feedvar_bytesize(int feedvar_index) { + size_t feedvar_bytesize(size_t feedvar_index) { return feedvar_element_num(feedvar_index) * feedvar_element_bytesize(feedvar_index); } - ShapeVector feedvar_shape_nobatch(int feedvar_index) { + ShapeVector feedvar_shape_nobatch(size_t feedvar_index) { if (!check_feedvar_valid(feedvar_index)) { return ShapeVector(); } @@ -158,40 +211,167 @@ struct Task { } VectorOfShapeVector feedvar_shape_nobatch() { - VectorOfShapeVector vector_of_feedvar_shape_nobatch(inVectorT_ptr->size()); - for (int index = 0; index < inVectorT_ptr->size(); ++index) { - vector_of_feedvar_shape_nobatch.push_back(feedvar_shape_nobatch(index)); + VectorOfShapeVector vector_of_feedvar_shape_nobatch; + for (size_t feedvar_index = 0; feedvar_index < inVectorT_ptr->size(); + ++feedvar_index) { + vector_of_feedvar_shape_nobatch.push_back( + feedvar_shape_nobatch(feedvar_index)); } return vector_of_feedvar_shape_nobatch; } - // At present, it is considered that the batch of all feedvar is consistent. - // so for each feedvar, PaddleTensor.shape[0] should be the same. - bool check_batch_align() { - int batch_size_align = feedvar_batch_size(0); - for (int feedvar_index = 0; feedvar_index < inVectorT_ptr->size(); + // For each feedvar, batch should be 1 or batch_size. + // if feedvar-1: batch_size = 1 (always not batch). + // feedvar-2: batch_size = n, batch = n. + // this function is not thread safe. only called when task is creating. + bool task_init() { + total_feed_batch = feedvar_batch_size(0); + // which means error. + if (total_feed_batch <= 0) return false; + + for (size_t feedvar_index = 0; feedvar_index < inVectorT_ptr->size(); ++feedvar_index) { - if (feedvar_batch_size(feedvar_index) != batch_size_align) { - return 0; + // TODO(HexToString): Distinguish between nobatch and batch = + // 1(By:HexToString) + // 当数据中feedvar-1: 带batch,且batch =1,shape[0] = 1 + // feedvar-2:不带batch,由于不带batch导致shape[0] =1 + // 此时,无法分辨是否是天然nobatch,此时set_feed_nobatch_index会漏掉 + // 后续希望在其他地方能够区分两者。 + if (feedvar_batch_size(feedvar_index) != total_feed_batch) { + // which means error. + if (feedvar_batch_size(feedvar_index) != 1 && total_feed_batch != 1) { + return false; + } else { + // which means feedvar shape[0] = 1. 
+ // shape[0] does not change with batch + set_feed_nobatch_index.insert(feedvar_index); + total_feed_batch = + std::max(feedvar_batch_size(feedvar_index), total_feed_batch); + } + } + // 将lod feedvar index加入到vector中。 + if ((*inVectorT_ptr)[feedvar_index].lod.size() > 0 && + (*inVectorT_ptr)[feedvar_index].lod[0].size() > 0) { + set_feed_lod_index.insert(feedvar_index); } } - /* - for(int fetchvar_index = 0; fetchvar_index < outVectorT_ptr->size(); - ++fetchvar_index) { - if(fetchvar_batch_size(fetchvar_index) != batch_size_align) { - return 0; + return true; + } + + size_t batch_size() { return total_feed_batch; } + + // start_batch range is 0~batch_size, end_batch range is 1~batch_size + // start_batch should not be included, end_batch > start_batch + // return is (start_batch, end_batch] = [start_batch+1,end_batch] + // for not lod, shape0_index = [(start_batch+1)-1,end_batch-1] = + // [start_batch,end_batch-1] = [start_batch,end_batch) + // for lod, shape0_index = [lod[start_batch],lod[end_batch]-1] = + // [lod[start_batch],lod[end_batch]) + // for nobatch, shape0_index = [0,1) + // 对于调用者,拿到shape0_index后,for(size_t myindex =shape0_index[0]; + // myindex > get_feature_by_batch(size_t feedvar_index, + size_t start_batch, + size_t end_batch) { + std::vector> feature_vector; + // feature_vector是双层vector,这么设计是由于一个遍历即可处理所有的特征。 + // feature_vector[0]是由shape0_index的范围值组成的vector,包含两个元素最小和最大值。 + // feature_vector[1]是由lod组成的vector,包含指定batch的lod信息. + // feature_vector[2]是由单个元素的组成的vector,元素值为1表示是nobatch的feedvar。 + + // if 为 nobatch feedvar情况。 + // else if 为带lod的feedvar情况。 + // else为不带lod 普通feedvar情况。 + if (set_feed_nobatch_index.size() > 0 && + set_feed_nobatch_index.find(feedvar_index) != + set_feed_nobatch_index.end()) { + feature_vector = {{0, 1}, {}, {1}}; + } else if (set_feed_lod_index.size() > 0 && + set_feed_lod_index.find(feedvar_index) != + set_feed_lod_index.end()) { + std::vector feed_lod_vector(end_batch - start_batch); + for (size_t lod_index = start_batch + 1, vector_index = 0; + lod_index < end_batch + 1; + ++lod_index, ++vector_index) { + feed_lod_vector[vector_index] = + (*inVectorT_ptr)[feedvar_index].lod[0][lod_index] - + (*inVectorT_ptr)[feedvar_index].lod[0][start_batch]; } + size_t shape0_start = (*inVectorT_ptr)[feedvar_index].lod[0][start_batch]; + size_t shape0_end = (*inVectorT_ptr)[feedvar_index].lod[0][end_batch]; + feature_vector = {{shape0_start, shape0_end}, feed_lod_vector}; + // feature_vector.push_back(feed_lod_vector); + } else { + feature_vector = {{start_batch, end_batch}}; } - */ - return 1; + return feature_vector; } - size_t batch_size() { - if (check_batch_align()) { - return feedvar_batch_size(0); + bool combine_taskmeta() { + // 只有含有lod类型的fetch输出,且task被拆分为多个taskmeta的情况 + // 才需要将数据从outLodTensorVector搬运到outVectorT_ptr + if (vector_fetch_lod_index.size() > 0 && taskmeta_num > 1) { + for (size_t index = 0; index < vector_fetch_lod_index.size(); ++index) { + size_t data_length = 0; + size_t lod_length = 0; + size_t total_shape0 = 0; + size_t feedvar_index = vector_fetch_lod_index[index]; + // 由于PaddleTensor的resize实现,是每次都会清空,所以必须先统计总长度。 + for (size_t taskmeta_index = 0; taskmeta_index < taskmeta_num; + ++taskmeta_num) { + data_length += + outLodTensorVector[taskmeta_index][index].data.length(); + lod_length += outLodTensorVector[taskmeta_index][index].lod[0].size(); + total_shape0 += outLodTensorVector[taskmeta_index][index].shape[0]; + } + // 一次性扩容PaddleTensor中的data和lod + paddle::PaddleTensor& fetchVarTensor = (*outVectorT_ptr)[feedvar_index]; + 
fetchVarTensor.data.Resize(data_length); + // task中的lod补0 + if (fetchVarTensor.lod.size() <= 0) { + fetchVarTensor.lod.push_back({0}); + } else if (fetchVarTensor.lod[0].size() <= 0) { + fetchVarTensor.lod[0].push_back(0); + } + fetchVarTensor.lod[0].resize(lod_length + 1, 0); + + // + size_t data_length_offset = 0; + size_t lod_length_offset = 0; + size_t once_data_length = 0; + size_t once_lod_length = 0; + size_t last_lod_value = fetchVarTensor.lod[0][lod_length_offset]; + for (size_t taskmeta_index = 0; taskmeta_index < taskmeta_num; + ++taskmeta_num) { + void* dst_ptr = fetchVarTensor.data.data() + data_length_offset; + void* source_ptr = + outLodTensorVector[taskmeta_index][index].data.data(); + once_data_length = + outLodTensorVector[taskmeta_index][index].data.length(); + memcpy(dst_ptr, source_ptr, once_data_length); + once_lod_length = + outLodTensorVector[taskmeta_index][index].lod[0].size(); + for (size_t once_index = 0; once_index < once_lod_length; + ++once_index) { + fetchVarTensor.lod[0][lod_length_offset + 1] = + last_lod_value + + outLodTensorVector[taskmeta_index][index].lod[0][once_index]; + } + data_length_offset += once_data_length; + lod_length_offset += once_lod_length; + } + } } - return 0; + return true; } + + bool task_fetch_init(BatchTasks& batchTask); + bool task_fetch_create(BatchTasks& batchTask); }; // `Several Task` or `part of batch in Task` can be a TaskMeta. @@ -206,61 +386,164 @@ struct Task { // TaskMeta is necessary. // cause we need know the the corresponding relationship between -// `batch_out`(which is in BatchTasks) and `outVectorT_ptr`(which is in Task). +// `_batch_out`(which is in BatchTasks) and `outVectorT_ptr`(which is in Task). // especially when 1 Task be divided into several TaskMeta and be put into // several different BatchTasks. + +// begin、add、end means batch, not shape[0]. +// if not lod, batch == shape[0]. if lod, batch != shape[0] +// for example, lod = [0,3,4,6], shape = [6,340,340] +// there is 3 batch actually, add = 3, but shape[0] = 6. template struct TaskMeta { - TaskMeta(TaskT* ptr, size_t start, size_t add) - : task(ptr), begin(start), end(start + add) {} + TaskMeta(TaskT* ptr, size_t start, size_t add, size_t taskmeta_index) + : task(ptr), + begin(start), + end(start + add), + taskmeta_index(taskmeta_index) { + feedvar_num = ptr->inVectorT_ptr->size(); + for (size_t feedvar_index = 0; feedvar_index < feedvar_num; + ++feedvar_index) { + std::vector> feature = + ptr->get_feature_by_batch(feedvar_index, start, start + add); + feed_shape0_range.push_back(feature[0]); + feedvar_type.push_back(feature.size()); + if (feature.size() == 1) { + feed_lod_vector.push_back({}); + } else if (feature.size() == 2) { + feed_lod_vector.push_back(feature[1]); + } else { + feed_lod_vector.push_back({}); + } + } + } TaskT* task; size_t begin; size_t end; + size_t feedvar_num; + size_t taskmeta_index; + std::vector> feed_shape0_range; + std::vector> feed_lod_vector; + std::vector feedvar_type; }; // each TaskT is already include batch in itself // BatchTasks need to combine several `small TaskMeta` into a new `big TaskT`. // The only difference between the `big TaskT` and `small TaskT` is that -// the TaskT.inVectorT_ptr->[feedvar_index].shape[0] -// which is actually batch_size is different. +// the TaskT.inVectorT_ptr->[feedvar_index].shape[0] is different +// `big TaskT`.inVectorT_ptr->[feedvar_index].shape[0] is actually batch_size . 
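// A minimal, standalone sketch (illustrative only, not the bsf API) of how a
// TaskMeta's [begin, end) batch range maps onto a lod feedvar, which is what
// get_feature_by_batch() computes: the shape[0] range is
// [lod[begin], lod[end]) and the per-slice lod is rebased to start from
// lod[begin].
#include <cstdio>
#include <vector>

// Returns {shape0_start, shape0_end, rebased lod entries...} for the batch
// range [begin, end) of one lod feedvar.
static std::vector<size_t> demo_slice_lod(const std::vector<size_t>& lod,
                                          size_t begin, size_t end) {
  std::vector<size_t> result{lod[begin], lod[end]};
  for (size_t b = begin + 1; b <= end; ++b) {
    result.push_back(lod[b] - lod[begin]);  // rebased lod entry
  }
  return result;
}

int main() {
  // lod = {0,3,4,6}: 3 batches covering rows [0,3), [3,4) and [4,6)
  std::vector<size_t> lod = {0, 3, 4, 6};
  // take batches [1, 3): rows [3, 6), rebased lod = {1, 3}
  std::vector<size_t> r = demo_slice_lod(lod, 1, 3);
  std::printf("shape0 range = [%zu, %zu), rebased lod = {%zu, %zu}\n",
              r[0], r[1], r[2], r[3]);
  return 0;
}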
template class BatchTasks { public: typedef typename TaskT::InType InType; typedef typename TaskT::OutType OutType; typedef TaskMeta TaskMetaT; + typedef std::vector ShapeVector; + typedef std::vector VectorOfShapeVector; + typedef std::vector LodVector; + typedef std::vector PaddleTensorLod; + friend TaskT; - explicit BatchTasks(size_t batch_size, bool batch_align = true) + explicit BatchTasks(size_t batch_size, + bool overrun = false, + bool allow_split_request = true) : _batch_size(batch_size), _rem_size(batch_size), - _batch_align(batch_align) { + _overrun(overrun), + _allow_split_request(allow_split_request) { _batch_in.clear(); _batch_in_offset.clear(); + _total_shape0_batch_in.clear(); + _total_feed_batch = 0; + _batch_in_lod.clear(); + _batch_out.clear(); _batch_out_offset.clear(); + _total_fetch_batch = 0; _taskmeta_vector.clear(); + set_fetch_nobatch_index.clear(); + vector_fetch_lod_index.clear(); } ~BatchTasks() { _batch_in.clear(); _batch_in_offset.clear(); + _total_shape0_batch_in.clear(); + _total_feed_batch = 0; + _batch_in_lod.clear(); + _batch_out.clear(); _batch_out_offset.clear(); + _total_fetch_batch = 0; _taskmeta_vector.clear(); + set_fetch_nobatch_index.clear(); + vector_fetch_lod_index.clear(); } // synchronized operation // because Upper level callers of this function have already locked. + // 能进到此函数的task都是同类task,在该函数之前已保证了这点。 size_t append_task(TaskT* task) { size_t add = std::min(task->rem, _rem_size); - if (!_batch_align) { + // when _overrun == true, it means always take a whole task as TaskMeta + // we can temporary breakthrough the limit of BatchTask`s capacity + // BatchTask`s capacity is _batch_size or _rem_size + if (_overrun) { add = task->rem; } int start_index = task->batch_size() - task->rem; - TaskMetaT tm(task, start_index, add); + TaskMetaT tm(task, start_index, add, task->taskmeta_num); + task->taskmeta_num += 1; _taskmeta_vector.push_back(tm); - + if (_batch_in_offset.size() == 0) { + _batch_in_offset.resize(tm.feedvar_num, 0); + } + if (_total_shape0_batch_in.size() == 0) { + _total_shape0_batch_in.resize(tm.feedvar_num, 0); + } + if (_batch_in_lod.size() == 0) { + PaddleTensorLod null_lod; + _batch_in_lod.resize(tm.feedvar_num, null_lod); + } + _total_feed_batch += add; + for (size_t feedvar_index = 0; feedvar_index < tm.feedvar_num; + ++feedvar_index) { + if (tm.feedvar_type[feedvar_index] == 1) { + // 普通的非lod feedvar + // 累计计算shape0的累加值,为后面初始化PaddleTensor做准备。 + _total_shape0_batch_in[feedvar_index] += + tm.feed_shape0_range[feedvar_index][1] - + tm.feed_shape0_range[feedvar_index][0]; + } else if (tm.feedvar_type[feedvar_index] == 2) { + // lod类型的feedvar + // 累计计算shape0的累加值,为后面初始化PaddleTensor做准备。 + _total_shape0_batch_in[feedvar_index] += + tm.feed_shape0_range[feedvar_index][1] - + tm.feed_shape0_range[feedvar_index][0]; + // 在Lod最前面加0 + if (_batch_in_lod[feedvar_index].size() <= 0) { + _batch_in_lod[feedvar_index].push_back({0}); + } else if (_batch_in_lod[feedvar_index][0].size() <= 0) { + _batch_in_lod[feedvar_index][0].push_back(0); + } + // 将lod加上前一组lod的结尾最大值,组合Lod + size_t last_lod_value = _batch_in_lod[feedvar_index][0].back(); + for (size_t lod_index = 0; + lod_index < tm.feed_lod_vector[feedvar_index].size(); + ++lod_index) { + _batch_in_lod[feedvar_index][0].push_back( + last_lod_value + tm.feed_lod_vector[feedvar_index][lod_index]); + } + } else { + // tm.feedvar_type[feedvar_index] == 3 + // nobatch类型的feedvar. 
+ // 此时不累加,且值应为1 + _total_shape0_batch_in[feedvar_index] = + tm.feed_shape0_range[feedvar_index][1] - + tm.feed_shape0_range[feedvar_index][0]; + } + } task->rem -= add; _rem_size -= add; return _rem_size; @@ -281,72 +564,56 @@ class BatchTasks { // cause maybe next time we don`t need to do the extra copy. // directly copy the every Task into the Predictor. - // lod is not supported. - // if lod is set, we should not allow to use the bsf task. - // batch.merge_tasks() is thread-safe function // cause batch is a local variable and Task is just read, not written. + void merge_tasks() { if (_taskmeta_vector.size() <= 0) { return; } - // Temporarily, the batch of each feedvar is consistent - // If not consistent, use feedvar_batch_size instead of task->batch_size(). - int temp_batch = 0; - for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) { - TaskMetaT& tm = _taskmeta_vector[ti]; - temp_batch += tm.task->batch_size(); - } - if (temp_batch > _batch_size) { - LOG(ERROR) << "_realNumber_batch_in >_batch_size, error."; - return; - } - - int feedvar_num = _taskmeta_vector[0].task->inVectorT_ptr->size(); - if (_batch_in_offset.size() == 0) { - _batch_in_offset.resize(feedvar_num, 0); - _realNumber_batch_in.resize(feedvar_num, temp_batch); - } - for (size_t ti = 0; ti < _taskmeta_vector.size(); ++ti) { TaskMetaT& tm = _taskmeta_vector[ti]; - for (int index = 0; index < feedvar_num; ++index) { + for (size_t feedvar_index = 0; feedvar_index < tm.feedvar_num; + ++feedvar_index) { const paddle::PaddleTensor& feedVarTensor = - (*tm.task->inVectorT_ptr)[index]; - size_t feedvar_bytesize = tm.task->feedvar_bytesize(index); + (*tm.task->inVectorT_ptr)[feedvar_index]; + size_t feedvar_bytesize = tm.task->feedvar_bytesize(feedvar_index); if (ti == 0) { - if (feedVarTensor.lod.size() > 0 && feedVarTensor.lod[0].size() > 0) { - LOG(ERROR) << "lod Tensor is not supported now."; - return; - } + // Create the entire tensor at once // for now, we assume that every task feedvar_bytesize is the same. // which means we dont support auto embedding. // but for different feedvar, it is different. paddle::PaddleTensor paddleTensor; paddleTensor.dtype = feedVarTensor.dtype; paddleTensor.name = feedVarTensor.name; - paddleTensor.lod = feedVarTensor.lod; + paddleTensor.lod = _batch_in_lod[feedvar_index]; paddleTensor.shape = feedVarTensor.shape; - paddleTensor.shape[0] = _realNumber_batch_in[index]; + paddleTensor.shape[0] = _total_shape0_batch_in[feedvar_index]; paddleTensor.data.Resize(feedvar_bytesize * - _realNumber_batch_in[index]); + _total_shape0_batch_in[feedvar_index]); _batch_in.push_back(paddleTensor); } - void* dst_ptr = _batch_in[index].data.data() + _batch_in_offset[index]; + void* dst_ptr = _batch_in[feedvar_index].data.data() + + _batch_in_offset[feedvar_index]; void* source_ptr = - feedVarTensor.data.data() + feedvar_bytesize * tm.begin; - size_t length = feedvar_bytesize * (tm.end - tm.begin); + feedVarTensor.data.data() + + feedvar_bytesize * tm.feed_shape0_range[feedvar_index][0]; + size_t length = + feedvar_bytesize * (tm.feed_shape0_range[feedvar_index][1] - + tm.feed_shape0_range[feedvar_index][0]); memcpy(dst_ptr, source_ptr, length); - _batch_in_offset[index] += length; + // nobatch类型的feedvar,不叠加. 
+ if (tm.feedvar_type[feedvar_index] != 3) + _batch_in_offset[feedvar_index] += length; } } } - bool check_fetchvar_valid(int fetchvar_index) { + bool check_fetchvar_valid(size_t fetchvar_index) { if (fetchvar_index < 0 || _batch_out.size() <= fetchvar_index) { LOG(ERROR) << "fetchvar doesnt exsit or fetchvar_index error"; return 0; @@ -360,19 +627,11 @@ class BatchTasks { return 1; } - size_t fetchvar_batch_size(int fetchvar_index) { - if (!check_fetchvar_valid(fetchvar_index)) { - return 0; - } - - return _batch_out[fetchvar_index].shape[0]; - } - - size_t fetchvar_element_bytesize(int fetchvar_index) { + size_t fetchvar_element_bytesize(size_t fetchvar_index) { if (!check_fetchvar_valid(fetchvar_index)) { return 0; } - int dtype = _batch_out[fetchvar_index].dtype; + size_t dtype = _batch_out[fetchvar_index].dtype; if (dtype == paddle::PaddleDType::INT64) { return sizeof(int64_t); } @@ -390,7 +649,7 @@ class BatchTasks { // Now, the implementation of this function is based on assumption // that shape [0] = batch_size. - size_t fetchvar_element_num(int fetchvar_index) { + size_t fetchvar_element_num(size_t fetchvar_index) { if (!check_fetchvar_valid(fetchvar_index)) { return 0; } @@ -400,35 +659,66 @@ class BatchTasks { return 1; } // start from shape[1], cause shape[0] = batch_size. - for (int i = 1; i < _batch_out[fetchvar_index].shape.size(); ++i) { + for (size_t i = 1; i < _batch_out[fetchvar_index].shape.size(); ++i) { element_num *= _batch_out[fetchvar_index].shape[i]; } return element_num; } - size_t fetchvar_bytesize(int fetchvar_index) { + size_t fetchvar_bytesize(size_t fetchvar_index) { return fetchvar_element_num(fetchvar_index) * fetchvar_element_bytesize(fetchvar_index); } - bool check_fetchvar_batch_align() { - int batch_size_align = fetchvar_batch_size(0); - - for (int fetchvar_index = 0; fetchvar_index < _batch_out.size(); - ++fetchvar_index) { - if (fetchvar_batch_size(fetchvar_index) != batch_size_align) { - return 0; - } + size_t fetchvar_batch_size(size_t fetchvar_index) { + if (!check_fetchvar_valid(fetchvar_index)) { + return 0; } - - return 1; + // if lod, 'lod[0].size()-1' is batch. + // for PaddleTensor lod is vector>, so lod[0] is real lod. + // for example, lod = [0,3,4,6], shape = [6,340,340], batch is 3 actually. + // for lod, the batch < shape[0]. + if (_batch_out[fetchvar_index].lod.size() > 0 && + _batch_out[fetchvar_index].lod[0].size() > 0) { + return _batch_out[fetchvar_index].lod[0].size() - 1; + } + // if not lod, the first dimension of data `PaddleTensor.shape[0]` is batch. + return _batch_out[fetchvar_index].shape[0]; } - size_t fetchvar_batch_size() { - if (check_fetchvar_batch_align()) { - return fetchvar_batch_size(0); + size_t fetchvar_batch_size() { return _total_fetch_batch; } + + bool deal_batch_out() { + _total_fetch_batch = fetchvar_batch_size(0); + if (_total_fetch_batch <= 0) return false; + for (size_t fetchvar_index = 0; fetchvar_index < _batch_out.size(); + ++fetchvar_index) { + // TODO(HexToString): Distinguish between nobatch and batch = + // 1(By:HexToString) + // 当数据中fetchvar-1: 带batch,且batch =1,shape[0] = 1 + // fetchvar-2:不带batch,由于不带batch导致shape[0] =1 + // 此时,无法分辨是否是天然nobatch,此时set_fetch_nobatch_index会漏掉 + // 后续希望在其他地方能够区分两者。 + if (fetchvar_batch_size(fetchvar_index) != _total_fetch_batch) { + // which means error. + if (fetchvar_batch_size(fetchvar_index) != 1 && + _total_fetch_batch != 1) { + return false; + } else { + // which means fetchvar shape[0] = 1. 
+ // shape[0] does not change with batch + set_fetch_nobatch_index.insert(fetchvar_index); + _total_fetch_batch = + std::max(fetchvar_batch_size(fetchvar_index), _total_fetch_batch); + } + } + // 将lod fetchvar index加入到vector中。 + if (_batch_out[fetchvar_index].lod.size() > 0 && + _batch_out[fetchvar_index].lod[0].size() > 0) { + vector_fetch_lod_index.push_back(fetchvar_index); + } } - return 0; + return true; } void notify_tasks() { @@ -436,12 +726,16 @@ class BatchTasks { LOG(ERROR) << "_taskmeta_vector.size() <=0, error."; return; } - if (_realNumber_batch_in[0] != fetchvar_batch_size()) { + // 根据_batch_out,求出输出的整体batch + // 并将lod类型和nobatch类型的fetchvar的index记录到set中,方便后续查看。 + deal_batch_out(); + // 若输出的batch不是1,且不与输入batch对应,则错误 + if (_total_feed_batch != _total_fetch_batch && _total_fetch_batch != 1) { LOG(ERROR) << "_batch_out`s batch != _batch_in`s batch, error."; return; } - int fetchvar_num = _batch_out.size(); + size_t fetchvar_num = _batch_out.size(); if (_batch_out_offset.size() == 0) { _batch_out_offset.resize(fetchvar_num, 0); } @@ -451,44 +745,132 @@ class BatchTasks { size_t begin = _taskmeta_vector[ti].begin; size_t end = _taskmeta_vector[ti].end; size_t add = end - begin; - - for (int index = 0; index < fetchvar_num; ++index) { - // the task->outVectorT_ptr is null before core->run(). - // first time we should copy from _batch_out - // so we need init. - size_t fetchvar_bytesize_index = fetchvar_bytesize(index); - if (task->outVectorT_ptr->size() <= index) { - paddle::PaddleTensor tensor_out; - tensor_out.name = _batch_out[index].name; - tensor_out.dtype = paddle::PaddleDType(_batch_out[index].dtype); - tensor_out.shape = _batch_out[index].shape; - tensor_out.shape[0] = task->batch_size(); - tensor_out.lod = _batch_out[index].lod; - // resize all batch memory at one time - size_t databuf_size = task->batch_size() * fetchvar_bytesize_index; - tensor_out.data.Resize(databuf_size); - task->outVectorT_ptr->push_back(tensor_out); - } - - paddle::PaddleTensor& fetchVarTensor = (*task->outVectorT_ptr)[index]; - - void* dst_ptr = - fetchVarTensor.data.data() + fetchvar_bytesize_index * begin; - size_t length = fetchvar_bytesize_index * add; - if (_batch_out_offset[index] + length > - fetchvar_batch_size() * fetchvar_bytesize(index)) { - LOG(ERROR) << "_batch_out is less than taskmeta, error."; - return; + size_t taskmeta_index = _taskmeta_vector[ti].taskmeta_index; + // 对task中的outVectorT_ptr进行初始化 + // 如果是lod输出+多个taskmeta,此时对outLodTensorVector也需要初始化 + if (!task->task_fetch_init(*this)) { + LOG(ERROR) << " task_fetch_init error."; + return; + } + size_t fetch_lod_index = 0; + + for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num; + ++fetchvar_index) { + size_t fetchvar_bytesize_index = fetchvar_bytesize(fetchvar_index); + + if (set_fetch_nobatch_index.size() > 0 && + set_fetch_nobatch_index.find(fetchvar_index) != + set_fetch_nobatch_index.end()) { + // nobatch fetchvar情况 + // 无论输入是多少batch,该index的fetchvar始终就shape[0] = 1 + paddle::PaddleTensor& fetchVarTensor = + (*task->outVectorT_ptr)[fetchvar_index]; + void* dst_ptr = fetchVarTensor.data.data(); + size_t length = fetchvar_bytesize_index * 1; + void* source_ptr = _batch_out[fetchvar_index].data.data(); + memcpy(dst_ptr, source_ptr, length); + } else if (vector_fetch_lod_index.size() > 0 && + std::find(vector_fetch_lod_index.begin(), + vector_fetch_lod_index.end(), + fetchvar_index) != vector_fetch_lod_index.end()) { + // lod fetchvar情况,此时无法确定总的shape[0] + // 根据task中的task_num总数开辟task_num个临时空间 + // 每个lod型的fetchvar拷贝到对应的临时空间中 + 
// 最后再计算临时空间的总量,合并fetchvar和lod + size_t last_batch = _batch_out_offset[fetchvar_index]; + size_t shape0_index_start = + _batch_out[fetchvar_index].lod[0][last_batch]; + size_t shape0_index_end = + _batch_out[fetchvar_index].lod[0][last_batch + add]; + size_t shape0_length = shape0_index_end - shape0_index_start; + // task被拆分为多个taskmeta时,不能直接拷入task->outVectorT_ptr + // 此时,先拷入task->outLodTensorVector[taskmeta_index] + // 当task所有的taskmeta都完成时,再按照顺序进行拷贝回task->outVectorT_ptr。 + if (task->taskmeta_num > 1) { + paddle::PaddleTensor& fetchVarTensor = + task->outLodTensorVector[taskmeta_index][fetch_lod_index]; + size_t length = fetchvar_bytesize_index * shape0_length; + fetchVarTensor.shape[0] = shape0_length; + fetchVarTensor.data.Resize(length); + void* dst_ptr = fetchVarTensor.data.data(); + void* source_ptr = _batch_out[fetchvar_index].data.data() + + shape0_index_start * fetchvar_bytesize_index; + memcpy(dst_ptr, source_ptr, length); + // 由于是拆分的各个lod,不要补0,在最后合并给Task中的outVectorT_ptr时再补。 + if (fetchVarTensor.lod.size() <= 0) { + fetchVarTensor.lod.push_back({}); + } + fetchVarTensor.lod[0].resize(add, 0); + size_t last_lod_value = + _batch_out[fetchvar_index].lod[0][last_batch]; + for (size_t lod_index = last_batch + 1, my_index = 0; + lod_index < last_batch + add + 1; + ++lod_index, ++my_index) { + fetchVarTensor.lod[0][my_index] = + (_batch_out[fetchvar_index].lod[0][lod_index] - + last_lod_value); + } + } else { + // task未被拆分为多个taskmeta,故只有某个线程中的taskmeta会操作task不存在多线程竞争 + // 此时resize后,直接写入task->outVectorT_ptr中即可。 + paddle::PaddleTensor& fetchVarTensor = + (*task->outVectorT_ptr)[fetchvar_index]; + size_t length = fetchvar_bytesize_index * shape0_length; + fetchVarTensor.shape[0] = shape0_length; + fetchVarTensor.data.Resize(length); + void* dst_ptr = fetchVarTensor.data.data(); + void* source_ptr = _batch_out[fetchvar_index].data.data() + + shape0_index_start * fetchvar_bytesize_index; + memcpy(dst_ptr, source_ptr, length); + + // task中的lod补0 + if (fetchVarTensor.lod.size() <= 0) { + fetchVarTensor.lod.push_back({0}); + } else if (fetchVarTensor.lod[0].size() <= 0) { + fetchVarTensor.lod[0].push_back(0); + } + // 将合并的lod信息对应的batch,拆分到task中。 + // 注意,此时需要去掉前面lod导致的前置积累。 + // 例如: 合lod = [0,2,5;7,10],是由两组batch=2的task合并后预测的。 + // 此时拆分,第一组时,都减去0,得到[2,5]+(由于前面已经补了0了) = + // [0,2,5] + // 第二组,都需要减5,得到[2,5],这样处理才对。 + fetchVarTensor.lod[0].resize(add + 1, 0); + size_t last_lod_value = + _batch_out[fetchvar_index].lod[0][last_batch]; + for (size_t lod_index = last_batch + 1, my_index = 1; + lod_index < last_batch + add + 1; + ++lod_index, ++my_index) { + fetchVarTensor.lod[0][my_index] = + (_batch_out[fetchvar_index].lod[0][lod_index] - + last_lod_value); + } + } + fetch_lod_index++; + } else { + // 普通fetchvar情况,此时该Task总的fetchvar_batch = + // 输入的总的batch_size() + // 输出的batch应与输入的batch对应相等。 + paddle::PaddleTensor& fetchVarTensor = + (*task->outVectorT_ptr)[fetchvar_index]; + void* dst_ptr = + fetchVarTensor.data.data() + fetchvar_bytesize_index * begin; + size_t length = fetchvar_bytesize_index * add; + void* source_ptr = + _batch_out[fetchvar_index].data.data() + + _batch_out_offset[fetchvar_index] * fetchvar_bytesize_index; + + memcpy(dst_ptr, source_ptr, length); } - void* source_ptr = - _batch_out[index].data.data() + _batch_out_offset[index]; - - memcpy(dst_ptr, source_ptr, length); - _batch_out_offset[index] += length; + _batch_out_offset[fetchvar_index] += add; } + // index是局部变量,fetch_add是原子操作,成功则返回原值。 + // 只有最后一个taskmeta都完成后,该线程的index+add才能>task->batch_size() + // 故只有一个线程能进入if{}内.不会造成多线程竞争的问题。 
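// The last thread to finish also calls task->combine_taskmeta() below to
// stitch the per-taskmeta lod pieces back into outVectorT_ptr, then writes
// one byte to task->write_fd to wake the blocked caller.
// Worked example of the lod rebasing above (illustrative numbers): if the
// merged _batch_out lod is [0,2,5,7,10] and it came from two tasks of batch
// 2 each, the first task subtracts last_lod_value = lod[0] = 0 and gets
// [0,2,5], while the second subtracts last_lod_value = lod[2] = 5 and also
// gets [0,2,5]; each task ends up with a lod that starts at 0 and covers
// only its own rows of the merged output.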
size_t index = task->index.fetch_add(add); if ((index + add) >= task->batch_size()) { + task->combine_taskmeta(); char c = 0; while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) { } @@ -503,17 +885,32 @@ class BatchTasks { size_t task_size() { return _taskmeta_vector.size(); } + const size_t get_rem_size() { return _rem_size; } + + bool get_overrun() { return _overrun; } + + bool get_allow_split_request() { return _allow_split_request; } + private: std::vector _taskmeta_vector; typename TaskT::InVectorT _batch_in; std::vector _batch_in_offset; - std::vector _realNumber_batch_in; + std::vector _total_shape0_batch_in; + size_t _total_feed_batch; + std::vector _batch_in_lod; + typename TaskT::OutVectorT _batch_out; std::vector _batch_out_offset; - std::vector _realNumber_batch_out; + // std::vector _total_shape0_batch_out; + size_t _total_fetch_batch; + // std::vector _batch_out_lod; + std::set set_fetch_nobatch_index; + std::vector vector_fetch_lod_index; + size_t _rem_size; size_t _batch_size; - bool _batch_align; + bool _overrun; + bool _allow_split_request; }; // BSF task handle @@ -589,6 +986,8 @@ class TaskExecutor { typedef typename TaskT::OutVectorT OutVectorT; typedef std::vector TaskArrayT; typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper; + typedef std::vector ShapeVector; + typedef std::vector VectorOfShapeVector; TaskExecutor() : _stop(false), @@ -596,7 +995,7 @@ class TaskExecutor { _thread_reset_fn(NULL), _user_thread_contexts(NULL), _batch_size(DEFAULT_BATCH_SIZE), - _batch_align(false), + _overrun(false), _fn(NULL) { THREAD_MUTEX_INIT(&_mut, NULL); THREAD_COND_INIT(&_cond, NULL); @@ -617,7 +1016,11 @@ class TaskExecutor { void set_batch_size(size_t batch_size) { _batch_size = batch_size; } - void set_batch_align(size_t batch_align) { _batch_align = batch_align; } + void set_overrun(bool overrun) { _overrun = overrun; } + + void set_allow_split_request(bool allow_split_request) { + _allow_split_request = allow_split_request; + } void set_thread_init_fn(boost::function init_fn, void** contexts = NULL) { @@ -642,7 +1045,7 @@ class TaskExecutor { TaskHandler schedule(const void*, void*); - bool move_task_to_batch(BatchTasks& batch); // NOLINT + bool move_task_to_batch(BatchTasks& batchTask); // NOLINT private: TaskExecutor(TaskExecutor const& other) = delete; @@ -669,7 +1072,8 @@ class TaskExecutor { std::vector*> _thread_contexts; size_t _batch_size; - bool _batch_align; + bool _overrun; + bool _allow_split_request; boost::function _fn; }; @@ -687,12 +1091,12 @@ class TaskExecutorVector { void resize(int size) { _vector_executor.resize(size); } - TaskExecutor& operator[](int index) { - if (_vector_executor.size() <= index || index <= -1) { - LOG(ERROR) << "_vector_executor.size() <= index or <= -1"; - throw "_vector_executor.size() <= index or <= -1"; + TaskExecutor& operator[](int task_index) { + if (_vector_executor.size() <= task_index || task_index <= -1) { + LOG(ERROR) << "_vector_executor.size() <= task_index or <= -1"; + throw "_vector_executor.size() <= task_index or <= -1"; } - return _vector_executor[index]; + return _vector_executor[task_index]; } private: @@ -717,8 +1121,8 @@ class TaskManager { typedef typename TaskT::InVectorT InVectorT; typedef typename TaskT::OutVectorT OutVectorT; - explicit TaskManager(uint32_t index) // NOLINT - : _model_index(index) {} + explicit TaskManager(uint32_t model_index) // NOLINT + : _model_index(model_index) {} ~TaskManager() { wait(); } diff --git a/core/predictor/framework/infer.cpp 
b/core/predictor/framework/infer.cpp index aaba06195a825b20d2d8dbc84f81e77d604812cc..e0c284df5b9427e8e60bc1cfa19941f20cf2be9f 100644 --- a/core/predictor/framework/infer.cpp +++ b/core/predictor/framework/infer.cpp @@ -25,7 +25,8 @@ int ReloadableInferEngine::proc_initialize_impl( _model_dir = conf.model_dir(); _infer_thread_num = conf.runtime_thread_num(); _infer_batch_size = conf.batch_infer_size(); - _infer_batch_align = conf.enable_batch_align(); + _infer_overrun = conf.enable_overrun(); + _allow_split_request = conf.allow_split_request(); _conf = conf; @@ -56,9 +57,6 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf, } // init bsf framework - im::bsf::TaskExecutorVector::instance()[_model_index] - .set_thread_init_fn( - boost::bind(&InferEngine::thrd_initialize_impl, this)); im::bsf::TaskExecutorVector::instance()[_model_index] .set_thread_init_fn( boost::bind(&InferEngine::thrd_initialize_impl, this)); @@ -69,8 +67,10 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf, boost::bind(&InferEngine::task_infer_impl, this, _1, _2)); im::bsf::TaskExecutorVector::instance()[_model_index].set_batch_size( _infer_batch_size); - im::bsf::TaskExecutorVector::instance()[_model_index].set_batch_align( - _infer_batch_align); + im::bsf::TaskExecutorVector::instance()[_model_index].set_overrun( + _infer_overrun); + im::bsf::TaskExecutorVector::instance()[_model_index] + .set_allow_split_request(_allow_split_request); if (im::bsf::TaskExecutorVector::instance()[_model_index].start( _infer_thread_num) != 0) { LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num; @@ -79,7 +79,8 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf, LOG(WARNING) << "Enable batch schedule framework, thread_num:" << _infer_thread_num << ", batch_size:" << _infer_batch_size - << ", enable_batch_align:" << _infer_batch_align; + << ", enable_overrun:" << _infer_overrun + << ", allow_split_request:" << _allow_split_request; return 0; } @@ -391,6 +392,11 @@ int VersionedInferEngine::task_infer_impl(const void* in, return -1; } +int InferManager::set_taskexecutor_num(size_t total_engine_num) { + im::bsf::TaskExecutorVector::instance().resize(total_engine_num); + return 0; +} + int InferManager::proc_initialize(const char* path, const char* file, std::shared_ptr engine_index_ptr) { @@ -400,8 +406,6 @@ int InferManager::proc_initialize(const char* path, return -1; } uint32_t engine_num = model_toolkit_conf.engines_size(); - im::bsf::TaskExecutorVector::instance().resize(*engine_index_ptr + - engine_num); for (uint32_t ei = 0; ei < engine_num; ++ei) { LOG(INFO) << "model_toolkit_conf.engines(" << ei << ").name: " << model_toolkit_conf.engines(ei).name(); diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h index 411e6d94566e49ec21ab9e9abc60f3e30e0f29ad..770877d9fbdce4a25a466c5e9f03288da3231694 100644 --- a/core/predictor/framework/infer.h +++ b/core/predictor/framework/infer.h @@ -169,8 +169,10 @@ class ReloadableInferEngine : public InferEngine { uint32_t _infer_batch_size; // Need to align batch_size in inferring - bool _infer_batch_align; + bool _infer_overrun; + // allow to split request in inferring + bool _allow_split_request; // model version uint64_t _version; }; @@ -645,6 +647,8 @@ class InferManager { const char* file, std::shared_ptr engine_index_ptr); + int set_taskexecutor_num(size_t total_engine_num); + int thrd_initialize(); int thrd_clear(); diff --git 
a/core/predictor/framework/resource.cpp b/core/predictor/framework/resource.cpp old mode 100644 new mode 100755 index 7680014429db2ddb7fa6456b441806b2429426b5..0f5539d18e1942ffde31714333fe0ce89a49ff6f --- a/core/predictor/framework/resource.cpp +++ b/core/predictor/framework/resource.cpp @@ -135,6 +135,17 @@ int Resource::initialize(const std::string& path, const std::string& file) { if (FLAGS_enable_model_toolkit) { size_t model_toolkit_num = resource_conf.model_toolkit_path_size(); + // 此处暂时认为,每个model_toolkit仅包含一个engine + // 故认为 model_toolkit_num == engine总数 + // 若以后出现model_toolkit仅包含多个engine + // 则应先for循环统计engine总数,再set_taskexecutor_num + // 切不可动态im::bsf::TaskExecutorVector::instance().resize + // TaskExecutor是线程池,内含锁,在engine进程初始化时已开始work加锁循环运行了 + // 之后再resize内存搬运,会导致work使用原锁,而搬运后的TaskExecutor的锁内存已改变 + if (InferManager::instance().set_taskexecutor_num(model_toolkit_num) != 0) { + LOG(ERROR) << "failed set_taskexecutor_num"; + return -1; + } std::shared_ptr engine_index_ptr(new int(0)); for (size_t mi = 0; mi < model_toolkit_num; ++mi) { std::string model_toolkit_path = resource_conf.model_toolkit_path(mi); diff --git a/doc/HTTP_SERVICE_CN.md b/doc/HTTP_SERVICE_CN.md index a839039bac48e5c23c8c8d1571953365315b7bd8..e8050a6d48275224b2dabe2298b5d8eb9ddccc80 100644 --- a/doc/HTTP_SERVICE_CN.md +++ b/doc/HTTP_SERVICE_CN.md @@ -52,7 +52,9 @@ Java的HttpClient使用示例见[`java/examples/src/main/java/PaddleServingClien 如果不能满足您的需求,您也可以在此基础上添加一些功能。 -如需支持https或者自定义Response的Status Code等,则需要对C++端brpc-Server进行一定的二次开发,请参考https://github.com/apache/incubator-brpc/blob/master/docs/cn/http_service.md,后续如果需求很大,我们也会将这部分功能加入到Server中,尽情期待。 +如需支持https或者自定义Response的Status Code等,则需要对C++端brpc-Server进行一定的二次开发,请参考https://github.com/apache/incubator-brpc/blob/master/docs/cn/http_service.md + +后续如果需求很大,我们也会将这部分功能加入到Server中,尽情期待。 ### curl方式发送Http请求(基本原理) diff --git a/python/examples/bert/bert_httpclient.py b/python/examples/bert/bert_httpclient.py index 560f90910f5e5f92b7e15306d88f1c6e6477e9b3..255c78ec0ca7e33ddd1486f05cf6d9d225a5f406 100644 --- a/python/examples/bert/bert_httpclient.py +++ b/python/examples/bert/bert_httpclient.py @@ -23,11 +23,9 @@ args = benchmark_args() reader = ChineseBertReader({"max_seq_len": 128}) fetch = ["pooled_output"] - -client = HttpClient(ip='127.0.0.1', port='9292') +endpoint_list = ['127.0.0.1:9292'] +client = HttpClient() client.load_client_config(args.model) -#client.set_ip('127.0.0.1') -#client.set_port('9292') ''' if you want use GRPC-client, set_use_grpc_client(True) or you can directly use client.grpc_client_predict(...) @@ -49,6 +47,7 @@ we recommend use Proto data format in HTTP-body, set True(which is default) if you want use JSON data format in HTTP-body, set False ''' #client.set_http_proto(True) +client.connect(endpoint_list) for line in sys.stdin: feed_dict = reader.process(line) diff --git a/python/examples/fit_a_line/test_httpclient.py b/python/examples/fit_a_line/test_httpclient.py index f36a6d221943065544155dc364799946bc86cf78..c9f785dc99e2699027862fd2a28bd429e8b1a0a5 100755 --- a/python/examples/fit_a_line/test_httpclient.py +++ b/python/examples/fit_a_line/test_httpclient.py @@ -20,8 +20,6 @@ import time client = HttpClient() client.load_client_config(sys.argv[1]) -#client.set_ip('127.0.0.1') -#client.set_port('9393') ''' if you want use GRPC-client, set_use_grpc_client(True) or you can directly use client.grpc_client_predict(...) 
@@ -43,13 +41,14 @@ we recommend use Proto data format in HTTP-body, set True(which is default) if you want use JSON data format in HTTP-body, set False ''' #client.set_http_proto(True) +client.connect(["127.0.0.1:9393"]) +fetch_list = client.get_fetch_names() import paddle test_reader = paddle.batch( paddle.reader.shuffle( paddle.dataset.uci_housing.test(), buf_size=500), batch_size=1) -fetch_list = client.get_fetch_names() for data in test_reader(): new_data = np.zeros((1, 13)).astype("float32") new_data[0] = data[0][0] diff --git a/python/examples/imagenet/resnet50_http_client.py b/python/examples/imagenet/resnet50_http_client.py index 48efbb88c2c3525a0d38d238c6f093f518f5e0b8..77782671b72a1fa41e65ca02b3edeb2a7753face 100644 --- a/python/examples/imagenet/resnet50_http_client.py +++ b/python/examples/imagenet/resnet50_http_client.py @@ -18,10 +18,8 @@ from paddle_serving_app.reader import Sequential, URL2Image, Resize from paddle_serving_app.reader import CenterCrop, RGB2BGR, Transpose, Div, Normalize import time -client = HttpClient(ip='127.0.0.1', port='9696') +client = HttpClient() client.load_client_config(sys.argv[1]) -#client.set_ip('127.0.0.1') -#client.set_port('9292') ''' if you want use GRPC-client, set_use_grpc_client(True) or you can directly use client.grpc_client_predict(...) @@ -43,6 +41,7 @@ we recommend use Proto data format in HTTP-body, set True(which is default) if you want use JSON data format in HTTP-body, set False ''' #client.set_http_proto(True) +client.connect(["127.0.0.1:9696"]) label_dict = {} label_idx = 0 diff --git a/python/examples/imdb/test_http_client.py b/python/examples/imdb/test_http_client.py index 5f1f164218f3ed57647b1841d25d71e410c83a57..e3cc705150ccc197ab1be24bf11e0a92e1d62380 100755 --- a/python/examples/imdb/test_http_client.py +++ b/python/examples/imdb/test_http_client.py @@ -17,10 +17,8 @@ from paddle_serving_app.reader.imdb_reader import IMDBDataset import sys import numpy as np -client = HttpClient(ip='127.0.0.1', port='9292') +client = HttpClient() client.load_client_config(sys.argv[1]) -#client.set_ip('127.0.0.1') -#client.set_port('9292') ''' if you want use GRPC-client, set_use_grpc_client(True) or you can directly use client.grpc_client_predict(...) @@ -42,6 +40,7 @@ we recommend use Proto data format in HTTP-body, set True(which is default) if you want use JSON data format in HTTP-body, set False ''' #client.set_http_proto(True) +client.connect(["127.0.0.1:9292"]) # you can define any english sentence or dataset here # This example reuses imdb reader in training, you diff --git a/python/examples/lac/lac_http_client.py b/python/examples/lac/lac_http_client.py index e894addca3e8b5aebc83804c9e171900bc7e6b65..5cdfaf1df46a43d04b7e09f0f6376364a9dcb89f 100755 --- a/python/examples/lac/lac_http_client.py +++ b/python/examples/lac/lac_http_client.py @@ -21,10 +21,8 @@ import os import io import numpy as np -client = HttpClient(ip='127.0.0.1', port='9292') +client = HttpClient() client.load_client_config(sys.argv[1]) -#client.set_ip('127.0.0.1') -#client.set_port('9292') ''' if you want use GRPC-client, set_use_grpc_client(True) or you can directly use client.grpc_client_predict(...) 
@@ -46,6 +44,7 @@ we recommend use Proto data format in HTTP-body, set True(which is default) if you want use JSON data format in HTTP-body, set False ''' #client.set_http_proto(True) +client.connect(["127.0.0.1:9292"]) reader = LACReader() for line in sys.stdin: diff --git a/python/examples/xpu/bert/bert_web_service.py b/python/examples/xpu/bert/bert_web_service.py deleted file mode 100644 index f8d805c231d193dff6543ecb0f4ba787e61703b9..0000000000000000000000000000000000000000 --- a/python/examples/xpu/bert/bert_web_service.py +++ /dev/null @@ -1,48 +0,0 @@ -# coding=utf-8 -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# pylint: disable=doc-string-missing -from paddle_serving_server.web_service import WebService -from paddle_serving_app.reader import ChineseBertReader -import sys -import os -import numpy as np - - -class BertService(WebService): - def load(self): - self.reader = ChineseBertReader({ - "vocab_file": "vocab.txt", - "max_seq_len": 128 - }) - - def preprocess(self, feed=[], fetch=[]): - feed_res = [] - is_batch = False - for ins in feed: - feed_dict = self.reader.process(ins["words"].encode("utf-8")) - for key in feed_dict.keys(): - feed_dict[key] = np.array(feed_dict[key]).reshape( - (len(feed_dict[key]), 1)) - feed_res.append(feed_dict) - return feed_res, fetch, is_batch - - -bert_service = BertService(name="bert") -bert_service.load() -bert_service.load_model_config(sys.argv[1]) -bert_service.prepare_server( - workdir="workdir", port=int(sys.argv[2]), use_lite=True, use_xpu=True, ir_optim=True) -bert_service.run_rpc_service() -bert_service.run_web_service() diff --git a/python/examples/xpu/ernie/ernie_web_service.py b/python/examples/xpu/ernie/ernie_web_service.py deleted file mode 100644 index f8d805c231d193dff6543ecb0f4ba787e61703b9..0000000000000000000000000000000000000000 --- a/python/examples/xpu/ernie/ernie_web_service.py +++ /dev/null @@ -1,48 +0,0 @@ -# coding=utf-8 -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# pylint: disable=doc-string-missing -from paddle_serving_server.web_service import WebService -from paddle_serving_app.reader import ChineseBertReader -import sys -import os -import numpy as np - - -class BertService(WebService): - def load(self): - self.reader = ChineseBertReader({ - "vocab_file": "vocab.txt", - "max_seq_len": 128 - }) - - def preprocess(self, feed=[], fetch=[]): - feed_res = [] - is_batch = False - for ins in feed: - feed_dict = self.reader.process(ins["words"].encode("utf-8")) - for key in feed_dict.keys(): - feed_dict[key] = np.array(feed_dict[key]).reshape( - (len(feed_dict[key]), 1)) - feed_res.append(feed_dict) - return feed_res, fetch, is_batch - - -bert_service = BertService(name="bert") -bert_service.load() -bert_service.load_model_config(sys.argv[1]) -bert_service.prepare_server( - workdir="workdir", port=int(sys.argv[2]), use_lite=True, use_xpu=True, ir_optim=True) -bert_service.run_rpc_service() -bert_service.run_web_service() diff --git a/python/examples/xpu/fit_a_line_xpu/README.md b/python/examples/xpu/fit_a_line_xpu/README.md old mode 100644 new mode 100755 index b74ddd38613ba30444fb97a34cbab1c154882574..2640344de82ab5c5f6a9d8e267b0496b603e91f5 --- a/python/examples/xpu/fit_a_line_xpu/README.md +++ b/python/examples/xpu/fit_a_line_xpu/README.md @@ -23,18 +23,3 @@ The `paddlepaddle` package is used in `test_client.py`, and you may need to down ``` shell python3 test_client.py uci_housing_client/serving_client_conf.prototxt ``` - -## HTTP service - -### Start server - -Start a web service with default web service hosting modules: -``` shell -python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9393 --use_lite --use_xpu --ir_optim --name uci -``` - -### Client prediction - -``` shell -curl -H "Content-Type:application/json" -X POST -d '{"feed":[{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]}], "fetch":["price"]}' http://127.0.0.1:9393/uci/prediction -``` diff --git a/python/examples/xpu/fit_a_line_xpu/README_CN.md b/python/examples/xpu/fit_a_line_xpu/README_CN.md old mode 100644 new mode 100755 index 60adac1c17a0a232a37a0235999a687b48dcbc7a..268acf5a92a54a7c93541a0eaa1d3ae2f2d2656e --- a/python/examples/xpu/fit_a_line_xpu/README_CN.md +++ b/python/examples/xpu/fit_a_line_xpu/README_CN.md @@ -31,19 +31,3 @@ python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --p ``` shell python3 test_client.py uci_housing_client/serving_client_conf.prototxt ``` - -## HTTP服务 - -### 开启服务端 - -通过下面的一行代码开启默认web服务: - -``` shell -python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9393 --use_lite --use_xpu --ir_optim --name uci -``` - -### 客户端预测 - -``` shell -curl -H "Content-Type:application/json" -X POST -d '{"feed":[{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]}], "fetch":["price"]}' http://127.0.0.1:9393/uci/prediction -``` diff --git a/python/paddle_serving_client/client.py b/python/paddle_serving_client/client.py index f7d499f7f5c6b7ecab6434b7c81c752afba04ffc..c64254bf312e46d6159ba63ea159b01f5d0c3cbc 100755 --- a/python/paddle_serving_client/client.py +++ b/python/paddle_serving_client/client.py @@ -289,6 +289,7 @@ class Client(object): log_id=0): self.profile_.record('py_prepro_0') + # fetch 可以为空,此时会取所有的输出结果 if feed is None: raise ValueError("You should specify feed for prediction") @@ -297,6 +298,7 @@ class Client(object): fetch_list = [fetch] elif 
isinstance(fetch, list): fetch_list = fetch + # fetch 可以为空,此时会取所有的输出结果 elif fetch == None: pass else: @@ -441,6 +443,7 @@ class Client(object): model_engine_names = result_batch_handle.get_engine_names() for mi, engine_name in enumerate(model_engine_names): result_map = {} + # fetch 为空,则会取所有的输出结果 if len(fetch_names) == 0: fetch_names = result_batch_handle.get_tensor_alias_names(mi) # result map needs to be a numpy array diff --git a/python/paddle_serving_client/httpclient.py b/python/paddle_serving_client/httpclient.py index 053ef9101d96c16ba9a81060f18b68cb8b4b2028..27ed269db0cccc1856e963a7b02c702d845c7ca6 100755 --- a/python/paddle_serving_client/httpclient.py +++ b/python/paddle_serving_client/httpclient.py @@ -22,6 +22,7 @@ import gzip from collections import Iterable import base64 import sys +import re import grpc from .proto import general_model_service_pb2 @@ -98,7 +99,7 @@ class HttpClient(object): self.headers["Content-Type"] = "application/proto" self.max_body_size = 512 * 1024 * 1024 self.use_grpc_client = False - self.url = None + self.http_s = "http://" # 使用连接池能够不用反复建立连接 self.requests_session = requests.session() @@ -170,7 +171,6 @@ class HttpClient(object): def set_max_body_size(self, max_body_size): self.max_body_size = max_body_size - self.init_grpc_stub() def set_timeout_ms(self, timeout_ms): if not isinstance(timeout_ms, int): @@ -183,25 +183,46 @@ class HttpClient(object): raise ValueError("retry_times must be int type.") else: self.requests_session.mount( - 'http://', HTTPAdapter(max_retries=retry_times)) - - def set_ip(self, ip): - self.ip = ip - self.init_grpc_stub() + self.http_s, HTTPAdapter(max_retries=retry_times)) def set_service_name(self, service_name): self.service_name = service_name - def set_port(self, port): - self.port = port - self.server_port = port - self.init_grpc_stub() - - def set_url(self, url): + def connect(self, url=None, encryption=False): + if isinstance(url, (list, tuple)): + if len(url) > 1: + raise ValueError("HttpClient only support 1 endpoint") + else: + url = url[0] if isinstance(url, str): - self.url = url + if url.startswith("https://"): + url = url[8:] + self.http_s = "https://" + if url.startswith("http://"): + url = url[7:] + self.http_s = "http://" + url_parts = url.split(':') + if len(url_parts) != 2 or self.check_ip(url_parts[0]) == False: + raise ValueError( + "url not right, it should be like 127.0.0.1:9393 or http://127.0.0.1:9393" + ) + else: + self.ip = url_parts[0] + self.port = url_parts[1] + self.server_port = url_parts[1] + if encryption: + self.get_serving_port() + if self.use_grpc_client: + self.init_grpc_stub() + + def check_ip(self, ipAddr): + compile_ip = re.compile( + '^(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|[1-9])\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)$' + ) + if compile_ip.match(ipAddr): + return True else: - print("url must be str") + return False def add_http_headers(self, headers): if isinstance(headers, dict): @@ -229,10 +250,9 @@ class HttpClient(object): def use_key(self, key_filename): with open(key_filename, "rb") as f: self.key = f.read() - self.get_serving_port() def get_serving_port(self): - encrypt_url = "http://" + str(self.ip) + ":" + str(self.port) + encrypt_url = self.http_s + str(self.ip) + ":" + str(self.port) if self.key is not None: req = json.dumps({"key": base64.b64encode(self.key).decode()}) else: @@ -481,13 +501,7 @@ class HttpClient(object): postData = self.process_json_data(feed_dict, fetch_list, batch, log_id) - web_url 
= "http://" + self.ip + ":" + self.server_port + self.service_name - if self.url != None: - if "http" not in self.url: - self.url = "http://" + self.url - if "self.service_name" not in self.url: - self.url = self.url + self.service_name - web_url = self.url + web_url = self.http_s + self.ip + ":" + self.server_port + self.service_name # 当数据区长度大于512字节时才压缩. self.headers.pop("Content-Encoding", "nokey") try: diff --git a/python/paddle_serving_server/server.py b/python/paddle_serving_server/server.py index 078f3702d125d5829cf1336b1b69493d471170fd..115488b00120d6366e60efb07e2ba3620c97e0d2 100755 --- a/python/paddle_serving_server/server.py +++ b/python/paddle_serving_server/server.py @@ -228,7 +228,8 @@ class Server(object): engine.batch_infer_size = self.op_max_batch[index % len(self.op_max_batch)] - engine.enable_batch_align = 1 + engine.enable_overrun = False + engine.allow_split_request = True engine.model_dir = model_config_path engine.enable_memory_optimization = self.memory_optimization engine.enable_ir_optimization = self.ir_optimization diff --git a/tools/scripts/ipipe_py3.sh b/tools/scripts/ipipe_py3.sh index 47eb600a66a510de8f2906ad161579d5e8c58186..f3e3aa7ea0539d7394370a6a24331ad300a1d2cf 100644 --- a/tools/scripts/ipipe_py3.sh +++ b/tools/scripts/ipipe_py3.sh @@ -40,9 +40,9 @@ go env -w GO111MODULE=auto build_whl_list=(build_cpu_server build_gpu_server build_client build_app) rpc_model_list=(grpc_fit_a_line grpc_yolov4 pipeline_imagenet bert_rpc_gpu bert_rpc_cpu ResNet50_rpc \ -lac_rpc cnn_rpc bow_rpc lstm_rpc fit_a_line_rpc deeplabv3_rpc mobilenet_rpc unet_rpc resnetv2_rpc \ +lac_rpc_asyn cnn_rpc_asyn bow_rpc lstm_rpc fit_a_line_rpc deeplabv3_rpc mobilenet_rpc unet_rpc resnetv2_rpc \ criteo_ctr_rpc_cpu criteo_ctr_rpc_gpu ocr_rpc yolov4_rpc_gpu faster_rcnn_hrnetv2p_w18_1x_encrypt \ -faster_rcnn_model_rpc low_precision_resnet50_int8 ocr_c++_service) +faster_rcnn_model_rpc low_precision_resnet50_int8 ocr_c++_service ocr_c++_service_asyn) http_model_list=(fit_a_line_http lac_http imdb_http_proto imdb_http_json imdb_grpc ResNet50_http bert_http \ pipeline_ocr_cpu_http) @@ -492,7 +492,7 @@ function ResNet101_rpc() { kill_server_process } -function cnn_rpc() { +function cnn_rpc_asyn() { dir=${log_dir}rpc_model/cnn_rpc/ check_dir ${dir} unsetproxy @@ -500,8 +500,9 @@ function cnn_rpc() { data_dir=${data}imdb/ link_data ${data_dir} sed -i 's/9292/8865/g' test_client.py - ${py_version} -m paddle_serving_server.serve --model imdb_cnn_model/ --port 8865 > ${dir}server_log.txt 2>&1 & - check_result server 5 + ${py_version} -m paddle_serving_server.serve --model imdb_cnn_model/ --port 8865 --op_num 4 --thread 10 --gpu_ids 0 > ${dir}server_log.txt 2>&1 & + check_result server 8 + check_gpu_memory 0 head test_data/part-0 | ${py_version} test_client.py imdb_cnn_client_conf/serving_client_conf.prototxt imdb.vocab > ${dir}client_log.txt 2>&1 check_result client "cnn_CPU_RPC server test completed" kill_server_process @@ -537,7 +538,7 @@ function lstm_rpc() { kill_server_process } -function lac_rpc() { +function lac_rpc_asyn() { dir=${log_dir}rpc_model/lac_rpc/ check_dir ${dir} unsetproxy @@ -545,8 +546,9 @@ function lac_rpc() { data_dir=${data}lac/ link_data ${data_dir} sed -i 's/9292/8868/g' lac_client.py - ${py_version} -m paddle_serving_server.serve --model lac_model/ --port 8868 > ${dir}server_log.txt 2>&1 & - check_result server 5 + ${py_version} -m paddle_serving_server.serve --model lac_model/ --port 8868 --gpu_ids 0 --op_num 2 > ${dir}server_log.txt 2>&1 & + check_result server 8 + 
check_gpu_memory 0 echo "我爱北京天安门" | ${py_version} lac_client.py lac_client/serving_client_conf.prototxt lac_dict/ > ${dir}client_log.txt 2>&1 check_result client "lac_CPU_RPC server test completed" kill_server_process @@ -923,6 +925,23 @@ function ocr_c++_service() { kill_server_process } +function ocr_c++_service_asyn() { + dir=${log_dir}rpc_model/ocr_c++_serving/ + cd ${build_path}/python/examples/ocr + check_dir ${dir} + echo -e "${GREEN_COLOR}OCR_C++_Service_GPU_RPC asyn_server started${RES}" + $py_version -m paddle_serving_server.serve --model ocr_det_model ocr_rec_model --port 9293 --gpu_id 0 --op_num 4 > ${dir}server_log.txt 2>&1 & + check_result server 8 + check_gpu_memory 0 + echo -e "${GREEN_COLOR}OCR_C++_Service_GPU_RPC client started${RES}" + echo "------------------first:" + $py_version ocr_cpp_client.py ocr_det_client ocr_rec_client + echo "------------------second:" + $py_version ocr_cpp_client.py ocr_det_client ocr_rec_client > ${dir}client_log.txt 2>&1 + check_result client "OCR_C++_Service_GPU_RPC server test completed" + kill_server_process +} + function build_all_whl() { for whl in ${build_whl_list[@]} do
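For reference, a minimal sketch of the `HttpClient` usage pattern this patch standardizes across the examples: construct the client, load the client config, then call `connect()` with a single `ip:port` endpoint in place of the removed `set_ip()`/`set_port()`/`set_url()` calls. The import path, prototxt path, endpoint, feed name `x`, and the `predict()` call are assumptions for illustration only and are not taken verbatim from this diff:

```python
import numpy as np
from paddle_serving_client.httpclient import HttpClient  # import path assumed

client = HttpClient()
client.load_client_config("uci_housing_client/serving_client_conf.prototxt")  # placeholder path
# connect() accepts a one-element list (or a bare string) of the form "ip:port";
# an "http://" or "https://" prefix is also accepted and selects the scheme.
client.connect(["127.0.0.1:9393"])
# Fetch names can be read back from the loaded config; per the client.py comments
# in this patch, an empty fetch list makes the server return all outputs.
fetch_list = client.get_fetch_names()

data = np.zeros((1, 13)).astype("float32")  # dummy input shaped like the fit_a_line example
# predict() signature assumed to mirror the GRPC Client API.
result = client.predict(feed={"x": data}, fetch=fetch_list, batch=True)
print(result)
```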