diff --git a/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.cc b/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.cc deleted file mode 100644 index 5d3a50a44cf187cdfaa7a0c890aca3fb7605385a..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.cc +++ /dev/null @@ -1,114 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include "paddle/fluid/operators/distributed_ops/lookup_remote_table_op.h" -#include "paddle/fluid/framework/var_type_inference.h" - -namespace paddle { -namespace operators { - -class LookupRemoteTableOp : public framework::OperatorWithKernel { - public: - using framework::OperatorWithKernel::OperatorWithKernel; - - void InferShape(framework::InferShapeContext* ctx) const override { - PADDLE_ENFORCE(ctx->HasInput("W"), - "Input(W) of LookupRemoteTableOp should not be null."); - PADDLE_ENFORCE(ctx->HasInput("Ids"), - "Input(Ids) of LookupRemoteTableOp should not be null."); - PADDLE_ENFORCE(ctx->HasOutput("Out"), - "Output(Out) of LookupRemoteTableOp should not be null."); - - auto table_dims = ctx->GetInputDim("W"); - auto ids_dims = ctx->GetInputDim("Ids"); - int ids_rank = ids_dims.size(); - - PADDLE_ENFORCE_EQ(table_dims.size(), 2); - PADDLE_ENFORCE_EQ(ids_dims[ids_rank - 1], 1, - "The last dimension of the 'Ids' tensor must be 1."); - - auto output_dims = - framework::vectorize(framework::slice_ddim(ids_dims, 0, ids_rank - 1)); - output_dims.push_back(table_dims[1]); - ctx->SetOutputDim("Out", framework::make_ddim(output_dims)); - - if (ctx->GetOutputsVarType("Out")[0] == - framework::proto::VarType::LOD_TENSOR) { - ctx->ShareLoD("Ids", /*->*/ "Out"); - } - } - - protected: - framework::OpKernelType GetExpectedKernelType( - const framework::ExecutionContext& ctx) const override { - auto data_type = framework::GetDataTypeOfVar(ctx.InputVar("W")); - return framework::OpKernelType(data_type, ctx.device_context()); - } -}; - -class LookupRemoteTableOpMaker : public framework::OpProtoAndCheckerMaker { - public: - void Make() override { - AddInput("W", - "(Tensor) The input represents embedding tensors, " - "which is a learnable parameter."); - AddInput("Ids", - "An input with type int32 or int64 " - "contains the ids to be looked up in W. " - "The last dimension size must be 1."); - AddOutput("Out", "The lookup results, which have the same type as W."); - AddAttr>("height_sections", - "Height for each output SelectedRows.") - .SetDefault(std::vector({})); - AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); - AddAttr>( - "epmap", - "(string vector, default 127.0.0.1:6164)" - "Server endpoints in the order of input variables for mapping") - .SetDefault({"127.0.0.1:6164"}); - AddAttr("padding_idx", - "(int64, default -1) " - "If the value is -1, it makes no effect to lookup. " - "Otherwise the given value indicates padding the output " - "with zeros whenever lookup encounters it in Ids.") - .SetDefault(kNoPadding); - // NOTE(minqiyang): grad_inplace is an temporal attribute, - // please do NOT set this attribute in python layer. - AddAttr("grad_inplace", - "(boolean, default false) " - "If the grad op reuse the input's variable.") - .SetDefault(false); - AddComment(R"DOC( -Lookup Remote Table Operator. - -This operator is used to perform lookups on the parameter W, -then concatenated into a dense tensor. - -The input Ids can carry the LoD (Level of Details) information, -or not. And the output only shares the LoD information with input Ids. - -)DOC"); - } -}; - -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators; -REGISTER_OPERATOR(lookup_remote_table, ops::LookupRemoteTableOp, - paddle::framework::EmptyGradOpMaker, - ops::LookupRemoteTableOpMaker); - -REGISTER_OP_CPU_KERNEL(lookup_remote_table, ops::LookupRemoteTableKernel, - ops::LookupRemoteTableKernel); diff --git a/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.h b/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.h deleted file mode 100644 index 97c8fbfed3076930b59eef8a9988457443b7a25f..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/distributed_ops/lookup_remote_table_op.h +++ /dev/null @@ -1,274 +0,0 @@ -/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#pragma once - -#include // NOLINT -#include -#include -#include -#include -#include - -#include "paddle/fluid/framework/data_type.h" -#include "paddle/fluid/framework/eigen.h" -#include "paddle/fluid/framework/lod_tensor.h" -#include "paddle/fluid/framework/op_registry.h" -#include "paddle/fluid/framework/selected_rows.h" -#include "paddle/fluid/memory/memcpy.h" -#include "paddle/fluid/operators/detail/macros.h" -#include "paddle/fluid/operators/distributed_ops/send_recv_util.h" -#include "paddle/fluid/operators/math/blas.h" - -namespace paddle { -namespace operators { - -using Tensor = framework::Tensor; -using LoDTensor = framework::LoDTensor; -using SelectedRows = framework::SelectedRows; -using DDim = framework::DDim; - -constexpr int64_t kNoPadding = -1; - -inline size_t GetSectionIndex(int64_t id, - const std::vector& abs_sections) { - for (size_t i = 1; i < abs_sections.size(); ++i) { - if (id < abs_sections[i]) { - return i - 1; - } - } - return abs_sections.size() - 1; -} - -inline std::vector ToAbsoluteSection( - const std::vector& height_sections) { - std::vector abs_sections; - abs_sections.resize(height_sections.size()); - abs_sections[0] = 0; - for (size_t i = 1; i < height_sections.size(); ++i) { - abs_sections[i] = height_sections[i - 1] + abs_sections[i - 1]; - } - return abs_sections; -} - -inline std::vector> SplitIds( - const std::string& id_name, const std::vector& height_section, - framework::Scope* scope) { - auto& id_tensor = scope->Var(id_name)->Get(); - auto* id_data = id_tensor.data(); - std::set all_ids; - for (size_t i = 0; i < id_tensor.numel(); ++i) { - all_ids.insert(id_data[i]); - } - auto abs_sections = ToAbsoluteSection(height_section); - std::vector> splited_ids; - splited_ids.resize(height_section.size() + 1); - for (auto& id : all_ids) { - auto section_index = GetSectionIndex(id, abs_sections); - splited_ids[section_index].push_back(id - abs_sections[section_index]); - } - return splited_ids; -} - -inline void SplitIdsIntoMultipleVarsBySection( - const std::string& id_name, const std::vector& in_var_names, - const std::vector& height_section, - const std::vector>& splited_ids, - framework::Scope* scope) { - PADDLE_ENFORCE_EQ(in_var_names.size(), height_section.size() + 1, ""); - - auto place = platform::CPUPlace(); - - for (size_t i = 0; i < in_var_names.size(); ++i) { - auto* id_tensor = - scope->Var(in_var_names[i])->GetMutable(); - auto& ids = splited_ids[i]; - if (!ids.empty()) { - auto* id_tensor_data = id_tensor->mutable_data( - framework::make_ddim({static_cast(ids.size()), 1}), place); - memcpy(id_tensor_data, ids.data(), sizeof(int64_t) * ids.size()); - } - } -} - -inline void MergeMultipleVarsIntoOnBySection( - const std::string& id_name, const std::string& out_name, - const std::vector& out_var_names, - const std::vector& height_section, - const std::vector>& splited_ids, - const framework::ExecutionContext& context, framework::Scope* scope) { - PADDLE_ENFORCE_EQ(out_var_names.size(), height_section.size() + 1, ""); - - auto cpu_place = platform::CPUPlace(); - - auto abs_sections = ToAbsoluteSection(height_section); - auto& id_tensor = scope->Var(id_name)->Get(); - auto* id_data = id_tensor.data(); - std::unordered_map> id_to_offset; - for (size_t i = 0; i < id_tensor.numel(); ++i) { - id_to_offset[id_data[i]].push_back(i); - } - - auto* out_tensor = scope->Var(out_name)->GetMutable(); - auto* out_tensor_data = out_tensor->mutable_data(context.GetPlace()); - - for (size_t section_idx = 0; section_idx < out_var_names.size(); - ++section_idx) { - auto& ids_in_this_section = splited_ids[section_idx]; - auto& prefetch_out_var = - scope->Var(out_var_names[section_idx])->Get(); - const auto* out_var_data = prefetch_out_var.data(); - auto& dims = prefetch_out_var.dims(); - - PADDLE_ENFORCE_EQ(dims.size(), 2, ""); - PADDLE_ENFORCE_EQ(ids_in_this_section.size(), dims[0]); - - auto row_numel = dims[1]; - - for (size_t i = 0; i < dims[0]; ++i) { - auto id = ids_in_this_section[i]; - auto origin_id = id + abs_sections[section_idx]; - auto& offsets = id_to_offset[origin_id]; - for (auto& offset : offsets) { - // should support GPU tensor - memory::Copy(cpu_place, out_tensor_data + offset * row_numel, cpu_place, - out_var_data + i * row_numel, sizeof(float) * row_numel); - } - } - } -} - -void prefetch(const std::string& id_name, const std::string& out_name, - const std::string& table_name, - const std::vector& epmap, - const std::vector& height_sections, - const framework::ExecutionContext& context) { - auto& local_scope = context.scope().NewScope(); - - platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto& ctx = *pool.Get(context.GetPlace()); - - distributed::RPCClient* rpc_client = - distributed::RPCClient::GetInstance( - context.Attr("trainer_id")); - - std::vector in_var_names; - std::vector out_var_names; - for (size_t i = 0; i < epmap.size(); ++i) { - in_var_names.push_back(id_name + "@" + epmap[i]); - out_var_names.push_back(out_name + "@" + epmap[i]); - } - - auto splited_ids = SplitIds(id_name, height_sections, &local_scope); - SplitIdsIntoMultipleVarsBySection(id_name, in_var_names, height_sections, - splited_ids, &local_scope); - - // create output var in local scope - for (auto& name : out_var_names) { - local_scope.Var(name)->GetMutable(); - } - - std::vector rets; - for (size_t i = 0; i < in_var_names.size(); i++) { - if (NeedSend(local_scope, in_var_names[i])) { - VLOG(30) << "sending " << in_var_names[i] << " to " << epmap[i] - << " to get " << out_var_names[i] << " back"; - rets.push_back(rpc_client->AsyncPrefetchVar( - epmap[i], ctx, local_scope, in_var_names[i], out_var_names[i])); - } else { - VLOG(30) << "don't send no-initialied variable: " << out_var_names[i]; - } - } - for (size_t i = 0; i < rets.size(); i++) { - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - } - - MergeMultipleVarsIntoOnBySection(id_name, out_name, out_var_names, - height_sections, splited_ids, context, - &local_scope); - - context.scope().DeleteScope(&local_scope); -} - -template -class LookupRemoteTableKernel : public framework::OpKernel { - public: - void Compute(const framework::ExecutionContext& context) const override { - std::string id_name = context.Inputs("Ids").front(); - auto* ids_t = context.Input("Ids"); // int tensor - - std::string out_name = context.Outputs("Out").front(); - auto* output_t = context.Output("Out"); // float tensor - - std::string table_name = context.Inputs("W").front(); - auto* table_var = context.InputVar("W"); - - int64_t padding_idx = context.Attr("padding_idx"); - int64_t* ids = const_cast(ids_t->data()); - int64_t ids_numel = ids_t->numel(); - - auto epmap = context.Attr>("epmap"); - auto height_sections = - context.Attr>("height_sections"); - - auto& local_scope = context.scope().NewScope(); - - platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto& ctx = *pool.Get(context.GetPlace()); - - distributed::RPCClient* rpc_client = - distributed::RPCClient::GetInstance( - context.Attr("trainer_id")); - - std::vector in_var_names; - std::vector out_var_names; - for (size_t i = 0; i < epmap.size(); ++i) { - in_var_names.push_back(id_name + "@" + epmap[i]); - out_var_names.push_back(out_name + "@" + epmap[i]); - } - - auto splited_ids = SplitIds(id_name, height_sections, &local_scope); - SplitIdsIntoMultipleVarsBySection(id_name, in_var_names, height_sections, - splited_ids, &local_scope); - - // create output var in local scope - for (auto& name : out_var_names) { - local_scope.Var(name)->GetMutable(); - } - - std::vector rets; - for (size_t i = 0; i < in_var_names.size(); i++) { - if (NeedSend(local_scope, in_var_names[i])) { - VLOG(30) << "sending " << in_var_names[i] << " to " << epmap[i] - << " to get " << out_var_names[i] << " back"; - rets.push_back(rpc_client->AsyncPrefetchVar( - epmap[i], ctx, local_scope, in_var_names[i], out_var_names[i])); - } else { - VLOG(30) << "don't send no-initialied variable: " << out_var_names[i]; - } - } - for (size_t i = 0; i < rets.size(); i++) { - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - } - - MergeMultipleVarsIntoOnBySection(id_name, out_name, out_var_names, - height_sections, splited_ids, context, - &local_scope); - - context.scope().DeleteScope(&local_scope); - } -}; - -} // namespace operators -} // namespace paddle diff --git a/paddle/fluid/operators/lookup_table_op.cc b/paddle/fluid/operators/lookup_table_op.cc index 99944b800c42224cd579209eb7d15cb419d8776e..faf91775e4fe68b87e4bebd292e346de53cd0b85 100644 --- a/paddle/fluid/operators/lookup_table_op.cc +++ b/paddle/fluid/operators/lookup_table_op.cc @@ -98,7 +98,7 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker { "epmap", "(string vector, default 127.0.0.1:6164)" "Server endpoints in the order of input variables for mapping") - .SetDefault({"127.0.0.1:6164"}); + .SetDefault({}); AddComment(R"DOC( Lookup Table Operator. diff --git a/paddle/fluid/operators/lookup_table_op.h b/paddle/fluid/operators/lookup_table_op.h index 335e4adafa89ccfd3fc2d5fd9adf57524a4cf4e4..4adb829f20fbe4080495568cb18a2df3a5a73a8b 100644 --- a/paddle/fluid/operators/lookup_table_op.h +++ b/paddle/fluid/operators/lookup_table_op.h @@ -51,10 +51,11 @@ class LookupTableKernel : public framework::OpKernel { auto out_name = context.Outputs("Out").front(); auto table_name = context.Inputs("W").front(); auto epmap = context.Attr>("epmap"); + auto remote_prefetch = context.Attr("remote_prefetch"); auto height_sections = context.Attr>("height_sections"); - if (!epmap.empty()) { + if (remote_prefetch) { // if emap is not empty, then the paramter will be fetched from remote parameter // server #ifdef PADDLE_WITH_DISTRIBUTE