diff --git a/paddle/framework/operator.h b/paddle/framework/operator.h
index 55eed57e6665515aec36dab4be8028dc75dbf7f3..d0a9b643d565d6651fd7ec0b515f088362852ba3 100644
--- a/paddle/framework/operator.h
+++ b/paddle/framework/operator.h
@@ -89,6 +89,9 @@ class OperatorBase {
   /// Net will call this function to Run an op.
   virtual void Run(const Scope& scope, const platform::Place& place) const = 0;
 
+  // FIXME(typhoonzero): this is only used for recv_op to stop event_loop.
+  virtual void Stop() {}
+
   virtual bool IsNetOp() const { return false; }
 
   virtual bool SupportGPU() const { return false; }
diff --git a/paddle/operators/detail/send_recv_impl.h b/paddle/operators/detail/send_recv_impl.h
index 07f0d8e3ee5679ad8d0607d47d65d09872e846a7..47f730f7ae897096fbdd23a55252448cf6655fb4 100644
--- a/paddle/operators/detail/send_recv_impl.h
+++ b/paddle/operators/detail/send_recv_impl.h
@@ -62,6 +62,8 @@ class SendRecvServerImpl final : public SendRecvService::Service {
   const TensorWithName Get() { return this->var_recv_queue_.Pop(); }
 
+  void Push(const TensorWithName &msg) { this->var_recv_queue_.Push(msg); }
+
  private:
   // received variable from RPC, operators fetch variable from this queue.
   SimpleBlockQueue<TensorWithName> var_recv_queue_;
 
diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc
index 29e1fc1986cf3b33417a68f3c23812fa0c3c227e..322f8571cfd4341f064e8f9df512a8d74b91ed9d 100644
--- a/paddle/operators/recv_op.cc
+++ b/paddle/operators/recv_op.cc
@@ -28,6 +28,8 @@ limitations under the License. */
 #include "paddle/operators/detail/send_recv_impl.h"
 #include "paddle/operators/detail/simple_block_queue.h"
 
+#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV"
+
 namespace paddle {
 namespace operators {
 
@@ -39,7 +41,7 @@ void RunServer(Server **rpc_server,
   builder.RegisterService(service.get());
   std::unique_ptr<Server> server(builder.BuildAndStart());
   *rpc_server = server.get();
-  LOG(INFO) << "Server listening on " << server_address << std::endl;
+  LOG(INFO) << "Server listening on " << server_address;
   server->Wait();
 }
 
@@ -57,7 +59,10 @@ class RecvOp : public framework::OperatorBase {
     }
   }
 
-  virtual ~RecvOp() {
+  void Stop() override {
+    detail::TensorWithName term_msg;
+    term_msg.first = LISTEN_TERMINATE_MESSAGE;
+    rpc_service_->Push(term_msg);
     rpc_server_->Shutdown();
     server_thread_->join();
   }
@@ -83,13 +88,18 @@ class RecvOp : public framework::OperatorBase {
     size_t param_count = param_list.size();
     rpc_service_->Reset();
     // TODO(typhoonzero): change this to a while_op for every cluster-batch.
-    while (true) {
+    bool exit_flag = false;
+    while (!exit_flag) {
       // Get from multiple trainers, we don't care about order in which
       // the gradient arrives, just add suffix 0~n then average the gradient.
       for (size_t i = 0; i < param_count * trainer_count; ++i) {
         // blocking get one var from client.
         const detail::TensorWithName &v = rpc_service_->Get();
         auto grad_var_name = v.first;
+        if (grad_var_name == LISTEN_TERMINATE_MESSAGE) {
+          exit_flag = true;
+          break;
+        }
         auto it = std::find(grad_list.begin(), grad_list.end(), grad_var_name);
         std::string param_var_name;
         if (it != grad_list.end()) {
@@ -114,8 +124,11 @@ class RecvOp : public framework::OperatorBase {
       auto *tensor = var->GetMutable<framework::LoDTensor>();
       // FIXME(typhoonzero): do not copy
       platform::DeviceContextPool &pool = platform::DeviceContextPool::Get();
-      auto &dev_ctx = *pool.Borrow(place);
-      framework::CopyFrom(v.second, place, dev_ctx, tensor);
+      auto &dev_ctx = *pool.Borrow(dev_place);
+      framework::CopyFrom(v.second, dev_place, dev_ctx, tensor);
+    }
+    if (exit_flag) {
+      break;
     }
 
     rpc_service_->Reset();
@@ -123,7 +136,7 @@ class RecvOp : public framework::OperatorBase {
     framework::proto::ProgramDesc program_desc;
     program_desc.ParseFromString(program_str);
     framework::ProgramDesc program(program_desc);
-    framework::Executor executor(place);
+    framework::Executor executor(dev_place);
     // Run sub graph to get optimized tensor
     try {
       executor.Run(program, &recv_scope, 0, /*global_block*/
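The new Stop() path works through the classic sentinel-message pattern on a blocking queue: RecvOp's serving loop blocks in rpc_service_->Get() (a Pop() on SimpleBlockQueue), so Stop() wakes it up by Push()-ing a TensorWithName whose name is LISTEN_TERMINATE_MESSAGE, which the loop recognizes and uses to set exit_flag before shutting the gRPC server down. Below is a minimal, self-contained sketch of that pattern; MiniBlockQueue and the string/int payload are illustrative stand-ins, not Paddle's actual SimpleBlockQueue or TensorWithName.

#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Illustrative stand-in for SimpleBlockQueue<T>: Pop() blocks until Push()
// supplies an element, which is what lets Stop() unblock the server loop.
template <typename T>
class MiniBlockQueue {
 public:
  void Push(T item) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push_back(std::move(item));
    }
    cv_.notify_one();
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !queue_.empty(); });
    T item = std::move(queue_.front());
    queue_.pop_front();
    return item;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<T> queue_;
};

int main() {
  // Same idea as LISTEN_TERMINATE_MESSAGE in recv_op.cc.
  const std::string kTerminate = "TERMINATE@RECV";
  // (name, payload) pairs, loosely mirroring TensorWithName.
  MiniBlockQueue<std::pair<std::string, int>> queue;

  // Consumer thread: mirrors RecvOp's receive loop, exiting on the sentinel.
  std::thread server([&] {
    while (true) {
      auto msg = queue.Pop();  // blocks until a producer (or Stop()) pushes
      if (msg.first == kTerminate) break;
      std::cout << "received " << msg.first << "\n";
    }
  });

  queue.Push({"x1@GRAD.trainer_0", 1});
  queue.Push({kTerminate, 0});  // what RecvOp::Stop() does via rpc_service_->Push()
  server.join();
  return 0;
}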
diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc
index e16c985b4d97cd3553831b6e6615ad0e9106bf5c..6e829386832a875769669b02e6f41755618d348a 100644
--- a/paddle/operators/send_op.cc
+++ b/paddle/operators/send_op.cc
@@ -41,9 +41,11 @@ class SendOp : public framework::OperatorBase {
           grpc::CreateChannel(ep, grpc::InsecureChannelCredentials())));
     }
   }
+
   void Run(const framework::Scope &scope,
-           const platform::DeviceContext &dev_ctx) const override {
+           const platform::Place &dev_place) const override {
     auto ins = Inputs("X");
+    auto outs = Outputs("Out");
     std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
     // TODO(typhoonzero): use async calls to send multiple variable asyncly.
     for (size_t i = 0; i < ins.size(); ++i) {
@@ -54,10 +56,10 @@ class SendOp : public framework::OperatorBase {
     }
     // TODO(typhoonzero): support async optimization
     client_map_[epmap[0]]->Wait();
-    for (size_t i = 0; i < ins.size(); ++i) {
-      bool ret = client_map_[epmap[i]]->GetVariable(scope, ins[i]);
+    for (size_t i = 0; i < outs.size(); ++i) {
+      bool ret = client_map_[epmap[i]]->GetVariable(scope, outs[i]);
       if (!ret) {
-        LOG(ERROR) << "GetVariable error: " << ins[i];
+        LOG(ERROR) << "GetVariable error: " << outs[i];
       }
     }
   }
@@ -72,6 +74,8 @@ class SendOpMaker : public framework::OpProtoAndCheckerMaker {
   SendOpMaker(OpProto *proto, OpAttrChecker *op_checker)
       : OpProtoAndCheckerMaker(proto, op_checker) {
     AddInput("X", "(Tensor) Input tensor to be send").AsDuplicable();
+    AddOutput("Out", "(Tensor) Output tensor to get from server")
+        .AsDuplicable();
     AddComment(R"DOC(
 Recv operator
 
@@ -79,11 +83,13 @@ This operator will recv tensor from send_op
 )DOC");
     AddAttr<std::vector<std::string>>("endpoints",
                                       "(string vector, default 127.0.0.1:6164)"
-                                      "Server endpoints to send variables to.");
+                                      "Server endpoints to send variables to.")
+        .SetDefault({});
     AddAttr<std::vector<std::string>>("epmap",
                                       "(string vector, default 127.0.0.1:6164)"
                                       "Server endpoints in the order of input "
-                                      "variables for mapping");
+                                      "variables for mapping")
+        .SetDefault({});
   }
 };
 
diff --git a/paddle/operators/send_recv_op_test.cc b/paddle/operators/send_recv_op_test.cc
index e59208cd6191f5fb36a47265966cb1cef7897117..108e2dec6b3eecadd431fd25f9a31ec17a874b6b 100644
--- a/paddle/operators/send_recv_op_test.cc
+++ b/paddle/operators/send_recv_op_test.cc
@@ -12,9 +12,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
-// TODO(typhoonzero): add python bindings for this test as
-// a RemoteOptimizer.
-
 #include <unistd.h>
 #include <string>
 #include <thread>
@@ -86,18 +83,19 @@ void StartServerNet() {
   paddle::framework::ProgramDesc program;
   paddle::framework::BlockDesc *block = program.MutableBlock(0);
   // X for server side tensors, RX for received tensers, must be of same shape.
-  AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, block);
+  AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"x0"}}}, {}, block);
 
   paddle::framework::AttributeMap attrs;
   attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
+  attrs.insert({"ParamList", std::vector<std::string>({"x0"})});
+  attrs.insert({"GradList", std::vector<std::string>({"x1"})});
   std::string program_proto;
   PADDLE_ENFORCE(program.Proto()->SerializeToString(&program_proto));
 
   attrs.insert({"OptimizeProgram", program_proto});
-  recv_op = paddle::framework::OpRegistry::CreateOp(
-      "recv", {{"RX", {"x0", "x1"}}}, {{"Out", {"Out"}}}, attrs);
-  paddle::platform::CPUDeviceContext ctx(place);
-  recv_op->Run(scope, ctx);
+  recv_op = paddle::framework::OpRegistry::CreateOp("recv", {{"RX", {"x1"}}},
+                                                    {}, attrs);
+  recv_op->Run(scope, place);
 }
 
 TEST(SendRecvOp, CPU) {
@@ -109,25 +107,25 @@ TEST(SendRecvOp, CPU) {
   InitTensorsInScope(scope, place);
 
   paddle::framework::AttributeMap attrs;
-  attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
-
+  attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
+  attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
   auto send_op = paddle::framework::OpRegistry::CreateOp(
-      "send", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, attrs);
-  paddle::platform::CPUDeviceContext ctx(place);
-  send_op->Run(scope, ctx);
+      "send", {{"X", {"x1"}}}, {{"Out", {"x0"}}}, attrs);
+  send_op->Run(scope, place);
 
-  auto in_var = scope.Var("x0");
+  auto in_var = scope.Var("x1");
   auto tensor = in_var->GetMutable<paddle::framework::LoDTensor>();
   float *expected = tensor->data<float>();
-
-  auto out_var = scope.Var("Out");
+  auto out_var = scope.Var("x0");
   auto target = out_var->GetMutable<paddle::framework::LoDTensor>();
-  // send fail cause output is none.
+  // x1 * 2 == x0
   EXPECT_NE(target->memory_size(), size_t(0));
   float *actual = target->data<float>();
   for (int64_t i = 0; i < target->numel(); ++i) {
     EXPECT_EQ(expected[i] * 2, actual[i]);
   }
-  recv_op.reset();  // dtor can shutdown and join server thread.
+
+  recv_op->Stop();
   server_thread.join();
+  // recv_op.reset();
 }
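Both ops above now follow the Place-based Run() signature declared in operator.h: the framework hands an operator only a platform::Place, and the operator borrows the matching DeviceContext from the global pool when it actually needs one (as RecvOp::Run does before framework::CopyFrom). A minimal sketch of that lookup follows; BorrowContextFor is a hypothetical helper name and the include path is an assumption, while the two pool calls are exactly the ones used in this patch.

#include "paddle/platform/device_context.h"

namespace paddle {
namespace operators {

// Hypothetical helper: given the place passed to Run(), fetch the device
// context the pool already owns for it. Callers must not free it; the pool
// keeps ownership, which is why RecvOp only binds a reference to it.
inline const platform::DeviceContext &BorrowContextFor(
    const platform::Place &place) {
  platform::DeviceContextPool &pool = platform::DeviceContextPool::Get();
  return *pool.Borrow(place);  // same calls as in RecvOp::Run above
}

}  // namespace operators
}  // namespace paddle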
diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py
index 50364c64bec46aabc7be4f4b4370a3ad5b0eb07c..111937f59c3ab05e5917a79ca7e1f81f59747fc3 100644
--- a/python/paddle/v2/fluid/distribute_transpiler.py
+++ b/python/paddle/v2/fluid/distribute_transpiler.py
@@ -141,16 +141,18 @@ class DistributeTranspiler:
         self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
 
         send_op_ordered_inputs = []
+        send_op_ordered_outputs = []
         epmap = []
         for ep, v in self.param_grad_map.iteritems():
             send_op_ordered_inputs.extend(v["grads"])
+            send_op_ordered_outputs.extend(v["params"])
             for i in v["grads"]:
                 epmap.append(ep)
         send_op = program.global_block().append_op(
             type="send",
             inputs={"X": send_op_ordered_inputs
                     },  # inputs is a list of tensors to be send
-            outputs={},
+            outputs={"Out": send_op_ordered_outputs},
             attrs={"endpoints": pserver_endpoints,
                    "epmap": epmap})
 