Commit 0e850c74 authored by typhoonzero

WIP

Parent bcc67401
@@ -14,8 +14,10 @@
 from .. import core
 from ..layer_helper import LayerHelper
+from control_flow import BlockGuard
+from ..layer_helper import LayerHelper

-__all__ = ['data']
+__all__ = ['data', 'BlockGuardServ', 'ListenAndServ', 'Send']


 def data(name,
@@ -105,12 +107,14 @@ class ListenAndServ(object):
     which can receive variables from clients and run a block.
     """

-    def __init__(self, endpoint, fan_in=1):
-        self.helper = LayerHelper("recv", name=name)
+    def __init__(self, endpoint, fan_in=1, optimizer_mode=True):
+        self.helper = LayerHelper("recv")
         self.inputs = []
         self.outputs = []
         self.endpoint = endpoint
         self.fan_in = fan_in
+        # FIXME(typhoonzero): adding this switch is a hack
+        self.optimizer_mode = optimizer_mode

     def do(self):
         return BlockGuardServ(self)
@@ -124,9 +128,16 @@ class ListenAndServ(object):
         grads = list()
         for op in current_block.ops:
             # FIXME(typhoonzero): op.inputs is None if it's cloned.
-            if "Grad" in op.inputs and "Param" in op.inputs:
-                params.append(op.inputs["Param"].name)
-                grads.append(op.inputs["Grad"].name)
+            if self.optimizer_mode:
+                if "Grad" in op.inputs and "Param" in op.inputs:
+                    params.append(op.inputs["Param"].name)
+                    grads.append(op.inputs["Grad"].name)
+            else:
+                # simple recv mode: collect the inputs of the recv operators
+                for iname in op.input_names:
+                    for in_var_name in op.input(iname):
+                        params.append(parent_block.var(in_var_name))
+                        grads.append(parent_block.var(in_var_name))

         return params, grads
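The else branch added above changes what get_params_and_grads collects: in optimizer mode it pairs the Param/Grad input slots of each optimizer op in the served block, while in simple recv mode it gathers every input of every op. Below is a minimal, framework-free sketch of that collection logic, runnable on its own; the Op and Var classes are hypothetical stand-ins for fluid's operator metadata, not real fluid API.

# Hypothetical stand-ins for operator/variable metadata; not the fluid API.
class Var(object):
    def __init__(self, name):
        self.name = name

class Op(object):
    def __init__(self, inputs):
        self.inputs = inputs                # dict: input slot name -> Var
        self.input_names = list(inputs)

    def input(self, iname):
        return [self.inputs[iname].name]

def get_params_and_grads(ops, optimizer_mode):
    params, grads = [], []
    for op in ops:
        if optimizer_mode:
            # optimizer ops expose matching Param/Grad input slots
            if "Grad" in op.inputs and "Param" in op.inputs:
                params.append(op.inputs["Param"].name)
                grads.append(op.inputs["Grad"].name)
        else:
            # simple recv mode: every input of every op counts
            for iname in op.input_names:
                for in_var_name in op.input(iname):
                    params.append(in_var_name)
                    grads.append(in_var_name)
    return params, grads

sgd = Op({"Param": Var("w"), "Grad": Var("w@GRAD"), "LearningRate": Var("lr")})
scale = Op({"X": Var("X")})
print(get_params_and_grads([sgd, scale], optimizer_mode=True))   # (['w'], ['w@GRAD'])
print(get_params_and_grads([sgd, scale], optimizer_mode=False))  # all inputs of both ops

For simplicity the sketch appends names in both modes; the patch itself appends names in optimizer mode but parent-block variables in simple recv mode, an inconsistency this WIP commit leaves open.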
@@ -17,20 +17,27 @@
 import unittest

 import paddle.v2.fluid as fluid
 import paddle.v2.fluid.layers as layers
 import numpy
+import threading


 class TestRecvOp(unittest.TestCase):
-    def run_test(self):
+    def test_send(self):
         # Run init_serv in a thread
-        pass
+        place = fluid.CPUPlace()
+        t = threading.Thread(target=self.init_serv, args=(place, ))
+        t.start()
+        self.init_client(place)
+        t.join()

     def init_serv(self, place):
         main = fluid.Program()
         with fluid.program_guard(main):
             x = layers.data(shape=[32, 32], dtype='float32', name='X')
-            serv = fluid.ListenAndServ("127.0.0.1:6174")
+            i = fluid.initializer.Constant(value=1.0)
+            y = i(x, main.global_block())
+            serv = layers.ListenAndServ("127.0.0.1:6174")
             with serv.do():
-                layers.scale(input=x, scale=10)
+                layers.scale(input=y, scale=10.0)
         exe = fluid.Executor(place)
         exe.run(main)
@@ -38,8 +45,12 @@ class TestRecvOp(unittest.TestCase):
         main = fluid.Program()
         with fluid.program_guard(main):
             x = layers.data(shape=[32, 32], dtype='float32', name='X')
-            i = fluid.initializer.Constant(x=1.0)
+            i = fluid.initializer.Constant(value=1.0)
             i(x, main.global_block())
             layers.Send("127.0.0.1:6174", [x], [x])
         exe = fluid.Executor(place)
         exe.run(main)
+
+
+if __name__ == "__main__":
+    unittest.main()
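The new test_send drives both sides from one process: init_serv blocks in exe.run(main) on a background thread while the main thread builds and runs the client program, then joins. Below is a minimal sketch of the same thread choreography with plain sockets standing in for ListenAndServ and Send; the port echoes the test, while the Event handshake is an addition the test itself does not have.

import socket
import threading

def init_serv(ready):
    # bind, accept one connection, and apply the "served block": scale by 10.0
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(("127.0.0.1", 6174))
    srv.listen(1)
    ready.set()                      # signal that the port is bound
    conn, _ = srv.accept()
    data = conn.recv(1024)
    conn.sendall(str(float(data.decode()) * 10.0).encode())
    conn.close()
    srv.close()

def init_client(ready):
    ready.wait()                     # do not connect before the bind
    cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    cli.connect(("127.0.0.1", 6174))
    cli.sendall(b"1.0")
    print(cli.recv(1024))            # prints b'10.0'
    cli.close()

ready = threading.Event()
t = threading.Thread(target=init_serv, args=(ready, ))
t.start()
init_client(ready)
t.join()

The test itself omits any such handshake, so it implicitly assumes the listener is bound before the client's Send executes; the Event only makes the sketch deterministic.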