Commit 4cb0100c authored by tangwei12

add prefetch in nce

Parent 6e67d0fb
......@@ -155,6 +155,24 @@ class NCEOpMaker : public framework::OpProtoAndCheckerMaker {
AddAttr<bool>("is_sparse", "(boolean, default false) Sparse update.")
.SetDefault(false);
// for parameter prefetch
AddAttr<bool>("remote_prefetch", "").SetDefault(false);
AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
AddAttr<std::vector<int>>("height_sections",
"Height for each output SelectedRows.")
.SetDefault(std::vector<int>({}));
AddAttr<std::vector<std::string>>(
"epmap",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input variables for mapping")
.SetDefault({});
AddAttr<std::vector<std::string>>(
"table_names",
"(string vector, the splited table names that will be fetched from "
"parameter server)"
"in the order of input variables for mapping")
.SetDefault({});
AddAttr<std::vector<int>>("custom_neg_classes",
"This attribute only be used in unitest. Classes "
"in this list wiil be used as negative classes "
......
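
The three new vector attributes are parallel lists describing the split weight table: shard i is served from epmap[i] under the name table_names[i] and holds height_sections[i] rows. The sketch below is editor-added; endpoints, table names, and shard heights are made-up examples, and the actual id-to-shard routing happens inside the distributed prefetch helpers. It only illustrates where a global row id would land under the contiguous split that height_sections implies.

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Map a global row id to {shard index, row offset inside that shard},
// assuming rows are assigned to shards contiguously by height_sections.
std::pair<int, int64_t> LocateRow(int64_t row,
                                  const std::vector<int> &height_sections) {
  for (size_t i = 0; i < height_sections.size(); ++i) {
    if (row < height_sections[i]) return {static_cast<int>(i), row};
    row -= height_sections[i];
  }
  return {-1, -1};  // out of range
}

int main() {
  // Hypothetical values: a 10000-row NCE weight split over two pservers.
  std::vector<std::string> epmap = {"127.0.0.1:6164", "127.0.0.1:6165"};
  std::vector<std::string> table_names = {"nce_w.block0", "nce_w.block1"};
  std::vector<int> height_sections = {5000, 5000};

  auto loc = LocateRow(7321, height_sections);
  std::cout << "row 7321 -> " << table_names[loc.first] << " on "
            << epmap[loc.first] << ", local row " << loc.second << "\n";
  return 0;
}
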
......@@ -15,8 +15,10 @@ limitations under the License. */
#pragma once
#include <math.h>
#include <iterator>
#include <random>
#include <set>
#include <string>
#include <vector>
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
......@@ -144,7 +146,54 @@ class NCEKernel : public framework::OpKernel<T> {
}
// forward mul
auto input_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Input")));
auto weight_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Weight")));
// for remote prefetch
auto epmap = context.Attr<std::vector<std::string>>("epmap");
if (!epmap.empty()) {
// if epmap is not empty, the parameter will be fetched from the remote
// parameter server
std::vector<int64_t> labels;
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
labels.push_back(sample_labels_data[i]);
}
std::set<int64_t> st(labels.begin(), labels.end());
labels.assign(st.begin(), st.end());
auto &local_scope = context.scope().NewScope();
auto height_sections = context.Attr<std::vector<int>>("height_sections");
auto table_names = context.Attr<std::vector<std::string>>("table_names");
// local-scope variables used by the prefetch call below: "Ids" is meant to
// carry the label ids to fetch and "Weight" receives the fetched rows
framework::Variable *ids = local_scope.Var("Ids");
framework::Variable *weight = local_scope.Var("Weight");
#ifdef PADDLE_WITH_DISTRIBUTE
operators::distributed::prefetch("Ids", "Weight", table_names, epmap,
height_sections, context);
#else
PADDLE_THROW(
"paddle is not compiled with distribute support, cannot do "
"parameter prefetch!");
#endif
auto weight_mat = EigenMatrix<T>::From(weight->Get<framework::LoDTensor>());
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
std::vector<int64_t>::iterator it =
std::find(labels.begin(), labels.end(), sample_labels_data[i]);
int idx = std::distance(labels.begin(), it);
Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
(input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
weight_mat.chip(idx, 0))
.sum();
sample_out_data[i] += result(0);
sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
}
} else {
auto weight_mat =
EigenMatrix<T>::From(*(context.Input<Tensor>("Weight")));
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
(input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
......@@ -153,6 +202,8 @@ class NCEKernel : public framework::OpKernel<T> {
sample_out_data[i] += result(0);
sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
}
}
// forward cost
for (int64_t i = 0; i < sample_labels->dims()[0]; ++i) {
out_data[i] = 0;
......
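
Stripped of the Eigen and Paddle scaffolding, the prefetch branch above boils down to: deduplicate the sampled labels, fetch only those weight rows from the parameter servers, then score every sample with sigmoid(input · w_label). A self-contained sketch of that flow follows; FetchRows is a hypothetical stand-in for operators::distributed::prefetch, and the bias term that the kernel has already accumulated into sample_out_data is omitted.

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <vector>

// Stand-in for the remote prefetch: gather the requested rows of a table.
std::vector<std::vector<float>> FetchRows(
    const std::vector<std::vector<float>> &table,
    const std::vector<int64_t> &ids) {
  std::vector<std::vector<float>> rows;
  for (int64_t id : ids) rows.push_back(table[static_cast<size_t>(id)]);
  return rows;
}

int main() {
  // Full weight table (lives on the parameter servers in the real op).
  std::vector<std::vector<float>> weight = {{0.1f, 0.2f, 0.3f, 0.4f},
                                            {0.5f, 0.4f, 0.3f, 0.2f},
                                            {0.0f, 0.1f, 0.0f, 0.1f},
                                            {0.9f, 0.8f, 0.7f, 0.6f}};
  std::vector<float> input = {1.0f, 0.5f, -0.5f, 2.0f};  // one example
  std::vector<int64_t> sample_labels = {3, 1, 3, 0};     // true + negatives

  // Deduplicate the labels so each weight row is fetched only once.
  std::vector<int64_t> labels(sample_labels);
  std::sort(labels.begin(), labels.end());
  labels.erase(std::unique(labels.begin(), labels.end()), labels.end());

  // "Prefetch" the rows belonging to the deduplicated labels.
  auto rows = FetchRows(weight, labels);

  // Score every sampled label: sigmoid(input . w_label).
  for (int64_t label : sample_labels) {
    auto it = std::find(labels.begin(), labels.end(), label);
    const auto &w = rows[std::distance(labels.begin(), it)];
    float dot = 0.f;
    for (size_t d = 0; d < w.size(); ++d) dot += input[d] * w[d];
    std::cout << "label " << label << ": " << 1.f / (1.f + std::exp(-dot))
              << "\n";
  }
  return 0;
}
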
......@@ -239,7 +239,7 @@ class DistributeTranspiler(object):
def _get_all_remote_sparse_update_op(self, main_program):
sparse_update_ops = []
sparse_update_op_types = ["lookup_table"]
sparse_update_op_types = ["lookup_table", "nce"]
for op in main_program.global_block().ops:
if op.type in sparse_update_op_types and op.attr(
'remote_prefetch') is True and not op.attr(
......