From bcc67401113f04cbd52438d9a861f03725ad9a1a Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Tue, 23 Jan 2018 19:32:44 +0800
Subject: [PATCH] WIP python binding of send recv

---
 paddle/operators/recv_op.cc                  |  18 ++--
 python/paddle/v2/fluid/layers/io.py          | 101 +++++++++++++++++++
 python/paddle/v2/fluid/tests/test_recv_op.py |  45 +++++++++
 3 files changed, 155 insertions(+), 9 deletions(-)
 create mode 100644 python/paddle/v2/fluid/tests/test_recv_op.py

diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc
index 593c35879a..e3c86966b8 100644
--- a/paddle/operators/recv_op.cc
+++ b/paddle/operators/recv_op.cc
@@ -49,7 +49,7 @@ static void CreateTensorFromMessageType(framework::Variable *var,
     var->GetMutable<framework::SelectedRows>();
   } else {
     PADDLE_THROW(
-        "VariableMessage type %d is not in "
+        "VraibleMessage type %d is not in "
         "[LoDTensor, SelectedRows]",
         var_type);
   }
@@ -121,17 +121,17 @@ class RecvOp : public framework::OperatorBase {
       if (it != grad_list.end()) {
         param_var_name = param_list[it - grad_list.begin()];
       } else {
-        LOG(ERROR) << "grad has no paired param:" << grad_var_name;
+        LOG(ERROR) << "grad have no paired param:" << grad_var_name;
       }
-      VLOG(3) << "received grad: " << grad_var_name
+      VLOG(3) << "recved grad: " << grad_var_name
               << " updating param: " << param_var_name;
       if (fan_in > 1) {
         grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
       }
       auto *var = recv_scope.FindVar(grad_var_name);
       if (var == nullptr) {
-        LOG(ERROR) << "Can not find server side var: " << grad_var_name;
-        PADDLE_THROW("Can not find server side var");
+        LOG(ERROR) << "can not find server side var: " << grad_var_name;
+        PADDLE_THROW("can not find server side var");
       }
       detail::DeserializeFromMessage(v.second, dev_ctx, var);
     }
@@ -161,11 +161,11 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker)
       : OpProtoAndCheckerMaker(proto, op_checker) {
-    AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable();
+    // AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable();
     AddComment(R"DOC(
 Recv operator
-This operator will recieve tensor from send_op
+This operator will recv tensor from send_op
 )DOC");
     AddAttr<std::string>("endpoint",
                          "(string, default 127.0.0.1:6164)"
                          "IP address to listen on.")
@@ -176,11 +176,11 @@ This operator will recieve tensor from send_op
         kOptimizeBlock, "Serialized ProgramDesc string for recv to run.");
     AddAttr<std::vector<std::string>>(
         "ParamList", "type list of string",
-        "grad->param name mapping to find which parameters to optimize.")
+        "grad->param name mapping to find which param to optimize.")
         .SetDefault({});
     AddAttr<std::vector<std::string>>(
         "GradList", "type list of string",
-        "grad->param name mapping to find which parameters to optimize.")
+        "grad->param name mapping to find which param to optimize.")
         .SetDefault({});
     AddAttr<int>("Fanin", "type int",
                  "Number of trainers in the current cluster job")
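With the "RX" input commented out above, the recv operator is configured purely through its attributes: endpoint, Fanin, OptimizeBlock, ParamList and GradList. As a rough sketch of what that means on the Python side (the program/block handles and attribute values below are illustrative assumptions; the io.py wrapper added later in this patch is what actually generates the op):

    import paddle.v2.fluid as fluid

    prog = fluid.Program()
    opt_block = prog.create_block()  # server-side block the recv op will run
    prog.rollback()                  # switch back to the global block
    prog.global_block().append_op(
        type='recv',
        inputs={},   # no "RX" input any more, see the commented-out AddInput
        outputs={},
        attrs={
            'endpoint': '127.0.0.1:6164',  # address to listen on
            'Fanin': 1,                    # number of trainers pushing grads
            'ParamList': ['w'],            # param names, aligned with GradList
            'GradList': ['w@GRAD'],        # grad names received from trainers
            'OptimizeBlock': opt_block,    # block to execute once grads arrive
        })
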
+ """ + + def __init__(self, server): + if not (isinstance(server, ListenAndServ)): + raise TypeError("BlockGuardServ takes a ListenAndServ") + super(BlockGuardServ, self).__init__(server.helper.main_program) + self.server = server + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + return False + + self.server.complete_op() + return super(BlockGuardServ, self).__exit__(exc_type, exc_val, exc_tb) + + +class ListenAndServ(object): + """ + ListenAndServ class. + + ListenAndServ class is used to wrap listen_and_serv op to create a server + which can receive variables from clients and run a block. + """ + + def __init__(self, endpoint, fan_in=1): + self.helper = LayerHelper("recv", name=name) + self.inputs = [] + self.outputs = [] + self.endpoint = endpoint + self.fan_in = fan_in + + def do(self): + return BlockGuardServ(self) + + def get_params_and_grads(self): + main_program = self.helper.main_program + current_block = main_program.current_block() + parent_block = self.parent_block() + # params and grads in the same order. + params = list() + grads = list() + for op in current_block.ops: + # FIXME(typhoonzero): op.inputs is None if it's cloned. + if "Grad" in op.inputs and "Param" in op.inputs: + params.append(op.inputs["Param"].name) + grads.append(op.inputs["Grad"].name) + + return params, grads + + def complete_op(self): + main_program = self.helper.main_program + current_block = main_program.current_block() + parent_block = self.parent_block() + + params, grads = self.get_params_and_grads() + parent_block.append_op( + type='recv', + inputs={}, + outputs={}, + attrs={ + 'endpoint': self.endpoint, + 'Fanin': self.fan_in, + 'ParamList': params, + 'GradList': grads, + 'OptimizeBlock': current_block + }) + + +def Send(endpoints, send_vars, get_vars): + """ + Send layer + + Args: + endpoints: comma seperated IP:PORT pairs in the order + of send_vars to send + send_vars: vars to send + get_vars: vars to get from server after send completes. + + Send variables to the server side, and get vars from server + side when server have finished running server side program. + """ + assert (type(send_vars) == list) + assert (type(get_vars) == list) + + epmap = endpoints.split(",") + endpoints = set(epmap) + + helper = LayerHelper("Send", **locals()) + helper.append_op( + type="send", + inputs={"X": send_vars}, + outputs={"Out": get_vars}, + attrs={"endpoints": endpoints, + "epmap": epmap}) diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py new file mode 100644 index 0000000000..fbd182a716 --- /dev/null +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -0,0 +1,45 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py
new file mode 100644
index 0000000000..fbd182a716
--- /dev/null
+++ b/python/paddle/v2/fluid/tests/test_recv_op.py
@@ -0,0 +1,45 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+import paddle.v2.fluid as fluid
+import paddle.v2.fluid.layers as layers
+import numpy
+
+
+class TestRecvOp(unittest.TestCase):
+    def run_test(self):
+        # Run init_serv in a thread
+        pass
+
+    def init_serv(self, place):
+        main = fluid.Program()
+        with fluid.program_guard(main):
+            x = layers.data(shape=[32, 32], dtype='float32', name='X')
+            serv = fluid.ListenAndServ("127.0.0.1:6174")
+            with serv.do():
+                layers.scale(input=x, scale=10)
+        exe = fluid.Executor(place)
+        exe.run(main)
+
+    def init_client(self, place):
+        main = fluid.Program()
+        with fluid.program_guard(main):
+            x = layers.data(shape=[32, 32], dtype='float32', name='X')
+            i = fluid.initializer.Constant(value=1.0)
+            i(x, main.global_block())
+            layers.Send("127.0.0.1:6174", [x], [x])
+        exe = fluid.Executor(place)
+        exe.run(main)
--
GitLab
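The run_test stub above only records the intent ("Run init_serv in a thread"). One possible body, assuming a daemon thread plus a short sleep is enough for the listener on the hard-coded port to come up before the client runs (threading and time would have to be imported at the top of the test file):

    def run_test(self):
        # Run init_serv in a background thread so init_client can reach it.
        place = fluid.CPUPlace()
        t = threading.Thread(target=self.init_serv, args=(place, ))
        t.daemon = True    # the server loop never returns; don't block exit
        t.start()
        time.sleep(1)      # crude wait for the server to bind its port
        self.init_client(place)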