From 4cb0100c8ea714e4ce7f8c0cd3c9ebc50aff9e35 Mon Sep 17 00:00:00 2001
From: tangwei12
Date: Thu, 6 Dec 2018 16:59:53 +0800
Subject: [PATCH] add prefetch in nce

---
 paddle/fluid/operators/nce_op.cc              | 18 +++++
 paddle/fluid/operators/nce_op.h               | 67 ++++++++++++++++---
 .../fluid/transpiler/distribute_transpiler.py |  2 +-
 3 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/paddle/fluid/operators/nce_op.cc b/paddle/fluid/operators/nce_op.cc
index 9f97f7821dd..06ff825fde3 100644
--- a/paddle/fluid/operators/nce_op.cc
+++ b/paddle/fluid/operators/nce_op.cc
@@ -155,6 +155,24 @@ class NCEOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<bool>("is_sparse", "(boolean, default false) Sparse update.")
         .SetDefault(false);
 
+    // for parameter prefetch
+    AddAttr<bool>("remote_prefetch", "").SetDefault(false);
+    AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
+    AddAttr<std::vector<int64_t>>("height_sections",
+                                  "Height for each output SelectedRows.")
+        .SetDefault(std::vector<int64_t>({}));
+    AddAttr<std::vector<std::string>>(
+        "epmap",
+        "(string vector, default 127.0.0.1:6164) "
+        "Server endpoints in the order of input variables for mapping")
+        .SetDefault({});
+    AddAttr<std::vector<std::string>>(
+        "table_names",
+        "(string vector, the split table names that will be fetched from "
+        "parameter server) "
+        "in the order of input variables for mapping")
+        .SetDefault({});
+
     AddAttr<std::vector<int>>("custom_neg_classes",
                               "This attribute only be used in unitest. Classes "
                               "in this list wiil be used as negative classes "
diff --git a/paddle/fluid/operators/nce_op.h b/paddle/fluid/operators/nce_op.h
index f2ca6ec247f..8f82f77f501 100644
--- a/paddle/fluid/operators/nce_op.h
+++ b/paddle/fluid/operators/nce_op.h
@@ -15,8 +15,10 @@ limitations under the License. */
 #pragma once
 
 #include <math.h>
+#include <iterator>
 #include <random>
 #include <set>
+#include <string>
 #include <vector>
 #include "paddle/fluid/framework/eigen.h"
 #include "paddle/fluid/framework/op_registry.h"
@@ -144,15 +146,64 @@ class NCEKernel : public framework::OpKernel<T> {
     }
     // forward mul
     auto input_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Input")));
-    auto weight_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Weight")));
-    for (int64_t i = 0; i < sample_labels->numel(); ++i) {
-      Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
-          (input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
-           weight_mat.chip(sample_labels_data[i], 0))
-              .sum();
-      sample_out_data[i] += result(0);
-      sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
+
+    // for remote prefetch
+    auto epmap = context.Attr<std::vector<std::string>>("epmap");
+
+    if (!epmap.empty()) {
+      // if epmap is not empty, the parameter will be fetched from the remote
+      // parameter server
+
+      std::vector<int64_t> labels;
+      for (int64_t i = 0; i < sample_labels->numel(); ++i) {
+        labels.push_back(sample_labels_data[i]);
+      }
+      std::set<int64_t> st(labels.begin(), labels.end());
+      labels.assign(st.begin(), st.end());
+
+      auto &local_scope = context.scope().NewScope();
+      auto height_sections =
+          context.Attr<std::vector<int64_t>>("height_sections");
+      auto table_names = context.Attr<std::vector<std::string>>("table_names");
+
+      framework::Variable *ids = local_scope.Var("Ids");
+      framework::Variable *weight = local_scope.Var("Weight");
+
+#ifdef PADDLE_WITH_DISTRIBUTE
+      operators::distributed::prefetch("Ids", "Weight", table_names, epmap,
+                                       height_sections, context);
+#else
+      PADDLE_THROW(
+          "paddle is not compiled with distribute support, can not do "
+          "parameter prefetch!");
+#endif
+
+      auto weight_mat = EigenMatrix<T>::From(
+          *(weight->GetMutable<framework::LoDTensor>()));
+      for (int64_t i = 0; i < sample_labels->numel(); ++i) {
+        std::vector<int64_t>::iterator it =
+            std::find(labels.begin(), labels.end(), sample_labels_data[i]);
+        int idx = std::distance(labels.begin(), it);
+
+        Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
+            (input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
+             weight_mat.chip(idx, 0))
+                .sum();
+        sample_out_data[i] += result(0);
+        sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
+      }
+    } else {
+      auto weight_mat =
+          EigenMatrix<T>::From(*(context.Input<Tensor>("Weight")));
+      for (int64_t i = 0; i < sample_labels->numel(); ++i) {
+        Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
+            (input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
+             weight_mat.chip(sample_labels_data[i], 0))
+                .sum();
+        sample_out_data[i] += result(0);
+        sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
+      }
     }
+
     // forward cost
     for (int64_t i = 0; i < sample_labels->dims()[0]; ++i) {
       out_data[i] = 0;
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index 1d867d91943..817af602bd5 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -239,7 +239,7 @@ class DistributeTranspiler(object):
 
     def _get_all_remote_sparse_update_op(self, main_program):
         sparse_update_ops = []
-        sparse_update_op_types = ["lookup_table"]
+        sparse_update_op_types = ["lookup_table", "nce"]
         for op in main_program.global_block().ops:
             if op.type in sparse_update_op_types and op.attr(
                 'remote_prefetch') is True and not op.attr(
--
GitLab
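Usage sketch (not part of the patch): the snippet below shows how this prefetch path is meant to be triggered end to end. It is a hypothetical example against the fluid 1.x API of this branch; it assumes `fluid.layers.nce` exposes an `is_sparse` argument that marks the op with `remote_prefetch=True` (mirroring the existing `lookup_table` behavior the transpiler already handles), and the pserver endpoints are placeholders.

```python
import paddle.fluid as fluid

# Word2vec-style network: the NCE weight is the large, sparsely updated table.
words = fluid.layers.data(name='words', shape=[1], dtype='int64')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

emb = fluid.layers.embedding(input=words, size=[10000, 64], is_sparse=True)
cost = fluid.layers.nce(input=emb,
                        label=label,
                        num_total_classes=10000,
                        num_neg_samples=5,
                        is_sparse=True)  # assumption: sets remote_prefetch=True
avg_cost = fluid.layers.mean(cost)
fluid.optimizer.SGD(learning_rate=0.01).minimize(avg_cost)

# _get_all_remote_sparse_update_op() now picks up "nce" as well as
# "lookup_table": the transpiler splits the NCE weight across pservers by
# height_sections and fills in the op's epmap/table_names attributes, so the
# kernel can prefetch only the sampled rows at runtime.
t = fluid.DistributeTranspiler()
t.transpile(trainer_id=0,
            pservers="127.0.0.1:6174,127.0.0.1:6175",
            trainers=2)
trainer_prog = t.get_trainer_program()
```

The point of the kernel change above is visible here: because the sampled labels are deduplicated (via `std::set`) before prefetching, each trainer pulls only the rows it actually sampled instead of the full `num_total_classes x dim` weight matrix.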