From 6c1bc1c6a90ecfc1183715551d37ceb5f02d10ab Mon Sep 17 00:00:00 2001 From: yujianfeng Date: Mon, 15 Jun 2020 11:05:13 +0800 Subject: [PATCH] Add multiple process for computation of sparse optimizers --- mindspore/ccsrc/kernel/common_utils.cc | 17 +++ mindspore/ccsrc/kernel/common_utils.h | 23 ++++ .../cpu/sparse_apply_adam_cpu_kernel.cc | 109 ++++++++++++------ .../kernel/cpu/sparse_apply_adam_cpu_kernel.h | 3 - .../cpu/sparse_apply_ftrl_cpu_kernel.cc | 81 ++++++++----- .../cpu/sparse_apply_lazy_adam_cpu_kernel.cc | 65 ++++++++--- ...parse_apply_proximal_adagrad_cpu_kernel.cc | 65 +++++++---- 7 files changed, 260 insertions(+), 103 deletions(-) diff --git a/mindspore/ccsrc/kernel/common_utils.cc b/mindspore/ccsrc/kernel/common_utils.cc index 3de03069e..9b578eb10 100644 --- a/mindspore/ccsrc/kernel/common_utils.cc +++ b/mindspore/ccsrc/kernel/common_utils.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include "nlohmann/json.hpp" #include "session/anf_runtime_algorithm.h" #include "common/utils.h" @@ -876,5 +877,21 @@ bool IsWeightBoundary(const AnfNodePtr &node) { } return false; } + +void MultiThreadCompute(const MultiThreadComputeFunc &func, MultiThreadComputeParams *params, size_t thread_num, + size_t total_compute_size) { + std::vector threads; + threads.reserve(thread_num); + size_t start = 0; + size_t once_compute_size = (total_compute_size + thread_num - 1) / thread_num; + while (start < total_compute_size) { + size_t end = (start + once_compute_size) > total_compute_size ? total_compute_size : (start + once_compute_size); + threads.emplace_back(std::thread(func, params, start, end)); + start += once_compute_size; + } + for (size_t i = 0; i < threads.size(); ++i) { + threads[i].join(); + } +} } // namespace kernel } // namespace mindspore diff --git a/mindspore/ccsrc/kernel/common_utils.h b/mindspore/ccsrc/kernel/common_utils.h index 244d8e4e9..e25421c57 100644 --- a/mindspore/ccsrc/kernel/common_utils.h +++ b/mindspore/ccsrc/kernel/common_utils.h @@ -78,6 +78,27 @@ struct SparseGradient { size_t indices_size_; }; +struct MultiThreadComputeParams { + float *var_; + float *accum_; + float *linear_; + float *m_; + float *m_t_; + float *v_; + float lr_; + float l1_; + float l2_; + float lr_power_; + float beta1_; + float beta2_; + float epsilon_; + SparseGradient sparse_grad_; + size_t var_first_dim_size_; + size_t var_outer_dim_size_; + bool use_nesterov_; +}; +using MultiThreadComputeFunc = std::function; + bool CheckCache(const std::string &kernel_name); KernelPackPtr SearchCache(const std::string &kernel_name, const std::string &processor); KernelPackPtr InsertCache(const std::string &kernel_name, const std::string &processor); @@ -107,6 +128,8 @@ void GetValidKernelNodes(const FuncGraphPtr &func_graph, std::vector bool GetInputTensorValue(const AnfNodePtr &anf_node, size_t input_idx, nlohmann::json *const node_json); void GetGraphRealOutput(const FuncGraphPtr &func_graph, std::vector> *node_list); bool IsWeightBoundary(const AnfNodePtr &node); +void MultiThreadCompute(const MultiThreadComputeFunc &func, MultiThreadComputeParams *params, size_t thread_num, + size_t total_compute_size); } // namespace kernel } // namespace mindspore diff --git a/mindspore/ccsrc/kernel/cpu/sparse_apply_adam_cpu_kernel.cc b/mindspore/ccsrc/kernel/cpu/sparse_apply_adam_cpu_kernel.cc index 13450d048..5e2fc7957 100644 --- a/mindspore/ccsrc/kernel/cpu/sparse_apply_adam_cpu_kernel.cc +++ b/mindspore/ccsrc/kernel/cpu/sparse_apply_adam_cpu_kernel.cc @@ -14,12 +14,66 @@ * limitations under the License. */ #include "kernel/cpu/sparse_apply_adam_cpu_kernel.h" +#include "kernel/common_utils.h" #include "device/cpu/cpu_device_address.h" namespace mindspore { namespace kernel { namespace { constexpr size_t kSparseApplyAdamInputSize = 11; + +void ComputeAdam(MultiThreadComputeParams *input_params, size_t start, size_t end) { + MS_EXCEPTION_IF_NULL(input_params); + auto m = input_params->m_; + auto m_t = input_params->m_t_; + auto v = input_params->v_; + auto beta1 = input_params->beta1_; + auto beta2 = input_params->beta2_; + auto use_nesterov = input_params->use_nesterov_; + auto unique_sparse_grad = input_params->sparse_grad_; + auto var_first_dim_size = input_params->var_first_dim_size_; + auto var_outer_dim_size = input_params->var_outer_dim_size_; + for (size_t i = start; i < end; ++i) { + int index = unique_sparse_grad.indices_[i]; + if (index < 0 || IntToSize(index) >= var_first_dim_size) { + MS_LOG(EXCEPTION) << "Index " << index << " in indices is out of range after unique process"; + } + size_t start_index = var_outer_dim_size * index; + size_t end_index = start_index + var_outer_dim_size; + for (size_t j = start_index, k = var_outer_dim_size * i; j < end_index; ++j, ++k) { + auto summed_grad = unique_sparse_grad.value_[k]; + m[j] += (1 - beta1) * summed_grad; + v[j] += (1 - beta2) * summed_grad * summed_grad; + if (use_nesterov) { + m_t[j] = m[j] * beta1 + (1 - beta1) * summed_grad; + } + } + } +} + +void ComputeMomentum(MultiThreadComputeParams *input_params, size_t start, size_t end) { + MS_EXCEPTION_IF_NULL(input_params); + auto m = input_params->m_; + auto v = input_params->v_; + auto beta1 = input_params->beta1_; + auto beta2 = input_params->beta2_; + for (size_t i = start; i < end; ++i) { + m[i] *= beta1; + v[i] *= beta2; + } +} + +void ComputeWeight(MultiThreadComputeParams *input_params, size_t start, size_t end) { + MS_EXCEPTION_IF_NULL(input_params); + auto var = input_params->var_; + auto m = input_params->m_; + auto v = input_params->v_; + auto lr = input_params->lr_; + auto epsilon = input_params->epsilon_; + for (size_t i = start; i < end; ++i) { + var[i] -= lr * m[i] / (std::sqrt(v[i]) + epsilon); + } +} } // namespace void SparseApplyAdamCPUKernel::InitInputOutputSize(const CNodePtr &kernel_node) { @@ -64,29 +118,6 @@ void SparseApplyAdamCPUKernel::InitKernel(const CNodePtr &kernel_node) { } } -void SparseApplyAdamCPUKernel::UpdateSparseMomentum(const SparseGradient &unique_sparse_grad, float *m, float *m_t, - float *v, float beta1, float beta2) const { - MS_EXCEPTION_IF_NULL(m); - MS_EXCEPTION_IF_NULL(m_t); - MS_EXCEPTION_IF_NULL(v); - for (size_t i = 0; i < unique_sparse_grad.indices_size_; ++i) { - int index = unique_sparse_grad.indices_[i]; - if (index < 0 || IntToSize(index) >= var_first_dim_size_) { - MS_LOG(EXCEPTION) << "Index " << index << " in indices is out of range after unique process"; - } - size_t start_index = var_outer_dim_size_ * index; - size_t end_index = start_index + var_outer_dim_size_; - for (size_t j = start_index, k = var_outer_dim_size_ * i; j < end_index; ++j, ++k) { - auto summed_grad = unique_sparse_grad.value_[k]; - m[j] += (1 - beta1) * summed_grad; - v[j] += (1 - beta2) * summed_grad * summed_grad; - if (use_nesterov_) { - m_t[j] = m[j] * beta1 + (1 - beta1) * summed_grad; - } - } - } -} - bool SparseApplyAdamCPUKernel::Launch(const std::vector &inputs, const std::vector &workspace, const std::vector & /*outputs*/) { @@ -115,21 +146,31 @@ bool SparseApplyAdamCPUKernel::Launch(const std::vector &inp ReduceSparseGradient(SparseGradient({grad, indices, indices_size_}), &unique_sparse_grad, var_first_dim_size_, var_outer_dim_size_); size_t total_dim_size = var_first_dim_size_ * var_outer_dim_size_; - // Update momentum lr = lr * std::sqrt(1 - beta2_power) / (1 - beta1_power); - for (size_t i = 0; i < total_dim_size; ++i) { - m[i] *= beta1; - v[i] *= beta2; - } + + MultiThreadComputeParams input_params; + input_params.m_ = m; + input_params.v_ = v; + input_params.beta1_ = beta1; + input_params.beta2_ = beta2; + const size_t kThreadNum = 16; + MultiThreadCompute(ComputeMomentum, &input_params, kThreadNum, total_dim_size); + std::vector m_t(m, m + total_dim_size); - UpdateSparseMomentum(unique_sparse_grad, m, m_t.data(), v, beta1, beta2); - // Update weight + input_params.m_t_ = m_t.data(); + input_params.use_nesterov_ = use_nesterov_; + input_params.sparse_grad_ = unique_sparse_grad; + input_params.var_first_dim_size_ = var_first_dim_size_; + input_params.var_outer_dim_size_ = var_outer_dim_size_; + MultiThreadCompute(ComputeAdam, &input_params, kThreadNum, unique_sparse_grad.indices_size_); + if (use_nesterov_) { - m = m_t.data(); - } - for (size_t i = 0; i < total_dim_size; ++i) { - var[i] -= lr * m[i] / (std::sqrt(v[i]) + epsilon); + input_params.m_ = input_params.m_t_; } + input_params.var_ = var; + input_params.lr_ = lr; + input_params.epsilon_ = epsilon; + MultiThreadCompute(ComputeWeight, &input_params, kThreadNum, total_dim_size); return true; } } // namespace kernel diff --git a/mindspore/ccsrc/kernel/cpu/sparse_apply_adam_cpu_kernel.h b/mindspore/ccsrc/kernel/cpu/sparse_apply_adam_cpu_kernel.h index 71be65eca..c2770d0eb 100644 --- a/mindspore/ccsrc/kernel/cpu/sparse_apply_adam_cpu_kernel.h +++ b/mindspore/ccsrc/kernel/cpu/sparse_apply_adam_cpu_kernel.h @@ -20,7 +20,6 @@ #include #include "kernel/cpu/cpu_kernel.h" #include "kernel/cpu/cpu_kernel_factory.h" -#include "kernel/common_utils.h" namespace mindspore { namespace kernel { @@ -35,8 +34,6 @@ class SparseApplyAdamCPUKernel : public CPUKernel { const std::vector &outputs) override; private: - void UpdateSparseMomentum(const SparseGradient &unique_sparse_grad, float *m, float *m_t, float *v, float beta1, - float beta2) const; size_t indices_size_{0}; size_t var_first_dim_size_{0}; size_t var_outer_dim_size_{1}; diff --git a/mindspore/ccsrc/kernel/cpu/sparse_apply_ftrl_cpu_kernel.cc b/mindspore/ccsrc/kernel/cpu/sparse_apply_ftrl_cpu_kernel.cc index 49ef0813f..005195ea3 100644 --- a/mindspore/ccsrc/kernel/cpu/sparse_apply_ftrl_cpu_kernel.cc +++ b/mindspore/ccsrc/kernel/cpu/sparse_apply_ftrl_cpu_kernel.cc @@ -21,6 +21,47 @@ namespace mindspore { namespace kernel { namespace { constexpr size_t kSparseApplyFtrlInputSize = 5; + +void ComputeFtrl(MultiThreadComputeParams *input_params, size_t start, size_t end) { + MS_EXCEPTION_IF_NULL(input_params); + auto var = input_params->var_; + auto accum = input_params->accum_; + auto linear = input_params->linear_; + auto lr = input_params->lr_; + auto l1 = input_params->l1_; + auto l2 = input_params->l2_; + auto lr_power = input_params->lr_power_; + auto unique_sparse_grad = input_params->sparse_grad_; + auto var_first_dim_size = input_params->var_first_dim_size_; + auto var_outer_dim_size = input_params->var_outer_dim_size_; + for (size_t i = start; i < end; ++i) { + int index = unique_sparse_grad.indices_[i]; + if (index < 0 || IntToSize(index) >= var_first_dim_size) { + MS_LOG(EXCEPTION) << "Index " << index << " in indices is out of range after unique process"; + } + size_t start_index = var_outer_dim_size * index; + size_t end_index = start_index + var_outer_dim_size; + for (size_t j = start_index, k = var_outer_dim_size * i; j < end_index; ++j, ++k) { + auto summed_grad = unique_sparse_grad.value_[k]; + auto accum_new = accum[j] + summed_grad * summed_grad; + if (lr_power == -0.5) { + linear[j] += summed_grad - (std::sqrt(accum_new) - std::sqrt(accum[j])) / lr * var[j]; + } else { + linear[j] += summed_grad - (std::pow(accum_new, -lr_power) - std::pow(accum[j], -lr_power)) / lr * var[j]; + } + auto x = Sign(linear[j]) * l1 - linear[j]; + float y; + if (lr_power == -0.5) { + y = std::sqrt(accum_new) / lr + 2 * l2; + } else { + y = std::pow(accum_new, -lr_power) / lr + 2 * l2; + } + auto pre_shrink = x / y; + var[j] = std::fabs(linear[j]) > l1 ? pre_shrink : 0; + accum[j] = accum_new; + } + } +} } // namespace void SparseApplyFtrlCPUKernel::InitInputOutputSize(const CNodePtr &kernel_node) { @@ -96,33 +137,19 @@ bool SparseApplyFtrlCPUKernel::Launch(const std::vector &inp ReduceSparseGradient(SparseGradient({grad, indices, indices_size_}), &unique_sparse_grad, var_first_dim_size_, var_outer_dim_size_); - for (size_t i = 0; i < unique_sparse_grad.indices_size_; ++i) { - int index = unique_sparse_grad.indices_[i]; - if (index < 0 || IntToSize(index) >= var_first_dim_size_) { - MS_LOG(EXCEPTION) << "Index " << index << " in indices is out of range after unique process"; - } - size_t start_index = var_outer_dim_size_ * index; - size_t end_index = start_index + var_outer_dim_size_; - for (size_t j = start_index, k = var_outer_dim_size_ * i; j < end_index; ++j, ++k) { - auto summed_grad = unique_sparse_grad.value_[k]; - auto accum_new = accum[j] + summed_grad * summed_grad; - if (lr_power_ == -0.5) { - linear[j] += summed_grad - (std::sqrt(accum_new) - std::sqrt(accum[j])) / lr_ * var[j]; - } else { - linear[j] += summed_grad - (std::pow(accum_new, -lr_power_) - std::pow(accum[j], -lr_power_)) / lr_ * var[j]; - } - auto x = Sign(linear[j]) * l1_ - linear[j]; - float y; - if (lr_power_ == -0.5) { - y = std::sqrt(accum_new) / lr_ + 2 * l2_; - } else { - y = std::pow(accum_new, -lr_power_) / lr_ + 2 * l2_; - } - auto pre_shrink = x / y; - var[j] = std::fabs(linear[j]) > l1_ ? pre_shrink : 0; - accum[j] = accum_new; - } - } + MultiThreadComputeParams input_params; + input_params.var_ = var; + input_params.accum_ = accum; + input_params.linear_ = linear; + input_params.lr_ = lr_; + input_params.l1_ = l1_; + input_params.l2_ = l2_; + input_params.lr_power_ = lr_power_; + input_params.sparse_grad_ = unique_sparse_grad; + input_params.var_first_dim_size_ = var_first_dim_size_; + input_params.var_outer_dim_size_ = var_outer_dim_size_; + const size_t kThreadNum = 16; + MultiThreadCompute(ComputeFtrl, &input_params, kThreadNum, unique_sparse_grad.indices_size_); return true; } } // namespace kernel diff --git a/mindspore/ccsrc/kernel/cpu/sparse_apply_lazy_adam_cpu_kernel.cc b/mindspore/ccsrc/kernel/cpu/sparse_apply_lazy_adam_cpu_kernel.cc index 0d6e0405d..2460dc0f2 100644 --- a/mindspore/ccsrc/kernel/cpu/sparse_apply_lazy_adam_cpu_kernel.cc +++ b/mindspore/ccsrc/kernel/cpu/sparse_apply_lazy_adam_cpu_kernel.cc @@ -21,6 +21,39 @@ namespace mindspore { namespace kernel { namespace { constexpr size_t kSparseApplyLazyAdamInputSize = 11; + +void ComputeLazyAdam(MultiThreadComputeParams *input_params, size_t start, size_t end) { + MS_EXCEPTION_IF_NULL(input_params); + auto var = input_params->var_; + auto m = input_params->m_; + auto v = input_params->v_; + auto lr = input_params->lr_; + auto beta1 = input_params->beta1_; + auto beta2 = input_params->beta2_; + auto epsilon = input_params->epsilon_; + auto use_nesterov = input_params->use_nesterov_; + auto unique_sparse_grad = input_params->sparse_grad_; + auto var_first_dim_size = input_params->var_first_dim_size_; + auto var_outer_dim_size = input_params->var_outer_dim_size_; + for (size_t i = start; i < end; ++i) { + int index = unique_sparse_grad.indices_[i]; + if (index < 0 || IntToSize(index) >= var_first_dim_size) { + MS_LOG(EXCEPTION) << "Index " << index << " in indices is out of range"; + } + size_t start_index = var_outer_dim_size * index; + size_t end_index = start_index + var_outer_dim_size; + for (size_t j = start_index, k = var_outer_dim_size * i; j < end_index; ++j, ++k) { + auto summed_grad = unique_sparse_grad.value_[k]; + m[j] = beta1 * m[j] + (1 - beta1) * summed_grad; + v[j] = beta2 * v[j] + (1 - beta2) * summed_grad * summed_grad; + if (use_nesterov) { + var[j] -= lr * (m[j] * beta1 + (1 - beta1) * summed_grad) / (std::sqrt(v[j]) + epsilon); + } else { + var[j] -= lr * m[j] / (std::sqrt(v[j]) + epsilon); + } + } + } +} } // namespace void SparseApplyLazyAdamCPUKernel::InitInputOutputSize(const CNodePtr &kernel_node) { @@ -94,24 +127,20 @@ bool SparseApplyLazyAdamCPUKernel::Launch(const std::vector var_outer_dim_size_); lr = lr * std::sqrt(1 - beta2_power) / (1 - beta1_power); - for (size_t i = 0; i < unique_sparse_grad.indices_size_; ++i) { - int index = unique_sparse_grad.indices_[i]; - if (index < 0 || IntToSize(index) >= var_first_dim_size_) { - MS_LOG(EXCEPTION) << "Index " << index << " in indices is out of range"; - } - size_t start_index = var_outer_dim_size_ * index; - size_t end_index = start_index + var_outer_dim_size_; - for (size_t j = start_index, k = var_outer_dim_size_ * i; j < end_index; ++j, ++k) { - auto summed_grad = unique_sparse_grad.value_[k]; - m[j] = beta1 * m[j] + (1 - beta1) * summed_grad; - v[j] = beta2 * v[j] + (1 - beta2) * summed_grad * summed_grad; - if (use_nesterov_) { - var[j] -= lr * (m[j] * beta1 + (1 - beta1) * summed_grad) / (std::sqrt(v[j]) + epsilon); - } else { - var[j] -= lr * m[j] / (std::sqrt(v[j]) + epsilon); - } - } - } + MultiThreadComputeParams input_params; + input_params.var_ = var; + input_params.m_ = m; + input_params.v_ = v; + input_params.lr_ = lr; + input_params.beta1_ = beta1; + input_params.beta2_ = beta2; + input_params.epsilon_ = epsilon; + input_params.use_nesterov_ = use_nesterov_; + input_params.sparse_grad_ = unique_sparse_grad; + input_params.var_first_dim_size_ = var_first_dim_size_; + input_params.var_outer_dim_size_ = var_outer_dim_size_; + const size_t kThreadNum = 16; + MultiThreadCompute(ComputeLazyAdam, &input_params, kThreadNum, unique_sparse_grad.indices_size_); return true; } } // namespace kernel diff --git a/mindspore/ccsrc/kernel/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc b/mindspore/ccsrc/kernel/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc index d99c29459..64cb65764 100644 --- a/mindspore/ccsrc/kernel/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc +++ b/mindspore/ccsrc/kernel/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc @@ -21,6 +21,39 @@ namespace mindspore { namespace kernel { namespace { constexpr size_t kSparseApplyProximalAdagradInputSize = 7; + +void ComputeProximalAdagrad(MultiThreadComputeParams *input_params, size_t start, size_t end) { + MS_EXCEPTION_IF_NULL(input_params); + auto var = input_params->var_; + auto accum = input_params->accum_; + auto lr = input_params->lr_; + auto l1 = input_params->l1_; + auto l2 = input_params->l2_; + auto unique_sparse_grad = input_params->sparse_grad_; + auto var_first_dim_size = input_params->var_first_dim_size_; + auto var_outer_dim_size = input_params->var_outer_dim_size_; + for (size_t i = start; i < end; ++i) { + int index = unique_sparse_grad.indices_[i]; + if (index < 0 || IntToSize(index) >= var_first_dim_size) { + MS_LOG(EXCEPTION) << "Index " << index << " in indices is out of range after unique process"; + } + size_t start_index = var_outer_dim_size * index; + size_t end_index = start_index + var_outer_dim_size; + for (size_t j = start_index, k = var_outer_dim_size * i; j < end_index; ++j, ++k) { + auto summed_grad = unique_sparse_grad.value_[k]; + accum[j] += summed_grad * summed_grad; + auto learning_rate = lr * (1 / std::sqrt(accum[j])); + auto prox_v = var[j]; + prox_v -= summed_grad * learning_rate; + if (l1 > 0) { + var[j] = Sign(prox_v) * std::fmax(std::fabs(prox_v) - learning_rate * l1, static_cast(0.0)) / + (1 + l2 * learning_rate); + } else { + var[j] = prox_v / (1 + l2 * learning_rate); + } + } + } +} } // namespace void SparseApplyProximalAdagradCPUKernel::InitInputOutputSize(const CNodePtr &kernel_node) { @@ -90,27 +123,17 @@ bool SparseApplyProximalAdagradCPUKernel::Launch(const std::vector= var_first_dim_size_) { - MS_LOG(EXCEPTION) << "Index " << index << " in indices is out of range after unique process"; - } - size_t start_index = var_outer_dim_size_ * index; - size_t end_index = start_index + var_outer_dim_size_; - for (size_t j = start_index, k = var_outer_dim_size_ * i; j < end_index; ++j, ++k) { - auto summed_grad = unique_sparse_grad.value_[k]; - accum[j] += summed_grad * summed_grad; - auto learning_rate = lr * (1 / std::sqrt(accum[j])); - auto prox_v = var[j]; - prox_v -= summed_grad * learning_rate; - if (l1 > 0) { - var[j] = Sign(prox_v) * std::fmax(std::fabs(prox_v) - learning_rate * l1, static_cast(0.0)) / - (1 + l2 * learning_rate); - } else { - var[j] = prox_v / (1 + l2 * learning_rate); - } - } - } + MultiThreadComputeParams input_params; + input_params.var_ = var; + input_params.accum_ = accum; + input_params.lr_ = lr; + input_params.l1_ = l1; + input_params.l2_ = l2; + input_params.sparse_grad_ = unique_sparse_grad; + input_params.var_first_dim_size_ = var_first_dim_size_; + input_params.var_outer_dim_size_ = var_outer_dim_size_; + const size_t kThreadNum = 16; + MultiThreadCompute(ComputeProximalAdagrad, &input_params, kThreadNum, unique_sparse_grad.indices_size_); return true; } } // namespace kernel -- GitLab