create_double_buffer_reader_op.cc 6.1 KB
Newer Older
F
fengjiayi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

F
fengjiayi 已提交
15
#include <thread>
16
#include "paddle/fluid/framework/channel.h"
F
fengjiayi 已提交
17 18 19 20 21 22
#include "paddle/fluid/operators/reader/reader_op_registry.h"

namespace paddle {
namespace operators {
namespace reader {

J
JiayiFeng 已提交
23 24
static constexpr size_t kCacheSize = 2;
static constexpr size_t kChannelSize = 0;  // kCacheSize - 2
F
fengjiayi 已提交
25 26 27

class DoubleBufferReader : public framework::DecoratedReader {
 public:
Y
Yu Yang 已提交
28 29 30 31 32 33 34
  struct Item {
    Item() : ctx_(nullptr) {}

    std::vector<framework::LoDTensor> payloads_;
    platform::DeviceContext* ctx_;
  };

Y
Yu Yang 已提交
35 36 37
  explicit DoubleBufferReader(
      ReaderBase* reader, platform::Place target_place = platform::CPUPlace())
      : DecoratedReader(reader), place_(target_place) {
Y
Yu Yang 已提交
38
#ifdef PADDLE_WITH_CUDA
J
JiayiFeng 已提交
39
    for (size_t i = 0; i < kCacheSize; ++i) {
F
fengjiayi 已提交
40
      if (platform::is_gpu_place(place_)) {
Y
Yu Yang 已提交
41 42 43 44
        ctxs_.emplace_back(new platform::CUDADeviceContext(
            boost::get<platform::CUDAPlace>(place_)));
      }
    }
F
fengjiayi 已提交
45 46
#endif
    StartPrefetcher();
F
fengjiayi 已提交
47 48
  }

F
fengjiayi 已提交
49
  bool HasNext() const override;
F
fengjiayi 已提交
50
  void ReadNext(std::vector<framework::LoDTensor>* out) override;
51 52
  void ReInit() override;

F
fengjiayi 已提交
53
  void StartPrefetcher() {
J
JiayiFeng 已提交
54
    channel_ = framework::MakeChannel<Item>(kChannelSize);
F
fengjiayi 已提交
55 56 57 58
    prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
  }

  void EndPrefetcher() {
J
JiayiFeng 已提交
59 60
    channel_->Close();
    if (prefetcher_.joinable()) {
F
fengjiayi 已提交
61 62
      prefetcher_.join();
    }
J
JiayiFeng 已提交
63 64
    delete channel_;
    channel_ = nullptr;
F
fengjiayi 已提交
65
  }
F
fengjiayi 已提交
66

F
fengjiayi 已提交
67
  ~DoubleBufferReader() { EndPrefetcher(); }
Y
Yu Yang 已提交
68

F
fengjiayi 已提交
69 70 71
 private:
  void PrefetchThreadFunc();

F
fengjiayi 已提交
72
  std::thread prefetcher_;
J
JiayiFeng 已提交
73
  framework::Channel<Item>* channel_;
Y
Yu Yang 已提交
74
  platform::Place place_;
Y
Yu Yang 已提交
75
  std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
F
fengjiayi 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88
};

class CreateDoubleBufferReaderOp : public framework::OperatorBase {
 public:
  using framework::OperatorBase::OperatorBase;

 private:
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override {
    const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
                                        ->Get<framework::ReaderHolder>();
    auto* out = scope.FindVar(Output("Out"))
                    ->template GetMutable<framework::ReaderHolder>();
Y
Yu Yang 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102

    auto place_str = Attr<std::string>("place");
    platform::Place place;
    if (place_str == "CPU") {
      place = platform::CPUPlace();
    } else {
      std::istringstream sin(place_str);
      sin.seekg(std::string("CUDA:").size(), std::ios::beg);
      size_t num;
      sin >> num;
      place = platform::CUDAPlace(static_cast<int>(num));
    }

    out->Reset(new DoubleBufferReader(underlying_reader.Get(), place));
F
fengjiayi 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116
  }
};

class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
 public:
  CreateDoubleBufferReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker)
      : DecoratedReaderMakerBase(op_proto, op_checker) {
    AddComment(R"DOC(
      CreateDoubleBufferReader Operator

      A double buffer reader takes another reader as its 'underlying reader'.
      It launches another thread to execute the 'underlying reader' asynchronously, 
      which prevents reading process from blocking subsequent training.
    )DOC");
Y
Yu Yang 已提交
117 118 119 120 121 122 123 124 125
    std::unordered_set<std::string> enum_range;
    constexpr size_t kMaxCUDADevs = 128;
    for (size_t i = 0; i < kMaxCUDADevs; ++i) {
      enum_range.insert(string::Sprintf("CUDA:%d", i));
    }
    enum_range.insert("CPU");
    AddAttr<std::string>("place", "The double buffer place, default is CPU")
        .SetDefault("CPU")
        .InEnum({enum_range});
F
fengjiayi 已提交
126 127 128
  }
};

F
fengjiayi 已提交
129
bool DoubleBufferReader::HasNext() const {
J
JiayiFeng 已提交
130
  while (!channel_->IsClosed() && !channel_->CanReceive()) {
F
fengjiayi 已提交
131
  }
J
JiayiFeng 已提交
132
  return channel_->CanReceive();
F
fengjiayi 已提交
133 134
}

F
fengjiayi 已提交
135
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
F
fengjiayi 已提交
136 137 138 139
  if (!HasNext()) {
    PADDLE_THROW("There is no next data!");
  }

F
fengjiayi 已提交
140
  Item batch;
J
JiayiFeng 已提交
141 142
  channel_->Receive(&batch);
  *out = batch.payloads_;
F
fengjiayi 已提交
143 144
  if (batch.ctx_) {
    batch.ctx_->Wait();
Y
Yu Yang 已提交
145
  }
F
fengjiayi 已提交
146 147
}

148 149
void DoubleBufferReader::ReInit() {
  reader_->ReInit();
F
fengjiayi 已提交
150 151
  EndPrefetcher();
  StartPrefetcher();
F
fengjiayi 已提交
152 153 154
}

void DoubleBufferReader::PrefetchThreadFunc() {
155
  VLOG(5) << "A new prefetch thread starts.";
F
fengjiayi 已提交
156 157 158
  std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache(kCacheSize);
  std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache(kCacheSize);
  size_t cached_tensor_id = 0;
159

Y
Yu Yang 已提交
160
  while (reader_->HasNext()) {
Y
Yu Yang 已提交
161
    Item batch;
F
fengjiayi 已提交
162 163
    auto& cpu_batch = cpu_tensor_cache[cached_tensor_id];
    reader_->ReadNext(&cpu_batch);
Y
Yu Yang 已提交
164
    if (platform::is_gpu_place(place_)) {
F
fengjiayi 已提交
165 166 167
      auto& gpu_batch = gpu_tensor_cache[cached_tensor_id];
      auto* gpu_ctx = ctxs_[cached_tensor_id].get();
      gpu_batch.resize(cpu_batch.size());
168 169
      for (size_t i = 0; i < cpu_batch.size(); ++i) {
        framework::TensorCopy(cpu_batch[i], place_, *gpu_ctx, &gpu_batch[i]);
J
JiayiFeng 已提交
170
        gpu_batch[i].set_lod(cpu_batch[i].lod());
Y
Yu Yang 已提交
171
      }
J
JiayiFeng 已提交
172
      batch.payloads_ = gpu_batch;
F
fengjiayi 已提交
173 174 175
      batch.ctx_ = gpu_ctx;
    } else {
      // CPUPlace
J
JiayiFeng 已提交
176
      batch.payloads_ = cpu_batch;
F
fengjiayi 已提交
177
    }
F
fengjiayi 已提交
178 179
    ++cached_tensor_id;
    cached_tensor_id %= kCacheSize;
Y
Yu Yang 已提交
180

181
    try {
J
JiayiFeng 已提交
182
      channel_->Send(&batch);
183
    } catch (paddle::platform::EnforceNotMet e) {
184
      VLOG(5) << "WARNING: The double buffer channel has been closed. The "
F
fengjiayi 已提交
185
                 "prefetch thread will terminate.";
186
      break;
F
fengjiayi 已提交
187 188
    }
  }
J
JiayiFeng 已提交
189
  channel_->Close();
F
fengjiayi 已提交
190
  VLOG(5) << "Prefetch thread terminates.";
F
fengjiayi 已提交
191 192 193 194 195 196 197 198 199 200
}

}  // namespace reader
}  // namespace operators
}  // namespace paddle

namespace ops = paddle::operators::reader;
REGISTER_DECORATED_READER_OPERATOR(create_double_buffer_reader,
                                   ops::CreateDoubleBufferReaderOp,
                                   ops::CreateDoubleBufferReaderOpMaker);