diff --git a/paddle/fluid/framework/details/dgc_const_values.h b/paddle/fluid/framework/details/dgc_const_values.h
index fbe50dc91160e1d7d5175daa150ec9c45aa60a6f..3063177fb082e7a3b013b8a9f3b19f91ca1f4cdf 100644
--- a/paddle/fluid/framework/details/dgc_const_values.h
+++ b/paddle/fluid/framework/details/dgc_const_values.h
@@ -22,10 +22,10 @@ namespace details {
 
 constexpr char g_dgc_counter_name[] = "__g_dgc_counter__";
 constexpr char g_dgc_rampup_begin_step[] = "__g_rampup_begin_step__";
-constexpr char g_dgc_u[] = "__dgc_u__";
-constexpr char g_dgc_v[] = "__dgc_v__";
+constexpr char g_dgc_nranks[] = "__g_nranks__";
 constexpr char g_dgc_k[] = "__dgc_k__";
 constexpr char g_dgc_encoded[] = "__dgc_encoded__";
+constexpr char g_dgc_gather[] = "__dgc_gather__";
 
 }  // namespace details
 }  // namespace framework
diff --git a/paddle/fluid/framework/details/sparse_all_reduce_op_handle.cc b/paddle/fluid/framework/details/sparse_all_reduce_op_handle.cc
index 282f0fc053eb74aa242a7eb74b19f996916dc167..a530c652fe84d23262b911686200725aeaf671e5 100644
--- a/paddle/fluid/framework/details/sparse_all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/sparse_all_reduce_op_handle.cc
@@ -38,30 +38,23 @@ SparseAllReduceOpHandle::SparseAllReduceOpHandle(
       is_encoded_(is_encoded),
       nranks_(nranks) {
   // TODO(gongwb) :polish them!
-  if (is_encoded) {
-    VLOG(1) << "Use dgc allreduce mode";
-  }
-}
+  PADDLE_ENFORCE_EQ(is_encoded, true);
+  VLOG(1) << "Use dgc allreduce mode"
+          << ", nranks:" << nranks_;
 
-void SparseAllReduceOpHandle::WaitInputVarGenerated() {
-#ifdef PADDLE_WITH_CUDA
-  for (auto &p : dev_ctxes_) {
-    if (platform::is_gpu_place(p.first)) {
-      int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
-      auto *compute_dev_ctx =
-          platform::DeviceContextPool::Instance().GetByPlace(
-              platform::CUDAPlace(dev_id));
-      auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(p.second);
-      if (compute_dev_ctx->stream() != dev_ctx->stream()) {
-        auto &event = events_.at(dev_id);
-        PADDLE_ENFORCE_CUDA_SUCCESS(
-            cudaEventRecord(event, compute_dev_ctx->stream()));
-        PADDLE_ENFORCE_CUDA_SUCCESS(
-            cudaStreamWaitEvent(dev_ctx->stream(), event, 0));
-      }
+  PADDLE_ENFORCE_GT(local_scopes_.size(), 0);
+  auto nranks_name = g_dgc_nranks;
+  for (size_t i = 0; i < local_scopes_.size(); ++i) {
+    auto *local_scope = local_scopes_[i];
+    auto nranks_var = local_scope->FindVar(nranks_name);
+    if (nranks_var == nullptr) {
+      PADDLE_THROW("not find nranks_var:%s", nranks_name);
     }
+
+    float *dgc_nranks = nranks_var->GetMutable<LoDTensor>()->data<float>();
+    *dgc_nranks = nranks;
+    VLOG(10) << "dgc_nranks=" << *dgc_nranks;
   }
-#endif
 }
@@ -77,18 +70,27 @@ void SparseAllReduceOpHandle::RunImplEncoded() {
       "The NoDummyInputSize and NoDummyOutputSize should be equal.");
 
   std::vector<const LoDTensor *> ins;
+  std::vector<LoDTensor *> gathers;
  std::vector<LoDTensor *> outs;
   int k = -1;
   for (size_t i = 0; i < local_scopes_.size(); ++i) {
     auto *local_scope = local_exec_scopes_[i];
     auto original_name =
         paddle::framework::GradOriginalVarName(in_var_handles[i]->name());
+
     auto encode_var_name = original_name + g_dgc_encoded;
     auto *in_var = local_scope->FindVar(encode_var_name);
     PADDLE_ENFORCE_NOT_NULL(in_var, "%s should not be null", encode_var_name);
     auto &in = in_var->Get<LoDTensor>();
     ins.emplace_back(&in);
+
+    auto gather_var_name = original_name + g_dgc_gather;
+    auto *gather_var = local_scope->FindVar(gather_var_name);
+    PADDLE_ENFORCE_NOT_NULL(gather_var, "%s should not be null",
+                            gather_var_name);
+    auto *gather = gather_var->GetMutable<LoDTensor>();
+    gathers.emplace_back(gather);
+
     auto *out = local_scope->FindVar(out_var_handles[i]->name())
                     ->GetMutable<LoDTensor>();
     outs.emplace_back(out);
@@ -135,9 +137,7 @@ void SparseAllReduceOpHandle::RunImplEncoded() {
     // dgc use ncclAllGather to get all the encoded data
     // so the buffer need nranks.
     int buf_size = nranks_ * encode_size;
-    auto tmp_ious_data = memory::Alloc(place, buf_size);
-    void *gather_buff = reinterpret_cast<void *>(tmp_ious_data->ptr());
-    allocations.emplace_back(std::move(tmp_ious_data));
+    void *gather_buff = gathers[i]->data<void>();
 
     VLOG(10) << "in_numel:" << in_numel << ", out_numel:" << out_numel
              << ", nranks:" << nranks_ << ", gather_buf size:" << buf_size
diff --git a/paddle/fluid/framework/details/sparse_all_reduce_op_handle.h b/paddle/fluid/framework/details/sparse_all_reduce_op_handle.h
index 87136ea0d5a5bc827a46f615e4900ea77b72f869..b24b457d21d8b7a17d8e2ae615791090f3859a97 100644
--- a/paddle/fluid/framework/details/sparse_all_reduce_op_handle.h
+++ b/paddle/fluid/framework/details/sparse_all_reduce_op_handle.h
@@ -36,8 +36,6 @@ class SparseAllReduceOpHandle : public AllReduceOpHandle {
                           bool is_encoded = false, int nranks = -1);
   std::string Name() const override;
 
-  void WaitInputVarGenerated() override;
-
  protected:
   void RunImpl() override;
   int GetKValue(const std::string &grad_name);
diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc
index 73d7bf6dba0f01cc53ed0e9010c2da88afd6e384..3f7ceb5795876029531aa73d3aa0b9154a2e6408 100644
--- a/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc
+++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc
@@ -105,6 +105,12 @@ class FuseAllReduceOpPass : public ir::Pass {
       auto *all_reduce_op_handle = dynamic_cast<details::AllReduceOpHandle *>(
           &node->Wrapper<details::OpHandleBase>());
       if (all_reduce_op_handle) {
+#if defined(PADDLE_WITH_DGC)
+        PADDLE_ENFORCE_NE(
+            all_reduce_op_handle->Name(), "sparse_all_reduce",
+            "DGC doesn't support fuse for now, if you want to use DGC "
+            "you need set strategy.fuse_all_reduce_ops = False.");
+#endif
         auto inputs = details::DynamicCast<details::VarHandle>(
             all_reduce_op_handle->Inputs());
         PADDLE_ENFORCE_EQ(inputs.size(), num_place);
diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc
index 491bbc17ff514494f8037e7e45b12eb7f9e8f7d9..750fc12acc7344e5cefc99b211a9f6a9496d0f76 100644
--- a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc
+++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc
@@ -1011,10 +1011,10 @@ int DistSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result,
 
 #if defined(PADDLE_WITH_DGC)
 bool AllReduceSSAGraphBuilder::IsEncoded(const std::string &p_name) const {
-  auto u_name = p_name + details::g_dgc_u;
-  auto it = all_vars_.find(u_name);
+  auto k_name = p_name + details::g_dgc_k;
+  auto it = all_vars_.find(k_name);
   if (it == all_vars_.end()) {
-    VLOG(10) << "can't find u_name, so it's not encoded:" << u_name;
+    VLOG(10) << "can't find k_name, so it's not encoded:" << k_name;
     return false;
   }
diff --git a/paddle/fluid/operators/dgc_op.cc b/paddle/fluid/operators/dgc_op.cc
index ccdeea2d0a96342a57ca56ae2b686f81b32fd866..5b45c42f837382353b7252f696bd882916cdd3fa 100644
--- a/paddle/fluid/operators/dgc_op.cc
+++ b/paddle/fluid/operators/dgc_op.cc
@@ -31,6 +31,8 @@ class DGCOp : public framework::OperatorWithKernel {
                    "Input(Grad) of DGCop should not be null.");
     PADDLE_ENFORCE(ctx->HasInput("current_step"),
                    "Input(current_step) of DGCop should not be null.");
+    PADDLE_ENFORCE_EQ(ctx->HasInput("nranks"), true,
+                      "Input(nranks) of DGCop should not be null.");
 
     PADDLE_ENFORCE(ctx->HasOutput("U_out"),
                    "Output(U_out) of DGCop should not be null.");
@@ -40,14 +42,15 @@ class DGCOp : public framework::OperatorWithKernel {
                    "Output(k) of DGCop should not be null.");
     PADDLE_ENFORCE(ctx->HasOutput("EncodeGrad"),
                    "Output(EncodeGrad) of DGCop should not be null.");
+    PADDLE_ENFORCE_EQ(ctx->HasOutput("GatherBuff"), true,
+                      "Output(GatherBuff) of DGCop should not be null.");
   }
 
  protected:
   framework::OpKernelType GetKernelTypeForVar(
       const std::string& var_name, const framework::Tensor& tensor,
       const framework::OpKernelType& expected_kernel_type) const override {
-    if (var_name == "current_step" || var_name == "rampup_step" ||
-        var_name == "k") {
+    if (var_name == "current_step" || var_name == "k" || var_name == "nranks") {
       VLOG(10) << "var_name:" << var_name << " need not to transform";
       return expected_kernel_type;
     }
@@ -60,26 +63,18 @@
 class DGCOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() override {
-    AddInput("U", "(Tensor) Middle tensor of DGC");
-    AddInput("V", "(Tensor) Middle tensor of DGC");
+    AddInput("U", "(Tensor) U velocity tensor of DGC");
+    AddInput("V", "(Tensor) V velocity tensor of DGC");
     AddInput("Grad", "(Tensor) Input gradient");
     AddInput("current_step", "(Tensor) Current step.");
-
-    AddOutput("U_out",
-              "(Tensor) "
-              "Output encoded gradient");
-    AddOutput("V_out",
-              "(Tensor) "
-              "Output encoded gradient");
-    AddOutput("EncodeGrad",
-              "(Tensor) "
-              "Output encoded gradient");
-    AddOutput("Grad_out",
-              "(Tensor) "
-              "Output grad gradient");
-    AddOutput("k",
-              "(Tensor) "
-              "Output top-k value");
+    AddInput("nranks", "(Tensor) nranks.");
+
+    AddOutput("U_out", "(Tensor) Output U velocity of DGC");
+    AddOutput("V_out", "(Tensor) Output V velocity of DGC");
+    AddOutput("EncodeGrad", "(Tensor) Output encoded gradient");
+    AddOutput("Grad_out", "(Tensor) Output grad gradient");
+    AddOutput("k", "(Tensor) Output top-k value");
+    AddOutput("GatherBuff", "(Tensor) Gather buffer");
 
     AddAttr<float>("m",
                    "(float, 0.9) "
diff --git a/paddle/fluid/operators/dgc_op.h b/paddle/fluid/operators/dgc_op.h
index 1285daae094ab28cd4ec059094d4baf603870d7d..32dffe65161f19f2afbc8a92e9834ca6dd34e2b0 100644
--- a/paddle/fluid/operators/dgc_op.h
+++ b/paddle/fluid/operators/dgc_op.h
@@ -50,6 +50,11 @@ class DGCOpKernel : public framework::OpKernel<T> {
     auto rampup_begin_step = ctx.Attr<float>("rampup_begin_step");
     auto rampup_step = ctx.Attr<float>("rampup_step");
 
+    // nranks
+    auto nranks_tensor = ctx.Input<framework::Tensor>("nranks");
+    const int nranks = static_cast<const int>(*nranks_tensor->data<float>());
+    PADDLE_ENFORCE_GT(nranks, 1, "DGC is not useful when num_trainers <= 1");
+
     // current step
     auto current_step_tensor = ctx.Input<framework::Tensor>("current_step");
     const float* current_step = current_step_tensor->data<float>();
@@ -72,7 +77,7 @@ class DGCOpKernel : public framework::OpKernel<T> {
             << ", rampup_begin_step:" << rampup_begin_step
             << ", rampup_step:" << rampup_step
             << ", current_step:" << *current_step << ", ratio:" << ratio
-            << ", k:" << k;
+            << ", k:" << k << ", nranks:" << nranks;
 
     auto k_out = ctx.Output<framework::Tensor>("k");
     T* k_out_data = k_out->data<T>();
@@ -81,6 +86,7 @@ class DGCOpKernel : public framework::OpKernel<T> {
     auto u_out = ctx.Output<framework::Tensor>("U_out");
     auto v_out = ctx.Output<framework::Tensor>("V_out");
     auto encode_grad_out = ctx.Output<framework::Tensor>("EncodeGrad");
+    auto gather_buff = ctx.Output<framework::Tensor>("GatherBuff");
 
     // FIXME(gongwb): use cublas.
     auto u_out_e = framework::EigenVector<T>::Flatten(*u_out);
@@ -88,6 +94,13 @@ class DGCOpKernel : public framework::OpKernel<T> {
     auto g_e = framework::EigenVector<T>::Flatten(*g);
     auto& dev_ctx = ctx.template device_context<DeviceContext>();
     auto& eigen_ctx = *dev_ctx.eigen_device();
+
+    if (static_cast<int>(*current_step) ==
+        static_cast<int>(rampup_begin_step)) {
+      // calc local momentum from global momentum
+      u_out_e.device(eigen_ctx) = (1.0 / nranks) * u_e;
+    }
+
     if (use_nesterov) {
       // u = m * (u + g)
       u_out_e.device(eigen_ctx) = m * (u_e + g_e);
@@ -111,6 +124,8 @@ class DGCOpKernel : public framework::OpKernel<T> {
     T* u_out_data = u_out->mutable_data<T>(ctx.GetPlace());
     T* encode_grad_out_data = encode_grad_out->mutable_data<T>(
         framework::DDim{2 * k}, ctx.GetPlace());
+    gather_buff->mutable_data<T>(framework::DDim{2 * k * nranks},
+                                 ctx.GetPlace());
 
     int buf_size = paddle::communication::dgc::get_buffer_size(k);
     auto tmp_ious_data = memory::Alloc(dev_ctx, buf_size);
diff --git a/paddle/fluid/pybind/const_value.cc b/paddle/fluid/pybind/const_value.cc
index 71eeaf3b53acf98c9f5e43f9acd6d67d42086005..5a889900ffbced143a9251df48bb6f229e2e7928 100644
--- a/paddle/fluid/pybind/const_value.cc
+++ b/paddle/fluid/pybind/const_value.cc
@@ -59,14 +59,14 @@ void BindConstValue(pybind11::module* m) {
       framework::OpProtoAndCheckerMaker::OpCreationCallstackAttrName);
 #if defined(PADDLE_WITH_DGC)
   auto dgc = m->def_submodule("dgc");
-  dgc.def("kDGCUName", [] { return framework::details::g_dgc_u; });
-  dgc.def("kDGCVName", [] { return framework::details::g_dgc_v; });
   dgc.def("kDGCKName", [] { return framework::details::g_dgc_k; });
   dgc.def("kDGCEncodedName", [] { return framework::details::g_dgc_encoded; });
+  dgc.def("kDGCGatherName", [] { return framework::details::g_dgc_gather; });
   dgc.def("kDGCCounterName",
           [] { return framework::details::g_dgc_counter_name; });
   dgc.def("kDGCRampUpBeginStepName",
           [] { return framework::details::g_dgc_rampup_begin_step; });
+  dgc.def("kDGCNRanksName", [] { return framework::details::g_dgc_nranks; });
 #endif
 }
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index 5006e6acdc94d595433f777cbf81db60073ac89c..aa1b713534aab75238cc789771f551708027cea0 100644
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -867,7 +867,7 @@ class MomentumOptimizer(Optimizer):
         return momentum_op
 
 
-class DGCMomentumOptimizer(MomentumOptimizer):
+class DGCMomentumOptimizer(Optimizer):
     """
     DGC (Deep Gradient Compression) Momentum Optimizer.
     Original paper is https://arxiv.org/abs/1712.01887
@@ -923,6 +925,8 @@ class DGCMomentumOptimizer(MomentumOptimizer):
                 sparsity=[0.999, 0.999])
 
     """
+    _u_velocity_acc_str = "_dgc_u_"
+    _v_velocity_acc_str = "_dgc_v_"
 
     def __init__(self,
                  learning_rate,
@@ -935,17 +937,25 @@ class DGCMomentumOptimizer(MomentumOptimizer):
                  num_trainers=None,
                  regularization=None,
                  name=None):
-        self._sparsity = sparsity
-        self._rampup_step = rampup_step
-        self._rampup_step_var = None
+        assert learning_rate is not None
+        assert momentum is not None
+        super(DGCMomentumOptimizer, self).__init__(
+            learning_rate=learning_rate,
+            regularization=regularization,
+            name=name)
+        self.type = "dgc_momentum"
+        self._momentum = momentum
+        self._use_nesterov = bool(use_nesterov)
 
         self._rampup_begin_step = rampup_begin_step
-        self._rampup_begin_step_var = None
+        self._rampup_step = rampup_step
+        self._sparsity = sparsity
 
+        self._rampup_begin_step_var = None
         self._global_step_var = None
+
         self._local_grad_clip_norm = None
         self._clip_norm = None
-
         if local_grad_clip_norm is not None:
             assert isinstance(num_trainers, int)
             assert isinstance(local_grad_clip_norm, float)
@@ -956,9 +966,6 @@ class DGCMomentumOptimizer(MomentumOptimizer):
             self._clip_norm = local_grad_clip_norm / (num_trainers *
                                                       num_trainers)
 
-        super(DGCMomentumOptimizer, self).__init__(
-            learning_rate, momentum, use_nesterov, regularization, name)
-
     def _is_use_dgc(self, param_var, grad_var):
         var_numel = abs(reduce(lambda x, y: x * y, param_var.shape))
         if var_numel < 16384 or \
@@ -970,34 +977,36 @@ class DGCMomentumOptimizer(MomentumOptimizer):
 
     def _append_optimize_op(self, block, param_and_grad):
         assert isinstance(block, framework.Block)
+        velocity_acc = self._get_accumulator(self._u_velocity_acc_str,
+                                             param_and_grad[0])
+        assert velocity_acc is not None
+
+        inputs = {
+            "Param": param_and_grad[0],
+            "Grad": param_and_grad[1],
+            "Velocity": velocity_acc,
+            "LearningRate": self._create_param_lr(param_and_grad),
+        }
+        outputs = {
+            "ParamOut": param_and_grad[0],
+            "VelocityOut": velocity_acc,
+        }
+        attrs = {"mu": self._momentum, "use_nesterov": self._use_nesterov}
 
         if not self._is_use_dgc(param_and_grad[0], param_and_grad[1]):
-            return super(DGCMomentumOptimizer, self)._append_optimize_op(
-                block, param_and_grad)
+            type = "momentum"
+        else:
+            type = "dgc_momentum"
+            inputs.update({"current_step": self._global_step_var})
+            attrs.update({"rampup_begin_step": float(self._rampup_begin_step)})
 
-        velocity_acc = self._get_accumulator(self._velocity_acc_str,
-                                             param_and_grad[0])
         # create the dgc momentum optimize op
         dgc_momentum_op = block.append_op(
-            type="dgc_momentum",
-            inputs={
-                "Param": param_and_grad[0],
-                "Grad": param_and_grad[1],
-                "Velocity": velocity_acc,
-                "LearningRate": self._create_param_lr(param_and_grad),
-                "current_step": self._global_step_var,
-            },
-            outputs={
-                "ParamOut": param_and_grad[0],
-                "VelocityOut": velocity_acc
-            },
-            attrs={
-                "mu": self._momentum,
-                "use_nesterov": self._use_nesterov,
-                "rampup_begin_step": float(self._rampup_begin_step)
-            },
+            type=type,
+            inputs=inputs,
+            outputs=outputs,
+            attrs=attrs,
             stop_gradient=True)
-
         return dgc_momentum_op
 
     def _add_auto_increment_var(self, counter_name, begin, step=1):
@@ -1019,8 +1028,20 @@ class DGCMomentumOptimizer(MomentumOptimizer):
 
         return counter
 
+    def _add_nranks_var(self, name, value=-1):
+        helper = LayerHelper('global_step_counter')
+        counter, is_new_var = helper.create_or_get_global_variable(
+            name=name, dtype='float32', shape=[1], persistable=True)
+        if is_new_var:
+            helper.set_variable_initializer(
+                counter,
+                initializer=Constant(
+                    value=float(value), force_cpu=True))
+        counter.stop_gradient = True
+
+        return counter
+
     def _append_dgc_ops(self, param_and_grads):
-        start_program = default_startup_program()
         main_program = default_main_program()
         main_program._enable_dgc = True
 
@@ -1028,6 +1049,9 @@ class DGCMomentumOptimizer(MomentumOptimizer):
         self._global_step_var = self._add_auto_increment_var(
             counter_name=core.dgc.kDGCCounterName(), begin=0)
 
+        self._nranks_var = self._add_nranks_var(
+            name=core.dgc.kDGCNRanksName(), value=-1)
+
         # rampup begin step var for all_reduce_op_handle
         self._rampup_begin_step_var = tensor.create_global_var(
             shape=[1],
@@ -1037,22 +1061,16 @@ class DGCMomentumOptimizer(MomentumOptimizer):
             value=self._rampup_begin_step * 1.0,
             force_cpu=True)
 
+        self.helper = LayerHelper(self.__class__.__name__)
+
         for param_var, grad_var in param_and_grads:
+            # reuse velocity in dgc_op and dgc_momentum_op
+            u_var = self._add_accumulator(self._u_velocity_acc_str, param_var)
+
             if not self._is_use_dgc(param_var, grad_var):
                 continue
 
-            u_var = tensor.create_global_var(
-                shape=param_var.shape,
-                dtype=param_var.dtype,
-                persistable=True,
-                name=param_var.name + core.dgc.kDGCUName(),
-                value=0.0)
-            v_var = tensor.create_global_var(
-                shape=param_var.shape,
-                dtype=param_var.dtype,
-                persistable=True,
-                name=param_var.name + core.dgc.kDGCVName(),
-                value=0.0)
+            v_var = self._add_accumulator(self._v_velocity_acc_str, param_var)
 
             k_var = tensor.create_global_var(
                 shape=[1],
@@ -1070,6 +1088,14 @@ class DGCMomentumOptimizer(MomentumOptimizer):
                 value=0.0,
                 force_cpu=False)
 
+            gather_var = tensor.create_global_var(
+                shape=[1],
+                dtype=param_var.dtype,
+                persistable=True,
+                name=param_var.name + core.dgc.kDGCGatherName(),
+                value=0.0,
+                force_cpu=False)
+
             # del back oprolevarname
             op_maker = core.op_proto_and_checker_maker
             backward = core.op_proto_and_checker_maker.OpRole.Backward
@@ -1092,7 +1118,7 @@ class DGCMomentumOptimizer(MomentumOptimizer):
             if self._local_grad_clip_norm is not None:
                 clip_var = self._append_clip_norm(grad_var, self._clip_norm)
             self._dgc_op(param_var, clip_var, grad_var, u_var, v_var, k_var,
-                         encoded_var)
+                         encoded_var, gather_var)
 
     def _is_the_backward_op(self, op):
         op_maker = core.op_proto_and_checker_maker
@@ -1131,7 +1157,7 @@ class DGCMomentumOptimizer(MomentumOptimizer):
             x=grad_var, max_norm=clip_norm, name=grad_var.name)
 
     def _dgc_op(self, param_var, clip_var, grad_var, u_var, v_var, k_var,
-                encoded_var):
+                encoded_var, gather_var):
         block = framework.default_main_program().global_block()
         op_maker = core.op_proto_and_checker_maker
         dgc_op = block.append_op(
@@ -1140,21 +1166,23 @@ class DGCMomentumOptimizer(MomentumOptimizer):
                 "U": u_var,
                 "V": v_var,
                 "Grad": clip_var,
-                "current_step": self._global_step_var
+                "current_step": self._global_step_var,
+                "nranks": self._nranks_var,
             },
             outputs={
                 "U_out": u_var,
                 "V_out": v_var,
                 "EncodeGrad": encoded_var,
                 "k": k_var,
-                "Grad_out": grad_var
+                "Grad_out": grad_var,
+                "GatherBuff": gather_var,
             },
             attrs={
                 "m": self._momentum,
                 "sparsity": self._sparsity,
                 "use_nesterov": self._use_nesterov,
                 "rampup_begin_step": float(self._rampup_begin_step),
-                "rampup_step": float(self._rampup_step)
+                "rampup_step": float(self._rampup_step),
            },
             stop_gradient=True)
diff --git a/python/paddle/fluid/tests/unittests/test_dgc_op.py b/python/paddle/fluid/tests/unittests/test_dgc_op.py
index 04766dd858496e18642d6532e49bd810ef34cac0..31898b03424f7692a11d9273f8f0730199c94082 100644
--- a/python/paddle/fluid/tests/unittests/test_dgc_op.py
+++ b/python/paddle/fluid/tests/unittests/test_dgc_op.py
@@ -34,7 +34,7 @@ class TestDGCOp(unittest.TestCase):
         print("place:", place)
 
         # numpy data
-        # inputs: U, V, Grad, current_step
+        # inputs: U, V, Grad, current_step, nranks
         self.u_name = "U"
         self.u = np.random.random(size).astype("float32")
 
@@ -47,10 +47,14 @@ class TestDGCOp(unittest.TestCase):
         self.current_step_name = "current_step"
         self.current_step = np.full((1), 0.0).astype("float32")
 
-        # output: U_out, V_out, EncodeGrad, GradLocal_out
+        self.nranks_name = "nranks"
+        self.nranks = np.full((1), 2.0).astype("float32")
+
+        # output: U_out, V_out, EncodeGrad, GradLocal_out, k, GatherBuff
         self.encode_grad_name = "EncodeGrad"
         self.k_name = "k"
         self.k = np.full((1), 0.0).astype("float32")
+        self.gather_buff_name = "GatherBuff"
 
         # scope data
         self.u_tensor = self.scope.var(self.u_name).get_tensor()
@@ -62,16 +66,22 @@ class TestDGCOp(unittest.TestCase):
         self.grad_tensor = self.scope.var(self.grad_name).get_tensor()
         self.grad_tensor.set(self.grad, place)
 
-        self.encode_grad_tensor = self.scope.var(
-            self.encode_grad_name).get_tensor()
-
         self.current_step_tensor = self.scope.var(
             self.current_step_name).get_tensor()
         self.current_step_tensor.set(self.current_step, core.CPUPlace())
 
+        self.nranks_tensor = self.scope.var(self.nranks_name).get_tensor()
+        self.nranks_tensor.set(self.nranks, core.CPUPlace())
+
+        self.encode_grad_tensor = self.scope.var(
+            self.encode_grad_name).get_tensor()
+
         self.k_tensor = self.scope.var(self.k_name).get_tensor()
         self.k_tensor.set(self.k, core.CPUPlace())
 
+        self.gather_buff_tensor = self.scope.var(
+            self.gather_buff_name).get_tensor()
+
     def check(self, actual_t, expect_t, place, out_name, atol=1e-5):
         self.assertTrue(
             np.allclose(
@@ -87,6 +97,7 @@ class TestDGCOp(unittest.TestCase):
             'V': self.v_name,
             'Grad': self.grad_name,
             'current_step': self.current_step_name,
+            'nranks': self.nranks_name,
 
             # outputs
             'U_out': self.u_name,
@@ -94,6 +105,7 @@ class TestDGCOp(unittest.TestCase):
             'EncodeGrad': self.encode_grad_name,
             'Grad_out': self.grad_name,
             'k': self.k_name,
+            'GatherBuff': self.gather_buff_name,
 
             # attrs
             'm': 0.9,
diff --git a/python/paddle/fluid/tests/unittests/test_dgc_optimizer.py b/python/paddle/fluid/tests/unittests/test_dgc_optimizer.py
index a148a1a78862c9972df5dccafef90497449cb00e..3ddd6fdb6e9136714617f58851d430432cd3a623 100644
--- a/python/paddle/fluid/tests/unittests/test_dgc_optimizer.py
+++ b/python/paddle/fluid/tests/unittests/test_dgc_optimizer.py
@@ -29,7 +29,7 @@ class TestDGCMomentumOptimizer(unittest.TestCase):
             return self._accumulators
 
         def get_velocity_str(self):
-            return self._velocity_acc_str
+            return self._u_velocity_acc_str
 
     def check_dgc_momentum_optimizer(self, dims=[5, 10, 8], name="momentum"):
         init_program = framework.Program()
@@ -66,8 +66,10 @@ class TestDGCMomentumOptimizer(unittest.TestCase):
             type="mean", inputs={"X": mul_out}, outputs={"Out": mean_out})
         # params_grads = append_backward(mean_out)
         params_grads = dgc_momentum_optimizer.backward(mean_out)
+        accumulator_count = 1 if name == "momentum" else 2
         self.assertEqual(len(params_grads), 1)
-        self.assertEqual(len(dgc_momentum_optimizer.get_accumulators()), 0)
+        self.assertEqual(
+            len(dgc_momentum_optimizer.get_accumulators()), accumulator_count)
         with framework.program_guard(program, init_program):
             opts = dgc_momentum_optimizer.apply_gradients(params_grads)
         self.assertEqual(len(opts), 2)
@@ -77,7 +79,7 @@ class TestDGCMomentumOptimizer(unittest.TestCase):
 
         # Check accumulators
         accumulators = dgc_momentum_optimizer.get_accumulators()
-        self.assertEqual(len(accumulators), 1)
+        self.assertEqual(len(accumulators), accumulator_count)
         self.assertTrue(
            dgc_momentum_optimizer.get_velocity_str() in accumulators)
         velocity_acc = accumulators[dgc_momentum_optimizer.get_velocity_str()]
@@ -86,11 +88,9 @@ class TestDGCMomentumOptimizer(unittest.TestCase):
 
         # Check init_program
         init_ops = init_program.global_block().ops
-        self.assertEqual(len(init_ops), 2)
+        self.assertEqual(len(init_ops), 1)
         self.assertEqual(init_ops[0].type, "fill_constant")
         self.assertAlmostEqual(init_ops[0].attr('value'), learning_rate)
-        self.assertEqual(init_ops[1].type, "fill_constant")
-        self.assertAlmostEqual(init_ops[1].attr('value'), 0.0)
 
         with open("test_dgc_optimizer_" + name + ".log", "w") as f:
             program_to_code(program, fout=f)
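Usage note (not part of the patch): the sketch below shows how the reworked DGCMomentumOptimizer is constructed on the Python side after this change. It is a minimal single-process illustration; the toy network and hyper-parameter values are assumptions based on the class docstring, and actual gradient compression only activates in a multi-trainer run with a PADDLE_WITH_DGC build.

    # Minimal sketch, assuming the fluid 1.x layers API; values are illustrative only.
    import paddle.fluid as fluid

    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    y_predict = fluid.layers.fc(input=x, size=1)
    cost = fluid.layers.mean(
        fluid.layers.square_error_cost(input=y_predict, label=y))

    optimizer = fluid.optimizer.DGCMomentumOptimizer(
        learning_rate=0.0001,
        momentum=0.9,
        rampup_begin_step=1252,      # step at which compression starts
        rampup_step=100,             # length of the sparsity warm-up schedule
        sparsity=[0.999, 0.999])     # top-k sparsity used during the warm-up
    optimizer.minimize(cost)

    # DGC itself only kicks in when the program runs with more than one trainer:
    # the dgc op now reads the nranks tensor and enforces nranks > 1, and a
    # single-process run exercises only the plain momentum path that
    # _append_optimize_op falls back to.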