提交 f43be75b 编写于 作者: T typhoonzero

multi stream thread pool

上级 3fd92662
...@@ -91,5 +91,20 @@ void ThreadPool::TaskLoop() { ...@@ -91,5 +91,20 @@ void ThreadPool::TaskLoop() {
} }
} }
std::unique_ptr<ThreadPool> MultiStreamThreadPool::io_threadpool_(nullptr);
std::once_flag MultiStreamThreadPool::io_init_flag_;
MultiStreamThreadPool* MultiStreamThreadPool::GetInstanceIO() {
std::call_once(io_init_flag_, &MultiStreamThreadPool::InitIO);
return static_cast<MultiStreamThreadPool*>(io_threadpool_.get());
}
void MultiStreamThreadPool::InitIO() {
if (io_threadpool_.get() == nullptr) {
// TODO(typhoonzero1986): make this configurable
io_threadpool_.reset(new ThreadPool(100));
}
}
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -135,6 +135,17 @@ class ThreadPool { ...@@ -135,6 +135,17 @@ class ThreadPool {
std::condition_variable completed_; std::condition_variable completed_;
}; };
class MultiStreamThreadPool : ThreadPool {
public:
static MultiStreamThreadPool* GetInstanceIO();
static void InitIO();
private:
// NOTE: threadpool in base will be inhereted here.
static std::unique_ptr<ThreadPool> io_threadpool_;
static std::once_flag io_init_flag_;
};
// Run a function asynchronously. // Run a function asynchronously.
// NOTE: The function must return void. If the function need to return a value, // NOTE: The function must return void. If the function need to return a value,
// you can use lambda to capture a value pointer. // you can use lambda to capture a value pointer.
...@@ -143,5 +154,10 @@ std::future<void> Async(Callback callback) { ...@@ -143,5 +154,10 @@ std::future<void> Async(Callback callback) {
return ThreadPool::GetInstance()->Run(callback); return ThreadPool::GetInstance()->Run(callback);
} }
template <typename Callback>
std::future<void> AsyncIO(Callback callback) {
return MultiStreamThreadPool::GetInstanceIO()->Run(callback);
}
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -33,7 +33,8 @@ bool RPCClient::AsyncSendVariable(const std::string& ep, ...@@ -33,7 +33,8 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
const framework::Scope* p_scope = &scope; const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val); const auto ch = GetChannel(ep_val);
framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] { framework::AsyncIO([var_name_val, p_ctx, ep_val, p_scope, time_out, ch,
this] {
auto* var = p_scope->FindVar(var_name_val); auto* var = p_scope->FindVar(var_name_val);
::grpc::ByteBuffer req; ::grpc::ByteBuffer req;
...@@ -88,7 +89,8 @@ bool RPCClient::AsyncGetVariable(const std::string& ep, ...@@ -88,7 +89,8 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
const framework::Scope* p_scope = &scope; const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val); const auto ch = GetChannel(ep_val);
framework::Async([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] { framework::AsyncIO([var_name_val, ep_val, p_scope, p_ctx, time_out, ch,
this] {
// prepare input // prepare input
sendrecv::VariableMessage req; sendrecv::VariableMessage req;
req.set_varname(var_name_val); req.set_varname(var_name_val);
...@@ -131,8 +133,8 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep, ...@@ -131,8 +133,8 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep,
const framework::Scope* p_scope = &scope; const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val); const auto ch = GetChannel(ep_val);
framework::Async([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx, framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx,
time_out, ch, this] { time_out, ch, this] {
auto* var = p_scope->FindVar(in_var_name_val); auto* var = p_scope->FindVar(in_var_name_val);
::grpc::ByteBuffer req; ::grpc::ByteBuffer req;
...@@ -195,7 +197,7 @@ bool RPCClient::Wait() { ...@@ -195,7 +197,7 @@ bool RPCClient::Wait() {
std::vector<std::future<void>> waits(req_count_); std::vector<std::future<void>> waits(req_count_);
for (int i = 0; i < req_count_; i++) { for (int i = 0; i < req_count_; i++) {
waits[i] = framework::Async([i, &a, this] { a[i] = Proceed(); }); waits[i] = framework::AsyncIO([i, &a, this] { a[i] = Proceed(); });
} }
for (int i = 0; i < req_count_; i++) { for (int i = 0; i < req_count_; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册