diff --git a/paddle/operators/CMakeLists.txt b/paddle/operators/CMakeLists.txt index b2e73b6f23bd36e29be4e97237a269d12b92bd90..e903f43ba69ee9e28b3a03e8921a41ffa81a2542 100644 --- a/paddle/operators/CMakeLists.txt +++ b/paddle/operators/CMakeLists.txt @@ -122,9 +122,11 @@ if(WITH_DISTRIBUTE) set_source_files_properties(send_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) op_library(recv_op DEPS ${DISTRIBUTE_DEPS}) set_source_files_properties(recv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) - cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op recv_op sum_op executor) + op_library(listen_and_serv_op DEPS ${DISTRIBUTE_DEPS}) + set_source_files_properties(listen_and_serv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op listen_and_serv_op sum_op executor) else() - set(DEPS_OPS ${DEPS_OPS} send_op recv_op) + set(DEPS_OPS ${DEPS_OPS} send_op recv_op listen_and_serv_op) endif() op_library(cond_op DEPS framework_proto tensor net_op) diff --git a/paddle/operators/listen_and_serv_op.cc b/paddle/operators/listen_and_serv_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..5745938ed9b31e596acac844fa1b16793340838b --- /dev/null +++ b/paddle/operators/listen_and_serv_op.cc @@ -0,0 +1,207 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include +#include + +#include + +#include "paddle/framework/executor.h" +#include "paddle/framework/framework.pb.h" +#include "paddle/framework/lod_tensor.h" +#include "paddle/framework/op_registry.h" +#include "paddle/framework/proto_desc.h" +#include "paddle/operators/detail/grpc_server.h" +#include "paddle/operators/detail/sendrecvop_utils.h" +#include "paddle/operators/detail/simple_block_queue.h" +#include "paddle/string/printf.h" + +namespace paddle { +namespace operators { + +constexpr char kOptimizeBlock[] = "OptimizeBlock"; + +void RunServer(std::shared_ptr service) { + service->RunSyncUpdate(); + VLOG(4) << "RunServer thread end"; +} + +static void CreateTensorFromMessageType(framework::Variable *var, + sendrecv::VarType var_type) { + if (var_type == sendrecv::VarType::LOD_TENSOR) { + var->GetMutable(); + } else if (var_type == sendrecv::VarType::SELECTED_ROWS) { + var->GetMutable(); + } else { + PADDLE_THROW( + "VariableMessage type %d is not in " + "[LoDTensor, SelectedRows]", + var_type); + } +} + +class ListenAndServOp : public framework::OperatorBase { + public: + ListenAndServOp(const std::string &type, + const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : OperatorBase(type, inputs, outputs, attrs) { + if (!rpc_service_) { + std::string endpoint = Attr("endpoint"); + rpc_service_.reset(new detail::AsyncGRPCServer(endpoint)); + server_thread_.reset(new std::thread(RunServer, rpc_service_)); + } + } + + void Stop() override { + detail::MessageWithName term_msg; + term_msg.first = LISTEN_TERMINATE_MESSAGE; + rpc_service_->Push(term_msg); + rpc_service_->ShutDown(); + server_thread_->join(); + } + + std::string GetGradVarNameForTrainer(const std::string &varname) const { + if (grads_counter_.find(varname) == grads_counter_.end()) { + grads_counter_[varname] = 0; + } + return string::Sprintf("%s.trainer_%d", varname, grads_counter_[varname]++); + } + + void Run(const framework::Scope &scope, + const platform::Place &dev_place) const override { + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &dev_ctx = *pool.Get(dev_place); + framework::Scope &recv_scope = scope.NewScope(); + + // FIXME(Yancey1989): initialize rpc server with lazy mode. + rpc_service_->SetScope(&recv_scope); + rpc_service_->SetDevCtx(&dev_ctx); + auto param_list = Attr>("ParamList"); + auto grad_list = Attr>("GradList"); + auto fan_in = Attr("Fanin"); + + auto *block = Attr(kOptimizeBlock); + auto *program = block->Program(); + framework::Executor executor(dev_place); + + // TODO(typhoonzero): change this to a while_op for every cluster-batch. + bool exit_flag = false; + while (!exit_flag) { + // Get from multiple trainers, we don't care about the order in which + // the gradients arrives, just add suffix 0~n and merge the gradient. + rpc_service_->SetCond(0); + size_t recv_var_cnt = 0; + int batch_barrier = 0; + while (batch_barrier != fan_in) { + const detail::MessageWithName &v = rpc_service_->Get(); + auto grad_var_name = v.first; + if (grad_var_name == LISTEN_TERMINATE_MESSAGE) { + LOG(INFO) << "received terminate message and exit"; + exit_flag = true; + break; + } else if (grad_var_name == BATCH_BARRIER_MESSAGE) { + VLOG(3) << "recv batch barrier message"; + batch_barrier++; + continue; + } else { + // receive a variable + recv_var_cnt++; + auto it = + std::find(grad_list.begin(), grad_list.end(), grad_var_name); + std::string param_var_name; + if (it != grad_list.end()) { + param_var_name = param_list[it - grad_list.begin()]; + } else { + LOG(ERROR) << "grad has no paired param:" << grad_var_name; + } + VLOG(3) << "received grad: " << grad_var_name + << " updating param: " << param_var_name; + + if (fan_in > 1) { + grad_var_name = this->GetGradVarNameForTrainer(grad_var_name); + } + auto *var = recv_scope.FindVar(grad_var_name); + if (var == nullptr) { + LOG(ERROR) << "Can not find server side var: " << grad_var_name; + PADDLE_THROW("Can not find server side var"); + } + detail::DeserializeFromMessage(v.second, dev_ctx, var); + } + } + VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier."; + // TODO(Yancey1989): merge SelectedRows variables here + if (exit_flag) { + rpc_service_->ShutDown(); + } + + try { + executor.Run(*program, &recv_scope, block->ID(), /*global_block*/ + false /*create_local_scope*/, false /*create_vars*/); + } catch (std::exception &e) { + LOG(ERROR) << "run sub program error " << e.what(); + } + rpc_service_->SetCond(1); + rpc_service_->WaitClientGet(recv_var_cnt); + grads_counter_.clear(); + } // while(true) + } + + protected: + std::shared_ptr rpc_service_; + std::shared_ptr server_thread_; + mutable std::unordered_map grads_counter_; +}; + +class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { + public: + ListenAndServOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddComment(R"DOC( +ListenAndServ operator + +This operator will start a RPC server which can receive variables +from send_op and send back variables to recv_op. +)DOC"); + AddAttr("endpoint", + "(string, default 127.0.0.1:6164)" + "IP address to listen on.") + .SetDefault("127.0.0.1:6164") + .AddCustomChecker([](const std::string &ip) { return !ip.empty(); }); + AddAttr(kOptimizeBlock, + "BlockID to run on server side."); + AddAttr>( + "ParamList", "type list of string", + "grad->param name mapping to find which parameters to optimize.") + .SetDefault({}); + AddAttr>( + "GradList", "type list of string", + "grad->param name mapping to find which parameters to optimize.") + .SetDefault({}); + AddAttr("Fanin", "type int", + "Number of trainers in the current cluster job") + .SetDefault(1); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; + +REGISTER_OPERATOR(listen_and_serv, ops::ListenAndServOp, + ops::ListenAndServOpMaker); \ No newline at end of file diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index 49e1eb3402482e7ff12d9b2b640f7271a80cf6d9..ba71094219f37eb7a3c2df68be986cec7afbf7ab 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -12,187 +12,60 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include -#include #include -#include -#include - -#include "paddle/framework/executor.h" +#include "paddle/framework/data_type.h" #include "paddle/framework/framework.pb.h" #include "paddle/framework/lod_tensor.h" #include "paddle/framework/op_registry.h" -#include "paddle/framework/proto_desc.h" -#include "paddle/operators/detail/grpc_server.h" -#include "paddle/operators/detail/sendrecvop_utils.h" -#include "paddle/operators/detail/simple_block_queue.h" -#include "paddle/string/printf.h" + +#include +#include "paddle/operators/detail/grpc_client.h" namespace paddle { namespace operators { -constexpr char kOptimizeBlock[] = "OptimizeBlock"; - -void RunServer(std::shared_ptr service) { - service->RunSyncUpdate(); - VLOG(4) << "RunServer thread end"; -} - -static void CreateTensorFromMessageType(framework::Variable *var, - sendrecv::VarType var_type) { - if (var_type == sendrecv::VarType::LOD_TENSOR) { - var->GetMutable(); - } else if (var_type == sendrecv::VarType::SELECTED_ROWS) { - var->GetMutable(); - } else { - PADDLE_THROW( - "VariableMessage type %d is not in " - "[LoDTensor, SelectedRows]", - var_type); - } -} - class RecvOp : public framework::OperatorBase { public: - RecvOp(const std::string &type, const framework::VariableNameMap &inputs, - const framework::VariableNameMap &outputs, - const framework::AttributeMap &attrs) - : OperatorBase(type, inputs, outputs, attrs) { - if (!rpc_service_) { - std::string endpoint = Attr("endpoint"); - rpc_service_.reset(new detail::AsyncGRPCServer(endpoint)); - server_thread_.reset(new std::thread(RunServer, rpc_service_)); - } - } - - void Stop() override { - detail::MessageWithName term_msg; - term_msg.first = LISTEN_TERMINATE_MESSAGE; - rpc_service_->Push(term_msg); - rpc_service_->ShutDown(); - server_thread_->join(); - } - - std::string GetGradVarNameForTrainer(const std::string &varname) const { - if (grads_counter_.find(varname) == grads_counter_.end()) { - grads_counter_[varname] = 0; + RecvOp(const std::string& type, const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + + void Run(const framework::Scope& scope, + const platform::Place& place) const override { + auto outs = Outputs("Out"); + std::vector epmap = Attr>("epmap"); + + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + auto& ctx = *pool.Get(place); + + for (size_t i = 0; i < outs.size(); i++) { + VLOG(3) << "getting " << outs[i]; + client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]); } - return string::Sprintf("%s.trainer_%d", varname, grads_counter_[varname]++); + PADDLE_ENFORCE(client_.Wait()); } - void Run(const framework::Scope &scope, - const platform::Place &dev_place) const override { - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); - auto &dev_ctx = *pool.Get(dev_place); - framework::Scope &recv_scope = scope.NewScope(); - - // FIXME(Yancey1989): initialize rpc server with laze mode. - rpc_service_->SetScope(&recv_scope); - rpc_service_->SetDevCtx(&dev_ctx); - auto param_list = Attr>("ParamList"); - auto grad_list = Attr>("GradList"); - auto fan_in = Attr("Fanin"); - - auto *block = Attr(kOptimizeBlock); - auto *program = block->Program(); - framework::Executor executor(dev_place); - - // TODO(typhoonzero): change this to a while_op for every cluster-batch. - bool exit_flag = false; - while (!exit_flag) { - // Get from multiple trainers, we don't care about the order in which - // the gradients arrives, just add suffix 0~n and merge the gradient. - rpc_service_->SetCond(0); - size_t recv_var_cnt = 0; - int batch_barrier = 0; - while (batch_barrier != fan_in) { - const detail::MessageWithName &v = rpc_service_->Get(); - auto grad_var_name = v.first; - if (grad_var_name == LISTEN_TERMINATE_MESSAGE) { - LOG(INFO) << "received terminate message and exit"; - exit_flag = true; - break; - } else if (grad_var_name == BATCH_BARRIER_MESSAGE) { - VLOG(3) << "recv batch barrier message"; - batch_barrier++; - continue; - } else { - // receive a variable - recv_var_cnt++; - auto it = - std::find(grad_list.begin(), grad_list.end(), grad_var_name); - std::string param_var_name; - if (it != grad_list.end()) { - param_var_name = param_list[it - grad_list.begin()]; - } else { - LOG(ERROR) << "grad has no paired param:" << grad_var_name; - } - VLOG(3) << "received grad: " << grad_var_name - << " updating param: " << param_var_name; - - if (fan_in > 1) { - grad_var_name = this->GetGradVarNameForTrainer(grad_var_name); - } - auto *var = recv_scope.FindVar(grad_var_name); - if (var == nullptr) { - LOG(ERROR) << "Can not find server side var: " << grad_var_name; - PADDLE_THROW("Can not find server side var"); - } - detail::DeserializeFromMessage(v.second, dev_ctx, var); - } - } - VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier."; - // TODO(Yancey1989): merge SelectedRows variables here - if (exit_flag) { - break; - } - - try { - executor.Run(*program, &recv_scope, block->ID(), /*global_block*/ - false /*create_local_scope*/, false /*create_vars*/); - } catch (std::exception &e) { - LOG(ERROR) << "run sub program error " << e.what(); - } - rpc_service_->SetCond(1); - rpc_service_->WaitClientGet(recv_var_cnt); - grads_counter_.clear(); - } // while(true) - } - - protected: - std::shared_ptr rpc_service_; - std::shared_ptr server_thread_; - mutable std::unordered_map grads_counter_; + private: + mutable detail::RPCClient client_; }; class RecvOpMaker : public framework::OpProtoAndCheckerMaker { public: - RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker) + RecvOpMaker(OpProto* proto, OpAttrChecker* op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { + AddOutput("Out", "(Tensor) Variables to get from server.").AsDuplicable(); AddComment(R"DOC( Recv operator -This operator will recieve tensor from send_op +This operator can get variables from server side. )DOC"); - AddAttr("endpoint", - "(string, default 127.0.0.1:6164)" - "IP address to listen on.") - .SetDefault("127.0.0.1:6164") - .AddCustomChecker([](const std::string &ip) { return !ip.empty(); }); - AddAttr( - kOptimizeBlock, "Serialized ProgramDesc string for recv to run."); - AddAttr>( - "ParamList", "type list of string", - "grad->param name mapping to find which parameters to optimize.") - .SetDefault({}); - AddAttr>( - "GradList", "type list of string", - "grad->param name mapping to find which parameters to optimize.") + AddAttr>("epmap", + "(string vector, default 127.0.0.1:6164)" + "Server endpoints in the order of input " + "variables for mapping") .SetDefault({}); - AddAttr("Fanin", "type int", - "Number of trainers in the current cluster job") - .SetDefault(1); } }; diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc index be41b527f2289d5d657a58f3cb6d7be725323cd0..291d19ba8bd6113fb69719d5385792ad5cde387d 100644 --- a/paddle/operators/send_op.cc +++ b/paddle/operators/send_op.cc @@ -62,11 +62,13 @@ class SendOp : public framework::OperatorBase { } PADDLE_ENFORCE(rpc_client->Wait()); - for (size_t i = 0; i < outs.size(); i++) { - VLOG(3) << "getting " << outs[i] << " from " << epmap[i]; - rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]); + if (outs.size() > 0) { + for (size_t i = 0; i < outs.size(); i++) { + VLOG(3) << "getting " << outs[i] << " from " << epmap[i]; + client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]); + } + PADDLE_ENFORCE(client_.Wait()); } - PADDLE_ENFORCE(rpc_client->Wait()); } }; @@ -85,6 +87,8 @@ Send operator This operator will send tensor to recv_op at the parameter server. )DOC"); + // TODO(typhoonzero): remove this attr generate de-duplicated vector from + // epmap when initializing. AddAttr>("endpoints", "(string vector, default 127.0.0.1:6164)" "Server endpoints to send variables to.") diff --git a/paddle/operators/send_recv_op_test.cc b/paddle/operators/send_recv_op_test.cc index 045a0f5434f339bab345d14881ed05450ce6588d..31527a906d56da54d2571910de627757d708a996 100644 --- a/paddle/operators/send_recv_op_test.cc +++ b/paddle/operators/send_recv_op_test.cc @@ -25,7 +25,7 @@ limitations under the License. */ #include "paddle/string/printf.h" USE_NO_KERNEL_OP(send); -USE_NO_KERNEL_OP(recv); +USE_NO_KERNEL_OP(listen_and_serv); USE_OP(sum); namespace f = paddle::framework; @@ -33,7 +33,7 @@ namespace p = paddle::platform; namespace m = paddle::operators::math; // global for simplicity. -std::unique_ptr recv_op; +std::unique_ptr listen_and_serv_op; void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) { p::CPUDeviceContext ctx(place); @@ -120,7 +120,7 @@ void StartServerNet(bool is_sparse) { InitTensorsInScope(scope, place); } - // sub program run in recv_op, for simple test we use sum + // sub program run in listen_and_serv_op, for simple test we use sum f::ProgramDesc program; f::BlockDesc *block = program.MutableBlock(0); // X for server side tensors, RX for received tensers, must be of same shape. @@ -131,8 +131,9 @@ void StartServerNet(bool is_sparse) { attrs.insert({"ParamList", std::vector({"Out"})}); attrs.insert({"GradList", std::vector({"x1"})}); attrs.insert({"OptimizeBlock", block}); - recv_op = f::OpRegistry::CreateOp("recv", {{"RX", {"x1"}}}, {}, attrs); - recv_op->Run(scope, place); + listen_and_serv_op = + f::OpRegistry::CreateOp("listen_and_serv", {}, {}, attrs); + listen_and_serv_op->Run(scope, place); } TEST(SendRecvOp, CPUDense) { @@ -161,9 +162,9 @@ TEST(SendRecvOp, CPUDense) { for (int64_t i = 0; i < target->numel(); ++i) { EXPECT_EQ(expected[i] * 2, actual[i]); } - recv_op->Stop(); + listen_and_serv_op->Stop(); server_thread.join(); - recv_op.reset(nullptr); + listen_and_serv_op.reset(nullptr); } TEST(SendRecvOp, CPUSparse) { @@ -200,7 +201,7 @@ TEST(SendRecvOp, CPUSparse) { EXPECT_EQ(expect_value->mutable_data(place)[i], actual->mutable_data(place)[i]); } - recv_op->Stop(); + listen_and_serv_op->Stop(); server_thread.join(); - recv_op.reset(); + listen_and_serv_op.reset(); } diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index a4464a281aae714d79a531ec8a2cf793d6330a12..121b407cae41fa477843b7252ebacc9053d5f7aa 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -478,9 +478,9 @@ class DistributeTranspiler: else: self._append_pserver_non_opt_ops(optimize_sub_program, pserver_program, opt_op) - # Append the recv op + # Append the listen_and_serv op pserver_program.global_block().append_op( - type="recv", + type="listen_and_serv", inputs={}, outputs={}, attrs={ diff --git a/python/paddle/v2/fluid/framework.py b/python/paddle/v2/fluid/framework.py index 8bf545e2ecc3939b00ba25d003a6b3887a54f860..ae98e299a4838db8b7089e09683c33c940cfde3d 100644 --- a/python/paddle/v2/fluid/framework.py +++ b/python/paddle/v2/fluid/framework.py @@ -489,7 +489,7 @@ class Operator(object): no_kernel_op_set = { 'feed', 'fetch', 'save', 'load', 'recurrent', 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', - 'recv', 'parallel_do' + 'recv', 'listen_and_serv', 'parallel_do' } if type not in no_kernel_op_set: self.desc.infer_var_type(self.block.desc) diff --git a/python/paddle/v2/fluid/layers/io.py b/python/paddle/v2/fluid/layers/io.py index b7b2cf2296cc8868dd0b5eb6cd6d58b9ae795d5d..85e44a0e5149bd36f2787d9f2d516dbe4abdbb2e 100644 --- a/python/paddle/v2/fluid/layers/io.py +++ b/python/paddle/v2/fluid/layers/io.py @@ -108,7 +108,7 @@ class ListenAndServ(object): """ def __init__(self, endpoint, fan_in=1, optimizer_mode=True): - self.helper = LayerHelper("recv") + self.helper = LayerHelper("listen_and_serv") self.inputs = [] self.outputs = [] self.endpoint = endpoint @@ -158,7 +158,7 @@ class ListenAndServ(object): param_names = [p.name for p in params] grad_names = [g.name for g in grads] parent_block.append_op( - type='recv', + type='listen_and_serv', inputs={}, outputs={}, attrs={ @@ -196,3 +196,31 @@ def Send(endpoints, send_vars, get_vars): outputs={"Out": get_vars}, attrs={"endpoints": endpoints, "epmap": epmap}) + + +def Recv(endpoints, get_vars): + """ + Recv layer + + Args: + endpoints: comma seperated IP:PORT pairs in the order + of send_vars to send + send_vars: vars to send + get_vars: vars to get from server after send completes. + + Send variables to the server side, and get vars from server + side when server have finished running server side program. + """ + assert (type(send_vars) == list) + assert (type(get_vars) == list) + + epmap = endpoints.split(",") + endpoints = list(set(epmap)) + + helper = LayerHelper("Recv", **locals()) + helper.append_op( + type="recv", + inputs={"X": get_vars}, + outputs={"Out": get_vars}, + attrs={"endpoints": endpoints, + "epmap": epmap}) diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py index 5c4cec028d354b99d6203281ec4c727d7e3eceac..3a02b882410fe896cd2add03060127a01cbdaa38 100644 --- a/python/paddle/v2/fluid/tests/test_recv_op.py +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -19,6 +19,7 @@ import paddle.v2.fluid.layers as layers import numpy from multiprocessing import Process import os, sys +import time class TestRecvOp(unittest.TestCase): @@ -28,6 +29,7 @@ class TestRecvOp(unittest.TestCase): p = Process(target=self.init_serv, args=(place, )) p.daemon = True p.start() + time.sleep(1) self.init_client(place) # FIXME(typhoonzero): find a way to gracefully shutdown the server. os.system("kill -9 %d" % p.pid)