未验证 提交 aa46caf3 编写于 作者: G guru4elephant 提交者: GitHub

Merge pull request #16765 from guru4elephant/gpu_dataset_train

add gpu training for Executor.train_from_dataset
......@@ -72,7 +72,6 @@ bool DataFeed::PickOneFile(std::string* filename) {
}
VLOG(3) << "file_idx_=" << *file_idx_;
*filename = filelist_[(*file_idx_)++];
// LOG(ERROR) << "pick file:" << *filename;
return true;
}
......@@ -466,6 +465,17 @@ void MultiSlotDataFeed::Init(
if (slot.is_used()) {
use_slots_.push_back(all_slots_[i]);
use_slots_is_dense_.push_back(slot.is_dense());
std::vector<int> local_shape;
if (slot.is_dense()) {
// for batch size holder if is_dense
if (slot.shape(0) > 0) {
local_shape.push_back(0);
}
}
for (size_t i = 0; i < slot.shape_size(); ++i) {
local_shape.push_back(slot.shape(i));
}
use_slots_shape_.push_back(local_shape);
}
}
feed_vec_.resize(use_slots_.size());
......@@ -752,8 +762,8 @@ void MultiSlotDataFeed::PutToFeedVec(
LoD data_lod{offset};
feed_vec_[i]->set_lod(data_lod);
if (use_slots_is_dense_[i]) {
int dim = total_instance / batch_size_;
feed_vec_[i]->Resize({batch_size_, dim});
use_slots_shape_[i][0] = batch_size_;
feed_vec_[i]->Resize(framework::make_ddim(use_slots_shape_[i]));
}
}
#endif
......@@ -785,6 +795,16 @@ void MultiSlotInMemoryDataFeed::Init(
if (slot.is_used()) {
use_slots_.push_back(all_slots_[i]);
use_slots_is_dense_.push_back(slot.is_dense());
std::vector<int> local_shape;
if (slot.is_dense()) {
if (slot.shape(0) > 0) {
local_shape.push_back(0);
}
}
for (size_t i = 0; i < slot.shape_size(); ++i) {
local_shape.push_back(slot.shape(i));
}
use_slots_shape_.push_back(local_shape);
}
}
feed_vec_.resize(use_slots_.size());
......@@ -940,8 +960,8 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec(
LoD data_lod{offset};
feed_vec_[i]->set_lod(data_lod);
if (use_slots_is_dense_[i]) {
int dim = total_instance / batch_size_;
feed_vec_[i]->Resize({batch_size_, dim});
use_slots_shape_[i][0] = batch_size_;
feed_vec_[i]->Resize(framework::make_ddim(use_slots_shape_[i]));
}
}
#endif
......
......@@ -142,6 +142,7 @@ class DataFeed {
// object)
std::vector<std::string> all_slots_;
std::vector<std::string> all_slots_type_;
std::vector<std::vector<int>> use_slots_shape_;
std::vector<int>
use_slots_index_; // -1: not used; >=0: the index of use_slots_
......
......@@ -19,6 +19,7 @@ message Slot {
required string type = 2;
optional bool is_dense = 3 [ default = false ];
optional bool is_used = 4 [ default = false ];
repeated int32 shape = 5; // we can define N-D Tensor
}
message MultiSlotDesc { repeated Slot slots = 1; }
......
......@@ -21,40 +21,40 @@ namespace framework {
void DownpourWorker::Initialize(const TrainerDesc& desc) {
param_ = desc.downpour_param();
for (size_t i = 0; i < param_.sparse_table_size(); ++i) {
for (int i = 0; i < param_.sparse_table_size(); ++i) {
uint64_t table_id =
static_cast<uint64_t>(param_.sparse_table(i).table_id());
TableParameter table = param_.sparse_table(i);
sparse_key_names_[table_id].resize(table.sparse_key_name_size());
for (size_t j = 0; j < table.sparse_key_name_size(); ++j) {
for (int j = 0; j < table.sparse_key_name_size(); ++j) {
sparse_key_names_[table_id][j] = table.sparse_key_name(j);
}
sparse_value_names_[table_id].resize(table.sparse_value_name_size());
for (size_t j = 0; j < table.sparse_value_name_size(); ++j) {
for (int j = 0; j < table.sparse_value_name_size(); ++j) {
sparse_value_names_[table_id][j] = table.sparse_value_name(j);
}
sparse_grad_names_[table_id].resize(table.sparse_grad_name_size());
for (size_t j = 0; j < table.sparse_grad_name_size(); ++j) {
for (int j = 0; j < table.sparse_grad_name_size(); ++j) {
sparse_grad_names_[table_id][j] = table.sparse_grad_name(j);
}
label_var_name_[table_id] = table.label_var_name();
}
for (size_t i = 0; i < param_.dense_table_size(); ++i) {
for (int i = 0; i < param_.dense_table_size(); ++i) {
uint64_t table_id = static_cast<uint64_t>(param_.dense_table(i).table_id());
auto table = param_.dense_table(i);
dense_value_names_[table_id].resize(table.dense_value_name_size());
for (size_t j = 0; j < table.dense_value_name_size(); ++j) {
for (int j = 0; j < table.dense_value_name_size(); ++j) {
dense_value_names_[table_id][j] = table.dense_value_name(j);
}
dense_grad_names_[table_id].resize(table.dense_grad_name_size());
for (size_t j = 0; j < table.dense_grad_name_size(); ++j) {
for (int j = 0; j < table.dense_grad_name_size(); ++j) {
dense_grad_names_[table_id][j] = table.dense_grad_name(j);
}
}
skip_ops_.resize(param_.skip_ops_size());
for (size_t i = 0; i < param_.skip_ops_size(); ++i) {
for (int i = 0; i < param_.skip_ops_size(); ++i) {
skip_ops_[i] = param_.skip_ops(i);
}
......@@ -83,14 +83,14 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int64_t* label_ptr = tensor->data<int64_t>();
int global_index = 0;
size_t global_index = 0;
for (size_t i = 0; i < sparse_key_names_[table_id].size(); ++i) {
VLOG(3) << "sparse_key_names_[" << i
<< "]: " << sparse_key_names_[table_id][i];
Variable* fea_var = thread_scope_->FindVar(sparse_key_names_[table_id][i]);
LoDTensor* tensor = fea_var->GetMutable<LoDTensor>();
int64_t* ids = tensor->data<int64_t>();
int fea_idx = 0;
size_t fea_idx = 0;
// tensor->lod()[0].size() == batch_size + 1
for (auto lod_idx = 1u; lod_idx < tensor->lod()[0].size(); ++lod_idx) {
for (; fea_idx < tensor->lod()[0][lod_idx]; ++fea_idx) {
......@@ -138,7 +138,7 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
auto& tensor_lod = tensor->lod()[0];
LoD data_lod{tensor_lod};
tensor_emb->set_lod(data_lod);
for (auto index = 0u; index < len; ++index) {
for (int index = 0; index < len; ++index) {
if (ids[index] == 0u) {
memcpy(ptr + table.emb_dim() * index, init_value.data() + 2,
sizeof(float) * table.emb_dim());
......@@ -192,7 +192,7 @@ void DownpourWorker::TrainFilesWithProfiler() {
read_time += timeline.ElapsedSec();
total_time += timeline.ElapsedSec();
VLOG(3) << "program config size: " << param_.program_config_size();
for (size_t i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).pull_sparse_table_id(i));
......@@ -244,8 +244,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
}
if (need_to_push_sparse_) {
for (size_t i = 0;
i < param_.program_config(0).push_sparse_table_id_size(); ++i) {
for (int i = 0; i < param_.program_config(0).push_sparse_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_sparse_table_id(i));
TableParameter table;
......@@ -268,8 +268,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
if (need_to_push_dense_) {
timeline.Start();
for (size_t i = 0;
i < param_.program_config(0).push_dense_table_id_size(); ++i) {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
fleet_ptr_->PushDenseVarsAsync(
......@@ -315,8 +315,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
}
if (need_to_push_dense_) {
for (size_t i = 0;
i < param_.program_config(0).push_dense_table_id_size(); ++i) {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
pull_dense_worker_->IncreaseThreadVersion(thread_id_, tid);
......@@ -362,7 +362,7 @@ void DownpourWorker::TrainFiles() {
int cur_batch;
while ((cur_batch = device_reader_->Next()) > 0) {
// pull sparse here
for (size_t i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).pull_sparse_table_id(i));
......@@ -397,8 +397,8 @@ void DownpourWorker::TrainFiles() {
if (need_to_push_sparse_) {
// push gradients here
for (size_t i = 0;
i < param_.program_config(0).push_sparse_table_id_size(); ++i) {
for (int i = 0; i < param_.program_config(0).push_sparse_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_sparse_table_id(i));
TableParameter table;
......@@ -416,8 +416,8 @@ void DownpourWorker::TrainFiles() {
}
if (need_to_push_dense_) {
for (size_t i = 0;
i < param_.program_config(0).push_dense_table_id_size(); ++i) {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
fleet_ptr_->PushDenseVarsAsync(
......@@ -461,8 +461,8 @@ void DownpourWorker::TrainFiles() {
}
if (need_to_push_dense_) {
for (size_t i = 0;
i < param_.program_config(0).push_dense_table_id_size(); ++i) {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
pull_dense_worker_->IncreaseThreadVersion(thread_id_, tid);
......
......@@ -221,7 +221,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
PADDLE_ENFORCE(!member_->use_cuda_,
"gpu mode does not support async_mode_ now!");
graphs.push_back(graph);
for (int i = 1; i < places.size(); ++i) {
for (size_t i = 1; i < places.size(); ++i) {
auto *tmp_graph = new ir::Graph(graph->OriginProgram());
async_graphs_.emplace_back(tmp_graph);
graphs.push_back(tmp_graph);
......@@ -315,7 +315,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name,
{member_->local_scopes_[0]}, 1,
member_->use_cuda_, member_->nccl_ctxs_.get());
for (int i = 1; i < member_->places_.size(); ++i) {
for (size_t i = 1; i < member_->places_.size(); ++i) {
graphs[i] =
build_strategy.Apply(graphs[i], {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, 1,
......
......@@ -76,7 +76,7 @@ message PullDenseWorkerParameter {
message TableParameter {
// dense table only
optional int64 table_id = 1;
optional uint64 table_id = 1;
repeated string dense_value_name = 2;
repeated string dense_grad_name = 3;
repeated int32 push_dense_wait_times = 5;
......
......@@ -136,6 +136,7 @@ class DatasetBase(object):
slot_var.name = var.name
if var.lod_level == 0:
slot_var.is_dense = True
slot_var.shape.extend(var.shape)
if var.dtype == core.VarDesc.VarType.FP32:
slot_var.type = "float"
elif var.dtype == core.VarDesc.VarType.INT64:
......
......@@ -712,10 +712,6 @@ class Executor(object):
if dataset == None:
raise RuntimeError("dataset is needed and should be initialized")
if not isinstance(self.place, core.CPUPlace):
raise RuntimeError("infer_from_dataset is verified on CPUPlace"
"We will open CUDAPlace in the future")
scope, trainer = self._prepare_trainer(
program=program,
dataset=dataset,
......@@ -796,10 +792,6 @@ class Executor(object):
if dataset == None:
raise RuntimeError("dataset is need and should be initialized")
if not isinstance(self.place, core.CPUPlace):
raise RuntimeError("train_from_dataset is verified on CPUPlace"
"We will open CUDAPlace in the future")
scope, trainer = self._prepare_trainer(
program=program,
dataset=dataset,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册