Commit 8c834fba authored by HexToString

add multi-TaskExecutor

Parent 391d9794
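This commit replaces the process-wide `TaskExecutor` singleton with a per-model `TaskExecutorVector`, so that multiple models served in one process each get their own batching executor. A minimal sketch of the calling convention before and after, using names from this diff (the surrounding setup is assumed):

```cpp
// Before: every model shared one global executor.
// im::bsf::TaskExecutor<TaskT>::instance()->schedule(in, out);

// After: each model addresses its own executor by index.
uint32_t model_index = 0;  // assigned per engine in InferManager::proc_initialize
im::bsf::TaskExecutorVector<TaskT>::instance()[model_index].schedule(in, out);
```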
@@ -181,7 +181,10 @@ int GeneralReaderOp::inference() {
     VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
             << "]: " << data_len;
     databuf_size = data_len * elem_size;
-    out->at(i).data.Resize(databuf_size);
+    void *databuf_char = MempoolWrapper::instance().malloc(databuf_size);
+    paddle::PaddleBuf paddleBuf(databuf_char, databuf_size);
+    out->at(i).data = paddleBuf;
+    // out->at(i).data.Resize(databuf_size);
     if (out->at(i).lod.size() > 0) {
       VLOG(2) << "(logid=" << log_id << ") var[" << i
               << "] has lod_tensor and len=" << out->at(i).lod[0].back();
......
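The reader op change above stops calling `PaddleBuf::Resize` (which heap-allocates inside the `PaddleBuf`) and instead carves the buffer out of the request-level memory pool, wrapping it in a non-owning `paddle::PaddleBuf`. A minimal sketch of the pattern, assuming the `MempoolWrapper` allocator used above:

```cpp
// Sketch: wrap pool-owned memory in a PaddleBuf without copying.
size_t databuf_size = data_len * elem_size;
void* databuf_char = MempoolWrapper::instance().malloc(databuf_size);
// This PaddleBuf constructor does not take ownership of the pointer,
// so the pool (not the PaddleBuf) remains responsible for releasing it.
paddle::PaddleBuf paddle_buf(databuf_char, databuf_size);
out->at(i).data = paddle_buf;
```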
@@ -36,7 +36,7 @@ void* TaskExecutor<TaskT>::thread_entry(void* args) {
       static_cast<TaskExecutor<TaskT>*>(context->executor);
   executor->work(context);
 
-  return NULL;
+  return nullptr;
 }
 
 template <typename TaskT>
@@ -256,7 +256,8 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
 template <typename InItemT, typename OutItemT>
 bool TaskManager<InItemT, OutItemT>::schedule(const void* in,
                                               void* out) {  // NOLINT
-  TaskHandler<TaskT> handler = _executor.schedule(in, out);
+  TaskHandler<TaskT> handler =
+      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out);
 
   if (handler.valid()) {
     _task_owned = handler;
......
@@ -16,7 +16,7 @@
 #include <errno.h>
 #include <algorithm>
-#include <deque>
+#include <list>
 #include <vector>
 #ifdef BCLOUD
@@ -220,7 +220,8 @@ struct TaskMeta {
 // each TaskT already includes a batch in itself
 // BatchTasks needs to combine several `small TaskMeta` into a new `big TaskT`.
 // The only difference between the `big TaskT` and `small TaskT` is that
-// the TaskT.inVectorT_ptr->[feedvar_index].shape[0] is different.
+// the TaskT.inVectorT_ptr->[feedvar_index].shape[0],
+// which is actually the batch_size, is different.
 template <typename TaskT>
 class BatchTasks {
  public:
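As the rewritten comment says, merged tasks differ only in the batch dimension. A hypothetical illustration (not part of the commit) of two small tasks combining into one big task:

```cpp
// Two small tasks feeding the same var, identical except for shape[0]:
paddle::PaddleTensor a, b;
a.shape = {2, 3, 224, 224};  // batch_size 2
b.shape = {5, 3, 224, 224};  // batch_size 5

// The merged `big TaskT` sums only the batch dimension; all trailing
// dimensions must already match:
std::vector<int> merged_shape = {a.shape[0] + b.shape[0], 3, 224, 224};  // {7, 3, 224, 224}
```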
@@ -540,9 +541,6 @@ struct TaskHandler {
 template <typename TaskT>
 class TaskExecutor;
 
-template <typename InItemT, typename OutItemT>
-class TaskManager;
-
 template <typename TaskT>
 struct ThreadContext {
   TaskExecutor<TaskT>* executor;
...@@ -591,10 +589,18 @@ class TaskExecutor { ...@@ -591,10 +589,18 @@ class TaskExecutor {
THREAD_COND_DESTROY(&_cond); THREAD_COND_DESTROY(&_cond);
} }
static TaskExecutor<TaskT>* instance() { // cause vector.resize will use copy or move construct.
static TaskExecutor<TaskT> singleton; TaskExecutor(TaskExecutor<TaskT>&& other) noexcept {
return &singleton; if (this != &other) {
TaskExecutor();
}
} }
/*
static TaskExecutor<TaskT>* instance() {
static TaskExecutor<TaskT> singleton;
return &singleton;
}
*/
void set_batch_size(size_t batch_size) { _batch_size = batch_size; } void set_batch_size(size_t batch_size) { _batch_size = batch_size; }
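The move constructor above exists only because `TaskExecutorVector` holds a `std::vector<TaskExecutor<TaskT>>` and `vector::resize` requires its element type to be move- (or copy-) constructible, while the copy operations are deleted below. A standalone illustration of that requirement (hypothetical type, not from the codebase):

```cpp
#include <vector>

struct Executor {
  Executor() = default;
  Executor(const Executor&) = delete;            // copying forbidden, as in TaskExecutor
  Executor(Executor&&) noexcept : Executor() {}  // resize needs move construction
};

int main() {
  std::vector<Executor> v;
  v.resize(4);  // compiles only because Executor is move-constructible
}
```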
@@ -619,30 +625,35 @@ class TaskExecutor {
   static void* thread_entry(void* args);
 
- private:
-  TaskExecutor(TaskExecutor<TaskT> const& other);
-  TaskExecutor* operator=(TaskExecutor<TaskT> const& other);
-
   int work(ThreadContext<TaskT>* context);
 
   TaskHandler<TaskT> schedule(const void*, void*);
 
   bool move_task_to_batch(BatchTasks<TaskT>& batch);  // NOLINT
 
+ private:
+  TaskExecutor(TaskExecutor<TaskT> const& other) = delete;
+  TaskExecutor& operator=(TaskExecutor<TaskT> const& other) = delete;
+
+  /*
+  TaskExecutor(TaskExecutor<TaskT> && other) = delete;
+  TaskExecutor& operator=(TaskExecutor<TaskT> && other) = delete;
+  */
+
   bool _stop;
 
   // can't use boost::mutex, because some stupid macro
   THREAD_MUTEX_T _mut;
   THREAD_COND_T _cond;
 
-  std::deque<TaskT*> _task_queue;
+  std::list<TaskT*> _task_queue;
 
   boost::function<int(void*)> _thread_init_fn;
   boost::function<int(void*)> _thread_reset_fn;
   void** _user_thread_contexts;
 
   std::vector<ThreadContext<TaskT>*> _thread_contexts;
+  friend class TaskManager<InType, OutType>;
 
   size_t _batch_size;
   bool _batch_align;
@@ -650,6 +661,34 @@ class TaskExecutor {
   boost::function<void(const void*, void*)> _fn;
 };
 
+template <typename TaskT>
+class TaskExecutorVector {
+ public:
+  static TaskExecutorVector<TaskT>& instance() {
+    static TaskExecutorVector<TaskT> singleton;
+    return singleton;
+  }
+
+  void resize(int size) { _vector_executor.resize(size); }
+
+  TaskExecutor<TaskT>& operator[](int index) {
+    if (index <= -1 || _vector_executor.size() <= static_cast<size_t>(index)) {
+      LOG(ERROR) << "_vector_executor.size() <= index or index <= -1";
+      throw "_vector_executor.size() <= index or index <= -1";
+    }
+    return _vector_executor[index];
+  }
+
+ private:
+  TaskExecutorVector() = default;
+  TaskExecutorVector(const TaskExecutorVector<TaskT>& other) = delete;
+  TaskExecutorVector& operator=(const TaskExecutorVector<TaskT>& other) =
+      delete;
+  TaskExecutorVector(TaskExecutorVector<TaskT>&& other) = delete;
+  TaskExecutorVector& operator=(TaskExecutorVector<TaskT>&& other) = delete;
+
+  std::vector<TaskExecutor<TaskT>> _vector_executor;
+};
+
 template <typename InItemT, typename OutItemT>
 class TaskManager {
  public:
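Assumed wiring, mirroring the `InferManager::proc_initialize` change later in this diff: the vector is sized once for all engines during process initialization, after which each model configures and schedules through its own slot.

```cpp
// Sketch: startup wiring for two models (sizes are illustrative).
im::bsf::TaskExecutorVector<TaskT>::instance().resize(2);
im::bsf::TaskExecutorVector<TaskT>::instance()[0].set_batch_size(32);
im::bsf::TaskExecutorVector<TaskT>::instance()[1].set_batch_size(8);
// Out-of-range access logs an error and throws, per operator[] above.
```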
@@ -657,10 +696,8 @@ class TaskManager {
   typedef typename TaskT::InVectorT InVectorT;
   typedef typename TaskT::OutVectorT OutVectorT;
 
-  explicit TaskManager(TaskExecutor<TaskT>& exe, size_t batch_size)  // NOLINT
-      : _executor(exe) {}
-
-  TaskManager() : _executor(*TaskExecutor<TaskT>::instance()) {}
+  explicit TaskManager(uint32_t index)  // NOLINT
+      : _model_index(index) {}
 
   ~TaskManager() { wait(); }
@@ -670,8 +707,8 @@ class TaskManager {
   inline void clear() { wait(); }
 
  private:
-  TaskExecutor<TaskT>& _executor;
   TaskHandler<TaskT> _task_owned;
+  uint32_t _model_index;
 };  // class TaskManager
 
 class AutoMutex {
......
@@ -56,15 +56,23 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
   }
 
   // init bsf framework
-  im::bsf::TaskExecutor<TaskT>::instance()->set_thread_init_fn(
-      boost::bind(&InferEngine::thrd_initialize_impl, this));
-  im::bsf::TaskExecutor<TaskT>::instance()->set_thread_reset_fn(
-      boost::bind(&InferEngine::thrd_clear_impl, this));
-  im::bsf::TaskExecutor<TaskT>::instance()->set_thread_callback_fn(
-      boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
-  im::bsf::TaskExecutor<TaskT>::instance()->set_batch_size(_infer_batch_size);
-  im::bsf::TaskExecutor<TaskT>::instance()->set_batch_align(_infer_batch_align);
-  if (im::bsf::TaskExecutor<TaskT>::instance()->start(_infer_thread_num) != 0) {
+  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
+      .set_thread_init_fn(
+          boost::bind(&InferEngine::thrd_initialize_impl, this));
+  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
+      .set_thread_reset_fn(boost::bind(&InferEngine::thrd_clear_impl, this));
+  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
+      .set_thread_callback_fn(
+          boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
+  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_size(
+      _infer_batch_size);
+  im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_align(
+      _infer_batch_align);
+  if (im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].start(
+          _infer_thread_num) != 0) {
     LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
     return -1;
   }
@@ -75,6 +83,11 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
   return 0;
 }
 
+// Multiple threads will enter this method of the same object.
+// One model corresponds to one ReloadableInferEngine object,
+// which is a process-level object.
+// One ReloadableInferEngine object can own several ModelData<EngineCore>
+// objects, each of which is a thread-level object.
 int ReloadableInferEngine::infer(const void* in,
                                  void* out,
                                  uint32_t batch_size) {
@@ -82,7 +95,8 @@ int ReloadableInferEngine::infer(const void* in,
     return infer_impl(in, out, batch_size);
   }
 
-  im::bsf::TaskManager<paddle::PaddleTensor, paddle::PaddleTensor> task_manager;
+  im::bsf::TaskManager<paddle::PaddleTensor, paddle::PaddleTensor> task_manager(
+      _model_index);
   task_manager.schedule(in, out);
   task_manager.wait();
} }
if (_infer_thread_num > 0) { if (_infer_thread_num > 0) {
im::bsf::TaskExecutor<TaskT>::instance()->stop(); im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].stop();
} }
return 0; return 0;
} }
@@ -191,6 +205,7 @@ int VersionedInferEngine::proc_initialize(const configure::EngineDesc& conf,
   std::string engine_type = conf.type();
   InferEngine* engine =
       StaticInferFactory::instance().generate_object(engine_type);
   if (!engine) {
     LOG(ERROR) << "Failed generate engine with type:" << engine_type;
     return -1;
   }
+  engine->set_model_index(_model_index);
@@ -373,12 +388,14 @@ int InferManager::proc_initialize(const char* path, const char* file) {
     LOG(ERROR) << "failed load infer config, path: " << path << "/" << file;
     return -1;
   }
-  size_t engine_num = model_toolkit_conf.engines_size();
-  for (size_t ei = 0; ei < engine_num; ++ei) {
+  uint32_t engine_num = model_toolkit_conf.engines_size();
+  im::bsf::TaskExecutorVector<TaskT>::instance().resize(engine_num);
+  for (uint32_t ei = 0; ei < engine_num; ++ei) {
     LOG(INFO) << "model_toolkit_conf.engines(" << ei
               << ").name: " << model_toolkit_conf.engines(ei).name();
     std::string engine_name = model_toolkit_conf.engines(ei).name();
     VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
     if (!engine) {
       LOG(ERROR) << "Failed generate versioned engine: " << engine_name;
       return -1;
     }
+    engine->set_model_index(ei);
......
@@ -17,11 +17,11 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <functional>
 #include <numeric>
 #include <string>
 #include <utility>
 #include <vector>
-#include <functional>
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/bsf.h"
 #include "core/predictor/framework/factory.h"
@@ -73,7 +73,7 @@ class InferEngine {
   virtual int infer(const void* in, void* out, uint32_t batch_size = -1) {
     return infer_impl(in, out, batch_size);
   }
-
+  virtual void set_model_index(uint32_t index) { _model_index = index; }
   virtual int reload() = 0;
 
   virtual uint64_t version() const = 0;
@@ -89,10 +89,12 @@ class InferEngine {
                          void* out,
                          uint32_t batch_size = -1) = 0;
   virtual int task_infer_impl(const void* in, void* out) = 0;  // NOLINT
+
+ protected:
+  uint32_t _model_index;
   // end: framework inner call
 };
 
+typedef im::bsf::Task<paddle::PaddleTensor, paddle::PaddleTensor> TaskT;
+
 class ReloadableInferEngine : public InferEngine {
  public:
   virtual ~ReloadableInferEngine() {}
@@ -105,7 +107,6 @@ class ReloadableInferEngine : public InferEngine {
   };
 
   virtual int load(const configure::EngineDesc& conf) = 0;
-  typedef im::bsf::Task<paddle::PaddleTensor, paddle::PaddleTensor> TaskT;
 
   int proc_initialize_impl(const configure::EngineDesc& conf, bool version);
@@ -372,8 +373,7 @@ class CloneDBReloadableInferEngine
  protected:
   // Template EngineCore: once created, multiple thread-level EngineCores
   // share this object's model data.
-  std::vector<ModelData<EngineCore>*>
-      _cloneTemplate;
+  std::vector<ModelData<EngineCore>*> _cloneTemplate;
 };
 
 template <typename EngineCore>
......
@@ -24,7 +24,7 @@ namespace fugue {
 namespace memory {
 
 void Region::init() {
-  _big_mem_capacity = 64 * 1024 * 1024;   // 64MB
+  _big_mem_capacity = 128 * 1024 * 1024;  // 128MB
   _big_mem_start = new char[_big_mem_capacity];
 }
......
@@ -345,9 +345,10 @@ class Region {
       2 * 1024 *
       1024;  // 2MB, means when you need less than 2MB, get memory from Block.
 
-  // 64MB, means when you need less than 64MB, get memory from BigMemory
-  // instead of BigNode
-  static const int BIGNODE_MEM_THRESHOLD = (64 * 1024 * 1024 + 1);
+  // 128MB, means when you need less than 128MB, get memory from BigMemory
+  // instead of BigNode
+  static const int BIGNODE_MEM_THRESHOLD = (128 * 1024 * 1024 + 1);
 
   static const int COUNTER_SIZE =
       BIGNODE_MEM_THRESHOLD / BIG_MEM_THRESHOLD + 1;  // this is not used
......
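Taken together, the two thresholds imply a three-tier routing policy for allocations: small requests come from a Block, mid-size requests from the BigMemory region, and anything at or above 128MB from a BigNode. A hypothetical sketch of that decision (the alloc_from_* helper names are invented for illustration):

```cpp
// Hypothetical routing implied by the thresholds above; the helpers
// stand in for whatever the Region actually calls internally.
void* route_alloc(size_t size) {
  if (size < BIG_MEM_THRESHOLD) {      // < 2MB
    return alloc_from_block(size);
  }
  if (size < BIGNODE_MEM_THRESHOLD) {  // < 128MB
    return alloc_from_big_memory(size);
  }
  return alloc_from_big_node(size);    // >= 128MB
}
```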