BaiXuePrincess / Paddle  (forked from PaddlePaddle / Paddle)

Commit bcc67401
Author:   typhoonzero
Authored: Jan 23, 2018
Parent:   d76fcb6f

WIP python binding of send recv
Showing 3 changed files with 155 additions and 9 deletions:

  paddle/operators/recv_op.cc                    +9   -9
  python/paddle/v2/fluid/layers/io.py           +101   -0
  python/paddle/v2/fluid/tests/test_recv_op.py   +45   -0
paddle/operators/recv_op.cc

@@ -49,7 +49,7 @@ static void CreateTensorFromMessageType(framework::Variable *var,
     var->GetMutable<framework::SelectedRows>();
   } else {
-    PADDLE_THROW("VariableMessage type %d is not in "
+    PADDLE_THROW("VraibleMessage type %d is not in "
                  "[LoDTensor, SelectedRows]",
                  var_type);
   }
 }
@@ -121,17 +121,17 @@ class RecvOp : public framework::OperatorBase {
       if (it != grad_list.end()) {
         param_var_name = param_list[it - grad_list.begin()];
       } else {
-        LOG(ERROR) << "grad has no paired param:" << grad_var_name;
+        LOG(ERROR) << "grad have no paired param:" << grad_var_name;
       }
-      VLOG(3) << "received grad: " << grad_var_name
+      VLOG(3) << "recved grad: " << grad_var_name
               << " updating param: " << param_var_name;
       if (fan_in > 1) {
         grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
       }
       auto *var = recv_scope.FindVar(grad_var_name);
       if (var == nullptr) {
-        LOG(ERROR) << "Can not find server side var: " << grad_var_name;
-        PADDLE_THROW("Can not find server side var");
+        LOG(ERROR) << "can not find server side var: " << grad_var_name;
+        PADDLE_THROW("can not find server side var");
       }
       detail::DeserializeFromMessage(v.second, dev_ctx, var);
     }
@@ -161,11 +161,11 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker)
       : OpProtoAndCheckerMaker(proto, op_checker) {
-    AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable();
+    // AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable();
     AddComment(R"DOC(
 Recv operator
 
-This operator will recieve tensor from send_op
+This operator will recv tensor from send_op
 )DOC");
     AddAttr<std::string>("endpoint",
                          "(string, default 127.0.0.1:6164)"
@@ -176,11 +176,11 @@ This operator will recieve tensor from send_op
         kOptimizeBlock, "Serialized ProgramDesc string for recv to run.");
     AddAttr<std::vector<std::string>>(
         "ParamList", "type list of string",
-        "grad->param name mapping to find which parameters to optimize.")
+        "grad->param name mapping to find which param to optimize.")
         .SetDefault({});
     AddAttr<std::vector<std::string>>(
         "GradList", "type list of string",
-        "grad->param name mapping to find which parameters to optimize.")
+        "grad->param name mapping to find which param to optimize.")
        .SetDefault({});
     AddAttr<int>("Fanin", "type int",
                  "Number of trainers in the current cluster job")
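A note on the second hunk: the recv op matches an incoming gradient to its
parameter by position, because ParamList and GradList are parallel lists. In
Python terms the lookup is roughly the following (an illustrative sketch with
made-up variable names, not part of the commit):

    # Parallel lists, as passed through the ParamList/GradList attributes.
    param_list = ["w1", "w2"]
    grad_list = ["w1@GRAD", "w2@GRAD"]

    def find_param(grad_var_name):
        # Mirrors param_list[it - grad_list.begin()] in recv_op.cc.
        if grad_var_name in grad_list:
            return param_list[grad_list.index(grad_var_name)]
        return None  # the C++ side logs "no paired param" instead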
python/paddle/v2/fluid/layers/io.py

@@ -74,3 +74,104 @@ def data(name,
         type=type,
         stop_gradient=stop_gradient,
         lod_level=lod_level)
+
+
+class BlockGuardServ(BlockGuard):
+    """
+    BlockGuardServ class.
+
+    BlockGuardServ class is used to create an op with a block in a program.
+    """
+
+    def __init__(self, server):
+        if not (isinstance(server, ListenAndServ)):
+            raise TypeError("BlockGuardServ takes a ListenAndServ")
+        super(BlockGuardServ, self).__init__(server.helper.main_program)
+        self.server = server
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is not None:
+            return False
+
+        self.server.complete_op()
+        return super(BlockGuardServ, self).__exit__(exc_type, exc_val, exc_tb)
+
+
+class ListenAndServ(object):
+    """
+    ListenAndServ class.
+
+    ListenAndServ class is used to wrap listen_and_serv op to create a server
+    which can receive variables from clients and run a block.
+    """
+
+    def __init__(self, endpoint, fan_in=1):
+        self.helper = LayerHelper("recv", name=name)
+        self.inputs = []
+        self.outputs = []
+        self.endpoint = endpoint
+        self.fan_in = fan_in
+
+    def do(self):
+        return BlockGuardServ(self)
+
+    def get_params_and_grads(self):
+        main_program = self.helper.main_program
+        current_block = main_program.current_block()
+        parent_block = self.parent_block()
+        # params and grads in the same order.
+        params = list()
+        grads = list()
+        for op in current_block.ops:
+            # FIXME(typhoonzero): op.inputs is None if it's cloned.
+            if "Grad" in op.inputs and "Param" in op.inputs:
+                params.append(op.inputs["Param"].name)
+                grads.append(op.inputs["Grad"].name)
+
+        return params, grads
+
+    def complete_op(self):
+        main_program = self.helper.main_program
+        current_block = main_program.current_block()
+        parent_block = self.parent_block()
+
+        params, grads = self.get_params_and_grads()
+        parent_block.append_op(
+            type='recv',
+            inputs={},
+            outputs={},
+            attrs={
+                'endpoint': self.endpoint,
+                'Fanin': self.fan_in,
+                'ParamList': params,
+                'GradList': grads,
+                'OptimizeBlock': current_block
+            })
+
+
+def Send(endpoints, send_vars, get_vars):
+    """
+    Send layer
+
+    Args:
+        endpoints: comma seperated IP:PORT pairs in the order
+                   of send_vars to send
+        send_vars: vars to send
+        get_vars: vars to get from server after send completes.
+
+    Send variables to the server side, and get vars from server
+    side when server have finished running server side program.
+    """
+    assert (type(send_vars) == list)
+    assert (type(get_vars) == list)
+    epmap = endpoints.split(",")
+    endpoints = set(epmap)
+
+    helper = LayerHelper("Send", **locals())
+
+    helper.append_op(
+        type="send",
+        inputs={"X": send_vars},
+        outputs={"Out": get_vars},
+        attrs={"endpoints": endpoints,
+               "epmap": epmap})
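A note on the control flow here: entering serv.do() pushes a new block via
BlockGuardServ, every layer created inside the with-statement lands in that
block, and BlockGuardServ.__exit__ calls serv.complete_op(), which appends a
recv op to the parent block carrying that block as its OptimizeBlock
attribute. A minimal sketch of the server-side pattern (mirroring the test
added below; note this WIP revision cannot run it as-is, since
ListenAndServ.__init__ references an undefined name and self.parent_block()
is not defined in this file):

    import paddle.v2.fluid as fluid
    import paddle.v2.fluid.layers as layers

    main = fluid.Program()
    with fluid.program_guard(main):
        x = layers.data(shape=[32, 32], dtype='float32', name='X')
        serv = fluid.ListenAndServ("127.0.0.1:6174")  # fan_in defaults to 1
        with serv.do():
            # This block becomes the recv op's OptimizeBlock.
            layers.scale(input=x, scale=10)
    fluid.Executor(fluid.CPUPlace()).run(main)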
python/paddle/v2/fluid/tests/test_recv_op.py  (new file, mode 100644)

# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import paddle.v2.fluid as fluid
import paddle.v2.fluid.layers as layers
import numpy


class TestRecvOp(unittest.TestCase):
    def run_test(self):
        # Run init_serv in a thread
        pass

    def init_serv(self, place):
        main = fluid.Program()
        with fluid.program_guard(main):
            x = layers.data(shape=[32, 32], dtype='float32', name='X')
            serv = fluid.ListenAndServ("127.0.0.1:6174")
            with serv.do():
                layers.scale(input=x, scale=10)
        exe = fluid.Executor(place)
        exe.run(main)

    def init_client(self, place):
        main = fluid.Program()
        with fluid.program_guard(main):
            x = layers.data(shape=[32, 32], dtype='float32', name='X')
            i = fluid.initializer.Constant(x=1.0)
            i(x, main.global_block())
            layers.Send("127.0.0.1:6174", [x], [x])
        exe = fluid.Executor(place)
        exe.run(main)
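run_test is still a stub; its comment says the server is meant to run in a
thread so that init_client can talk to it. One plausible shape for the
eventual driver, using the standard threading module (an assumption, not part
of this commit):

    import threading

    def run_test(self):
        place = fluid.CPUPlace()
        # Start the server in the background so the client can connect.
        t = threading.Thread(target=self.init_serv, args=(place,))
        t.daemon = True
        t.start()
        self.init_client(place)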