diff --git a/BCLOUD b/BCLOUD
index eeca617076765b745b689b7d5deb23bfc5c91878..879c2123c6f9ce542f87d50b8e4b6c505118b19f 100644
--- a/BCLOUD
+++ b/BCLOUD
@@ -11,7 +11,7 @@ INCPATHS('$OUT/../')
 INCPATHS('../../third-party')
 INCPATHS('../../third-party/eigen')
 INCPATHS('$OUT_ROOT/baidu/third-party/python/output/include/python2.7')
-LDFLAGS('-lpthread -lcrypto -lrt -ldl -lssl -lz -lrt -lgomp')
+LDFLAGS('-lpthread -lcrypto -lrt -ldl -lssl -lz -lgomp -fopenmp')
 CONFIGS('baidu/third-party/any@15595d8324be9e8a9a80d9ae442fdd12bd66df5d@git_branch')
 CONFIGS('baidu/third-party/boost@v1.41.0@git_branch')
 CONFIGS('baidu/third-party/c-ares@v1.13.0@git_branch')
@@ -88,12 +88,11 @@ custom_trainer_src = GLOB('paddle/fluid/train/custom_trainer/feed/*/*.cc', Exclu
 CPPFLAGS_STR = '-DHPPL_STUB_FUNC -DLAPACK_FOUND -DPADDLE_DISABLE_PROFILER -DPADDLE_NO_PYTHON -DCUSTOM_TRAINER -DPADDLE_ON_INFERENCE -DPADDLE_USE_DSO -DPADDLE_USE_PTHREAD_BARRIER -DPADDLE_USE_PTHREAD_SPINLOCK -DPADDLE_VERSION=0.0.0 -DPADDLE_WITH_AVX -DPADDLE_WITH_MKLML -DPADDLE_WITH_XBYAK -DXBYAK64 -DXBYAK_NO_OP_NAMES -D_GNU_SOURCE -D__STDC_LIMIT_MACROS -DPYBIND_AVX_MKLML' + r" -DPADDLE_REVISION=\"%s@%s@%s\"" % (REPO_URL(), REPO_BRANCH(), REPO_REVISION())
 CFLAGS_STR = '-m64 -fPIC -fno-omit-frame-pointer -Werror -Wall -Wextra -Wnon-virtual-dtor -Wdelete-non-virtual-dtor -Wno-unused-parameter -Wno-unused-function -Wno-error=literal-suffix -Wno-error=sign-compare -Wno-error=unused-local-typedefs -Wno-error=maybe-uninitialized -fopenmp -mavx -O3 -DNDEBUG '
-
-CXXFLAGS_STR = '-std=c++11' + CFLAGS_STR
+CXXFLAGS_STR = '-std=c++11 ' + CFLAGS_STR
 
 Application('feed_trainer', Sources('paddle/fluid/train/custom_trainer/feed/main.cc', custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']))
 
 #feed unit test
 UT_MAIN = UT_FILE('main.cc')
-UTApplication('unit_test', Sources(UT_MAIN, UT_FILE('test_executor.cc'), UT_FILE('test_datareader.cc'), custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']))
+UTApplication('unit_test', Sources(UT_MAIN, GLOB(UT_FILE('test_*.cc')), custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']))
diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc
index f07e8f667565d32d7c898ba99afc4b5496d7f4b0..cd5d524bd07358b663cd9ad5f4d9693c0816f3b5 100644
--- a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc
+++ b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc
@@ -3,6 +3,7 @@
 #include
 #include
+#include
 
 #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
@@ -32,9 +33,6 @@ public:
         VLOG(5) << "getline: " << str << " , pos: " << pos << ", len: " << len;
         data.id.assign(str, pos);
         data.data.assign(str + pos + 1, len - pos - 1);
-        if (!data.data.empty() && data.data.back() == '\n') {
-            data.data.pop_back();
-        }
         return 0;
     }
@@ -50,9 +48,6 @@ public:
         VLOG(5) << "getline: " << str << " , pos: " << pos;
         data.id.assign(str, pos);
         data.data.assign(str + pos + 1);
-        if (!data.data.empty() && data.data.back() == '\n') {
-            data.data.pop_back();
-        }
         return 0;
     }
@@ -85,9 +80,7 @@ public:
             return -1;
         }
         _done_file_name = config["done_file"].as<std::string>();
-        _buffer_size = config["buffer_size"].as<int>(1024);
         _filename_prefix = config["filename_prefix"].as<std::string>("");
-        _buffer.reset(new char[_buffer_size]);
 
         if (config["file_system"] && config["file_system"]["class"]) {
             _file_system.reset(
@@ -118,14 +111,11 @@ public:
     }
 
     virtual std::vector<std::string> data_file_list(const std::string& data_dir) {
-        if (_filename_prefix.empty()) {
-            return _file_system->list(data_dir);
-        }
         std::vector<std::string> data_files;
         for (auto& filepath : _file_system->list(data_dir)) {
             auto filename = _file_system->path_split(filepath).second;
-            if (filename.size() >= _filename_prefix.size() &&
-                filename.substr(0, _filename_prefix.size()) == _filename_prefix) {
+            if (filename != _done_file_name &&
+                string::begin_with(filename, _filename_prefix)) {
                 data_files.push_back(std::move(filepath));
             }
         }
@@ -143,28 +133,42 @@ public:
         };
         std::unique_ptr<framework::ChannelWriter<DataItem>, decltype(deleter)> writer(new framework::ChannelWriter<DataItem>(data_channel.get()), deleter);
         DataItem data_item;
-        if (_buffer_size <= 0 || _buffer == nullptr) {
-            VLOG(2) << "no buffer";
-            return -1;
+
+        auto file_list = data_file_list(data_dir);
+        int file_list_size = file_list.size();
+
+        VLOG(5) << "omg max_threads: " << omp_get_max_threads();
+        #pragma omp parallel for
+        for (int i = 0; i < file_list_size; ++i) {
+            VLOG(5) << "omg num_threads: " << omp_get_num_threads() << ", start read: " << i << std::endl;
         }
-        for (const auto& filepath : data_file_list(data_dir)) {
-            if (_file_system->path_split(filepath).second == _done_file_name) {
-                continue;
-            }
+        for (int i = 0; i < file_list_size; ++i) {
+            //VLOG(5) << "omg num_threads: " << omp_get_num_threads() << ", start read: " << i;
+            const auto& filepath = file_list[i];
             {
                 std::shared_ptr<FILE> fin = _file_system->open_read(filepath, _pipeline_cmd);
                 if (fin == nullptr) {
                     VLOG(2) << "fail to open file: " << filepath << ", with cmd: " << _pipeline_cmd;
                     return -1;
                 }
-                while (fgets(_buffer.get(), _buffer_size, fin.get())) {
-                    if (_buffer[0] == '\n') {
+                char *buffer = nullptr;
+                size_t buffer_size = 0;
+                ssize_t line_len = 0;
+                while ((line_len = getline(&buffer, &buffer_size, fin.get())) != -1) {
+                    if (line_len > 0 && buffer[line_len - 1] == '\n') {
+                        buffer[--line_len] = '\0';
+                    }
+                    if (line_len <= 0) {
                         continue;
                     }
-                    if (_parser->parse(_buffer.get(), data_item) != 0) {
-                        return -1;
+                    if (_parser->parse(buffer, line_len, data_item) == 0) {
+                        (*writer) << std::move(data_item);
                     }
-                    (*writer) << std::move(data_item);
+                }
+                if (buffer != nullptr) {
+                    free(buffer);
+                    buffer = nullptr;
+                    buffer_size = 0;
                 }
                 if (ferror(fin.get()) != 0) {
                     VLOG(2) << "fail to read file: " << filepath;
@@ -190,9 +194,7 @@ public:
     }
 
 private:
-    std::string _done_file_name; // without data_dir
-    int _buffer_size = 0;
-    std::unique_ptr<char[]> _buffer;
+    std::string _done_file_name; // without data_dirq
     std::string _filename_prefix;
     std::unique_ptr<FileSystem> _file_system;
 };
diff --git a/paddle/fluid/train/custom_trainer/feed/unit_test/main.cc b/paddle/fluid/train/custom_trainer/feed/unit_test/main.cc
index 7ff531d8442508b2daf7b8e56b631e0829a821fb..acbcae55a2155faff57b1e44bbc9fe8ec8f0b3dd 100644
--- a/paddle/fluid/train/custom_trainer/feed/unit_test/main.cc
+++ b/paddle/fluid/train/custom_trainer/feed/unit_test/main.cc
@@ -8,5 +8,6 @@ int32_t main(int32_t argc, char** argv) {
     ::google::InitGoogleLogging(argv[0]);
     ::testing::InitGoogleTest(&argc, argv);
     ::google::ParseCommandLineFlags(&argc, &argv, true);
+    return RUN_ALL_TESTS();
 
 }
diff --git a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader.cc b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader.cc
index 4393eb76a7e1e6462aa7fc131633f2852e004e26..5fed50cb61001c2ff2cb4b1ed16e6897aed897f8 100644
--- a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader.cc
+++ b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader.cc
@@ -15,6 +15,7 @@ limitations under the License. */
 #include
 #include
 #include
+#include
 
 #include "paddle/fluid/train/custom_trainer/feed/executor/executor.h"
 #include "paddle/fluid/framework/tensor_util.h"
@@ -60,17 +61,21 @@ public:
     }
 
     virtual void SetUp() {
+        thread_num = omp_get_max_threads();
+        omp_set_num_threads(1);
         fs.reset(CREATE_CLASS(FileSystem, "LocalFileSystem"));
         context_ptr.reset(new TrainerContext());
     }
 
     virtual void TearDown() {
+        omp_set_num_threads(thread_num);
         fs = nullptr;
         context_ptr = nullptr;
     }
 
     std::shared_ptr<TrainerContext> context_ptr;
     std::unique_ptr<FileSystem> fs;
+    int thread_num = 1;
 };
 
 TEST_F(DataReaderTest, LineDataParser) {
@@ -110,11 +115,11 @@ TEST_F(DataReaderTest, LineDataReader) {
         "parser:\n"
         "    class: LineDataParser\n"
         "pipeline_cmd: cat\n"
-        "done_file: done_file\n"
-        "buffer_size: 128");
+        "done_file: done_file\n");
     ASSERT_EQ(0, data_reader->initialize(config, context_ptr));
     auto data_file_list = data_reader->data_file_list(test_data_dir);
     ASSERT_EQ(2, data_file_list.size());
+    std::sort(data_file_list.begin(), data_file_list.end());
     ASSERT_EQ(string::format_string("%s/%s", test_data_dir, "a.txt"), data_file_list[0]);
     ASSERT_EQ(string::format_string("%s/%s", test_data_dir, "b.txt"), data_file_list[1]);
diff --git a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc
new file mode 100644
index 0000000000000000000000000000000000000000..526d0322e65f7f7bdbe08d909039ad29aeaa7cdc
--- /dev/null
+++ b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc
@@ -0,0 +1,211 @@
+/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include
+#include
+#include
+#include
+#include
+
+#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h"
+#include "paddle/fluid/framework/tensor_util.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
+#include "paddle/fluid/train/custom_trainer/feed/io/shell.h"
+#include "paddle/fluid/string/string_helper.h"
+#include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h"
+
+namespace paddle {
+namespace custom_trainer {
+namespace feed {
+
+namespace {
+const char test_data_dir[] = "test_data";
+}
+
+class DataReaderOmpTest : public testing::Test {
+public:
+    static void SetUpTestCase() {
+        std::unique_ptr<FileSystem> fs(CREATE_CLASS(FileSystem, "LocalFileSystem"));
+        fs->mkdir(test_data_dir);
+        shell_set_verbose(true);
+        std_items.clear();
+        sorted_std_items.clear();
+        for (char c = 'a'; c <= 'z'; ++c) {
+            DataItem item;
+            item.id = c;
+            item.data = std::to_string(c - 'a');
+            std::ofstream fout(fs->path_join(test_data_dir, string::format_string("%c.txt", c)));
+            fout << item.id << " " << item.data << std::endl;
+            fout.close();
+            sorted_std_items.push_back(std::move(item));
+        }
+        for (const auto& filename: fs->list(test_data_dir)) {
+            std::ifstream fin(filename);
+            DataItem item;
+            fin >> item.id >> item.data;
+            fin.close();
+            std_items.push_back(std::move(item));
+        }
+    }
+
+    static void TearDownTestCase() {
+        std::unique_ptr<FileSystem> fs(CREATE_CLASS(FileSystem, "LocalFileSystem"));
+        fs->remove(test_data_dir);
+    }
+
+    virtual void SetUp() {
+        thread_num = omp_get_max_threads();
+        omp_set_num_threads(1);
+        fs.reset(CREATE_CLASS(FileSystem, "LocalFileSystem"));
+        context_ptr.reset(new TrainerContext());
+    }
+
+    virtual void TearDown() {
+        omp_set_num_threads(thread_num);
+        fs = nullptr;
+        context_ptr = nullptr;
+    }
+
+    static bool is_same(const std::vector<DataItem>& a, const std::vector<DataItem>& b) {
+        int a_size = a.size();
+        if (a_size != b.size()) {
+            return false;
+        }
+        for (int i = 0; i < a_size; ++i) {
+            if (a[i].id != b[i].id || a[i].data != b[i].data) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    static bool is_same_with_std_items(const std::vector<DataItem>& items) {
+        return is_same(items, std_items);
+    }
+
+    static bool is_same_with_sorted_std_items(const std::vector<DataItem>& items) {
+        return is_same(items, sorted_std_items);
+    }
+
+    static std::vector<DataItem> std_items;
+    static std::vector<DataItem> sorted_std_items;
+    std::shared_ptr<TrainerContext> context_ptr;
+    std::unique_ptr<FileSystem> fs;
+    int thread_num = 1;
+};
+
+std::vector<DataItem> DataReaderOmpTest::std_items;
+std::vector<DataItem> DataReaderOmpTest::sorted_std_items;
+
+TEST_F(DataReaderOmpTest, LineDataReaderSingleThread) {
+    std::unique_ptr<DataReader> data_reader(CREATE_CLASS(DataReader, "LineDataReader"));
+    ASSERT_NE(nullptr, data_reader);
+
+    auto config = YAML::Load(
+        "parser:\n"
+        "    class: LineDataParser\n"
+        "pipeline_cmd: cat\n"
+        "done_file: done_file\n");
+    ASSERT_EQ(0, data_reader->initialize(config, context_ptr));
+    auto data_file_list = data_reader->data_file_list(test_data_dir);
+
+    const int std_items_size = std_items.size();
+    ASSERT_EQ(std_items_size, data_file_list.size());
+
+    for (int i = 0; i < std_items_size; ++i) {
+        ASSERT_EQ(string::format_string("%s/%s.txt", test_data_dir, std_items[i].id.c_str()), data_file_list[i]);
+    }
+
+    constexpr int n_run = 10;
+    int same_count = 0;
+    for (int i = 0; i < n_run; ++i) {
+        auto channel = framework::MakeChannel<DataItem>(128);
+        ASSERT_NE(nullptr, channel);
+        ASSERT_EQ(0, data_reader->read_all(test_data_dir, channel));
+
+        std::vector<DataItem> items;
+        channel->ReadAll(items);
+
+        if (is_same_with_std_items(items)) {
+            ++same_count;
+        }
+    }
+
+    // all n_run runs produce identical results
+    ASSERT_EQ(n_run, same_count);
+}
+
+TEST_F(DataReaderOmpTest, LineDataReaderMuiltThread) {
+    std::unique_ptr<DataReader> data_reader(CREATE_CLASS(DataReader, "LineDataReader"));
+    ASSERT_NE(nullptr, data_reader);
+
+    auto config = YAML::Load(
+        "parser:\n"
+        "    class: LineDataParser\n"
+        "pipeline_cmd: cat\n"
+        "done_file: done_file\n");
+    ASSERT_EQ(0, data_reader->initialize(config, context_ptr));
+    auto data_file_list = data_reader->data_file_list(test_data_dir);
+
+    const int std_items_size = std_items.size();
+    ASSERT_EQ(std_items_size, data_file_list.size());
+
+    for (int i = 0; i < std_items_size; ++i) {
+        ASSERT_EQ(string::format_string("%s/%s.txt", test_data_dir, std_items[i].id.c_str()), data_file_list[i]);
+    }
+
+    ASSERT_FALSE(data_reader->is_data_ready(test_data_dir));
+    std::ofstream fout(fs->path_join(test_data_dir, "done_file"));
+    fout << "done";
+    fout.close();
+    ASSERT_TRUE(data_reader->is_data_ready(test_data_dir));
+
+    constexpr int n_run = 10;
+    int same_count = 0;
+    int sort_same_count = 0;
+    for (int i = 0; i < n_run; ++i) {
+        auto channel = framework::MakeChannel<DataItem>(128);
+        ASSERT_NE(nullptr, channel);
+
+        omp_set_num_threads(4);
+
+        ASSERT_EQ(0, data_reader->read_all(test_data_dir, channel));
+
+        std::vector<DataItem> items;
+        channel->ReadAll(items);
+
+        if (is_same_with_std_items(items)) {
+            ++same_count;
+        }
+
+        std::sort(items.begin(), items.end(), [] (const DataItem& a, const DataItem& b) {
+            return a.id < b.id;
+        });
+
+        if (is_same_with_sorted_std_items(items)) {
+            ++sort_same_count;
+        }
+
+    }
+    // some of the n_run runs should differ (evidence that reading is multi-threaded)
+    // ASSERT_GT(n_run, same_count);
+
+    // but after sorting, every run yields the same items
+    ASSERT_EQ(n_run, sort_same_count);
+}
+
+}  // namespace feed
+}  // namespace custom_trainer
+}  // namespace paddle