create_custom_reader_op.cc 7.2 KB
Newer Older
F
fengjiayi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15
#include "paddle/fluid/framework/executor.h"
F
fengjiayi 已提交
16 17 18 19 20 21 22 23
#include "paddle/fluid/operators/reader/reader_op_registry.h"

namespace paddle {
namespace operators {
namespace reader {

class CustomReader : public framework::DecoratedReader {
 public:
F
fengjiayi 已提交
24
  CustomReader(ReaderBase* reader, const framework::BlockDesc* sub_block,
F
fengjiayi 已提交
25
               const platform::Place& dev_place,
F
fengjiayi 已提交
26 27 28 29
               const std::vector<std::string>& source_var_names,
               const std::vector<std::string>& sink_var_names)
      : DecoratedReader(reader),
        sub_block_(sub_block),
F
fengjiayi 已提交
30
        exe_(framework::Executor(dev_place)),
F
fengjiayi 已提交
31 32 33 34 35
        source_var_names_(source_var_names),
        sink_var_names_(sink_var_names) {}

  void ReadNext(std::vector<framework::LoDTensor>* out) override;

F
fengjiayi 已提交
36 37 38 39 40
  void UpdateBlockAndScope(const framework::BlockDesc* sub_block,
                           const framework::Scope* scope) {
    sub_block_ = sub_block;
  }

F
fengjiayi 已提交
41
 private:
F
fengjiayi 已提交
42
  const framework::BlockDesc* sub_block_;
F
fengjiayi 已提交
43
  framework::Executor exe_;
F
fengjiayi 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57

  std::vector<std::string> source_var_names_;
  std::vector<std::string> sink_var_names_;
};

// Operator that creates a CustomReader decorating the reader found in the
// "UnderlyingReader" input and stores it in the "Out" reader holder.
class CreateCustomReaderOp : public framework::OperatorBase {
 public:
  using framework::OperatorBase::OperatorBase;

 private:
  // If "Out" already holds a reader, only its sub-block pointer is refreshed;
  // otherwise a new CustomReader is created from the underlying reader and
  // the operator's attributes.
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override {
    auto* out_var = scope.FindVar(Output("Out"));
    PADDLE_ENFORCE_NOT_NULL(out_var);
    auto* out = out_var->template GetMutable<framework::ReaderHolder>();
    auto* sub_block = Attr<framework::BlockDesc*>("sub_block");
    if (out->Get() != nullptr) {
      // The holder may in principle contain any reader type, so use a checked
      // downcast rather than reinterpret_cast, which would silently produce
      // an invalid pointer for a non-CustomReader object.
      auto* custom_reader = dynamic_cast<CustomReader*>(out->Get());
      PADDLE_ENFORCE(custom_reader != nullptr,
                     "The reader held by 'Out' is not a CustomReader.");
      custom_reader->UpdateBlockAndScope(sub_block, &scope);
      return;
    }
    auto* underlying_var = scope.FindVar(Input("UnderlyingReader"));
    PADDLE_ENFORCE_NOT_NULL(underlying_var);
    const auto& underlying_reader =
        underlying_var->Get<framework::ReaderHolder>();
    // The holder takes over the newly allocated reader via Reset().
    out->Reset(
        new CustomReader(underlying_reader.Get(), sub_block, dev_place,
                         Attr<std::vector<std::string>>("source_var_names"),
                         Attr<std::vector<std::string>>("sink_var_names")));
  }
};

class CreateCustomReaderOpMaker : public DecoratedReaderMakerBase {
F
fengjiayi 已提交
74 75
 protected:
  void Apply() override {
F
fengjiayi 已提交
76 77 78 79 80 81 82 83 84 85
    AddAttr<framework::BlockDesc*>("sub_block", "");
    AddAttr<std::vector<std::string>>("source_var_names", "");
    AddAttr<std::vector<std::string>>("sink_var_names", "");
    AddComment(R"DOC(
      CreateCustomReader Operator

    )DOC");
  }
};

86 87 88 89 90 91 92 93
// Compile-time shape inference for create_custom_reader: the output reader's
// shapes and LoD levels are taken from the sink variables declared in the
// preprocessing sub-block.
class CustomReaderInferShape : public framework::InferShapeBase {
 public:
  void operator()(framework::InferShapeContext* ctx) const override {
    PADDLE_ENFORCE(!ctx->IsRuntime(),
                   "'CustomReaderInferShape' should only be invoked during "
                   "compile time.");
    PADDLE_ENFORCE(ctx->HasOutput("Out"),
                   "The output decorated reader should not be null.");
    const auto* sub_block =
        ctx->Attrs().Get<framework::BlockDesc*>("sub_block");
    // The block is dereferenced below; fail with a clear error instead of a
    // null-pointer access when the attribute holds no block.
    PADDLE_ENFORCE_NOT_NULL(sub_block);
    // Bind by reference: the attribute does not need to be copied just to be
    // iterated over.
    const auto& sink_var_names =
        ctx->Attrs().Get<std::vector<std::string>>("sink_var_names");
    std::vector<std::vector<int64_t>> res_dims;
    std::vector<int32_t> res_lod_levels;
    res_dims.reserve(sink_var_names.size());
    res_lod_levels.reserve(sink_var_names.size());
    for (const std::string& var_name : sink_var_names) {
      auto* sink_var = sub_block->FindVar(var_name);
      PADDLE_ENFORCE_NOT_NULL(sink_var);
      res_dims.emplace_back(sink_var->GetShape());
      res_lod_levels.push_back(sink_var->GetLoDLevel());
    }
    auto* out_reader =
        boost::get<framework::VarDesc*>(ctx->GetOutputVarPtrs("Out")[0]);
    out_reader->SetShapes(res_dims);
    out_reader->SetLoDLevels(res_lod_levels);
  }
};

// Variable-type inference for create_custom_reader: marks the "Out" variable
// as a READER and records, for each sink variable of the sub-block, its data
// type on the output reader.
class CustomReaderInferVarType : public framework::VarTypeInference {
 public:
  void operator()(const framework::OpDesc& op_desc,
                  framework::BlockDesc* block) const override {
    auto* out_reader = block->FindVar(op_desc.Output("Out")[0]);
    PADDLE_ENFORCE_NOT_NULL(out_reader);
    out_reader->SetType(framework::proto::VarType::READER);

    // The names are copied on purpose. NOTE(review): GetAttr() may return a
    // temporary, in which case a reference into it would dangle -- confirm
    // before changing this to a reference.
    const std::vector<std::string> sink_names =
        boost::get<std::vector<std::string>>(op_desc.GetAttr("sink_var_names"));
    const framework::BlockDesc* sub_block =
        boost::get<framework::BlockDesc*>(op_desc.GetAttr("sub_block"));

    // Collect the data type of every sink variable, in declaration order.
    std::vector<framework::proto::VarType::Type> data_types;
    for (size_t i = 0; i < sink_names.size(); ++i) {
      framework::VarDesc* var = sub_block->FindVar(sink_names[i]);
      PADDLE_ENFORCE_NOT_NULL(var);
      data_types.push_back(var->GetDataType());
    }
    out_reader->SetDataTypes(data_types);
  }
};

// Reads one batch from the underlying reader, runs the preprocessing
// sub-block on it and writes the resulting tensors to `out`.
//
// `out` is always cleared first and stays empty when the underlying reader
// has no more data. Otherwise it ends up with one CPU tensor per sink
// variable. An error is raised when the numbers of source variables, sink
// variables and underlying outputs do not all match, or when a sink variable
// is missing after the sub-block has been run.
void CustomReader::ReadNext(std::vector<framework::LoDTensor>* out) {
  out->clear();
  std::vector<framework::LoDTensor> underlying_outs;
  reader_->ReadNext(&underlying_outs);
  if (underlying_outs.empty()) {
    // There is no next data.
    return;
  }
  PADDLE_ENFORCE(
      source_var_names_.size() == underlying_outs.size() &&
          sink_var_names_.size() == underlying_outs.size(),
      "The size of source_var_names(%d), the size of sink_var_names(%d) and "
      "the size of underlying_outs(%d) are not consistent. Each feeding "
      "element must have its own source and sink variable.",
      source_var_names_.size(), sink_var_names_.size(), underlying_outs.size());
  // The scope for CustomReader's sub-block should be independent and shouldn't
  // be any other computation scope's child. Otherwise, data preprocessing and
  // computation cannot be concurrent.
  //
  // It is an automatic object so that it is released on every exit path,
  // including when running the sub-block or one of the checks below throws.
  framework::Scope scope;
  // 1. Copy LoDTensors from underlying reader's output to source variables.
  for (size_t i = 0; i < source_var_names_.size(); ++i) {
    framework::Variable* var = scope.Var(source_var_names_[i]);
    framework::LoDTensor* tensor = var->GetMutable<framework::LoDTensor>();
    tensor->ShareDataWith(underlying_outs[i]);
    tensor->set_lod(underlying_outs[i].lod());
  }
  // 2. Run the sub-block.
  framework::ProgramDesc* program = sub_block_->Program();
  exe_.Run(*program, &scope, sub_block_->ID(), false, true);
  // 3. Copy LoDTensors from sink variables to out.
  out->resize(sink_var_names_.size());
  for (size_t i = 0; i < sink_var_names_.size(); ++i) {
    framework::Variable* var = scope.FindVar(sink_var_names_[i]);
    PADDLE_ENFORCE_NOT_NULL(var);
    const framework::LoDTensor& tensor = var->Get<framework::LoDTensor>();
    // Copy to CPU before `scope` (which holds the sink variables) goes out of
    // scope at the end of this function.
    framework::TensorCopySync(tensor, platform::CPUPlace(), &(*out)[i]);
  }
}

}  // namespace reader
}  // namespace operators
}  // namespace paddle
178 179 180 181 182 183

// Register the `create_custom_reader` operator together with its attribute
// maker, compile-time shape inference and variable-type inference classes.
// NOTE(review): EmptyGradOpMaker presumably means that no gradient operator
// is generated for this op -- confirm against the framework definition.
namespace ops = paddle::operators::reader;
REGISTER_OPERATOR(create_custom_reader, ops::CreateCustomReaderOp,
                  ops::CreateCustomReaderOpMaker, ops::CustomReaderInferShape,
                  ops::CustomReaderInferVarType,
                  paddle::framework::EmptyGradOpMaker)