From d5c78982017ace34fa6f33e2011f0b9de702e333 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sun, 24 Mar 2019 08:20:09 +0800 Subject: [PATCH] complete pserver side update --- .../operators/distributed/CMakeLists.txt | 7 ++-- .../operators/distributed/grpc/grpc_server.cc | 4 ++- .../distributed/request_handler_impl.cc | 32 ++++++++++++++++++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/operators/distributed/CMakeLists.txt b/paddle/fluid/operators/distributed/CMakeLists.txt index 575027893..f6531ec9e 100644 --- a/paddle/fluid/operators/distributed/CMakeLists.txt +++ b/paddle/fluid/operators/distributed/CMakeLists.txt @@ -9,6 +9,9 @@ else() endif() configure_file(send_recv.proto.in ${CMAKE_CURRENT_SOURCE_DIR}/send_recv.proto @ONLY) +cc_library(async_sparse_param_update_recorder SRCS async_sparse_param_update_recorder.cc DEPS enforce simple_threadpool) +cc_test(async_sparse_param_update_recorder_test SRCS async_sparse_param_update_recorder_test.cc DEPS async_sparse_param_update_recorder) + # FIXME(typhoonzero): use add_subdirectory once we clean the dependency of these files set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") if(WITH_GRPC) @@ -20,7 +23,7 @@ if(WITH_GRPC) collective_client.cc collective_server.cc ${GRPC_SRCS} PROTO send_recv.proto - DEPS lod_tensor selected_rows_functor memory scope ${GRPC_DEPS}) + DEPS lod_tensor selected_rows_functor memory scope ${GRPC_DEPS} async_sparse_param_update_recorder) set_source_files_properties(grpc_serde_test.cc rpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set(RPC_DEPS sendrecvop_rpc ${GRPC_DEPS}) @@ -51,8 +54,6 @@ endif() cc_test(rpc_server_test SRCS rpc_server_test.cc DEPS ${RPC_DEPS} executor proto_desc lookup_sparse_table_op SERIAL) cc_test(varhandle_test SRCS varhandle_test.cc DEPS profiler scope) -cc_library(async_sparse_param_update_recorder SRCS async_sparse_param_update_recorder.cc DEPS enforce simple_threadpool) -cc_test(async_sparse_param_update_recorder_test SRCS async_sparse_param_update_recorder_test.cc DEPS async_sparse_param_update_recorder) cc_library(parameter_prefetch SRCS parameter_prefetch.cc DEPS sendrecvop_rpc memory) cc_library(parameter_send SRCS parameter_send.cc DEPS sendrecvop_rpc memory) cc_library(parameter_recv SRCS parameter_recv.cc DEPS sendrecvop_rpc memory) diff --git a/paddle/fluid/operators/distributed/grpc/grpc_server.cc b/paddle/fluid/operators/distributed/grpc/grpc_server.cc index b86f0a53c..78cfd3d0c 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_server.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_server.cc @@ -144,13 +144,15 @@ class RequestGet final : public RequestBase { framework::Variable* invar = nullptr; framework::Variable* outvar = nullptr; - request_handler_->Handle(varname, scope, invar, &outvar, trainer_id, + auto* tmp_scope = scope->NewTmpScope(); + request_handler_->Handle(varname, tmp_scope, invar, &outvar, trainer_id, out_varname); if (outvar) { SerializeToByteBuffer(out_varname, outvar, *request_handler_->dev_ctx(), &reply_); } + delete tmp_scope; Finish(reply_, &responder_); } diff --git a/paddle/fluid/operators/distributed/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc index e289ec929..e4c259722 100644 --- a/paddle/fluid/operators/distributed/request_handler_impl.cc +++ b/paddle/fluid/operators/distributed/request_handler_impl.cc @@ -22,6 +22,7 @@ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/variable_helper.h" +#include "paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h" #include "paddle/fluid/operators/distributed/rpc_server.h" #include "paddle/fluid/string/piece.h" #include "paddle/fluid/string/printf.h" @@ -59,6 +60,12 @@ bool RequestSendHandler::Handle(const std::string& varname, "async mode should not recv BATCH_BARRIER_MESSAGE or " "COMPLETE_MESSAGE"); } + if (AsyncSparseParamUpdateRecorder::GetInstance()->HasGrad(varname)) { + auto& grad_slr = + scope->FindVar(varname)->Get(); + AsyncSparseParamUpdateRecorder::GetInstance()->Update(varname, + grad_slr.rows()); + } executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), scope); return true; @@ -108,7 +115,30 @@ bool RequestGetHandler::Handle(const std::string& varname, VLOG(3) << "copying " << varname << " to " << param_bak_name; framework::TensorCopy(t_orig, dev_ctx_->GetPlace(), t); } - *outvar = scope_->FindVar(varname); + if (AsyncSparseParamUpdateRecorder::GetInstance()->HasParam(varname)) { + std::vector updated_rows; + AsyncSparseParamUpdateRecorder::GetInstance()->GetAndClear( + varname, trainer_id, &updated_rows); + auto& origin_tensor = + scope_->FindVar(varname)->Get(); + auto* origin_tensor_data = origin_tensor.data(); + auto& dims = origin_tensor.dims(); + *outvar = scope->Var(); + auto* out_slr = (*outvar)->GetMutable(); + out_slr->set_rows(updated_rows); + out_slr->set_height(dims[0]); + auto out_dims = framework::make_ddim( + {static_cast(updated_rows.size()), dims[1]}); + auto* data = out_slr->mutable_value()->mutable_data( + out_dims, origin_tensor.place()); + auto width = dims[1]; + for (auto i = 0; i < updated_rows.size(); ++i) { + memcpy(data + i * width, origin_tensor_data + updated_rows[i] * width, + sizeof(float) * width); + } + } else { + *outvar = scope_->FindVar(varname); + } } } return true; -- GitLab