提交 a5d1f9cf 编写于 作者: P phlrain

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fix_shape_check_many

...@@ -47,33 +47,34 @@ find_package(Threads REQUIRED) ...@@ -47,33 +47,34 @@ find_package(Threads REQUIRED)
include(simd) include(simd)
################################ Configurations ####################################### ################################ Exposed Configurations #######################################
option(WITH_GPU "Compile PaddlePaddle with NVIDIA GPU" ${CUDA_FOUND}) option(WITH_GPU "Compile PaddlePaddle with NVIDIA GPU" ${CUDA_FOUND})
option(WITH_AMD_GPU "Compile PaddlePaddle with AMD GPU" OFF) option(WITH_DSO "Compile PaddlePaddle with dynamic linked CUDA" ON)
option(WITH_AVX "Compile PaddlePaddle with AVX intrinsics" ${AVX_FOUND}) option(WITH_AVX "Compile PaddlePaddle with AVX intrinsics" ${AVX_FOUND})
option(WITH_PYTHON "Compile PaddlePaddle with python interpreter" ON)
option(WITH_TESTING "Compile PaddlePaddle with unit testing" OFF)
option(WITH_MKL "Compile PaddlePaddle with MKL support." ${AVX_FOUND}) option(WITH_MKL "Compile PaddlePaddle with MKL support." ${AVX_FOUND})
option(WITH_SYSTEM_BLAS "Use system blas library" OFF)
option(WITH_DISTRIBUTE "Compile with distributed support" OFF)
option(WITH_BRPC_RDMA "Use brpc rdma as the rpc protocal" OFF)
option(ON_INFER "Turn on inference optimization." OFF)
option(WITH_ANAKIN "Compile with Anakin library" OFF)
################################ Internal Configurations #######################################
option(WITH_AMD_GPU "Compile PaddlePaddle with AMD GPU" OFF)
option(WITH_NGRAPH "Compile PaddlePaddle with nGraph support." OFF) option(WITH_NGRAPH "Compile PaddlePaddle with nGraph support." OFF)
option(WITH_DSO "Compile PaddlePaddle with dynamic linked CUDA" ON)
option(WITH_TESTING "Compile PaddlePaddle with unit testing" OFF)
option(WITH_PYTHON "Compile PaddlePaddle with python interpreter" ON)
option(WITH_PROFILER "Compile PaddlePaddle with GPU profiler and gperftools" OFF) option(WITH_PROFILER "Compile PaddlePaddle with GPU profiler and gperftools" OFF)
option(WITH_JEMALLOC "Compile PaddlePaddle with jemalloc" OFF) option(WITH_JEMALLOC "Compile PaddlePaddle with jemalloc" OFF)
option(WITH_COVERAGE "Compile PaddlePaddle with code coverage" OFF) option(WITH_COVERAGE "Compile PaddlePaddle with code coverage" OFF)
option(COVERALLS_UPLOAD "Package code coverage data to coveralls" OFF) option(COVERALLS_UPLOAD "Package code coverage data to coveralls" OFF)
option(WITH_DISTRIBUTE "Compile with distributed support" OFF)
option(WITH_PSLIB "Compile with pslib support" OFF) option(WITH_PSLIB "Compile with pslib support" OFF)
option(WITH_CONTRIB "Compile the third-party contributation" OFF) option(WITH_CONTRIB "Compile the third-party contributation" OFF)
option(REPLACE_ENFORCE_GLOG "Replace PADDLE_ENFORCE with glog/CHECK for better debug." OFF) option(REPLACE_ENFORCE_GLOG "Replace PADDLE_ENFORCE with glog/CHECK for better debug." OFF)
# TODO(Superjomn) Remove WITH_ANAKIN option if not needed latter. # TODO(Superjomn) Remove WITH_ANAKIN option if not needed latter.
option(WITH_ANAKIN "Compile with Anakin library" OFF)
option(ANAKIN_BUILD_FAT_BIN "Build anakin cuda fat-bin lib for all device plantform, ignored when WITH_ANAKIN=OFF" OFF) option(ANAKIN_BUILD_FAT_BIN "Build anakin cuda fat-bin lib for all device plantform, ignored when WITH_ANAKIN=OFF" OFF)
option(ANAKIN_BUILD_CROSS_PLANTFORM "Build anakin lib for any nvidia device plantform. ignored when WITH_ANAKIN=OFF" ON) option(ANAKIN_BUILD_CROSS_PLANTFORM "Build anakin lib for any nvidia device plantform. ignored when WITH_ANAKIN=OFF" ON)
option(WITH_GRPC "Use grpc as the default rpc framework" ${WITH_DISTRIBUTE}) option(WITH_GRPC "Use grpc as the default rpc framework" ${WITH_DISTRIBUTE})
option(WITH_BRPC_RDMA "Use brpc rdma as the rpc protocal" OFF)
option(ON_INFER "Turn on inference optimization." OFF)
option(WITH_INFERENCE_API_TEST "Test fluid inference C++ high-level api interface" OFF) option(WITH_INFERENCE_API_TEST "Test fluid inference C++ high-level api interface" OFF)
option(WITH_HIGH_LEVEL_API_TEST "Test fluid python high-level api interface" OFF) option(WITH_HIGH_LEVEL_API_TEST "Test fluid python high-level api interface" OFF)
option(WITH_SYSTEM_BLAS "Use system blas library" OFF)
option(PY_VERSION "Compile PaddlePaddle with python3 support" ${PY_VERSION}) option(PY_VERSION "Compile PaddlePaddle with python3 support" ${PY_VERSION})
option(WITH_FAST_MATH "Make use of fast math library, might affect the precision to some extent" ON) option(WITH_FAST_MATH "Make use of fast math library, might affect the precision to some extent" ON)
......
...@@ -241,6 +241,7 @@ paddle.fluid.layers.tree_conv (ArgSpec(args=['nodes_vector', 'edge_set', 'output ...@@ -241,6 +241,7 @@ paddle.fluid.layers.tree_conv (ArgSpec(args=['nodes_vector', 'edge_set', 'output
paddle.fluid.layers.npair_loss (ArgSpec(args=['anchor', 'positive', 'labels', 'l2_reg'], varargs=None, keywords=None, defaults=(0.002,)), ('document', '46994d10276dd4cb803b4062b5d14329')) paddle.fluid.layers.npair_loss (ArgSpec(args=['anchor', 'positive', 'labels', 'l2_reg'], varargs=None, keywords=None, defaults=(0.002,)), ('document', '46994d10276dd4cb803b4062b5d14329'))
paddle.fluid.layers.pixel_shuffle (ArgSpec(args=['x', 'upscale_factor'], varargs=None, keywords=None, defaults=None), ('document', '731b21c62a4add60a33bd76d802ffc5c')) paddle.fluid.layers.pixel_shuffle (ArgSpec(args=['x', 'upscale_factor'], varargs=None, keywords=None, defaults=None), ('document', '731b21c62a4add60a33bd76d802ffc5c'))
paddle.fluid.layers.fsp_matrix (ArgSpec(args=['x', 'y'], varargs=None, keywords=None, defaults=None), ('document', 'b76ccca3735bea4a58a0dbf0d77c5393')) paddle.fluid.layers.fsp_matrix (ArgSpec(args=['x', 'y'], varargs=None, keywords=None, defaults=None), ('document', 'b76ccca3735bea4a58a0dbf0d77c5393'))
paddle.fluid.layers.continuous_value_model (ArgSpec(args=['input', 'cvm', 'use_cvm'], varargs=None, keywords=None, defaults=(True,)), ('document', 'a07a44c2bacdcd09c1f5f35a96a0514e'))
paddle.fluid.layers.data (ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True)), ('document', '33bbd42027d872b3818b3d64ec52e139')) paddle.fluid.layers.data (ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True)), ('document', '33bbd42027d872b3818b3d64ec52e139'))
paddle.fluid.layers.open_files (ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'is_test'], varargs=None, keywords=None, defaults=(None, None, 1, None)), ('document', 'b1ae2e1cc0750e58726374061ea90ecc')) paddle.fluid.layers.open_files (ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'is_test'], varargs=None, keywords=None, defaults=(None, None, 1, None)), ('document', 'b1ae2e1cc0750e58726374061ea90ecc'))
paddle.fluid.layers.read_file (ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None), ('document', 'b0a1c2fc51c27a106da28f3308c41f5e')) paddle.fluid.layers.read_file (ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None), ('document', 'b0a1c2fc51c27a106da28f3308c41f5e'))
......
...@@ -64,9 +64,12 @@ void ProcessGraph(std::vector<ir::Graph *> graphs, Scope *scope) { ...@@ -64,9 +64,12 @@ void ProcessGraph(std::vector<ir::Graph *> graphs, Scope *scope) {
node->Op()->GetNullableAttr("epmap")); node->Op()->GetNullableAttr("epmap"));
auto height_section = boost::get<std::vector<int64_t>>( auto height_section = boost::get<std::vector<int64_t>>(
node->Op()->GetNullableAttr("sections")); node->Op()->GetNullableAttr("sections"));
auto trainer_id =
boost::get<int>(node->Op()->GetNullableAttr("trainer_id"));
send_varname_to_ctx[send_var_name] = send_varname_to_ctx[send_var_name] =
operators::distributed::RpcContext(send_var_name, send_varnames, operators::distributed::RpcContext(send_var_name, send_varnames,
epmap, height_section); epmap, height_section,
trainer_id);
VLOG(3) << "find and init an send op: " VLOG(3) << "find and init an send op: "
<< send_varname_to_ctx[send_var_name]; << send_varname_to_ctx[send_var_name];
} else if (node->Name() == "recv") { } else if (node->Name() == "recv") {
...@@ -75,9 +78,11 @@ void ProcessGraph(std::vector<ir::Graph *> graphs, Scope *scope) { ...@@ -75,9 +78,11 @@ void ProcessGraph(std::vector<ir::Graph *> graphs, Scope *scope) {
node->Op()->GetNullableAttr("recv_varnames")); node->Op()->GetNullableAttr("recv_varnames"));
auto epmap = boost::get<std::vector<std::string>>( auto epmap = boost::get<std::vector<std::string>>(
node->Op()->GetNullableAttr("epmap")); node->Op()->GetNullableAttr("epmap"));
auto trainer_id =
boost::get<int>(node->Op()->GetNullableAttr("trainer_id"));
recv_varname_to_ctx[recv_var_name] = recv_varname_to_ctx[recv_var_name] =
operators::distributed::RpcContext(recv_var_name, recv_varnames, operators::distributed::RpcContext(recv_var_name, recv_varnames,
epmap, {}); epmap, {}, trainer_id);
nodes_to_delete.push_back(node); nodes_to_delete.push_back(node);
VLOG(3) << "find and remove an recv op: " VLOG(3) << "find and remove an recv op: "
<< recv_varname_to_ctx[recv_var_name]; << recv_varname_to_ctx[recv_var_name];
......
...@@ -832,6 +832,45 @@ std::string AnalysisPredictor::GetSerializedProgram() const { ...@@ -832,6 +832,45 @@ std::string AnalysisPredictor::GetSerializedProgram() const {
return inference_program_->Proto()->SerializeAsString(); return inference_program_->Proto()->SerializeAsString();
} }
// Saves the predictor's current (optimized) inference model into `dir`:
//   * the serialized program is written to `dir + "/model"`;
//   * every persistable variable of block 0 is written to `dir + "/params"`
//     by running a `save_combine` op on a CPU executor.
// NOTE(review): nothing in this function creates `dir` or checks that the
// output stream opened successfully -- presumably the caller must create the
// directory beforehand; confirm.
void AnalysisPredictor::SaveOptimModel(const std::string &dir) {
// Step 1: dump the serialized program in binary mode.
std::string model_name = dir + "/model";
std::ofstream outfile;
outfile.open(model_name, std::ios::out | std::ios::binary);
std::string inference_prog_desc = GetSerializedProgram();
outfile << inference_prog_desc;
// Step 2: build a throw-away program whose only job is to save the params.
framework::ProgramDesc save_program;
auto *save_block = save_program.MutableBlock(0);
const framework::ProgramDesc &main_program = program();
const framework::BlockDesc &global_block = main_program.Block(0);
std::vector<std::string> save_var_list;
// Mirror each persistable variable of the main program's block 0 (shape,
// data type, type, LoD level) into the save program and remember its name.
for (framework::VarDesc *var : global_block.AllVars()) {
if (IsPersistable(var)) {
framework::VarDesc *new_var = save_block->Var(var->Name());
new_var->SetShape(var->GetShape());
new_var->SetDataType(var->GetDataType());
new_var->SetType(var->GetType());
new_var->SetLoDLevel(var->GetLoDLevel());
new_var->SetPersistable(true);
save_var_list.push_back(new_var->Name());
}
}
// Sort the names so the inputs of save_combine are in a deterministic order.
std::sort(save_var_list.begin(), save_var_list.end());
// A single save_combine op takes all collected variables as Input(X) and
// writes them to the file given by its "file_path" attribute.
auto *op = save_block->AppendOp();
op->SetType("save_combine");
op->SetInput("X", save_var_list);
op->SetAttr("file_path", dir + "/params");
op->CheckAttrs();
// Execute the save program on CPU against the predictor's scope.
// NOTE(review): the meaning of the trailing `0, true, true` arguments is not
// visible here -- see framework::Executor::Run.
platform::CPUPlace place;
framework::Executor exe(place);
exe.Run(save_program, scope(), 0, true, true);
}
template <> template <>
std::unique_ptr<PaddlePredictor> CreatePaddlePredictor<AnalysisConfig>( std::unique_ptr<PaddlePredictor> CreatePaddlePredictor<AnalysisConfig>(
const AnalysisConfig &config) { const AnalysisConfig &config) {
......
...@@ -86,6 +86,10 @@ class AnalysisPredictor : public PaddlePredictor { ...@@ -86,6 +86,10 @@ class AnalysisPredictor : public PaddlePredictor {
bool MkldnnQuantize(); bool MkldnnQuantize();
// Saves the optimized model into directory `dir`: the serialized program is
// written to the file `dir`/model and the persistable parameters to the file
// `dir`/params.
void SaveOptimModel(const std::string &dir);
protected: protected:
// For memory optimization. // For memory optimization.
bool need_collect_var_shapes_for_memory_optim(); bool need_collect_var_shapes_for_memory_optim();
......
...@@ -196,6 +196,9 @@ TEST(AnalysisPredictor, Clone) { ...@@ -196,6 +196,9 @@ TEST(AnalysisPredictor, Clone) {
} }
} }
// This function is not released yet and will fail on some machines.
// TODO(Superjomn) Turn it on later.
/*
TEST(AnalysisPredictor, memory_optim) { TEST(AnalysisPredictor, memory_optim) {
AnalysisConfig config(FLAGS_dirname); AnalysisConfig config(FLAGS_dirname);
config.DisableGpu(); config.DisableGpu();
...@@ -246,6 +249,7 @@ TEST(AnalysisPredictor, memory_optim) { ...@@ -246,6 +249,7 @@ TEST(AnalysisPredictor, memory_optim) {
inference::CompareResult(output, output1); inference::CompareResult(output, output1);
} }
*/
#ifdef PADDLE_WITH_MKLDNN #ifdef PADDLE_WITH_MKLDNN
class MkldnnQuantizerTest : public testing::Test { class MkldnnQuantizerTest : public testing::Test {
......
...@@ -170,6 +170,15 @@ void SetConfig(AnalysisConfig *cfg) { ...@@ -170,6 +170,15 @@ void SetConfig(AnalysisConfig *cfg) {
cfg->SwitchIrOptim(true); cfg->SwitchIrOptim(true);
} }
// Configures `cfg` to load the model stored in the "saved_optim_model"
// directory located next to FLAGS_infer_model (the last path component of
// FLAGS_infer_model is replaced), with IR optimization switched on and input
// names specified explicitly.
void SetOptimConfig(AnalysisConfig *cfg) {
  const auto last_slash = FLAGS_infer_model.find_last_of("/");
  const std::string optim_dir =
      FLAGS_infer_model.substr(0, last_slash) + "/saved_optim_model";
  const std::string prog_file = optim_dir + "/model";
  const std::string params_file = optim_dir + "/params";
  cfg->SetModel(prog_file, params_file);
  cfg->SwitchIrOptim(true);
  cfg->SwitchSpecifyInputNames();
}
void SetInput(std::vector<std::vector<PaddleTensor>> *inputs) { void SetInput(std::vector<std::vector<PaddleTensor>> *inputs) {
DataRecord data(FLAGS_infer_data, FLAGS_batch_size); DataRecord data(FLAGS_infer_data, FLAGS_batch_size);
std::vector<PaddleTensor> input_slots; std::vector<PaddleTensor> input_slots;
...@@ -315,5 +324,44 @@ TEST(Analyzer_dam, compare_determine) { ...@@ -315,5 +324,44 @@ TEST(Analyzer_dam, compare_determine) {
input_slots_all); input_slots_all);
} }
// Creates a predictor from the original model config and asks it to save its
// optimized model into "<FLAGS_infer_model minus last component>/saved_optim_model".
TEST(Analyzer_dam, save_optim_model) {
  AnalysisConfig config;
  SetConfig(&config);

  const auto last_slash = FLAGS_infer_model.find_last_of("/");
  const std::string save_dir =
      FLAGS_infer_model.substr(0, last_slash) + "/saved_optim_model";
  // The return value is deliberately ignored, as in the original test
  // (presumably because the directory may already exist).
  mkdir(save_dir.c_str(), 0777);

  auto predictor = CreateTestPredictor(
      reinterpret_cast<const PaddlePredictor::Config *>(&config),
      FLAGS_use_analysis);
  auto *analysis_predictor = static_cast<AnalysisPredictor *>(predictor.get());
  analysis_predictor->SaveOptimModel(save_dir);
}
// Prints both configs, runs a one-thread prediction with each of them on the
// same `inputs`, and compares the last entry of the two output lists.
void CompareOptimAndOrig(const PaddlePredictor::Config *orig_config,
                         const PaddlePredictor::Config *optim_config,
                         const std::vector<std::vector<PaddleTensor>> &inputs) {
  PrintConfig(orig_config, true);
  PrintConfig(optim_config, true);

  std::vector<std::vector<PaddleTensor>> orig_outputs;
  std::vector<std::vector<PaddleTensor>> optim_outputs;
  TestOneThreadPrediction(orig_config, inputs, &orig_outputs, false);
  TestOneThreadPrediction(optim_config, inputs, &optim_outputs, false);

  const auto &expected = orig_outputs.back();
  const auto &actual = optim_outputs.back();
  CompareResult(expected, actual);
}
// Checks that the saved optimized model yields the same result as the
// original model on the same inputs.
// NOTE(review): presumably relies on the save_optim_model test having written
// the optimized model first -- confirm test ordering.
TEST(Analyzer_dam, compare_optim_orig) {
  // Config for the original model.
  AnalysisConfig baseline_cfg;
  SetConfig(&baseline_cfg);
  // Config that loads the model from the saved_optim_model directory.
  AnalysisConfig saved_cfg;
  SetOptimConfig(&saved_cfg);

  std::vector<std::vector<PaddleTensor>> all_inputs;
  SetInput(&all_inputs);

  const auto *baseline =
      reinterpret_cast<const PaddlePredictor::Config *>(&baseline_cfg);
  const auto *saved =
      reinterpret_cast<const PaddlePredictor::Config *>(&saved_cfg);
  CompareOptimAndOrig(baseline, saved, all_inputs);
}
} // namespace inference } // namespace inference
} // namespace paddle } // namespace paddle
...@@ -32,6 +32,17 @@ void SetInput(std::vector<std::vector<PaddleTensor>> *inputs) { ...@@ -32,6 +32,17 @@ void SetInput(std::vector<std::vector<PaddleTensor>> *inputs) {
SetFakeImageInput(inputs, FLAGS_infer_model); SetFakeImageInput(inputs, FLAGS_infer_model);
} }
// Configures `cfg` to load the model stored in the "saved_optim_model"
// directory located next to FLAGS_infer_model (the last path component of
// FLAGS_infer_model is replaced). GPU is disabled, IR optimization and
// explicit input names are switched on, and the CPU math library thread count
// is taken from FLAGS_paddle_num_threads.
void SetOptimConfig(AnalysisConfig *cfg) {
  const auto last_slash = FLAGS_infer_model.find_last_of("/");
  const std::string optim_dir =
      FLAGS_infer_model.substr(0, last_slash) + "/saved_optim_model";
  const std::string prog_file = optim_dir + "/model";
  const std::string params_file = optim_dir + "/params";
  cfg->SetModel(prog_file, params_file);
  cfg->DisableGpu();
  cfg->SwitchIrOptim();
  cfg->SwitchSpecifyInputNames();
  cfg->SetCpuMathLibraryNumThreads(FLAGS_paddle_num_threads);
}
// Easy for profiling independently. // Easy for profiling independently.
void profile(bool use_mkldnn = false) { void profile(bool use_mkldnn = false) {
AnalysisConfig cfg; AnalysisConfig cfg;
...@@ -87,13 +98,51 @@ TEST(Analyzer_resnet50, compare_mkldnn) { compare(true /* use_mkldnn */); } ...@@ -87,13 +98,51 @@ TEST(Analyzer_resnet50, compare_mkldnn) { compare(true /* use_mkldnn */); }
TEST(Analyzer_resnet50, compare_determine) { TEST(Analyzer_resnet50, compare_determine) {
AnalysisConfig cfg; AnalysisConfig cfg;
SetConfig(&cfg); SetConfig(&cfg);
std::vector<std::vector<PaddleTensor>> input_slots_all; std::vector<std::vector<PaddleTensor>> input_slots_all;
SetInput(&input_slots_all); SetInput(&input_slots_all);
CompareDeterministic(reinterpret_cast<const PaddlePredictor::Config *>(&cfg), CompareDeterministic(reinterpret_cast<const PaddlePredictor::Config *>(&cfg),
input_slots_all); input_slots_all);
} }
// Creates a predictor from the original model config and asks it to save its
// optimized model into "<FLAGS_infer_model minus last component>/saved_optim_model".
TEST(Analyzer_resnet50, save_optim_model) {
  AnalysisConfig config;
  SetConfig(&config);

  const auto last_slash = FLAGS_infer_model.find_last_of("/");
  const std::string save_dir =
      FLAGS_infer_model.substr(0, last_slash) + "/saved_optim_model";
  // The return value is deliberately ignored, as in the original test
  // (presumably because the directory may already exist).
  mkdir(save_dir.c_str(), 0777);

  auto predictor = CreateTestPredictor(
      reinterpret_cast<const PaddlePredictor::Config *>(&config),
      FLAGS_use_analysis);
  auto *analysis_predictor = static_cast<AnalysisPredictor *>(predictor.get());
  analysis_predictor->SaveOptimModel(save_dir);
}
// Prints both configs, runs a one-thread prediction with each of them on the
// same `inputs`, and compares the last entry of the two output lists.
void CompareOptimAndOrig(const PaddlePredictor::Config *orig_config,
                         const PaddlePredictor::Config *optim_config,
                         const std::vector<std::vector<PaddleTensor>> &inputs) {
  PrintConfig(orig_config, true);
  PrintConfig(optim_config, true);

  std::vector<std::vector<PaddleTensor>> orig_outputs;
  std::vector<std::vector<PaddleTensor>> optim_outputs;
  TestOneThreadPrediction(orig_config, inputs, &orig_outputs, false);
  TestOneThreadPrediction(optim_config, inputs, &optim_outputs, false);

  const auto &expected = orig_outputs.back();
  const auto &actual = optim_outputs.back();
  CompareResult(expected, actual);
}
// Checks that the saved optimized model yields the same result as the
// original model on the same inputs.
// NOTE(review): presumably relies on the save_optim_model test having written
// the optimized model first -- confirm test ordering.
TEST(Analyzer_resnet50, compare_optim_orig) {
  // Config for the original model.
  AnalysisConfig baseline_cfg;
  SetConfig(&baseline_cfg);
  // Config that loads the model from the saved_optim_model directory.
  AnalysisConfig saved_cfg;
  SetOptimConfig(&saved_cfg);

  std::vector<std::vector<PaddleTensor>> all_inputs;
  SetInput(&all_inputs);

  const auto *baseline =
      reinterpret_cast<const PaddlePredictor::Config *>(&baseline_cfg);
  const auto *saved =
      reinterpret_cast<const PaddlePredictor::Config *>(&saved_cfg);
  CompareOptimAndOrig(baseline, saved, all_inputs);
}
} // namespace analysis } // namespace analysis
} // namespace inference } // namespace inference
} // namespace paddle } // namespace paddle
...@@ -29,8 +29,6 @@ pool3d ...@@ -29,8 +29,6 @@ pool3d
prelu prelu
quantize quantize
rank_loss rank_loss
reduce_all
reduce_any
reduce_max reduce_max
reduce_mean reduce_mean
reduce_min reduce_min
......
...@@ -36,14 +36,17 @@ class ConvShiftOp : public framework::OperatorWithKernel { ...@@ -36,14 +36,17 @@ class ConvShiftOp : public framework::OperatorWithKernel {
auto y_dims = ctx->GetInputDim("Y"); auto y_dims = ctx->GetInputDim("Y");
PADDLE_ENFORCE_EQ(x_dims.size(), 2, "Input(X)'s rank should be 2."); PADDLE_ENFORCE_EQ(x_dims.size(), 2, "Input(X)'s rank should be 2.");
PADDLE_ENFORCE_EQ(y_dims.size(), 2, "Input(Y)'s rank should be 2."); PADDLE_ENFORCE_EQ(y_dims.size(), 2, "Input(Y)'s rank should be 2.");
PADDLE_ENFORCE_EQ(x_dims[0], y_dims[0], if (ctx->IsRuntime() || (x_dims[0] > 0 && y_dims[0] > 0))
"The 1st dimension of Input(X) and Input(Y) should " PADDLE_ENFORCE_EQ(x_dims[0], y_dims[0],
"be equal."); "The 1st dimension of Input(X) and Input(Y) should "
PADDLE_ENFORCE_EQ(y_dims[1] % 2, 1, "be equal.");
"The 2nd dimension of Input(Y) should be odd."); if (ctx->IsRuntime() || y_dims[1] > 0)
PADDLE_ENFORCE_LE(y_dims[1], x_dims[1], PADDLE_ENFORCE_EQ(y_dims[1] % 2, 1,
"The 2nd dimension of Input(Y) should be less than or " "The 2nd dimension of Input(Y) should be odd.");
"equal to the 2nd dimension of Input(X)."); if (ctx->IsRuntime() || (x_dims[1] > 0 && y_dims[1] > 0))
PADDLE_ENFORCE_LE(y_dims[1], x_dims[1],
"The 2nd dimension of Input(Y) should be less than or "
"equal to the 2nd dimension of Input(X).");
ctx->ShareDim("X", /*->*/ "Out"); ctx->ShareDim("X", /*->*/ "Out");
ctx->ShareLoD("X", /*->*/ "Out"); ctx->ShareLoD("X", /*->*/ "Out");
} }
......
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/cvm_op.h"
#include <memory>
#include "paddle/fluid/operators/math/math_function.h"
namespace paddle {
namespace operators {
using Tensor = framework::Tensor;
// Forward "cvm" operator: checks its inputs and derives the shape of
// Output(Y) from Input(X) and the `use_cvm` attribute.
class CVMOp : public framework::OperatorWithKernel {
 public:
  using framework::OperatorWithKernel::OperatorWithKernel;

  // Requires Input(X), Input(CVM) and Output(Y) to be present, X and CVM to be
  // rank-2, and CVM to have a second dimension of 2. Output(Y) keeps X's shape
  // when `use_cvm` is true and drops two columns otherwise; X's LoD is shared
  // with Y.
  void InferShape(framework::InferShapeContext* ctx) const override {
    PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) should be not null.");
    PADDLE_ENFORCE(ctx->HasInput("CVM"), "Input(CVM) should be not null.");
    PADDLE_ENFORCE(ctx->HasOutput("Y"), "Output(Y) should be not null.");

    const auto input_dims = ctx->GetInputDim("X");
    const auto show_click_dims = ctx->GetInputDim("CVM");
    PADDLE_ENFORCE_EQ(input_dims.size(), 2UL, "Input(X)'s rank should be 2.");
    PADDLE_ENFORCE_EQ(show_click_dims.size(), 2UL,
                      "Input(CVM)'s rank should be 2.");
    PADDLE_ENFORCE_EQ(show_click_dims[1], 2UL,
                      "The 2nd dimension of Input(CVM) should be 2.");

    // Without use_cvm the two leading columns of every row are removed.
    const bool keep_cvm = ctx->Attrs().Get<bool>("use_cvm");
    const auto out_width = keep_cvm ? input_dims[1] : input_dims[1] - 2;
    ctx->SetOutputDim("Y", {input_dims[0], out_width});
    ctx->ShareLoD("X", /*->*/ "Y");
  }

 protected:
  // The kernel's data type follows Input(X); the kernel is placed on CPU.
  framework::OpKernelType GetExpectedKernelType(
      const framework::ExecutionContext& ctx) const override {
    const auto data_type = ctx.Input<Tensor>("X")->type();
    return framework::OpKernelType(data_type, platform::CPUPlace());
  }
};
// Gradient operator of "cvm": validates its inputs and gives Output(X@GRAD)
// the same shape and LoD as Input(X).
class CVMGradientOp : public framework::OperatorWithKernel {
 public:
  using framework::OperatorWithKernel::OperatorWithKernel;

  // Requires Input(X), Input(CVM), Input(Y@GRAD) and Output(X@GRAD) to be
  // present; X, Y@GRAD and CVM to be rank-2; X and Y@GRAD to agree on the
  // first dimension; and CVM to have a second dimension of 2.
  void InferShape(framework::InferShapeContext* ctx) const override {
    PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) should be not null.");
    PADDLE_ENFORCE(ctx->HasInput("CVM"), "Input(CVM) should be not null.");
    PADDLE_ENFORCE(ctx->HasInput(framework::GradVarName("Y")),
                   "Input(Y@GRAD) should be not null.");
    PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("X")),
                   "Output(X@GRAD) should be not null.");

    auto x_dims = ctx->GetInputDim("X");
    auto cvm_dims = ctx->GetInputDim("CVM");
    auto dy_dims = ctx->GetInputDim(framework::GradVarName("Y"));
    PADDLE_ENFORCE_EQ(x_dims.size(), 2, "Input(X)'s rank should be 2.");
    PADDLE_ENFORCE_EQ(dy_dims.size(), 2, "Input(Y@Grad)'s rank should be 2.");
    PADDLE_ENFORCE_EQ(cvm_dims.size(), 2, "Input(CVM)'s rank should be 2.");

    PADDLE_ENFORCE_EQ(x_dims[0], dy_dims[0],
                      "The 1st dimension of Input(X) and Input(Y@Grad) should "
                      "be equal.");

    // This operator has no `soft_label` attribute, so the message states the
    // requirement directly, using the same wording as the forward operator.
    PADDLE_ENFORCE_EQ(cvm_dims[1], 2,
                      "The 2nd dimension of Input(CVM) should be 2.");

    ctx->SetOutputDim(framework::GradVarName("X"), x_dims);
    ctx->ShareLoD("X", framework::GradVarName("X"));
  }

 protected:
  // The kernel's data type follows Input(X); the kernel is placed on CPU.
  framework::OpKernelType GetExpectedKernelType(
      const framework::ExecutionContext& ctx) const override {
    return framework::OpKernelType(ctx.Input<Tensor>("X")->type(),
                                   platform::CPUPlace());
  }
};
// Declares the proto of the "cvm" operator: inputs X and CVM, output Y, the
// boolean attribute `use_cvm` (default true) and the operator's doc comment.
class CVMOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
// X: 2-D [N x D] input.
AddInput("X",
"(LodTensor, default LodTensor<float>), a 2-D tensor with shape "
"[N x D],"
" where N is the batch size and D is the emebdding dim. ");
// CVM: 2-D [N x 2] input holding the show and click values.
AddInput("CVM",
"(Tensor), a 2-D Tensor with shape [N x 2], where N is the batch "
"size, 2 is show and click.");
// Y: 2-D [N x K] output.
AddOutput("Y",
"(LodTensor, default LodTensor<float>), a 2-D tensor with shape "
"[N x K].");
// use_cvm selects between the two output layouts described in the DOC text.
AddAttr<bool>("use_cvm", "bool, use cvm or not").SetDefault(true);
AddComment(R"DOC(
CVM Operator.
We assume that input X is a embedding vector with cvm_feature(show and click), which shape is [N * D] (D is 2(cvm_feature) + embedding dim, N is batch_size)
if use_cvm is True, we will log(cvm_feature), and output shape is [N * D].
if use_cvm is False, we will remove cvm_feature from input, and output shape is [N * (D - 2)].
)DOC");
}
};
// Builds the description of the "cvm_grad" op that pairs with a forward "cvm"
// op.
class CVMGradOpDescMaker : public framework::SingleGradOpDescMaker {
 public:
  using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;

 protected:
  // The gradient op receives the forward inputs X and CVM plus the gradient of
  // Y, produces the gradient of X, and inherits all forward attributes.
  std::unique_ptr<framework::OpDesc> Apply() const override {
    auto grad_desc =
        std::unique_ptr<framework::OpDesc>(new framework::OpDesc());
    grad_desc->SetType("cvm_grad");

    // Inputs.
    grad_desc->SetInput("X", Input("X"));
    grad_desc->SetInput("CVM", Input("CVM"));
    grad_desc->SetInput(framework::GradVarName("Y"), OutputGrad("Y"));

    // Output.
    grad_desc->SetOutput(framework::GradVarName("X"), InputGrad("X"));

    grad_desc->SetAttrMap(Attrs());
    return grad_desc;
  }
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(cvm, ops::CVMOp, ops::CVMOpMaker, ops::CVMGradOpDescMaker);
REGISTER_OPERATOR(cvm_grad, ops::CVMGradientOp);
REGISTER_OP_CPU_KERNEL(cvm, ops::CVMOpKernel<float>, ops::CVMOpKernel<double>);
REGISTER_OP_CPU_KERNEL(cvm_grad, ops::CVMGradOpKernel<float>,
ops::CVMGradOpKernel<double>);
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
namespace paddle {
namespace operators {
using Tensor = framework::Tensor;
using LoDTensor = framework::LoDTensor;
// CPU kernel of the "cvm" operator.
//
// Walks Input(X) row by row (the number of rows visited is given by the first
// LoD level of X) and writes Output(Y):
//   * use_cvm == true : the row is copied and its first two values are
//     replaced by log(v0 + 1) and log(v1 + 1) - log(v0 + 1);
//   * use_cvm == false: the first two values of the row are dropped and the
//     remainder is copied.
template <typename T>
class CVMOpKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& context) const override {
    const LoDTensor* x = context.Input<LoDTensor>("X");
    const T* x_data = x->data<T>();
    auto lod = x->lod()[0];

    // Number of values per input row.
    // NOTE(review): divides by dims()[0]; an input with zero rows would divide
    // by zero -- confirm callers never pass one.
    int64_t item_size = x->numel() / x->dims()[0];
    const int offset = 2;  // the two leading values of every row

    // The attribute cannot change during the call, so read it once instead of
    // once per row.
    const bool use_cvm = context.Attr<bool>("use_cvm");
    if (!use_cvm) {
      // From here on item_size is the number of values per *output* row.
      item_size -= offset;
    }

    LoDTensor* y = context.Output<LoDTensor>("Y");
    T* y_data = y->mutable_data<T>(context.GetPlace());

    const int seq_num = static_cast<int>(lod.size()) - 1;
    for (int i = 0; i < seq_num; ++i) {
      const int64_t seq_len = static_cast<int64_t>(lod[i + 1] - lod[i]);
      // 64-bit counter to match the type of seq_len.
      for (int64_t j = 0; j < seq_len; ++j) {
        if (use_cvm) {
          std::memcpy(y_data, x_data, item_size * sizeof(T));
          y_data[0] = log(y_data[0] + 1);
          // y_data[0] already holds the log value computed on the line above.
          y_data[1] = log(y_data[1] + 1) - y_data[0];
          x_data += item_size;
          y_data += item_size;
        } else {
          std::memcpy(y_data, x_data + offset, item_size * sizeof(T));
          x_data += item_size + offset;
          y_data += item_size;
        }
      }
    }
  }
};
// CPU kernel of the "cvm_grad" operator.
//
// Fills Output(X@GRAD) row by row (the number of rows visited is given by the
// first LoD level of X@GRAD). The first two values of every row are taken from
// Input(CVM); the remaining values come from Input(Y@GRAD). The CVM pointer
// advances by two values after each sequence.
template <typename T>
class CVMGradOpKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& context) const override {
    LoDTensor* dx = context.Output<LoDTensor>(framework::GradVarName("X"));
    T* dx_data = dx->mutable_data<T>(context.GetPlace());

    const Tensor* cvm = context.Input<Tensor>("CVM");
    const T* cvm_data = cvm->data<T>();
    const int offset = 2;  // the two leading values of every row

    const framework::LoDTensor* dOut =
        context.Input<framework::LoDTensor>(framework::GradVarName("Y"));
    const T* dout_data = dOut->data<T>();

    auto lod = dx->lod()[0];
    // Number of values per X@GRAD row.
    // NOTE(review): divides by dims()[0]; a gradient with zero rows would
    // divide by zero -- confirm callers never pass one.
    int64_t item_size = dx->numel() / dx->dims()[0];

    // The attribute cannot change during the call, so read it once instead of
    // once per row.
    const bool use_cvm = context.Attr<bool>("use_cvm");
    if (!use_cvm) {
      // From here on item_size is the number of values per Y@GRAD row.
      item_size -= offset;
    }

    const int seq_num = static_cast<int>(lod.size()) - 1;
    for (int i = 0; i < seq_num; ++i) {
      const int64_t seq_len = static_cast<int64_t>(lod[i + 1] - lod[i]);
      // 64-bit counter to match the type of seq_len.
      for (int64_t j = 0; j < seq_len; ++j) {
        if (use_cvm) {
          // Copy the whole row, then overwrite the two leading values.
          std::memcpy(dx_data, dout_data, item_size * sizeof(T));
          dx_data[0] = cvm_data[0];
          dx_data[1] = cvm_data[1];
          dx_data += item_size;
          dout_data += item_size;
        } else {
          // Y@GRAD has no leading pair: place it after the two CVM values.
          std::memcpy(dx_data + offset, dout_data, item_size * sizeof(T));
          dx_data[0] = cvm_data[0];
          dx_data[1] = cvm_data[1];
          dx_data += item_size + offset;
          dout_data += item_size;
        }
      }
      cvm_data += offset;
    }
  }
};
} // namespace operators
} // namespace paddle
...@@ -9,6 +9,9 @@ else() ...@@ -9,6 +9,9 @@ else()
endif() endif()
configure_file(send_recv.proto.in ${CMAKE_CURRENT_SOURCE_DIR}/send_recv.proto @ONLY) configure_file(send_recv.proto.in ${CMAKE_CURRENT_SOURCE_DIR}/send_recv.proto @ONLY)
# Recorder library and its unit test. The library is also listed in the DEPS of
# the RPC library in the WITH_GRPC branch below.
cc_library(async_sparse_param_update_recorder SRCS async_sparse_param_update_recorder.cc DEPS enforce simple_threadpool)
cc_test(async_sparse_param_update_recorder_test SRCS async_sparse_param_update_recorder_test.cc DEPS async_sparse_param_update_recorder)
# FIXME(typhoonzero): use add_subdirectory once we clean the dependency of these files # FIXME(typhoonzero): use add_subdirectory once we clean the dependency of these files
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
if(WITH_GRPC) if(WITH_GRPC)
...@@ -20,7 +23,7 @@ if(WITH_GRPC) ...@@ -20,7 +23,7 @@ if(WITH_GRPC)
collective_client.cc collective_server.cc collective_client.cc collective_server.cc
${GRPC_SRCS} ${GRPC_SRCS}
PROTO send_recv.proto PROTO send_recv.proto
DEPS lod_tensor selected_rows_functor memory scope ${GRPC_DEPS}) DEPS lod_tensor selected_rows_functor memory scope ${GRPC_DEPS} async_sparse_param_update_recorder)
set_source_files_properties(grpc_serde_test.cc rpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(grpc_serde_test.cc rpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set(RPC_DEPS sendrecvop_rpc ${GRPC_DEPS}) set(RPC_DEPS sendrecvop_rpc ${GRPC_DEPS})
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h"
namespace paddle {
namespace operators {
namespace distributed {
// Out-of-class definitions of AsyncSparseParamUpdateRecorder's `init_flag_`
// and `recorder_` (presumably declared as static members in the header -- the
// declarations are not visible from here). `recorder_` starts out null.
std::once_flag AsyncSparseParamUpdateRecorder::init_flag_;
std::unique_ptr<AsyncSparseParamUpdateRecorder>
AsyncSparseParamUpdateRecorder::recorder_(nullptr);
} // namespace distributed
} // namespace operators
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <functional>
#include <future> // NOLINT
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <ThreadPool.h>
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace operators {
namespace distributed {
// A set of int64 row ids whose every access is funneled through a private
// thread pool constructed with a size of 1: both public methods only enqueue a
// task and return its future, and those tasks are the only code that touches
// `set_`.
// NOTE(review): presumably ThreadPool(1) means a single worker thread, which
// is what serializes the tasks -- confirm against the ThreadPool library.
class ConcurrentSet {
 public:
  ConcurrentSet() : pool_(new ::ThreadPool(1)) {}
  ~ConcurrentSet() {}

  // Schedules the insertion of `rows` into the set and returns a future that
  // becomes ready once the insertion has run. The task owns a copy of `rows`,
  // so the caller's vector does not have to outlive the call.
  std::future<void> Update(const std::vector<int64_t>& rows) {
    auto task = [this, rows] {
      if (VLOG_IS_ON(3)) {
        std::ostringstream sstream;
        sstream << "[";
        for (auto& id : rows) {
          sstream << id << ", ";
        }
        sstream << "]";
        VLOG(3) << "update ids -> " << sstream.str();
      }
      set_.insert(rows.begin(), rows.end());
    };
    return pool_->enqueue(std::move(task));
  }

  // Schedules a task that replaces the contents of `*result` with the ids
  // currently in the set and then empties the set. `*result` must stay alive
  // until the returned future is ready.
  std::future<void> GetAndClear(std::vector<int64_t>* result) {
    // Capture the pointer itself by value. `result` is a parameter of this
    // function and no longer exists when the task runs on the pool thread, so
    // capturing it by reference would leave the task with a dangling
    // reference.
    auto task = [this, result] {
      result->assign(set_.begin(), set_.end());
      if (VLOG_IS_ON(3)) {
        std::ostringstream sstream;
        sstream << "[";
        for (auto& id : *result) {
          sstream << id << ", ";
        }
        sstream << "]";
        VLOG(3) << "result ids size: " << result->size() << " "
                << sstream.str();
      }
      set_.clear();
    };
    return pool_->enqueue(std::move(task));
  }

 private:
  std::unordered_set<int64_t> set_;
  std::unique_ptr<::ThreadPool> pool_{nullptr};
};
class AsyncSparseParamUpdateRecorder {
using TrainerToRows = std::vector<std::unique_ptr<ConcurrentSet>>;
public:
AsyncSparseParamUpdateRecorder(
int trainer_num,
const std::unordered_map<std::string, std::string>& grad_to_param)
: trainer_num_(trainer_num), grad_to_param_(grad_to_param) {
if (VLOG_IS_ON(3)) {
std::ostringstream sstream;
sstream << "[";
for (auto& item : grad_to_param) {
sstream << item.first << ":" << item.second << ", ";
}
sstream << "]";
VLOG(3) << "trainer_num: " << trainer_num
<< " grad_to_param_: " << sstream.str();
}
for (auto& iter : grad_to_param) {
param_to_grad_[iter.second] = iter.first;
auto& param_name = iter.second;
param_to_updated_rows_[param_name] = TrainerToRows();
auto& trainer_to_rows = param_to_updated_rows_[param_name];
for (auto i = 0; i < trainer_num; ++i) {
trainer_to_rows.emplace_back(new ConcurrentSet());
}
}
}
~AsyncSparseParamUpdateRecorder() = default;
void Update(const std::string& grad_name,
const std::vector<int64_t>& update_rows) {
VLOG(3) << "update grad: " << grad_name
<< " row size: " << update_rows.size();
auto& param_name = grad_to_param_.at(grad_name);
auto& trainer_to_rows = param_to_updated_rows_.at(param_name);
std::vector<std::future<void>> fs;
for (auto& set : trainer_to_rows) {
fs.push_back(set->Update(update_rows));
}
for (auto& f : fs) {
f.wait();
}
}
void GetAndClear(const std::string& param_name, int trainer_id,
std::vector<int64_t>* result) {
VLOG(3) << "GetAndClear param: " << param_name
<< " for trainer: " << trainer_id;
PADDLE_ENFORCE_LT(trainer_id, trainer_num_);
param_to_updated_rows_.at(param_name)[trainer_id]
->GetAndClear(result)
.wait();
}
bool HasParam(const std::string& param_name) {
return param_to_grad_.find(param_name) != param_to_grad_.end();
}
bool HasGrad(const std::string& grad_name) {
return grad_to_param_.find(grad_name) != grad_to_param_.end();
}
private:
const int trainer_num_;
std::unordered_map<std::string, std::string> grad_to_param_;
std::unordered_map<std::string, std::string> param_to_grad_;
std::unordered_map<std::string, TrainerToRows> param_to_updated_rows_;
// init recorder
public:
static void Init(
int trainer_num,
const std::unordered_map<std::string, std::string>& grad_to_param) {
InitImpl(trainer_num, grad_to_param);
}
static AsyncSparseParamUpdateRecorder* GetInstance() {
return recorder_.get();
}
private:
// Init is called by GetInstance.
static void InitImpl(
int trainer_num,
const std::unordered_map<std::string, std::string>& grad_to_param) {
if (recorder_ == nullptr) {
recorder_.reset(
new AsyncSparseParamUpdateRecorder(trainer_num, grad_to_param));
}
}
static std::once_flag init_flag_;
static std::unique_ptr<AsyncSparseParamUpdateRecorder> recorder_;
};
} // namespace distributed
} // namespace operators
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h"
#include <algorithm>
#include "gtest/gtest.h"
namespace paddle {
namespace operators {
namespace distributed {
// Two overlapping batches of ids are inserted; a following GetAndClear must
// return their union exactly once, and a second GetAndClear must return
// nothing because the first one emptied the set.
TEST(ConcurrentSet, All) {
  ConcurrentSet concurrent_set;
  const std::vector<int64_t> in1 = {1, 2, 3, 4};
  const std::vector<int64_t> in2 = {2, 3, 5, 6};

  std::vector<std::future<void>> futures;
  futures.push_back(concurrent_set.Update(in1));
  futures.push_back(concurrent_set.Update(in2));
  for (auto &f : futures) {
    f.wait();
  }

  // Expected content: the union of both input batches.
  std::unordered_set<int64_t> in(in1.begin(), in1.end());
  in.insert(in2.begin(), in2.end());

  std::vector<int64_t> ret;
  concurrent_set.GetAndClear(&ret).wait();
  std::unordered_set<int64_t> out(ret.begin(), ret.end());
  EXPECT_EQ(in, out);

  // The set was cleared by the previous call.
  concurrent_set.GetAndClear(&ret).wait();
  EXPECT_TRUE(ret.empty());
}
// Rows reported for a gradient must become visible to every trainer of the
// mapped parameter, exactly once per trainer, and lookups for unknown names
// or out-of-range trainer ids must be rejected.
TEST(AsyncSparseParamUpdateRecorder, All) {
  std::unordered_map<std::string, std::string> grad_to_param;
  grad_to_param["grad1"] = "param1";
  grad_to_param["grad2"] = "param2";

  const int trainer_num = 10;
  AsyncSparseParamUpdateRecorder recorder(trainer_num, grad_to_param);

  const std::vector<int64_t> in1 = {1, 2, 3, 4};
  const std::vector<int64_t> in2 = {2, 3, 5, 6};

  // Expected content for each trainer: the union of both batches.
  std::unordered_set<int64_t> in(in1.begin(), in1.end());
  in.insert(in2.begin(), in2.end());

  recorder.Update("grad1", in1);
  recorder.Update("grad1", in2);

  EXPECT_TRUE(recorder.HasParam("param1"));
  EXPECT_TRUE(recorder.HasParam("param2"));
  EXPECT_FALSE(recorder.HasParam("param3"));

  EXPECT_TRUE(recorder.HasGrad("grad1"));
  EXPECT_TRUE(recorder.HasGrad("grad2"));
  EXPECT_FALSE(recorder.HasGrad("grad3"));

  // trainer ids are valid only in [0, trainer_num).
  std::vector<int64_t> rejected;
  EXPECT_ANY_THROW(recorder.GetAndClear("param1", trainer_num, &rejected));

  for (int i = 0; i < trainer_num; ++i) {
    std::vector<int64_t> ret;
    recorder.GetAndClear("param1", i, &ret);
    std::unordered_set<int64_t> out(ret.begin(), ret.end());
    EXPECT_EQ(in, out);

    // A second fetch for the same trainer sees an empty record.
    recorder.GetAndClear("param1", i, &ret);
    EXPECT_TRUE(ret.empty());
  }
}
} // namespace distributed
} // namespace operators
} // namespace paddle
...@@ -234,6 +234,7 @@ VarHandlePtr BRPCClient::AsyncGetVar(const std::string& ep, ...@@ -234,6 +234,7 @@ VarHandlePtr BRPCClient::AsyncGetVar(const std::string& ep,
const framework::Scope& scope, const framework::Scope& scope,
const std::string& var_name, const std::string& var_name,
const std::string& out_var_name, const std::string& out_var_name,
const std::string& table_name,
int64_t time_out) { int64_t time_out) {
return _AsyncGetVar(ep, ctx, scope, var_name, out_var_name, kGetRPC, return _AsyncGetVar(ep, ctx, scope, var_name, out_var_name, kGetRPC,
time_out); time_out);
......
...@@ -21,8 +21,10 @@ limitations under the License. */ ...@@ -21,8 +21,10 @@ limitations under the License. */
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <map> #include <map>
#include <memory>
#include <mutex> // NOLINT #include <mutex> // NOLINT
#include <string> #include <string>
#include <unordered_map>
#include <vector> #include <vector>
#include "brpc/channel.h" #include "brpc/channel.h"
...@@ -66,6 +68,7 @@ class BRPCClient : public RPCClient { ...@@ -66,6 +68,7 @@ class BRPCClient : public RPCClient {
const framework::Scope& scope, const framework::Scope& scope,
const std::string& var_name, const std::string& var_name,
const std::string& out_var_name, const std::string& out_var_name,
const std::string& table_name = "",
int64_t time_out = FLAGS_rpc_deadline) override; int64_t time_out = FLAGS_rpc_deadline) override;
VarHandlePtr AsyncGetMonomerBarrier( VarHandlePtr AsyncGetMonomerBarrier(
...@@ -107,13 +110,11 @@ class BRPCClient : public RPCClient { ...@@ -107,13 +110,11 @@ class BRPCClient : public RPCClient {
void SendComplete() override; void SendComplete() override;
private: private:
VarHandlePtr _AsyncGetVar(const std::string& ep, VarHandlePtr _AsyncGetVar(
const platform::DeviceContext& ctx, const std::string& ep, const platform::DeviceContext& ctx,
const framework::Scope& scope, const framework::Scope& scope, const std::string& var_name,
const std::string& var_name, const std::string& out_var_name, const std::string& method_name,
const std::string& out_var_name, const std::string& table_name, int64_t time_out = FLAGS_rpc_deadline);
const std::string& method_name,
int64_t time_out = FLAGS_rpc_deadline);
void Proceed(); void Proceed();
ChannelQueuePtr GetChannel(const std::string& ep); ChannelQueuePtr GetChannel(const std::string& ep);
......
...@@ -32,6 +32,9 @@ DEFINE_int32(communicator_send_queue_size, 20, ...@@ -32,6 +32,9 @@ DEFINE_int32(communicator_send_queue_size, 20,
DEFINE_int32(communicator_max_send_grad_num_before_recv, 20, DEFINE_int32(communicator_max_send_grad_num_before_recv, 20,
"max grad num to send before recv parameters"); "max grad num to send before recv parameters");
DEFINE_int32(communicator_thread_pool_size, 5, "thread num to do send or recv"); DEFINE_int32(communicator_thread_pool_size, 5, "thread num to do send or recv");
DEFINE_int32(communicator_send_wait_times, 5,
"times that send thread will wait if merge num does not reach "
"max_merge_var_num");
DEFINE_int32(communicator_max_merge_var_num, 20, DEFINE_int32(communicator_max_merge_var_num, 20,
"max var num to merge and send"); "max var num to merge and send");
DEFINE_bool(communicator_fake_rpc, false, DEFINE_bool(communicator_fake_rpc, false,
...@@ -65,6 +68,8 @@ Communicator::Communicator(const RpcCtxMap &send_varname_to_ctx, ...@@ -65,6 +68,8 @@ Communicator::Communicator(const RpcCtxMap &send_varname_to_ctx,
<< FLAGS_communicator_max_send_grad_num_before_recv; << FLAGS_communicator_max_send_grad_num_before_recv;
VLOG(0) << "communicator_thread_pool_size: " VLOG(0) << "communicator_thread_pool_size: "
<< FLAGS_communicator_thread_pool_size; << FLAGS_communicator_thread_pool_size;
VLOG(0) << "communicator_send_wait_times: "
<< FLAGS_communicator_send_wait_times;
VLOG(0) << "communicator_max_merge_var_num: " VLOG(0) << "communicator_max_merge_var_num: "
<< FLAGS_communicator_max_merge_var_num; << FLAGS_communicator_max_merge_var_num;
VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc; VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc;
...@@ -101,20 +106,32 @@ void Communicator::SendThread() { ...@@ -101,20 +106,32 @@ void Communicator::SendThread() {
VLOG(3) << var_name << " merge and send"; VLOG(3) << var_name << " merge and send";
std::vector<std::shared_ptr<Variable>> vars; std::vector<std::shared_ptr<Variable>> vars;
size_t merged_var_num = 0; size_t merged_var_num = 0;
while (var_queue->Size() > 0 && size_t wait_times = 0;
merged_var_num < FLAGS_communicator_max_merge_var_num) { while (merged_var_num < FLAGS_communicator_max_merge_var_num) {
vars.push_back(var_queue->Pop()); if (var_queue->Size() == 0) {
// only count the send number of the first var VLOG(3) << "wait_times -> " << wait_times;
if (var_name == send_varname_to_queue_.begin()->first) { if (wait_times >= FLAGS_communicator_send_wait_times) {
grad_num_.fetch_add(1, std::memory_order_relaxed); break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
wait_times++;
continue;
} else {
wait_times = 0;
vars.push_back(var_queue->Pop());
// only count the send number of the first var
if (var_name == send_varname_to_queue_.begin()->first) {
grad_num_.fetch_add(1, std::memory_order_relaxed);
}
merged_var_num++;
} }
merged_var_num++;
} }
auto before_merge = GetCurrentUS(); auto before_merge = GetCurrentUS();
MergeVars(var_name, vars, send_scope_.get()); MergeVars(var_name, vars, send_scope_.get());
auto after_merge = GetCurrentUS(); auto after_merge = GetCurrentUS();
VLOG(3) << "merge " << var_name << " use time " VLOG(3) << "merge " << merged_var_num << " " << var_name
<< after_merge - before_merge; << " use time " << after_merge - before_merge;
auto send_functor = distributed::ParameterSend<float>(); auto send_functor = distributed::ParameterSend<float>();
auto &ctx = send_varname_to_ctx_.at(var_name); auto &ctx = send_varname_to_ctx_.at(var_name);
if (!FLAGS_communicator_fake_rpc) { if (!FLAGS_communicator_fake_rpc) {
......
...@@ -109,7 +109,7 @@ inline void MergeVars(const std::string& var_name, ...@@ -109,7 +109,7 @@ inline void MergeVars(const std::string& var_name,
auto* out_var = scope->Var(var_name); auto* out_var = scope->Var(var_name);
if (var0->IsType<framework::LoDTensor>()) { if (var0->IsType<framework::LoDTensor>()) {
auto dims = var0->Get<framework::LoDTensor>().dims(); auto dims = var0->Get<framework::LoDTensor>().dims();
VLOG(3) << "merge " << var_name << " LoDTensor " << dims; VLOG(3) << "merge " << var_name << " LoDTensor dims " << dims;
// init output tensor // init output tensor
auto* out_t = out_var->GetMutable<framework::LoDTensor>(); auto* out_t = out_var->GetMutable<framework::LoDTensor>();
......
...@@ -128,9 +128,11 @@ VarHandlePtr GRPCClient::AsyncGetVar(const std::string& ep, ...@@ -128,9 +128,11 @@ VarHandlePtr GRPCClient::AsyncGetVar(const std::string& ep,
const framework::Scope& scope, const framework::Scope& scope,
const std::string& var_name, const std::string& var_name,
const std::string& out_varname, const std::string& out_varname,
const std::string& table_name,
int64_t time_out) { int64_t time_out) {
return _AsyncGetVar(ep, ctx, scope, kGetRPC, var_name, out_varname, return _AsyncGetVar(ep, ctx, scope, kGetRPC, var_name, out_varname,
"/sendrecv.SendRecvService/GetVariable", time_out); "/sendrecv.SendRecvService/GetVariable", table_name,
time_out);
} }
VarHandlePtr GRPCClient::AsyncGetVarNoBarrier( VarHandlePtr GRPCClient::AsyncGetVarNoBarrier(
...@@ -142,7 +144,7 @@ VarHandlePtr GRPCClient::AsyncGetVarNoBarrier( ...@@ -142,7 +144,7 @@ VarHandlePtr GRPCClient::AsyncGetVarNoBarrier(
return _AsyncGetVar( return _AsyncGetVar(
ep, ctx, scope, kGetNoBarrierRPC, var_name_no_barrier, out_varname, ep, ctx, scope, kGetNoBarrierRPC, var_name_no_barrier, out_varname,
"/sendrecv.SendRecvService/GetVariableNoBarrier", time_out); "/sendrecv.SendRecvService/GetVariableNoBarrier", "", time_out);
} }
VarHandlePtr GRPCClient::AsyncGetMonomerVariable( VarHandlePtr GRPCClient::AsyncGetMonomerVariable(
...@@ -150,18 +152,21 @@ VarHandlePtr GRPCClient::AsyncGetMonomerVariable( ...@@ -150,18 +152,21 @@ VarHandlePtr GRPCClient::AsyncGetMonomerVariable(
const framework::Scope& scope, const std::string& var_name, const framework::Scope& scope, const std::string& var_name,
int64_t time_out) { int64_t time_out) {
return _AsyncGetVar(ep, ctx, scope, kGetMonomerRPC, var_name, var_name, return _AsyncGetVar(ep, ctx, scope, kGetMonomerRPC, var_name, var_name,
"/sendrecv.SendRecvService/GetMonomerVariable", time_out); "/sendrecv.SendRecvService/GetMonomerVariable", "",
time_out);
} }
VarHandlePtr GRPCClient::_AsyncGetVar( VarHandlePtr GRPCClient::_AsyncGetVar(
const std::string& ep, const platform::DeviceContext& ctx, const std::string& ep, const platform::DeviceContext& ctx,
const framework::Scope& scope, const std::string& method, const framework::Scope& scope, const std::string& method,
const std::string& var_name, const std::string& out_varname, const std::string& var_name, const std::string& out_varname,
const std::string& rpc_path, int64_t time_out) { const std::string& rpc_path, const std::string& table_name,
int64_t time_out) {
const platform::DeviceContext* p_ctx = &ctx; const platform::DeviceContext* p_ctx = &ctx;
const std::string ep_val = ep; const std::string ep_val = ep;
const std::string var_name_val = var_name; const std::string var_name_val = var_name;
const std::string out_varname_val = out_varname; const std::string out_varname_val = out_varname;
const std::string table_name_val = table_name;
const framework::Scope* p_scope = &scope; const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val); const auto ch = GetChannel(ep_val);
GetProcessor* s = new GetProcessor(ch); GetProcessor* s = new GetProcessor(ch);
...@@ -169,32 +174,33 @@ VarHandlePtr GRPCClient::_AsyncGetVar( ...@@ -169,32 +174,33 @@ VarHandlePtr GRPCClient::_AsyncGetVar(
VarHandlePtr h(new VarHandle(ep, method, out_varname_val, p_ctx, p_scope)); VarHandlePtr h(new VarHandle(ep, method, out_varname_val, p_ctx, p_scope));
s->Prepare(h, time_out); s->Prepare(h, time_out);
framework::AsyncIO( framework::AsyncIO([var_name_val, out_varname_val, table_name_val, s, method,
[var_name_val, out_varname_val, s, method, p_ctx, h, rpc_path, this] { p_ctx, h, rpc_path, this] {
// prepare input // prepare input
sendrecv::VariableMessage req; sendrecv::VariableMessage req;
req.set_varname(var_name_val); req.set_varname(var_name_val);
req.set_out_varname(out_varname_val); req.set_out_varname(out_varname_val);
req.set_trainer_id(trainer_id_); req.set_trainer_id(trainer_id_);
::grpc::ByteBuffer buf; req.set_table_name(table_name_val);
RequestToByteBuffer<sendrecv::VariableMessage>(req, &buf); ::grpc::ByteBuffer buf;
RequestToByteBuffer<sendrecv::VariableMessage>(req, &buf);
VLOG(3) << s->GetVarHandlePtr()->String() << " begin"; VLOG(3) << s->GetVarHandlePtr()->String() << " begin";
// stub context // stub context
s->response_call_back_ = ProcGetResponse; s->response_call_back_ = ProcGetResponse;
platform::RecordRPCEvent record_event(method); platform::RecordRPCEvent record_event(method);
auto call = auto call =
s->stub_g_.PrepareUnaryCall(s->context_.get(), rpc_path, buf, &cq_); s->stub_g_.PrepareUnaryCall(s->context_.get(), rpc_path, buf, &cq_);
call->StartCall(); call->StartCall();
call->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s)); call->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
if (UNLIKELY(platform::IsProfileEnabled())) { if (UNLIKELY(platform::IsProfileEnabled())) {
h->Wait(); h->Wait();
} }
}); });
req_count_++; req_count_++;
......
...@@ -23,9 +23,11 @@ limitations under the License. */ ...@@ -23,9 +23,11 @@ limitations under the License. */
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <map> #include <map>
#include <memory>
#include <mutex> // NOLINT #include <mutex> // NOLINT
#include <string> #include <string>
#include <thread> // NOLINT #include <thread> // NOLINT
#include <unordered_map>
#include <vector> #include <vector>
#include "grpc++/channel.h" #include "grpc++/channel.h"
...@@ -187,6 +189,7 @@ class GRPCClient : public RPCClient { ...@@ -187,6 +189,7 @@ class GRPCClient : public RPCClient {
const framework::Scope& scope, const framework::Scope& scope,
const std::string& var_name, const std::string& var_name,
const std::string& out_varname, const std::string& out_varname,
const std::string& table_name = "",
int64_t time_out = FLAGS_rpc_deadline) override; int64_t time_out = FLAGS_rpc_deadline) override;
VarHandlePtr AsyncGetVarNoBarrier( VarHandlePtr AsyncGetVarNoBarrier(
...@@ -239,7 +242,8 @@ class GRPCClient : public RPCClient { ...@@ -239,7 +242,8 @@ class GRPCClient : public RPCClient {
const std::string& ep, const platform::DeviceContext& ctx, const std::string& ep, const platform::DeviceContext& ctx,
const framework::Scope& scope, const std::string& method, const framework::Scope& scope, const std::string& method,
const std::string& var_name, const std::string& out_varname, const std::string& var_name, const std::string& out_varname,
const std::string& rpc_path, int64_t time_out = FLAGS_rpc_deadline); const std::string& rpc_path, const std::string& table_name = "",
int64_t time_out = FLAGS_rpc_deadline);
private: private:
grpc::CompletionQueue cq_; grpc::CompletionQueue cq_;
......
...@@ -137,6 +137,7 @@ class RequestGet final : public RequestBase { ...@@ -137,6 +137,7 @@ class RequestGet final : public RequestBase {
// proc request. // proc request.
std::string varname = request_.varname(); std::string varname = request_.varname();
std::string out_varname = request_.out_varname(); std::string out_varname = request_.out_varname();
std::string table_name = request_.table_name();
int trainer_id = request_.trainer_id(); int trainer_id = request_.trainer_id();
VLOG(4) << "RequestGet " << out_varname << " from " << varname; VLOG(4) << "RequestGet " << out_varname << " from " << varname;
...@@ -145,19 +146,23 @@ class RequestGet final : public RequestBase { ...@@ -145,19 +146,23 @@ class RequestGet final : public RequestBase {
framework::Variable* invar = nullptr; framework::Variable* invar = nullptr;
framework::Variable* outvar = nullptr; framework::Variable* outvar = nullptr;
request_handler_->Handle(varname, scope, invar, &outvar, trainer_id, tmp_scope_ = std::move(scope->NewTmpScope());
out_varname); request_handler_->Handle(varname, tmp_scope_.get(), invar, &outvar,
trainer_id, out_varname, table_name);
VLOG(1) << "before SerializeToByteBuffer";
if (outvar) { if (outvar) {
SerializeToByteBuffer(out_varname, outvar, *request_handler_->dev_ctx(), SerializeToByteBuffer(out_varname, outvar, *request_handler_->dev_ctx(),
&reply_); &reply_);
} }
VLOG(1) << "after SerializeToByteBuffer";
Finish(reply_, &responder_); Finish(reply_, &responder_);
} }
protected: protected:
sendrecv::VariableMessage request_; sendrecv::VariableMessage request_;
::grpc::ByteBuffer reply_; ::grpc::ByteBuffer reply_;
std::unique_ptr<framework::Scope> tmp_scope_;
ServerAsyncResponseWriter<::grpc::ByteBuffer> responder_; ServerAsyncResponseWriter<::grpc::ByteBuffer> responder_;
}; };
......
...@@ -42,27 +42,23 @@ using DDim = framework::DDim; ...@@ -42,27 +42,23 @@ using DDim = framework::DDim;
template <typename T> template <typename T>
void ParameterRecv<T>::operator()(const RpcContext &rpc_ctx, void ParameterRecv<T>::operator()(const RpcContext &rpc_ctx,
const framework::Scope &scope) { const framework::Scope &scope) {
VLOG(3) << "ParameterRecv in"; VLOG(3) << "ParameterRecv in " << rpc_ctx.var_name;
std::unique_ptr<framework::Scope> local_scope = scope.NewTmpScope(); std::unique_ptr<framework::Scope> local_scope = scope.NewTmpScope();
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &cpu_ctx = *pool.Get(platform::CPUPlace()); auto &cpu_ctx = *pool.Get(platform::CPUPlace());
distributed::RPCClient *rpc_client = distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>(0); distributed::RPCClient::GetInstance<RPCCLIENT_T>(rpc_ctx.trainer_id);
auto *recv_var = scope.FindVar(rpc_ctx.var_name); auto *recv_var = scope.FindVar(rpc_ctx.var_name);
std::vector<framework::Tensor *> recved_tensors;
// recv all vars to local scope // recv all vars to local scope
if (recv_var->IsType<framework::LoDTensor>()) { if (recv_var->IsType<framework::LoDTensor>()) {
std::vector<distributed::VarHandlePtr> rets; std::vector<distributed::VarHandlePtr> rets;
for (size_t i = 0; i < rpc_ctx.splited_var_names.size(); i++) { for (size_t i = 0; i < rpc_ctx.splited_var_names.size(); i++) {
auto &recv_var_name = rpc_ctx.splited_var_names[i]; auto &recv_var_name = rpc_ctx.splited_var_names[i];
framework::Tensor *t = local_scope->Var(recv_var_name);
local_scope->Var(recv_var_name)->GetMutable<framework::LoDTensor>();
recved_tensors.push_back(t);
VLOG(3) << "recv " << recv_var_name << " from " << rpc_ctx.epmap[i]; VLOG(3) << "recv " << recv_var_name << " from " << rpc_ctx.epmap[i];
rets.push_back(rpc_client->AsyncGetVar(rpc_ctx.epmap[i], cpu_ctx, rets.push_back(rpc_client->AsyncGetVar(rpc_ctx.epmap[i], cpu_ctx,
*local_scope.get(), recv_var_name, *local_scope.get(), recv_var_name,
...@@ -78,23 +74,61 @@ void ParameterRecv<T>::operator()(const RpcContext &rpc_ctx, ...@@ -78,23 +74,61 @@ void ParameterRecv<T>::operator()(const RpcContext &rpc_ctx,
// concat recved tensor into one var // concat recved tensor into one var
{ {
size_t output_offset = 0; size_t output_offset = 0;
size_t row_offset = 0;
framework::Tensor *recv_tensor = framework::Tensor *recv_tensor =
recv_var->GetMutable<framework::LoDTensor>(); recv_var->GetMutable<framework::LoDTensor>();
auto dev_ctx = paddle::platform::CPUDeviceContext(); auto dev_ctx = paddle::platform::CPUDeviceContext();
int64_t recv_numel = 0; int64_t recv_numel = 0;
for (auto *in : recved_tensors) { for (auto &recv_var_name : rpc_ctx.splited_var_names) {
recv_numel += in->numel(); auto *recv_var = local_scope->FindVar(recv_var_name);
auto in_stride = framework::stride_numel(in->dims()); if (recv_var->IsType<framework::LoDTensor>()) {
auto out_stride = framework::stride_numel(recv_tensor->dims()); auto &in = recv_var->Get<framework::LoDTensor>();
StridedNumelCopyWithAxis<T>( recv_numel += in.numel();
dev_ctx, 0, recv_tensor->data<T>() + output_offset, out_stride, auto in_stride = framework::stride_numel(in.dims());
in->data<T>(), in_stride, in_stride[0]); auto out_stride = framework::stride_numel(recv_tensor->dims());
output_offset += in_stride[0]; StridedNumelCopyWithAxis<T>(
dev_ctx, 0, recv_tensor->data<T>() + output_offset, out_stride,
in.data<T>(), in_stride, in_stride[0]);
output_offset += in_stride[0];
} else if (recv_var->IsType<framework::SelectedRows>()) {
auto &recv_slr = recv_var->Get<framework::SelectedRows>();
auto &recv_dims = recv_tensor->dims();
int64_t width = recv_dims[1];
recv_numel += recv_slr.height() * width;
PADDLE_ENFORCE_EQ(recv_slr.value().dims()[1], width);
PADDLE_ENFORCE_EQ(recv_slr.value().dims()[0], recv_slr.rows().size());
VLOG(3) << "recv slr " << recv_var_name << " dims "
<< recv_slr.value().dims();
if (VLOG_IS_ON(3)) {
std::ostringstream sstream;
sstream << "[";
for (auto &row_id : recv_slr.rows()) {
sstream << row_id << ", ";
}
sstream << "]";
VLOG(3) << "recv_slr size: " << recv_slr.rows().size() << " "
<< sstream.str();
}
for (auto i = 0; i < recv_slr.rows().size(); ++i) {
auto row_id = recv_slr.rows()[i] + row_offset;
PADDLE_ENFORCE_LT(row_id, recv_dims[0]);
memcpy(recv_tensor->data<T>() + row_id * width,
recv_slr.value().data<T>() + i * width, sizeof(T) * width);
}
row_offset += recv_slr.height();
} else {
PADDLE_THROW("unsupported recieved var type");
}
}
auto numel = recv_tensor->numel();
if (recv_numel != numel) {
LOG(FATAL) << "recv_numel: " << recv_numel << " acture numel: " << numel;
} }
PADDLE_ENFORCE_EQ(recv_numel, recv_tensor->numel()); PADDLE_ENFORCE_EQ(recv_numel, numel);
} }
VLOG(3) << "ParameterRecv out"; VLOG(3) << "ParameterRecv out " << rpc_ctx.var_name;
} }
template struct ParameterRecv<float>; template struct ParameterRecv<float>;
......
...@@ -47,7 +47,7 @@ void ParameterSend<T>::operator()(const RpcContext &rpc_ctx, ...@@ -47,7 +47,7 @@ void ParameterSend<T>::operator()(const RpcContext &rpc_ctx,
auto &cpu_ctx = *pool.Get(platform::CPUPlace()); auto &cpu_ctx = *pool.Get(platform::CPUPlace());
distributed::RPCClient *rpc_client = distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>(0); distributed::RPCClient::GetInstance<RPCCLIENT_T>(rpc_ctx.trainer_id);
auto *send_var = scope.FindVar(rpc_ctx.var_name); auto *send_var = scope.FindVar(rpc_ctx.var_name);
size_t out_num = rpc_ctx.splited_var_names.size(); size_t out_num = rpc_ctx.splited_var_names.size();
......
...@@ -18,7 +18,9 @@ ...@@ -18,7 +18,9 @@
#include <condition_variable> // NOLINT #include <condition_variable> // NOLINT
#include <functional> #include <functional>
#include <memory>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
...@@ -180,6 +182,10 @@ class RequestHandler { ...@@ -180,6 +182,10 @@ class RequestHandler {
grad_to_prepared_ctx_ = g; grad_to_prepared_ctx_ = g;
} }
// Stores the given map pointer in sparse_grad_to_param_. Only the pointer is
// kept (no copy is made here).
// NOTE(review): presumably the map goes from sparse gradient names to
// parameter names, and the caller keeps it alive — confirm against callers.
void SetSparseGradToParam(std::unordered_map<std::string, std::string>* g) {
sparse_grad_to_param_ = g;
}
void SetRPCServer(RPCServer* rpc_server) { rpc_server_ = rpc_server; } void SetRPCServer(RPCServer* rpc_server) { rpc_server_ = rpc_server; }
// Get attributes. // Get attributes.
...@@ -228,6 +234,7 @@ class RequestHandler { ...@@ -228,6 +234,7 @@ class RequestHandler {
std::unordered_map<std::string, std::unordered_map<std::string,
std::shared_ptr<framework::ExecutorPrepareContext>>* std::shared_ptr<framework::ExecutorPrepareContext>>*
grad_to_prepared_ctx_; grad_to_prepared_ctx_;
std::unordered_map<std::string, std::string>* sparse_grad_to_param_;
RPCServer* rpc_server_; RPCServer* rpc_server_;
}; };
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/variable_helper.h" #include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h"
#include "paddle/fluid/operators/distributed/rpc_server.h" #include "paddle/fluid/operators/distributed/rpc_server.h"
#include "paddle/fluid/string/piece.h" #include "paddle/fluid/string/piece.h"
#include "paddle/fluid/string/printf.h" #include "paddle/fluid/string/printf.h"
...@@ -59,6 +60,12 @@ bool RequestSendHandler::Handle(const std::string& varname, ...@@ -59,6 +60,12 @@ bool RequestSendHandler::Handle(const std::string& varname,
"async mode should not recv BATCH_BARRIER_MESSAGE or " "async mode should not recv BATCH_BARRIER_MESSAGE or "
"COMPLETE_MESSAGE"); "COMPLETE_MESSAGE");
} }
if (AsyncSparseParamUpdateRecorder::GetInstance()->HasGrad(varname)) {
auto& grad_slr =
scope->FindVar(varname)->Get<framework::SelectedRows>();
AsyncSparseParamUpdateRecorder::GetInstance()->Update(varname,
grad_slr.rows());
}
executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(),
scope); scope);
return true; return true;
...@@ -82,8 +89,9 @@ bool RequestGetHandler::Handle(const std::string& varname, ...@@ -82,8 +89,9 @@ bool RequestGetHandler::Handle(const std::string& varname,
const int trainer_id, const int trainer_id,
const std::string& out_var_name, const std::string& out_var_name,
const std::string& table_name) { const std::string& table_name) {
VLOG(4) << "RequestGetHandler:" << varname VLOG(3) << "RequestGetHandler:" << varname
<< " out_var_name: " << out_var_name; << " out_var_name: " << out_var_name << " trainer_id: " << trainer_id
<< " table_name: " << table_name;
if (sync_mode_) { if (sync_mode_) {
if (varname == FETCH_BARRIER_MESSAGE) { if (varname == FETCH_BARRIER_MESSAGE) {
...@@ -108,7 +116,42 @@ bool RequestGetHandler::Handle(const std::string& varname, ...@@ -108,7 +116,42 @@ bool RequestGetHandler::Handle(const std::string& varname,
VLOG(3) << "copying " << varname << " to " << param_bak_name; VLOG(3) << "copying " << varname << " to " << param_bak_name;
framework::TensorCopy(t_orig, dev_ctx_->GetPlace(), t); framework::TensorCopy(t_orig, dev_ctx_->GetPlace(), t);
} }
*outvar = scope_->FindVar(varname); if (AsyncSparseParamUpdateRecorder::GetInstance()->HasParam(varname) &&
!table_name.empty()) {
std::vector<int64_t> updated_rows;
AsyncSparseParamUpdateRecorder::GetInstance()->GetAndClear(
varname, trainer_id, &updated_rows);
if (VLOG_IS_ON(3)) {
std::ostringstream sstream;
sstream << "[";
for (auto& row_id : updated_rows) {
sstream << row_id << ", ";
}
sstream << "]";
VLOG(3) << "updated_rows size: " << updated_rows.size() << " "
<< sstream.str();
}
auto& origin_tensor =
scope_->FindVar(varname)->Get<framework::LoDTensor>();
auto* origin_tensor_data = origin_tensor.data<float>();
auto& dims = origin_tensor.dims();
*outvar = scope->Var();
auto* out_slr = (*outvar)->GetMutable<framework::SelectedRows>();
out_slr->set_rows(updated_rows);
out_slr->set_height(dims[0]);
auto out_dims = framework::make_ddim(
{static_cast<int64_t>(updated_rows.size()), dims[1]});
auto* data = out_slr->mutable_value()->mutable_data<float>(
out_dims, origin_tensor.place());
auto width = dims[1];
for (auto i = 0; i < updated_rows.size(); ++i) {
PADDLE_ENFORCE_LT(updated_rows[i], dims[0]);
memcpy(data + i * width, origin_tensor_data + updated_rows[i] * width,
sizeof(float) * width);
}
} else {
*outvar = scope_->FindVar(varname);
}
} }
} }
return true; return true;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#pragma once #pragma once
#include <condition_variable> // NOLINT #include <condition_variable> // NOLINT
#include <memory>
#include <string> #include <string>
#include "gflags/gflags.h" #include "gflags/gflags.h"
...@@ -44,6 +45,7 @@ class RPCClient { ...@@ -44,6 +45,7 @@ class RPCClient {
const framework::Scope& scope, const framework::Scope& scope,
const std::string& var_name, const std::string& var_name,
const std::string& out_varname, const std::string& out_varname,
const std::string& table_name = "",
int64_t time_out = FLAGS_rpc_deadline) = 0; int64_t time_out = FLAGS_rpc_deadline) = 0;
virtual VarHandlePtr AsyncGetVarNoBarrier( virtual VarHandlePtr AsyncGetVarNoBarrier(
...@@ -96,6 +98,7 @@ class RPCClient { ...@@ -96,6 +98,7 @@ class RPCClient {
// Init is called by GetInstance. // Init is called by GetInstance.
template <typename T> template <typename T>
static void Init(int trainer_id) { static void Init(int trainer_id) {
VLOG(0) << "init rpc client with trainer_id " << trainer_id;
trainer_id_ = trainer_id; trainer_id_ = trainer_id;
if (rpc_client_.get() == nullptr) { if (rpc_client_.get() == nullptr) {
rpc_client_.reset(new T()); rpc_client_.reset(new T());
......
...@@ -27,23 +27,26 @@ struct RpcContext { ...@@ -27,23 +27,26 @@ struct RpcContext {
RpcContext(const std::string &name, const std::vector<std::string> &names, RpcContext(const std::string &name, const std::vector<std::string> &names,
const std::vector<std::string> &emap, const std::vector<std::string> &emap,
const std::vector<int64_t> &sections) const std::vector<int64_t> &sections, int id)
: var_name(name), : var_name(name),
splited_var_names(names), splited_var_names(names),
epmap(emap), epmap(emap),
height_sections(sections) {} height_sections(sections),
trainer_id(id) {}
RpcContext(const RpcContext &ctx) { RpcContext(const RpcContext &ctx) {
var_name = ctx.var_name; var_name = ctx.var_name;
splited_var_names = ctx.splited_var_names; splited_var_names = ctx.splited_var_names;
epmap = ctx.epmap; epmap = ctx.epmap;
height_sections = ctx.height_sections; height_sections = ctx.height_sections;
trainer_id = ctx.trainer_id;
} }
std::string var_name; std::string var_name;
std::vector<std::string> splited_var_names; std::vector<std::string> splited_var_names;
std::vector<std::string> epmap; std::vector<std::string> epmap;
std::vector<int64_t> height_sections; std::vector<int64_t> height_sections;
int trainer_id;
}; };
inline std::ostream &operator<<(std::ostream &os, const RpcContext &rpc_ctx) { inline std::ostream &operator<<(std::ostream &os, const RpcContext &rpc_ctx) {
......
...@@ -2,9 +2,9 @@ include(operators) ...@@ -2,9 +2,9 @@ include(operators)
set(DISTRIBUTE_DEPS "") set(DISTRIBUTE_DEPS "")
if(WITH_GRPC) if(WITH_GRPC)
set(DISTRIBUTE_DEPS sendrecvop_rpc parameter_send parameter_recv communicator grpc++_unsecure grpc_unsecure gpr cares zlib protobuf node) set(DISTRIBUTE_DEPS sendrecvop_rpc parameter_send parameter_recv communicator async_sparse_param_update_recorder grpc++_unsecure grpc_unsecure gpr cares zlib protobuf node)
else() else()
set(DISTRIBUTE_DEPS sendrecvop_rpc parameter_send parameter_recv communicator brpc leveldb snappystream snappy protobuf ssl crypto zlib node) set(DISTRIBUTE_DEPS sendrecvop_rpc parameter_send parameter_recv communicator async_sparse_param_update_recorder brpc leveldb snappystream snappy protobuf ssl crypto zlib node)
if(WITH_BRPC_RDMA) if(WITH_BRPC_RDMA)
find_library(IBVERBS_LIBRARY NAMES ibverbs) find_library(IBVERBS_LIBRARY NAMES ibverbs)
ADD_LIBRARY(ibverbs SHARED IMPORTED GLOBAL) ADD_LIBRARY(ibverbs SHARED IMPORTED GLOBAL)
......
...@@ -24,8 +24,10 @@ limitations under the License. */ ...@@ -24,8 +24,10 @@ limitations under the License. */
#include "paddle/fluid/operators/distributed/distributed.h" #include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h"
#include "paddle/fluid/operators/distributed/request_handler_impl.h" #include "paddle/fluid/operators/distributed/request_handler_impl.h"
#include "paddle/fluid/operators/distributed_ops/listen_and_serv_op.h" #include "paddle/fluid/operators/distributed_ops/listen_and_serv_op.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
DEFINE_int32(rpc_send_thread_num, 12, "number of threads for rpc send"); DEFINE_int32(rpc_send_thread_num, 12, "number of threads for rpc send");
...@@ -292,6 +294,8 @@ static void FillRequestCtx( ...@@ -292,6 +294,8 @@ static void FillRequestCtx(
std::unordered_map<std::string, std::unordered_map<std::string,
std::shared_ptr<framework::ExecutorPrepareContext>> std::shared_ptr<framework::ExecutorPrepareContext>>
*prefetch_ctx, *prefetch_ctx,
std::unordered_map<std::string, std::string>
*sparse_grad_name_to_param_name,
std::shared_ptr<framework::ExecutorPrepareContext> checkpoint_ctx, std::shared_ptr<framework::ExecutorPrepareContext> checkpoint_ctx,
distributed::RPCServer *rpc_server) { distributed::RPCServer *rpc_server) {
h->SetScope(scope); h->SetScope(scope);
...@@ -299,6 +303,7 @@ static void FillRequestCtx( ...@@ -299,6 +303,7 @@ static void FillRequestCtx(
h->SetExecutor(executor); h->SetExecutor(executor);
h->SetProgram(program); h->SetProgram(program);
h->SetPrefetchPreparedCtx(prefetch_ctx); h->SetPrefetchPreparedCtx(prefetch_ctx);
h->SetSparseGradToParam(sparse_grad_name_to_param_name);
h->SetRPCServer(rpc_server); h->SetRPCServer(rpc_server);
h->SetCheckpointNotifyPreparedCtx(checkpoint_ctx); h->SetCheckpointNotifyPreparedCtx(checkpoint_ctx);
} }
...@@ -414,10 +419,24 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -414,10 +419,24 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
prefetch_var_name_to_prepared_ctx[prefetch_var_name] = prefetch_prepared[i]; prefetch_var_name_to_prepared_ctx[prefetch_var_name] = prefetch_prepared[i];
} }
auto f = // parse attr of kSparseGradToParam sparse_grad_name -> param_name
std::bind(FillRequestCtx, std::placeholders::_1, &recv_scope, &dev_ctx, std::unordered_map<std::string, std::string> sparse_grad_name_to_param_name;
&executor, program, &prefetch_var_name_to_prepared_ctx, auto sparse_grad_name_to_param_name_str =
ckpt_pre_context, rpc_service_.get()); Attr<std::vector<std::string>>(kSparseGradToParam);
for (const auto &sparse_grad_name_and_param_name :
sparse_grad_name_to_param_name_str) {
std::vector<std::string> pieces;
split(sparse_grad_name_and_param_name, ':', &pieces);
PADDLE_ENFORCE_EQ(pieces.size(), 2);
VLOG(3) << "after split, sparse_grad_name = " << pieces[0]
<< ", param_name = " << pieces[1];
sparse_grad_name_to_param_name[pieces[0]] = pieces[1];
}
auto f = std::bind(
FillRequestCtx, std::placeholders::_1, &recv_scope, &dev_ctx, &executor,
program, &prefetch_var_name_to_prepared_ctx,
&sparse_grad_name_to_param_name, ckpt_pre_context, rpc_service_.get());
f(request_send_handler_.get()); f(request_send_handler_.get());
f(request_get_handler_.get()); f(request_get_handler_.get());
...@@ -445,6 +464,8 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -445,6 +464,8 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
RunSyncLoop(&executor, program, &recv_scope, &dev_ctx, RunSyncLoop(&executor, program, &recv_scope, &dev_ctx,
prefetch_block_id_list, checkpoint_block_id); prefetch_block_id_list, checkpoint_block_id);
} else { } else {
distributed::AsyncSparseParamUpdateRecorder::Init(
fan_in, sparse_grad_name_to_param_name);
RunAsyncLoop(&executor, program, &recv_scope); RunAsyncLoop(&executor, program, &recv_scope);
} }
} }
...@@ -475,6 +496,10 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { ...@@ -475,6 +496,10 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId, AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
"prefetch blocks to run on server side.") "prefetch blocks to run on server side.")
.SetDefault({}); .SetDefault({});
AddAttr<std::vector<std::string>>(
kSparseGradToParam,
"sparse grad name to param name. like: 'emb@Grad:emb'")
.SetDefault({});
AddAttr<int>("Fanin", "How many clients send to this server.") AddAttr<int>("Fanin", "How many clients send to this server.")
.SetDefault(1); .SetDefault(1);
AddAttr<int>(kCheckpointBlockId, AddAttr<int>(kCheckpointBlockId,
......
...@@ -16,8 +16,10 @@ limitations under the License. */ ...@@ -16,8 +16,10 @@ limitations under the License. */
#include <stdint.h> #include <stdint.h>
#include <atomic> #include <atomic>
#include <memory>
#include <set> #include <set>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
...@@ -35,6 +37,7 @@ namespace operators { ...@@ -35,6 +37,7 @@ namespace operators {
constexpr char kOptimizeBlocks[] = "optimize_blocks"; constexpr char kOptimizeBlocks[] = "optimize_blocks";
constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id"; constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id";
constexpr char kCheckpointBlockId[] = "checkpint_block_id"; constexpr char kCheckpointBlockId[] = "checkpint_block_id";
constexpr char kSparseGradToParam[] = "sparse_grad_to_param";
void RunServer(std::shared_ptr<distributed::RPCServer> service); void RunServer(std::shared_ptr<distributed::RPCServer> service);
......
...@@ -50,17 +50,18 @@ class RecvOp : public framework::OperatorBase { ...@@ -50,17 +50,18 @@ class RecvOp : public framework::OperatorBase {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &ctx = *pool.Get(place); auto &ctx = *pool.Get(place);
auto trainer_id = Attr<int>("trainer_id");
distributed::RPCClient *rpc_client = distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>( distributed::RPCClient::GetInstance<RPCCLIENT_T>(trainer_id);
Attr<int>("trainer_id"));
std::vector<std::string> recv_varnames = std::vector<std::string> recv_varnames =
Attr<std::vector<std::string>>("recv_varnames"); Attr<std::vector<std::string>>("recv_varnames");
if (recv_varnames.size() > 0) { if (recv_varnames.size() > 0) {
auto recv_functor = distributed::ParameterRecv<float>(); auto recv_functor = distributed::ParameterRecv<float>();
auto rpc_ctx = distributed::RpcContext(outs[0], recv_varnames, epmap, {}); auto rpc_ctx = distributed::RpcContext(outs[0], recv_varnames, epmap, {},
trainer_id);
recv_functor(rpc_ctx, scope); recv_functor(rpc_ctx, scope);
} else { } else {
if (with_barrier) { if (with_barrier) {
......
...@@ -42,6 +42,7 @@ class SendOp : public framework::OperatorBase { ...@@ -42,6 +42,7 @@ class SendOp : public framework::OperatorBase {
auto epmap = Attr<std::vector<std::string>>("epmap"); auto epmap = Attr<std::vector<std::string>>("epmap");
int sync_send = Attr<int>("sync_mode"); int sync_send = Attr<int>("sync_mode");
auto trainer_id = Attr<int>("trainer_id");
auto send_varnames = Attr<std::vector<std::string>>("send_varnames"); auto send_varnames = Attr<std::vector<std::string>>("send_varnames");
auto height_sections = Attr<std::vector<int64_t>>("sections"); auto height_sections = Attr<std::vector<int64_t>>("sections");
...@@ -51,7 +52,7 @@ class SendOp : public framework::OperatorBase { ...@@ -51,7 +52,7 @@ class SendOp : public framework::OperatorBase {
if (distributed::Communicator::GetInstance() == nullptr) { if (distributed::Communicator::GetInstance() == nullptr) {
auto send_functor = distributed::ParameterSend<float>(); auto send_functor = distributed::ParameterSend<float>();
auto rpc_ctx = distributed::RpcContext(ins[0], send_varnames, epmap, auto rpc_ctx = distributed::RpcContext(ins[0], send_varnames, epmap,
height_sections); height_sections, trainer_id);
send_functor(rpc_ctx, scope, true); send_functor(rpc_ctx, scope, true);
} else { } else {
distributed::Communicator::GetInstance()->Send(ins[0], scope); distributed::Communicator::GetInstance()->Send(ins[0], scope);
...@@ -62,8 +63,7 @@ class SendOp : public framework::OperatorBase { ...@@ -62,8 +63,7 @@ class SendOp : public framework::OperatorBase {
auto& ctx = *pool.Get(place); auto& ctx = *pool.Get(place);
distributed::RPCClient* rpc_client = distributed::RPCClient* rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>( distributed::RPCClient::GetInstance<RPCCLIENT_T>(trainer_id);
Attr<int>("trainer_id"));
std::vector<distributed::VarHandlePtr> rets; std::vector<distributed::VarHandlePtr> rets;
for (size_t i = 0; i < ins.size(); i++) { for (size_t i = 0; i < ins.size(); i++) {
......
...@@ -235,11 +235,13 @@ struct FindRangeAbsMaxFunctor<platform::CUDADeviceContext, T> { ...@@ -235,11 +235,13 @@ struct FindRangeAbsMaxFunctor<platform::CUDADeviceContext, T> {
int g_find_max; int g_find_max;
memory::Copy(platform::CPUPlace(), &g_find_max, gpu_place, find_max, memory::Copy(platform::CPUPlace(), &g_find_max, gpu_place, find_max,
sizeof(int), 0); sizeof(int), ctx.stream());
ctx.Wait();
if (g_find_max) { if (g_find_max) {
int len; int len;
memory::Copy(platform::CPUPlace(), &len, gpu_place, out_size_data, memory::Copy(platform::CPUPlace(), &len, gpu_place, out_size_data,
sizeof(int), 0); sizeof(int), ctx.stream());
ctx.Wait();
FindAbsMaxFunctor<platform::CUDADeviceContext, T>()(ctx, scale_arr, len, FindAbsMaxFunctor<platform::CUDADeviceContext, T>()(ctx, scale_arr, len,
out_scale_data); out_scale_data);
} }
...@@ -258,25 +260,26 @@ struct FindMovingAverageAbsMaxFunctor<platform::CUDADeviceContext, T> { ...@@ -258,25 +260,26 @@ struct FindMovingAverageAbsMaxFunctor<platform::CUDADeviceContext, T> {
const auto gpu_place = boost::get<platform::CUDAPlace>(ctx.GetPlace()); const auto gpu_place = boost::get<platform::CUDAPlace>(ctx.GetPlace());
T accum; T accum;
memory::Copy(platform::CPUPlace(), &accum, gpu_place, in_accum.data<T>(),
sizeof(T), 0);
T state; T state;
memory::Copy(platform::CPUPlace(), &state, gpu_place, in_state.data<T>(),
sizeof(T), 0);
T scale; T scale;
memory::Copy(platform::CPUPlace(), &accum, gpu_place, in_accum.data<T>(),
sizeof(T), ctx.stream());
memory::Copy(platform::CPUPlace(), &state, gpu_place, in_state.data<T>(),
sizeof(T), ctx.stream());
memory::Copy(platform::CPUPlace(), &scale, gpu_place, cur_scale, sizeof(T), memory::Copy(platform::CPUPlace(), &scale, gpu_place, cur_scale, sizeof(T),
0); ctx.stream());
ctx.Wait();
state = rate * state + 1; state = rate * state + 1;
accum = rate * accum + scale; accum = rate * accum + scale;
scale = accum / state; scale = accum / state;
memory::Copy(gpu_place, out_accum->mutable_data<T>(gpu_place), memory::Copy(gpu_place, out_accum->mutable_data<T>(gpu_place),
platform::CPUPlace(), &accum, sizeof(T), 0); platform::CPUPlace(), &accum, sizeof(T), ctx.stream());
memory::Copy(gpu_place, out_state->mutable_data<T>(gpu_place), memory::Copy(gpu_place, out_state->mutable_data<T>(gpu_place),
platform::CPUPlace(), &state, sizeof(T), 0); platform::CPUPlace(), &state, sizeof(T), ctx.stream());
memory::Copy(gpu_place, out_scale->mutable_data<T>(gpu_place), memory::Copy(gpu_place, out_scale->mutable_data<T>(gpu_place),
platform::CPUPlace(), &scale, sizeof(T), 0); platform::CPUPlace(), &scale, sizeof(T), ctx.stream());
ctx.Wait();
} }
}; };
......
...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/operators/grid_sampler_op.h" #include "paddle/fluid/operators/grid_sampler_op.h"
#include <memory>
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cudnn_helper.h" #include "paddle/fluid/platform/cudnn_helper.h"
...@@ -40,10 +41,12 @@ class GridSampleOp : public framework::OperatorWithKernel { ...@@ -40,10 +41,12 @@ class GridSampleOp : public framework::OperatorWithKernel {
"Input(X) of GridSampleOp should be 4-D Tensor."); "Input(X) of GridSampleOp should be 4-D Tensor.");
PADDLE_ENFORCE(grid_dims.size() == 4, PADDLE_ENFORCE(grid_dims.size() == 4,
"Input(Grid) of GridSampleOp should be 4-D Tensor."); "Input(Grid) of GridSampleOp should be 4-D Tensor.");
PADDLE_ENFORCE(grid_dims[3] == 2, "Input(Grid) dims[3] should be 2."); if (ctx->IsRuntime() || grid_dims[3] > 0) {
PADDLE_ENFORCE_EQ(grid_dims[0], x_dims[0], PADDLE_ENFORCE(grid_dims[3] == 2, "Input(Grid) dims[3] should be 2.");
"Input(X) and Input(Grid) dims[0] should be equal."); }
if (ctx->IsRuntime()) { if (ctx->IsRuntime()) {
PADDLE_ENFORCE_EQ(grid_dims[0], x_dims[0],
"Input(X) and Input(Grid) dims[0] should be equal.");
PADDLE_ENFORCE_EQ( PADDLE_ENFORCE_EQ(
grid_dims[1], x_dims[2], grid_dims[1], x_dims[2],
"Input(X) dims[2] and Input(Grid) dims[1] should be equal."); "Input(X) dims[2] and Input(Grid) dims[1] should be equal.");
......
...@@ -238,6 +238,8 @@ class HierarchicalSigmoidGradOpKernel : public framework::OpKernel<T> { ...@@ -238,6 +238,8 @@ class HierarchicalSigmoidGradOpKernel : public framework::OpKernel<T> {
zero(dev_ctx, w_grad, static_cast<T>(0.0)); zero(dev_ctx, w_grad, static_cast<T>(0.0));
bit_code->MulGradWeight(pre_out_grad, w_grad, in); bit_code->MulGradWeight(pre_out_grad, w_grad, in);
} else { } else {
PADDLE_ENFORCE(path != nullptr,
"Sparse mode should not be used without custom tree!");
framework::Vector<int64_t> real_rows = PathToRows(*path); framework::Vector<int64_t> real_rows = PathToRows(*path);
auto* w_grad = auto* w_grad =
ctx.Output<framework::SelectedRows>(framework::GradVarName("W")); ctx.Output<framework::SelectedRows>(framework::GradVarName("W"));
......
...@@ -45,9 +45,14 @@ class InterpolateOp : public framework::OperatorWithKernel { ...@@ -45,9 +45,14 @@ class InterpolateOp : public framework::OperatorWithKernel {
// round down // round down
out_h = static_cast<int>(dim_x[2] * scale); out_h = static_cast<int>(dim_x[2] * scale);
out_w = static_cast<int>(dim_x[3] * scale); out_w = static_cast<int>(dim_x[3] * scale);
// protect when input shape is -1
out_h = out_h > 0 ? out_h : -1;
out_w = out_w > 0 ? out_w : -1;
} else { } else {
out_h = ctx->Attrs().Get<int>("out_h"); out_h = ctx->Attrs().Get<int>("out_h");
out_w = ctx->Attrs().Get<int>("out_w"); out_w = ctx->Attrs().Get<int>("out_w");
PADDLE_ENFORCE_GT(out_h, 0, "out_h should be greater than 0.");
PADDLE_ENFORCE_GT(out_w, 0, "out_w should be greater than 0.");
} }
if (ctx->HasInput("OutSize") && ctx->IsRuntime()) { if (ctx->HasInput("OutSize") && ctx->IsRuntime()) {
...@@ -58,6 +63,7 @@ class InterpolateOp : public framework::OperatorWithKernel { ...@@ -58,6 +63,7 @@ class InterpolateOp : public framework::OperatorWithKernel {
ctx->ShareLoD("X", "Out"); ctx->ShareLoD("X", "Out");
return; return;
} }
std::vector<int64_t> dim_out({dim_x[0], dim_x[1], out_h, out_w}); std::vector<int64_t> dim_out({dim_x[0], dim_x[1], out_h, out_w});
ctx->SetOutputDim("Out", framework::make_ddim(dim_out)); ctx->SetOutputDim("Out", framework::make_ddim(dim_out));
} }
......
...@@ -35,8 +35,10 @@ class KLDivLossOp : public framework::OperatorWithKernel { ...@@ -35,8 +35,10 @@ class KLDivLossOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE_EQ(dim_x.size(), dim_target.size(), PADDLE_ENFORCE_EQ(dim_x.size(), dim_target.size(),
"Input(X) rank and Input(Target) rank should be same."); "Input(X) rank and Input(Target) rank should be same.");
for (int i = 0; i < dim_x.size(); i++) { for (int i = 0; i < dim_x.size(); i++) {
PADDLE_ENFORCE_EQ(dim_x[i], dim_target[i], if (ctx->IsRuntime() || (dim_x[i] > 0 && dim_target[i] > 0)) {
"Input(X) and Input(Target) should in same shape."); PADDLE_ENFORCE_EQ(dim_x[i], dim_target[i],
"Input(X) and Input(Target) should in same shape.");
}
} }
auto reduction = ctx->Attrs().Get<std::string>("reduction"); auto reduction = ctx->Attrs().Get<std::string>("reduction");
......
...@@ -296,6 +296,7 @@ struct MergeAdd<platform::CPUDeviceContext, T> { ...@@ -296,6 +296,7 @@ struct MergeAdd<platform::CPUDeviceContext, T> {
auto input_height = has_value_input->height(); auto input_height = has_value_input->height();
framework::SelectedRows& out = *output; framework::SelectedRows& out = *output;
std::set<int64_t> merged_row_set; std::set<int64_t> merged_row_set;
size_t row_num = 0;
for (auto* input : inputs) { for (auto* input : inputs) {
if (input->rows().size() == 0) { if (input->rows().size() == 0) {
continue; continue;
...@@ -305,42 +306,71 @@ struct MergeAdd<platform::CPUDeviceContext, T> { ...@@ -305,42 +306,71 @@ struct MergeAdd<platform::CPUDeviceContext, T> {
"dimension except for the first one"); "dimension except for the first one");
PADDLE_ENFORCE_EQ(input_height, input->height(), PADDLE_ENFORCE_EQ(input_height, input->height(),
"all input should have same height"); "all input should have same height");
row_num += input->rows().size();
merged_row_set.insert(input->rows().begin(), input->rows().end()); merged_row_set.insert(input->rows().begin(), input->rows().end());
} }
std::vector<int64_t> merge_rows(merged_row_set.begin(),
merged_row_set.end());
if (sorted_result) {
std::sort(merge_rows.begin(), merge_rows.end());
}
std::unordered_map<int64_t, size_t> rows_to_id;
for (size_t i = 0; i < merge_rows.size(); ++i) {
rows_to_id[merge_rows[i]] = i;
}
out.set_rows(merge_rows);
out.set_height(input_height); out.set_height(input_height);
out.mutable_value()->mutable_data<T>( out.mutable_value()->mutable_data<T>(
framework::make_ddim( framework::make_ddim(
{static_cast<int64_t>(merge_rows.size()), input_width}), {static_cast<int64_t>(merged_row_set.size()), input_width}),
context.GetPlace()); context.GetPlace());
auto* out_data = out.mutable_value()->data<T>();
math::SetConstant<platform::CPUDeviceContext, T> constant_functor; if (merged_row_set.size() == row_num && !sorted_result) {
constant_functor(context, out.mutable_value(), 0.0); // no duplicated ids, just concat the result together
std::vector<int64_t> merge_rows;
merge_rows.reserve(row_num);
// concat rows
for (auto* in : inputs) {
merge_rows.insert(merge_rows.end(), in->rows().begin(),
in->rows().end());
}
out.set_rows(merge_rows);
auto in_place = inputs[0]->place();
auto out_place = out.place();
int64_t copied_numel = 0;
for (auto* in : inputs) {
auto* in_data = in->value().data<T>();
auto in_numel = in->value().numel();
memory::Copy(boost::get<platform::CPUPlace>(out_place),
out_data + copied_numel,
boost::get<platform::CPUPlace>(in_place), in_data,
in_numel * sizeof(T));
copied_numel += in_numel;
}
} else {
std::vector<int64_t> merge_rows(merged_row_set.begin(),
merged_row_set.end());
auto* out_data = out.mutable_value()->data<T>(); if (sorted_result) {
std::sort(merge_rows.begin(), merge_rows.end());
}
auto blas = math::GetBlas<platform::CPUDeviceContext, T>(context); out.set_rows(merge_rows);
for (auto* input : inputs) {
if (input->rows().size() == 0) { math::SetConstant<platform::CPUDeviceContext, T> constant_functor;
continue; constant_functor(context, out.mutable_value(), 0.0);
std::unordered_map<int64_t, size_t> rows_to_id;
for (size_t i = 0; i < merge_rows.size(); ++i) {
rows_to_id[merge_rows[i]] = i;
} }
auto* input_data = input->value().data<T>();
auto& input_rows = input->rows(); auto blas = math::GetBlas<platform::CPUDeviceContext, T>(context);
for (auto* input : inputs) {
for (size_t i = 0; i < input_rows.size(); i++) { if (input->rows().size() == 0) {
size_t out_i = rows_to_id[input_rows[i]]; continue;
elementwise_add_to<platform::CPUDeviceContext, T>( }
context, &blas, static_cast<size_t>(input_width), auto* input_data = input->value().data<T>();
&input_data[i * input_width], &out_data[out_i * input_width]); auto& input_rows = input->rows();
for (size_t i = 0; i < input_rows.size(); i++) {
size_t out_i = rows_to_id[input_rows[i]];
elementwise_add_to<platform::CPUDeviceContext, T>(
context, &blas, static_cast<size_t>(input_width),
&input_data[i * input_width], &out_data[out_i * input_width]);
}
} }
} }
} }
......
...@@ -13,8 +13,11 @@ See the License for the specific language governing permissions and ...@@ -13,8 +13,11 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/operators/math/selected_rows_functor.h" #include "paddle/fluid/operators/math/selected_rows_functor.h"
#include <memory>
#include <vector> #include <vector>
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
TEST(selected_rows_functor, cpu_add) { TEST(selected_rows_functor, cpu_add) {
...@@ -360,6 +363,69 @@ TEST(selected_rows_functor, cpu_merge_add_multi) { ...@@ -360,6 +363,69 @@ TEST(selected_rows_functor, cpu_merge_add_multi) {
} }
} }
// MergeAdd over two SelectedRows inputs that share no row id.
// The assertions expect the output to be the two inputs laid end to end:
// the rows of the first input followed by the rows of the second, with each
// row still holding the constant its input was filled with.
TEST(selected_rows_functor, cpu_merge_add_multi_noduplicated) {
  namespace fw = paddle::framework;
  namespace plat = paddle::platform;
  namespace pmath = paddle::operators::math;

  plat::CPUPlace cpu_place;
  plat::CPUDeviceContext ctx(cpu_place);
  pmath::SetConstant<plat::CPUDeviceContext, float> fill;

  int64_t height = 10;
  int64_t row_numel = 8;

  // First operand: the odd row ids, every element set to 1.0.
  std::vector<int64_t> odd_rows{1, 3, 5, 7, 9};
  std::unique_ptr<fw::SelectedRows> odd_input{
      new fw::SelectedRows(odd_rows, height)};
  auto* odd_value = odd_input->mutable_value();
  odd_value->mutable_data<float>(
      fw::make_ddim({static_cast<int64_t>(odd_rows.size()), row_numel}),
      cpu_place);
  fill(ctx, odd_value, 1.0);

  // Second operand: the even row ids, every element set to 2.0.
  std::vector<int64_t> even_rows{0, 2, 4, 6, 8};
  std::unique_ptr<fw::SelectedRows> even_input{
      new fw::SelectedRows(even_rows, height)};
  auto* even_value = even_input->mutable_value();
  even_value->mutable_data<float>(
      fw::make_ddim({static_cast<int64_t>(even_rows.size()), row_numel}),
      cpu_place);
  fill(ctx, even_value, 2.0);

  // Destination; only its height is set before the merge.
  std::unique_ptr<fw::SelectedRows> output{new fw::SelectedRows()};
  output->set_height(height);

  // Run the functor under test on {odd, even} in that order.
  std::vector<const fw::SelectedRows*> merge_inputs{odd_input.get(),
                                                    even_input.get()};
  pmath::scatter::MergeAdd<plat::CPUDeviceContext, float> merge_add;
  merge_add(ctx, merge_inputs, output.get());

  // Shape and metadata of the merged result.
  EXPECT_EQ(output->height(), height);
  EXPECT_EQ(output->value().dims(),
            paddle::framework::make_ddim({10, row_numel}));

  // Row ids are expected in input order, not sorted.
  std::vector<int64_t> ret_rows{1, 3, 5, 7, 9, 0, 2, 4, 6, 8};
  EXPECT_EQ(output->rows(), ret_rows);

  // The first five output rows come from the 1.0-filled input, the rest from
  // the 2.0-filled one.
  auto* out_data = output->value().data<float>();
  for (size_t i = 0; i < ret_rows.size(); ++i) {
    float data_value = (i < 5) ? 1.0 : 2.0;
    for (size_t j = 0; j < static_cast<size_t>(row_numel); ++j) {
      EXPECT_EQ(out_data[i * row_numel + j], data_value);
    }
  }
}
TEST(selected_rows_functor, cpu_sum_to) { TEST(selected_rows_functor, cpu_sum_to) {
paddle::platform::CPUPlace cpu_place; paddle::platform::CPUPlace cpu_place;
paddle::platform::CPUDeviceContext ctx(cpu_place); paddle::platform::CPUDeviceContext ctx(cpu_place);
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
#include "paddle/fluid/operators/reduce_ops/reduce_all_op.h" #include "paddle/fluid/operators/reduce_ops/reduce_all_op.h"
REGISTER_REDUCE_OP(reduce_all); REGISTER_REDUCE_OP_WITHOUT_GRAD(reduce_all);
REGISTER_OP_CPU_KERNEL(reduce_all, REGISTER_OP_CPU_KERNEL(reduce_all,
ops::ReduceKernel<paddle::platform::CPUDeviceContext, ops::ReduceKernel<paddle::platform::CPUDeviceContext,
bool, ops::AllFunctor>); bool, ops::AllFunctor>);
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
#include "paddle/fluid/operators/reduce_ops/reduce_any_op.h" #include "paddle/fluid/operators/reduce_ops/reduce_any_op.h"
REGISTER_REDUCE_OP(reduce_any); REGISTER_REDUCE_OP_WITHOUT_GRAD(reduce_any);
REGISTER_OP_CPU_KERNEL(reduce_any, REGISTER_OP_CPU_KERNEL(reduce_any,
ops::ReduceKernel<paddle::platform::CPUDeviceContext, ops::ReduceKernel<paddle::platform::CPUDeviceContext,
bool, ops::AnyFunctor>); bool, ops::AnyFunctor>);
...@@ -270,3 +270,12 @@ namespace ops = paddle::operators; ...@@ -270,3 +270,12 @@ namespace ops = paddle::operators;
REGISTER_OPERATOR(op_name, ops::ReduceOp, __##op_name##Maker__, \ REGISTER_OPERATOR(op_name, ops::ReduceOp, __##op_name##Maker__, \
paddle::framework::DefaultGradOpDescMaker<true>); \ paddle::framework::DefaultGradOpDescMaker<true>); \
REGISTER_OPERATOR(op_name##_grad, ops::ReduceGradOp) REGISTER_OPERATOR(op_name##_grad, ops::ReduceGradOp)
// Registers a reduce operator that has no gradient operator.
//
// Expands to a maker class named __<op_name>Maker__ deriving from
// ops::ReduceOpMaker, whose GetName() returns the stringified op_name and
// whose GetOpType() returns "Reduce <op_name>", followed by a
// REGISTER_OPERATOR call that registers op_name as ops::ReduceOp with
// paddle::framework::EmptyGradOpMaker.
//
// Like REGISTER_REDUCE_OP, the expansion does not end with a semicolon; the
// call site supplies it, e.g.
//   REGISTER_REDUCE_OP_WITHOUT_GRAD(reduce_all);
// which avoids emitting a redundant empty declaration after the registration.
#define REGISTER_REDUCE_OP_WITHOUT_GRAD(op_name)                          \
  class __##op_name##Maker__ : public ops::ReduceOpMaker {                \
   protected:                                                             \
    virtual std::string GetName() const { return #op_name; }              \
    virtual std::string GetOpType() const { return "Reduce " #op_name; }  \
  };                                                                      \
  REGISTER_OPERATOR(op_name, ops::ReduceOp, __##op_name##Maker__,         \
                    paddle::framework::EmptyGradOpMaker)
...@@ -34,15 +34,22 @@ class SigmoidCrossEntropyWithLogitsOp : public framework::OperatorWithKernel { ...@@ -34,15 +34,22 @@ class SigmoidCrossEntropyWithLogitsOp : public framework::OperatorWithKernel {
auto x_dims = ctx->GetInputDim("X"); auto x_dims = ctx->GetInputDim("X");
auto labels_dims = ctx->GetInputDim("Label"); auto labels_dims = ctx->GetInputDim("Label");
PADDLE_ENFORCE_EQ(x_dims.size(), 2, "Input(X)'s rank should be 2.");
PADDLE_ENFORCE_EQ(labels_dims.size(), 2, int rank = x_dims.size();
"Input(Label)'s rank should be 2."); PADDLE_ENFORCE_EQ(rank, labels_dims.size(),
PADDLE_ENFORCE_EQ(x_dims[0], labels_dims[0], "Input(X) and Input(Label) shall have the same rank.");
"The 1st dimension of Input(X) and Input(Label) should " bool check = true;
"be equal."); if ((!ctx->IsRuntime()) && (framework::product(x_dims) <= 0 ||
PADDLE_ENFORCE_EQ(x_dims[1], labels_dims[1], framework::product(labels_dims) <= 0)) {
"The 2nd dimension of Input(X) and Input(Label) should " check = false;
"be equal."); }
if (check) {
PADDLE_ENFORCE_EQ(framework::slice_ddim(x_dims, 0, rank),
framework::slice_ddim(labels_dims, 0, rank),
"Input(X) and Input(Label) shall have the same shape "
"except the last dimension.");
}
ctx->ShareDim("X", /*->*/ "Out"); ctx->ShareDim("X", /*->*/ "Out");
ctx->ShareLoD("X", /*->*/ "Out"); ctx->ShareLoD("X", /*->*/ "Out");
...@@ -65,23 +72,24 @@ class SigmoidCrossEntropyWithLogitsGradOp ...@@ -65,23 +72,24 @@ class SigmoidCrossEntropyWithLogitsGradOp
auto x_dims = ctx->GetInputDim("X"); auto x_dims = ctx->GetInputDim("X");
auto labels_dims = ctx->GetInputDim("Label"); auto labels_dims = ctx->GetInputDim("Label");
auto dout_dims = ctx->GetInputDim(framework::GradVarName("Out")); auto dout_dims = ctx->GetInputDim(framework::GradVarName("Out"));
PADDLE_ENFORCE_EQ(x_dims.size(), 2, "Input(X)'s rank should be 2.");
PADDLE_ENFORCE_EQ(labels_dims.size(), 2, int rank = x_dims.size();
"Input(Label)'s rank should be 2."); bool check = true;
PADDLE_ENFORCE_EQ(dout_dims.size(), 2, if ((!ctx->IsRuntime()) && (framework::product(x_dims) <= 0 ||
"Input(Out@Grad)'s rank should be 2."); framework::product(labels_dims) <= 0)) {
PADDLE_ENFORCE_EQ(x_dims[0], labels_dims[0], check = false;
"The 1st dimension of Input(X) and Input(Label) should " }
"be equal.");
PADDLE_ENFORCE_EQ(x_dims[1], labels_dims[1], if (check) {
"The 2nd dimension of Input(X) and Input(Label) should " PADDLE_ENFORCE_EQ(framework::slice_ddim(x_dims, 0, rank),
"be equal."); framework::slice_ddim(labels_dims, 0, rank),
PADDLE_ENFORCE_EQ(x_dims[0], dout_dims[0], "Input(X) and Input(Label) shall have the same shape.");
"The 1st dimension of Input(X) and Input(Out@Grad) "
"should be equal."); PADDLE_ENFORCE_EQ(
PADDLE_ENFORCE_EQ(x_dims[1], dout_dims[1], framework::slice_ddim(x_dims, 0, rank),
"The 2nd dimension of Input(X) and Input(Out@Grad) " framework::slice_ddim(dout_dims, 0, rank),
"should be equal."); "Input(X) and Input(Out@Grad) shall have the same shape.");
}
ctx->SetOutputDim(framework::GradVarName("X"), x_dims); ctx->SetOutputDim(framework::GradVarName("X"), x_dims);
} }
......
...@@ -56,13 +56,19 @@ class SpectralNormOp : public framework::OperatorWithKernel { ...@@ -56,13 +56,19 @@ class SpectralNormOp : public framework::OperatorWithKernel {
} }
auto dim_u = ctx->GetInputDim("U"); auto dim_u = ctx->GetInputDim("U");
auto dim_v = ctx->GetInputDim("V"); auto dim_v = ctx->GetInputDim("V");
PADDLE_ENFORCE_EQ(dim_u[0], h,
"Input(U) dims[0] should be equal to " if (ctx->IsRuntime() || (dim_u[0] > 0 && h > 0)) {
"Input(Weight) dims[Attr(dim)]"); PADDLE_ENFORCE_EQ(dim_u[0], h,
PADDLE_ENFORCE_EQ( "Input(U) dims[0] should be equal to "
dim_v[0], w, "Input(Weight) dims[Attr(dim)]");
"Input(V) dims[0] should be equal to " }
"the product of Input(Weight) dims except dims[Attr(dim)]");
if (ctx->IsRuntime() || (dim_v[0] > 0 && w > 0)) {
PADDLE_ENFORCE_EQ(
dim_v[0], w,
"Input(V) dims[0] should be equal to "
"the product of Input(Weight) dims except dims[Attr(dim)]");
}
ctx->SetOutputDim("Out", dim_weight); ctx->SetOutputDim("Out", dim_weight);
ctx->ShareLoD("Weight", /*->*/ "Out"); ctx->ShareLoD("Weight", /*->*/ "Out");
......
...@@ -39,14 +39,22 @@ class SplitOp : public framework::OperatorWithKernel { ...@@ -39,14 +39,22 @@ class SplitOp : public framework::OperatorWithKernel {
if (num > 0) { if (num > 0) {
int64_t in_axis_dim = in_dims[axis]; int64_t in_axis_dim = in_dims[axis];
PADDLE_ENFORCE_EQ(in_axis_dim % num, 0, if (ctx->IsRuntime() || in_axis_dim > 0) {
"tensor split does not result" PADDLE_ENFORCE_EQ(in_axis_dim % num, 0,
" in an equal division"); "tensor split does not result"
size_t out_axis_dim = in_axis_dim / num; " in an equal division");
for (size_t i = 0; i < outs_number; ++i) { size_t out_axis_dim = in_axis_dim / num;
auto dim = in_dims; for (size_t i = 0; i < outs_number; ++i) {
dim[axis] = out_axis_dim; auto dim = in_dims;
outs_dims.push_back(dim); dim[axis] = out_axis_dim;
outs_dims.push_back(dim);
}
} else {
for (size_t i = 0; i < outs_number; ++i) {
auto dim = in_dims;
dim[axis] = -1;
outs_dims.push_back(dim);
}
} }
} else if (sections.size() > 0) { } else if (sections.size() > 0) {
PADDLE_ENFORCE_EQ(sections.size(), outs_number, PADDLE_ENFORCE_EQ(sections.size(), outs_number,
......
...@@ -175,6 +175,7 @@ def __bootstrap__(): ...@@ -175,6 +175,7 @@ def __bootstrap__():
read_env_flags.append('communicator_thread_pool_size') read_env_flags.append('communicator_thread_pool_size')
read_env_flags.append('communicator_max_merge_var_num') read_env_flags.append('communicator_max_merge_var_num')
read_env_flags.append('communicator_fake_rpc') read_env_flags.append('communicator_fake_rpc')
read_env_flags.append('communicator_send_wait_times')
if core.is_compiled_with_brpc(): if core.is_compiled_with_brpc():
read_env_flags.append('max_body_size') read_env_flags.append('max_body_size')
#set brpc max body size #set brpc max body size
......
...@@ -147,10 +147,11 @@ class TestCalibrationForResnet50(unittest.TestCase): ...@@ -147,10 +147,11 @@ class TestCalibrationForResnet50(unittest.TestCase):
self.data_cache_folder) self.data_cache_folder)
os.system(cmd) os.system(cmd)
self.batch_size = 1 self.batch_size = 1 if os.environ.get('DATASET') == 'full' else 50
self.sample_iterations = 50 self.sample_iterations = 50 if os.environ.get(
'DATASET') == 'full' else 1
self.infer_iterations = 50000 if os.environ.get( self.infer_iterations = 50000 if os.environ.get(
'DATASET') == 'full' else 50 'DATASET') == 'full' else 1
def cache_unzipping(self, target_folder, zip_path): def cache_unzipping(self, target_folder, zip_path):
if not os.path.exists(target_folder): if not os.path.exists(target_folder):
...@@ -279,15 +280,15 @@ class TestCalibrationForResnet50(unittest.TestCase): ...@@ -279,15 +280,15 @@ class TestCalibrationForResnet50(unittest.TestCase):
def test_calibration(self): def test_calibration(self):
self.download_model() self.download_model()
print("Start FP32 inference for {0} on {1} images ...").format( print("Start FP32 inference for {0} on {1} images ...").format(
self.model, self.infer_iterations) self.model, self.infer_iterations * self.batch_size)
(fp32_throughput, fp32_latency, (fp32_throughput, fp32_latency,
fp32_acc1) = self.run_program(self.model_cache_folder + "/model") fp32_acc1) = self.run_program(self.model_cache_folder + "/model")
print("Start INT8 calibration for {0} on {1} images ...").format( print("Start INT8 calibration for {0} on {1} images ...").format(
self.model, self.sample_iterations) self.model, self.sample_iterations * self.batch_size)
self.run_program( self.run_program(
self.model_cache_folder + "/model", True, algo=self.algo) self.model_cache_folder + "/model", True, algo=self.algo)
print("Start INT8 inference for {0} on {1} images ...").format( print("Start INT8 inference for {0} on {1} images ...").format(
self.model, self.infer_iterations) self.model, self.infer_iterations * self.batch_size)
(int8_throughput, int8_latency, (int8_throughput, int8_latency,
int8_acc1) = self.run_program("calibration_out") int8_acc1) = self.run_program("calibration_out")
delta_value = fp32_acc1 - int8_acc1 delta_value = fp32_acc1 - int8_acc1
......
...@@ -196,6 +196,7 @@ __all__ = [ ...@@ -196,6 +196,7 @@ __all__ = [
'npair_loss', 'npair_loss',
'pixel_shuffle', 'pixel_shuffle',
'fsp_matrix', 'fsp_matrix',
'continuous_value_model',
] ]
kIgnoreIndex = -100 kIgnoreIndex = -100
...@@ -5720,12 +5721,21 @@ def hsigmoid(input, ...@@ -5720,12 +5721,21 @@ def hsigmoid(input,
raise ValueError( raise ValueError(
"num_classes must not be less than 2 with default tree") "num_classes must not be less than 2 with default tree")
if (not is_custom) and (is_sparse):
print("Sparse mode should not be used without custom tree")
is_sparse = False
if (not is_custom) and ((path_table is not None) or
(path_code is not None)):
raise ValueError(
"only num_classes should be passed without custom tree")
if (is_custom) and (path_code is None): if (is_custom) and (path_code is None):
raise ValueError("path_code should not be None with costum tree") raise ValueError("path_code should not be None with custom tree")
elif (is_custom) and (path_table is None): elif (is_custom) and (path_table is None):
raise ValueError("path_table should not be None with costum tree") raise ValueError("path_table should not be None with custom tree")
elif (is_custom) and (num_classes is None): elif (is_custom) and (num_classes is None):
raise ValueError("num_classes should not be None with costum tree") raise ValueError("num_classes should not be None with custom tree")
else: else:
pass pass
...@@ -11202,3 +11212,54 @@ def fsp_matrix(x, y): ...@@ -11202,3 +11212,54 @@ def fsp_matrix(x, y):
input_param_name='x')) input_param_name='x'))
helper.append_op(type='fsp', inputs={'X': x, 'Y': y}, outputs={'Out': out}) helper.append_op(type='fsp', inputs={'X': x, 'Y': y}, outputs={'Out': out})
return out return out
def continuous_value_model(input, cvm, use_cvm=True):
    """
    **continuous_value_model layers**

    Continuous value model (CVM). For now it only considers the show and
    click values of a CTR project.

    ``input`` is assumed to be an embedding vector carrying the CVM feature,
    with shape [N x D], where D is 2 + the embedding dim.

    - If ``use_cvm`` is True, the layer takes the log of the CVM feature and
      the output shape is [N x D].
    - If ``use_cvm`` is False, the layer removes the CVM feature from
      ``input`` and the output shape is [N x (D - 2)].

    This layer accepts a tensor named ``input``, which is the IDs after
    embedding (lod level is 1); ``cvm`` holds the show/click info.

    Args:
        input (Variable): a 2-D LodTensor with shape [N x D], where N is the
            batch size and D is 2 + the embedding dim. lod level = 1.
        cvm (Variable): a 2-D Tensor with shape [N x 2], where N is the batch
            size and 2 stands for show and click.
        use_cvm (bool): whether to use cvm. If True, the output dim is the
            same as the input dim; if False, the output dim is the input
            dim - 2 (show and click are removed). The cvm op is a customized
            op whose input is a sequence that carries the CVM feature by
            default, so this flag decides whether to keep it or not.

    Returns:
        Variable: a 2-D LodTensor with shape [N x D]. If cvm is used, D equals
        the input dim; otherwise D equals the input dim - 2.

    Examples:
        .. code-block:: python

          input = fluid.layers.data(name="input", shape=[-1, 1], lod_level=1, append_batch_size=False, dtype="int64")
          label = fluid.layers.data(name="label", shape=[-1, 1], append_batch_size=False, dtype="int64")
          embed = fluid.layers.embedding(
                            input=input,
                            size=[100, 11],
                            dtype='float32')
          ones = fluid.layers.fill_constant_batch_size_like(input=label, shape=[-1, 1], dtype="int64", value=1)
          show_clk = fluid.layers.cast(fluid.layers.concat([ones, label], axis=1), dtype='float32')
          show_clk.stop_gradient = True
          input_with_cvm = fluid.layers.continuous_value_model(embed, show_clk, True)
    """
    # locals() must be captured before any other local is bound so the helper
    # receives exactly the function arguments as keyword arguments.
    helper = LayerHelper('cvm', **locals())
    result = helper.create_variable(dtype=input.dtype)

    op_inputs = {'X': [input], 'CVM': [cvm]}
    op_outputs = {'Y': [result]}
    op_attrs = {"use_cvm": use_cvm}
    helper.append_op(
        type='cvm', inputs=op_inputs, outputs=op_outputs, attrs=op_attrs)
    return result
...@@ -275,15 +275,26 @@ class Optimizer(object): ...@@ -275,15 +275,26 @@ class Optimizer(object):
self._create_global_learning_rate() self._create_global_learning_rate()
optimize_ops = [] optimize_ops = []
for param_and_grad in parameters_and_grads: if framework.in_dygraph_mode():
if param_and_grad[1] is None: for param_and_grad in parameters_and_grads:
continue if param_and_grad[1] is None:
with param_and_grad[0].block.program._optimized_guard( continue
param_and_grad), name_scope("optimizer"): with param_and_grad[0].block.program._optimized_guard(
if param_and_grad[0].trainable is True: param_and_grad):
optimize_op = self._append_optimize_op(global_block, if param_and_grad[0].trainable is True:
param_and_grad) optimize_op = self._append_optimize_op(global_block,
optimize_ops.append(optimize_op) param_and_grad)
optimize_ops.append(optimize_op)
else:
for param_and_grad in parameters_and_grads:
if param_and_grad[1] is None:
continue
with param_and_grad[0].block.program._optimized_guard(
param_and_grad), name_scope("optimizer"):
if param_and_grad[0].trainable is True:
optimize_op = self._append_optimize_op(global_block,
param_and_grad)
optimize_ops.append(optimize_op)
# Get custom finish ops for subclasses # Get custom finish ops for subclasses
# FIXME: Need to fix this once we figure out how to handle dependencies # FIXME: Need to fix this once we figure out how to handle dependencies
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from math import log
from math import exp
from op_test import OpTest
import unittest
class TestCVMOp(OpTest):
    """
    Test the cvm op with ``use_cvm`` set to False.

    The expected output is every input row with its first two columns
    dropped.
    """

    def setUp(self):
        """Build a single-row LoD input and the matching expected output."""
        self.op_type = "cvm"
        dims = 11
        lod = [[1]]
        self.inputs = {
            'X': (np.random.uniform(0, 1, [1, dims]).astype("float32"), lod),
            'CVM': np.array([[0.6, 0.4]]).astype("float32"),
        }
        self.attrs = {'use_cvm': False}
        # With use_cvm disabled the expected rows keep everything after the
        # first two columns of X.
        out = [emb[2:] for emb in self.inputs["X"][0]]
        self.outputs = {'Y': (np.array(out), lod)}

    def test_check_output(self):
        """Compare the op's forward output against the expected output."""
        self.check_output()
if __name__ == '__main__':
unittest.main()
...@@ -149,5 +149,98 @@ class TestSigmoidCrossEntropyWithNorm(OpTest): ...@@ -149,5 +149,98 @@ class TestSigmoidCrossEntropyWithNorm(OpTest):
self.check_grad(['X'], 'Out') self.check_grad(['X'], 'Out')
class TestSigmoidCrossEntropyWithLogitsOp5(OpTest):
    """Test sigmoid_cross_entropy_with_logits op with probabilistic labels
    on 3-D inputs.
    """

    def setUp(self):
        """Create random 3-D logits/labels and the reference loss."""
        self.op_type = "sigmoid_cross_entropy_with_logits"
        batch_size = [10, 10]
        num_classes = 20
        shape = tuple(batch_size + [num_classes])
        # X is drawn before Label, keeping the random stream order.
        logits = logit(np.random.uniform(0, 1, shape).astype("float32"))
        labels = np.random.uniform(0, 1, shape).astype("float32")
        self.inputs = {'X': logits, 'Label': labels}

        # The forward pass is an elementwise sigmoid followed by an
        # elementwise logistic loss:
        # Label * -log(sigmoid(X)) + (1 - label) * -log(1 - sigmoid(X))
        prob = expit(self.inputs['X'])
        positive_term = self.inputs['Label'] * np.log(prob)
        negative_term = (1 - self.inputs['Label']) * np.log(1 - prob)
        self.outputs = {'Out': -positive_term - negative_term}

    def test_check_output(self):
        """Check the forward result against the reference loss."""
        self.check_output()

    def test_check_grad(self):
        """Check the gradient of Out with respect to X."""
        self.check_grad(['X'], 'Out')
class TestSigmoidCrossEntropyWithNorm2(OpTest):
    """Test sigmoid_cross_entropy_with_logits op on 3-D inputs with
    ``ignore_index`` and ``normalize`` attributes set.
    """

    def setUp(self):
        """Create random 3-D logits, labels in {-1, 0, 1}, and the reference
        loss with ignored entries zeroed and normalization applied."""
        self.op_type = "sigmoid_cross_entropy_with_logits"
        batch_size = [10, 10]
        num_classes = 20
        ignore_index = -1
        shape = tuple(batch_size + [num_classes])
        # X is drawn before Label, keeping the random stream order.
        logits = logit(np.random.uniform(0, 1, shape).astype("float32"))
        labels = np.random.randint(-1, 2, shape).astype("float32")
        self.inputs = {'X': logits, 'Label': labels}
        self.attrs = {'ignore_index': ignore_index, 'normalize': True}

        prob = expit(self.inputs['X'])
        positive_term = self.inputs['Label'] * np.log(prob)
        negative_term = (1 - self.inputs['Label']) * np.log(1 - prob)
        loss = -positive_term - negative_term

        # Entries whose label equals ignore_index contribute no loss.
        ignored = self.inputs['Label'] == ignore_index
        loss[ignored] = 0
        if self.attrs['normalize']:
            # Divide by the number of entries that are not ignored.
            kept_count = np.count_nonzero(self.inputs['Label'] != ignore_index)
            loss = loss / float(kept_count)
        self.outputs = {'Out': loss}

    def test_check_output(self):
        """Check the forward result against the reference loss."""
        self.check_output()

    def test_check_grad(self):
        """Check the gradient of Out with respect to X."""
        self.check_grad(['X'], 'Out')
class TestSigmoidCrossEntropyWithLogitsOp6(OpTest):
    """Test sigmoid_cross_entropy_with_logits op with binary labels on 3-D
    inputs.
    """

    def setUp(self):
        """Create random 3-D logits, 0/1 labels, and the reference loss."""
        self.op_type = "sigmoid_cross_entropy_with_logits"
        batch_size = [10, 10]
        num_classes = 20
        shape = tuple(batch_size + [num_classes])
        # X is drawn before Label, keeping the random stream order.
        logits = logit(np.random.uniform(0, 1, shape).astype("float32"))
        labels = np.random.randint(0, 2, shape).astype("float32")
        self.inputs = {'X': logits, 'Label': labels}

        # The forward pass is an elementwise sigmoid followed by an
        # elementwise logistic loss:
        # Label * -log(sigmoid(X)) + (1 - label) * -log(1 - sigmoid(X))
        prob = expit(self.inputs['X'])
        positive_term = self.inputs['Label'] * np.log(prob)
        negative_term = (1 - self.inputs['Label']) * np.log(1 - prob)
        self.outputs = {'Out': -positive_term - negative_term}

    def test_check_output(self):
        """Check the forward result against the reference loss."""
        self.check_output()

    def test_check_grad(self):
        """Check the gradient of Out with respect to X."""
        self.check_grad(['X'], 'Out')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -658,6 +658,7 @@ class DistributeTranspiler(object): ...@@ -658,6 +658,7 @@ class DistributeTranspiler(object):
outputs={"Out": splited_var}, outputs={"Out": splited_var},
attrs={ attrs={
"epmap": eps, "epmap": eps,
"trainer_id": self.trainer_id,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
}) })
...@@ -669,6 +670,7 @@ class DistributeTranspiler(object): ...@@ -669,6 +670,7 @@ class DistributeTranspiler(object):
outputs={"Out": fetch_barrier_out}, outputs={"Out": fetch_barrier_out},
attrs={ attrs={
"endpoints": self.pserver_endpoints, "endpoints": self.pserver_endpoints,
"trainer_id": self.trainer_id,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
}) })
...@@ -791,11 +793,15 @@ class DistributeTranspiler(object): ...@@ -791,11 +793,15 @@ class DistributeTranspiler(object):
global_ops = [] global_ops = []
# sparse grad name to param name
sparse_grad_to_param = []
def __append_optimize_op__(op, block, grad_to_block_id, merged_var, def __append_optimize_op__(op, block, grad_to_block_id, merged_var,
lr_ops): lr_ops):
if self._is_optimizer_op(op): if self._is_optimizer_op(op):
self._append_pserver_ops(block, op, endpoint, grad_to_block_id, self._append_pserver_ops(block, op, endpoint, grad_to_block_id,
self.origin_program, merged_var) self.origin_program, merged_var,
sparse_grad_to_param)
elif op not in lr_ops: elif op not in lr_ops:
self._append_pserver_non_opt_ops(block, op) self._append_pserver_non_opt_ops(block, op)
...@@ -911,6 +917,7 @@ class DistributeTranspiler(object): ...@@ -911,6 +917,7 @@ class DistributeTranspiler(object):
"Fanin": self.trainer_num, "Fanin": self.trainer_num,
"sync_mode": self.sync_mode, "sync_mode": self.sync_mode,
"grad_to_block_id": grad_to_block_id, "grad_to_block_id": grad_to_block_id,
"sparse_grad_to_param": sparse_grad_to_param,
} }
if self.has_distributed_lookup_table: if self.has_distributed_lookup_table:
...@@ -1779,7 +1786,8 @@ class DistributeTranspiler(object): ...@@ -1779,7 +1786,8 @@ class DistributeTranspiler(object):
return o4 return o4
def _append_pserver_ops(self, optimize_block, opt_op, endpoint, def _append_pserver_ops(self, optimize_block, opt_op, endpoint,
grad_to_block_id, origin_program, merged_var): grad_to_block_id, origin_program, merged_var,
sparse_grad_to_param):
program = optimize_block.program program = optimize_block.program
pserver_block = program.global_block() pserver_block = program.global_block()
new_inputs = collections.OrderedDict() new_inputs = collections.OrderedDict()
...@@ -1863,6 +1871,12 @@ class DistributeTranspiler(object): ...@@ -1863,6 +1871,12 @@ class DistributeTranspiler(object):
outputs=outputs, outputs=outputs,
attrs=opt_op.all_attrs()) attrs=opt_op.all_attrs())
# record sparse grad to param name
if new_inputs["Grad"].type == core.VarDesc.VarType.SELECTED_ROWS:
sparse_grad_to_param.append(
str(new_inputs["Grad"].name) + ":" + str(new_inputs["Param"]
.name))
def _get_pserver_grad_param_var(self, var, var_dict): def _get_pserver_grad_param_var(self, var, var_dict):
""" """
Return pserver side grad/param variable, return None Return pserver side grad/param variable, return None
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册