diff --git a/cmake/external/gzstream.cmake b/cmake/external/gzstream.cmake index f0e3dd8c6aa115be7b43d8d80ebdc2084cbcc23f..e8a7de27f1ab1b5b3724a77a72fa327f82a8485f 100644 --- a/cmake/external/gzstream.cmake +++ b/cmake/external/gzstream.cmake @@ -44,4 +44,4 @@ SET_PROPERTY(TARGET gzstream PROPERTY IMPORTED_LOCATION "${GZSTREAM_INSTALL_DIR}/lib/libgzstream.a") include_directories(${GZSTREAM_INCLUDE_DIR}) -ADD_DEPENDENCIES(gzstream extern_gzstream) +ADD_DEPENDENCIES(gzstream extern_gzstream zlib) diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 2e019f3c1d8f0f207a6d045cf89250545585310f..1514f6566a8cd6fda64c48754c58169409e263fb 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -16,7 +16,7 @@ function(reader_library TARGET_NAME) endfunction() cc_library(buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool) -cc_library(ctr_reader SRCS ctr_reader.cc DEPS reader simple_threadpool boost gzstream) +cc_library(ctr_reader SRCS ctr_reader.cc DEPS gzstream reader zlib) cc_test(ctr_reader_test SRCS ctr_reader_test.cc DEPS ctr_reader) reader_library(open_files_op SRCS open_files_op.cc DEPS buffered_reader) reader_library(create_ctr_reader_op SRCS create_ctr_reader_op.cc DEPS ctr_reader) diff --git a/paddle/fluid/operators/reader/ctr_reader.cc b/paddle/fluid/operators/reader/ctr_reader.cc index ca2f567e3715ae934f1f7dcbe80ef6177cb95216..26092c17e472463d2f468fc7c3cd8202bf93fbac 100644 --- a/paddle/fluid/operators/reader/ctr_reader.cc +++ b/paddle/fluid/operators/reader/ctr_reader.cc @@ -58,10 +58,8 @@ static inline void parse_line( const std::string& item = ret[i]; std::vector feasign_and_slot; string_split(item, ':', &feasign_and_slot); - auto& slot = feasign_and_slot[1]; if (feasign_and_slot.size() == 2 && - slot_to_index.find(slot) != slot_to_index.end()) { - const std::string& slot = feasign_and_slot[1]; + slot_to_index.find(feasign_and_slot[1]) != slot_to_index.end()) { int64_t feasign = std::strtoll(feasign_and_slot[0].c_str(), NULL, 10); (*slot_to_data)[feasign_and_slot[1]].push_back(feasign); } @@ -164,7 +162,7 @@ void ReadThread(const std::vector& file_list, VLOG(3) << "reader inited"; - clock_t t0 = clock(); + uint64_t t0 = GetTimeInSec(); int i = 0; @@ -219,13 +217,12 @@ void ReadThread(const std::vector& file_list, memcpy(label_tensor_data, batch_label.data(), batch_label.size()); lod_datas.push_back(label_tensor); - // queue->Push(lod_datas); + queue->Push(lod_datas); VLOG(4) << "push one data, queue_size=" << queue->Size(); if (i != 0 && i % 100 == 0) { - clock_t t1 = clock(); - float line_per_s = 100 * batch_size * static_cast(CLOCKS_PER_SEC) / - static_cast(t1 - t0); + uint64_t t1 = GetTimeInSec(); + float line_per_s = 100 * batch_size / static_cast(t1 - t0); VLOG(3) << "[" << thread_id << "]" << " line_per_second = " << line_per_s; t0 = t1; diff --git a/paddle/fluid/operators/reader/ctr_reader.h b/paddle/fluid/operators/reader/ctr_reader.h index 9469d86c6ab11ffc129198d7a2b820b003ee647a..32dfed8264861c15fdadc492b944bdd9b833177f 100644 --- a/paddle/fluid/operators/reader/ctr_reader.h +++ b/paddle/fluid/operators/reader/ctr_reader.h @@ -14,6 +14,8 @@ #pragma once +#include + #include #include #include @@ -37,6 +39,15 @@ void ReadThread(const std::vector& file_list, int thread_id, std::vector* thread_status, std::shared_ptr queue); +inline uint64_t GetTimeInSec() { + using clock = std::conditional::type; + return std::chrono::duration_cast( + clock::now().time_since_epoch()) + .count(); +} + class CTRReader : public framework::FileReader { public: explicit CTRReader(const std::shared_ptr& queue, @@ -88,7 +99,7 @@ class CTRReader : public framework::FileReader { private: void SplitFiles() { file_groups_.resize(thread_num_); - for (int i = 0; i < file_list_.size(); ++i) { + for (size_t i = 0; i < file_list_.size(); ++i) { auto& file_name = file_list_[i]; std::ifstream f(file_name.c_str()); PADDLE_ENFORCE(f.good(), "file %s not exist!", file_name); diff --git a/paddle/fluid/operators/reader/ctr_reader_test.cc b/paddle/fluid/operators/reader/ctr_reader_test.cc index 142d04e315748c34814e3cdfb2a6c843079e5496..6ca0b26a0d71a9d32a9ced03cb6434321b4f695e 100644 --- a/paddle/fluid/operators/reader/ctr_reader_test.cc +++ b/paddle/fluid/operators/reader/ctr_reader_test.cc @@ -25,16 +25,17 @@ using paddle::operators::reader::LoDTensorBlockingQueue; using paddle::operators::reader::LoDTensorBlockingQueueHolder; using paddle::operators::reader::CTRReader; using paddle::framework::LoDTensor; +using paddle::operators::reader::GetTimeInSec; TEST(CTR_READER, read_data) { LoDTensorBlockingQueueHolder queue_holder; int capacity = 64; - queue_holder.InitOnce(capacity, {}, true); + queue_holder.InitOnce(capacity, {}, false); std::shared_ptr queue = queue_holder.GetQueue(); int batch_size = 10; - int thread_num = 2; + int thread_num = 4; std::vector slots = { "6002", "6003", "6004", "6005", "6006", "6007", "6008", "6009", "6010", "6011", "6012", "6013", "6014", "6015", "6016", "6017", "6018", "6019", @@ -109,7 +110,8 @@ TEST(CTR_READER, read_data) { std::vector file_list = { "/Users/qiaolongfei/project/gzip_test/part-00000-A.gz", "/Users/qiaolongfei/project/gzip_test/part-00001-A.gz", - "/Users/qiaolongfei/project/gzip_test/part-00002-A.gz"}; + "/Users/qiaolongfei/project/gzip_test/part-00002-A.gz", + "/Users/qiaolongfei/project/gzip_test/part-00003-A.gz"}; CTRReader reader(queue, batch_size, thread_num, slots, file_list); @@ -118,13 +120,11 @@ TEST(CTR_READER, read_data) { std::cout << "start to reader data" << std::endl; std::vector out; int read_batch = 1000; - clock_t t0 = clock(); + uint64_t t0 = GetTimeInSec(); for (int i = 0; i < read_batch; ++i) { reader.ReadNext(&out); } - clock_t t1 = clock(); - float line_per_s = read_batch * batch_size * - static_cast(CLOCKS_PER_SEC) / - static_cast(t1 - t0); + uint64_t t1 = GetTimeInSec(); + float line_per_s = read_batch * batch_size / static_cast(t1 - t0); VLOG(3) << "line_per_second = " << line_per_s; }