BaiXuePrincess / Paddle (forked from PaddlePaddle / Paddle)
Commit 1bdc7261 (unverified)
Authored Apr 13, 2018 by Wu Yi; committed via GitHub on Apr 13, 2018.

Merge pull request #9578 from typhoonzero/threadpool_for_io

Multi stream thread pool

Parents: 2c552d4e, a08bf76f
Showing 5 changed files with 63 additions and 27 deletions (+63 −27):

- paddle/fluid/framework/threadpool.cc (+19 −0)
- paddle/fluid/framework/threadpool.h (+36 −20)
- paddle/fluid/operators/detail/grpc_client.cc (+7 −5)
- paddle/fluid/operators/detail/grpc_server.cc (+1 −1)
- python/paddle/fluid/tests/book/test_recognize_digits.py (+0 −1)
paddle/fluid/framework/threadpool.cc

```diff
@@ -14,8 +14,12 @@
 #include "paddle/fluid/framework/threadpool.h"
 
+#include "gflags/gflags.h"
 #include "paddle/fluid/platform/enforce.h"
 
+DEFINE_int32(io_threadpool_size, 100,
+             "number of threads used for doing IO, default 100");
+
 namespace paddle {
 namespace framework {
 
@@ -91,5 +95,20 @@ void ThreadPool::TaskLoop() {
   }
 }
 
+std::unique_ptr<ThreadPool> ThreadPoolIO::io_threadpool_(nullptr);
+std::once_flag ThreadPoolIO::io_init_flag_;
+
+ThreadPool* ThreadPoolIO::GetInstanceIO() {
+  std::call_once(io_init_flag_, &ThreadPoolIO::InitIO);
+  return io_threadpool_.get();
+}
+
+void ThreadPoolIO::InitIO() {
+  if (io_threadpool_.get() == nullptr) {
+    // TODO(typhoonzero1986): make this configurable
+    io_threadpool_.reset(new ThreadPool(FLAGS_io_threadpool_size));
+  }
+}
+
 }  // namespace framework
 }  // namespace paddle
```
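The ThreadPoolIO additions above use a lazy, thread-safe singleton: a static std::unique_ptr guarded by a std::once_flag, initialized through std::call_once on first use. Below is a minimal, self-contained sketch of that pattern outside the Paddle tree; SimplePool and PoolIO are placeholder names, not Paddle APIs, and the hard-coded 100 only mirrors the FLAGS_io_threadpool_size default.

```cpp
// Sketch of the call_once + unique_ptr singleton used by ThreadPoolIO.
// SimplePool stands in for paddle::framework::ThreadPool.
#include <iostream>
#include <memory>
#include <mutex>

class SimplePool {
 public:
  explicit SimplePool(int num_threads) : num_threads_(num_threads) {}
  int size() const { return num_threads_; }

 private:
  int num_threads_;
};

class PoolIO {
 public:
  // Returns the process-wide IO pool, creating it exactly once.
  static SimplePool* GetInstanceIO() {
    std::call_once(io_init_flag_, &PoolIO::InitIO);
    return io_threadpool_.get();
  }

 private:
  static void InitIO() {
    if (io_threadpool_ == nullptr) {
      io_threadpool_.reset(new SimplePool(100));  // mirrors the flag default
    }
  }

  static std::unique_ptr<SimplePool> io_threadpool_;
  static std::once_flag io_init_flag_;
};

std::unique_ptr<SimplePool> PoolIO::io_threadpool_(nullptr);
std::once_flag PoolIO::io_init_flag_;

int main() {
  std::cout << "IO pool size: " << PoolIO::GetInstanceIO()->size() << "\n";
  return 0;
}
```

std::call_once guarantees InitIO runs at most once even if several threads request the pool concurrently, which is why no extra mutex is needed around the unique_ptr reset.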
paddle/fluid/framework/threadpool.h

```diff
@@ -14,12 +14,12 @@ limitations under the License. */
 #pragma once
 
-#include <condition_variable>
+#include <condition_variable>  // NOLINT
 #include <functional>
-#include <future>
-#include <mutex>
+#include <future>              // NOLINT
+#include <mutex>               // NOLINT
 #include <queue>
-#include <thread>
+#include <thread>              // NOLINT
 #include <vector>
 
 #include "glog/logging.h"
 #include "paddle/fluid/platform/enforce.h"
@@ -28,6 +28,22 @@ limitations under the License. */
 namespace paddle {
 namespace framework {
 
+struct ExceptionHandler {
+  mutable std::future<std::unique_ptr<platform::EnforceNotMet>> future_;
+  explicit ExceptionHandler(
+      std::future<std::unique_ptr<platform::EnforceNotMet>>&& f)
+      : future_(std::move(f)) {}
+  void operator()() const {
+    auto ex = this->future_.get();
+    if (ex != nullptr) {
+      LOG(FATAL) << "The exception is thrown inside the thread pool. You "
+                    "should use RunAndGetException to handle the exception.\n"
+                    "The default exception handler is LOG(FATAL)."
+                 << ex->what();
+    }
+  }
+};
+
 // ThreadPool maintains a queue of tasks, and runs them using a fixed
 // number of threads.
 class ThreadPool {
@@ -87,22 +103,6 @@ class ThreadPool {
   void Wait();
 
  private:
-  struct ExceptionHandler {
-    mutable std::future<std::unique_ptr<platform::EnforceNotMet>> future_;
-    explicit ExceptionHandler(
-        std::future<std::unique_ptr<platform::EnforceNotMet>>&& f)
-        : future_(std::move(f)) {}
-    void operator()() const {
-      auto ex = this->future_.get();
-      if (ex != nullptr) {
-        LOG(FATAL) << "The exception is thrown inside the thread pool. You "
-                      "should use RunAndGetException to handle the exception.\n"
-                      "The default exception handler is LOG(FATAL)."
-                   << ex->what();
-      }
-    }
-  };
-
   DISABLE_COPY_AND_ASSIGN(ThreadPool);
 
   // If the task queue is empty and avaialbe is equal to the number of
@@ -135,6 +135,17 @@ class ThreadPool {
   std::condition_variable completed_;
 };
 
+class ThreadPoolIO : ThreadPool {
+ public:
+  static ThreadPool* GetInstanceIO();
+  static void InitIO();
+
+ private:
+  // NOTE: threadpool in base will be inhereted here.
+  static std::unique_ptr<ThreadPool> io_threadpool_;
+  static std::once_flag io_init_flag_;
+};
+
 // Run a function asynchronously.
 // NOTE: The function must return void. If the function need to return a value,
 // you can use lambda to capture a value pointer.
@@ -143,5 +154,10 @@ std::future<void> Async(Callback callback) {
   return ThreadPool::GetInstance()->Run(callback);
 }
 
+template <typename Callback>
+std::future<void> AsyncIO(Callback callback) {
+  return ThreadPoolIO::GetInstanceIO()->Run(callback);
+}
+
 }  // namespace framework
 }  // namespace paddle
```
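The ExceptionHandler functor that this header moves to namespace scope wraps a std::future carrying either nullptr (success) or an exception object produced inside a pool thread, and raises a fatal log on the caller's side. The following self-contained sketch illustrates that propagation idea only; it substitutes std::runtime_error and std::cerr for Paddle's platform::EnforceNotMet and glog's LOG(FATAL), and a bare std::thread for the pool worker, so none of these names reflect the actual Paddle implementation.

```cpp
// Sketch: a worker packages any exception into a unique_ptr and hands it back
// through a future; the handler inspects that future on the caller's side.
#include <future>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <thread>

using ExceptionPtr = std::unique_ptr<std::runtime_error>;

struct ExceptionHandler {
  mutable std::future<ExceptionPtr> future_;
  explicit ExceptionHandler(std::future<ExceptionPtr>&& f)
      : future_(std::move(f)) {}
  void operator()() const {
    auto ex = future_.get();
    if (ex != nullptr) {
      // Paddle uses LOG(FATAL) here; std::cerr keeps the sketch dependency-free.
      std::cerr << "exception from the pool: " << ex->what() << "\n";
    }
  }
};

int main() {
  std::promise<ExceptionPtr> p;
  ExceptionHandler handler(p.get_future());

  // Stand-in for a pool worker running one task.
  std::thread worker([&p] {
    try {
      throw std::runtime_error("task failed");
    } catch (const std::runtime_error& e) {
      p.set_value(ExceptionPtr(new std::runtime_error(e)));
    }
  });

  handler();  // blocks until the worker reports success or failure
  worker.join();
  return 0;
}
```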
paddle/fluid/operators/detail/grpc_client.cc

```diff
@@ -35,7 +35,8 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
   const framework::Scope* p_scope = &scope;
   const auto ch = GetChannel(ep_val);
 
-  framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] {
+  framework::AsyncIO([var_name_val, p_ctx, ep_val, p_scope, time_out, ch,
+                      this] {
     auto* var = p_scope->FindVar(var_name_val);
 
     ::grpc::ByteBuffer req;
@@ -89,7 +90,8 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
   const framework::Scope* p_scope = &scope;
   const auto ch = GetChannel(ep_val);
 
-  framework::Async([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] {
+  framework::AsyncIO([var_name_val, ep_val, p_scope, p_ctx, time_out, ch,
+                      this] {
     // prepare input
     sendrecv::VariableMessage req;
     req.set_varname(var_name_val);
@@ -132,8 +134,8 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep,
   const framework::Scope* p_scope = &scope;
   const auto ch = GetChannel(ep_val);
 
-  framework::Async([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx,
-                    time_out, ch, this] {
+  framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope,
+                      p_ctx, time_out, ch, this] {
     auto* var = p_scope->FindVar(in_var_name_val);
 
     ::grpc::ByteBuffer req;
@@ -196,7 +198,7 @@ bool RPCClient::Wait() {
   std::vector<std::future<void>> waits(req_count_);
 
   for (int i = 0; i < req_count_; i++) {
-    waits[i] = framework::Async([i, &a, this] { a[i] = Proceed(); });
+    waits[i] = framework::AsyncIO([i, &a, this] { a[i] = Proceed(); });
  }
 
  for (int i = 0; i < req_count_; i++) {
```
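RPCClient::Wait() above fans the pending requests out through framework::AsyncIO and keeps one std::future<void> per request so it can block until every RPC has been processed. Below is a minimal stand-alone sketch of that fan-out/join pattern; std::async is only a stand-in for the Paddle IO pool, and the lambda body is a placeholder for Proceed(), so this is illustrative rather than Paddle code.

```cpp
// Sketch: issue N tasks, keep their futures, then wait on all of them,
// mirroring the waits[i] = framework::AsyncIO(...) loop in RPCClient::Wait().
#include <future>
#include <iostream>
#include <vector>

int main() {
  const int req_count = 4;
  std::vector<int> results(req_count, 0);           // per-request outcome
  std::vector<std::future<void>> waits(req_count);  // one future per request

  for (int i = 0; i < req_count; i++) {
    // std::async stands in for framework::AsyncIO; the body stands in for Proceed().
    waits[i] = std::async(std::launch::async, [i, &results] { results[i] = 1; });
  }

  for (int i = 0; i < req_count; i++) {
    waits[i].wait();  // block until every request has completed
  }

  for (int i = 0; i < req_count; i++) {
    std::cout << "request " << i << " done: " << results[i] << "\n";
  }
  return 0;
}
```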
paddle/fluid/operators/detail/grpc_server.cc

```diff
@@ -217,10 +217,10 @@ void AsyncGRPCServer::RunSyncUpdate() {
   std::function<void()> prefetch_register =
       std::bind(&AsyncGRPCServer::TryToRegisterNewPrefetchOne, this);
 
-
+  // TODO(wuyi): Run these "HandleRequest" in thread pool
   t_send_.reset(
       new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
                                 cq_send_.get(), "cq_send", send_register)));
   t_get_.reset(
       new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
                                 cq_get_.get(), "cq_get", get_register)));
```
python/paddle/fluid/tests/book/test_recognize_digits.py

```diff
@@ -157,7 +157,6 @@ def train(nn_type,
         for ip in pserver_ips.split(","):
             eplist.append(':'.join([ip, port]))
         pserver_endpoints = ",".join(eplist)  # ip:port,ip:port...
-        pserver_endpoints = os.getenv("PSERVERS")
         trainers = int(os.getenv("TRAINERS"))
         current_endpoint = os.getenv("POD_IP") + ":" + port
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
```