From e106901ecab756876a066e0af1304e0b82cab0c3 Mon Sep 17 00:00:00 2001 From: helen88 Date: Mon, 24 Jan 2022 10:24:16 +0800 Subject: [PATCH] support sparse of adam, *test=kunlun (#38483) * support sparse of adam, *test=kunlun * add pre-commit-config.yaml * support sparse of adam in KL2,*test=kunlun * support sparse of adam in KL2, *test=kunlun * modify xpu.cmake, *test=kunlun * support sparse of adam, rm some wait, *test=kunlun * support sparse of adam, rm some wait, *test=kunlun * support sparse of adam, *test=kunlun * support sparse of adam, *test=kunlun * support sparse of adam, *test=kunlun * support sparse of adam, *test=kunlun * support sparse of adam, *test=kunlun --- cmake/external/xpu.cmake | 2 +- .../operators/math/selected_rows_functor.cc | 153 ++++++++++++++++++ .../fluid/operators/optimizers/adam_op_xpu.cc | 127 ++++++++++++++- .../tests/unittests/xpu/test_adam_op_xpu.py | 138 ++++++++++++++++ 4 files changed, 411 insertions(+), 9 deletions(-) diff --git a/cmake/external/xpu.cmake b/cmake/external/xpu.cmake index c7a6f04b5f..578fb16216 100644 --- a/cmake/external/xpu.cmake +++ b/cmake/external/xpu.cmake @@ -36,7 +36,7 @@ ENDIF() if(NOT DEFINED XPU_BASE_URL) SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev") - SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220104") + SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220116") else() SET(XPU_BASE_URL "${XPU_BASE_URL}") endif() diff --git a/paddle/fluid/operators/math/selected_rows_functor.cc b/paddle/fluid/operators/math/selected_rows_functor.cc index 67176f26b0..f6178eb0a1 100644 --- a/paddle/fluid/operators/math/selected_rows_functor.cc +++ b/paddle/fluid/operators/math/selected_rows_functor.cc @@ -477,6 +477,155 @@ struct MergeAdd { } }; +#ifdef PADDLE_WITH_XPU +template +struct MergeAdd { + framework::SelectedRows operator()(const platform::XPUDeviceContext& context, + const framework::SelectedRows& input, + const bool sorted_result = false) { + 
framework::SelectedRows out; + (*this)(context, input, &out, sorted_result); + return out; + } + + void operator()(const platform::XPUDeviceContext& context, + const framework::SelectedRows& input, + framework::SelectedRows* output, + const bool sorted_result = false) { + framework::Vector input_rows(input.rows()); + if (input_rows.size() == 0) { + return; + } + + framework::SelectedRows& out = *output; + std::set row_set(input_rows.begin(), input_rows.end()); + std::vector merge_rows(row_set.begin(), row_set.end()); + auto input_width = input.value().dims()[1]; + + out.set_rows(merge_rows); + out.set_height(input.height()); + out.mutable_value()->mutable_data( + framework::make_ddim( + {static_cast(merge_rows.size()), input_width}), + context.GetPlace()); + int r = + xpu::constant(context.x_context(), out.mutable_value()->data(), + merge_rows.size() * input_width, static_cast(0.f)); + PADDLE_ENFORCE_EQ(r, xpu::Error_t::SUCCESS, + platform::errors::External("XPU constant op return" + " wrong value[%d %s].", + r, XPUAPIErrorMsg[r])); + + std::unordered_map rows_to_id; + for (size_t i = 0; i < merge_rows.size(); ++i) { + rows_to_id[merge_rows[i]] = i; + } + + auto* out_data = out.mutable_value()->data(); + auto* input_data = input.value().data(); + int n = input_width; + for (size_t i = 0; i < input_rows.size(); i++) { + size_t out_i = rows_to_id[input_rows[i]]; + auto r = xpu::add(context.x_context(), &input_data[i * input_width], + &out_data[out_i * input_width], + &out_data[out_i * input_width], n); + PADDLE_ENFORCE_EQ( + r, XPU_SUCCESS, + platform::errors::External("XPU API return wrong value[%d %s], ", r, + XPUAPIErrorMsg[r])); + } + } + + void operator()(const platform::XPUDeviceContext& context, + const std::vector& inputs, + framework::SelectedRows* output, + const bool sorted_result = false) { + if (inputs.size() == 0) { + VLOG(3) << "no input! 
return"; + return; + } + const framework::SelectedRows* has_value_input = nullptr; + for (auto* in : inputs) { + if (in->rows().size() > 0) { + has_value_input = in; + break; + } + } + if (has_value_input == nullptr) { + VLOG(3) << "no input has value! just return" << std::endl; + return; + } + auto input_width = has_value_input->value().dims()[1]; + auto input_height = has_value_input->height(); + framework::SelectedRows& out = *output; + std::set merged_row_set; + size_t row_num = 0; + for (auto* input : inputs) { + if (input->rows().size() == 0) { + continue; + } + PADDLE_ENFORCE_EQ(input_width, input->value().dims()[1], + platform::errors::InvalidArgument( + "All inputs should have same " + "dimension except for the first one.")); + PADDLE_ENFORCE_EQ(input_height, input->height(), + platform::errors::InvalidArgument( + "All inputs should have same height.")); + row_num += input->rows().size(); + merged_row_set.insert(input->rows().begin(), input->rows().end()); + } + + std::vector merge_rows(merged_row_set.begin(), + merged_row_set.end()); + + if (sorted_result) { + std::sort(merge_rows.begin(), merge_rows.end()); + } + + out.set_rows(merge_rows); + out.set_height(input_height); + out.mutable_value()->mutable_data( + framework::make_ddim( + {static_cast(merged_row_set.size()), input_width}), + context.GetPlace()); + + int r = + xpu::constant(context.x_context(), out.mutable_value()->data(), + merge_rows.size() * input_width, static_cast(0.f)); + PADDLE_ENFORCE_EQ(r, xpu::Error_t::SUCCESS, + platform::errors::External("XPU constant op return" + " wrong value[%d %s].", + r, XPUAPIErrorMsg[r])); + + float* out_data = reinterpret_cast(out.mutable_value()->data()); + + std::unordered_map rows_to_id; + for (size_t i = 0; i < merge_rows.size(); ++i) { + rows_to_id[merge_rows[i]] = i; + } + + for (auto* input : inputs) { + if (input->rows().size() == 0) { + continue; + } + auto& input_rows = input->rows(); + + int n = input_width; + for (size_t i = 0; i < 
input_rows.size(); i++) { + size_t out_i = rows_to_id[input_rows[i]]; + auto r = xpu::add( + context.x_context(), input->value().data() + i * input_width, + &out_data[out_i * input_width], &out_data[out_i * input_width], n); + PADDLE_ENFORCE_EQ( + r, XPU_SUCCESS, + platform::errors::External("XPU API return wrong value[%d %s], ", r, + XPUAPIErrorMsg[r])); + } + } + } +}; + +#endif template struct MergeAverage { framework::SelectedRows operator()(const platform::CPUDeviceContext& context, @@ -589,6 +738,10 @@ template struct MergeAdd; +#ifdef PADDLE_WITH_XPU +template struct MergeAdd; +#endif + template struct MergeAverage; template struct MergeAverage; template struct MergeAverage; diff --git a/paddle/fluid/operators/optimizers/adam_op_xpu.cc b/paddle/fluid/operators/optimizers/adam_op_xpu.cc index 0a653c4011..e462c20c7f 100644 --- a/paddle/fluid/operators/optimizers/adam_op_xpu.cc +++ b/paddle/fluid/operators/optimizers/adam_op_xpu.cc @@ -14,6 +14,7 @@ limitations under the License. */ #include "paddle/fluid/operators/optimizers/adam_op.h" #include "gflags/gflags.h" +#include "paddle/fluid/operators/math/selected_rows_functor.h" namespace paddle { namespace operators { @@ -155,6 +156,11 @@ class AdamOpXPUKernel : public framework::OpKernel { mom2_out.template mutable_data(ctx.GetPlace()), param_out.template mutable_data(ctx.GetPlace()), beta1, beta2, epsilon, param.numel()); + + xpu_wait(dev_ctx.x_context()->xpu_stream); + PADDLE_ENFORCE_EQ( + r == xpu::Error_t::SUCCESS, true, + platform::errors::External("XPU API return wrong value[%d],", r)); if (!use_global_beta_pow) { // update in cpu and then copy to xpu if (beta1_pow.place() == platform::CPUPlace() && @@ -165,7 +171,6 @@ class AdamOpXPUKernel : public framework::OpKernel { const float* beta2_pow_p = beta2_pow.template data(); beta2_pow_out->mutable_data(platform::CPUPlace())[0] = beta2 * beta2_pow_p[0]; - xpu_wait(dev_ctx.x_context()->xpu_stream); } else { float* beta1_pow_out_p = 
beta1_pow_out->mutable_data(ctx.GetPlace()); @@ -177,23 +182,129 @@ class AdamOpXPUKernel : public framework::OpKernel { PADDLE_ENFORCE_EQ( r, xpu::SUCCESS, platform::errors::External( - "XPU kernel scale occur error in adamw error code ", r, + "XPU kernel scale occur error in adam error code ", r, XPUAPIErrorMsg[r])); r = xpu::scale(dev_ctx.x_context(), beta2_pow_ptr, beta2_pow_out_p, beta2_pow.numel(), false, beta2, 0.0f); PADDLE_ENFORCE_EQ( r, xpu::SUCCESS, platform::errors::External( - "XPU kernel scale occur error in adamw error code ", r, + "XPU kernel scale occur error in adam error code ", r, XPUAPIErrorMsg[r])); + + xpu_wait(dev_ctx.x_context()->xpu_stream); + } + } + } else if (grad_var->IsType()) { + auto* grad = ctx.Input("Grad"); + auto& dev_ctx = ctx.template device_context(); + + if (grad->rows().size() == 0) { + VLOG(3) << "grad row size is 0!!"; + return; + } + + std::vector cpu_rows(grad->rows().begin(), grad->rows().end()); + bool is_strict_sorted = true; + for (size_t i = 1; i < cpu_rows.size(); ++i) { + if (cpu_rows[i - 1] >= cpu_rows[i]) { + is_strict_sorted = false; + break; } + } + + framework::SelectedRows tmp_grad_merge; + const framework::SelectedRows* grad_merge_ptr; + if (is_strict_sorted) { + grad_merge_ptr = grad; + } else { + scatter::MergeAdd merge_func; + merge_func(ctx.template device_context(), + *grad, &tmp_grad_merge, true); + + xpu_wait(dev_ctx.x_context()->xpu_stream); + grad_merge_ptr = &tmp_grad_merge; + } + const T* beta1_pow_ptr = beta1_pow.template data(); + const T* beta2_pow_ptr = beta2_pow.template data(); + Tensor xpu_beta1_pow; + Tensor xpu_beta2_pow; + if (beta1_pow.place() == platform::CPUPlace() && + beta2_pow.place() == platform::CPUPlace()) { + paddle::framework::TensorCopy(beta1_pow, ctx.GetPlace(), dev_ctx, + &xpu_beta1_pow); + paddle::framework::TensorCopy(beta2_pow, ctx.GetPlace(), dev_ctx, + &xpu_beta2_pow); + dev_ctx.Wait(); + beta1_pow_ptr = xpu_beta1_pow.template data(); + beta2_pow_ptr = 
xpu_beta2_pow.template data(); + } + auto& grad_merge = *grad_merge_ptr; + auto& grad_tensor = grad_merge.value(); + const T* grad_data = grad_tensor.template data(); + int row_count = grad_merge.rows().size(); + std::vector rows(row_count); + xpu::ctx_guard RAII_GUARD(dev_ctx.x_context()); + int* xpu_rows = RAII_GUARD.alloc_l3_or_gm(row_count); + std::vector merge_rows(grad_merge.rows().begin(), + grad_merge.rows().end()); + for (size_t i = 0; i < grad_merge.rows().size(); ++i) { + rows[i] = static_cast(merge_rows[i]); + } + xpu_wait(dev_ctx.x_context()->xpu_stream); + memory::Copy(ctx.GetPlace(), xpu_rows, platform::CPUPlace(), rows.data(), + row_count * sizeof(int)); + auto row_numel = grad_tensor.numel() / grad_merge.rows().size(); + auto ori_rows = param.numel() / row_numel; - PADDLE_ENFORCE_EQ(r == xpu::Error_t::SUCCESS, true, - platform::errors::External( - "XPU API return wrong value[%d], please check " - "where Baidu Kunlun Card is properly installed.", - r)); + int lazy_mode = static_cast(ctx.Attr("lazy_mode")); + int r = xpu::sparse_adam( + dev_ctx.x_context(), grad_data, mom1.template data(), + mom2.template data(), param.template data(), beta1_pow_ptr, + beta2_pow_ptr, lr.template data(), + mom1_out.template mutable_data(ctx.GetPlace()), + mom2_out.template mutable_data(ctx.GetPlace()), + param_out.template mutable_data(ctx.GetPlace()), beta1, beta2, + epsilon, ori_rows, xpu_rows, row_numel, grad_merge.rows().size(), + lazy_mode); + + PADDLE_ENFORCE_EQ( + r == xpu::Error_t::SUCCESS, true, + platform::errors::External("XPU API return wrong value[%d],", r)); + + if (!use_global_beta_pow) { + // update in cpu and then copy to xpu + if (beta1_pow.place() == platform::CPUPlace() && + beta2_pow.place() == platform::CPUPlace()) { + const float* beta1_pow_p = beta1_pow.template data(); + beta1_pow_out->mutable_data(platform::CPUPlace())[0] = + beta1 * beta1_pow_p[0]; + const float* beta2_pow_p = beta2_pow.template data(); + 
beta2_pow_out->mutable_data(platform::CPUPlace())[0] = + beta2 * beta2_pow_p[0]; + } else { + float* beta1_pow_out_p = + beta1_pow_out->mutable_data(ctx.GetPlace()); + float* beta2_pow_out_p = + beta2_pow_out->mutable_data(ctx.GetPlace()); + int r = + xpu::scale(dev_ctx.x_context(), beta1_pow_ptr, beta1_pow_out_p, + beta1_pow.numel(), false, beta1, 0.0f); + PADDLE_ENFORCE_EQ( + r, xpu::SUCCESS, + platform::errors::External( + "XPU kernel scale occur error in adam error code ", r, + XPUAPIErrorMsg[r])); + r = xpu::scale(dev_ctx.x_context(), beta2_pow_ptr, beta2_pow_out_p, + beta2_pow.numel(), false, beta2, 0.0f); + PADDLE_ENFORCE_EQ( + r, xpu::SUCCESS, + platform::errors::External( + "XPU kernel scale occur error in adam error code ", r, + XPUAPIErrorMsg[r])); + } } + xpu_wait(dev_ctx.x_context()->xpu_stream); } else { PADDLE_ENFORCE_EQ(1, 2, platform::errors::InvalidArgument( "Variable type not supported by adam_op")); diff --git a/python/paddle/fluid/tests/unittests/xpu/test_adam_op_xpu.py b/python/paddle/fluid/tests/unittests/xpu/test_adam_op_xpu.py index 147824f341..a36c0bf071 100644 --- a/python/paddle/fluid/tests/unittests/xpu/test_adam_op_xpu.py +++ b/python/paddle/fluid/tests/unittests/xpu/test_adam_op_xpu.py @@ -216,6 +216,144 @@ def adam_step(inputs, attributes): return param_out, moment1_out, moment2_out +def adam_step_sparse(inputs, attributes, height, rows, row_numel, np_grad, + lazy_mode): + ''' + Simulate one step of the adam optimizer + :param inputs: dict of inputs + :param attributes: dict of attributes + :return tuple: tuple of output param, moment1, moment2, + beta1 power accumulator and beta2 power accumulator + ''' + param = inputs['Param'] + # grad = inputs['Grad'] + moment1 = inputs['Moment1'] + moment2 = inputs['Moment2'] + lr = inputs['LearningRate'] + beta1_pow = inputs['Beta1Pow'] + beta2_pow = inputs['Beta2Pow'] + + beta1 = attributes['beta1'] + beta2 = attributes['beta2'] + epsilon = attributes['epsilon'] + + moment1_out = 
np.zeros(shape=[height, row_numel]) + moment2_out = np.zeros(shape=[height, row_numel]) + param_out = np.zeros(shape=[height, row_numel]) + + def update_row(row_id, update_value): + moment1_out[row_id] = beta1 * moment1[row_id] + (1 - beta1 + ) * update_value + moment2_out[row_id] = beta2 * moment2[row_id] + ( + 1 - beta2) * np.square(update_value) + lr_t = lr * np.sqrt(1 - beta2_pow) / (1 - beta1_pow) + param_out[row_id] = param[row_id] - lr_t * (moment1_out[row_id] / ( + np.sqrt(moment2_out[row_id]) + epsilon)) + + if lazy_mode: + for idx, row_id in enumerate(rows): + update_row(row_id, np_grad[idx]) + else: + for row_id in range(param_out.shape[0]): + update_value = np.zeros(np_grad[0].shape).astype("float32") + if row_id in rows: + update_value = np_grad[rows.index(row_id)] + update_row(row_id, update_value) + + return param_out, moment1_out, moment2_out + + +class TestSparseAdamOp(unittest.TestCase): + def setup(self, scope, place, lazy_mode): + beta1 = 0.78 + beta2 = 0.836 + epsilon = 1e-4 + beta1_pow = np.array([beta1**10]).astype("float32") + beta2_pow = np.array([beta2**10]).astype("float32") + + height = 10 + rows = [0, 4, 7] + self.rows = rows + row_numel = 12 + self.row_numel = row_numel + self.dense_inputs = { + "Param": np.full((height, row_numel), 5.0).astype("float32"), + "Moment1": np.full((height, row_numel), 5.0).astype("float32"), + "Moment2": np.full((height, row_numel), 5.0).astype("float32"), + 'Beta1Pow': beta1_pow, + 'Beta2Pow': beta2_pow, + "LearningRate": np.full((1), 2.0).astype("float32") + } + self.init_output = np.full((height, row_numel), 0.0).astype("float32") + self.attrs = { + 'epsilon': epsilon, + 'beta1': beta1, + 'beta2': beta2, + 'min_row_size_to_use_multithread': 2 + } + + grad_selected_rows = scope.var('Grad').get_selected_rows() + grad_selected_rows.set_height(height) + grad_selected_rows.set_rows(rows) + np_array = np.ones((len(rows), row_numel)).astype("float32") + np_array[0, 0] = 2.0 + np_array[2, 8] = 4.0 + + 
grad_tensor = grad_selected_rows.get_tensor() + grad_tensor.set(np_array, place) + + self.sparse_inputs = ["Grad"] + + param_out, mom1, mom2 = adam_step_sparse(self.dense_inputs, self.attrs, + height, rows, row_numel, + np_array, lazy_mode) + self.outputs = { + "ParamOut": param_out, + "Moment1Out": mom1, + "Moment2Out": mom2, + 'Beta1PowOut': beta1_pow * beta1, + 'Beta2PowOut': beta2_pow * beta2 + } + + def check_with_place(self, place, lazy_mode): + scope = core.Scope() + self.setup(scope, place, lazy_mode) + + op_args = dict() + op_args['lazy_mode'] = lazy_mode + for key, np_array in self.dense_inputs.items(): + var = scope.var(key).get_tensor() + var.set(np_array, place) + op_args[key] = key + for s in self.sparse_inputs: + op_args[s] = s + for s in self.outputs: + var = scope.var(s).get_tensor() + var.set(self.init_output, place) + op_args[s] = s + for k in self.attrs: + op_args[k] = self.attrs[k] + + # create and run adam operator + adam_op = Operator("adam", **op_args) + adam_op.run(scope, place) + + for key, np_array in self.outputs.items(): + out_var = scope.var(key).get_tensor() + actual = np.array(out_var) + actual = actual.reshape([actual.size]) + np_array = np_array.reshape([np_array.size]) + + for i in range(np_array.size): + self.assertLess(abs(actual[i] - np_array[i]), 0.00001) + + def test_sparse_adam(self): + xpu_version = core.get_xpu_device_version(0) + version_str = "xpu2" if xpu_version == core.XPUVersion.XPU2 else "xpu1" + if "xpu2" == version_str: + self.check_with_place(paddle.XPUPlace(0), False) + + class TestAdamOpBetaVariable(OpTest): def setUp(self): '''Test Adam Op with beta as Variable -- GitLab