From 5a80cc8431431fdda035acd3508f6edead64daf4 Mon Sep 17 00:00:00 2001 From: hutuxian Date: Sat, 10 Aug 2019 10:20:44 +0800 Subject: [PATCH] Datafeed support reading to cuda place directly. (#19071) * add a place field in DataFeed to denote which place it will feed data to. * abstract the copy process in CopyToFeedTensor function * add UT for float32 type and for CUDAPlace --- paddle/fluid/framework/data_feed.cc | 44 ++++++--- paddle/fluid/framework/data_feed.h | 6 ++ paddle/fluid/framework/device_worker.h | 3 + paddle/fluid/framework/multi_trainer.cc | 1 + paddle/fluid/framework/pipeline_trainer.cc | 1 + .../fluid/tests/unittests/test_dataset.py | 99 +++++++++++++++++++ 6 files changed, 139 insertions(+), 15 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 1ed472de6c..627b7790e8 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -101,6 +101,18 @@ void DataFeed::AssignFeedVar(const Scope& scope) { } } +void DataFeed::CopyToFeedTensor(void* dst, const void* src, size_t size) { + if (platform::is_cpu_place(this->place_)) { + memcpy(dst, src, size); + } else { +#ifdef PADDLE_WITH_CUDA + cudaMemcpy(dst, src, size, cudaMemcpyHostToDevice); +#else + PADDLE_THROW("Not supported GPU, Please compile WITH_GPU option"); +#endif + } +} + template void PrivateQueueDataFeed::SetQueueSize(int queue_size) { PADDLE_ENFORCE(queue_size > 0, "Illegal queue size: %d.", queue_size); @@ -612,15 +624,16 @@ void MultiSlotDataFeed::PutToFeedVec( if (type[0] == 'f') { // float const auto& feasign = ins_vec[i].GetFloatData(); - float* tensor_ptr = feed_vec_[i]->mutable_data( - {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float)); + float* tensor_ptr = + feed_vec_[i]->mutable_data({total_instance, 1}, this->place_); + CopyToFeedTensor(tensor_ptr, &feasign[0], total_instance * sizeof(float)); } else if (type[0] == 'u') { // uint64 // no uint64_t type in paddlepaddle const auto& feasign = ins_vec[i].GetUint64Data(); int64_t* tensor_ptr = feed_vec_[i]->mutable_data( - {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t)); + {total_instance, 1}, this->place_); + CopyToFeedTensor(tensor_ptr, &feasign[0], + total_instance * sizeof(int64_t)); } LoD data_lod{offset}; @@ -874,15 +887,15 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec( const auto& type = all_slots_type_[i]; if (type[0] == 'f') { // float float* feasign = batch_float_feasigns[i].data(); - float* tensor_ptr = feed_vec_[i]->mutable_data( - {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, feasign, total_instance * sizeof(float)); + float* tensor_ptr = + feed_vec_[i]->mutable_data({total_instance, 1}, this->place_); + CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(float)); } else if (type[0] == 'u') { // uint64 // no uint64_t type in paddlepaddle uint64_t* feasign = batch_uint64_feasigns[i].data(); int64_t* tensor_ptr = feed_vec_[i]->mutable_data( - {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, feasign, total_instance * sizeof(int64_t)); + {total_instance, 1}, this->place_); + CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(int64_t)); } auto& slot_offset = offset[i]; LoD data_lod{slot_offset}; @@ -908,15 +921,16 @@ void PrivateInstantDataFeed::PutToFeedVec() { if (type[0] == 'f') { // float const auto& feasign = ins_vec_[i].GetFloatData(); - float* tensor_ptr = feed_vec_[i]->mutable_data( - {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float)); + float* tensor_ptr = + feed_vec_[i]->mutable_data({total_instance, 1}, this->place_); + CopyToFeedTensor(tensor_ptr, &feasign[0], total_instance * sizeof(float)); } else if (type[0] == 'u') { // uint64 // no uint64_t type in paddlepaddle const auto& feasign = ins_vec_[i].GetUint64Data(); int64_t* tensor_ptr = feed_vec_[i]->mutable_data( - {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t)); + {total_instance, 1}, this->place_); + CopyToFeedTensor(tensor_ptr, &feasign[0], + total_instance * sizeof(int64_t)); } LoD data_lod{offset}; diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 7164834cf8..400212f1d2 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -111,6 +111,10 @@ class DataFeed { virtual void LoadIntoMemory() { PADDLE_THROW("This function(LoadIntoMemory) is not implemented."); } + virtual void SetPlace(const paddle::platform::Place& place) { + place_ = place; + } + virtual const paddle::platform::Place& GetPlace() const { return place_; } protected: // The following three functions are used to check if it is executed in this @@ -124,6 +128,7 @@ class DataFeed { // This function is used to pick one file from the global filelist(thread // safe). virtual bool PickOneFile(std::string* filename); + virtual void CopyToFeedTensor(void* dst, const void* src, size_t size); std::vector filelist_; size_t* file_idx_; @@ -158,6 +163,7 @@ class DataFeed { bool finish_set_filelist_; bool finish_start_; std::string pipe_command_; + platform::Place place_; }; // PrivateQueueDataFeed is the base virtual class for ohther DataFeeds. diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index 5aa6793422..d0c40210a5 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -117,6 +117,9 @@ class DeviceWorker { virtual void SetPlace(const paddle::platform::Place& place) { place_ = place; } + virtual void SetReaderPlace(const paddle::platform::Place& place) { + device_reader_->SetPlace(place); + } protected: Scope* root_scope_ = nullptr; diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc index 8cbf2efa81..f81948c4da 100644 --- a/paddle/fluid/framework/multi_trainer.cc +++ b/paddle/fluid/framework/multi_trainer.cc @@ -50,6 +50,7 @@ void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program, const platform::Place& place) { for (int i = 0; i < thread_num_; ++i) { workers_[i]->SetPlace(place); + workers_[i]->SetReaderPlace(place); workers_[i]->SetRootScope(root_scope_); workers_[i]->CreateDeviceResource(main_program); // Program workers_[i]->BindingDataFeedMemory(); diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index 916359ab6b..3617a8f188 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -101,6 +101,7 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc, this_worker->SetPipelineNum(pipeline_num_); if (i == 0) { this_worker->SetDataFeed(readers[reader_index++]); + this_worker->SetReaderPlace(place); } this_worker->SetPlace(place); this_worker->Initialize(trainer_desc); diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index bce3c24dc8..cd12cd2307 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -18,6 +18,7 @@ including create, config, run, etc. from __future__ import print_function import paddle.fluid as fluid +import paddle.fluid.core as core import numpy as np import os import shutil @@ -123,6 +124,57 @@ class TestDataset(unittest.TestCase): os.remove("./test_in_memory_dataset_run_a.txt") os.remove("./test_in_memory_dataset_run_b.txt") + def test_in_memory_dataset_run_2(self): + """ + Testcase for InMemoryDataset from create to run. + Use CUDAPlace + Use float type id + """ + with open("test_in_memory_dataset_run_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_in_memory_dataset_run_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data( + name=slot, shape=[1], dtype="float32", lod_level=1) + slots_vars.append(var) + + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_batch_size(32) + dataset.set_thread(3) + dataset.set_filelist([ + "test_in_memory_dataset_run_a.txt", + "test_in_memory_dataset_run_b.txt" + ]) + dataset.set_pipe_command("cat") + dataset.set_use_var(slots_vars) + dataset.load_into_memory() + dataset.local_shuffle() + + exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( + ) else fluid.CUDAPlace(0)) + exe.run(fluid.default_startup_program()) + for i in range(2): + try: + exe.train_from_dataset(fluid.default_main_program(), dataset) + except ImportError as e: + pass + except Exception as e: + self.assertTrue(False) + + os.remove("./test_in_memory_dataset_run_a.txt") + os.remove("./test_in_memory_dataset_run_b.txt") + def test_queue_dataset_run(self): """ Testcase for QueueDataset from create to run. @@ -167,6 +219,53 @@ class TestDataset(unittest.TestCase): os.remove("./test_queue_dataset_run_a.txt") os.remove("./test_queue_dataset_run_b.txt") + def test_queue_dataset_run_2(self): + """ + Testcase for QueueDataset from create to run. + Use CUDAPlace + Use float type id + """ + with open("test_queue_dataset_run_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_queue_dataset_run_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data( + name=slot, shape=[1], dtype="float32", lod_level=1) + slots_vars.append(var) + + dataset = fluid.DatasetFactory().create_dataset("QueueDataset") + dataset.set_batch_size(32) + dataset.set_thread(3) + dataset.set_filelist( + ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) + dataset.set_pipe_command("cat") + dataset.set_use_var(slots_vars) + + exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( + ) else fluid.CUDAPlace(0)) + exe.run(fluid.default_startup_program()) + for i in range(2): + try: + exe.train_from_dataset(fluid.default_main_program(), dataset) + except ImportError as e: + pass + except Exception as e: + self.assertTrue(False) + + os.remove("./test_queue_dataset_run_a.txt") + os.remove("./test_queue_dataset_run_b.txt") + if __name__ == '__main__': unittest.main() -- GitLab