diff --git a/paddle/fluid/operators/send_op.cc b/paddle/fluid/operators/send_op.cc index 178976f96fdbd08cead7b7c518ea1fbaaa2a5db8..8fdd08eae6b22cd57506d6e75182c1a7e2022562 100644 --- a/paddle/fluid/operators/send_op.cc +++ b/paddle/fluid/operators/send_op.cc @@ -24,15 +24,15 @@ limitations under the License. */ namespace paddle { namespace operators { -static bool IsVariableInitialized(const framework::Scope& scope, - const std::string& varname) { +static bool NeedSend(const framework::Scope& scope, + const std::string& varname) { auto* var = scope.FindVar(varname); PADDLE_ENFORCE_NOT_NULL(var, "Can not find variable '%s' in the send side.", varname); if (var->IsType()) { return var->Get().IsInitialized(); } else if (var->IsType()) { - return var->Get().value().IsInitialized(); + return var->Get().rows().size() > 0UL; } else { PADDLE_THROW( "Variable type in send side should be in " @@ -67,7 +67,7 @@ class SendOp : public framework::OperatorBase { detail::RPCClient* rpc_client = client_var->GetMutable(); for (size_t i = 0; i < ins.size(); i++) { - if (IsVariableInitialized(scope, ins[i])) { + if (NeedSend(scope, ins[i])) { VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]); } else { diff --git a/paddle/fluid/operators/sgd_op.cc b/paddle/fluid/operators/sgd_op.cc index 7cc73de8788e3dceb763b6f5a1519459d0fb05dd..d0aa2f9cbadaadf4e7e625628d9db5677d50d277 100644 --- a/paddle/fluid/operators/sgd_op.cc +++ b/paddle/fluid/operators/sgd_op.cc @@ -39,6 +39,14 @@ class SGDOp : public framework::OperatorWithKernel { // and run time. 
ctx->SetOutputDim("ParamOut", param_dim); } + + protected: + framework::OpKernelType GetExpectedKernelType( + const framework::ExecutionContext& ctx) const override { + return framework::OpKernelType( + framework::ToDataType(ctx.Input("Param")->type()), + ctx.GetPlace()); + } }; class SGDOpMaker : public framework::OpProtoAndCheckerMaker { diff --git a/paddle/fluid/operators/sgd_op.h b/paddle/fluid/operators/sgd_op.h index 2fec84815a9ecc63675de88816b23cfaa75aca65..0ad801079400f1830d85a945e57a434a86adeb00 100644 --- a/paddle/fluid/operators/sgd_op.h +++ b/paddle/fluid/operators/sgd_op.h @@ -47,6 +47,12 @@ class SGDOpKernel : public framework::OpKernel { PADDLE_ENFORCE_EQ(param, param_out); auto* grad = ctx.Input("Grad"); + // for distributed training, a sparse var may be empty, + // just skip updating. + if (grad->rows().size() == 0) { + return; + } + auto in_height = grad->height(); auto out_dims = param_out->dims(); PADDLE_ENFORCE_EQ(in_height, out_dims[0]); @@ -60,13 +66,15 @@ class SGDOpKernel : public framework::OpKernel { auto* in_data = in_value.data(); auto* out_data = param_out->data(); auto* lr = learning_rate->data(); - for (size_t i = 0; i < in_rows.size(); i++) { + PADDLE_ENFORCE(in_rows[i] < in_height, + "Input rows index should less than height"); for (int64_t j = 0; j < in_row_numel; j++) { out_data[in_rows[i] * in_row_numel + j] -= lr[0] * in_data[i * in_row_numel + j]; } } + } else { PADDLE_THROW("Unsupported Variable Type of Grad"); } diff --git a/paddle/fluid/operators/split_selected_rows_op.h b/paddle/fluid/operators/split_selected_rows_op.h index 23baf8e72eca87f3865fc2b63ce2de96f799dce3..0e9ce165b98845f4745ee70b028513ea31cc6657 100644 --- a/paddle/fluid/operators/split_selected_rows_op.h +++ b/paddle/fluid/operators/split_selected_rows_op.h @@ -21,15 +21,24 @@ limitations under the License. 
*/ namespace paddle { namespace operators { -static int FindOutIdx(int row, const std::vector& height_sections) { - int offset = 0; - for (size_t i = 0; i < height_sections.size(); ++i) { - if (row >= offset && row < (offset + height_sections[i])) { - return i; +static int FindOutIdx(int row, const std::vector& abs_sections) { + for (size_t i = 1; i < abs_sections.size(); ++i) { + if (row < abs_sections[i]) { + return i - 1; } - offset += height_sections[i]; } - return -1; + return abs_sections.size() - 1; +} + +static std::vector ToAbsoluteSection( + const std::vector& height_sections) { + std::vector abs_sections; + abs_sections.resize(height_sections.size()); + abs_sections[0] = 0; + for (size_t i = 1; i < height_sections.size(); ++i) { + abs_sections[i] = height_sections[i - 1] + abs_sections[i - 1]; + } + return abs_sections; } template @@ -40,16 +49,23 @@ class SplitSelectedRowsOpKernel : public framework::OpKernel { auto outs = ctx.MultiOutput("Out"); auto height_sections = ctx.Attr>("height_sections"); + auto abs_sections = ToAbsoluteSection(height_sections); + auto x_rows = x->rows(); std::vector> outs_rows_idx; + std::vector> outs_dense_idx; + outs_rows_idx.resize(outs.size()); + outs_dense_idx.resize(outs.size()); auto row_numel = x->value().numel() / x->value().dims()[0]; auto src = x->value().data(); + // split rows index into output sparse vars for (size_t i = 0; i < x_rows.size(); ++i) { - int out_idx = FindOutIdx(x_rows[i], height_sections); - outs_rows_idx[out_idx].push_back(i); + int out_idx = FindOutIdx(x_rows[i], abs_sections); + outs_rows_idx[out_idx].push_back(x_rows[i]); + outs_dense_idx[out_idx].push_back(i); } auto place = ctx.GetPlace(); @@ -61,19 +77,20 @@ class SplitSelectedRowsOpKernel : public framework::OpKernel { dims[0] = rows_idx.size(); outs[i]->mutable_value()->mutable_data(dims, x->place()); for (auto idx : rows_idx) { - outs[i]->mutable_rows()->push_back(x_rows[idx]); + outs[i]->mutable_rows()->push_back(idx - abs_sections[i]); 
} auto dst = outs[i]->mutable_value()->mutable_data(ctx.GetPlace()); for (size_t j = 0; j < rows_idx.size(); j++) { if (platform::is_cpu_place(place)) { memory::Copy( platform::CPUPlace(), dst + j * row_numel, platform::CPUPlace(), src + outs_dense_idx[i][j] * row_numel, sizeof(T) * row_numel); } else { #ifdef PADDLE_WITH_CUDA auto stream = ctx.cuda_device_context().stream(); memory::Copy(platform::CUDAPlace(), dst + j * row_numel, - platform::CUDAPlace(), src + rows_idx[j] * row_numel, + platform::CUDAPlace(), + src + outs_dense_idx[i][j] * row_numel, sizeof(T) * row_numel, stream); #else PADDLE_THROW("Paddle is not compiled with GPU"); diff --git a/paddle/fluid/operators/sum_op.cc b/paddle/fluid/operators/sum_op.cc index c3abb3ea4a53126c22c817069e8ad955b202f09d..d3d5c8a3429e2070c5472355b4440401eaa699cb 100644 --- a/paddle/fluid/operators/sum_op.cc +++ b/paddle/fluid/operators/sum_op.cc @@ -76,10 +76,16 @@ class SumOp : public framework::OperatorWithKernel { static_cast(dtype), ctx.device_context()); } else if (x_vars[0]->IsType()) { - return framework::OpKernelType( - framework::ToDataType( - x_vars[0]->Get().value().type()), - ctx.device_context()); + for (auto& var : x_vars) { + auto& value = var->Get().value(); + if (value.IsInitialized()) { + return framework::OpKernelType(framework::ToDataType(value.type()), + ctx.device_context()); + } + } + // if input sparse vars are not initialized, use a default kernel type.
+ return framework::OpKernelType(framework::proto::VarType::FP32, + ctx.device_context()); } else if (x_vars[0]->IsType()) { for (auto& x_var : x_vars) { auto& array = x_var->Get(); diff --git a/paddle/fluid/operators/sum_op.h b/paddle/fluid/operators/sum_op.h index 48b2d2779aeeb168cf87e61557e01d5cbde476b3..e7e5346cdca5efaf81c2b0fddedde7406e3b874d 100644 --- a/paddle/fluid/operators/sum_op.h +++ b/paddle/fluid/operators/sum_op.h @@ -109,6 +109,12 @@ class SumKernel : public framework::OpKernel { in_dim[0] = static_cast(first_dim); out_value->Resize(framework::make_ddim(in_dim)); + + // if all the input sparse vars are empty, no need to + // merge these vars. + if (first_dim == 0UL) { + return; + } out_value->mutable_data(context.GetPlace()); math::SelectedRowsAddTo functor; @@ -116,7 +122,7 @@ class SumKernel : public framework::OpKernel { int64_t offset = 0; for (int i = 0; i < N; i++) { auto &sel_row = get_selected_row(i); - if (!sel_row.value().IsInitialized() || sel_row.rows().size() == 0) { + if (sel_row.rows().size() == 0) { continue; } PADDLE_ENFORCE_EQ(out->height(), sel_row.height()); diff --git a/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py b/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py index 286d305a777a4683d42a4d3d2d5d5f0c5b6ac12a..61040a39ced6dc57d05a10bf0605c80011db45c3 100644 --- a/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py +++ b/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py @@ -60,8 +60,8 @@ class TestSpliteSelectedRows(unittest.TestCase): # expected output selected rows expected_out0_rows = [0, 4] - expected_out1_rows = [5, 7] - expected_out4_rows = [20] + expected_out1_rows = [0, 2] + expected_out4_rows = [0] op = Operator( "split_selected_rows", @@ -101,7 +101,7 @@ class TestSpliteSelectedRows(unittest.TestCase): out0_grad_tensor.set(np_array, place) out1_grad = scope.var("out1@GRAD").get_selected_rows() - rows1 = [7, 5] + rows1 = [2, 0] 
out1_grad.set_rows(rows1) out1_grad.set_height(height) out1_grad_tensor = out1_grad.get_tensor()