diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index daeacdb8b4d540e94d1c03bd33b0bbdb024f958d..0519f5856bbc49babba32a7ec938aff28fef50e6 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -18,6 +18,7 @@ #include "ThreadPool.h" #include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/operators/reader/blocking_queue.h" +#include "paddle/fluid/operators/reader/buffered_reader.h" #include "paddle/fluid/operators/reader/reader_op_registry.h" namespace paddle { @@ -232,12 +233,17 @@ class OpenFilesOp : public framework::OperatorBase { container.reset(new OrderedReaderContainer()); } else { container.reset(new PreemptiveReaderContainer( - std::min(file_names.size(), - static_cast(std::thread::hardware_concurrency())))); + static_cast(Attr("thread_num")))); } - out->Reset( - std::make_shared(file_names, std::move(container))); + std::shared_ptr reader( + new MultiFileReader(file_names, std::move(container))); + auto buffer_size = Attr("buffer_size"); + if (buffer_size > 1) { + reader = framework::MakeDecoratedReader( + reader, platform::CPUPlace(), buffer_size); + } + out->Reset(reader); } }; @@ -253,6 +259,8 @@ class OpenFilesOpMaker : public FileReaderMakerBase { An OpenFilesOp creates a MultiFileReader, which is able to read data multi-threaded from multiple files. )DOC"); + AddAttr("thread_num", "Number of thread to read files."); + AddAttr("buffer_size", "The reading buffer of these files."); } };