// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef BCLOUD
#include
#else
#include
#endif
#include
#include
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/memory.h"
// this file is included by bsf.h
namespace im {
namespace bsf {
template
bool Task::task_fetch_init(BatchTasks& batchTask) {
// 双检锁,减少加锁的粒度
if (!fetch_init) {
if (taskmeta_num > 1) {
// 对于task被拆分为多个taskmeta,需要加锁。
AutoMutex lock(task_mut);
task_fetch_create(batchTask);
} else {
// 对于task只有1个taskmeta,不需要加锁。
task_fetch_create(batchTask);
}
}
return true;
}
template
bool Task::task_fetch_create(BatchTasks& batchTask) {
if (!fetch_init) {
vector_fetch_lod_index = batchTask.vector_fetch_lod_index;
set_fetch_nobatch_index = batchTask.set_fetch_nobatch_index;
OutVectorT taskMetaOutLodTensor;
size_t fetchvar_num = batchTask._batch_out.size();
for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
++fetchvar_index) {
size_t fetchvar_bytesize_index =
batchTask.fetchvar_bytesize(fetchvar_index);
size_t fetchvar_batch = 0;
// 1. nobatch fetchvar情况
if (set_fetch_nobatch_index.size() > 0 &&
set_fetch_nobatch_index.find(fetchvar_index) !=
set_fetch_nobatch_index.end()) {
fetchvar_batch = 1;
} else if (vector_fetch_lod_index.size() > 0 &&
std::find(vector_fetch_lod_index.begin(),
vector_fetch_lod_index.end(),
fetchvar_index) != vector_fetch_lod_index.end()) {
// lod fetchvar情况,此时无法确定总的shape[0]
// 根据task中的task_num总数开辟task_num个临时空间
// 每个lod型的fetchvar拷贝到对应的临时空间中
// 最后再计算临时空间的总量,合并fetchvar和lod
fetchvar_batch = 0;
} else {
// 普通fetchvar情况,此时该Task总的fetchvar_batch =
// 输入的总的batch_size()
fetchvar_batch = batch_size();
}
paddle::PaddleTensor tensor_out;
tensor_out.name = batchTask._batch_out[fetchvar_index].name;
tensor_out.dtype =
paddle::PaddleDType(batchTask._batch_out[fetchvar_index].dtype);
tensor_out.shape = batchTask._batch_out[fetchvar_index].shape;
tensor_out.shape[0] = fetchvar_batch;
if (fetchvar_batch != 0) {
// 此时 lod 为空。
tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
// resize all batch memory at one time
size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
tensor_out.data.Resize(databuf_size);
} else {
// 当taskmeta_num = 1时,由于同时只有一个taskMeta操作task
// 不涉及线程安全问题,所以此时可以直接由taskMeta->task->resize->copy
// 当task被分为多个taskMeta时,需要临时对象记录
// 收齐后再一起合并
if (taskmeta_num > 1) {
taskMetaOutLodTensor.push_back(tensor_out);
}
}
outVectorT_ptr->push_back(tensor_out);
}
// outLodTensorVector实际是一个双层vector
// shape为taskmeta_num * vector_fetch_lod_index.size();
outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor);
fetch_init = true;
}
return true;
}
template
void* TaskExecutor::thread_entry(void* args) {
ThreadContext* context = static_cast*>(args);
TaskExecutor* executor =
static_cast*>(context->executor);
executor->work(context);
return nullptr;
}
template
int TaskExecutor::start(uint32_t thread_num, uint32_t init_timeout_sec) {
_stop = false;
if (!_thread_contexts.empty()) {
LOG(WARNING) << "BSF has started";
return 0;
}
if (thread_num == 0) {
LOG(ERROR) << "cannot init BSF with zero thread";
return -1;
}
ThreadContext* contexts = new ThreadContext[thread_num];
for (uint32_t i = 0; i < thread_num; ++i) {
contexts[i].executor = this;
if (_user_thread_contexts != NULL) {
contexts[i].user_thread_context = _user_thread_contexts[i];
}
int rc = THREAD_CREATE(
&contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
if (rc != 0) {
LOG(ERROR) << "failed to create BSF worker thread: index=" << i
<< ", rc=" << rc << ", errno=" << errno << ":"
<< strerror(errno);
return -1;
}
_thread_contexts.push_back(&contexts[i]);
}
size_t init_timeout = init_timeout_sec * 1000 * 1000;
bool has_error = false;
bool has_timeout = true;
if (init_timeout == 0) {
has_timeout = false;
}
while (!has_timeout || init_timeout > 0) {
bool done = true;
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
if (_thread_contexts[i]->init_status < 0) {
has_error = true;
break;
}
if (_thread_contexts[i]->init_status == 0) {
done = false;
}
}
if (has_error) {
LOG(ERROR) << "BSF thread init error";
return -1;
}
if (done) {
LOG(INFO) << "BSF thread init done";
return 0;
}
// 100ms
const size_t sleep_interval = 100 * 1000;
usleep(sleep_interval);
init_timeout -= sleep_interval;
}
LOG(ERROR) << "BSF thread init timed out";
return -1;
}
template
void TaskExecutor::stop() {
_stop = true;
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
THREAD_CANCEL(_thread_contexts[i]->tid);
}
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
THREAD_JOIN(_thread_contexts[i]->tid, NULL);
}
_thread_contexts.clear();
}
template
TaskHandler TaskExecutor::schedule(
const void* inVectorT_ptr,
void* outVectorT_ptr) { // NOLINT
TaskT* task = butil::get_object();
if (!task) {
LOG(ERROR) << "Failed get TaskT from object pool";
return TaskHandler::valid_handle();
}
/*
if (!BatchTasks::check_valid(in, out, _overrun)) {
LOG(ERROR) << "Invalid input & output";
return TaskHandler::valid_handle();
}
*/
int fds[2];
int rc = pipe(fds);
if (rc != 0) {
LOG(ERROR) << "call pipe() failed, errno=" << errno << ":"
<< strerror(errno);
return TaskHandler::valid_handle();
}
task->read_fd = fds[0];
task->write_fd = fds[1];
task->owner_tid = ::syscall(SYS_gettid);
task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
if (!task->task_init()) {
LOG(ERROR) << "task->init() failed";
}
task->rem = task->batch_size();
task->index.store(0, butil::memory_order_relaxed);
AutoMutex lock(_mut);
_task_queue.push_back(task);
THREAD_COND_SIGNAL(&_cond);
return TaskHandler(*task);
}
// this function is accessed by multi thread.
// so AutoMutex at first.
// so batchTask.append_task is thread safe.
// you dont need to add extra lock in append_task()
// task is already init.
template
bool TaskExecutor::move_task_to_batch(
BatchTasks& batchTask) { // NOLINT
AutoMutex lock(_mut);
while (_task_queue.empty()) {
THREAD_COND_WAIT(&_cond, &_mut);
}
if (_task_queue.empty()) {
LOG(ERROR) << "invalid task queue!";
return false;
}
TaskT* previous_task = nullptr;
while (!_task_queue.empty()) {
TaskT* task = _task_queue.front();
// 由于无法确定fetchVar是否为lod(即使输入是非lod,输出也可能是lod)
// 简单的处理方法是:task不能被拆分,即用户的请求可以合并一起预测,但不能拆分两个小部分去预测。
// 只需要设置engine的属性allow_split_request = false即可。
// 复杂的处理方法是允许拆分Task,无论是否包含lod.
// 难点:预测前,能够知道被拆成了几个taskmeta,但只有预测后,才知道有多少个fetchvar,多少个lod的fetchvar
// 所以,task中先要创建taskmeta_num* fetchvar
// num(lod类型的)个临时PaddleTensor(存储data及Lod)
// 由于多线程调度的单位是taskmeta,故只能在notify_task中,用taskmeta->task去创建
// 此时由于多个taskmeta对应一个task,存在多线程竞争,所以需要在task中加锁。
// 原子操作不可行,因为多个线程必须等待创建好上述的PaddleTensor后才能继续。
// 对于普通的fetch,也需要加锁去创建PaddleTensor,后续才能往里拷贝。
// _overrun表示,异步BatchTasks是否允许单次临时超过限制。
// _overrun为true时,即使BatchTasks剩下1-batch,也会全放入一个完整的Task,允许临时超限。
// _overrun为false时,不允许。
// 对于模型本身有最大Batch限制的情况,应将该值设为false,默认为false。
// 对于模型本身无最大Batch限制,但自己设置了BatchTasks的最大Batch,可以考虑设置为True。
// _allow_split_request ==
// true,则允许拆分task.BatchTasks剩下1-batch,则会从下一个Task中拆出1-Batch
// _allow_split_request ==
// false,则每个task不会被拆分。BatchTasks剩下1-batch会被浪费
// 默认为true,允许拆分task从而使得空间利用率最大。
if (!batchTask.get_allow_split_request()) {
if (task->batch_size() > batchTask.get_rem_size() &&
!batchTask.get_overrun()) {
break;
}
}
// combine_task_valid负责判断是否能够合并
// 除最外层的shape外,内层shape应一致才能合并。
// 否则跳出循环,放入下一个batchTask中。
// 以此保证batch.append_task(task)中的task的内层shape相同。
// 对于Shape[0] = 1 而!=batch的情况,因为合并时,取其中一个的值
// 所以要求该feedvar必须相等,才能合并。
// 否则跳出循环,放入下一个batchTask中。
// 目前没有PaddleTensor和PaddleBuff没有重载==,所以只能比较内存.
// TODO(HexToString): 可以考虑后期支持AutoPadding.
if (previous_task != nullptr) {
if (!task->combine_task_valid(previous_task)) {
break;
}
}
size_t rem = batchTask.append_task(task);
previous_task = task;
if (task->rem <= 0) {
_task_queue.pop_front();
}
if (rem <= 0) break;
}
LOG(INFO) << "Number of tasks remaining in _task_queue is"
<< _task_queue.size();
return true;
}
// this function is accessed by multi thread.
// move_task_to_batch have add lock inside the function.
// Packaging 1 TaskT as 1 or Several TaskMeta.
// TaskT is from the SingleTon TaskExecutor`s _task_queue
// although TaskMeta is a local variable, but several TaskMeta may points to
// the same TaskT which is get from the SingleTon TaskExecutor`s _task_queue.
// put TaskMeta to the local variable BatchTasks batchTask.
// batchTask.merge_tasks() and batchTask.notify_tasks() has no lock.
// BatchTasks batchTask itself is a local variable, it`s thread safe.
// If batchTask.merge_tasks() and batchTask.notify_tasks() do something to
// TaskMeta
// you need to pay attention to that.
// Multi-Thread deal with different TaskMeta(cause it`s created as local
// variable)
// But different TaskMeta may points to the same TaskT
// which is get from the SingleTon TaskExecutor`s _task_queue.
template
int TaskExecutor::work(ThreadContext* context) {
if (MempoolWrapper::instance().thread_initialize() != 0) {
LOG(ERROR) << "Failed thread initialize mempool";
return -1;
}
if (_thread_init_fn != NULL) {
if (_thread_init_fn(context->user_thread_context) != 0) {
LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit";
context->init_status = -1;
return -1;
} else {
LOG(INFO) << "execute thread init thunk succeed";
}
}
context->init_status = 1;
while (!_stop) {
if (_thread_reset_fn != NULL) {
if (_thread_reset_fn(context->user_thread_context) != 0) {
LOG(ERROR) << "execute user thread reset failed";
}
}
if (MempoolWrapper::instance().thread_clear() != 0) {
LOG(ERROR) << "Failed thread clear mempool";
return -1;
}
// move_task_to_batch() take the original task from the `_task_queue`
// put the original task into its own Vector
// the capacity of its own Vector is decided by `_batch_size` or
// `_overrun`
// merge_tasks() move the imput-data into `_batch_in` from its own
// Vector.
// because the predictor`s input is the `_batch_in`
// notify_tasks() move the output-data into every single taskmeta from
// `_batch_out`.
// because the predictor`s output is the `_batch_out`
BatchTasks batchTask(_batch_size, _overrun, _allow_split_request);
if (move_task_to_batch(batchTask)) {
batchTask.merge_tasks();
_fn(&batchTask.in(), &batchTask.out());
batchTask.notify_tasks();
}
}
return 0;
}
template
bool TaskManager::schedule(const void* in,
void* out) { // NOLINT
TaskHandler handler =
TaskExecutorVector::instance()[_model_index].schedule(in, out);
if (handler.valid()) {
_task_owned = handler;
return true;
} else {
LOG(ERROR) << "failed to schedule task";
return false;
}
}
template
void TaskManager::wait() {
char buffer[128];
while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 &&
errno == EINTR) {
}
close(_task_owned.read_fd);
close(_task_owned.write_fd);
_task_owned.read_fd = -1;
_task_owned.write_fd = -1;
return;
}
} // namespace bsf
} // namespace im