diff --git a/paddle/fluid/operators/detail/CMakeLists.txt b/paddle/fluid/operators/detail/CMakeLists.txt index 2b19f0448955d2d7582f23ac133c14ffdf5c9e49..997309325cd59e644ab38a3b13695ce400b615f7 100644 --- a/paddle/fluid/operators/detail/CMakeLists.txt +++ b/paddle/fluid/operators/detail/CMakeLists.txt @@ -2,7 +2,8 @@ if(WITH_DISTRIBUTE) grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc grpc_server.cc variable_response.cc PROTO send_recv.proto DEPS lod_tensor selected_rows) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") - set_source_files_properties(test_serde.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + set_source_files_properties(test_serde.cc grpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(serde_test SRCS test_serde.cc variable_response.cc DEPS grpc++_unsecure grpc_unsecure gpr cares zlib protobuf sendrecvop_grpc) + cc_test(grpc_server_test SRCS grpc_server_test.cc DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf) endif() diff --git a/paddle/fluid/operators/detail/grpc_client.cc b/paddle/fluid/operators/detail/grpc_client.cc index 9652bb888b5937390cc183a96ff7ebf5a4fa2426..ba9882ce244f69d5fbe3214d3c3470cd4ec87510 100644 --- a/paddle/fluid/operators/detail/grpc_client.cc +++ b/paddle/fluid/operators/detail/grpc_client.cc @@ -150,7 +150,8 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep, s->response_call_back_ = ProcGetResponse; auto call = s->stub_g_.PrepareUnaryCall( - s->context_.get(), "/sendrecv.SendRecvService/GetVariable", req, &cq_); + s->context_.get(), "/sendrecv.SendRecvService/PrefetchVariable", req, + &cq_); call->StartCall(); call->Finish(&s->reply_, &s->status_, (void*)s); }); diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 9691d1e86b111def5b82e022dd01795aaf5c7b0d..26bef375cb3493b4b6a428677986e005654b6be3 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -128,6 +128,47 @@ class RequestGet final : public RequestBase { SimpleBlockQueue* queue_; }; +class RequestPrefetch final : public RequestBase { + public: + explicit RequestPrefetch(GrpcService::AsyncService* service, + ::grpc::ServerCompletionQueue* cq, + framework::Scope* scope, + const platform::DeviceContext* dev_ctx, + framework::Executor* executor, + framework::ProgramDesc* program, int blkid) + : RequestBase(service, cq, dev_ctx), + responder_(&ctx_), + scope_(scope), + executor_(executor), + program_(program), + blkid_(blkid) { + int method_id = static_cast(detail::GrpcMethod::kPrefetchVariable); + service_->RequestAsyncUnary(method_id, &ctx_, &request_, &responder_, cq_, + cq_, this); + } + + virtual ~RequestPrefetch() {} + + virtual std::string GetReqName() { return request_.varname(); } + + virtual void Process() { + // prefetch process... + ::grpc::ByteBuffer relay; + // TODO(Yancey1989): execute the Block which containers prefetch ops + + responder_.Finish(relay, ::grpc::Status::OK, this); + status_ = FINISH; + } + + protected: + sendrecv::VariableMessage request_; + ServerAsyncResponseWriter<::grpc::ByteBuffer> responder_; + framework::Scope* scope_; + framework::Executor* executor_; + framework::ProgramDesc* program_; + int blkid_; +}; + void AsyncGRPCServer::WaitClientGet(int count) { int fetch_barriers = 0; while (fetch_barriers < count) { @@ -147,6 +188,7 @@ void AsyncGRPCServer::RunSyncUpdate() { cq_send_ = builder.AddCompletionQueue(); cq_get_ = builder.AddCompletionQueue(); + cq_prefetch_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); LOG(INFO) << "Server listening on " << address_ << std::endl; @@ -155,6 +197,8 @@ void AsyncGRPCServer::RunSyncUpdate() { std::bind(&AsyncGRPCServer::TryToRegisterNewSendOne, this); std::function get_register = std::bind(&AsyncGRPCServer::TryToRegisterNewGetOne, this); + std::function prefetch_register = + std::bind(&AsyncGRPCServer::TryToRegisterNewPrefetchOne, this); t_send_.reset( new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, @@ -163,11 +207,14 @@ void AsyncGRPCServer::RunSyncUpdate() { t_get_.reset( new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, cq_get_.get(), "cq_get", get_register))); - + t_prefetch_.reset(new std::thread( + std::bind(&AsyncGRPCServer::HandleRequest, this, cq_prefetch_.get(), + "cq_prefetch", prefetch_register))); // wait server server_->Wait(); t_send_->join(); t_get_->join(); + t_prefetch_->join(); } void AsyncGRPCServer::ShutdownQueue() { @@ -203,6 +250,18 @@ void AsyncGRPCServer::TryToRegisterNewGetOne() { VLOG(4) << "Create RequestGet status:" << get->Status(); } +void AsyncGRPCServer::TryToRegisterNewPrefetchOne() { + std::unique_lock lock(cq_mutex_); + if (is_shut_down_) { + return; + } + RequestPrefetch* prefetch = + new RequestPrefetch(&service_, cq_prefetch_.get(), scope_, dev_ctx_, + executor_, program_, prefetch_blk_id_); + + VLOG(4) << "Create RequestPrefetch status:" << prefetch->Status(); +} + // FIXME(typhoonzero): change cq_name to enum. void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq, std::string cq_name, diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index 10e6dd45a901d36de4a6577db4da05551645eb73..dd5cf4b377cb8e4a53c9a161cb32985613de32eb 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -17,7 +17,9 @@ limitations under the License. */ #include #include +#include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/var_type.h" @@ -53,6 +55,12 @@ class AsyncGRPCServer final { void SetDevCtx(const platform::DeviceContext *dev_ctx) { dev_ctx_ = dev_ctx; } + void SetProgram(framework::ProgramDesc *program) { program_ = program; } + + void SetPrefetchBlkdId(int blkid) { prefetch_blk_id_ = blkid; } + + void SetExecutor(framework::Executor *executor) { executor_ = executor; } + const ReceivedMessage Get() { return this->var_recv_queue_.Pop(); } void Push(const std::string &msg_name) { @@ -66,6 +74,7 @@ class AsyncGRPCServer final { std::function TryToRegisterNewOne); void TryToRegisterNewSendOne(); void TryToRegisterNewGetOne(); + void TryToRegisterNewPrefetchOne(); void ShutdownQueue(); private: @@ -73,6 +82,7 @@ class AsyncGRPCServer final { volatile bool is_shut_down_ = false; std::unique_ptr<::grpc::ServerCompletionQueue> cq_send_; std::unique_ptr<::grpc::ServerCompletionQueue> cq_get_; + std::unique_ptr<::grpc::ServerCompletionQueue> cq_prefetch_; GrpcService::AsyncService service_; std::unique_ptr<::grpc::Server> server_; @@ -92,6 +102,11 @@ class AsyncGRPCServer final { std::unique_ptr t_send_; std::unique_ptr t_get_; + std::unique_ptr t_prefetch_; + + int prefetch_blk_id_; + framework::ProgramDesc *program_; + framework::Executor *executor_; }; }; // namespace detail diff --git a/paddle/fluid/operators/detail/grpc_server_test.cc b/paddle/fluid/operators/detail/grpc_server_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..577374810696c039b8794fc151083ca7ddf43a10 --- /dev/null +++ b/paddle/fluid/operators/detail/grpc_server_test.cc @@ -0,0 +1,51 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paddle/fluid/operators/detail/grpc_client.h" +#include "paddle/fluid/operators/detail/grpc_server.h" + +namespace framework = paddle::framework; +namespace platform = paddle::platform; +namespace detail = paddle::operators::detail; + +std::unique_ptr rpc_service_; + +void StartServer(const std::string& endpoint) { + rpc_service_.reset(new detail::AsyncGRPCServer(endpoint)); +} + +TEST(PREFETCH, CPU) { + // start up a server instance backend + // TODO(Yancey1989): Need to start a server with optimize blocks and + // prefetch blocks. + std::thread server_thread(StartServer, "127.0.0.1:8889"); + framework::Scope scope; + platform::CPUPlace place; + platform::CPUDeviceContext ctx(place); + // create var on local scope + std::string var_name("tmp_0"); + auto var = scope.Var(var_name); + auto tensor = var->GetMutable(); + tensor->Resize({10, 10}); + + detail::RPCClient client; + client.AsyncPrefetchVariable("127.0.0.1:8889", ctx, scope, var_name, ""); + server_thread.join(); + rpc_service_.reset(nullptr); +} diff --git a/paddle/fluid/operators/detail/grpc_service.h b/paddle/fluid/operators/detail/grpc_service.h index ae6f9db3bd31a4b4839b34e8e53dd87f1ecf4b1d..879e21933b452363c3fccacffb4d16ac1bfd6020 100644 --- a/paddle/fluid/operators/detail/grpc_service.h +++ b/paddle/fluid/operators/detail/grpc_service.h @@ -76,6 +76,7 @@ namespace detail { enum class GrpcMethod { kSendVariable, kGetVariable, + kPrefetchVariable, }; static const int kGrpcNumMethods = @@ -87,6 +88,8 @@ inline const char* GrpcMethodName(GrpcMethod id) { return "/sendrecv.SendRecvService/SendVariable"; case GrpcMethod::kGetVariable: return "/sendrecv.SendRecvService/GetVariable"; + case GrpcMethod::kPrefetchVariable: + return "/sendrecv.SendREcvService/PrefetchVariable"; } // Shouldn't be reached. diff --git a/paddle/fluid/operators/detail/send_recv.proto b/paddle/fluid/operators/detail/send_recv.proto index 2d33f026e45c51d9a3812b2391381f74d6fddb29..fc12e82a7e6bd10262092d1ca367980df64e91c2 100644 --- a/paddle/fluid/operators/detail/send_recv.proto +++ b/paddle/fluid/operators/detail/send_recv.proto @@ -21,6 +21,8 @@ service SendRecvService { rpc SendVariable(VariableMessage) returns (VoidMessage) {} // Argument VariableMessage for GetVariable should only contain varname. rpc GetVariable(VariableMessage) returns (VariableMessage) {} + // Prefetch variable by Ids + rpc PrefetchVariable(VariableMessage) returns (VariableMessage) {} } // VariableMessage is serialized paddle variable message. diff --git a/paddle/fluid/platform/profiler_test.cc b/paddle/fluid/platform/profiler_test.cc index 366c82bf96e413add60448a56241d88cdcf2d1d4..45cc271bb888fc3a07ecc5daea6b549cb88b6d21 100644 --- a/paddle/fluid/platform/profiler_test.cc +++ b/paddle/fluid/platform/profiler_test.cc @@ -13,7 +13,9 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/platform/profiler.h" +#ifdef PADDLE_WITH_CUDA #include "cuda_runtime.h" +#endif #include "gtest/gtest.h" TEST(Event, CpuElapsedTime) { @@ -159,6 +161,7 @@ TEST(RecordEvent, RecordEvent) { DisableProfiler(EventSortingKey::kTotal, "/tmp/profiler"); } +#ifdef PADDLE_WITH_CUDA TEST(TMP, stream_wait) { cudaStream_t stream; cudaStreamCreate(&stream); @@ -166,3 +169,4 @@ TEST(TMP, stream_wait) { cudaStreamSynchronize(stream); cudaStreamSynchronize(stream); } +#endif