Unverified commit 5a80cc84, authored by H hutuxian, committed by GitHub

DataFeed supports reading to a CUDA place directly. (#19071)

* add a place field in DataFeed to denote which place it will feed data to
* abstract the copy process into a CopyToFeedTensor function (a minimal sketch of this dispatch pattern follows the commit header below)
* add unit tests for the float32 type and for CUDAPlace
Parent 3f4c088a
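In essence, the change routes every feed-tensor copy through one place-aware helper instead of a bare memcpy, so the same PutToFeedVec logic can fill either host or device tensors. Here is a minimal, self-contained sketch of that dispatch pattern, assuming an illustrative Place enum and a free function rather than the real PaddlePaddle types (the actual helper, DataFeed::CopyToFeedTensor, is in the first hunk below):

#include <cstddef>
#include <cstring>
#include <stdexcept>
#ifdef PADDLE_WITH_CUDA
#include <cuda_runtime.h>
#endif

// Illustrative stand-in for paddle::platform::Place.
enum class Place { kCPU, kCUDA };

// Copy `size` bytes of host-side data into a feed tensor living on `place`.
void CopyToFeedTensor(Place place, void* dst, const void* src, std::size_t size) {
  if (place == Place::kCPU) {
    std::memcpy(dst, src, size);  // host-to-host, as before this commit
  } else {
#ifdef PADDLE_WITH_CUDA
    // Blocking host-to-device copy; returns after the data is on the GPU.
    cudaMemcpy(dst, src, size, cudaMemcpyHostToDevice);
#else
    throw std::runtime_error("reader targets GPU but binary built without CUDA");
#endif
  }
}

Because cudaMemcpy from pageable host memory is synchronous with respect to the calling thread, each feed thread finishes its copy before proceeding, so the reader needs no extra synchronization.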
@@ -101,6 +101,18 @@ void DataFeed::AssignFeedVar(const Scope& scope) {
   }
 }
 
+void DataFeed::CopyToFeedTensor(void* dst, const void* src, size_t size) {
+  if (platform::is_cpu_place(this->place_)) {
+    memcpy(dst, src, size);
+  } else {
+#ifdef PADDLE_WITH_CUDA
+    cudaMemcpy(dst, src, size, cudaMemcpyHostToDevice);
+#else
+    PADDLE_THROW("Not supported GPU, Please compile WITH_GPU option");
+#endif
+  }
+}
+
 template <typename T>
 void PrivateQueueDataFeed<T>::SetQueueSize(int queue_size) {
   PADDLE_ENFORCE(queue_size > 0, "Illegal queue size: %d.", queue_size);
@@ -612,15 +624,16 @@ void MultiSlotDataFeed::PutToFeedVec(
     if (type[0] == 'f') {  // float
       const auto& feasign = ins_vec[i].GetFloatData();
-      float* tensor_ptr = feed_vec_[i]->mutable_data<float>(
-          {total_instance, 1}, platform::CPUPlace());
-      memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float));
+      float* tensor_ptr =
+          feed_vec_[i]->mutable_data<float>({total_instance, 1}, this->place_);
+      CopyToFeedTensor(tensor_ptr, &feasign[0], total_instance * sizeof(float));
     } else if (type[0] == 'u') {  // uint64
       // no uint64_t type in paddlepaddle
       const auto& feasign = ins_vec[i].GetUint64Data();
       int64_t* tensor_ptr = feed_vec_[i]->mutable_data<int64_t>(
-          {total_instance, 1}, platform::CPUPlace());
-      memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t));
+          {total_instance, 1}, this->place_);
+      CopyToFeedTensor(tensor_ptr, &feasign[0],
+                       total_instance * sizeof(int64_t));
     }
 
     LoD data_lod{offset};
@@ -874,15 +887,15 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec(
     const auto& type = all_slots_type_[i];
     if (type[0] == 'f') {  // float
       float* feasign = batch_float_feasigns[i].data();
-      float* tensor_ptr = feed_vec_[i]->mutable_data<float>(
-          {total_instance, 1}, platform::CPUPlace());
-      memcpy(tensor_ptr, feasign, total_instance * sizeof(float));
+      float* tensor_ptr =
+          feed_vec_[i]->mutable_data<float>({total_instance, 1}, this->place_);
+      CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(float));
     } else if (type[0] == 'u') {  // uint64
       // no uint64_t type in paddlepaddle
       uint64_t* feasign = batch_uint64_feasigns[i].data();
       int64_t* tensor_ptr = feed_vec_[i]->mutable_data<int64_t>(
-          {total_instance, 1}, platform::CPUPlace());
-      memcpy(tensor_ptr, feasign, total_instance * sizeof(int64_t));
+          {total_instance, 1}, this->place_);
+      CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(int64_t));
     }
     auto& slot_offset = offset[i];
     LoD data_lod{slot_offset};
@@ -908,15 +921,16 @@ void PrivateInstantDataFeed<T>::PutToFeedVec() {
     if (type[0] == 'f') {  // float
       const auto& feasign = ins_vec_[i].GetFloatData();
-      float* tensor_ptr = feed_vec_[i]->mutable_data<float>(
-          {total_instance, 1}, platform::CPUPlace());
-      memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float));
+      float* tensor_ptr =
+          feed_vec_[i]->mutable_data<float>({total_instance, 1}, this->place_);
+      CopyToFeedTensor(tensor_ptr, &feasign[0], total_instance * sizeof(float));
     } else if (type[0] == 'u') {  // uint64
       // no uint64_t type in paddlepaddle
       const auto& feasign = ins_vec_[i].GetUint64Data();
       int64_t* tensor_ptr = feed_vec_[i]->mutable_data<int64_t>(
-          {total_instance, 1}, platform::CPUPlace());
-      memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t));
+          {total_instance, 1}, this->place_);
+      CopyToFeedTensor(tensor_ptr, &feasign[0],
+                       total_instance * sizeof(int64_t));
     }
     LoD data_lod{offset};
......
@@ -111,6 +111,10 @@ class DataFeed {
   virtual void LoadIntoMemory() {
     PADDLE_THROW("This function(LoadIntoMemory) is not implemented.");
   }
+  virtual void SetPlace(const paddle::platform::Place& place) {
+    place_ = place;
+  }
+  virtual const paddle::platform::Place& GetPlace() const { return place_; }
 
  protected:
   // The following three functions are used to check if it is executed in this
@@ -124,6 +128,7 @@ class DataFeed {
   // This function is used to pick one file from the global filelist(thread
   // safe).
   virtual bool PickOneFile(std::string* filename);
+  virtual void CopyToFeedTensor(void* dst, const void* src, size_t size);
 
   std::vector<std::string> filelist_;
   size_t* file_idx_;
@@ -158,6 +163,7 @@ class DataFeed {
   bool finish_set_filelist_;
   bool finish_start_;
   std::string pipe_command_;
+  platform::Place place_;
 };
 
 // PrivateQueueDataFeed is the base virtual class for other DataFeeds.
......
@@ -117,6 +117,9 @@ class DeviceWorker {
   virtual void SetPlace(const paddle::platform::Place& place) {
     place_ = place;
   }
+  virtual void SetReaderPlace(const paddle::platform::Place& place) {
+    device_reader_->SetPlace(place);
+  }
 
 protected:
   Scope* root_scope_ = nullptr;
......
@@ -50,6 +50,7 @@ void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program,
                                   const platform::Place& place) {
   for (int i = 0; i < thread_num_; ++i) {
     workers_[i]->SetPlace(place);
+    workers_[i]->SetReaderPlace(place);
     workers_[i]->SetRootScope(root_scope_);
     workers_[i]->CreateDeviceResource(main_program);  // Program
     workers_[i]->BindingDataFeedMemory();
......
@@ -101,6 +101,7 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc,
     this_worker->SetPipelineNum(pipeline_num_);
     if (i == 0) {
       this_worker->SetDataFeed(readers[reader_index++]);
+      this_worker->SetReaderPlace(place);
     }
     this_worker->SetPlace(place);
     this_worker->Initialize(trainer_desc);
......
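Taken together, the C++ hunks above wire the place from the trainer down to each reader before any batch is produced: the trainer calls SetReaderPlace on each worker, the worker forwards it to its DataFeed via SetPlace, and PutToFeedVec later allocates and copies into tensors on that place. A condensed sketch of the wiring, keeping only the method names from the diff and reducing everything else to illustrative stubs:

#include <memory>
#include <vector>

struct Place {};  // stand-in for paddle::platform::Place

class DataFeed {
 public:
  virtual ~DataFeed() = default;
  virtual void SetPlace(const Place& place) { place_ = place; }
  virtual const Place& GetPlace() const { return place_; }

 protected:
  Place place_;  // every subsequent feed copy targets this place
};

class DeviceWorker {
 public:
  void SetDataFeed(std::shared_ptr<DataFeed> reader) { device_reader_ = reader; }
  // Mirrors DeviceWorker::SetReaderPlace in the diff: forward to the reader.
  void SetReaderPlace(const Place& place) { device_reader_->SetPlace(place); }

 private:
  std::shared_ptr<DataFeed> device_reader_;
};

// Mirrors MultiTrainer::InitTrainerEnv: each worker's reader is pointed at
// the same place the worker itself runs on.
void InitTrainerEnv(const std::vector<std::shared_ptr<DeviceWorker>>& workers,
                    const Place& place) {
  for (const auto& worker : workers) {
    worker->SetReaderPlace(place);
  }
}

Note that PipelineTrainer only calls SetReaderPlace on the worker that actually receives a DataFeed (the i == 0 case in its hunk), since the other workers in a pipeline stage own no reader.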
@@ -18,6 +18,7 @@ including create, config, run, etc.
 from __future__ import print_function
 import paddle.fluid as fluid
+import paddle.fluid.core as core
 import numpy as np
 import os
 import shutil
@@ -123,6 +124,57 @@ class TestDataset(unittest.TestCase):
         os.remove("./test_in_memory_dataset_run_a.txt")
         os.remove("./test_in_memory_dataset_run_b.txt")
 
+    def test_in_memory_dataset_run_2(self):
+        """
+        Testcase for InMemoryDataset from create to run.
+        Use CUDAPlace
+        Use float type id
+        """
+        with open("test_in_memory_dataset_run_a.txt", "w") as f:
+            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
+            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
+            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
+            f.write(data)
+        with open("test_in_memory_dataset_run_b.txt", "w") as f:
+            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
+            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
+            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
+            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
+            f.write(data)
+
+        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
+        slots_vars = []
+        for slot in slots:
+            var = fluid.layers.data(
+                name=slot, shape=[1], dtype="float32", lod_level=1)
+            slots_vars.append(var)
+
+        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset.set_batch_size(32)
+        dataset.set_thread(3)
+        dataset.set_filelist([
+            "test_in_memory_dataset_run_a.txt",
+            "test_in_memory_dataset_run_b.txt"
+        ])
+        dataset.set_pipe_command("cat")
+        dataset.set_use_var(slots_vars)
+        dataset.load_into_memory()
+        dataset.local_shuffle()
+
+        exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda(
+        ) else fluid.CUDAPlace(0))
+        exe.run(fluid.default_startup_program())
+        for i in range(2):
+            try:
+                exe.train_from_dataset(fluid.default_main_program(), dataset)
+            except ImportError as e:
+                pass
+            except Exception as e:
+                self.assertTrue(False)
+
+        os.remove("./test_in_memory_dataset_run_a.txt")
+        os.remove("./test_in_memory_dataset_run_b.txt")
+
     def test_queue_dataset_run(self):
         """
         Testcase for QueueDataset from create to run.
@@ -167,6 +219,53 @@ class TestDataset(unittest.TestCase):
         os.remove("./test_queue_dataset_run_a.txt")
         os.remove("./test_queue_dataset_run_b.txt")
 
+    def test_queue_dataset_run_2(self):
+        """
+        Testcase for QueueDataset from create to run.
+        Use CUDAPlace
+        Use float type id
+        """
+        with open("test_queue_dataset_run_a.txt", "w") as f:
+            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
+            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
+            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
+            f.write(data)
+        with open("test_queue_dataset_run_b.txt", "w") as f:
+            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
+            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
+            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
+            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
+            f.write(data)
+
+        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
+        slots_vars = []
+        for slot in slots:
+            var = fluid.layers.data(
+                name=slot, shape=[1], dtype="float32", lod_level=1)
+            slots_vars.append(var)
+
+        dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
+        dataset.set_batch_size(32)
+        dataset.set_thread(3)
+        dataset.set_filelist(
+            ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
+        dataset.set_pipe_command("cat")
+        dataset.set_use_var(slots_vars)
+
+        exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda(
+        ) else fluid.CUDAPlace(0))
+        exe.run(fluid.default_startup_program())
+        for i in range(2):
+            try:
+                exe.train_from_dataset(fluid.default_main_program(), dataset)
+            except ImportError as e:
+                pass
+            except Exception as e:
+                self.assertTrue(False)
+
+        os.remove("./test_queue_dataset_run_a.txt")
+        os.remove("./test_queue_dataset_run_b.txt")
+
 if __name__ == '__main__':
     unittest.main()