Commit 93c7f058 authored by WangXi, committed by gongweibao

[Cherry-pick 1.6] Fix dgc buffer illegal & reuse velocity & fix fuse (#21281)

Parent 3423f0b6
@@ -22,10 +22,10 @@ namespace details {
 constexpr char g_dgc_counter_name[] = "__g_dgc_counter__";
 constexpr char g_dgc_rampup_begin_step[] = "__g_rampup_begin_step__";
-constexpr char g_dgc_u[] = "__dgc_u__";
-constexpr char g_dgc_v[] = "__dgc_v__";
+constexpr char g_dgc_nranks[] = "__g_nranks__";
 constexpr char g_dgc_k[] = "__dgc_k__";
 constexpr char g_dgc_encoded[] = "__dgc_encoded__";
+constexpr char g_dgc_gather[] = "__dgc_gather__";
 }  // namespace details
 }  // namespace framework
......
@@ -38,30 +38,23 @@ SparseAllReduceOpHandle::SparseAllReduceOpHandle(
       is_encoded_(is_encoded),
       nranks_(nranks) {
   // TODO(gongwb) :polish them!
-  if (is_encoded) {
-    VLOG(1) << "Use dgc allreduce mode";
-  }
-}
+  PADDLE_ENFORCE_EQ(is_encoded, true);
+  VLOG(1) << "Use dgc allreduce mode"
+          << ", nranks:" << nranks_;
 
-void SparseAllReduceOpHandle::WaitInputVarGenerated() {
-#ifdef PADDLE_WITH_CUDA
-  for (auto &p : dev_ctxes_) {
-    if (platform::is_gpu_place(p.first)) {
-      int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
-      auto *compute_dev_ctx =
-          platform::DeviceContextPool::Instance().GetByPlace(
-              platform::CUDAPlace(dev_id));
-      auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(p.second);
-      if (compute_dev_ctx->stream() != dev_ctx->stream()) {
-        auto &event = events_.at(dev_id);
-        PADDLE_ENFORCE_CUDA_SUCCESS(
-            cudaEventRecord(event, compute_dev_ctx->stream()));
-        PADDLE_ENFORCE_CUDA_SUCCESS(
-            cudaStreamWaitEvent(dev_ctx->stream(), event, 0));
-      }
-    }
+  PADDLE_ENFORCE_GT(local_scopes_.size(), 0);
+  auto nranks_name = g_dgc_nranks;
+  for (size_t i = 0; i < local_scopes_.size(); ++i) {
+    auto *local_scope = local_scopes_[i];
+    auto nranks_var = local_scope->FindVar(nranks_name);
+    if (nranks_var == nullptr) {
+      PADDLE_THROW("not find nranks_var:%s", nranks_name);
+    }
+
+    float *dgc_nranks = nranks_var->GetMutable<LoDTensor>()->data<float>();
+    *dgc_nranks = nranks;
+    VLOG(10) << "dgc_nranks=" << *dgc_nranks;
   }
-#endif
 }
 
 void SparseAllReduceOpHandle::RunImplEncoded() {
@@ -77,18 +70,27 @@ void SparseAllReduceOpHandle::RunImplEncoded() {
       "The NoDummyInputSize and NoDummyOutputSize should be equal.");
 
   std::vector<const LoDTensor *> ins;
+  std::vector<LoDTensor *> gathers;
   std::vector<LoDTensor *> outs;
   int k = -1;
   for (size_t i = 0; i < local_scopes_.size(); ++i) {
     auto *local_scope = local_exec_scopes_[i];
     auto original_name =
         paddle::framework::GradOriginalVarName(in_var_handles[i]->name());
+
     auto encode_var_name = original_name + g_dgc_encoded;
     auto *in_var = local_scope->FindVar(encode_var_name);
     PADDLE_ENFORCE_NOT_NULL(in_var, "%s should not be null", encode_var_name);
     auto &in = in_var->Get<LoDTensor>();
     ins.emplace_back(&in);
 
+    auto gather_var_name = original_name + g_dgc_gather;
+    auto *gather_var = local_scope->FindVar(gather_var_name);
+    PADDLE_ENFORCE_NOT_NULL(gather_var, "%s should not be null",
+                            gather_var_name);
+    auto *gather = gather_var->GetMutable<LoDTensor>();
+    gathers.emplace_back(gather);
+
     auto *out = local_scope->FindVar(out_var_handles[i]->name())
                     ->GetMutable<LoDTensor>();
     outs.emplace_back(out);
@@ -135,9 +137,7 @@ void SparseAllReduceOpHandle::RunImplEncoded() {
     // dgc use ncclAllGather to get all the encoded data
     // so the buffer need nranks.
     int buf_size = nranks_ * encode_size;
-    auto tmp_ious_data = memory::Alloc(place, buf_size);
-    void *gather_buff = reinterpret_cast<void *>(tmp_ious_data->ptr());
-    allocations.emplace_back(std::move(tmp_ious_data));
+    void *gather_buff = gathers[i]->data<void>();
 
     VLOG(10) << "in_numel:" << in_numel << ", out_numel:" << out_numel
              << ", nranks:" << nranks_ << ", gather_buf size:" << buf_size
......
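The change above fixes the illegal gather buffer: instead of a temporary `memory::Alloc` inside the op handle, the allgather destination is now the persistent `GatherBuff` tensor that the dgc op allocates with shape `2 * k * nranks`. A minimal sketch of the sizing arithmetic, assuming float32 gradients (the values of `k` and `nranks` are illustrative):

```python
# Sketch of the DGC gather-buffer sizing; not PaddlePaddle code.
# Each rank encodes its top-k gradient as k (index, value) pairs -> 2 * k floats.
def encode_size_bytes(k, elem_bytes=4):
    return 2 * k * elem_bytes

def gather_buffer_bytes(k, nranks, elem_bytes=4):
    # ncclAllGather concatenates one encoded chunk per rank,
    # so the destination buffer must hold nranks chunks.
    return nranks * encode_size_bytes(k, elem_bytes)

print(gather_buffer_bytes(k=64, nranks=4))  # 4 * (2 * 64 * 4) = 2048 bytes
```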
@@ -36,8 +36,6 @@ class SparseAllReduceOpHandle : public AllReduceOpHandle {
                           bool is_encoded = false, int nranks = -1);
   std::string Name() const override;
 
-  void WaitInputVarGenerated() override;
-
  protected:
   void RunImpl() override;
   int GetKValue(const std::string &grad_name);
......
@@ -105,6 +105,12 @@ class FuseAllReduceOpPass : public ir::Pass {
       auto *all_reduce_op_handle = dynamic_cast<details::AllReduceOpHandle *>(
           &node->Wrapper<details::OpHandleBase>());
       if (all_reduce_op_handle) {
+#if defined(PADDLE_WITH_DGC)
+        PADDLE_ENFORCE_NE(
+            all_reduce_op_handle->Name(), "sparse_all_reduce",
+            "DGC doesn't support fuse for now, if you want to use DGC "
+            "you need set strategy.fuse_all_reduce_ops = False.");
+#endif
         auto inputs = details::DynamicCast<details::VarHandle>(
             all_reduce_op_handle->Inputs());
         PADDLE_ENFORCE_EQ(inputs.size(), num_place);
......
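The new check rejects graphs that would fuse a `sparse_all_reduce` op. In user code this means all-reduce fusion has to be switched off while DGC is in use; a hedged sketch of the corresponding build-strategy setting, taken from the error message above (the surrounding training setup is omitted):

```python
# Sketch: turn off all-reduce fusion when training with DGCMomentumOptimizer.
import paddle.fluid as fluid

build_strategy = fluid.BuildStrategy()
build_strategy.fuse_all_reduce_ops = False  # DGC does not support fused all-reduce yet
```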
@@ -1011,10 +1011,10 @@ int DistSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result,
 
 #if defined(PADDLE_WITH_DGC)
 bool AllReduceSSAGraphBuilder::IsEncoded(const std::string &p_name) const {
-  auto u_name = p_name + details::g_dgc_u;
-  auto it = all_vars_.find(u_name);
+  auto k_name = p_name + details::g_dgc_k;
+  auto it = all_vars_.find(k_name);
   if (it == all_vars_.end()) {
-    VLOG(10) << "can't find u_name, so it's not encoded:" << u_name;
+    VLOG(10) << "can't find k_name, so it's not encoded:" << k_name;
     return false;
   }
......
@@ -31,6 +31,8 @@ class DGCOp : public framework::OperatorWithKernel {
                    "Input(Grad) of DGCop should not be null.");
     PADDLE_ENFORCE(ctx->HasInput("current_step"),
                    "Input(current_step) of DGCop should not be null.");
+    PADDLE_ENFORCE_EQ(ctx->HasInput("nranks"), true,
+                      "Input(nranks) of DGCop should not be null.");
 
     PADDLE_ENFORCE(ctx->HasOutput("U_out"),
                    "Output(U_out) of DGCop should not be null.");
@@ -40,14 +42,15 @@ class DGCOp : public framework::OperatorWithKernel {
                    "Output(k) of DGCop should not be null.");
     PADDLE_ENFORCE(ctx->HasOutput("EncodeGrad"),
                    "Output(EncodeGrad) of DGCop should not be null.");
+    PADDLE_ENFORCE_EQ(ctx->HasOutput("GatherBuff"), true,
+                      "Output(GatherBuff) of DGCop should not be null.");
   }
 
  protected:
   framework::OpKernelType GetKernelTypeForVar(
       const std::string& var_name, const framework::Tensor& tensor,
       const framework::OpKernelType& expected_kernel_type) const override {
-    if (var_name == "current_step" || var_name == "rampup_step" ||
-        var_name == "k") {
+    if (var_name == "current_step" || var_name == "k" || var_name == "nranks") {
       VLOG(10) << "var_name:" << var_name << " need not to transform";
       return expected_kernel_type;
     }
@@ -60,26 +63,18 @@ class DGCOp : public framework::OperatorWithKernel {
 class DGCOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() override {
-    AddInput("U", "(Tensor) Middle tensor of DGC");
-    AddInput("V", "(Tensor) Middle tensor of DGC");
+    AddInput("U", "(Tensor) U velocity tensor of DGC");
+    AddInput("V", "(Tensor) V velocity tensor of DGC");
     AddInput("Grad", "(Tensor) Input gradient");
     AddInput("current_step", "(Tensor) Current step.");
+    AddInput("nranks", "(Tensor) nranks.");
 
-    AddOutput("U_out",
-              "(Tensor) "
-              "Output encoded gradient");
-    AddOutput("V_out",
-              "(Tensor) "
-              "Output encoded gradient");
-    AddOutput("EncodeGrad",
-              "(Tensor) "
-              "Output encoded gradient");
-    AddOutput("Grad_out",
-              "(Tensor) "
-              "Output grad gradient");
-    AddOutput("k",
-              "(Tensor) "
-              "Output top-k value");
+    AddOutput("U_out", "(Tensor) Output U velocity of DGC");
+    AddOutput("V_out", "(Tensor) Output V velocity of DGC");
+    AddOutput("EncodeGrad", "(Tensor) Output encoded gradient");
+    AddOutput("Grad_out", "(Tensor) Output grad gradient");
+    AddOutput("k", "(Tensor) Output top-k value");
+    AddOutput("GatherBuff", "(Tensor) Gather buffer");
 
     AddAttr<float>("m",
                    "(float, 0.9) "
......
@@ -50,6 +50,11 @@ class DGCOpKernel : public framework::OpKernel<T> {
     auto rampup_begin_step = ctx.Attr<float>("rampup_begin_step");
     auto rampup_step = ctx.Attr<float>("rampup_step");
 
+    // nranks
+    auto nranks_tensor = ctx.Input<framework::Tensor>("nranks");
+    const int nranks = static_cast<const int>(*nranks_tensor->data<float>());
+    PADDLE_ENFORCE_GT(nranks, 1, "DGC is not useful when num_trainers <= 1");
+
     // current step
     auto current_step_tensor = ctx.Input<framework::Tensor>("current_step");
     const float* current_step = current_step_tensor->data<float>();
@@ -72,7 +77,7 @@ class DGCOpKernel : public framework::OpKernel<T> {
             << ", rampup_begin_step:" << rampup_begin_step
             << ", rampup_step:" << rampup_step
             << ", current_step:" << *current_step << ", ratio:" << ratio
-            << ", k:" << k;
+            << ", k:" << k << ", nranks:" << nranks;
 
     auto k_out = ctx.Output<framework::Tensor>("k");
     T* k_out_data = k_out->data<T>();
@@ -81,6 +86,7 @@ class DGCOpKernel : public framework::OpKernel<T> {
     auto u_out = ctx.Output<framework::Tensor>("U_out");
     auto v_out = ctx.Output<framework::Tensor>("V_out");
     auto encode_grad_out = ctx.Output<framework::Tensor>("EncodeGrad");
+    auto gather_buff = ctx.Output<framework::Tensor>("GatherBuff");
 
     // FIXME(gongwb): use cublas.
     auto u_out_e = framework::EigenVector<T>::Flatten(*u_out);
@@ -88,6 +94,13 @@ class DGCOpKernel : public framework::OpKernel<T> {
     auto g_e = framework::EigenVector<T>::Flatten(*g);
     auto& dev_ctx = ctx.template device_context<DeviceContext>();
     auto& eigen_ctx = *dev_ctx.eigen_device();
+
+    if (static_cast<int>(*current_step) ==
+        static_cast<int>(rampup_begin_step)) {
+      // calc local momentum from global momentum
+      u_out_e.device(eigen_ctx) = (1.0 / nranks) * u_e;
+    }
+
     if (use_nesterov) {
       // u = m * (u + g)
       u_out_e.device(eigen_ctx) = m * (u_e + g_e);
@@ -111,6 +124,8 @@ class DGCOpKernel : public framework::OpKernel<T> {
     T* u_out_data = u_out->mutable_data<T>(ctx.GetPlace());
     T* encode_grad_out_data = encode_grad_out->mutable_data<T>(
         framework::DDim{2 * k}, ctx.GetPlace());
+    gather_buff->mutable_data<T>(framework::DDim{2 * k * nranks},
+                                 ctx.GetPlace());
 
     int buf_size = paddle::communication::dgc::get_buffer_size(k);
     auto tmp_ious_data = memory::Alloc(dev_ctx, buf_size);
......
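The numerically important addition is the block guarded by `current_step == rampup_begin_step`: up to that step the momentum op consumed gradients that had already been summed across trainers, so when the dgc op starts working on local gradients the accumulated momentum `U` is rescaled by `1 / nranks` ("calc local momentum from global momentum"). A NumPy sketch of that correction and of the buffer shapes allocated above; this is an interpretation for illustration, not the kernel itself (the top-k selection and encoding live in the external `paddle::communication::dgc` library):

```python
# Illustrative sketch; mirrors the rescale and buffer shapes in DGCOpKernel above.
import numpy as np

def rampup_momentum_correction(u, nranks, current_step, rampup_begin_step):
    # U was accumulated from all-reduced (summed) gradients so far; divide once
    # by nranks so it matches the local gradients used from this step on.
    if int(current_step) == int(rampup_begin_step):
        u = (1.0 / nranks) * u
    return u

def dgc_buffer_shapes(k, nranks):
    encode_grad_shape = (2 * k,)           # k indices + k values per rank
    gather_buff_shape = (2 * k * nranks,)  # allgather destination for all ranks
    return encode_grad_shape, gather_buff_shape
```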
@@ -59,14 +59,14 @@ void BindConstValue(pybind11::module* m) {
       framework::OpProtoAndCheckerMaker::OpCreationCallstackAttrName);
 #if defined(PADDLE_WITH_DGC)
   auto dgc = m->def_submodule("dgc");
-  dgc.def("kDGCUName", [] { return framework::details::g_dgc_u; });
-  dgc.def("kDGCVName", [] { return framework::details::g_dgc_v; });
   dgc.def("kDGCKName", [] { return framework::details::g_dgc_k; });
   dgc.def("kDGCEncodedName", [] { return framework::details::g_dgc_encoded; });
+  dgc.def("kDGCGatherName", [] { return framework::details::g_dgc_gather; });
   dgc.def("kDGCCounterName",
           [] { return framework::details::g_dgc_counter_name; });
   dgc.def("kDGCRampUpBeginStepName",
           [] { return framework::details::g_dgc_rampup_begin_step; });
+  dgc.def("kDGCNRanksName", [] { return framework::details::g_dgc_nranks; });
 #endif
 }
......
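These bindings are what the Python optimizer below uses to compose per-parameter variable names. A small sketch of the convention, with the literal suffixes taken from the constants at the top of this diff (the parameter name is illustrative):

```python
# Naming convention exposed through core.dgc; the suffixes come from the
# constexpr constants in the details namespace.
param_name = "fc_0.w_0"                        # illustrative parameter name
k_name = param_name + "__dgc_k__"              # core.dgc.kDGCKName()
encoded_name = param_name + "__dgc_encoded__"  # core.dgc.kDGCEncodedName()
gather_name = param_name + "__dgc_gather__"    # core.dgc.kDGCGatherName()
nranks_name = "__g_nranks__"                   # core.dgc.kDGCNRanksName(), one global var
```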
@@ -867,7 +867,7 @@ class MomentumOptimizer(Optimizer):
         return momentum_op
 
 
-class DGCMomentumOptimizer(MomentumOptimizer):
+class DGCMomentumOptimizer(Optimizer):
     """
     DGC (Deep Gradient Compression) Momentum Optimizer. Original paper is https://arxiv.org/abs/1712.01887
@@ -923,6 +923,8 @@ class DGCMomentumOptimizer(MomentumOptimizer):
                 sparsity=[0.999, 0.999])
 
     """
+    _u_velocity_acc_str = "_dgc_u_"
+    _v_velocity_acc_str = "_dgc_v_"
 
     def __init__(self,
                  learning_rate,
@@ -935,17 +937,25 @@ class DGCMomentumOptimizer(MomentumOptimizer):
                  num_trainers=None,
                  regularization=None,
                  name=None):
-        self._sparsity = sparsity
-        self._rampup_step = rampup_step
-        self._rampup_step_var = None
+        assert learning_rate is not None
+        assert momentum is not None
+        super(DGCMomentumOptimizer, self).__init__(
+            learning_rate=learning_rate,
+            regularization=regularization,
+            name=name)
+        self.type = "dgc_momentum"
+        self._momentum = momentum
+        self._use_nesterov = bool(use_nesterov)
 
         self._rampup_begin_step = rampup_begin_step
-        self._rampup_begin_step_var = None
+        self._rampup_step = rampup_step
+        self._sparsity = sparsity
 
+        self._rampup_begin_step_var = None
         self._global_step_var = None
+
         self._local_grad_clip_norm = None
         self._clip_norm = None
+
         if local_grad_clip_norm is not None:
             assert isinstance(num_trainers, int)
             assert isinstance(local_grad_clip_norm, float)
@@ -956,9 +966,6 @@ class DGCMomentumOptimizer(MomentumOptimizer):
             self._clip_norm = local_grad_clip_norm / (num_trainers *
                                                       num_trainers)
 
-        super(DGCMomentumOptimizer, self).__init__(
-            learning_rate, momentum, use_nesterov, regularization, name)
-
     def _is_use_dgc(self, param_var, grad_var):
         var_numel = abs(reduce(lambda x, y: x * y, param_var.shape))
         if var_numel < 16384 or \
@@ -970,34 +977,36 @@ class DGCMomentumOptimizer(MomentumOptimizer):
     def _append_optimize_op(self, block, param_and_grad):
         assert isinstance(block, framework.Block)
+        velocity_acc = self._get_accumulator(self._u_velocity_acc_str,
+                                             param_and_grad[0])
+        assert velocity_acc is not None
+
+        inputs = {
+            "Param": param_and_grad[0],
+            "Grad": param_and_grad[1],
+            "Velocity": velocity_acc,
+            "LearningRate": self._create_param_lr(param_and_grad),
+        }
+        outputs = {
+            "ParamOut": param_and_grad[0],
+            "VelocityOut": velocity_acc,
+        }
+        attrs = {"mu": self._momentum, "use_nesterov": self._use_nesterov}
 
         if not self._is_use_dgc(param_and_grad[0], param_and_grad[1]):
-            return super(DGCMomentumOptimizer, self)._append_optimize_op(
-                block, param_and_grad)
+            type = "momentum"
+        else:
+            type = "dgc_momentum"
+            inputs.update({"current_step": self._global_step_var})
+            attrs.update({"rampup_begin_step": float(self._rampup_begin_step)})
 
-        velocity_acc = self._get_accumulator(self._velocity_acc_str,
-                                             param_and_grad[0])
         # create the dgc momentum optimize op
         dgc_momentum_op = block.append_op(
-            type="dgc_momentum",
-            inputs={
-                "Param": param_and_grad[0],
-                "Grad": param_and_grad[1],
-                "Velocity": velocity_acc,
-                "LearningRate": self._create_param_lr(param_and_grad),
-                "current_step": self._global_step_var,
-            },
-            outputs={
-                "ParamOut": param_and_grad[0],
-                "VelocityOut": velocity_acc
-            },
-            attrs={
-                "mu": self._momentum,
-                "use_nesterov": self._use_nesterov,
-                "rampup_begin_step": float(self._rampup_begin_step)
-            },
+            type=type,
+            inputs=inputs,
+            outputs=outputs,
+            attrs=attrs,
             stop_gradient=True)
         return dgc_momentum_op
 
     def _add_auto_increment_var(self, counter_name, begin, step=1):
@@ -1019,8 +1028,20 @@ class DGCMomentumOptimizer(MomentumOptimizer):
 
         return counter
 
+    def _add_nranks_var(self, name, value=-1):
+        helper = LayerHelper('global_step_counter')
+        counter, is_new_var = helper.create_or_get_global_variable(
+            name=name, dtype='float32', shape=[1], persistable=True)
+        if is_new_var:
+            helper.set_variable_initializer(
+                counter,
+                initializer=Constant(
+                    value=float(value), force_cpu=True))
+            counter.stop_gradient = True
+
+        return counter
+
     def _append_dgc_ops(self, param_and_grads):
-        start_program = default_startup_program()
         main_program = default_main_program()
         main_program._enable_dgc = True
@@ -1028,6 +1049,9 @@ class DGCMomentumOptimizer(MomentumOptimizer):
         self._global_step_var = self._add_auto_increment_var(
             counter_name=core.dgc.kDGCCounterName(), begin=0)
 
+        self._nranks_var = self._add_nranks_var(
+            name=core.dgc.kDGCNRanksName(), value=-1)
+
         # rampup begin step var for all_reduce_op_handle
         self._rampup_begin_step_var = tensor.create_global_var(
             shape=[1],
@@ -1037,22 +1061,16 @@ class DGCMomentumOptimizer(MomentumOptimizer):
             value=self._rampup_begin_step * 1.0,
             force_cpu=True)
 
+        self.helper = LayerHelper(self.__class__.__name__)
+
         for param_var, grad_var in param_and_grads:
+            # reuse velocity in dgc_op and dgc_momentum_op
+            u_var = self._add_accumulator(self._u_velocity_acc_str, param_var)
+
             if not self._is_use_dgc(param_var, grad_var):
                 continue
 
-            u_var = tensor.create_global_var(
-                shape=param_var.shape,
-                dtype=param_var.dtype,
-                persistable=True,
-                name=param_var.name + core.dgc.kDGCUName(),
-                value=0.0)
-            v_var = tensor.create_global_var(
-                shape=param_var.shape,
-                dtype=param_var.dtype,
-                persistable=True,
-                name=param_var.name + core.dgc.kDGCVName(),
-                value=0.0)
+            v_var = self._add_accumulator(self._v_velocity_acc_str, param_var)
 
             k_var = tensor.create_global_var(
                 shape=[1],
@@ -1070,6 +1088,14 @@ class DGCMomentumOptimizer(MomentumOptimizer):
                 value=0.0,
                 force_cpu=False)
 
+            gather_var = tensor.create_global_var(
+                shape=[1],
+                dtype=param_var.dtype,
+                persistable=True,
+                name=param_var.name + core.dgc.kDGCGatherName(),
+                value=0.0,
+                force_cpu=False)
+
             # del back oprolevarname
             op_maker = core.op_proto_and_checker_maker
             backward = core.op_proto_and_checker_maker.OpRole.Backward
@@ -1092,7 +1118,7 @@ class DGCMomentumOptimizer(MomentumOptimizer):
             if self._local_grad_clip_norm is not None:
                 clip_var = self._append_clip_norm(grad_var, self._clip_norm)
             self._dgc_op(param_var, clip_var, grad_var, u_var, v_var, k_var,
-                         encoded_var)
+                         encoded_var, gather_var)
 
     def _is_the_backward_op(self, op):
         op_maker = core.op_proto_and_checker_maker
@@ -1131,7 +1157,7 @@ class DGCMomentumOptimizer(MomentumOptimizer):
             x=grad_var, max_norm=clip_norm, name=grad_var.name)
 
     def _dgc_op(self, param_var, clip_var, grad_var, u_var, v_var, k_var,
-                encoded_var):
+                encoded_var, gather_var):
         block = framework.default_main_program().global_block()
         op_maker = core.op_proto_and_checker_maker
         dgc_op = block.append_op(
@@ -1140,21 +1166,23 @@ class DGCMomentumOptimizer(MomentumOptimizer):
                 "U": u_var,
                 "V": v_var,
                 "Grad": clip_var,
-                "current_step": self._global_step_var
+                "current_step": self._global_step_var,
+                "nranks": self._nranks_var,
             },
             outputs={
                 "U_out": u_var,
                 "V_out": v_var,
                 "EncodeGrad": encoded_var,
                 "k": k_var,
-                "Grad_out": grad_var
+                "Grad_out": grad_var,
+                "GatherBuff": gather_var,
             },
             attrs={
                 "m": self._momentum,
                 "sparsity": self._sparsity,
                 "use_nesterov": self._use_nesterov,
                 "rampup_begin_step": float(self._rampup_begin_step),
-                "rampup_step": float(self._rampup_step)
+                "rampup_step": float(self._rampup_step),
             },
             stop_gradient=True)
......
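With U/V stored as regular accumulators, the optimizer is constructed and applied like any other fluid optimizer. A usage sketch following the docstring referenced earlier in this diff; the hyperparameter values and the `avg_cost` loss variable are illustrative:

```python
# Illustrative usage sketch; model definition and data feeding are omitted.
import paddle.fluid as fluid

optimizer = fluid.optimizer.DGCMomentumOptimizer(
    learning_rate=0.0001,
    momentum=0.9,
    rampup_begin_step=1252,   # plain momentum before this step, DGC afterwards
    rampup_step=1000,
    sparsity=[0.999, 0.999])  # sparsity warm-up schedule
optimizer.minimize(avg_cost)  # avg_cost: loss variable built by the model
```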
@@ -34,7 +34,7 @@ class TestDGCOp(unittest.TestCase):
         print("place:", place)
 
         # numpy data
-        # inputs: U, V, Grad, current_step
+        # inputs: U, V, Grad, current_step, nranks
         self.u_name = "U"
         self.u = np.random.random(size).astype("float32")
@@ -47,10 +47,14 @@ class TestDGCOp(unittest.TestCase):
         self.current_step_name = "current_step"
         self.current_step = np.full((1), 0.0).astype("float32")
 
-        # output: U_out, V_out, EncodeGrad, GradLocal_out
+        self.nranks_name = "nranks"
+        self.nranks = np.full((1), 2.0).astype("float32")
+
+        # output: U_out, V_out, EncodeGrad, GradLocal_out, k, GatherBuff
         self.encode_grad_name = "EncodeGrad"
         self.k_name = "k"
         self.k = np.full((1), 0.0).astype("float32")
+        self.gather_buff_name = "GatherBuff"
 
         # scope data
         self.u_tensor = self.scope.var(self.u_name).get_tensor()
@@ -62,16 +66,22 @@ class TestDGCOp(unittest.TestCase):
         self.grad_tensor = self.scope.var(self.grad_name).get_tensor()
         self.grad_tensor.set(self.grad, place)
 
-        self.encode_grad_tensor = self.scope.var(
-            self.encode_grad_name).get_tensor()
-
         self.current_step_tensor = self.scope.var(
             self.current_step_name).get_tensor()
         self.current_step_tensor.set(self.current_step, core.CPUPlace())
 
+        self.nranks_tensor = self.scope.var(self.nranks_name).get_tensor()
+        self.nranks_tensor.set(self.nranks, core.CPUPlace())
+
+        self.encode_grad_tensor = self.scope.var(
+            self.encode_grad_name).get_tensor()
+
         self.k_tensor = self.scope.var(self.k_name).get_tensor()
         self.k_tensor.set(self.k, core.CPUPlace())
 
+        self.gather_buff_tensor = self.scope.var(
+            self.gather_buff_name).get_tensor()
+
     def check(self, actual_t, expect_t, place, out_name, atol=1e-5):
         self.assertTrue(
             np.allclose(
@@ -87,6 +97,7 @@ class TestDGCOp(unittest.TestCase):
             'V': self.v_name,
             'Grad': self.grad_name,
             'current_step': self.current_step_name,
+            'nranks': self.nranks_name,
 
             # outputs
             'U_out': self.u_name,
@@ -94,6 +105,7 @@ class TestDGCOp(unittest.TestCase):
             'EncodeGrad': self.encode_grad_name,
             'Grad_out': self.grad_name,
             'k': self.k_name,
+            'GatherBuff': self.gather_buff_name,
 
             # attrs
             'm': 0.9,
......
@@ -29,7 +29,7 @@ class TestDGCMomentumOptimizer(unittest.TestCase):
             return self._accumulators
 
         def get_velocity_str(self):
-            return self._velocity_acc_str
+            return self._u_velocity_acc_str
 
     def check_dgc_momentum_optimizer(self, dims=[5, 10, 8], name="momentum"):
         init_program = framework.Program()
@@ -66,8 +66,10 @@ class TestDGCMomentumOptimizer(unittest.TestCase):
             type="mean", inputs={"X": mul_out}, outputs={"Out": mean_out})
         # params_grads = append_backward(mean_out)
         params_grads = dgc_momentum_optimizer.backward(mean_out)
+        accumulator_count = 1 if name == "momentum" else 2
         self.assertEqual(len(params_grads), 1)
-        self.assertEqual(len(dgc_momentum_optimizer.get_accumulators()), 0)
+        self.assertEqual(
+            len(dgc_momentum_optimizer.get_accumulators()), accumulator_count)
         with framework.program_guard(program, init_program):
             opts = dgc_momentum_optimizer.apply_gradients(params_grads)
         self.assertEqual(len(opts), 2)
@@ -77,7 +79,7 @@ class TestDGCMomentumOptimizer(unittest.TestCase):
 
         # Check accumulators
         accumulators = dgc_momentum_optimizer.get_accumulators()
-        self.assertEqual(len(accumulators), 1)
+        self.assertEqual(len(accumulators), accumulator_count)
         self.assertTrue(
             dgc_momentum_optimizer.get_velocity_str() in accumulators)
         velocity_acc = accumulators[dgc_momentum_optimizer.get_velocity_str()]
@@ -86,11 +88,9 @@ class TestDGCMomentumOptimizer(unittest.TestCase):
 
         # Check init_program
         init_ops = init_program.global_block().ops
-        self.assertEqual(len(init_ops), 2)
+        self.assertEqual(len(init_ops), 1)
         self.assertEqual(init_ops[0].type, "fill_constant")
         self.assertAlmostEqual(init_ops[0].attr('value'), learning_rate)
-        self.assertEqual(init_ops[1].type, "fill_constant")
-        self.assertAlmostEqual(init_ops[1].attr('value'), 0.0)
 
         with open("test_dgc_optimizer_" + name + ".log", "w") as f:
             program_to_code(program, fout=f)
......