提交 c35fdf15 编写于 作者: J JiabinYang

Merge branch 'add_prefetch_in_nce' of https://github.com/seiriosPlus/Paddle...

Merge branch 'add_prefetch_in_nce' of https://github.com/seiriosPlus/Paddle into feature/add_prefech_hs
...@@ -117,6 +117,12 @@ static void MergeMultipleVarsIntoOneBySection( ...@@ -117,6 +117,12 @@ static void MergeMultipleVarsIntoOneBySection(
auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>(); auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>();
auto* out_tensor = auto* out_tensor =
scope->FindVar(out_name)->GetMutable<framework::LoDTensor>(); scope->FindVar(out_name)->GetMutable<framework::LoDTensor>();
PADDLE_ENFORCE_GT(
out_tensor->numel(), 0,
"When calling this method, the Tensor's numel must larger than zero. "
"Please check Tensor::Resize has been called first.");
auto* out_tensor_data = out_tensor->mutable_data<float>(id_tensor.place()); auto* out_tensor_data = out_tensor->mutable_data<float>(id_tensor.place());
bool is_on_cpu_place = true; bool is_on_cpu_place = true;
...@@ -172,8 +178,9 @@ void prefetch(const std::string& id_name, const std::string& out_name, ...@@ -172,8 +178,9 @@ void prefetch(const std::string& id_name, const std::string& out_name,
const std::vector<std::string>& table_names, const std::vector<std::string>& table_names,
const std::vector<std::string>& epmap, const std::vector<std::string>& epmap,
const std::vector<int>& height_sections, const std::vector<int>& height_sections,
const framework::ExecutionContext& context) { const framework::ExecutionContext& context,
auto& local_scope = context.scope().NewScope(); const framework::Scope& scope) {
auto& local_scope = scope.NewScope();
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& cpu_ctx = *pool.Get(platform::CPUPlace()); auto& cpu_ctx = *pool.Get(platform::CPUPlace());
...@@ -190,7 +197,7 @@ void prefetch(const std::string& id_name, const std::string& out_name, ...@@ -190,7 +197,7 @@ void prefetch(const std::string& id_name, const std::string& out_name,
out_var_names.push_back(out_name + "@" + epmap[i]); out_var_names.push_back(out_name + "@" + epmap[i]);
} }
auto& id_tensor = local_scope.FindVar(id_name)->Get<framework::LoDTensor>(); auto& id_tensor = scope.FindVar(id_name)->Get<framework::LoDTensor>();
std::vector<int64_t> ids_vector; std::vector<int64_t> ids_vector;
if (platform::is_cpu_place(id_tensor.place())) { if (platform::is_cpu_place(id_tensor.place())) {
auto* id_data = id_tensor.data<int64_t>(); auto* id_data = id_tensor.data<int64_t>();
...@@ -246,8 +253,7 @@ void prefetch(const std::string& id_name, const std::string& out_name, ...@@ -246,8 +253,7 @@ void prefetch(const std::string& id_name, const std::string& out_name,
MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name, MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name,
out_var_names, height_sections, splited_ids, out_var_names, height_sections, splited_ids,
context, &local_scope, &actual_ctx); context, &local_scope, &actual_ctx);
scope.DeleteScope(&local_scope);
context.scope().DeleteScope(&local_scope);
} }
}; // namespace distributed }; // namespace distributed
......
...@@ -27,7 +27,8 @@ void prefetch(const std::string& id_name, const std::string& out_name, ...@@ -27,7 +27,8 @@ void prefetch(const std::string& id_name, const std::string& out_name,
const std::vector<std::string>& table_names, const std::vector<std::string>& table_names,
const std::vector<std::string>& epmap, const std::vector<std::string>& epmap,
const std::vector<int>& height_sections, const std::vector<int>& height_sections,
const framework::ExecutionContext& context); const framework::ExecutionContext& context,
const framework::Scope& scope);
}; // namespace distributed }; // namespace distributed
}; // namespace operators }; // namespace operators
......
...@@ -59,7 +59,8 @@ class LookupTableKernel : public framework::OpKernel<T> { ...@@ -59,7 +59,8 @@ class LookupTableKernel : public framework::OpKernel<T> {
// server // server
#ifdef PADDLE_WITH_DISTRIBUTE #ifdef PADDLE_WITH_DISTRIBUTE
operators::distributed::prefetch(id_name, out_name, table_names, epmap, operators::distributed::prefetch(id_name, out_name, table_names, epmap,
height_sections, context); height_sections, context,
context.scope());
#else #else
PADDLE_THROW( PADDLE_THROW(
"paddle is not compiled with distribute support, can not do " "paddle is not compiled with distribute support, can not do "
......
...@@ -155,6 +155,24 @@ class NCEOpMaker : public framework::OpProtoAndCheckerMaker { ...@@ -155,6 +155,24 @@ class NCEOpMaker : public framework::OpProtoAndCheckerMaker {
AddAttr<bool>("is_sparse", "(boolean, default false) Sparse update.") AddAttr<bool>("is_sparse", "(boolean, default false) Sparse update.")
.SetDefault(false); .SetDefault(false);
// for parameter prefetch
AddAttr<bool>("remote_prefetch", "").SetDefault(false);
AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
AddAttr<std::vector<int>>("height_sections",
"Height for each output SelectedRows.")
.SetDefault(std::vector<int>({}));
AddAttr<std::vector<std::string>>(
"epmap",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input variables for mapping")
.SetDefault({});
AddAttr<std::vector<std::string>>(
"table_names",
"(string vector, the splited table names that will be fetched from "
"parameter server)"
"in the order of input variables for mapping")
.SetDefault({});
AddAttr<std::vector<int>>("custom_neg_classes", AddAttr<std::vector<int>>("custom_neg_classes",
"This attribute only be used in unitest. Classes " "This attribute only be used in unitest. Classes "
"in this list wiil be used as negative classes " "in this list wiil be used as negative classes "
...@@ -225,24 +243,20 @@ class NCEOpGradVarTypeInference : public framework::VarTypeInference { ...@@ -225,24 +243,20 @@ class NCEOpGradVarTypeInference : public framework::VarTypeInference {
void operator()(const framework::OpDesc &op_desc, void operator()(const framework::OpDesc &op_desc,
framework::BlockDesc *block) const override { framework::BlockDesc *block) const override {
auto weight_grad = op_desc.Output(framework::GradVarName("Weight")).front(); auto weight_grad = op_desc.Output(framework::GradVarName("Weight")).front();
auto bias_grad = op_desc.Output(framework::GradVarName("Bias")).front();
auto attr = op_desc.GetAttr("is_sparse"); auto attr = op_desc.GetAttr("is_sparse");
bool is_sparse = boost::get<bool>(attr); bool is_sparse = boost::get<bool>(attr);
if (is_sparse) { if (is_sparse) {
VLOG(3) << "nce_op_grad op " << weight_grad << " and " << bias_grad VLOG(3) << "nce_op_grad op " << weight_grad << " and "
<< " is set to SelectedRows"; << " is set to SelectedRows";
block->Var(weight_grad) block->Var(weight_grad)
->SetType(framework::proto::VarType::SELECTED_ROWS); ->SetType(framework::proto::VarType::SELECTED_ROWS);
block->Var(bias_grad)->SetType(framework::proto::VarType::SELECTED_ROWS);
} else { } else {
VLOG(3) << "nce_op_grad op " << weight_grad << " and " << bias_grad VLOG(3) << "nce_op_grad op " << weight_grad << " and "
<< " is set to LoDTensor"; << " is set to LoDTensor";
block->Var(weight_grad)->SetType(framework::proto::VarType::LOD_TENSOR); block->Var(weight_grad)->SetType(framework::proto::VarType::LOD_TENSOR);
block->Var(bias_grad)->SetType(framework::proto::VarType::LOD_TENSOR);
} }
block->Var(weight_grad)->SetDataType(block->Var("Input")->GetDataType()); block->Var(weight_grad)->SetDataType(block->Var("Input")->GetDataType());
block->Var(bias_grad)->SetDataType(block->Var("Input")->GetDataType());
} }
}; };
......
...@@ -15,8 +15,10 @@ limitations under the License. */ ...@@ -15,8 +15,10 @@ limitations under the License. */
#pragma once #pragma once
#include <math.h> #include <math.h>
#include <iterator>
#include <random> #include <random>
#include <set> #include <set>
#include <string>
#include <vector> #include <vector>
#include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
...@@ -24,6 +26,10 @@ limitations under the License. */ ...@@ -24,6 +26,10 @@ limitations under the License. */
#include "paddle/fluid/operators/math/sampler.h" #include "paddle/fluid/operators/math/sampler.h"
#include "unsupported/Eigen/CXX11/Tensor" #include "unsupported/Eigen/CXX11/Tensor"
#ifdef PADDLE_WITH_DISTRIBUTE
#include "paddle/fluid/operators/distributed/parameter_prefetch.h"
#endif
namespace paddle { namespace paddle {
namespace operators { namespace operators {
...@@ -43,7 +49,6 @@ void PrepareSamples(const framework::ExecutionContext &context, ...@@ -43,7 +49,6 @@ void PrepareSamples(const framework::ExecutionContext &context,
auto label = context.Input<Tensor>("Label"); auto label = context.Input<Tensor>("Label");
const int64_t *label_data = label->data<int64_t>(); const int64_t *label_data = label->data<int64_t>();
auto label_dims = label->dims(); auto label_dims = label->dims();
// int num_total_classes = context.Attr<int>("num_total_classes");
// for unitest // for unitest
std::vector<int> custom_neg_classes = std::vector<int> custom_neg_classes =
context.Attr<std::vector<int>>("custom_neg_classes"); context.Attr<std::vector<int>>("custom_neg_classes");
...@@ -144,15 +149,82 @@ class NCEKernel : public framework::OpKernel<T> { ...@@ -144,15 +149,82 @@ class NCEKernel : public framework::OpKernel<T> {
} }
// forward mul // forward mul
auto input_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Input"))); auto input_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Input")));
auto weight_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Weight")));
for (int64_t i = 0; i < sample_labels->numel(); ++i) { // for remote prefetch
Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result = auto epmap = context.Attr<std::vector<std::string>>("epmap");
(input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
weight_mat.chip(sample_labels_data[i], 0)) if (!epmap.empty()) {
.sum(); // if epmap is not empty, then the parameter will be fetched from remote
sample_out_data[i] += result(0); // parameter
sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i]))); // server
std::vector<int64_t> labels;
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
labels.push_back(sample_labels_data[i]);
}
std::set<T> st(labels.begin(), labels.end());
labels.assign(st.begin(), st.end());
framework::Scope &local_scope = context.scope().NewScope();
auto height_sections = context.Attr<std::vector<int>>("height_sections");
auto table_names = context.Attr<std::vector<std::string>>("table_names");
auto *ids = local_scope.Var("Ids@Prefetch");
auto *x_tensor = ids->GetMutable<framework::LoDTensor>();
x_tensor->mutable_data<int64_t>(
framework::make_ddim({static_cast<int64_t>(labels.size()), 1}),
context.GetPlace());
// copy.
std::memcpy(x_tensor->data<int64_t>(), labels.data(),
labels.size() * sizeof(int64_t));
std::vector<int> w_dims = paddle::framework::vectorize2int(
context.Input<Tensor>("Weight")->dims());
w_dims[0] = static_cast<int>(labels.size());
auto *w_tensor = local_scope.Var("Weight@Prefetch")
->GetMutable<framework::LoDTensor>();
w_tensor->Resize(framework::make_ddim(w_dims));
#ifdef PADDLE_WITH_DISTRIBUTE
operators::distributed::prefetch("Ids@Prefetch", "Weight@Prefetch",
table_names, epmap, height_sections,
context, local_scope);
#else
PADDLE_THROW(
"paddle is not compiled with distribute support, can not do "
"parameter prefetch!");
#endif
auto weight_mat = EigenMatrix<T>::From(
(local_scope.Var("Weight@Prefetch")->Get<framework::LoDTensor>()));
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
std::vector<int64_t>::iterator it =
std::find(labels.begin(), labels.end(), sample_labels_data[i]);
int idx = std::distance(labels.begin(), it);
Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
(input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
weight_mat.chip(idx, 0))
.sum();
sample_out_data[i] += result(0);
sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
}
context.scope().DeleteScope(&local_scope);
} else {
auto weight_mat =
EigenMatrix<T>::From(*(context.Input<Tensor>("Weight")));
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
(input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
weight_mat.chip(sample_labels_data[i], 0))
.sum();
sample_out_data[i] += result(0);
sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
}
} }
// forward cost // forward cost
for (int64_t i = 0; i < sample_labels->dims()[0]; ++i) { for (int64_t i = 0; i < sample_labels->dims()[0]; ++i) {
out_data[i] = 0; out_data[i] = 0;
...@@ -240,18 +312,19 @@ class NCEGradKernel : public framework::OpKernel<T> { ...@@ -240,18 +312,19 @@ class NCEGradKernel : public framework::OpKernel<T> {
sample_grad_data[i] *= d_out_data[sample_idx]; sample_grad_data[i] *= d_out_data[sample_idx];
} }
// get d_bias
auto d_bias = context.Output<Tensor>(framework::GradVarName("Bias"));
if (d_bias != nullptr) {
T *d_bias_data = d_bias->mutable_data<T>(context.GetPlace());
std::fill(d_bias_data, d_bias_data + d_bias->numel(), 0.0);
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
d_bias_data[sample_labels_data[i]] += sample_grad_data[i];
}
}
bool is_sparse = context.Attr<bool>("is_sparse"); bool is_sparse = context.Attr<bool>("is_sparse");
if (!is_sparse) { if (!is_sparse) {
// get d_bias
auto d_bias = context.Output<Tensor>(framework::GradVarName("Bias"));
if (d_bias != nullptr) {
T *d_bias_data = d_bias->mutable_data<T>(context.GetPlace());
std::fill(d_bias_data, d_bias_data + d_bias->numel(), 0.0);
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
d_bias_data[sample_labels_data[i]] += sample_grad_data[i];
}
}
// get d_w // get d_w
auto d_w = context.Output<Tensor>(framework::GradVarName("Weight")); auto d_w = context.Output<Tensor>(framework::GradVarName("Weight"));
if (d_w != nullptr) { if (d_w != nullptr) {
...@@ -273,34 +346,6 @@ class NCEGradKernel : public framework::OpKernel<T> { ...@@ -273,34 +346,6 @@ class NCEGradKernel : public framework::OpKernel<T> {
std::set<T> st(labels.begin(), labels.end()); std::set<T> st(labels.begin(), labels.end());
labels.assign(st.begin(), st.end()); labels.assign(st.begin(), st.end());
auto *bias_var = context.InputVar("Bias");
DDim bias_dim;
if (bias_var->IsType<LoDTensor>()) {
bias_dim = context.Input<LoDTensor>("Bias")->dims();
} else if (bias_var->IsType<SelectedRows>()) {
auto *table_t = context.Input<SelectedRows>("Bias");
bias_dim = table_t->value().dims();
} else {
PADDLE_THROW(
"The parameter Bias of a NCE_OP "
"must be either LoDTensor or SelectedRows");
}
auto d_bias =
context.Output<SelectedRows>(framework::GradVarName("Bias"));
d_bias->set_rows(labels);
d_bias->set_height(bias_dim[0]);
d_bias->mutable_value()->Resize(
{static_cast<int64_t>(labels.size()), bias_dim[1]});
T *d_bias_data =
d_bias->mutable_value()->mutable_data<T>(context.GetPlace());
std::fill(d_bias_data, d_bias_data + labels.size(), 0.0);
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
d_bias_data[d_bias->Index(sample_labels_data[i])] +=
sample_grad_data[i];
}
auto *table_var = context.InputVar("Weight"); auto *table_var = context.InputVar("Weight");
DDim table_dim; DDim table_dim;
if (table_var->IsType<LoDTensor>()) { if (table_var->IsType<LoDTensor>()) {
......
...@@ -24,7 +24,7 @@ from ..initializer import Normal, Constant ...@@ -24,7 +24,7 @@ from ..initializer import Normal, Constant
from ..framework import Variable, OpProtoHolder from ..framework import Variable, OpProtoHolder
from ..param_attr import ParamAttr from ..param_attr import ParamAttr
from .layer_function_generator import autodoc, templatedoc, _generate_doc_string_ from .layer_function_generator import autodoc, templatedoc, _generate_doc_string_
from .tensor import concat from .tensor import concat, assign
from . import utils from . import utils
from .. import unique_name from .. import unique_name
from functools import reduce from functools import reduce
...@@ -4811,12 +4811,17 @@ def nce(input, ...@@ -4811,12 +4811,17 @@ def nce(input,
else: else:
num_neg_samples = int(num_neg_samples) num_neg_samples = int(num_neg_samples)
remote_prefetch = False
if os.environ.get('PADDLE_ENABLE_REMOTE_PREFETCH'):
remote_prefetch = True
attrs = { attrs = {
'num_total_classes': int(num_total_classes), 'num_total_classes': int(num_total_classes),
'num_neg_samples': num_neg_samples, 'num_neg_samples': num_neg_samples,
'seed': seed, 'seed': seed,
'sampler': sampler, 'sampler': sampler,
'is_sparse': is_sparse 'is_sparse': is_sparse,
'remote_prefetch': remote_prefetch
} }
helper.append_op( helper.append_op(
......
...@@ -14,14 +14,15 @@ ...@@ -14,14 +14,15 @@
from __future__ import print_function from __future__ import print_function
import traceback
import math import math
import collections
import six
import unittest import unittest
import numpy as np
import paddle.fluid as fluid import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import delete_ops
import traceback
import collections
import six
class TranspilerTest(unittest.TestCase): class TranspilerTest(unittest.TestCase):
...@@ -824,5 +825,55 @@ class TestRemoteLookupTable(TestDistLookupTableBase): ...@@ -824,5 +825,55 @@ class TestRemoteLookupTable(TestDistLookupTableBase):
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops) self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
# test for remote prefetch
class TestRemoteNce(TestDistLookupTableBase):
    """Transpiler test for an NCE layer built with remote prefetch enabled."""

    def network_with_table(self, is_sparse, is_distributed):
        """Build ``input -> nce -> mean`` and attach an Adam optimizer.

        Args:
            is_sparse: forwarded to ``fluid.layers.nce`` as ``is_sparse``.
            is_distributed: kept for signature parity with the sibling
                lookup-table tests; it is not used by this network.
        """
        num_total_classes = 20
        sampler = "uniform"
        nid_freq_arr = np.random.dirichlet(np.ones(20) * 1000).astype('float32')

        input = fluid.layers.data(name="input", shape=[10], dtype="float32")
        label = fluid.layers.data(name="label", shape=[1], dtype="int64")

        # The parameters are created up front under fixed names so that the
        # nce layer below picks them up through param_attr / bias_attr.
        global_block = fluid.default_main_program().global_block()
        global_block.create_parameter(
            shape=[num_total_classes, 10],
            dtype='float32',
            name='nce_w',
            initializer=fluid.initializer.ConstantInitializer())
        global_block.create_parameter(
            shape=[num_total_classes, 1],
            dtype='float32',
            name='nce_b',
            initializer=fluid.initializer.ConstantInitializer())

        cost = fluid.layers.nce(input=input,
                                label=label,
                                num_total_classes=num_total_classes,
                                sampler=sampler,
                                custom_dist=nid_freq_arr.tolist(),
                                sample_weight=None,
                                param_attr='nce_w',
                                bias_attr='nce_b',
                                seed=1,
                                num_neg_samples=5,
                                is_sparse=is_sparse)
        avg_cost = fluid.layers.mean(cost)

        # optimizer
        optimizer = fluid.optimizer.Adam(learning_rate=0.003)
        optimizer.minimize(avg_cost)

    def net_conf(self):
        """Build the network with PADDLE_ENABLE_REMOTE_PREFETCH switched on.

        The flag is scoped to network construction and restored afterwards,
        so it does not leak into other test cases in the same process.
        NOTE(review): assumes the flag is only consulted while the nce layer
        is being built (as the nce layer code does) -- confirm that the
        transpiler itself does not read it.
        """
        import os

        flag = 'PADDLE_ENABLE_REMOTE_PREFETCH'
        previous = os.environ.get(flag)
        os.environ[flag] = "1"
        try:
            self.network_with_table(is_sparse=True, is_distributed=False)
        finally:
            if previous is None:
                os.environ.pop(flag, None)
            else:
                os.environ[flag] = previous

    def transpiler_test_impl(self):
        """Transpile the trainer program and check the nce op is still there."""
        trainer, _ = self.get_trainer()
        op_types = [op.type for op in trainer.blocks[0].ops]
        # The original body iterated over the ops and asserted nothing.
        # TODO(review): also pin which variables the "recv" ops fetch once
        # the expected remote-prefetch behaviour is settled.
        self.assertIn("nce", op_types)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -242,11 +242,10 @@ class DistributeTranspiler(object): ...@@ -242,11 +242,10 @@ class DistributeTranspiler(object):
def _get_all_remote_sparse_update_op(self, main_program): def _get_all_remote_sparse_update_op(self, main_program):
sparse_update_ops = [] sparse_update_ops = []
sparse_update_op_types = ["lookup_table"] sparse_update_op_types = ["lookup_table", "nce"]
for op in main_program.global_block().ops: for op in main_program.global_block().ops:
if op.type in sparse_update_op_types and op.attr( if op.type in sparse_update_op_types and op.attr(
'remote_prefetch') is True and not op.attr( 'remote_prefetch') is True:
'is_distributed'):
sparse_update_ops.append(op) sparse_update_ops.append(op)
return sparse_update_ops return sparse_update_ops
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册