open_files_op.cc 6.9 KB
Newer Older
F
fengjiayi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

F
fengjiayi 已提交
15 16
#include <thread>  // NOLINT

17
#include "paddle/fluid/operators/reader/blocking_queue.h"
F
fengjiayi 已提交
18 19 20 21 22 23
#include "paddle/fluid/operators/reader/reader_op_registry.h"

namespace paddle {
namespace operators {
namespace reader {

F
fengjiayi 已提交
24
class MultiFileReader : public framework::ReaderBase {
F
fengjiayi 已提交
25
 public:
26
  MultiFileReader(const std::vector<std::string>& file_names, size_t thread_num,
F
fengjiayi 已提交
27
                  size_t buffer_size)
28
      : buffer_size_(buffer_size) {
F
fengjiayi 已提交
29
    readers_.reserve(file_names.size());
30
    for (const std::string& f_name : file_names) {
31
      readers_.emplace_back(CreateReaderByFileName(f_name));
32
    }
F
fengjiayi 已提交
33
    prefetchers_.resize(thread_num);
F
fengjiayi 已提交
34
    StartNewScheduler();
F
fengjiayi 已提交
35 36
  }

37
  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
F
fengjiayi 已提交
38 39

 private:
F
fengjiayi 已提交
40 41 42 43 44 45
  void ShutdownImpl() override { EndScheduler(); }

  void StartImpl() override { StartNewScheduler(); }

  void StartNewScheduler();
  void EndScheduler();
F
fengjiayi 已提交
46
  void ScheduleThreadFunc();
47
  void PrefetchThreadFunc(size_t reader_idx, size_t thread_idx);
F
fengjiayi 已提交
48

49
  std::vector<std::unique_ptr<framework::ReaderBase>> readers_;
F
fengjiayi 已提交
50 51
  std::thread scheduler_;
  std::vector<std::thread> prefetchers_;
52
  size_t buffer_size_;
53
  reader::BlockingQueue<size_t>* waiting_reader_idx_;
54 55
  reader::BlockingQueue<size_t>* available_thread_idx_;
  reader::BlockingQueue<std::vector<framework::LoDTensor>>* buffer_;
F
fengjiayi 已提交
56 57
};

58
// Fetches the next batch from the buffer queue into `out`. When the queue
// reports that nothing was received, `out` is emptied instead.
void MultiFileReader::ReadNextImpl(std::vector<framework::LoDTensor>* out) {
  const bool received = buffer_->Receive(out);
  if (received) {
    return;
  }
  out->clear();
}

F
fengjiayi 已提交
64
void MultiFileReader::StartNewScheduler() {
F
fengjiayi 已提交
65
  size_t thread_num = prefetchers_.size();
66
  waiting_reader_idx_ = new reader::BlockingQueue<size_t>(readers_.size());
67 68 69
  available_thread_idx_ = new reader::BlockingQueue<size_t>(thread_num);
  buffer_ = new reader::BlockingQueue<std::vector<framework::LoDTensor>>(
      buffer_size_);
F
fengjiayi 已提交
70

71 72
  for (size_t i = 0; i < readers_.size(); ++i) {
    waiting_reader_idx_->Send(i);
F
fengjiayi 已提交
73
  }
74
  waiting_reader_idx_->Close();
F
fengjiayi 已提交
75
  for (size_t i = 0; i < thread_num; ++i) {
76
    available_thread_idx_->Send(i);
F
fengjiayi 已提交
77 78
  }

F
fengjiayi 已提交
79 80 81
  scheduler_ = std::thread([this] { ScheduleThreadFunc(); });
}

F
fengjiayi 已提交
82
void MultiFileReader::EndScheduler() {
F
fengjiayi 已提交
83 84
  available_thread_idx_->Close();
  buffer_->Close();
85
  waiting_reader_idx_->Close();
F
fengjiayi 已提交
86 87 88
  if (scheduler_.joinable()) {
    scheduler_.join();
  }
F
fengjiayi 已提交
89 90
  delete buffer_;
  delete available_thread_idx_;
91
  delete waiting_reader_idx_;
F
fengjiayi 已提交
92 93
}

F
fengjiayi 已提交
94 95
void MultiFileReader::ScheduleThreadFunc() {
  VLOG(5) << "MultiFileReader schedule thread starts.";
F
fengjiayi 已提交
96
  size_t completed_thread_num = 0;
F
fengjiayi 已提交
97 98 99 100 101 102
  size_t thread_idx;
  while (available_thread_idx_->Receive(&thread_idx)) {
    std::thread& prefetcher = prefetchers_[thread_idx];
    if (prefetcher.joinable()) {
      prefetcher.join();
    }
103 104
    size_t reader_idx;
    if (waiting_reader_idx_->Receive(&reader_idx)) {
F
fengjiayi 已提交
105
      // Still have files to read. Start a new prefetch thread.
106 107
      prefetcher = std::thread([this, reader_idx, thread_idx] {
        PrefetchThreadFunc(reader_idx, thread_idx);
F
fengjiayi 已提交
108
      });
F
fengjiayi 已提交
109 110 111
    } else {
      // No more file to read.
      ++completed_thread_num;
F
fengjiayi 已提交
112
      if (completed_thread_num == prefetchers_.size()) {
F
fengjiayi 已提交
113
        buffer_->Close();
F
fengjiayi 已提交
114 115 116 117
        break;
      }
    }
  }
118
  // If users invoke Shutdown() when scheduler is running, it will close the
F
fengjiayi 已提交
119 120 121 122 123 124 125
  // 'avaiable_thread_idx_' and prefecther threads have no way to tell scheduler
  // to release their resource. So a check is needed before scheduler ends.
  for (auto& p : prefetchers_) {
    if (p.joinable()) {
      p.join();
    }
  }
F
fengjiayi 已提交
126
  VLOG(5) << "MultiFileReader schedule thread terminates.";
F
fengjiayi 已提交
127 128
}

129 130 131
void MultiFileReader::PrefetchThreadFunc(size_t reader_idx, size_t thread_idx) {
  VLOG(5) << "The prefetch thread of file idx '" << reader_idx << "' starts.";
  std::unique_ptr<framework::ReaderBase>& reader = readers_[reader_idx];
F
fengjiayi 已提交
132
  while (true) {
F
fengjiayi 已提交
133 134
    std::vector<framework::LoDTensor> ins;
    reader->ReadNext(&ins);
F
fengjiayi 已提交
135
    if (ins.empty()) {
136 137
      reader->Shutdown();
      reader->Start();
F
fengjiayi 已提交
138 139
      break;
    }
140
    try {
141
      buffer_->Send(std::move(ins));
142
    } catch (paddle::platform::EnforceNotMet e) {
F
fengjiayi 已提交
143
      VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch "
144 145
                 "thread of file idx '"
              << reader_idx << "' will terminate.";
F
fengjiayi 已提交
146 147 148
      break;
    }
  }
149

150
  if (!available_thread_idx_->Send(thread_idx)) {
F
fengjiayi 已提交
151 152 153
    VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. "
               "Fail to send thread_idx.";
  }
154 155
  VLOG(5) << "The prefetch thread of file idx '" << reader_idx
          << "' terminates.";
F
fengjiayi 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168
}

// Operator that creates a MultiFileReader from its attributes and stores it
// in the ReaderHolder of the "Out" variable.
class OpenFilesOp : public framework::OperatorBase {
 public:
  using framework::OperatorBase::OperatorBase;

 private:
  // Validates the attributes, then resets the output ReaderHolder with a new
  // MultiFileReader built from "file_names", "thread_num" and "buffer_size".
  //
  // scope:     scope in which the "Out" variable is looked up.
  // dev_place: not used by this operator.
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override {
    const auto& shape_concat = Attr<std::vector<int>>("shape_concat");
    const auto& ranks = Attr<std::vector<int>>("ranks");
    PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty());
    PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0),
                      static_cast<int>(shape_concat.size()),
                      "The accumulate of all ranks should be equal to the "
                      "shape concat's length.");
    const auto& file_names = Attr<std::vector<std::string>>("file_names");
    PADDLE_ENFORCE(!file_names.empty(), "No file to be read!");
    // Both attributes are declared as int; the conversion to size_t is made
    // explicit here.
    const size_t thread_num = static_cast<size_t>(Attr<int>("thread_num"));
    const size_t buffer_size = static_cast<size_t>(Attr<int>("buffer_size"));

    // Fail with a clear error instead of dereferencing a null pointer when
    // the output variable cannot be found.
    auto* out_var = scope.FindVar(Output("Out"));
    PADDLE_ENFORCE(out_var != nullptr,
                   "The output variable 'Out' is not found in the scope.");
    auto* out = out_var->GetMutable<framework::ReaderHolder>();
    out->Reset(
        std::make_shared<MultiFileReader>(file_names, thread_num, buffer_size));
  }
};

184
class OpenFilesOpMaker : public FileReaderMakerBase {
Y
Yu Yang 已提交
185 186
 protected:
  void Apply() override {
187 188 189
    AddAttr<std::vector<std::string>>("file_names", "Files to be read.");
    AddAttr<int>("thread_num", "The maximal concurrent prefetch thread number.")
        .GreaterThan(0);
190
    AddAttr<int>("buffer_size", "The size of prefetch buffer.").GreaterThan(0);
191

F
fengjiayi 已提交
192 193 194
    AddComment(R"DOC(
      OpenFiles Operator

Y
Yu Yang 已提交
195
      An OpenFilesOp creates a MultiFileReader, which is able to
F
fengjiayi 已提交
196 197 198 199 200 201 202 203 204 205 206 207
      read data multi-threaded from multiple files.
    )DOC");
  }
};

}  // namespace reader
}  // namespace operators
}  // namespace paddle

// Short alias so the registration below does not have to spell out the full
// namespace path.
namespace reader = paddle::operators::reader;

// Registers the operator under the name "open_files" together with its maker.
REGISTER_FILE_READER_OPERATOR(open_files,
                              reader::OpenFilesOp,
                              reader::OpenFilesOpMaker);