diff --git a/BCLOUD b/BCLOUD index 879c2123c6f9ce542f87d50b8e4b6c505118b19f..e226fda0303dd277e5bfbc6d6b19b673c5007197 100644 --- a/BCLOUD +++ b/BCLOUD @@ -1,7 +1,7 @@ WORKROOT('../../../') COMPILER('gcc482') CPPFLAGS('-D_GNU_SOURCE -DNDEBUG') -GLOBAL_CFLAGS_STR = '-g -O3 -pipe ' +GLOBAL_CFLAGS_STR = '-g -O3 -pipe -fopenmp ' CFLAGS(GLOBAL_CFLAGS_STR) GLOBAL_CXXFLAGS_STR = GLOBAL_CFLAGS_STR + ' -std=c++11 ' CXXFLAGS(GLOBAL_CXXFLAGS_STR) diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc index cd5d524bd07358b663cd9ad5f4d9693c0816f3b5..3ba2ff1a9bc4333fb91a6b0131838754acd0b879 100644 --- a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc +++ b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc @@ -1,6 +1,7 @@ #include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h" #include +#include #include #include @@ -136,20 +137,17 @@ public: auto file_list = data_file_list(data_dir); int file_list_size = file_list.size(); + std::atomic is_failed(false); - VLOG(5) << "omg max_threads: " << omp_get_max_threads(); #pragma omp parallel for for (int i = 0; i < file_list_size; ++i) { - VLOG(5) << "omg num_threads: " << omp_get_num_threads() << ", start read: " << i << std::endl; - } - for (int i = 0; i < file_list_size; ++i) { - //VLOG(5) << "omg num_threads: " << omp_get_num_threads() << ", start read: " << i; const auto& filepath = file_list[i]; - { + if (!is_failed) { std::shared_ptr fin = _file_system->open_read(filepath, _pipeline_cmd); if (fin == nullptr) { VLOG(2) << "fail to open file: " << filepath << ", with cmd: " << _pipeline_cmd; - return -1; + is_failed = true; + continue; } char *buffer = nullptr; size_t buffer_size = 0; @@ -172,21 +170,23 @@ public: } if (ferror(fin.get()) != 0) { VLOG(2) << "fail to read file: " << filepath; - return -1; + is_failed = true; + continue; } } if (_file_system->err_no() != 0) { _file_system->reset_err_no(); - return -1; + is_failed = true; + continue; } } writer->Flush(); if (!(*writer)) { VLOG(2) << "fail when write to channel"; - return -1; + is_failed = true; } data_channel->Close(); - return 0; + return is_failed ? -1 : 0; } virtual const DataParser* get_parser() { @@ -194,7 +194,7 @@ public: } private: - std::string _done_file_name; // without data_dirq + std::string _done_file_name; // without data_dir std::string _filename_prefix; std::unique_ptr _file_system; }; diff --git a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc index 526d0322e65f7f7bdbe08d909039ad29aeaa7cdc..59e6e2cfa21b2d8cc1f8d009703e2e7a1bb111b3 100644 --- a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc +++ b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc @@ -104,6 +104,7 @@ public: std::shared_ptr context_ptr; std::unique_ptr fs; int thread_num = 1; + const int n_run = 5; }; std::vector DataReaderOmpTest::std_items; @@ -128,7 +129,6 @@ TEST_F(DataReaderOmpTest, LineDataReaderSingleThread) { ASSERT_EQ(string::format_string("%s/%s.txt", test_data_dir, std_items[i].id.c_str()), data_file_list[i]); } - constexpr int n_run = 10; int same_count = 0; for (int i = 0; i < n_run; ++i) { auto channel = framework::MakeChannel(128); @@ -172,7 +172,6 @@ TEST_F(DataReaderOmpTest, LineDataReaderMuiltThread) { fout.close(); ASSERT_TRUE(data_reader->is_data_ready(test_data_dir)); - constexpr int n_run = 10; int same_count = 0; int sort_same_count = 0; for (int i = 0; i < n_run; ++i) { @@ -200,7 +199,8 @@ TEST_F(DataReaderOmpTest, LineDataReaderMuiltThread) { } // n_run次有不同的(证明是多线程) - // ASSERT_GT(n_run, same_count); + ASSERT_EQ(4, omp_get_max_threads()); + ASSERT_GT(n_run, same_count); // 但排序后都是相同的 ASSERT_EQ(n_run, sort_same_count);