s920243400 / PaddleDetection
Forked from PaddlePaddle / PaddleDetection
Commit 8f7c7730
Authored Apr 20, 2018 by qiaolongfei

refine listen_and_serv_op

Parent: cec4e6ed
Showing 4 changed files with 75 additions and 66 deletions (+75 −66)
paddle/fluid/operators/detail/grpc_server.h (+1 −1)
paddle/fluid/operators/listen_and_serv_op.cc (+58 −58)
paddle/fluid/operators/listen_and_serv_op.h (+15 −6)
paddle/fluid/operators/send_recv_op_test.cc (+1 −1)
paddle/fluid/operators/detail/grpc_server.h

@@ -67,7 +67,7 @@ class AsyncGRPCServer final {
     prefetch_ctx_ = prepared;
   }
 
-  int GetSelectedPort() { return selected_port_; }
+  int GetSelectedPort() const { return selected_port_; }
 
   const ReceivedMessage Get() { return this->var_recv_queue_.Pop(); }
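The only change here is the const qualifier on the getter, which makes the port readable through a const reference or pointer to the server. A minimal sketch of what the qualifier enables (not Paddle code; Server is a stand-in class):

// Minimal sketch (not Paddle code): why the const qualifier matters.
// Without it, calling the getter through a const reference fails to compile.
class Server {
 public:
  int GetSelectedPort() const { return selected_port_; }

 private:
  int selected_port_ = 6174;
};

int PortOf(const Server &server) {
  return server.GetSelectedPort();  // only legal because the getter is const
}

int main() { Server s; return PortOf(s) == 6174 ? 0 : 1; }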
paddle/fluid/operators/listen_and_serv_op.cc

@@ -27,20 +27,6 @@ void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service) {
   VLOG(4) << "RunServer thread end";
 }
 
-static void CreateTensorFromMessageType(framework::Variable *var,
-                                        sendrecv::VarType var_type) {
-  if (var_type == sendrecv::VarType::LOD_TENSOR) {
-    var->GetMutable<framework::LoDTensor>();
-  } else if (var_type == sendrecv::VarType::SELECTED_ROWS) {
-    var->GetMutable<framework::SelectedRows>();
-  } else {
-    PADDLE_THROW(
-        "VariableMessage type %d is not in "
-        "[LoDTensor, SelectedRows]",
-        var_type);
-  }
-}
-
 static void ParallelExecuteBlocks(
     const std::vector<size_t> &parallel_blkids, framework::Executor *executor,
     const std::vector<std::shared_ptr<framework::ExecutorPrepareContext>>
@@ -77,59 +63,37 @@ void ListenAndServOp::Stop() {
   server_thread_->join();
 }
 
-void ListenAndServOp::RunImpl(const framework::Scope &scope,
-                              const platform::Place &dev_place) const {
-  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
-  auto &dev_ctx = *pool.Get(dev_place);
-  framework::Scope &recv_scope = scope.NewScope();
-
-  if (!rpc_service_) {
-    std::string endpoint = Attr<std::string>("endpoint");
-    rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
-  }
+void ListenAndServOp::PreparePrefetchCtx(framework::Executor *executor,
+                                         framework::BlockDesc *prefetch_block,
+                                         framework::ProgramDesc *program) const {
+  // TODO(qiao) set proper fields for table lookup and update
+  rpc_service_->SetExecutor(executor);
+  VLOG(3) << "prefetch block id is " << prefetch_block->ID();
+  auto prefetch_prepared = executor->Prepare(*program, prefetch_block->ID());
+  rpc_service_->SetPrefetchBlkdId(prefetch_block->ID());
+  rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get());
+  prefetch_prepared.release();
+}
 
-  auto ins = Inputs("X");
+void ListenAndServOp::RunSyncUpdate(framework::Executor *executor,
+                                    framework::ProgramDesc *program,
+                                    framework::Scope *recv_scope,
+                                    framework::BlockDesc *prefetch_block) const {
   auto fan_in = Attr<int>("Fanin");
-  auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
-  auto *prefetch_block = Attr<framework::BlockDesc *>(kPrefetchBlock);
-  auto *program = optimize_block->Program();
+
   size_t num_blocks = program->Size();
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");
 
-  framework::Executor executor(dev_place);
   std::vector<int> block_list;
   for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
-    if (blkid != static_cast<size_t>(prefetch_block->ID())) {
-      block_list.push_back(blkid);
-    }
+    block_list.push_back(blkid);
   }
-  auto optimize_prepared = executor.Prepare(*program, block_list);
+  auto optimize_prepared = executor->Prepare(*program, block_list);
   // Insert placeholder for block0 which holds current op itself.
   optimize_prepared.insert(
       optimize_prepared.begin(),
       std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
 
-  rpc_service_->SetScope(&recv_scope);
-  rpc_service_->SetDevCtx(&dev_ctx);
-  // TODO(qiao) set proper fields for table lookup and update
-  rpc_service_->SetExecutor(&executor);
-  VLOG(3) << "prefetch block id is " << prefetch_block->ID();
-  auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID());
-  rpc_service_->SetPrefetchBlkdId(prefetch_block->ID());
-  rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get());
-  prefetch_prepared.release();
-  rpc_service_->SetProgram(program);
-
-  // start the server listening after all member initialized.
-  server_thread_.reset(new std::thread(RunServer, rpc_service_));
-  VLOG(3) << "wait server thread to become ready...";
-  sleep(5);
-
-  // Write to a file of server selected port for python use.
-  std::ofstream port_file;
-  port_file.open("/tmp/paddle.selected_port");
-  port_file << rpc_service_->GetSelectedPort();
-  port_file.close();
-
   bool exit_flag = false;
   // Record received sparse variables, so that
   // we could reset those after execute optimize program
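PreparePrefetchCtx keeps the pre-existing ownership handoff: the unique_ptr returned by Prepare passes its raw pointer to the RPC service via get(), and release() then gives up ownership so the context is not destroyed when the local variable goes out of scope. A minimal sketch of that pattern (not Paddle code; Ctx and Service are stand-ins):

// Minimal sketch (not Paddle code) of the get()-then-release() handoff used
// in PreparePrefetchCtx: the unique_ptr hands its raw pointer to the
// consumer, then relinquishes ownership so the object is not freed here.
#include <memory>

struct Ctx { int block_id = 0; };

struct Service {
  Ctx *prefetch_ctx = nullptr;         // the service takes over this pointer
  void SetPrefetchPreparedCtx(Ctx *c) { prefetch_ctx = c; }
  ~Service() { delete prefetch_ctx; }  // the consumer frees it eventually
};

int main() {
  Service service;
  std::unique_ptr<Ctx> prepared(new Ctx);
  service.SetPrefetchPreparedCtx(prepared.get());  // pass the raw pointer
  prepared.release();  // drop ownership; no double delete at scope exit
  return 0;
}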
@@ -170,7 +134,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
         break;
       }
 
-      // NOTE: if is_gpu_place, CUDA kernels are laugched by multiple threads
+      // NOTE: if is_gpu_place, CUDA kernels are launch by multiple threads
       // and this will still work.
 
       // The optimize blocks which have the same parent ID would run parallel
@@ -182,16 +146,16 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
     for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
       if (blkid != static_cast<size_t>(prefetch_block->ID())) {
         if (program->Block(blkid).Parent() != last_parent_blkid) {
-          ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared,
-                                program, &recv_scope);
+          ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
+                                program, recv_scope);
           parallel_blkids.clear();
           last_parent_blkid = program->Block(blkid).Parent();
         }
         parallel_blkids.push_back(blkid);
       }
     }
-    ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared,
-                          program, &recv_scope);
+    ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
+                          program, recv_scope);
     VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
 
     // Reset the received sparse variables, the sum operator would not
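The loop above batches consecutive optimize blocks that share a parent block ID and flushes each batch to ParallelExecuteBlocks before starting the next. A minimal sketch of the same grouping logic (not Paddle code; Block, ExecuteParallel, and the sample IDs are stand-ins):

// Minimal sketch (not Paddle code): consecutive blocks with the same parent
// ID form one batch, and each batch runs on worker threads before the next.
#include <cstdio>
#include <thread>
#include <vector>

struct Block { int id; int parent; };

static void ExecuteParallel(const std::vector<int> &ids) {
  std::vector<std::thread> workers;
  for (int id : ids) {
    workers.emplace_back([id] { std::printf("run block %d\n", id); });
  }
  for (auto &w : workers) w.join();  // wait for the whole batch
}

int main() {
  std::vector<Block> blocks = {{2, 0}, {3, 0}, {4, 1}, {5, 1}};
  std::vector<int> batch;
  int last_parent = blocks.front().parent;
  for (const auto &b : blocks) {
    if (b.parent != last_parent) {  // parent changed: flush current batch
      ExecuteParallel(batch);
      batch.clear();
      last_parent = b.parent;
    }
    batch.push_back(b.id);
  }
  ExecuteParallel(batch);  // flush the final batch
  return 0;
}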
@@ -209,6 +173,42 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   }  // while(true)
 }
 
+static void SavePort(std::shared_ptr<detail::AsyncGRPCServer> rpc_service) {
+  std::ofstream port_file;
+  port_file.open("/tmp/paddle.selected_port");
+  port_file << rpc_service->GetSelectedPort();
+  port_file.close();
+}
+
+void ListenAndServOp::RunImpl(const framework::Scope &scope,
+                              const platform::Place &dev_place) const {
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+  auto &dev_ctx = *pool.Get(dev_place);
+  framework::Scope &recv_scope = scope.NewScope();
+
+  PADDLE_ENFORCE(!rpc_service_);
+  std::string endpoint = Attr<std::string>("endpoint");
+  rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
+
+  auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
+  auto *prefetch_block = Attr<framework::BlockDesc *>(kPrefetchBlock);
+  auto *program = optimize_block->Program();
+  framework::Executor executor(dev_place);
+
+  // prepare rpc_service
+  rpc_service_->SetScope(&recv_scope);
+  rpc_service_->SetDevCtx(&dev_ctx);
+  rpc_service_->SetProgram(program);
+
+  PreparePrefetchCtx(&executor, prefetch_block, program);
+
+  // start the server listening after all member initialized.
+  server_thread_.reset(new std::thread(RunServer, rpc_service_));
+  VLOG(3) << "wait server thread to become ready...";
+  sleep(5);
+
+  // Write to a file of server selected port for python use.
+  SavePort(rpc_service_);
+
+  RunSyncUpdate(&executor, program, &recv_scope, prefetch_block);
+}
+
 class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   ListenAndServOpMaker(OpProto *proto, OpAttrChecker *op_checker)
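The new SavePort helper factors the port-file write out of RunImpl; per the comment, the file exists so the Python launcher can discover which port the server actually bound. A minimal reader sketch (not part of the commit; only the file path comes from the code above):

// Minimal sketch (not part of the commit): how a launcher could read back
// the port that SavePort wrote, to learn which port the server selected.
#include <fstream>
#include <iostream>

int main() {
  std::ifstream port_file("/tmp/paddle.selected_port");
  int port = 0;
  if (port_file >> port) {
    std::cout << "server selected port: " << port << "\n";
  } else {
    std::cerr << "port file missing or empty\n";
  }
  return 0;
}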
paddle/fluid/operators/listen_and_serv_op.h

@@ -34,17 +34,26 @@ void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service);
 
 class ListenAndServOp : public framework::OperatorBase {
  public:
   ListenAndServOp(const std::string &type,
                   const framework::VariableNameMap &inputs,
                   const framework::VariableNameMap &outputs,
                   const framework::AttributeMap &attrs);
 
   int GetSelectedPort() const;
 
+  void PreparePrefetchCtx(framework::Executor *executor,
+                          framework::BlockDesc *prefetch_block,
+                          framework::ProgramDesc *program) const;
+
+  void RunSyncUpdate(framework::Executor *executor,
+                     framework::ProgramDesc *program,
+                     framework::Scope *recv_scope,
+                     framework::BlockDesc *prefetch_block) const;
+
   void Stop() override;
 
   void RunImpl(const framework::Scope &scope,
                const platform::Place &dev_place) const override;
 
  protected:
   mutable std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
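Note that PreparePrefetchCtx, RunSyncUpdate, and RunImpl are all const member functions, yet they reset and mutate rpc_service_ and server_thread_; this compiles because those members are declared mutable. A minimal sketch of that pattern (not Paddle code; Server and Op are stand-ins):

// Minimal sketch (not Paddle code) of the mutable-member pattern the header
// relies on: const member functions may still modify members marked mutable.
#include <memory>

struct Server { int port = 8080; };

class Op {
 public:
  void Run() const {
    // Allowed despite `const` because server_ is mutable.
    if (!server_) server_.reset(new Server);
  }

 private:
  mutable std::shared_ptr<Server> server_;
};

int main() {
  const Op op;
  op.Run();  // compiles: a const method mutating a mutable member
  return 0;
}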
paddle/fluid/operators/send_recv_op_test.cc

@@ -127,7 +127,7 @@ void StartServerNet(bool is_sparse) {
   const auto &root_block = program.Block(0);
   auto *optimize_block = program.AppendBlock(root_block);
   auto *prefetch_block = program.AppendBlock(root_block);
 
-  // X for server side tensors, RX for received tensers, must be of same shape.
+  // X for server side tensors, RX for received tensors, must be of same shape.
   AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);
 
   f::AttributeMap attrs;