Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
71f2383e
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
71f2383e
编写于
1月 29, 2018
作者:
武
武毅
提交者:
GitHub
1月 29, 2018
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #7793 from typhoonzero/recv_op_python_with_guard
Feature/Recv op python with guard
上级
d082f3a9
96eb9587
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
197 addition
and
4 deletion
+197
-4
paddle/operators/recv_op.cc
paddle/operators/recv_op.cc
+0
-1
python/paddle/v2/fluid/distribute_transpiler.py
python/paddle/v2/fluid/distribute_transpiler.py
+1
-2
python/paddle/v2/fluid/layers/io.py
python/paddle/v2/fluid/layers/io.py
+123
-1
python/paddle/v2/fluid/tests/CMakeLists.txt
python/paddle/v2/fluid/tests/CMakeLists.txt
+5
-0
python/paddle/v2/fluid/tests/test_recv_op.py
python/paddle/v2/fluid/tests/test_recv_op.py
+68
-0
未找到文件。
paddle/operators/recv_op.cc
浏览文件 @
71f2383e
...
...
@@ -161,7 +161,6 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
public:
RecvOpMaker
(
OpProto
*
proto
,
OpAttrChecker
*
op_checker
)
:
OpProtoAndCheckerMaker
(
proto
,
op_checker
)
{
AddInput
(
"RX"
,
"(Tensor) Input tensor to be optimized"
).
AsDuplicable
();
AddComment
(
R"DOC(
Recv operator
...
...
python/paddle/v2/fluid/distribute_transpiler.py
浏览文件 @
71f2383e
...
...
@@ -474,8 +474,7 @@ class DistributeTranspiler:
# Append the recv op
pserver_program
.
global_block
().
append_op
(
type
=
"recv"
,
inputs
=
{
"RX"
:
self
.
param_grad_ep_mapping
[
endpoint
][
"grads"
]
},
# grads to recv
inputs
=
{},
outputs
=
{},
attrs
=
{
"OptimizeBlock"
:
optimize_sub_program
.
global_block
(),
...
...
python/paddle/v2/fluid/layers/io.py
浏览文件 @
71f2383e
...
...
@@ -14,8 +14,10 @@
from
..
import
core
from
..layer_helper
import
LayerHelper
from
control_flow
import
BlockGuard
from
..layer_helper
import
LayerHelper
__all__
=
[
'data'
]
__all__
=
[
'data'
,
'BlockGuardServ'
,
'ListenAndServ'
,
'Send'
]
def
data
(
name
,
...
...
@@ -74,3 +76,123 @@ def data(name,
type
=
type
,
stop_gradient
=
stop_gradient
,
lod_level
=
lod_level
)
class
BlockGuardServ
(
BlockGuard
):
"""
BlockGuardServ class.
BlockGuardServ class is used to create an op with a block in a program.
"""
def
__init__
(
self
,
server
):
if
not
(
isinstance
(
server
,
ListenAndServ
)):
raise
TypeError
(
"BlockGuardServ takes a ListenAndServ"
)
super
(
BlockGuardServ
,
self
).
__init__
(
server
.
helper
.
main_program
)
self
.
server
=
server
def
__exit__
(
self
,
exc_type
,
exc_val
,
exc_tb
):
if
exc_type
is
not
None
:
return
False
self
.
server
.
complete_op
()
return
super
(
BlockGuardServ
,
self
).
__exit__
(
exc_type
,
exc_val
,
exc_tb
)
class
ListenAndServ
(
object
):
"""
ListenAndServ class.
ListenAndServ class is used to wrap listen_and_serv op to create a server
which can receive variables from clients and run a block.
"""
def
__init__
(
self
,
endpoint
,
fan_in
=
1
,
optimizer_mode
=
True
):
self
.
helper
=
LayerHelper
(
"recv"
)
self
.
inputs
=
[]
self
.
outputs
=
[]
self
.
endpoint
=
endpoint
self
.
fan_in
=
fan_in
# FIXME(typhoonzero): add optimizer_mode is stupid, should make it more
# general.
self
.
optimizer_mode
=
optimizer_mode
def
do
(
self
):
return
BlockGuardServ
(
self
)
def
get_params_and_grads
(
self
):
main_program
=
self
.
helper
.
main_program
current_block
=
main_program
.
current_block
()
parent_block
=
self
.
parent_block
()
# params and grads in the same order.
params
=
list
()
grads
=
list
()
for
op
in
current_block
.
ops
:
# FIXME(typhoonzero): op.inputs is None if it's cloned.
if
self
.
optimizer_mode
:
if
"Grad"
in
op
.
inputs
and
"Param"
in
op
.
inputs
:
params
.
append
(
op
.
inputs
[
"Param"
].
name
)
grads
.
append
(
op
.
inputs
[
"Grad"
].
name
)
else
:
# simple recv mode, recv operators inputs.
for
iname
in
op
.
input_names
:
for
in_var_name
in
op
.
input
(
iname
):
params
.
append
(
parent_block
.
var
(
in_var_name
))
grads
.
append
(
parent_block
.
var
(
in_var_name
))
return
params
,
grads
def
parent_block
(
self
):
prog
=
self
.
helper
.
main_program
parent_idx
=
prog
.
current_block
().
parent_idx
assert
parent_idx
>=
0
parent_block
=
prog
.
block
(
parent_idx
)
return
parent_block
def
complete_op
(
self
):
main_program
=
self
.
helper
.
main_program
current_block
=
main_program
.
current_block
()
parent_block
=
self
.
parent_block
()
params
,
grads
=
self
.
get_params_and_grads
()
param_names
=
[
p
.
name
for
p
in
params
]
grad_names
=
[
g
.
name
for
g
in
grads
]
parent_block
.
append_op
(
type
=
'recv'
,
inputs
=
{},
outputs
=
{},
attrs
=
{
'endpoint'
:
self
.
endpoint
,
'Fanin'
:
self
.
fan_in
,
'ParamList'
:
param_names
,
'GradList'
:
grad_names
,
'OptimizeBlock'
:
current_block
})
def
Send
(
endpoints
,
send_vars
,
get_vars
):
"""
Send layer
Args:
endpoints: comma seperated IP:PORT pairs in the order
of send_vars to send
send_vars: vars to send
get_vars: vars to get from server after send completes.
Send variables to the server side, and get vars from server
side when server have finished running server side program.
"""
assert
(
type
(
send_vars
)
==
list
)
assert
(
type
(
get_vars
)
==
list
)
epmap
=
endpoints
.
split
(
","
)
endpoints
=
list
(
set
(
epmap
))
helper
=
LayerHelper
(
"Send"
,
**
locals
())
helper
.
append_op
(
type
=
"send"
,
inputs
=
{
"X"
:
send_vars
},
outputs
=
{
"Out"
:
get_vars
},
attrs
=
{
"endpoints"
:
endpoints
,
"epmap"
:
epmap
})
python/paddle/v2/fluid/tests/CMakeLists.txt
浏览文件 @
71f2383e
file
(
GLOB TEST_OPS RELATIVE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
"
"test_*.py"
)
string
(
REPLACE
".py"
""
TEST_OPS
"
${
TEST_OPS
}
"
)
if
(
NOT WITH_DISTRIBUTE
)
list
(
REMOVE_ITEM TEST_OPS test_recv_op
)
endif
(
NOT WITH_DISTRIBUTE
)
foreach
(
src
${
TEST_OPS
}
)
py_test
(
${
src
}
SRCS
${
src
}
.py
)
endforeach
()
...
...
python/paddle/v2/fluid/tests/test_recv_op.py
0 → 100644
浏览文件 @
71f2383e
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
unittest
import
paddle.v2.fluid
as
fluid
import
paddle.v2.fluid.layers
as
layers
import
numpy
from
multiprocessing
import
Process
import
os
,
sys
class
TestRecvOp
(
unittest
.
TestCase
):
def
test_send
(
self
):
# Run init_serv in a thread
place
=
fluid
.
CPUPlace
()
p
=
Process
(
target
=
self
.
init_serv
,
args
=
(
place
,
))
p
.
daemon
=
True
p
.
start
()
self
.
init_client
(
place
)
# FIXME(typhoonzero): find a way to gracefully shutdown the server.
os
.
system
(
"kill -9 %d"
%
p
.
pid
)
p
.
join
()
def
init_serv
(
self
,
place
):
main
=
fluid
.
Program
()
with
fluid
.
program_guard
(
main
):
x
=
layers
.
data
(
shape
=
[
32
,
32
],
dtype
=
'float32'
,
name
=
"X"
,
append_batch_size
=
False
)
fluid
.
initializer
.
Constant
(
value
=
1.0
)(
x
,
main
.
global_block
())
serv
=
layers
.
ListenAndServ
(
"127.0.0.1:6174"
,
optimizer_mode
=
False
)
with
serv
.
do
():
o
=
layers
.
scale
(
x
=
x
,
scale
=
10.0
)
main
.
global_block
().
create_var
(
name
=
o
.
name
,
psersistable
=
False
,
dtype
=
o
.
dtype
,
shape
=
o
.
shape
)
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
main
)
def
init_client
(
self
,
place
):
main
=
fluid
.
Program
()
with
fluid
.
program_guard
(
main
):
x
=
layers
.
data
(
shape
=
[
32
,
32
],
dtype
=
'float32'
,
name
=
'X'
,
append_batch_size
=
False
)
fluid
.
initializer
.
Constant
(
value
=
1.0
)(
x
,
main
.
global_block
())
layers
.
Send
(
"127.0.0.1:6174"
,
[
x
],
[
x
])
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
main
)
if
__name__
==
"__main__"
:
unittest
.
main
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录