//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <sstream>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>

#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"

namespace paddle {
namespace operators {
namespace reader {

23
static constexpr size_t kDoubleBufferSize = 2;
F
fengjiayi 已提交
24 25 26

class DoubleBufferReader : public framework::DecoratedReader {
 public:
Y
Yu Yang 已提交
27 28 29 30 31 32 33
  struct Item {
    Item() : ctx_(nullptr) {}

    std::vector<framework::LoDTensor> payloads_;
    platform::DeviceContext* ctx_;
  };

Y
Yu Yang 已提交
34 35 36
  explicit DoubleBufferReader(
      ReaderBase* reader, platform::Place target_place = platform::CPUPlace())
      : DecoratedReader(reader), place_(target_place) {
Y
Yu Yang 已提交
37 38 39 40 41 42 43 44 45
    for (size_t i = 0; i < kDoubleBufferSize; ++i) {
      if (platform::is_gpu_place(place_)) {
#ifdef PADDLE_WITH_CUDA
        ctxs_.emplace_back(new platform::CUDADeviceContext(
            boost::get<platform::CUDAPlace>(place_)));
#endif
      }
    }

Y
Yu Yang 已提交
46 47 48 49
    start_thread();
  }

  void start_thread() {
Y
Yu Yang 已提交
50
    buffer_ = framework::MakeChannel<Item>(kDoubleBufferSize);
Y
Yu Yang 已提交
51
    std::thread prefetch([this] { PrefetchThreadFunc(); });
F
fengjiayi 已提交
52 53 54 55
    prefetch.detach();
  }

  void ReadNext(std::vector<framework::LoDTensor>* out) override;
56 57 58
  void ReInit() override;

  ~DoubleBufferReader() { buffer_->Close(); }
F
fengjiayi 已提交
59

Y
Yu Yang 已提交
60 61
  bool HasNext() const override;

F
fengjiayi 已提交
62 63 64
 private:
  void PrefetchThreadFunc();

Y
Yu Yang 已提交
65
  framework::Channel<Item>* buffer_;
Y
Yu Yang 已提交
66
  platform::Place place_;
Y
Yu Yang 已提交
67 68
  std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
  mutable Item local_buffer_;
F
fengjiayi 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81
};

// Operator that wraps an existing reader with a DoubleBufferReader so that
// batches are prefetched asynchronously on the attribute-selected place.
class CreateDoubleBufferReaderOp : public framework::OperatorBase {
 public:
  using framework::OperatorBase::OperatorBase;

 private:
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override {
    // Fetch the reader to decorate and the output holder from the scope.
    auto* reader_var = scope.FindVar(Input("UnderlyingReader"));
    const auto& underlying_reader = reader_var->Get<framework::ReaderHolder>();
    auto* out_var = scope.FindVar(Output("Out"));
    auto* out = out_var->template GetMutable<framework::ReaderHolder>();

    // The attribute is restricted to "CPU" or "CUDA:<n>" by the op maker.
    const auto place_str = Attr<std::string>("place");
    platform::Place target;
    if (place_str == "CPU") {
      target = platform::CPUPlace();
    } else {
      // Skip the "CUDA:" prefix and read the device ordinal.
      std::istringstream stream(place_str);
      stream.ignore(std::string("CUDA:").size());
      size_t dev_id = 0;
      stream >> dev_id;
      target = platform::CUDAPlace(static_cast<int>(dev_id));
    }

    out->Reset(new DoubleBufferReader(underlying_reader.Get(), target));
  }
};

// Maker declaring the doc string and the 'place' attribute for
// create_double_buffer_reader.
class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
 public:
  CreateDoubleBufferReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker)
      : DecoratedReaderMakerBase(op_proto, op_checker) {
    AddComment(R"DOC(
      CreateDoubleBufferReader Operator

      A double buffer reader takes another reader as its 'underlying reader'.
      It launches another thread to execute the 'underlying reader' asynchronously, 
      which prevents reading process from blocking subsequent training.
    )DOC");

    // Legal values for 'place': "CPU" plus "CUDA:0" .. "CUDA:127".
    constexpr size_t kMaxCUDADevs = 128;
    std::unordered_set<std::string> valid_places = {"CPU"};
    for (size_t dev = 0; dev < kMaxCUDADevs; ++dev) {
      valid_places.insert(string::Sprintf("CUDA:%d", dev));
    }
    AddAttr<std::string>("place", "The double buffer place, default is CPU")
        .SetDefault("CPU")
        .InEnum({valid_places});
  }
};

// Hands the next prefetched batch to the caller. If HasNext() has already
// popped a batch into local_buffer_, that batch is consumed; otherwise one
// is received from the prefetch channel here.
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
  if (local_buffer_.payloads_.empty()) {
    // NOTE(review): a failed Receive (closed, drained channel) leaves the
    // batch empty; callers are expected to check HasNext() first.
    buffer_->Receive(&local_buffer_);
  }

  // Move rather than copy the tensor vector; clear() afterwards guarantees
  // the local buffer reads as "consumed" on the next call regardless of the
  // moved-from state.
  *out = std::move(local_buffer_.payloads_);
  local_buffer_.payloads_.clear();
  if (local_buffer_.ctx_) {
    // Wait for the async device copy issued by the prefetch thread to finish
    // before the caller touches the tensors.
    local_buffer_.ctx_->Wait();
  }
}

// Resets the underlying reader and restarts prefetching from the beginning.
void DoubleBufferReader::ReInit() {
  reader_->ReInit();
  // Closing the channel tells the currently running prefetch thread to stop.
  buffer_->Close();
  // NOTE(review): start_thread() allocates a fresh channel while the old
  // (closed) one is never freed, and the old detached thread may still read
  // this->buffer_ after it has been reassigned — leak / race to confirm.
  start_thread();
}

// Runs on the detached prefetch thread: repeatedly pulls a batch from the
// underlying reader, optionally copies it to the GPU, and pushes it into the
// channel until the reader is exhausted or the channel is closed.
void DoubleBufferReader::PrefetchThreadFunc() {
  VLOG(5) << "A new prefetch thread starts.";
  // Round-robin index over the per-slot CUDA contexts so copies of
  // consecutive batches can overlap.
  size_t gpu_ctx_offset = 0;
  while (reader_->HasNext()) {
    Item batch;
    reader_->ReadNext(&batch.payloads_);
    if (platform::is_gpu_place(place_)) {
      std::vector<framework::LoDTensor> gpu_batch;
      // NOTE(review): assumes ctxs_ is non-empty here (i.e. a CUDA build);
      // otherwise the modulo below divides by zero — confirm unreachable.
      auto& gpu_ctx = this->ctxs_[gpu_ctx_offset++];
      gpu_ctx_offset %= this->ctxs_.size();
      gpu_batch.resize(batch.payloads_.size());
      for (size_t i = 0; i < batch.payloads_.size(); ++i) {
        // Asynchronous copy; the consumer waits on batch.ctx_ in ReadNext().
        framework::TensorCopy(batch.payloads_[i], place_, *gpu_ctx,
                              &gpu_batch[i]);
        // TensorCopy does not carry LoD; propagate it explicitly.
        gpu_batch[i].set_lod(batch.payloads_[i].lod());
      }
      batch.ctx_ = gpu_ctx.get();
      std::swap(gpu_batch, batch.payloads_);
    }

    // Send blocks while the channel is full (this is the double buffering);
    // it returns false once the channel has been closed.
    if (!buffer_->Send(&batch)) {
      VLOG(5) << "WARNING: The double buffer channel has been closed. The "
                 "prefetch thread terminates.";
      break;
    }
  }
  // Close the channel so consumers blocked in Receive() are released.
  buffer_->Close();
}

Y
Yu Yang 已提交
169
bool DoubleBufferReader::HasNext() const {
Y
Yu Yang 已提交
170
  if (local_buffer_.payloads_.empty()) {
Y
Yu Yang 已提交
171 172 173 174 175 176
    bool ok = buffer_->Receive(&local_buffer_);
    return ok;
  } else {
    return true;
  }
}
}  // namespace reader
}  // namespace operators
}  // namespace paddle

namespace ops = paddle::operators::reader;
// Register the operator and its maker with the decorated-reader machinery.
REGISTER_DECORATED_READER_OPERATOR(create_double_buffer_reader,
                                   ops::CreateDoubleBufferReaderOp,
                                   ops::CreateDoubleBufferReaderOpMaker);