diff --git a/paddle/operators/detail/CMakeLists.txt b/paddle/operators/detail/CMakeLists.txt index f6bdc63cc2cfae526fe911ee4d989675452d5c5d..571a75c9dcd903672d460f192bf28ddbeaea7c78 100644 --- a/paddle/operators/detail/CMakeLists.txt +++ b/paddle/operators/detail/CMakeLists.txt @@ -1 +1 @@ -grpc_library(sendrecvop_grpc SRCS recv_impl.cc send_impl.cc PROTO send_recv.proto DEPS lod_tensor selected_rows) +grpc_library(sendrecvop_grpc SRCS sendrecvop_utils.cc grpc_client.cc grpc_server.cc PROTO send_recv.proto DEPS lod_tensor selected_rows) diff --git a/paddle/operators/detail/grpc_client.cc b/paddle/operators/detail/grpc_client.cc new file mode 100644 index 0000000000000000000000000000000000000000..5a4db2d7e686ce84abef620f890be8f3aa82cb73 --- /dev/null +++ b/paddle/operators/detail/grpc_client.cc @@ -0,0 +1,147 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "grpc_client.h" +namespace paddle { +namespace operators { +namespace detail { + +bool RPCClient::AsyncSendVariable(const std::string& ep, + const platform::DeviceContext& ctx, + const framework::Scope& scope, + const std::string& var_name, + int64_t time_out) { + sendrecv::VariableMessage req; + auto* var = scope.FindVar(var_name); + SerializeToMessage(var_name, var, ctx, &req); + + // varhandle + VarHandle var_h; + var_h.ep = ep; + var_h.scope = &scope; + var_h.name = var_name; + var_h.ctx = &ctx; + + // stub context + auto ch = GetChannel(ep); + SendProcessor* s = new SendProcessor(ch); + s->Prepare(var_h, time_out); + s->response_call_back_ = NULL; + + auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_); + rpc->Finish(&s->reply_, &s->status_, (void*)s); + + req_count_++; + + return true; +} + +void ProcGetResponse(const VarHandle& var_h, + const sendrecv::VariableMessage& ret_msg) { + auto* outvar = var_h.scope->FindVar(var_h.name); + + std::istringstream iss(ret_msg.serialized()); + DeserializeFromMessage(ret_msg, *var_h.ctx, outvar); +} + +bool RPCClient::AsyncGetVariable(const std::string& ep, + const platform::DeviceContext& ctx, + const framework::Scope& scope, + const std::string& var_name, + int64_t time_out) { + sendrecv::VariableMessage req; + req.set_varname(var_name); + + auto* var = scope.FindVar(var_name); + SerializeToMessage(var_name, var, ctx, &req); + + // varhandle + VarHandle var_h; + var_h.ep = ep; + var_h.scope = &scope; + var_h.name = var_name; + var_h.ctx = &ctx; + + // stub context + auto ch = GetChannel(ep); + GetProcessor* s = new GetProcessor(ch); + s->Prepare(var_h, time_out); + s->response_call_back_ = ProcGetResponse; + + auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_); + rpc->Finish(&s->reply_, &s->status_, (void*)s); + + req_count_++; + + return true; +} + +bool RPCClient::wait() { + bool ok = true; + + while (true) { + if (req_count_ <= 0) { + break; + } + + if (!Proceed()) { + LOG(ERROR) << "Get meets CompletionQueue error"; + return false; + } + } + + return ok; +} + +bool RPCClient::Proceed() { + void* tag = NULL; + bool ok = false; + + // request counts. + if (!cq_.Next(&tag, &ok)) { + return false; + } + req_count_--; + + GPR_ASSERT(ok); + PADDLE_ENFORCE(tag); + + // TODO(gongwb): add more retries. + ClientBase* c = static_cast(tag); + if (!c->status_.ok()) { + delete c; + return true; + } + + c->Process(); + delete c; + return true; +} + +std::shared_ptr RPCClient::GetChannel(const std::string& ep) { + auto it = channels_.find(ep); + if (it != channels_.end()) { + return it->second; + } + + auto ch = std::shared_ptr( + grpc::CreateChannel(ep, grpc::InsecureChannelCredentials())); + + channels_[ep] = ch; + return ch; +} + +} // namespace detail +} // namespace operators +} // namespace paddle diff --git a/paddle/operators/detail/grpc_client.h b/paddle/operators/detail/grpc_client.h new file mode 100644 index 0000000000000000000000000000000000000000..d27b5ced9ece67f9b9da3b7f87ec231477603580 --- /dev/null +++ b/paddle/operators/detail/grpc_client.h @@ -0,0 +1,147 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "paddle/framework/data_type.h" +#include "paddle/framework/lod_tensor.h" +#include "paddle/framework/scope.h" +#include "paddle/framework/selected_rows.h" +#include "paddle/operators/detail/sendrecvop_utils.h" +#include "paddle/operators/detail/simple_block_queue.h" + +namespace paddle { +namespace operators { +namespace detail { + +struct VarHandle { + std::string ep; + const platform::DeviceContext* ctx; + const framework::Scope* scope; + std::string name; + + std::string String() const { + std::ostringstream s; + s << "name:[" << name << "] ep:[" << ep << "]"; + return s.str(); + } +}; + +void ProcGetResponse(const VarHandle& var_h, + const sendrecv::VariableMessage& msg); + +class ClientBase { + public: + explicit ClientBase(std::shared_ptr ch) { + stub_ = sendrecv::SendRecvService::NewStub(ch); + context_ = NULL; + } + + virtual ~ClientBase() {} + + virtual void Prepare(const VarHandle& var_info, int64_t time_out) { + context_.reset(new grpc::ClientContext()); + var_h_ = var_info; + + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(time_out); + + context_->set_deadline(deadline); + } + + virtual void Process() = 0; + + std::unique_ptr stub_; + std::unique_ptr context_; + grpc::Status status_; + VarHandle var_h_; +}; + +typedef std::function + RequestSendCallBack; + +class SendProcessor : public ClientBase { + public: + explicit SendProcessor(std::shared_ptr ch) : ClientBase(ch) {} + + virtual ~SendProcessor() {} + + virtual void Process() { + if (response_call_back_) { + response_call_back_(var_h_, reply_); + } + } + + sendrecv::VoidMessage reply_; + RequestSendCallBack response_call_back_ = NULL; +}; + +typedef std::function + RequestGetCallBack; + +class GetProcessor : public ClientBase { + public: + explicit GetProcessor(std::shared_ptr ch) : ClientBase(ch) {} + + virtual ~GetProcessor() {} + + virtual void Process() { + if (response_call_back_) { + response_call_back_(var_h_, reply_); + } + } + + sendrecv::VariableMessage reply_; + RequestGetCallBack response_call_back_ = ProcGetResponse; +}; + +class RPCClient { + public: + bool AsyncSendVariable(const std::string& ep, + const platform::DeviceContext& ctx, + const framework::Scope& scope, + const std::string& var_name, + int64_t time_out = 600 * 1000); + + bool AsyncGetVariable(const std::string& ep, + const platform::DeviceContext& ctx, + const framework::Scope& scope, + const std::string& var_name, + int64_t time_out = 600 * 1000); + bool wait(); + + private: + bool Proceed(); + std::shared_ptr GetChannel(const std::string& ep); + + private: + grpc::CompletionQueue cq_; + std::map> channels_; + int64_t req_count_ = 0; +}; + +} // namespace detail +} // namespace operators +} // namespace paddle diff --git a/paddle/operators/detail/grpc_server.cc b/paddle/operators/detail/grpc_server.cc new file mode 100644 index 0000000000000000000000000000000000000000..e8d561a57ff59e9221400241f881cb26fb6c6f06 --- /dev/null +++ b/paddle/operators/detail/grpc_server.cc @@ -0,0 +1,237 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/operators/detail/grpc_server.h" + +using grpc::ServerAsyncResponseWriter; + +namespace paddle { +namespace operators { +namespace detail { + +enum CallStatus { PROCESS = 0, FINISH }; + +// reference: +// https://stackoverflow.com/questions/41732884/grpc-multiple-services-in-cpp-async-server +class RequestBase { + public: + explicit RequestBase(sendrecv::SendRecvService::AsyncService* service, + grpc::ServerCompletionQueue* cq) + : service_(service), cq_(cq), status_(PROCESS) {} + virtual ~RequestBase() {} + virtual void Process() { assert(false); } + + CallStatus Status() { return status_; } + void SetStatus(CallStatus status) { status_ = status; } + + protected: + grpc::ServerContext ctx_; + sendrecv::SendRecvService::AsyncService* service_; + grpc::ServerCompletionQueue* cq_; + CallStatus status_; +}; + +typedef std::pair MessageWithName; + +class RequestSend final : public RequestBase { + public: + explicit RequestSend(sendrecv::SendRecvService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + SimpleBlockQueue* queue) + : RequestBase(service, cq), queue_(queue), responder_(&ctx_) { + service_->RequestSendVariable(&ctx_, &request_, &responder_, cq_, cq_, + this); + } + + virtual ~RequestSend() {} + + virtual void Process() { + MessageWithName msg_with_name = + std::make_pair(request_.varname(), std::move(request_)); + queue_->Push(std::move(msg_with_name)); + // TODO(gongwb): check var's info. + responder_.Finish(reply_, grpc::Status::OK, this); + } + + protected: + sendrecv::VariableMessage request_; + sendrecv::VoidMessage reply_; + SimpleBlockQueue* queue_; + ServerAsyncResponseWriter responder_; +}; + +class RequestGet final : public RequestBase { + public: + explicit RequestGet(sendrecv::SendRecvService::AsyncService* service, + grpc::ServerCompletionQueue* cq, framework::Scope* scope) + : RequestBase(service, cq), responder_(&ctx_), scope_(scope) { + service_->RequestGetVariable(&ctx_, &request_, &responder_, cq_, cq_, this); + } + + virtual ~RequestGet() {} + + virtual void Process() { + // proc request. + std::string var_name = request_.varname(); + auto* var = scope_->FindVar(var_name); + SerializeToMessage(var_name, var, platform::CPUDeviceContext(), &reply_); + // TODO(gongwb): check var's info. + responder_.Finish(reply_, grpc::Status::OK, this); + } + + protected: + sendrecv::VariableMessage request_; + sendrecv::VariableMessage reply_; + ServerAsyncResponseWriter responder_; + framework::Scope* scope_; +}; + +void AsyncGRPCServer::RunSyncUpdate() { + grpc::ServerBuilder builder; + builder.AddListeningPort(address_, grpc::InsecureServerCredentials()); + builder.RegisterService(&service_); + + cq_send_ = builder.AddCompletionQueue(); + cq_get_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); + LOG(INFO) << "Server listening on " << address_ << std::endl; + + std::function send_register = + std::bind(&AsyncGRPCServer::TryToRegisterNewSendOne, this); + std::function get_register = + std::bind(&AsyncGRPCServer::TryToRegisterNewGetOne, this); + + t_send_.reset( + new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, false, + cq_send_.get(), "cq_send", send_register))); + + t_get_.reset( + new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, true, + cq_get_.get(), "cq_get", get_register))); + + // wait server + server_->Wait(); + t_send_->join(); + t_get_->join(); +} + +void AsyncGRPCServer::ShutdownQueue() { + std::unique_lock lock(cq_mutex_); + cq_send_->Shutdown(); + cq_get_->Shutdown(); + is_shut_down_ = true; +} + +// This URL explains why shutdown is complicate: +// https://stackoverflow.com/questions/35708348/grpc-what-is-the-recommended-way-to-shut-down-an-asynchronous-server-in-c +void AsyncGRPCServer::ShutDown() { + server_->Shutdown(); + ShutdownQueue(); +} + +void AsyncGRPCServer::TryToRegisterNewSendOne() { + std::unique_lock lock(cq_mutex_); + if (is_shut_down_) { + return; + } + RequestSend* send = + new RequestSend(&service_, cq_send_.get(), &var_recv_queue_); + VLOG(4) << "create RequestSend status:" << send->Status(); +} + +void AsyncGRPCServer::TryToRegisterNewGetOne() { + std::unique_lock lock(cq_mutex_); + if (is_shut_down_) { + return; + } + RequestGet* get = new RequestGet(&service_, cq_get_.get(), scope_); + VLOG(4) << "create Requestget status:" << get->Status(); +} + +void AsyncGRPCServer::SetFinishOrDelete(RequestBase*& last) { + std::unique_lock lock(cq_mutex_); + if (is_shut_down_) { + delete last; + last = NULL; + return; + } + + last->SetStatus(FINISH); + return; +} + +void AsyncGRPCServer::HandleRequest(bool wait, grpc::ServerCompletionQueue* cq, + std::string cq_name, + std::function TryToRegisterNewOne) { + TryToRegisterNewOne(); + + void* tag = NULL; + bool ok = false; + while (true) { + if (!cq->Next(&tag, &ok)) { + LOG(INFO) << cq_name << " get CompletionQueue shutdown!"; + break; + } + + if (wait && !done_) { + Wait(); + } + + RequestBase* base = (RequestBase*)tag; + if (!ok) { + VLOG(4) << cq_name << " recv no regular event"; + TryToRegisterNewOne(); + delete base; + continue; + } + + switch (base->Status()) { + case PROCESS: { + VLOG(4) << cq_name << " status:" << base->Status(); + TryToRegisterNewOne(); + base->Process(); + SetFinishOrDelete(base); + break; + } + case FINISH: { + VLOG(4) << cq_name << " status:" << base->Status(); + delete base; + break; + } + default: { assert(false); } + } + } +} + +void AsyncGRPCServer::Wait() { + std::unique_lock lock(this->mutex_); + condition_.wait(lock, [=] { return this->done_ == true; }); +} + +void AsyncGRPCServer::Reset() { + std::lock_guard lock(this->mutex_); + done_ = false; +} + +void AsyncGRPCServer::Done() { + { + std::lock_guard lock(this->mutex_); + done_ = true; + } + condition_.notify_all(); +} + +} // namespace detail +} // namespace operators +} // namespace paddle diff --git a/paddle/operators/detail/grpc_server.h b/paddle/operators/detail/grpc_server.h new file mode 100644 index 0000000000000000000000000000000000000000..041fe05b2e9c37e8a91669b8f523c47b56e14cba --- /dev/null +++ b/paddle/operators/detail/grpc_server.h @@ -0,0 +1,91 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include "paddle/framework/lod_tensor.h" +#include "paddle/framework/scope.h" +#include "paddle/framework/selected_rows.h" +#include "paddle/framework/var_type.h" +#include "paddle/operators/detail/simple_block_queue.h" + +#include "paddle/operators/detail/send_recv.grpc.pb.h" +#include "paddle/operators/detail/send_recv.pb.h" + +#include +#include +#include +#include "paddle/operators/detail/sendrecvop_utils.h" + +namespace paddle { +namespace operators { +namespace detail { + +typedef std::pair MessageWithName; +class RequestBase; + +class AsyncGRPCServer final : public sendrecv::SendRecvService::Service { + public: + explicit AsyncGRPCServer(std::string address) { address_ = address; } + + void RunSyncUpdate(); + + void Reset(); + + void Done(); + + void SetScope(framework::Scope *scope) { scope_ = scope; } + + const MessageWithName Get() { return this->var_recv_queue_.Pop(); } + + void Push(const MessageWithName &msg) { this->var_recv_queue_.Push(msg); } + + void ShutDown(); + + protected: + void Wait(); + void HandleRequest(bool wait, grpc::ServerCompletionQueue *cq, + std::string cq_name, + std::function TryToRegisterNewOne); + void TryToRegisterNewSendOne(); + void TryToRegisterNewGetOne(); + void SetFinishOrDelete(RequestBase *&last); + void ShutdownQueue(); + + private: + std::mutex cq_mutex_; + volatile bool is_shut_down_ = false; + std::unique_ptr cq_send_; + std::unique_ptr cq_get_; + + sendrecv::SendRecvService::AsyncService service_; + std::unique_ptr server_; + + std::string address_; + framework::Scope *scope_; + // received variable from RPC, operators fetch variable from this queue. + SimpleBlockQueue var_recv_queue_; + + // condition of the sub program + std::mutex mutex_; + volatile mutable bool done_; + std::condition_variable condition_; + + std::unique_ptr t_send_; + std::unique_ptr t_get_; +}; + +}; // namespace detail +}; // namespace operators +}; // namespace paddle diff --git a/paddle/operators/detail/recv_impl.cc b/paddle/operators/detail/recv_impl.cc deleted file mode 100644 index 319404e56a5f3c407f313991240bbbb85fd39a2a..0000000000000000000000000000000000000000 --- a/paddle/operators/detail/recv_impl.cc +++ /dev/null @@ -1,65 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include "send_recv_impl.h" - -namespace paddle { -namespace operators { -namespace detail { - -Status SendRecvServerImpl::SendVariable(ServerContext *context, - const VariableMessage *in_var, - VoidMessage *out_var) { - MessageWithName msg_with_name = - std::make_pair(in_var->varname(), std::move(*in_var)); - var_recv_queue_.Push(std::move(msg_with_name)); - return Status::OK; -} - -Status SendRecvServerImpl::GetVariable(ServerContext *context, - const VariableMessage *in_var, - VariableMessage *out_var) { - std::string get_var_name = in_var->varname(); - auto *var = scope_->FindVar(get_var_name); - - SerializeToMessage(get_var_name, var, platform::CPUDeviceContext(), out_var); - return Status::OK; -} - -Status SendRecvServerImpl::Wait(ServerContext *context, - const VoidMessage *in_var, - VoidMessage *out_var) { - { - std::unique_lock lock(this->mutex_); - condition_.wait(lock, [=] { return this->done_ == true; }); - } - return Status::OK; -} - -void SendRecvServerImpl::Reset() { - std::lock_guard lock(this->mutex_); - done_ = false; -} - -void SendRecvServerImpl::Done() { - { - std::lock_guard lock(this->mutex_); - done_ = true; - } - condition_.notify_all(); -} - -} // namespace detail -} // namespace operators -} // namespace paddle diff --git a/paddle/operators/detail/send_impl.cc b/paddle/operators/detail/send_impl.cc deleted file mode 100644 index ae85cf2cec2cd8e046c0c7fd3408f2212f225819..0000000000000000000000000000000000000000 --- a/paddle/operators/detail/send_impl.cc +++ /dev/null @@ -1,67 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include "send_recv_impl.h" - -namespace paddle { -namespace operators { -namespace detail { - -bool RPCClient::SendVariable(const framework::Scope& scope, - const std::string& inname) { - ClientContext context; - VariableMessage msg; - VoidMessage out_msg; - // FIXME(typhoonzero): pass device context to here. - auto ctx = platform::CPUDeviceContext(); - auto* var = scope.FindVar(inname); - PADDLE_ENFORCE(var); - SerializeToMessage(inname, var, ctx, &msg); - - Status status = stub_->SendVariable(&context, msg, &out_msg); - if (!status.ok()) { - LOG(ERROR) << "gRPC error: " << status.error_message(); - return false; - } - return true; -} - -bool RPCClient::GetVariable(const framework::Scope& scope, - const std::string& outname) { - ClientContext context; - VariableMessage call_msg, ret_msg; - call_msg.set_varname(outname); - auto ctx = platform::CPUDeviceContext(); - Status status = stub_->GetVariable(&context, call_msg, &ret_msg); - auto* outvar = scope.FindVar(outname); - if (!status.ok()) { - LOG(ERROR) << "gRPC error: " << status.error_message(); - return false; - } - - std::istringstream iss(ret_msg.serialized()); - DeserializeFromMessage(ret_msg, ctx, outvar); - - return true; -} - -void RPCClient::Wait() { - ClientContext context; - VoidMessage call_msg, ret_msg; - stub_->Wait(&context, call_msg, &ret_msg); -} - -} // namespace detail -} // namespace operators -} // namespace paddle diff --git a/paddle/operators/detail/send_recv.proto b/paddle/operators/detail/send_recv.proto index f141c755ce14ef540aeab32c11c289179aff3f8c..8f962b4c69cc83dc2ab98b7dc27e18bc4b42bf18 100644 --- a/paddle/operators/detail/send_recv.proto +++ b/paddle/operators/detail/send_recv.proto @@ -21,8 +21,6 @@ service SendRecvService { rpc SendVariable(VariableMessage) returns (VoidMessage) {} // Argument VariableMessage for GetVariable should only contain varname. rpc GetVariable(VariableMessage) returns (VariableMessage) {} - // wait for one execution of the program - rpc Wait(VoidMessage) returns (VoidMessage) {} } // VariableMessage is serialized paddle variable message. diff --git a/paddle/operators/detail/send_recv_impl.h b/paddle/operators/detail/send_recv_impl.h deleted file mode 100644 index 1fe54f1f0536aed7d41bbdeeca076534abafe98d..0000000000000000000000000000000000000000 --- a/paddle/operators/detail/send_recv_impl.h +++ /dev/null @@ -1,141 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#pragma once - -#include "paddle/framework/lod_tensor.h" -#include "paddle/framework/scope.h" -#include "paddle/framework/selected_rows.h" -#include "paddle/framework/var_type.h" -#include "paddle/operators/detail/simple_block_queue.h" - -#include "paddle/operators/detail/send_recv.grpc.pb.h" -#include "paddle/operators/detail/send_recv.pb.h" - -#include - -using grpc::Channel; -using grpc::Server; -using grpc::ServerContext; -using grpc::ServerReader; -using grpc::ServerBuilder; - -using grpc::ClientContext; -using grpc::ClientReader; -using grpc::ClientReaderWriter; -using grpc::ClientWriter; -using grpc::Status; -using sendrecv::SendRecvService; -using sendrecv::VariableMessage; -using sendrecv::VoidMessage; - -namespace paddle { -namespace operators { -namespace detail { - -typedef std::pair MessageWithName; - -class SendRecvServerImpl final : public SendRecvService::Service { - public: - explicit SendRecvServerImpl() {} - - Status SendVariable(ServerContext *context, const VariableMessage *in_var, - VoidMessage *out_var) override; - Status GetVariable(ServerContext *context, const VariableMessage *in_var, - VariableMessage *out_var) override; - Status Wait(ServerContext *context, const VoidMessage *in_var, - VoidMessage *out_var) override; - void Reset(); - void Done(); - void SetScope(framework::Scope *scope) { scope_ = scope; }; - - const MessageWithName Get() { return this->var_recv_queue_.Pop(); } - - void Push(const MessageWithName &msg) { this->var_recv_queue_.Push(msg); } - - private: - // received variable from RPC, operators fetch variable from this queue. - SimpleBlockQueue var_recv_queue_; - framework::Scope *scope_; - // condition of the sub program - std::mutex mutex_; - bool done_; - std::condition_variable condition_; -}; - -// RPCClient is a class to send tensors to pserver sub-network -// using different hashing methods. -class RPCClient { - public: - RPCClient(std::shared_ptr channel) - : stub_(SendRecvService::NewStub(channel)) {} - - bool SendVariable(const framework::Scope &scope, const std::string &inname); - bool GetVariable(const framework::Scope &scope, const std::string &outname); - void Wait(); - - private: - std::unique_ptr stub_; -}; - -inline void SerializeToMessage(const std::string &name, - const framework::Variable *var, - const platform::DeviceContext &ctx, - VariableMessage *msg) { - msg->set_varname(name); - std::ostringstream oss; - switch (framework::ToVarType(var->Type())) { - case framework::proto::VarDesc_VarType_LOD_TENSOR: - msg->set_type(sendrecv::VarType::LOD_TENSOR); - framework::SerializeToStream(oss, var->Get(), ctx); - break; - case framework::proto::VarDesc_VarType_SELECTED_ROWS: - msg->set_type(sendrecv::VarType::SELECTED_ROWS); - framework::SerializeToStream(oss, var->Get(), - ctx); - break; - default: { - PADDLE_THROW("Serialize does not support type: %s", - typeid(var->Type()).name()); - break; - } - } - msg->set_serialized(oss.str()); -} - -inline void DeserializeFromMessage(const VariableMessage &msg, - const platform::DeviceContext &ctx, - framework::Variable *var) { - using namespace paddle::framework::proto; - std::istringstream iss(msg.serialized()); - switch (msg.type()) { - case sendrecv::VarType::LOD_TENSOR: - DeserializeFromStream(iss, var->GetMutable(), ctx); - break; - case sendrecv::VarType::SELECTED_ROWS: { - DeserializeFromStream(iss, var->GetMutable(), - ctx); - break; - } - default: { - PADDLE_THROW("Deserialize does not support type: %s", - typeid(var->Type()).name()); - break; - } - } -} - -} // namespace detail -} // namespace operators -} // namespace paddle diff --git a/paddle/operators/detail/sendrecvop_utils.cc b/paddle/operators/detail/sendrecvop_utils.cc new file mode 100644 index 0000000000000000000000000000000000000000..7635b9e8dbdff624bb42a9de346b8d05a980f9b6 --- /dev/null +++ b/paddle/operators/detail/sendrecvop_utils.cc @@ -0,0 +1,68 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/operators/detail/sendrecvop_utils.h" + +namespace paddle { +namespace operators { +namespace detail { + +void SerializeToMessage(const std::string& name, const framework::Variable* var, + const platform::DeviceContext& ctx, + sendrecv::VariableMessage* msg) { + msg->set_varname(name); + std::ostringstream oss; + switch (framework::ToVarType(var->Type())) { + case framework::proto::VarDesc_VarType_LOD_TENSOR: + msg->set_type(sendrecv::VarType::LOD_TENSOR); + framework::SerializeToStream(oss, var->Get(), ctx); + break; + case framework::proto::VarDesc_VarType_SELECTED_ROWS: + msg->set_type(sendrecv::VarType::SELECTED_ROWS); + framework::SerializeToStream(oss, var->Get(), + ctx); + break; + default: { + PADDLE_THROW("Serialize does not support type: %s", + typeid(var->Type()).name()); + break; + } + } + msg->set_serialized(oss.str()); +} + +void DeserializeFromMessage(const sendrecv::VariableMessage& msg, + const platform::DeviceContext& ctx, + framework::Variable* var) { + std::istringstream iss(msg.serialized()); + switch (msg.type()) { + case sendrecv::VarType::LOD_TENSOR: + DeserializeFromStream(iss, var->GetMutable(), ctx); + break; + case sendrecv::VarType::SELECTED_ROWS: { + DeserializeFromStream(iss, var->GetMutable(), + ctx); + break; + } + default: { + PADDLE_THROW("Deserialize does not support type: %s", + typeid(var->Type()).name()); + break; + } + } +} + +} // namespace detail +} // namespace operators +} // namespace paddle diff --git a/paddle/operators/detail/sendrecvop_utils.h b/paddle/operators/detail/sendrecvop_utils.h new file mode 100644 index 0000000000000000000000000000000000000000..bc6581afab93c626c7c2439d699c6c2d858df9fa --- /dev/null +++ b/paddle/operators/detail/sendrecvop_utils.h @@ -0,0 +1,42 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once +#include +#include +#include + +#include "paddle/framework/data_type.h" +#include "paddle/framework/lod_tensor.h" +#include "paddle/framework/scope.h" +#include "paddle/framework/selected_rows.h" +#include "paddle/framework/var_type.h" + +#include "paddle/operators/detail/send_recv.grpc.pb.h" +#include "paddle/operators/detail/send_recv.pb.h" + +namespace paddle { +namespace operators { +namespace detail { + +void SerializeToMessage(const std::string& name, const framework::Variable* var, + const platform::DeviceContext& ctx, + sendrecv::VariableMessage* msg); + +void DeserializeFromMessage(const sendrecv::VariableMessage& msg, + const platform::DeviceContext& ctx, + framework::Variable* var); +} // namespace detail +} // namespace operators +} // namespace paddle diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index 9331c7b563491902b2824898766cacb9bfdee2d9..55b33343af43802e1b6b95a32603bfee806c9764 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -24,7 +24,8 @@ limitations under the License. */ #include "paddle/framework/lod_tensor.h" #include "paddle/framework/op_registry.h" #include "paddle/framework/proto_desc.h" -#include "paddle/operators/detail/send_recv_impl.h" +#include "paddle/operators/detail/grpc_server.h" +#include "paddle/operators/detail/sendrecvop_utils.h" #include "paddle/operators/detail/simple_block_queue.h" #define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV" @@ -32,6 +33,11 @@ limitations under the License. */ namespace paddle { namespace operators { +void RunServer(std::shared_ptr service) { + service->RunSyncUpdate(); + VLOG(4) << "RunServer thread end"; +} + static void CreateTensorFromMessageType(framework::Variable *var, sendrecv::VarType var_type) { if (var_type == sendrecv::VarType::LOD_TENSOR) { @@ -46,18 +52,6 @@ static void CreateTensorFromMessageType(framework::Variable *var, } } -void RunServer(Server **rpc_server, - std::shared_ptr service, - const std::string &server_address) { - ServerBuilder builder; - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); - builder.RegisterService(service.get()); - std::unique_ptr server(builder.BuildAndStart()); - *rpc_server = server.get(); - LOG(INFO) << "Server listening on " << server_address; - server->Wait(); -} - class RecvOp : public framework::OperatorBase { public: RecvOp(const std::string &type, const framework::VariableNameMap &inputs, @@ -65,10 +59,9 @@ class RecvOp : public framework::OperatorBase { const framework::AttributeMap &attrs) : OperatorBase(type, inputs, outputs, attrs) { if (!rpc_service_) { - rpc_service_.reset(new detail::SendRecvServerImpl()); std::string endpoint = Attr("endpoint"); - server_thread_.reset( - new std::thread(RunServer, &rpc_server_, rpc_service_, endpoint)); + rpc_service_.reset(new detail::AsyncGRPCServer(endpoint)); + server_thread_.reset(new std::thread(RunServer, rpc_service_)); } } @@ -76,7 +69,7 @@ class RecvOp : public framework::OperatorBase { detail::MessageWithName term_msg; term_msg.first = LISTEN_TERMINATE_MESSAGE; rpc_service_->Push(term_msg); - rpc_server_->Shutdown(); + rpc_service_->ShutDown(); server_thread_->join(); } @@ -99,10 +92,12 @@ class RecvOp : public framework::OperatorBase { auto grad_list = Attr>("GradList"); auto trainer_count = Attr("Trainers"); size_t param_count = param_list.size(); + rpc_service_->Reset(); // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; while (!exit_flag) { + // TODO(gognwb): simply this loop. // Get from multiple trainers, we don't care about order in which // the gradient arrives, just add suffix 0~n then average the gradient. for (size_t i = 0; i < param_count * trainer_count; ++i) { @@ -110,6 +105,7 @@ class RecvOp : public framework::OperatorBase { const detail::MessageWithName &v = rpc_service_->Get(); auto grad_var_name = v.first; if (grad_var_name == LISTEN_TERMINATE_MESSAGE) { + VLOG(4) << "received LISTEN_TERMINATE_MESSAGE and RunOp.Run() exit"; exit_flag = true; break; } @@ -118,10 +114,12 @@ class RecvOp : public framework::OperatorBase { if (it != grad_list.end()) { param_var_name = param_list[it - grad_list.begin()]; } else { - LOG(ERROR) << "grad have no paired param found!"; + LOG(ERROR) << "grad have no paired param found!\"" << grad_var_name + << "\""; } VLOG(3) << "recved grad: " << grad_var_name << " updating param: " << param_var_name; + auto *merged_grad = recv_scope.FindVar(grad_var_name); if (merged_grad == nullptr) { auto *ptr = recv_scope.Var(grad_var_name); @@ -141,9 +139,11 @@ class RecvOp : public framework::OperatorBase { auto &dev_ctx = *pool.Get(dev_place); detail::DeserializeFromMessage(v.second, dev_ctx, var); } + if (exit_flag) { break; } + rpc_service_->Reset(); std::string program_str = Attr("OptimizeProgram"); @@ -158,17 +158,14 @@ class RecvOp : public framework::OperatorBase { } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } + rpc_service_->Done(); grads_counter_.clear(); } // while(true) } protected: - // grpc server instance to track status and gracefully shutdown. - // borrow an pointer from server thread. - Server *rpc_server_{nullptr}; - // grpc send/recv service implement to register. - std::shared_ptr rpc_service_; + std::shared_ptr rpc_service_; std::shared_ptr server_thread_; mutable std::unordered_map grads_counter_; }; diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc index 95c207221a7b34732eca4cfd07fed0a8f1671981..4d145250bdc73607c8817e20fdb753f4c96e2391 100644 --- a/paddle/operators/send_op.cc +++ b/paddle/operators/send_op.cc @@ -19,59 +19,45 @@ limitations under the License. */ #include "paddle/framework/lod_tensor.h" #include "paddle/framework/op_registry.h" -#include "paddle/operators/detail/send_recv_impl.h" -#include "paddle/operators/detail/simple_block_queue.h" +#include +#include "paddle/operators/detail/grpc_client.h" namespace paddle { namespace operators { -// TODO(typhoonzero): this is a simple implementation which only send -// one tensor class SendOp : public framework::OperatorBase { public: - SendOp(const std::string &type, const framework::VariableNameMap &inputs, - const framework::VariableNameMap &outputs, - const framework::AttributeMap &attrs) - : OperatorBase(type, inputs, outputs, attrs) { - // init client when the operator is created at runtime. - std::vector endpoints = - Attr>("endpoints"); - for (auto ep : endpoints) { - client_map_[ep].reset(new detail::RPCClient( - grpc::CreateChannel(ep, grpc::InsecureChannelCredentials()))); - } - } + SendOp(const std::string& type, const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs) + : OperatorBase(type, inputs, outputs, attrs) {} - void Run(const framework::Scope &scope, - const platform::Place &dev_place) const override { + void Run(const framework::Scope& scope, + const platform::Place& dev_place) const override { auto ins = Inputs("X"); auto outs = Outputs("Out"); std::vector epmap = Attr>("epmap"); - // TODO(typhoonzero): use async calls to send multiple variable asyncly. - for (size_t i = 0; i < ins.size(); ++i) { - bool ret = client_map_[epmap[i]]->SendVariable(scope, ins[i]); - if (!ret) { - LOG(ERROR) << "send variable error: " << ins[i]; - } + + // FIXME(gongwb): DeviceContext? + auto ctx = platform::CPUDeviceContext(); + for (size_t i = 0; i < ins.size(); i++) { + client_.AsyncSendVariable(epmap[i], ctx, scope, ins[i]); } - // TODO(typhoonzero): support async optimization - client_map_[epmap[0]]->Wait(); - for (size_t i = 0; i < outs.size(); ++i) { - bool ret = client_map_[epmap[i]]->GetVariable(scope, outs[i]); - if (!ret) { - LOG(ERROR) << "GetVariable error: " << outs[i]; - } + + for (size_t i = 0; i < outs.size(); i++) { + client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]); } + + client_.wait(); } - protected: - mutable std::unordered_map> - client_map_; + private: + mutable detail::RPCClient client_; }; class SendOpMaker : public framework::OpProtoAndCheckerMaker { public: - SendOpMaker(OpProto *proto, OpAttrChecker *op_checker) + SendOpMaker(OpProto* proto, OpAttrChecker* op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { AddInput("X", "(Tensor) Input tensor to be send").AsDuplicable(); AddOutput("Out", "(Tensor) Output tensor to get from server") diff --git a/paddle/operators/send_recv_op_test.cc b/paddle/operators/send_recv_op_test.cc index fa94424bf9e8e719ec0822268685b0806a109d21..ea091694798475dfd9631910a750405be950c20c 100644 --- a/paddle/operators/send_recv_op_test.cc +++ b/paddle/operators/send_recv_op_test.cc @@ -140,7 +140,7 @@ void StartServerNet(bool is_sparse) { TEST(SendRecvOp, CPUDense) { std::thread server_thread(StartServerNet, false); - sleep(3); // wait server to start + sleep(10); // wait server to start // local net f::Scope scope; p::CPUPlace place;