From 9d276fe8a890dbfc62ceb21366d2eab99870d9c7 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sun, 25 Nov 2018 10:11:06 +0800 Subject: [PATCH] add parameter prefetch --- .../distributed/parameter_prefetch.h | 209 ++++++++++++++++++ .../distributed_ops/lookup_remote_table_op.h | 52 +++++ 2 files changed, 261 insertions(+) create mode 100644 paddle/fluid/operators/distributed/parameter_prefetch.h diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.h b/paddle/fluid/operators/distributed/parameter_prefetch.h new file mode 100644 index 000000000..03336367d --- /dev/null +++ b/paddle/fluid/operators/distributed/parameter_prefetch.h @@ -0,0 +1,209 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "paddle/fluid/framework/data_type.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/var_type.h" + +#include "paddle/fluid/operators/distributed/send_recv.grpc.pb.h" +#include "paddle/fluid/operators/distributed/send_recv.pb.h" + +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/io/zero_copy_stream.h" +#include "paddle/fluid/framework/tensor.h" +#include "paddle/fluid/operators/distributed/grpc_bytebuffer_stream.h" +#include "paddle/fluid/operators/distributed/variable_response.h" + +namespace paddle { +namespace operators { +namespace distributed { + +using Tensor = framework::Tensor; +using LoDTensor = framework::LoDTensor; +using SelectedRows = framework::SelectedRows; +using DDim = framework::DDim; + +constexpr int64_t kNoPadding = -1; + +inline size_t GetSectionIndex(int64_t id, + const std::vector& abs_sections) { + for (size_t i = 1; i < abs_sections.size(); ++i) { + if (id < abs_sections[i]) { + return i - 1; + } + } + return abs_sections.size() - 1; +} + +inline std::vector ToAbsoluteSection( + const std::vector& height_sections) { + std::vector abs_sections; + abs_sections.resize(height_sections.size()); + abs_sections[0] = 0; + for (size_t i = 1; i < height_sections.size(); ++i) { + abs_sections[i] = height_sections[i - 1] + abs_sections[i - 1]; + } + return abs_sections; +} + +inline std::vector> SplitIds( + const std::string& id_name, const std::vector& height_section, + framework::Scope* scope) { + auto& id_tensor = scope->Var(id_name)->Get(); + auto* id_data = id_tensor.data(); + std::set all_ids; + for (size_t i = 0; i < id_tensor.numel(); ++i) { + all_ids.insert(id_data[i]); + } + auto abs_sections = ToAbsoluteSection(height_section); + std::vector> splited_ids; + splited_ids.resize(height_section.size() + 1); + for (auto& id : all_ids) { + auto section_index = GetSectionIndex(id, abs_sections); + splited_ids[section_index].push_back(id - abs_sections[section_index]); + } + return splited_ids; +} + +inline void SplitIdsIntoMultipleVarsBySection( + const std::string& id_name, const std::vector& in_var_names, + const std::vector& height_section, + const std::vector>& splited_ids, + framework::Scope* scope) { + PADDLE_ENFORCE_EQ(in_var_names.size(), height_section.size() + 1, ""); + + auto place = platform::CPUPlace(); + + for (size_t i = 0; i < in_var_names.size(); ++i) { + auto* id_tensor = + scope->Var(in_var_names[i])->GetMutable(); + auto& ids = splited_ids[i]; + if (!ids.empty()) { + auto* id_tensor_data = id_tensor->mutable_data( + framework::make_ddim({static_cast(ids.size()), 1}), place); + memcpy(id_tensor_data, ids.data(), sizeof(int64_t) * ids.size()); + } + } +} + +inline void MergeMultipleVarsIntoOnBySection( + const std::string& id_name, const std::string& out_name, + const std::vector& out_var_names, + const std::vector& height_section, + const std::vector>& splited_ids, + const framework::ExecutionContext& context, framework::Scope* scope) { + PADDLE_ENFORCE_EQ(out_var_names.size(), height_section.size() + 1, ""); + + auto cpu_place = platform::CPUPlace(); + + auto abs_sections = ToAbsoluteSection(height_section); + auto& id_tensor = scope->Var(id_name)->Get(); + auto* id_data = id_tensor.data(); + std::unordered_map> id_to_offset; + for (size_t i = 0; i < id_tensor.numel(); ++i) { + id_to_offset[id_data[i]].push_back(i); + } + + auto* out_tensor = scope->Var(out_name)->GetMutable(); + auto* out_tensor_data = out_tensor->mutable_data(context.GetPlace()); + + for (size_t section_idx = 0; section_idx < out_var_names.size(); + ++section_idx) { + auto& ids_in_this_section = splited_ids[section_idx]; + auto& prefetch_out_var = + scope->Var(out_var_names[section_idx])->Get(); + const auto* out_var_data = prefetch_out_var.data(); + auto& dims = prefetch_out_var.dims(); + + PADDLE_ENFORCE_EQ(dims.size(), 2, ""); + PADDLE_ENFORCE_EQ(ids_in_this_section.size(), dims[0]); + + auto row_numel = dims[1]; + + for (size_t i = 0; i < dims[0]; ++i) { + auto id = ids_in_this_section[i]; + auto origin_id = id + abs_sections[section_idx]; + auto& offsets = id_to_offset[origin_id]; + for (auto& offset : offsets) { + // should support GPU tensor + memory::Copy(cpu_place, out_tensor_data + offset * row_numel, cpu_place, + out_var_data + i * row_numel, sizeof(float) * row_numel); + } + } + } +} + +void prefetch(const std::string& id_name, const std::string& out_name, + const std::string& table_name, + const std::vector& epmap, + const std::vector& height_sections, + const framework::ExecutionContext& context) { + auto& local_scope = context.scope().NewScope(); + + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + auto& ctx = *pool.Get(context.GetPlace()); + + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance( + context.Attr("trainer_id")); + + std::vector in_var_names; + std::vector out_var_names; + for (size_t i = 0; i < epmap.size(); ++i) { + in_var_names.push_back(id_name + "@" + epmap[i]); + out_var_names.push_back(out_name + "@" + epmap[i]); + } + + auto splited_ids = SplitIds(id_name, height_sections, &local_scope); + SplitIdsIntoMultipleVarsBySection(id_name, in_var_names, height_sections, + splited_ids, &local_scope); + + // create output var in local scope + for (auto& name : out_var_names) { + local_scope.Var(name)->GetMutable(); + } + + std::vector rets; + for (size_t i = 0; i < in_var_names.size(); i++) { + if (NeedSend(local_scope, in_var_names[i])) { + VLOG(30) << "sending " << in_var_names[i] << " to " << epmap[i] + << " to get " << out_var_names[i] << " back"; + rets.push_back(rpc_client->AsyncPrefetchVar( + epmap[i], ctx, local_scope, in_var_names[i], out_var_names[i])); + } else { + VLOG(30) << "don't send no-initialied variable: " << out_var_names[i]; + } + } + for (size_t i = 0; i < rets.size(); i++) { + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + } + + MergeMultipleVarsIntoOnBySection(id_name, out_name, out_var_names, + height_sections, splited_ids, context, + &local_scope); + + context.scope().DeleteScope(&local_scope); +} + +}; // namespace distributed +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.h b/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.h index 5c53ca695..97c8fbfed 100644 --- a/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.h +++ b/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.h @@ -149,6 +149,58 @@ inline void MergeMultipleVarsIntoOnBySection( } } +void prefetch(const std::string& id_name, const std::string& out_name, + const std::string& table_name, + const std::vector& epmap, + const std::vector& height_sections, + const framework::ExecutionContext& context) { + auto& local_scope = context.scope().NewScope(); + + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + auto& ctx = *pool.Get(context.GetPlace()); + + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance( + context.Attr("trainer_id")); + + std::vector in_var_names; + std::vector out_var_names; + for (size_t i = 0; i < epmap.size(); ++i) { + in_var_names.push_back(id_name + "@" + epmap[i]); + out_var_names.push_back(out_name + "@" + epmap[i]); + } + + auto splited_ids = SplitIds(id_name, height_sections, &local_scope); + SplitIdsIntoMultipleVarsBySection(id_name, in_var_names, height_sections, + splited_ids, &local_scope); + + // create output var in local scope + for (auto& name : out_var_names) { + local_scope.Var(name)->GetMutable(); + } + + std::vector rets; + for (size_t i = 0; i < in_var_names.size(); i++) { + if (NeedSend(local_scope, in_var_names[i])) { + VLOG(30) << "sending " << in_var_names[i] << " to " << epmap[i] + << " to get " << out_var_names[i] << " back"; + rets.push_back(rpc_client->AsyncPrefetchVar( + epmap[i], ctx, local_scope, in_var_names[i], out_var_names[i])); + } else { + VLOG(30) << "don't send no-initialied variable: " << out_var_names[i]; + } + } + for (size_t i = 0; i < rets.size(); i++) { + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + } + + MergeMultipleVarsIntoOnBySection(id_name, out_name, out_var_names, + height_sections, splited_ids, context, + &local_scope); + + context.scope().DeleteScope(&local_scope); +} + template class LookupRemoteTableKernel : public framework::OpKernel { public: -- GitLab