Unverified commit 5e74c4e8, authored by Qiao Longfei (乔龙飞), committed by GitHub

Merge pull request #15100 from jacquesqiao/fix-dist-sparse-decay

fix dist sparse l2 decay
@@ -195,6 +195,10 @@ struct SelectedRowsAddToTensor<platform::CPUDeviceContext, T> {
   void operator()(const platform::CPUDeviceContext& context,
                   const framework::SelectedRows& input1,
                   framework::Tensor* input2) {
+    if (UNLIKELY(input1.rows().size() == 0)) {
+      LOG(WARNING) << "input selected rows is empty!";
+      return;
+    }
     auto in1_height = input1.height();
     auto in2_dims = input2->dims();
     PADDLE_ENFORCE_EQ(in1_height, in2_dims[0]);
......
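For readers outside the C++ codebase, here is a minimal NumPy sketch (not PaddlePaddle code; the function and variable names are illustrative) of the behavior this guard protects: SelectedRowsAddToTensor scatter-adds sparse rows into a dense tensor, and an input with zero rows should now warn and return instead of proceeding into the add.

```python
import numpy as np

def selected_rows_add_to_tensor(rows, values, dense):
    """Scatter-add sparse rows into a dense tensor (illustrative sketch only)."""
    if len(rows) == 0:  # mirrors the new UNLIKELY(...) early return above
        print("WARNING: input selected rows is empty!")
        return
    for idx, row in enumerate(rows):
        dense[row] += values[idx]

dense = np.zeros((4, 3))
selected_rows_add_to_tensor([], np.zeros((0, 3)), dense)     # no-op, only a warning
selected_rows_add_to_tensor([1, 3], np.ones((2, 3)), dense)  # rows 1 and 3 become 1.0
```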
@@ -41,7 +41,9 @@ class SumOp : public framework::OperatorWithKernel {
       return;  // skip runtime infershape when is tensor array;
     }
+    auto x_var_types = ctx->GetInputsVarType("X");
     auto x_dims = ctx->GetInputsDim("X");
     size_t N = x_dims.size();
     PADDLE_ENFORCE_GT(N, 0, "Input tensors count should > 0.");
     if (N == 1) {
@@ -49,7 +51,13 @@ class SumOp : public framework::OperatorWithKernel {
     }
     framework::DDim in_dim({0});
-    for (auto& x_dim : x_dims) {
+    for (size_t i = 0; i < x_dims.size(); ++i) {
+      auto& x_dim = x_dims[i];
+      // x_dim.size() == 1 means the real dim of selected rows is [0]
+      if (x_var_types[i] == framework::proto::VarType::SELECTED_ROWS &&
+          x_dim.size() == 1) {
+        continue;
+      }
       if (framework::product(x_dim) == 0) {
         continue;
       }
......
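Roughly, the new infer-shape rule can be pictured in Python like this (a simplified sketch based on the in-diff comment, not the real SumOp code): an empty SelectedRows gradient reports a 1-D dim of [0], so it is skipped when choosing the output shape instead of tripping the later dimension checks.

```python
from math import prod

def infer_sum_output_dim(dims, var_types):
    """Pick an output dim for sum, skipping empty SelectedRows inputs (sketch)."""
    out_dim = None
    for dim, var_type in zip(dims, var_types):
        # a 1-D dim like [0] is how an empty SelectedRows shows up
        if var_type == "SELECTED_ROWS" and len(dim) == 1:
            continue
        if prod(dim) == 0:  # mirrors framework::product(x_dim) == 0
            continue
        out_dim = dim
    return out_dim

# the empty sparse gradient no longer forces the shape check to fail
print(infer_sum_output_dim([[0], [8, 3]], ["SELECTED_ROWS", "LOD_TENSOR"]))  # [8, 3]
```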
@@ -31,6 +31,7 @@ fluid.default_main_program().random_seed = 1
 class TestDistCTR2x2(TestDistRunnerBase):
     def get_model(self, batch_size=2):
         dnn_input_dim, lr_input_dim = dist_ctr_reader.load_data_meta()
         """ network definition """
         dnn_data = fluid.layers.data(
@@ -97,7 +98,14 @@ class TestDistCTR2x2(TestDistRunnerBase):
         inference_program = paddle.fluid.default_main_program().clone()
-        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.0001)
+        regularization = None
+        use_l2_decay = bool(os.getenv('USE_L2_DECAY', 0))
+        if use_l2_decay:
+            regularization = fluid.regularizer.L2DecayRegularizer(
+                regularization_coeff=1e-1)
+        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.0001,
+                                            regularization=regularization)
         sgd_optimizer.minimize(avg_cost)
         dataset = dist_ctr_reader.Dataset()
......
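As background for why this test matters in the distributed case: L2 weight decay folds coeff * param into the gradient before the optimizer step, and the transpiler comment further down notes that for a sparse (SelectedRows) gradient the decayed result lands in a newly created variable rather than an in-place update. A dense-only NumPy sketch of the decay itself (illustrative, not Fluid's implementation):

```python
import numpy as np

def apply_l2_decay(param, grad, coeff=1e-1):
    """Dense view of L2 decay: the regularization term is folded into the gradient (sketch)."""
    return grad + coeff * param

param = np.array([1.0, -2.0, 0.5])
grad = np.array([0.1, 0.1, 0.1])
print(apply_l2_decay(param, grad))  # [ 0.2  -0.1   0.15]
```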
@@ -235,7 +235,6 @@ class DistSeResneXt2x2(TestDistRunnerBase):
         bd = [step * e for e in epochs]
         base_lr = 0.1
-        lr = []
         lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
         optimizer = fluid.optimizer.Momentum(
......
@@ -18,7 +18,6 @@ import unittest
 from test_dist_base import TestDistBase
-# FIXME(tangwei): sum op can not handle when inputs is empty.
 class TestDistCTR2x2(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
@@ -28,5 +27,19 @@ class TestDistCTR2x2(TestDistBase):
         self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
+class TestDistCTRWithL2Decay2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._enforce_place = "CPU"
+    def test_dist_ctr(self):
+        need_envs = {"USE_L2_DECAY": "1"}
+        self.check_with_place(
+            "dist_ctr.py",
+            delta=1e-7,
+            check_error_log=False,
+            need_envs=need_envs)
 if __name__ == "__main__":
     unittest.main()
@@ -752,12 +752,6 @@ class DistributeTranspiler(object):
                 elif op not in lr_ops:
                     self._append_pserver_non_opt_ops(block, op)
-        def __op_have_grad_input__(op):
-            for varname in op.input_arg_names:
-                if varname.find("@GRAD") >= 0:
-                    return varname
-            return ""
         def __clone_lr_op_sub_block__(op, program, lr_block):
             if not op.has_attr('sub_block'):
                 return
@@ -808,7 +802,7 @@ class DistributeTranspiler(object):
             merged_var = None
             for _, op in enumerate(self.optimize_ops):
                 # find the origin grad var before clipping/L2Decay,
-                # merged_var should be the input var name of L2Decaybuil
+                # merged_var should be the input var name of L2Decay
                 grad_varname_for_block = op.attr(OP_ROLE_VAR_ATTR_NAME)[1]
                 if op.attr(OP_ROLE_VAR_ATTR_NAME)[
                         0] == optimize_target_param_name:
@@ -1684,7 +1678,16 @@ class DistributeTranspiler(object):
                 if self.config.enable_dc_asgd:
                     new_inputs[key] = dc
                 else:
-                    new_inputs[key] = merged_var
+                    # Note!! This is for l2decay on sparse gradient, because it will create a new tensor for
+                    # decayed gradient but not inplace modify the origin one
+                    origin_grad_name = opt_op.input(key)[0]
+                    if core.kNewGradSuffix(
+                    ) in origin_grad_name and pserver_block.has_var(
+                            origin_grad_name):
+                        new_grad = pserver_block.var(origin_grad_name)
+                        new_inputs[key] = new_grad
+                    else:
+                        new_inputs[key] = merged_var
             elif key == "Param":
                 param_block = _get_param_block(opt_op)
                 if not param_block:
......
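Stripped of the transpiler machinery, the rewiring above reduces to the following sketch (the suffix value and helper names are placeholders; only core.kNewGradSuffix() and pserver_block appear in the diff): if the optimizer op's gradient input already carries the new-gradient suffix and that variable exists on the pserver block, feed the optimizer that variable; otherwise fall back to the merged gradient.

```python
NEW_GRAD_SUFFIX = "@NEWGRAD"  # placeholder for whatever core.kNewGradSuffix() returns

def pick_grad_input(origin_grad_name, pserver_vars, merged_var):
    """Choose which gradient variable the pserver-side optimizer should read (sketch)."""
    if NEW_GRAD_SUFFIX in origin_grad_name and origin_grad_name in pserver_vars:
        # L2 decay on a sparse gradient wrote its result into a new variable,
        # so the optimizer must consume that one instead of the merged gradient.
        return origin_grad_name
    return merged_var

print(pick_grad_input("fc_w@GRAD@NEWGRAD", {"fc_w@GRAD@NEWGRAD"}, "fc_w@GRAD"))  # new var wins
print(pick_grad_input("fc_w@GRAD", {"fc_w@GRAD"}, "fc_w@GRAD"))                  # falls back
```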