diff --git a/paddle/framework/operator.h b/paddle/framework/operator.h
index b592eea1b96113a8cbcb0e137890927cfcc22670..03cd578e9b98d54aa6c5ed9936999ab0b6edd8da 100644
--- a/paddle/framework/operator.h
+++ b/paddle/framework/operator.h
@@ -84,6 +84,9 @@ class OperatorBase {
   /// Net will call this function to Run an op.
   virtual void Run(const Scope& scope, const platform::Place& place) const = 0;

+  // FIXME(typhoonzero): this is only used for recv_op to stop event_loop.
+  virtual void Stop() {}
+
   virtual bool IsNetOp() const { return false; }

   virtual bool SupportGPU() const { return false; }
diff --git a/paddle/operators/detail/send_recv_impl.h b/paddle/operators/detail/send_recv_impl.h
index eec9dd38d188247cba4da2a377038a28c847e40e..cab2b49aa8692688a6802b5606aa9d9145724c77 100644
--- a/paddle/operators/detail/send_recv_impl.h
+++ b/paddle/operators/detail/send_recv_impl.h
@@ -62,6 +62,8 @@ class SendRecvServerImpl final : public SendRecvService::Service {
   const TensorWithName Get() { return this->var_recv_queue_.Pop(); }

+  void Push(const TensorWithName &msg) { this->var_recv_queue_.Push(msg); }
+
  private:
   // received variable from RPC, operators fetch variable from this queue.
   SimpleBlockQueue<TensorWithName> var_recv_queue_;
diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc
index 7712d98df50d9f07f14eb8dc7e7008a8ffa34553..f599bf984173a8ca873eda9fe72201eb9cd97724 100644
--- a/paddle/operators/recv_op.cc
+++ b/paddle/operators/recv_op.cc
@@ -1,16 +1,16 @@
 /* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at

-   http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0

-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License. */
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */

 #include
 #include
@@ -28,6 +28,8 @@
 #include "paddle/operators/detail/send_recv_impl.h"
 #include "paddle/operators/detail/simple_block_queue.h"

+#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV"
+
 namespace paddle {
 namespace operators {

@@ -57,7 +59,12 @@
     }
   }

-  virtual ~RecvOp() {
+  void Stop() override {
+    detail::TensorWithName term_msg;
+    term_msg.first = LISTEN_TERMINATE_MESSAGE;
+    LOG(ERROR) << "push term msg";
+    rpc_service_->Push(term_msg);
+    LOG(ERROR) << "push term msg over";
     rpc_server_->Shutdown();
     server_thread_->join();
   }
@@ -83,13 +90,19 @@
     size_t param_count = param_list.size();
     rpc_service_->Reset();
     // TODO(typhoonzero): change this to a while_op for every cluster-batch.
-    while (true) {
+    bool exit_flag = false;
+    while (!exit_flag) {
       // Get from multiple trainers, we don't care about order in which
       // the gradient arrives, just add suffix 0~n then average the gradient.
       for (size_t i = 0; i < param_count * trainer_count; ++i) {
         // blocking get one var from client.
         const detail::TensorWithName &v = rpc_service_->Get();
         auto grad_var_name = v.first;
+        LOG(ERROR) << "recved varname" << grad_var_name;
+        if (grad_var_name == LISTEN_TERMINATE_MESSAGE) {
+          exit_flag = true;
+          break;
+        }
         auto it = std::find(grad_list.begin(), grad_list.end(), grad_var_name);
         std::string param_var_name;
         if (it != grad_list.end()) {
@@ -114,8 +127,11 @@
       auto *tensor = var->GetMutable<framework::LoDTensor>();
       // FIXME(typhoonzero): do not copy
       platform::DeviceContextPool &pool = platform::DeviceContextPool::Get();
-      auto &dev_ctx = *pool.Borrow(place);
-      framework::CopyFrom(v.second, place, dev_ctx, tensor);
+      auto &dev_ctx = *pool.Borrow(dev_place);
+      framework::CopyFrom(v.second, dev_place, dev_ctx, tensor);
+    }
+    if (exit_flag) {
+      break;
     }
     rpc_service_->Reset();
@@ -123,7 +139,7 @@
     framework::proto::ProgramDesc program_desc;
     program_desc.ParseFromString(program_str);
     framework::ProgramDesc program(program_desc);
-    framework::Executor executor(place);
+    framework::Executor executor(dev_place);
     // Run sub graph to get optimized tensor
     try {
       executor.Run(program, &recv_scope, 0, /*global_block*/
diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc
index 3ad5df5e4a210c4702675708755c6a22f031c150..c719f802577aac1d188d797f59c210f6ba6f24c8 100644
--- a/paddle/operators/send_op.cc
+++ b/paddle/operators/send_op.cc
@@ -43,8 +43,9 @@ class SendOp : public framework::OperatorBase {
   }

   void Run(const framework::Scope &scope,
-           const platform::DeviceContext &dev_ctx) const override {
+           const platform::Place &dev_place) const override {
     auto ins = Inputs("X");
+    auto outs = Outputs("Out");
     std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
     // TODO(typhoonzero): use async calls to send multiple variable asyncly.
     for (size_t i = 0; i < ins.size(); ++i) {
@@ -55,10 +56,10 @@
     }
     // TODO(typhoonzero): support async optimization
     client_map_[epmap[0]]->Wait();
-    for (size_t i = 0; i < ins.size(); ++i) {
-      bool ret = client_map_[epmap[i]]->GetVariable(scope, ins[i]);
+    for (size_t i = 0; i < outs.size(); ++i) {
+      bool ret = client_map_[epmap[i]]->GetVariable(scope, outs[i]);
       if (!ret) {
-        LOG(ERROR) << "GetVariable error: " << ins[i];
+        LOG(ERROR) << "GetVariable error: " << outs[i];
       }
     }
   }
@@ -73,6 +74,8 @@ class SendOpMaker : public framework::OpProtoAndCheckerMaker {
   SendOpMaker(OpProto *proto, OpAttrChecker *op_checker)
       : OpProtoAndCheckerMaker(proto, op_checker) {
     AddInput("X", "(Tensor) Input tensor to be send").AsDuplicable();
+    AddOutput("Out", "(Tensor) Output tensor to get from server")
+        .AsDuplicable();
     AddComment(R"DOC(
 Recv operator
@@ -80,11 +83,13 @@ This operator will recv tensor from send_op
 )DOC");
     AddAttr<std::vector<std::string>>("endpoints",
                                       "(string vector, default 127.0.0.1:6164)"
-                                      "Server endpoints to send variables to.");
+                                      "Server endpoints to send variables to.")
+        .SetDefault({});
     AddAttr<std::vector<std::string>>("epmap",
                                       "(string vector, default 127.0.0.1:6164)"
                                       "Server endpoints in the order of input "
-                                      "variables for mapping");
+                                      "variables for mapping")
+        .SetDefault({});
   }
 };
diff --git a/paddle/operators/send_recv_op_benchmark.cc b/paddle/operators/send_recv_op_benchmark.cc
deleted file mode 100644
index 5aa81f051cd93f83302f36a4ccf7e8aa7c6d24aa..0000000000000000000000000000000000000000
--- a/paddle/operators/send_recv_op_benchmark.cc
+++ /dev/null
@@ -1,120 +0,0 @@
-/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License. */
-
-// TODO(typhoonzero): add python bindings for this test as
-// a RemoteOptimizer.
-
-#include
-#include
-#include
-
-#include "gtest/gtest.h"
-#include "paddle/framework/op_registry.h"
-#include "paddle/framework/operator.h"
-#include "paddle/framework/program_desc.h"
-
-USE_NO_KERNEL_OP(send);
-USE_NO_KERNEL_OP(recv);
-USE_OP(sum);
-
-// global for simplicity.
-std::unique_ptr<paddle::framework::OperatorBase> recv_op;
-int benchmark_count = 1000;
-// FIXME(typhoonzero): protobuf message size limits the maximum tensor size
-int mat_size = 512;
-
-void InitTensorsInScope(paddle::framework::Scope &scope,
-                        paddle::platform::CPUPlace &place) {
-  paddle::platform::CPUDeviceContext ctx(place);
-  auto var = scope.Var("X");
-  auto tensor = var->GetMutable<paddle::framework::LoDTensor>();
-  tensor->Resize({mat_size, mat_size});
-  float *expect = tensor->mutable_data<float>(place);
-  for (int64_t i = 0; i < tensor->numel(); ++i) {
-    expect[i] = static_cast<float>(i) / 1000.0f;
-  }
-
-  auto out_var = scope.Var("Out");
-  auto out_tensor = out_var->GetMutable<paddle::framework::LoDTensor>();
-  out_tensor->Resize({mat_size, mat_size});
-  out_tensor->mutable_data<float>(place);  // allocate
-}
-
-void AddOp(const std::string &type,
-           const paddle::framework::VariableNameMap &inputs,
-           const paddle::framework::VariableNameMap &outputs,
-           paddle::framework::AttributeMap attrs,
-           paddle::framework::BlockDescBind *block) {
-  // insert output
-  for (auto kv : outputs) {
-    for (auto v : kv.second) {
-      auto var = block->Var(v);
-      var->SetDataType(paddle::framework::DataType::FP32);
-    }
-  }
-
-  // insert op
-  auto op = block->AppendOp();
-  op->SetType(type);
-  for (auto &kv : inputs) {
-    op->SetInput(kv.first, kv.second);
-  }
-  for (auto &kv : outputs) {
-    op->SetOutput(kv.first, kv.second);
-  }
-  op->SetAttrMap(attrs);
-}
-
-void StartServerNet() {
-  paddle::framework::Scope scope;
-  paddle::platform::CPUPlace place;
-  InitTensorsInScope(scope, place);
-
-  // sub program run in recv_op, for simple test we use sum
-  paddle::framework::ProgramDescBind program;
-  paddle::framework::BlockDescBind *block = program.MutableBlock(0);
-  // X for server side tensors, RX for received tensers, must be of same shape.
-  AddOp("sum", {{"X", {"X", "RX"}}}, {{"Out", {"Out"}}}, {}, block);
-
-  paddle::framework::AttributeMap attrs;
-  attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
-  attrs.insert({"OptimizeBlock", block});
-  recv_op = paddle::framework::OpRegistry::CreateOp("recv", {{"RX", {"RX"}}},
-                                                    {{"Out", {"Out"}}}, attrs);
-  paddle::platform::CPUDeviceContext ctx(place);
-  recv_op->Run(scope, ctx);
-}
-
-TEST(SendRecvBenchmark, CPU) {
-  std::thread server_thread(StartServerNet);
-  sleep(5);  // wait server to start
-  // local net
-  paddle::framework::Scope scope;
-  paddle::platform::CPUPlace place;
-  InitTensorsInScope(scope, place);
-
-  paddle::framework::AttributeMap attrs;
-  attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
-
-  auto send_op = paddle::framework::OpRegistry::CreateOp(
-      "send", {{"X", {"X"}}}, {{"Out", {"Out"}}}, attrs);
-  paddle::platform::CPUDeviceContext ctx(place);
-
-  for (int i = 0; i < benchmark_count; ++i) {
-    send_op->Run(scope, ctx);
-  }
-
-  recv_op.reset();  // dtor can shutdown and join server thread.
-  server_thread.join();
-}
diff --git a/paddle/operators/send_recv_op_test.cc b/paddle/operators/send_recv_op_test.cc
index 69a3adae9b286941ee17e24dbf9ddc10e00d8c68..383509aa1acf00aa5c565435bcea8bef37b0d769 100644
--- a/paddle/operators/send_recv_op_test.cc
+++ b/paddle/operators/send_recv_op_test.cc
@@ -13,6 +13,7 @@ limitations under the License. */

 #include
+#include
 #include
 #include
@@ -83,22 +84,19 @@ void StartServerNet() {
   paddle::framework::ProgramDesc program;
   paddle::framework::BlockDesc *block = program.MutableBlock(0);
   // X for server side tensors, RX for received tensers, must be of same shape.
- AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, block); + AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"x0"}}}, {}, block); paddle::framework::AttributeMap attrs; attrs.insert({"endpoint", std::string("127.0.0.1:6174")}); + attrs.insert({"ParamList", std::vector({"x0"})}); + attrs.insert({"GradList", std::vector({"x1"})}); std::string program_proto; PADDLE_ENFORCE(program.Proto()->SerializeToString(&program_proto)); attrs.insert({"OptimizeProgram", program_proto}); - recv_op = paddle::framework::OpRegistry::CreateOp( - "recv", {{"RX", {"x0", "x1"}}}, {{"Out", {"Out"}}}, attrs); - paddle::platform::CPUDeviceContext ctx(place); - while (1) { - recv_op->Run(scope, ctx); - // run once - break; - } + recv_op = paddle::framework::OpRegistry::CreateOp("recv", {{"RX", {"x1"}}}, + {}, attrs); + recv_op->Run(scope, place); } TEST(SendRecvOp, CPU) { @@ -110,25 +108,25 @@ TEST(SendRecvOp, CPU) { InitTensorsInScope(scope, place); paddle::framework::AttributeMap attrs; - attrs.insert({"endpoint", std::string("127.0.0.1:6174")}); - + attrs.insert({"endpoints", std::vector({"127.0.0.1:6174"})}); + attrs.insert({"epmap", std::vector({"127.0.0.1:6174"})}); auto send_op = paddle::framework::OpRegistry::CreateOp( - "send", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, attrs); - paddle::platform::CPUDeviceContext ctx(place); - send_op->Run(scope, ctx); + "send", {{"X", {"x1"}}}, {{"Out", {"x0"}}}, attrs); + send_op->Run(scope, place); - auto in_var = scope.Var("x0"); + auto in_var = scope.Var("x1"); auto tensor = in_var->GetMutable(); float *expected = tensor->data(); - - auto out_var = scope.Var("Out"); + auto out_var = scope.Var("x0"); auto target = out_var->GetMutable(); - // send fail cause output is none. + // x1 * 2 == x0 EXPECT_NE(target->memory_size(), size_t(0)); float *actual = target->data(); for (int64_t i = 0; i < target->numel(); ++i) { EXPECT_EQ(expected[i] * 2, actual[i]); } - recv_op.reset(); // dtor can shutdown and join server thread. + + recv_op->Stop(); server_thread.join(); + // recv_op.reset(); } diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 50364c64bec46aabc7be4f4b4370a3ad5b0eb07c..111937f59c3ab05e5917a79ca7e1f81f59747fc3 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -141,16 +141,18 @@ class DistributeTranspiler: self.param_grad_map = split_method(params_and_grads, pserver_endpoints) send_op_ordered_inputs = [] + send_op_ordered_outputs = [] epmap = [] for ep, v in self.param_grad_map.iteritems(): send_op_ordered_inputs.extend(v["grads"]) + send_op_ordered_outputs.extend(v["params"]) for i in v["grads"]: epmap.append(ep) send_op = program.global_block().append_op( type="send", inputs={"X": send_op_ordered_inputs }, # inputs is a list of tensors to be send - outputs={}, + outputs={"Out": send_op_ordered_outputs}, attrs={"endpoints": pserver_endpoints, "epmap": epmap})