提交 1b52753f 编写于 作者: M mindspore-ci-bot 提交者: Gitee

!2462 optimize cpu reduce gradient

Merge pull request !2462 from kisnwang/optimize-reduce-sparse-gradient
......@@ -327,19 +327,16 @@ bool AscendKernelRuntime::GenTask(const session::KernelGraph *graph) {
vector<std::shared_ptr<TaskInfo>> task_info_list;
auto anf_node_list = graph->execution_order();
TaskGenerator::GenTasks(anf_node_list, &task_info_list, graph->graph_id());
// Store the task_info_list
auto insert_ret = task_map_.insert(std::make_pair(graph->graph_id(), task_info_list));
if (!insert_ret.second) {
MS_LOG(EXCEPTION) << "Duplicate GraphId! Please check in ascend_session.";
}
// Graph may have no compute node, such TensorAddGrad.
if (task_info_list.empty()) {
MS_LOG(WARNING) << "graph " << graph->graph_id() << " have no compute node";
return true;
}
AscendStreamAssign &assign_instance = AscendStreamAssign::GetInstance();
AscendStreamMng &stream_manager = AscendStreamMng::GetInstance();
AscendLabelAssign &label_assign_instance = AscendLabelAssign::GetInstance();
......@@ -348,19 +345,16 @@ bool AscendKernelRuntime::GenTask(const session::KernelGraph *graph) {
assign_instance.GetWaitStreams(&wait_active_stream_list);
std::vector<uint32_t> force_copy_stream_list;
assign_instance.GetHcomStreams(&force_copy_stream_list);
MS_LOG(INFO) << "call DavinciModel total stream num:" << stream_manager.GetCurAllocStreamNum()
<< ", total event num:" << assign_instance.total_event_num()
<< ", total label num:" << label_assign_instance.GetLabelNum(NOT_NULL(graph))
<< ", wait_active_stream_list size:" << wait_active_stream_list.size()
<< ", force_copy_stream_list size:" << force_copy_stream_list.size();
std::vector<std::shared_ptr<ge::model_runner::OpInfo>> empty_list;
std::shared_ptr<ge::model_runner::DavinciModel> model = std::make_shared<ge::model_runner::DavinciModel>(
task_info_list, empty_list, empty_list, empty_list, empty_list, wait_active_stream_list, force_copy_stream_list, 0,
0, 0, 0, 0, 0, stream_manager.GetCurAllocStreamNum(), label_assign_instance.GetLabelNum(NOT_NULL(graph)),
assign_instance.total_event_num(), 0);
auto ret = graph_model_map_.insert(std::make_pair(graph->graph_id(), model));
if (!ret.second) {
MS_LOG(EXCEPTION) << "Duplicate GraphId! Please check in ascend_session.";
......
......@@ -147,20 +147,18 @@ BaseRef CPUKernelRuntime::CreatTensorForOutput(const session::KernelWithIndex &k
auto &input_node = kernel_with_index.first;
auto index = kernel_with_index.second;
MS_EXCEPTION_IF_NULL(input_node);
if (input_node->isa<CNode>() && AnfAlgo::GetCNodeName(input_node) == prim::kPrimMakeTuple->name()) {
auto cnode = input_node->cast<CNodePtr>();
MS_EXCEPTION_IF_NULL(cnode);
VectorRef ret;
for (size_t i = 1; i < cnode->inputs().size(); i++) {
auto item_with_index = AnfAlgo::VisitKernelWithReturnType(cnode->input(i), 0);
auto out = CreatTensorForOutput(item_with_index, input_map, bound_addresses, need_sync_outputs);
ret.push_back(out);
}
return ret;
}
if (input_node->isa<CNode>()) {
auto node = input_node->cast<CNodePtr>();
MS_EXCEPTION_IF_NULL(node);
if (AnfAlgo::GetCNodeName(input_node) == prim::kPrimMakeTuple->name()) {
VectorRef ret;
for (size_t i = 1; i < node->inputs().size(); i++) {
auto item_with_index = AnfAlgo::VisitKernelWithReturnType(node->input(i), 0);
auto out = CreatTensorForOutput(item_with_index, input_map, bound_addresses, need_sync_outputs);
ret.push_back(out);
}
return ret;
}
size_t output_size = AnfAlgo::GetOutputTensorNum(node);
if (index >= output_size) {
MS_LOG(EXCEPTION) << "Invalid input index " << index;
......
......@@ -576,6 +576,52 @@ void DeduplicateIndexedSlices(const SparseGradient &origin_sparse_grad, SparseGr
unique_grad->indices_size_ = unique_indices_size;
}
struct WorkerParamsForReduceSparseGradient {
size_t slice_start_{0};
size_t slice_end_{0};
size_t max_length_{0};
size_t outer_dim_{0};
std::vector<std::pair<int, size_t>> *sorted_indices_{nullptr};
std::vector<size_t> *slice_positions_{nullptr};
float *src_value_{nullptr};
SparseGradient *unique_grad_{nullptr};
};
void WorkerForReduceSparseGradient(WorkerParamsForReduceSparseGradient param) {
MS_EXCEPTION_IF_NULL(param.sorted_indices_);
MS_EXCEPTION_IF_NULL(param.slice_positions_);
MS_EXCEPTION_IF_NULL(param.src_value_);
MS_EXCEPTION_IF_NULL(param.unique_grad_);
auto outer_dim = param.outer_dim_;
auto &sorted_indices = *(param.sorted_indices_);
auto &slice_positions = *(param.slice_positions_);
auto unique_grad = param.unique_grad_;
for (size_t slice_id = param.slice_start_; slice_id < param.slice_end_; ++slice_id) {
size_t cur_pos = slice_positions[slice_id];
int index = sorted_indices[cur_pos].first;
unique_grad->indices_[slice_id] = index;
size_t start_index = slice_id * outer_dim;
auto ret_code = memcpy_s(unique_grad->value_ + start_index, (param.max_length_ - start_index) * sizeof(float),
param.src_value_ + sorted_indices[cur_pos].second, outer_dim * sizeof(float));
if (ret_code != EOK) {
MS_LOG(EXCEPTION) << "Failed to copy data!";
}
cur_pos++;
size_t end_pos;
if (slice_id + 1 < slice_positions.size()) {
end_pos = slice_positions[slice_id + 1];
} else {
end_pos = sorted_indices.size();
}
while (cur_pos < end_pos) {
for (size_t i = 0; i < outer_dim; ++i) {
unique_grad->value_[start_index + i] += param.src_value_[sorted_indices[cur_pos].second + i];
}
cur_pos++;
}
}
}
void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim,
size_t outer_dim) {
MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_);
......@@ -583,47 +629,50 @@ void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradie
MS_EXCEPTION_IF_NULL(unique_grad);
MS_EXCEPTION_IF_NULL(unique_grad->value_);
MS_EXCEPTION_IF_NULL(unique_grad->indices_);
size_t unique_indices_size = 0;
std::vector<std::pair<int, size_t>> sorted_indices;
sorted_indices.reserve(origin_sparse_grad.indices_size_);
for (size_t i = 0; i < origin_sparse_grad.indices_size_; ++i) {
int index = origin_sparse_grad.indices_[i];
if (index < 0 || IntToSize(index) >= first_dim) {
continue;
if (index >= 0 && IntToSize(index) < first_dim) {
sorted_indices.emplace_back(std::pair<int, size_t>(index, i * outer_dim));
}
sorted_indices.emplace_back(std::pair<int, size_t>(index, i * outer_dim));
}
std::sort(
sorted_indices.begin(), sorted_indices.end(),
[](const std::pair<int, size_t> &left, const std::pair<int, size_t> &right) { return left.first < right.first; });
int last_index = 0;
size_t indices_size = sorted_indices.size();
size_t start_index = 0;
size_t end_index = outer_dim;
size_t dst_len = indices_size * outer_dim;
for (size_t i = 0; i < indices_size; ++i) {
int index = sorted_indices[i].first;
if (i == 0 || last_index != index) {
if (i > 0 && last_index != index) {
unique_indices_size++;
start_index += outer_dim;
end_index += outer_dim;
}
unique_grad->indices_[unique_indices_size] = index;
auto ret_code = memcpy_s(unique_grad->value_ + start_index, dst_len - start_index,
origin_sparse_grad.value_ + sorted_indices[i].second, outer_dim);
if (ret_code != EOK) {
MS_LOG(EXCEPTION) << "Failed to copy data!";
}
std::vector<size_t> slice_positions;
for (size_t i = 0; i < sorted_indices.size(); ++i) {
if (i == 0 || last_index != sorted_indices[i].first) {
slice_positions.emplace_back(i);
}
last_index = sorted_indices[i].first;
}
size_t thread_num = 8;
if (slice_positions.size() < thread_num) {
thread_num = slice_positions.size();
}
size_t stride = (slice_positions.size() + thread_num - 1) / thread_num;
thread_num = (slice_positions.size() + stride - 1) / stride;
std::vector<std::thread> threads;
size_t max_length = sorted_indices.size() * outer_dim;
for (size_t i = 0; i < thread_num; ++i) {
size_t slice_start = i * stride;
size_t slice_end = 0;
if (i == thread_num - 1) {
slice_end = slice_positions.size();
} else {
for (size_t j = start_index, k = sorted_indices[i].second; j < end_index; ++j, ++k) {
unique_grad->value_[j] += origin_sparse_grad.value_[k];
}
slice_end = slice_start + stride;
}
last_index = index;
WorkerParamsForReduceSparseGradient params{
slice_start, slice_end, max_length, outer_dim, &sorted_indices, &slice_positions, origin_sparse_grad.value_,
unique_grad};
threads.emplace_back(std::thread(WorkerForReduceSparseGradient, params));
}
for (size_t i = 0; i < thread_num; ++i) {
threads[i].join();
}
unique_grad->indices_size_ = unique_indices_size + 1;
unique_grad->indices_size_ = slice_positions.size();
}
std::pair<AnfNodePtr, size_t> GetKernelInput(const AnfNodePtr &anf_node, size_t index) {
......
......@@ -25,6 +25,7 @@
#include <string>
#include <vector>
#include <utility>
#include <thread>
#include <nlohmann/json.hpp>
#include "kernel/kernel.h"
#include "kernel/oplib/opinfo.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册