diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index b5f7e6c22405d6928f0e423458d6cd720f2d09a8..365c80da34eb287f50d2f0dcbf3844001ab43ec8 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -72,7 +72,6 @@ bool DataFeed::PickOneFile(std::string* filename) { } VLOG(3) << "file_idx_=" << *file_idx_; *filename = filelist_[(*file_idx_)++]; - // LOG(ERROR) << "pick file:" << *filename; return true; } @@ -466,6 +465,17 @@ void MultiSlotDataFeed::Init( if (slot.is_used()) { use_slots_.push_back(all_slots_[i]); use_slots_is_dense_.push_back(slot.is_dense()); + std::vector local_shape; + if (slot.is_dense()) { + // for batch size holder if is_dense + if (slot.shape(0) > 0) { + local_shape.push_back(0); + } + } + for (size_t i = 0; i < slot.shape_size(); ++i) { + local_shape.push_back(slot.shape(i)); + } + use_slots_shape_.push_back(local_shape); } } feed_vec_.resize(use_slots_.size()); @@ -752,8 +762,8 @@ void MultiSlotDataFeed::PutToFeedVec( LoD data_lod{offset}; feed_vec_[i]->set_lod(data_lod); if (use_slots_is_dense_[i]) { - int dim = total_instance / batch_size_; - feed_vec_[i]->Resize({batch_size_, dim}); + use_slots_shape_[i][0] = batch_size_; + feed_vec_[i]->Resize(framework::make_ddim(use_slots_shape_[i])); } } #endif @@ -785,6 +795,16 @@ void MultiSlotInMemoryDataFeed::Init( if (slot.is_used()) { use_slots_.push_back(all_slots_[i]); use_slots_is_dense_.push_back(slot.is_dense()); + std::vector local_shape; + if (slot.is_dense()) { + if (slot.shape(0) > 0) { + local_shape.push_back(0); + } + } + for (size_t i = 0; i < slot.shape_size(); ++i) { + local_shape.push_back(slot.shape(i)); + } + use_slots_shape_.push_back(local_shape); } } feed_vec_.resize(use_slots_.size()); @@ -940,8 +960,8 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec( LoD data_lod{offset}; feed_vec_[i]->set_lod(data_lod); if (use_slots_is_dense_[i]) { - int dim = total_instance / batch_size_; - feed_vec_[i]->Resize({batch_size_, dim}); + use_slots_shape_[i][0] = batch_size_; + feed_vec_[i]->Resize(framework::make_ddim(use_slots_shape_[i])); } } #endif diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 648c874a0b8763b18118e18adf3b3e93acfd104b..d098c7858a98c644bd3cad78d3cf1e3b35ca026b 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -142,6 +142,7 @@ class DataFeed { // object) std::vector all_slots_; std::vector all_slots_type_; + std::vector> use_slots_shape_; std::vector use_slots_index_; // -1: not used; >=0: the index of use_slots_ diff --git a/paddle/fluid/framework/data_feed.proto b/paddle/fluid/framework/data_feed.proto index 77911306299b77748a2ad9437d49680748885003..03996e0e20a1729ee300a5ad37abc325876930b7 100644 --- a/paddle/fluid/framework/data_feed.proto +++ b/paddle/fluid/framework/data_feed.proto @@ -19,6 +19,7 @@ message Slot { required string type = 2; optional bool is_dense = 3 [ default = false ]; optional bool is_used = 4 [ default = false ]; + repeated int32 shape = 5; // we can define N-D Tensor } message MultiSlotDesc { repeated Slot slots = 1; } diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc index 4ca7842fa261a1b8178438d35ca5d626146663d4..386ffd84c57063e950cd8b0d57304c66190be4c4 100644 --- a/paddle/fluid/framework/downpour_worker.cc +++ b/paddle/fluid/framework/downpour_worker.cc @@ -21,40 +21,40 @@ namespace framework { void DownpourWorker::Initialize(const TrainerDesc& desc) { param_ = desc.downpour_param(); - for (size_t i = 0; i < param_.sparse_table_size(); ++i) { + for (int i = 0; i < param_.sparse_table_size(); ++i) { uint64_t table_id = static_cast(param_.sparse_table(i).table_id()); TableParameter table = param_.sparse_table(i); sparse_key_names_[table_id].resize(table.sparse_key_name_size()); - for (size_t j = 0; j < table.sparse_key_name_size(); ++j) { + for (int j = 0; j < table.sparse_key_name_size(); ++j) { sparse_key_names_[table_id][j] = table.sparse_key_name(j); } sparse_value_names_[table_id].resize(table.sparse_value_name_size()); - for (size_t j = 0; j < table.sparse_value_name_size(); ++j) { + for (int j = 0; j < table.sparse_value_name_size(); ++j) { sparse_value_names_[table_id][j] = table.sparse_value_name(j); } sparse_grad_names_[table_id].resize(table.sparse_grad_name_size()); - for (size_t j = 0; j < table.sparse_grad_name_size(); ++j) { + for (int j = 0; j < table.sparse_grad_name_size(); ++j) { sparse_grad_names_[table_id][j] = table.sparse_grad_name(j); } label_var_name_[table_id] = table.label_var_name(); } - for (size_t i = 0; i < param_.dense_table_size(); ++i) { + for (int i = 0; i < param_.dense_table_size(); ++i) { uint64_t table_id = static_cast(param_.dense_table(i).table_id()); auto table = param_.dense_table(i); dense_value_names_[table_id].resize(table.dense_value_name_size()); - for (size_t j = 0; j < table.dense_value_name_size(); ++j) { + for (int j = 0; j < table.dense_value_name_size(); ++j) { dense_value_names_[table_id][j] = table.dense_value_name(j); } dense_grad_names_[table_id].resize(table.dense_grad_name_size()); - for (size_t j = 0; j < table.dense_grad_name_size(); ++j) { + for (int j = 0; j < table.dense_grad_name_size(); ++j) { dense_grad_names_[table_id][j] = table.dense_grad_name(j); } } skip_ops_.resize(param_.skip_ops_size()); - for (size_t i = 0; i < param_.skip_ops_size(); ++i) { + for (int i = 0; i < param_.skip_ops_size(); ++i) { skip_ops_[i] = param_.skip_ops(i); } @@ -83,14 +83,14 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) { LoDTensor* tensor = var->GetMutable(); int64_t* label_ptr = tensor->data(); - int global_index = 0; + size_t global_index = 0; for (size_t i = 0; i < sparse_key_names_[table_id].size(); ++i) { VLOG(3) << "sparse_key_names_[" << i << "]: " << sparse_key_names_[table_id][i]; Variable* fea_var = thread_scope_->FindVar(sparse_key_names_[table_id][i]); LoDTensor* tensor = fea_var->GetMutable(); int64_t* ids = tensor->data(); - int fea_idx = 0; + size_t fea_idx = 0; // tensor->lod()[0].size() == batch_size + 1 for (auto lod_idx = 1u; lod_idx < tensor->lod()[0].size(); ++lod_idx) { for (; fea_idx < tensor->lod()[0][lod_idx]; ++fea_idx) { @@ -138,7 +138,7 @@ void DownpourWorker::FillSparseValue(size_t table_idx) { auto& tensor_lod = tensor->lod()[0]; LoD data_lod{tensor_lod}; tensor_emb->set_lod(data_lod); - for (auto index = 0u; index < len; ++index) { + for (int index = 0; index < len; ++index) { if (ids[index] == 0u) { memcpy(ptr + table.emb_dim() * index, init_value.data() + 2, sizeof(float) * table.emb_dim()); @@ -192,7 +192,7 @@ void DownpourWorker::TrainFilesWithProfiler() { read_time += timeline.ElapsedSec(); total_time += timeline.ElapsedSec(); VLOG(3) << "program config size: " << param_.program_config_size(); - for (size_t i = 0; i < param_.program_config(0).pull_sparse_table_id_size(); + for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size(); ++i) { uint64_t tid = static_cast( param_.program_config(0).pull_sparse_table_id(i)); @@ -244,8 +244,8 @@ void DownpourWorker::TrainFilesWithProfiler() { } if (need_to_push_sparse_) { - for (size_t i = 0; - i < param_.program_config(0).push_sparse_table_id_size(); ++i) { + for (int i = 0; i < param_.program_config(0).push_sparse_table_id_size(); + ++i) { uint64_t tid = static_cast( param_.program_config(0).push_sparse_table_id(i)); TableParameter table; @@ -268,8 +268,8 @@ void DownpourWorker::TrainFilesWithProfiler() { if (need_to_push_dense_) { timeline.Start(); - for (size_t i = 0; - i < param_.program_config(0).push_dense_table_id_size(); ++i) { + for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); + ++i) { uint64_t tid = static_cast( param_.program_config(0).push_dense_table_id(i)); fleet_ptr_->PushDenseVarsAsync( @@ -315,8 +315,8 @@ void DownpourWorker::TrainFilesWithProfiler() { } if (need_to_push_dense_) { - for (size_t i = 0; - i < param_.program_config(0).push_dense_table_id_size(); ++i) { + for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); + ++i) { uint64_t tid = static_cast( param_.program_config(0).push_dense_table_id(i)); pull_dense_worker_->IncreaseThreadVersion(thread_id_, tid); @@ -362,7 +362,7 @@ void DownpourWorker::TrainFiles() { int cur_batch; while ((cur_batch = device_reader_->Next()) > 0) { // pull sparse here - for (size_t i = 0; i < param_.program_config(0).pull_sparse_table_id_size(); + for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size(); ++i) { uint64_t tid = static_cast( param_.program_config(0).pull_sparse_table_id(i)); @@ -397,8 +397,8 @@ void DownpourWorker::TrainFiles() { if (need_to_push_sparse_) { // push gradients here - for (size_t i = 0; - i < param_.program_config(0).push_sparse_table_id_size(); ++i) { + for (int i = 0; i < param_.program_config(0).push_sparse_table_id_size(); + ++i) { uint64_t tid = static_cast( param_.program_config(0).push_sparse_table_id(i)); TableParameter table; @@ -416,8 +416,8 @@ void DownpourWorker::TrainFiles() { } if (need_to_push_dense_) { - for (size_t i = 0; - i < param_.program_config(0).push_dense_table_id_size(); ++i) { + for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); + ++i) { uint64_t tid = static_cast( param_.program_config(0).push_dense_table_id(i)); fleet_ptr_->PushDenseVarsAsync( @@ -461,8 +461,8 @@ void DownpourWorker::TrainFiles() { } if (need_to_push_dense_) { - for (size_t i = 0; - i < param_.program_config(0).push_dense_table_id_size(); ++i) { + for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); + ++i) { uint64_t tid = static_cast( param_.program_config(0).push_dense_table_id(i)); pull_dense_worker_->IncreaseThreadVersion(thread_id_, tid); diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 4245caf1689c76d72b410c742488c55562c8b998..c4bf2b7e8c017b22f917c9f9bd40e75b8cde08b2 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -221,7 +221,7 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, PADDLE_ENFORCE(!member_->use_cuda_, "gpu mode does not support async_mode_ now!"); graphs.push_back(graph); - for (int i = 1; i < places.size(); ++i) { + for (size_t i = 1; i < places.size(); ++i) { auto *tmp_graph = new ir::Graph(graph->OriginProgram()); async_graphs_.emplace_back(tmp_graph); graphs.push_back(tmp_graph); @@ -315,7 +315,7 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name, {member_->local_scopes_[0]}, 1, member_->use_cuda_, member_->nccl_ctxs_.get()); - for (int i = 1; i < member_->places_.size(); ++i) { + for (size_t i = 1; i < member_->places_.size(); ++i) { graphs[i] = build_strategy.Apply(graphs[i], {member_->places_[i]}, loss_var_name, {member_->local_scopes_[i]}, 1, diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto index 389c1a870fb54ad28806ad49632323b1c93676f4..4fc05ccf5c9be37e80b4ae7263166ad76eb6d6a7 100644 --- a/paddle/fluid/framework/trainer_desc.proto +++ b/paddle/fluid/framework/trainer_desc.proto @@ -76,7 +76,7 @@ message PullDenseWorkerParameter { message TableParameter { // dense table only - optional int64 table_id = 1; + optional uint64 table_id = 1; repeated string dense_value_name = 2; repeated string dense_grad_name = 3; repeated int32 push_dense_wait_times = 5; diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index e655fd4a976a8a6fa2811ddc43de3d1f231229d5..1a023f61675ed62c141bb6e71fabbdf0086b0c64 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -136,6 +136,7 @@ class DatasetBase(object): slot_var.name = var.name if var.lod_level == 0: slot_var.is_dense = True + slot_var.shape.extend(var.shape) if var.dtype == core.VarDesc.VarType.FP32: slot_var.type = "float" elif var.dtype == core.VarDesc.VarType.INT64: diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index e15197037e1d901855883919b02a1574b7bc9a29..fa8b49a021294e8555e979459615b1956d9b2b55 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -712,10 +712,6 @@ class Executor(object): if dataset == None: raise RuntimeError("dataset is needed and should be initialized") - if not isinstance(self.place, core.CPUPlace): - raise RuntimeError("infer_from_dataset is verified on CPUPlace" - "We will open CUDAPlace in the future") - scope, trainer = self._prepare_trainer( program=program, dataset=dataset, @@ -796,10 +792,6 @@ class Executor(object): if dataset == None: raise RuntimeError("dataset is need and should be initialized") - if not isinstance(self.place, core.CPUPlace): - raise RuntimeError("train_from_dataset is verified on CPUPlace" - "We will open CUDAPlace in the future") - scope, trainer = self._prepare_trainer( program=program, dataset=dataset,