create_double_buffer_reader_op.cc 3.4 KB
Newer Older
F
fengjiayi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Y
yuyang18 已提交
15
#include "paddle/fluid/operators/reader/buffered_reader.h"
F
fengjiayi 已提交
16 17 18 19 20 21 22 23 24 25 26 27 28 29
#include "paddle/fluid/operators/reader/reader_op_registry.h"

namespace paddle {
namespace operators {
namespace reader {
// Operator that decorates the reader held by the "UnderlyingReader" input
// with a BufferedReader and stores the result in the "Out" variable.
class CreateDoubleBufferReaderOp : public framework::OperatorBase {
 public:
  using framework::OperatorBase::OperatorBase;

 private:
  // Creates the double-buffer reader in "Out", or leaves "Out" untouched when
  // it already decorates the same underlying reader.
  //
  // scope:     scope in which the "Out" and "UnderlyingReader" variables are
  //            looked up.
  // dev_place: place this op runs on; used as the buffer place when the
  //            "place" attribute is "AUTO".
  //
  // Fails through PADDLE_ENFORCE_* when either variable is missing from the
  // scope, when "Out" already holds a reader that is not a DecoratedReader,
  // or when no device id can be parsed from the "place" attribute.
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override {
    // FindVar's result is checked before use so that a missing variable
    // produces a readable error instead of a null dereference.
    auto* out_var = scope.FindVar(Output("Out"));
    PADDLE_ENFORCE_NOT_NULL(
        out_var,
        platform::errors::NotFound(
            "Output variable (Out) is not found in the scope"));
    auto* out = out_var->GetMutable<framework::ReaderHolder>();

    auto* underlying_var = scope.FindVar(Input("UnderlyingReader"));
    PADDLE_ENFORCE_NOT_NULL(
        underlying_var,
        platform::errors::NotFound(
            "Input variable (UnderlyingReader) is not found in the scope"));
    const auto& underlying_reader =
        underlying_var->Get<framework::ReaderHolder>();

    // If "Out" was already initialized on top of the same underlying reader
    // there is nothing to rebuild.
    if (out->Get() != nullptr) {
      auto* decorated_reader =
          dynamic_cast<framework::DecoratedReader*>(out->Get().get());
      PADDLE_ENFORCE_NOT_NULL(
          decorated_reader,
          platform::errors::NotFound("Not inited with DecoratedReader"));
      if (decorated_reader->UnderlyingReader() == underlying_reader.Get()) {
        return;
      }
    }

    // Bind by const reference: the attribute is only read here.
    const auto& place_str = Attr<std::string>("place");
    platform::Place place;
    if (place_str == "AUTO") {
      place = dev_place;
    } else if (place_str == "CPU") {
      place = platform::CPUPlace();
    } else {
      // Any other value is treated as "CUDA:<id>": skip the prefix and parse
      // the device id that follows it.
      std::istringstream sin(place_str);
      sin.seekg(std::string("CUDA:").size(), std::ios::beg);
      // Zero-initialized because a stream that is already in a failed state
      // (e.g. seekg past the end) does not write to the extraction target.
      size_t num = 0;
      sin >> num;
      PADDLE_ENFORCE_EQ(
          sin.fail(), false,
          platform::errors::InvalidArgument(
              "Cannot parse a CUDA device id from the place attribute '%s'",
              place_str));
      place = platform::CUDAPlace(static_cast<int>(num));
    }

    VLOG(10) << "Create new double buffer reader on " << place;

    out->Clear();
    // NOTE(review): the trailing 2 is presumably the buffer size (hence
    // "double buffer") — confirm against BufferedReader's constructor.
    out->Reset(framework::MakeDecoratedReader<BufferedReader>(underlying_reader,
                                                              place, 2));
  }
};

class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
Y
Yu Yang 已提交
67 68
 protected:
  void Apply() override {
F
fengjiayi 已提交
69 70 71 72
    AddComment(R"DOC(
      CreateDoubleBufferReader Operator

      A double buffer reader takes another reader as its 'underlying reader'.
Y
Yu Yang 已提交
73
      It launches another thread to execute the 'underlying reader' asynchronously,
F
fengjiayi 已提交
74 75
      which prevents reading process from blocking subsequent training.
    )DOC");
Y
Yu Yang 已提交
76 77 78 79 80 81
    std::unordered_set<std::string> enum_range;
    constexpr size_t kMaxCUDADevs = 128;
    for (size_t i = 0; i < kMaxCUDADevs; ++i) {
      enum_range.insert(string::Sprintf("CUDA:%d", i));
    }
    enum_range.insert("CPU");
82 83 84
    enum_range.insert("AUTO");
    AddAttr<std::string>("place", "The double buffer place")
        .SetDefault("AUTO")
Y
Yu Yang 已提交
85
        .InEnum({enum_range});
F
fengjiayi 已提交
86 87 88 89 90 91 92 93 94 95 96
  }
};

}  // namespace reader
}  // namespace operators
}  // namespace paddle

// Short alias for the namespace that holds the operator classes defined in
// this file.
namespace ops = paddle::operators::reader;
// Hands the name `create_double_buffer_reader`, the operator class and its
// maker to the registration macro.
// NOTE(review): the macro is defined outside this file (presumably in
// reader_op_registry.h) — confirm what else it wires up.
REGISTER_DECORATED_READER_OPERATOR(create_double_buffer_reader,
                                   ops::CreateDoubleBufferReaderOp,
                                   ops::CreateDoubleBufferReaderOpMaker);