Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
PaddleDetection
提交
00f8e63b
P
PaddleDetection
项目概览
s920243400
/
PaddleDetection
与 Fork 源项目一致
Fork自
PaddlePaddle / PaddleDetection
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleDetection
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
00f8e63b
编写于
4月 03, 2018
作者:
T
typhoonzero
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update
上级
94eea16e
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
33 addition
and
36 deletion
+33
-36
paddle/fluid/operators/CMakeLists.txt
paddle/fluid/operators/CMakeLists.txt
+1
-0
paddle/fluid/operators/detail/grpc_server.cc
paddle/fluid/operators/detail/grpc_server.cc
+0
-3
paddle/fluid/operators/listen_and_serv_op.cc
paddle/fluid/operators/listen_and_serv_op.cc
+32
-1
paddle/fluid/operators/listen_and_serv_op.h
paddle/fluid/operators/listen_and_serv_op.h
+0
-32
未找到文件。
paddle/fluid/operators/CMakeLists.txt
浏览文件 @
00f8e63b
...
...
@@ -193,6 +193,7 @@ if(WITH_DISTRIBUTE)
set_source_files_properties
(
send_vars_op.cc PROPERTIES COMPILE_FLAGS
${
DISTRIBUTE_COMPILE_FLAGS
}
)
op_library
(
send_barrier_op DEPS
${
DISTRIBUTE_DEPS
}
)
set_source_files_properties
(
send_barrier_op.cc PROPERTIES COMPILE_FLAGS
${
DISTRIBUTE_COMPILE_FLAGS
}
)
set_source_files_properties
(
send_recv_op_test.cc PROPERTIES COMPILE_FLAGS
${
DISTRIBUTE_COMPILE_FLAGS
}
)
cc_test
(
test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor
)
else
()
set
(
DEPS_OPS
${
DEPS_OPS
}
send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op
)
...
...
paddle/fluid/operators/detail/grpc_server.cc
浏览文件 @
00f8e63b
...
...
@@ -244,9 +244,6 @@ void AsyncGRPCServer::TryToRegisterNewSendOne() {
VLOG
(
3
)
<<
"shutdown, do not TryToRegisterNewSendOne"
;
return
;
}
while
(
scope_
==
nullptr
)
{
sleep
(
0.01
);
}
RequestSend
*
send
=
new
RequestSend
(
&
service_
,
cq_send_
.
get
(),
scope_
,
&
var_recv_queue_
,
dev_ctx_
);
VLOG
(
4
)
<<
"Create RequestSend status:"
<<
send
->
Status
();
...
...
paddle/fluid/operators/listen_and_serv_op.cc
浏览文件 @
00f8e63b
...
...
@@ -25,6 +25,38 @@ void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service) {
VLOG
(
4
)
<<
"RunServer thread end"
;
}
static
void
CreateTensorFromMessageType
(
framework
::
Variable
*
var
,
sendrecv
::
VarType
var_type
)
{
if
(
var_type
==
sendrecv
::
VarType
::
LOD_TENSOR
)
{
var
->
GetMutable
<
framework
::
LoDTensor
>
();
}
else
if
(
var_type
==
sendrecv
::
VarType
::
SELECTED_ROWS
)
{
var
->
GetMutable
<
framework
::
SelectedRows
>
();
}
else
{
PADDLE_THROW
(
"VariableMessage type %d is not in "
"[LoDTensor, SelectedRows]"
,
var_type
);
}
}
static
void
ParallelExecuteBlocks
(
const
std
::
vector
<
size_t
>
&
parallel_blkids
,
framework
::
Executor
*
executor
,
framework
::
ProgramDesc
*
program
,
framework
::
Scope
*
scope
)
{
std
::
vector
<
std
::
future
<
void
>>
fs
;
for
(
size_t
idx
:
parallel_blkids
)
{
fs
.
push_back
(
framework
::
Async
([
&
executor
,
&
program
,
&
scope
,
idx
]()
{
int
run_block
=
idx
;
// thread local
try
{
executor
->
Run
(
*
program
,
scope
,
run_block
,
false
,
false
);
}
catch
(
std
::
exception
&
e
)
{
LOG
(
ERROR
)
<<
"run sub program error "
<<
e
.
what
();
}
}));
}
for
(
size_t
i
=
0
;
i
<
fs
.
size
();
++
i
)
fs
[
i
].
wait
();
}
ListenAndServOp
::
ListenAndServOp
(
const
std
::
string
&
type
,
const
framework
::
VariableNameMap
&
inputs
,
const
framework
::
VariableNameMap
&
outputs
,
...
...
@@ -62,7 +94,6 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
framework
::
Executor
executor
(
dev_place
);
// FIXME(Yancey1989): initialize rpc server with lazy mode.
rpc_service_
->
SetScope
(
&
recv_scope
);
rpc_service_
->
SetDevCtx
(
&
dev_ctx
);
// TODO(qiao) set proper fields for table lookup and update
...
...
paddle/fluid/operators/listen_and_serv_op.h
浏览文件 @
00f8e63b
...
...
@@ -30,38 +30,6 @@ constexpr char kOptimizeBlock[] = "OptimizeBlock";
void
RunServer
(
std
::
shared_ptr
<
detail
::
AsyncGRPCServer
>
service
);
static
void
CreateTensorFromMessageType
(
framework
::
Variable
*
var
,
sendrecv
::
VarType
var_type
)
{
if
(
var_type
==
sendrecv
::
VarType
::
LOD_TENSOR
)
{
var
->
GetMutable
<
framework
::
LoDTensor
>
();
}
else
if
(
var_type
==
sendrecv
::
VarType
::
SELECTED_ROWS
)
{
var
->
GetMutable
<
framework
::
SelectedRows
>
();
}
else
{
PADDLE_THROW
(
"VariableMessage type %d is not in "
"[LoDTensor, SelectedRows]"
,
var_type
);
}
}
static
void
ParallelExecuteBlocks
(
const
std
::
vector
<
size_t
>
&
parallel_blkids
,
framework
::
Executor
*
executor
,
framework
::
ProgramDesc
*
program
,
framework
::
Scope
*
scope
)
{
std
::
vector
<
std
::
future
<
void
>>
fs
;
for
(
size_t
idx
:
parallel_blkids
)
{
fs
.
push_back
(
framework
::
Async
([
&
executor
,
&
program
,
&
scope
,
idx
]()
{
int
run_block
=
idx
;
// thread local
try
{
executor
->
Run
(
*
program
,
scope
,
run_block
,
false
,
false
);
}
catch
(
std
::
exception
&
e
)
{
LOG
(
ERROR
)
<<
"run sub program error "
<<
e
.
what
();
}
}));
}
for
(
size_t
i
=
0
;
i
<
fs
.
size
();
++
i
)
fs
[
i
].
wait
();
}
class
ListenAndServOp
:
public
framework
::
OperatorBase
{
public:
ListenAndServOp
(
const
std
::
string
&
type
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录