Commit 3515e5fc authored by R rensilin

datareader_openmp

Change-Id: I0ece0047ec23c22df6ee480969bad2fbbf2ef075
Parent 23757ef5
@@ -11,7 +11,7 @@ INCPATHS('$OUT/../')
INCPATHS('../../third-party')
INCPATHS('../../third-party/eigen')
INCPATHS('$OUT_ROOT/baidu/third-party/python/output/include/python2.7')
-LDFLAGS('-lpthread -lcrypto -lrt -ldl -lssl -lz -lrt -lgomp')
+LDFLAGS('-lpthread -lcrypto -lrt -ldl -lssl -lz -lgomp -fopenmp')
CONFIGS('baidu/third-party/any@15595d8324be9e8a9a80d9ae442fdd12bd66df5d@git_branch')
CONFIGS('baidu/third-party/boost@v1.41.0@git_branch')
CONFIGS('baidu/third-party/c-ares@v1.13.0@git_branch')
@@ -88,12 +88,11 @@ custom_trainer_src = GLOB('paddle/fluid/train/custom_trainer/feed/*/*.cc', Exclu
CPPFLAGS_STR = '-DHPPL_STUB_FUNC -DLAPACK_FOUND -DPADDLE_DISABLE_PROFILER -DPADDLE_NO_PYTHON -DCUSTOM_TRAINER -DPADDLE_ON_INFERENCE -DPADDLE_USE_DSO -DPADDLE_USE_PTHREAD_BARRIER -DPADDLE_USE_PTHREAD_SPINLOCK -DPADDLE_VERSION=0.0.0 -DPADDLE_WITH_AVX -DPADDLE_WITH_MKLML -DPADDLE_WITH_XBYAK -DXBYAK64 -DXBYAK_NO_OP_NAMES -D_GNU_SOURCE -D__STDC_LIMIT_MACROS -DPYBIND_AVX_MKLML' + r" -DPADDLE_REVISION=\"%s@%s@%s\"" % (REPO_URL(), REPO_BRANCH(), REPO_REVISION())
CFLAGS_STR = '-m64 -fPIC -fno-omit-frame-pointer -Werror -Wall -Wextra -Wnon-virtual-dtor -Wdelete-non-virtual-dtor -Wno-unused-parameter -Wno-unused-function -Wno-error=literal-suffix -Wno-error=sign-compare -Wno-error=unused-local-typedefs -Wno-error=maybe-uninitialized -fopenmp -mavx -O3 -DNDEBUG '
-CXXFLAGS_STR = '-std=c++11 ' + CFLAGS_STR
+CXXFLAGS_STR = '-std=c++11' + CFLAGS_STR
Application('feed_trainer', Sources('paddle/fluid/train/custom_trainer/feed/main.cc', custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']))
#feed unit test
UT_MAIN = UT_FILE('main.cc')
-UTApplication('unit_test', Sources(UT_MAIN, UT_FILE('test_executor.cc'), UT_FILE('test_datareader.cc'), custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']))
+UTApplication('unit_test', Sources(UT_MAIN, GLOB(UT_FILE('test_*.cc')), custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']))
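Side note on the flag change: `-fopenmp` must be in effect both when compiling (CFLAGS_STR above already carries it) and when linking, which is what the LDFLAGS edit adds alongside `-lgomp`. A minimal standalone check, not part of this repo, that the toolchain flags take effect:

```cpp
// Toy check (not part of this commit) that OpenMP is enabled by the toolchain:
// compile with `g++ -fopenmp toy.cc` (GCC then links libgomp, i.e. -lgomp).
#include <omp.h>
#include <cstdio>

int main() {
    // Without -fopenmp the pragma is ignored and the loop runs on one thread.
    #pragma omp parallel for
    for (int i = 0; i < 4; ++i) {
        std::printf("iteration %d on thread %d of %d\n",
                    i, omp_get_thread_num(), omp_get_num_threads());
    }
    return 0;
}
```

Built without `-fopenmp`, the same program reports a single thread for every iteration.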
@@ -3,6 +3,7 @@
#include <cstdio>
#include <glog/logging.h>
+#include <omp.h>
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
@@ -32,9 +33,6 @@ public:
VLOG(5) << "getline: " << str << " , pos: " << pos << ", len: " << len;
data.id.assign(str, pos);
data.data.assign(str + pos + 1, len - pos - 1);
-if (!data.data.empty() && data.data.back() == '\n') {
-data.data.pop_back();
-}
return 0;
}
@@ -50,9 +48,6 @@ public:
VLOG(5) << "getline: " << str << " , pos: " << pos;
data.id.assign(str, pos);
data.data.assign(str + pos + 1);
-if (!data.data.empty() && data.data.back() == '\n') {
-data.data.pop_back();
-}
return 0;
}
@@ -85,9 +80,7 @@ public:
return -1;
}
_done_file_name = config["done_file"].as<std::string>();
-_buffer_size = config["buffer_size"].as<int>(1024);
_filename_prefix = config["filename_prefix"].as<std::string>("");
-_buffer.reset(new char[_buffer_size]);
if (config["file_system"] && config["file_system"]["class"]) {
_file_system.reset(
@@ -118,14 +111,11 @@ public:
}
virtual std::vector<std::string> data_file_list(const std::string& data_dir) {
-if (_filename_prefix.empty()) {
-return _file_system->list(data_dir);
-}
std::vector<std::string> data_files;
for (auto& filepath : _file_system->list(data_dir)) {
auto filename = _file_system->path_split(filepath).second;
-if (filename.size() >= _filename_prefix.size() &&
-filename.substr(0, _filename_prefix.size()) == _filename_prefix) {
+if (filename != _done_file_name &&
+string::begin_with(filename, _filename_prefix)) {
data_files.push_back(std::move(filepath));
}
}
@@ -143,28 +133,42 @@ public:
};
std::unique_ptr<framework::ChannelWriter<DataItem>, decltype(deleter)> writer(new framework::ChannelWriter<DataItem>(data_channel.get()), deleter);
DataItem data_item;
-if (_buffer_size <= 0 || _buffer == nullptr) {
-VLOG(2) << "no buffer";
-return -1;
-}
-for (const auto& filepath : data_file_list(data_dir)) {
-if (_file_system->path_split(filepath).second == _done_file_name) {
-continue;
-}
+auto file_list = data_file_list(data_dir);
+int file_list_size = file_list.size();
+VLOG(5) << "omg max_threads: " << omp_get_max_threads();
+#pragma omp parallel for
+for (int i = 0; i < file_list_size; ++i) {
+VLOG(5) << "omg num_threads: " << omp_get_num_threads() << ", start read: " << i << std::endl;
+}
+for (int i = 0; i < file_list_size; ++i) {
+//VLOG(5) << "omg num_threads: " << omp_get_num_threads() << ", start read: " << i;
+const auto& filepath = file_list[i];
{
std::shared_ptr<FILE> fin = _file_system->open_read(filepath, _pipeline_cmd);
if (fin == nullptr) {
VLOG(2) << "fail to open file: " << filepath << ", with cmd: " << _pipeline_cmd;
return -1;
}
-while (fgets(_buffer.get(), _buffer_size, fin.get())) {
-if (_buffer[0] == '\n') {
+char *buffer = nullptr;
+size_t buffer_size = 0;
+ssize_t line_len = 0;
+while ((line_len = getline(&buffer, &buffer_size, fin.get())) != -1) {
+if (line_len > 0 && buffer[line_len - 1] == '\n') {
+buffer[--line_len] = '\0';
+}
+if (line_len <= 0) {
continue;
}
-if (_parser->parse(_buffer.get(), data_item) != 0) {
-return -1;
+if (_parser->parse(buffer, line_len, data_item) == 0) {
+(*writer) << std::move(data_item);
}
-(*writer) << std::move(data_item);
+}
+if (buffer != nullptr) {
+free(buffer);
+buffer = nullptr;
+buffer_size = 0;
}
if (ferror(fin.get()) != 0) {
VLOG(2) << "fail to read file: " << filepath;
@@ -190,9 +194,7 @@ public:
}
private:
std::string _done_file_name; // without data_dir
-int _buffer_size = 0;
-std::unique_ptr<char[]> _buffer;
std::string _filename_prefix;
std::unique_ptr<FileSystem> _file_system;
};
......
@@ -8,5 +8,6 @@ int32_t main(int32_t argc, char** argv) {
::google::InitGoogleLogging(argv[0]);
::testing::InitGoogleTest(&argc, argv);
::google::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
}
@@ -15,6 +15,7 @@ limitations under the License. */
#include <iostream>
#include <fstream>
#include <gtest/gtest.h>
+#include <omp.h>
#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h"
#include "paddle/fluid/framework/tensor_util.h"
@@ -60,17 +61,21 @@ public:
}
virtual void SetUp() {
+thread_num = omp_get_max_threads();
+omp_set_num_threads(1);
fs.reset(CREATE_CLASS(FileSystem, "LocalFileSystem"));
context_ptr.reset(new TrainerContext());
}
virtual void TearDown() {
+omp_set_num_threads(thread_num);
fs = nullptr;
context_ptr = nullptr;
}
std::shared_ptr<TrainerContext> context_ptr;
std::unique_ptr<FileSystem> fs;
+int thread_num = 1;
};
TEST_F(DataReaderTest, LineDataParser) {
@@ -110,11 +115,11 @@ TEST_F(DataReaderTest, LineDataReader) {
"parser:\n"
" class: LineDataParser\n"
"pipeline_cmd: cat\n"
-"done_file: done_file\n"
-"buffer_size: 128");
+"done_file: done_file\n");
ASSERT_EQ(0, data_reader->initialize(config, context_ptr));
auto data_file_list = data_reader->data_file_list(test_data_dir);
ASSERT_EQ(2, data_file_list.size());
+std::sort(data_file_list.begin(), data_file_list.end());
ASSERT_EQ(string::format_string("%s/%s", test_data_dir, "a.txt"), data_file_list[0]);
ASSERT_EQ(string::format_string("%s/%s", test_data_dir, "b.txt"), data_file_list[1]);
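The inline config above loses its `buffer_size` field because the reader no longer allocates a fixed buffer. For reference, a minimal yaml-cpp sketch of how such a config string is loaded (field names mirror the test; the fallback values shown are only illustrative):

```cpp
// Minimal yaml-cpp sketch of loading the reader config used in the tests.
#include <yaml-cpp/yaml.h>
#include <iostream>
#include <string>

int main() {
    YAML::Node config = YAML::Load(
        "parser:\n"
        "  class: LineDataParser\n"
        "pipeline_cmd: cat\n"
        "done_file: done_file\n");
    // as<T>() throws if the key is missing; as<T>(fallback) returns the fallback,
    // which is how optional fields such as filename_prefix behave in the reader.
    std::string parser_class = config["parser"]["class"].as<std::string>();
    std::string done_file = config["done_file"].as<std::string>();
    std::string prefix = config["filename_prefix"].as<std::string>("");
    std::cout << parser_class << " " << done_file << " '" << prefix << "'" << std::endl;
    return 0;
}
```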
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <iostream>
#include <fstream>
#include <algorithm>
#include <gtest/gtest.h>
#include <omp.h>
#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include "paddle/fluid/train/custom_trainer/feed/io/shell.h"
#include "paddle/fluid/string/string_helper.h"
#include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
namespace {
const char test_data_dir[] = "test_data";
}
class DataReaderOmpTest : public testing::Test {
public:
static void SetUpTestCase() {
std::unique_ptr<FileSystem> fs(CREATE_CLASS(FileSystem, "LocalFileSystem"));
fs->mkdir(test_data_dir);
shell_set_verbose(true);
std_items.clear();
sorted_std_items.clear();
for (char c = 'a'; c <= 'z'; ++c) {
DataItem item;
item.id = c;
item.data = std::to_string(c - 'a');
std::ofstream fout(fs->path_join(test_data_dir, string::format_string("%c.txt", c)));
fout << item.id << " " << item.data << std::endl;
fout.close();
sorted_std_items.push_back(std::move(item));
}
for (const auto& filename: fs->list(test_data_dir)) {
std::ifstream fin(filename);
DataItem item;
fin >> item.id >> item.data;
fin.close();
std_items.push_back(std::move(item));
}
}
static void TearDownTestCase() {
std::unique_ptr<FileSystem> fs(CREATE_CLASS(FileSystem, "LocalFileSystem"));
fs->remove(test_data_dir);
}
virtual void SetUp() {
thread_num = omp_get_max_threads();
omp_set_num_threads(1);
fs.reset(CREATE_CLASS(FileSystem, "LocalFileSystem"));
context_ptr.reset(new TrainerContext());
}
virtual void TearDown() {
omp_set_num_threads(thread_num);
fs = nullptr;
context_ptr = nullptr;
}
static bool is_same(const std::vector<DataItem>& a, const std::vector<DataItem>& b) {
int a_size = a.size();
if (a_size != b.size()) {
return false;
}
for (int i = 0; i < a_size; ++i) {
if (a[i].id != b[i].id || a[i].data != b[i].data) {
return false;
}
}
return true;
}
static bool is_same_with_std_items(const std::vector<DataItem>& items) {
return is_same(items, std_items);
}
static bool is_same_with_sorted_std_items(const std::vector<DataItem>& items) {
return is_same(items, sorted_std_items);
}
static std::vector<DataItem> std_items;
static std::vector<DataItem> sorted_std_items;
std::shared_ptr<TrainerContext> context_ptr;
std::unique_ptr<FileSystem> fs;
int thread_num = 1;
};
std::vector<DataItem> DataReaderOmpTest::std_items;
std::vector<DataItem> DataReaderOmpTest::sorted_std_items;
TEST_F(DataReaderOmpTest, LineDataReaderSingleThread) {
std::unique_ptr<DataReader> data_reader(CREATE_CLASS(DataReader, "LineDataReader"));
ASSERT_NE(nullptr, data_reader);
auto config = YAML::Load(
"parser:\n"
" class: LineDataParser\n"
"pipeline_cmd: cat\n"
"done_file: done_file\n");
ASSERT_EQ(0, data_reader->initialize(config, context_ptr));
auto data_file_list = data_reader->data_file_list(test_data_dir);
const int std_items_size = std_items.size();
ASSERT_EQ(std_items_size, data_file_list.size());
for (int i = 0; i < std_items_size; ++i) {
ASSERT_EQ(string::format_string("%s/%s.txt", test_data_dir, std_items[i].id.c_str()), data_file_list[i]);
}
constexpr int n_run = 10;
int same_count = 0;
for (int i = 0; i < n_run; ++i) {
auto channel = framework::MakeChannel<DataItem>(128);
ASSERT_NE(nullptr, channel);
ASSERT_EQ(0, data_reader->read_all(test_data_dir, channel));
std::vector<DataItem> items;
channel->ReadAll(items);
if (is_same_with_std_items(items)) {
++same_count;
}
}
// identical on all n_run runs
ASSERT_EQ(n_run, same_count);
}
TEST_F(DataReaderOmpTest, LineDataReaderMuiltThread) {
std::unique_ptr<DataReader> data_reader(CREATE_CLASS(DataReader, "LineDataReader"));
ASSERT_NE(nullptr, data_reader);
auto config = YAML::Load(
"parser:\n"
" class: LineDataParser\n"
"pipeline_cmd: cat\n"
"done_file: done_file\n");
ASSERT_EQ(0, data_reader->initialize(config, context_ptr));
auto data_file_list = data_reader->data_file_list(test_data_dir);
const int std_items_size = std_items.size();
ASSERT_EQ(std_items_size, data_file_list.size());
for (int i = 0; i < std_items_size; ++i) {
ASSERT_EQ(string::format_string("%s/%s.txt", test_data_dir, std_items[i].id.c_str()), data_file_list[i]);
}
ASSERT_FALSE(data_reader->is_data_ready(test_data_dir));
std::ofstream fout(fs->path_join(test_data_dir, "done_file"));
fout << "done";
fout.close();
ASSERT_TRUE(data_reader->is_data_ready(test_data_dir));
constexpr int n_run = 10;
int same_count = 0;
int sort_same_count = 0;
for (int i = 0; i < n_run; ++i) {
auto channel = framework::MakeChannel<DataItem>(128);
ASSERT_NE(nullptr, channel);
omp_set_num_threads(4);
ASSERT_EQ(0, data_reader->read_all(test_data_dir, channel));
std::vector<DataItem> items;
channel->ReadAll(items);
if (is_same_with_std_items(items)) {
++same_count;
}
std::sort(items.begin(), items.end(), [] (const DataItem& a, const DataItem& b) {
return a.id < b.id;
});
if (is_same_with_sorted_std_items(items)) {
++sort_same_count;
}
}
// some of the n_run runs may differ (evidence that reads are multi-threaded)
// ASSERT_GT(n_run, same_count);
// but after sorting, every run matches the reference
ASSERT_EQ(n_run, sort_same_count);
}
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
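A note on the assertions in LineDataReaderMuiltThread: with `omp_set_num_threads(4)` the order in which files finish reading is scheduling-dependent, so the test only requires that the sorted output equal the sorted reference. A standalone sketch of that idea, without the repo's channel types:

```cpp
// Standalone sketch: with several OpenMP threads the append order is
// scheduling-dependent, so results are sorted before being compared.
#include <omp.h>
#include <algorithm>
#include <cstdio>
#include <string>
#include <vector>

int main() {
    omp_set_num_threads(4);                 // same call the test makes per run
    std::vector<std::string> ids;
    #pragma omp parallel for
    for (int i = 0; i < 26; ++i) {
        std::string id(1, static_cast<char>('a' + i));
        #pragma omp critical
        ids.push_back(id);                  // arrival order varies run to run
    }
    std::sort(ids.begin(), ids.end());      // canonical order for comparison
    std::printf("%s ... %s\n", ids.front().c_str(), ids.back().c_str());
    return 0;
}
```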