diff --git a/paddle/fluid/framework/communicator.h b/paddle/fluid/framework/communicator.h index ba8fb3e1731c73a12cded219df58b7b20bd11e16..0e90ba02e6e030fa9b1c4f91fc76a14e1c145455 100644 --- a/paddle/fluid/framework/communicator.h +++ b/paddle/fluid/framework/communicator.h @@ -41,6 +41,8 @@ class Communicator { void receive() {} + void prefetch() {} + void wait() {} private: diff --git a/paddle/fluid/operators/distributed/CMakeLists.txt b/paddle/fluid/operators/distributed/CMakeLists.txt index cb361e95e8bffd423a8ddc95be6cf9d598a75a6e..fa8abf4ceca93c0e738cfe18921115f86e86bc34 100644 --- a/paddle/fluid/operators/distributed/CMakeLists.txt +++ b/paddle/fluid/operators/distributed/CMakeLists.txt @@ -30,7 +30,7 @@ if(WITH_GRPC) else() set(BRPC_SRCS brpc/brpc_client.cc brpc/brpc_server.cc brpc/brpc_sendrecvop_utils.cc brpc/brpc_variable_response.cc brpc/brpc_rdma_pool.cc) - set_source_files_properties(${BRPC_SRCS} parameter_prefetch.cc rpc_server_test.cc brpc/brpc_serde_test.cc collective_server.cc collective_server_test.cc collective_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + set_source_files_properties(${BRPC_SRCS} parameter_prefetch.cc parameter_send.cc rpc_server_test.cc brpc/brpc_serde_test.cc collective_server.cc collective_server_test.cc collective_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) brpc_library(sendrecvop_rpc SRCS sendrecvop_utils.cc request_handler_impl.cc rpc_client.cc rpc_server.cc @@ -50,6 +50,7 @@ cc_test(rpc_server_test SRCS rpc_server_test.cc DEPS ${RPC_DEPS} executor proto_desc lookup_sparse_table_op SERIAL) cc_test(varhandle_test SRCS varhandle_test.cc DEPS profiler scope) cc_library(parameter_prefetch SRCS parameter_prefetch.cc DEPS sendrecvop_rpc memory) +cc_library(parameter_send SRCS parameter_send.cc DEPS sendrecvop_rpc memory) if(WITH_GPU) cc_test(collective_server_test SRCS collective_server_test.cc DEPS sendrecvop_rpc executor ${RPC_DEPS} diff --git a/paddle/fluid/operators/distributed/parameter_send.cc b/paddle/fluid/operators/distributed/parameter_send.cc new file mode 100644 index 0000000000000000000000000000000000000000..01e7341f15f9868b9a049dc1418f557b47ac95f5 --- /dev/null +++ b/paddle/fluid/operators/distributed/parameter_send.cc @@ -0,0 +1,189 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "paddle/fluid/operators/distributed/parameter_send.h" + +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/tensor.h" + +#include "paddle/fluid/operators/distributed/distributed.h" +#include "paddle/fluid/operators/distributed/rpc_client.h" +#include "paddle/fluid/operators/distributed/variable_response.h" +#include "paddle/fluid/operators/distributed_ops/send_recv_util.h" + +namespace paddle { +namespace operators { +namespace distributed { + +using LoDTensor = framework::LoDTensor; +using LoDTensor = framework::LoDTensor; +using SelectedRows = framework::SelectedRows; +using DDim = framework::DDim; + +static size_t GetSectionIndex(int64_t id, + const std::vector& abs_sections) { + for (size_t i = 1; i < abs_sections.size(); ++i) { + if (id < abs_sections[i]) { + return i - 1; + } + } + return abs_sections.size() - 1; +} + +static std::vector ToAbsoluteSection( + const std::vector& height_sections) { + std::vector abs_sections; + abs_sections.resize(height_sections.size()); + abs_sections[0] = 0; + for (size_t i = 1; i < height_sections.size(); ++i) { + abs_sections[i] = height_sections[i - 1] + abs_sections[i - 1]; + } + return abs_sections; +} + +static std::vector> SplitIds( + const std::vector& ids_vector, + const std::vector& height_section, framework::Scope* scope) { + std::set all_ids; + for (auto id : ids_vector) { + all_ids.insert(id); + } + + auto abs_sections = ToAbsoluteSection(height_section); + std::vector> splited_ids; + splited_ids.resize(height_section.size() + 1); + for (auto& id : all_ids) { + auto section_index = GetSectionIndex(id, abs_sections); + splited_ids[section_index].push_back(id - abs_sections[section_index]); + } + return splited_ids; +} + +static void SplitIdsIntoMultipleVarsBySection( + const std::vector& in_var_names, + const std::vector& height_section, + const std::vector>& splited_ids, + framework::Scope* scope) { + PADDLE_ENFORCE_EQ(in_var_names.size(), height_section.size(), ""); + + auto place = platform::CPUPlace(); + + for (size_t i = 0; i < in_var_names.size(); ++i) { + auto* id_tensor = + scope->Var(in_var_names[i])->GetMutable(); + auto& ids = splited_ids[i]; + if (!ids.empty()) { + auto* id_tensor_data = id_tensor->mutable_data( + framework::make_ddim({static_cast(ids.size()), 1}), place); + memcpy(id_tensor_data, ids.data(), sizeof(int64_t) * ids.size()); + } + } +} + +void send(const std::string& var_name, + const std::vector& send_varnames, + const std::vector& epmap, + const std::vector& height_sections, + const framework::ExecutionContext& context, + const framework::Scope& scope, bool sync) { + framework::Scope* local_scope = scope.NewTmpScope(); + + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + auto& cpu_ctx = *pool.Get(platform::CPUPlace()); + auto& actual_ctx = *pool.Get(context.GetPlace()); + + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance( + context.Attr("trainer_id")); + + auto* send_var = scope.FindVar(var_name); + size_t out_num = send_varnames.size(); + if (send_var->IsType()) { + auto& send_tensor = send_var->Get(); + auto& send_tensor_dims = send_tensor.dims(); + std::vector outs_dims; + outs_dims.reserve(out_num); + + // infer output shape + int num = context.Attr("num"); + if (num > 0) { + int64_t in_axis_dim = send_tensor_dims[0]; + PADDLE_ENFORCE_EQ(in_axis_dim % num, 0, + "tensor split does not result" + " in an equal division"); + size_t out_axis_dim = in_axis_dim / num; + for (size_t i = 0; i < out_num; ++i) { + auto dim = send_tensor_dims; + dim[0] = out_axis_dim; + outs_dims.push_back(dim); + } + } else if (height_sections.size() > 0) { + PADDLE_ENFORCE_EQ(height_sections.size(), out_num, + "tensor split sections size" + "should be equal to output size."); + for (size_t i = 0; i < out_num; ++i) { + auto dim = send_tensor_dims; + dim[0] = height_sections[i]; + outs_dims.push_back(dim); + } + } + + // create output var in local scope + size_t row_offset = 0; + for (auto i = 0; i < out_num; ++i) { + auto* out = + local_scope->Var(send_varnames[i])->GetMutable(); + *out = send_tensor.Slice(row_offset, row_offset + outs_dims[i][0]); + row_offset += outs_dims[i][0]; + } + } else if (send_var->IsType()) { + // create output var in local scope + for (auto& name : send_varnames) { + local_scope->Var(name)->GetMutable(); + } + } else { + PADDLE_THROW("unsupported var type"); + } + + std::vector rets; + for (size_t i = 0; i < send_varnames.size(); i++) { + auto& send_var_name = send_varnames[i]; + auto& endpoint = epmap[i]; + if (NeedSend(*local_scope, send_var_name)) { + VLOG(3) << "sending " << send_var_name << " to " << endpoint; + rets.push_back(rpc_client->AsyncSendVar(endpoint, cpu_ctx, *local_scope, + send_var_name)); + } else { + VLOG(3) << "don't send non-initialized variable: " << send_varnames[i]; + } + } + + if (sync) { + for (size_t i = 0; i < rets.size(); i++) { + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + } + } + + delete local_scope; +} + +}; // namespace distributed +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/distributed/parameter_send.h b/paddle/fluid/operators/distributed/parameter_send.h new file mode 100644 index 0000000000000000000000000000000000000000..ee4da997b73c9c1bc734c43bb9fe5fa6fdc12624 --- /dev/null +++ b/paddle/fluid/operators/distributed/parameter_send.h @@ -0,0 +1,35 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "paddle/fluid/framework/operator.h" + +namespace paddle { +namespace operators { +namespace distributed { + +void send(const std::string& var_name, + const std::vector& send_varnames, + const std::vector& epmap, + const std::vector& height_sections, + const framework::ExecutionContext& context, + const framework::Scope& scope, bool sync); + +}; // namespace distributed +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/distributed_ops/send_op.cc b/paddle/fluid/operators/distributed_ops/send_op.cc index e2c2147ab5e9a76498a0fd9e1f18b75eed32e91e..02397bb6b3e08d4ac3267126b48834873b6ce908 100644 --- a/paddle/fluid/operators/distributed_ops/send_op.cc +++ b/paddle/fluid/operators/distributed_ops/send_op.cc @@ -88,6 +88,21 @@ This operator will send variables to listen_and_serve op at the parameter server "Server endpoints in the order of input " "variables for mapping") .SetDefault({"127.0.0.1:6164"}); + AddAttr>("sections", + "(vector) " + "the length of each output along the " + "specified axis.") + .SetDefault(std::vector{}); + AddAttr>( + "send_varnames", + "(vector) " + "the splited output varnames to send to pserver") + .SetDefault(std::vector{}); + AddAttr("num", + "(int, default 0)" + "Number of sub-tensors. This must evenly divide " + "Input.dims()[axis]") + .SetDefault(0); } };