diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index 593c35879ae2b3680b93ac5d8443110e61cb99fe..5d1df566aff67c975f8188c144374e92518ab0ae 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -161,7 +161,6 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker { public: RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { - AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable(); AddComment(R"DOC( Recv operator diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 908810c8be1d262c459825a49a61c3b3d826c1d0..77f80442e06cb18402bb1b8b97aa9119c7473f54 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -474,8 +474,7 @@ class DistributeTranspiler: # Append the recv op pserver_program.global_block().append_op( type="recv", - inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] - }, # grads to recv + inputs={}, outputs={}, attrs={ "OptimizeBlock": optimize_sub_program.global_block(), diff --git a/python/paddle/v2/fluid/layers/io.py b/python/paddle/v2/fluid/layers/io.py index 9af00e7de560d96103b54b37facaeadba2d3fe23..b7b2cf2296cc8868dd0b5eb6cd6d58b9ae795d5d 100644 --- a/python/paddle/v2/fluid/layers/io.py +++ b/python/paddle/v2/fluid/layers/io.py @@ -14,8 +14,10 @@ from .. import core from ..layer_helper import LayerHelper +from control_flow import BlockGuard +from ..layer_helper import LayerHelper -__all__ = ['data'] +__all__ = ['data', 'BlockGuardServ', 'ListenAndServ', 'Send'] def data(name, @@ -74,3 +76,123 @@ def data(name, type=type, stop_gradient=stop_gradient, lod_level=lod_level) + + +class BlockGuardServ(BlockGuard): + """ + BlockGuardServ class. + + BlockGuardServ class is used to create an op with a block in a program. + """ + + def __init__(self, server): + if not (isinstance(server, ListenAndServ)): + raise TypeError("BlockGuardServ takes a ListenAndServ") + super(BlockGuardServ, self).__init__(server.helper.main_program) + self.server = server + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + return False + + self.server.complete_op() + return super(BlockGuardServ, self).__exit__(exc_type, exc_val, exc_tb) + + +class ListenAndServ(object): + """ + ListenAndServ class. + + ListenAndServ class is used to wrap listen_and_serv op to create a server + which can receive variables from clients and run a block. + """ + + def __init__(self, endpoint, fan_in=1, optimizer_mode=True): + self.helper = LayerHelper("recv") + self.inputs = [] + self.outputs = [] + self.endpoint = endpoint + self.fan_in = fan_in + # FIXME(typhoonzero): add optimizer_mode is stupid, should make it more + # general. + self.optimizer_mode = optimizer_mode + + def do(self): + return BlockGuardServ(self) + + def get_params_and_grads(self): + main_program = self.helper.main_program + current_block = main_program.current_block() + parent_block = self.parent_block() + # params and grads in the same order. + params = list() + grads = list() + for op in current_block.ops: + # FIXME(typhoonzero): op.inputs is None if it's cloned. + if self.optimizer_mode: + if "Grad" in op.inputs and "Param" in op.inputs: + params.append(op.inputs["Param"].name) + grads.append(op.inputs["Grad"].name) + else: + # simple recv mode, recv operators inputs. + for iname in op.input_names: + for in_var_name in op.input(iname): + params.append(parent_block.var(in_var_name)) + grads.append(parent_block.var(in_var_name)) + + return params, grads + + def parent_block(self): + prog = self.helper.main_program + parent_idx = prog.current_block().parent_idx + assert parent_idx >= 0 + parent_block = prog.block(parent_idx) + return parent_block + + def complete_op(self): + main_program = self.helper.main_program + current_block = main_program.current_block() + parent_block = self.parent_block() + + params, grads = self.get_params_and_grads() + param_names = [p.name for p in params] + grad_names = [g.name for g in grads] + parent_block.append_op( + type='recv', + inputs={}, + outputs={}, + attrs={ + 'endpoint': self.endpoint, + 'Fanin': self.fan_in, + 'ParamList': param_names, + 'GradList': grad_names, + 'OptimizeBlock': current_block + }) + + +def Send(endpoints, send_vars, get_vars): + """ + Send layer + + Args: + endpoints: comma seperated IP:PORT pairs in the order + of send_vars to send + send_vars: vars to send + get_vars: vars to get from server after send completes. + + Send variables to the server side, and get vars from server + side when server have finished running server side program. + """ + assert (type(send_vars) == list) + assert (type(get_vars) == list) + + epmap = endpoints.split(",") + endpoints = list(set(epmap)) + + helper = LayerHelper("Send", **locals()) + helper.append_op( + type="send", + inputs={"X": send_vars}, + outputs={"Out": get_vars}, + attrs={"endpoints": endpoints, + "epmap": epmap}) diff --git a/python/paddle/v2/fluid/tests/CMakeLists.txt b/python/paddle/v2/fluid/tests/CMakeLists.txt index 83053160820a70bb5e54f721c0d7b881c5765004..628ce60b406d880d961d705a6abd2b5236fb1c8c 100644 --- a/python/paddle/v2/fluid/tests/CMakeLists.txt +++ b/python/paddle/v2/fluid/tests/CMakeLists.txt @@ -1,5 +1,10 @@ file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") + +if(NOT WITH_DISTRIBUTE) + list(REMOVE_ITEM TEST_OPS test_recv_op) +endif(NOT WITH_DISTRIBUTE) + foreach(src ${TEST_OPS}) py_test(${src} SRCS ${src}.py) endforeach() diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py new file mode 100644 index 0000000000000000000000000000000000000000..5c4cec028d354b99d6203281ec4c727d7e3eceac --- /dev/null +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -0,0 +1,68 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import paddle.v2.fluid as fluid +import paddle.v2.fluid.layers as layers +import numpy +from multiprocessing import Process +import os, sys + + +class TestRecvOp(unittest.TestCase): + def test_send(self): + # Run init_serv in a thread + place = fluid.CPUPlace() + p = Process(target=self.init_serv, args=(place, )) + p.daemon = True + p.start() + self.init_client(place) + # FIXME(typhoonzero): find a way to gracefully shutdown the server. + os.system("kill -9 %d" % p.pid) + p.join() + + def init_serv(self, place): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data( + shape=[32, 32], + dtype='float32', + name="X", + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) + serv = layers.ListenAndServ("127.0.0.1:6174", optimizer_mode=False) + with serv.do(): + o = layers.scale(x=x, scale=10.0) + main.global_block().create_var( + name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) + exe = fluid.Executor(place) + exe.run(main) + + def init_client(self, place): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data( + shape=[32, 32], + dtype='float32', + name='X', + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) + layers.Send("127.0.0.1:6174", [x], [x]) + exe = fluid.Executor(place) + exe.run(main) + + +if __name__ == "__main__": + unittest.main()