create_custom_reader_op.cc 8.4 KB
Newer Older
F
fengjiayi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15
#include "paddle/fluid/framework/executor.h"
F
fengjiayi 已提交
16 17 18 19 20 21 22 23
#include "paddle/fluid/operators/reader/reader_op_registry.h"

namespace paddle {
namespace operators {
namespace reader {

class CustomReader : public framework::DecoratedReader {
 public:
F
fengjiayi 已提交
24 25
  CustomReader(const std::shared_ptr<ReaderBase>& reader,
               const framework::BlockDesc& sub_block,
F
fengjiayi 已提交
26 27 28
               const std::vector<std::string>& source_var_names,
               const std::vector<std::string>& sink_var_names)
      : DecoratedReader(reader),
F
fengjiayi 已提交
29 30
        program_(*sub_block.Program()),
        sub_block_id_(sub_block.ID()),
F
fengjiayi 已提交
31
        exe_(framework::Executor(platform::CPUPlace())),
F
fengjiayi 已提交
32
        source_var_names_(source_var_names),
F
fengjiayi 已提交
33
        sink_var_names_(sink_var_names) {}
F
fengjiayi 已提交
34

35
  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
F
fengjiayi 已提交
36 37

 private:
F
fengjiayi 已提交
38 39
  const framework::ProgramDesc program_;
  int sub_block_id_;
F
fengjiayi 已提交
40
  framework::Executor exe_;
F
fengjiayi 已提交
41
  framework::Scope scope_;
F
fengjiayi 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55

  std::vector<std::string> source_var_names_;
  std::vector<std::string> sink_var_names_;
};

class CreateCustomReaderOp : public framework::OperatorBase {
 public:
  using framework::OperatorBase::OperatorBase;

 private:
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override {
    auto* out = scope.FindVar(Output("Out"))
                    ->template GetMutable<framework::ReaderHolder>();
F
fengjiayi 已提交
56
    auto* sub_block = Attr<framework::BlockDesc*>("sub_block");
F
fengjiayi 已提交
57 58 59 60 61
    if (out->Get() != nullptr) {
      return;
    }
    const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
                                        ->Get<framework::ReaderHolder>();
62
    out->Reset(framework::MakeDecoratedReader<CustomReader>(
63 64
        underlying_reader,
        *sub_block,
65 66
        Attr<std::vector<std::string>>("source_var_names"),
        Attr<std::vector<std::string>>("sink_var_names")));
F
fengjiayi 已提交
67 68 69 70
  }
};

class CreateCustomReaderOpMaker : public DecoratedReaderMakerBase {
F
fengjiayi 已提交
71 72
 protected:
  void Apply() override {
F
fengjiayi 已提交
73 74 75 76 77 78 79 80 81 82 83 84
    AddAttr<framework::BlockDesc*>(
        "sub_block", "The block to hold all preprocessing operators.");
    AddAttr<std::vector<std::string>>(
        "source_var_names",
        "Source variables are starting points of data preprocessing. They hold "
        "preprocessing's input tensors. Each source variable corresponds to "
        "one of underlying reader's output datas.");
    AddAttr<std::vector<std::string>>(
        "sink_var_names",
        "Sink variables are ending points of data preprocessing. They hold "
        "preprocessing's output tensors. Each sink variable corresponds to "
        "one of custom reader's output datas.");
F
fengjiayi 已提交
85 86 87
    AddComment(R"DOC(
      CreateCustomReader Operator

M
minqiyang 已提交
88 89 90 91
      A custom reader can be used for input data preprocessing.
      A custom reader holds its own sub-block, which will be executed in CPU
      in its 'ReadNext()' function. Users can configurate their own
      preprocessing pipelines by inserting operators into custom reader's
F
fengjiayi 已提交
92
      sub-block.
F
fengjiayi 已提交
93 94 95 96
    )DOC");
  }
};

97 98 99
class CustomReaderInferShape : public framework::InferShapeBase {
 public:
  void operator()(framework::InferShapeContext* ctx) const override {
100
    PADDLE_ENFORCE_NE(
101 102
        ctx->IsRuntime(),
        true,
103 104 105
        platform::errors::PreconditionNotMet(
            "'CustomReaderInferShape' should only be invoked during "
            "compile time."));
106 107
    PADDLE_ENFORCE_EQ(ctx->HasOutput("Out"),
                      true,
108 109
                      platform::errors::NotFound(
                          "The output decorated reader should not be null."));
F
fengjiayi 已提交
110 111
    const auto* sub_block =
        ctx->Attrs().Get<framework::BlockDesc*>("sub_block");
112 113 114 115 116
    const auto sink_var_names =
        ctx->Attrs().Get<std::vector<std::string>>("sink_var_names");
    std::vector<std::vector<int64_t>> res_dims;
    std::vector<int32_t> res_lod_levels;
    for (const std::string& var_name : sink_var_names) {
F
fengjiayi 已提交
117
      auto* sink_var = sub_block->FindVar(var_name);
118
      PADDLE_ENFORCE_NOT_NULL(
119 120 121
          sink_var,
          platform::errors::NotFound(
              "The sink variable is not found in CustomReader."));
122 123 124 125
      res_dims.emplace_back(sink_var->GetShape());
      res_lod_levels.push_back(sink_var->GetLoDLevel());
    }
    auto* out_reader =
R
Ruibiao Chen 已提交
126
        PADDLE_GET(framework::VarDesc*, ctx->GetOutputVarPtrs("Out")[0]);
127 128 129 130 131 132 133
    out_reader->SetShapes(res_dims);
    out_reader->SetLoDLevels(res_lod_levels);
  }
};

class CustomReaderInferVarType : public framework::VarTypeInference {
 public:
M
minqiyang 已提交
134 135
  void operator()(framework::InferVarTypeContext* ctx) const override {
    auto& out_var_name = ctx->Output("Out")[0];
136 137
    PADDLE_ENFORCE_EQ(ctx->HasVar(out_var_name),
                      true,
138 139
                      platform::errors::NotFound(
                          "The output reader variable should not be null."));
M
minqiyang 已提交
140
    ctx->SetType(out_var_name, framework::proto::VarType::READER);
F
fengjiayi 已提交
141

R
Ruibiao Chen 已提交
142 143
    auto sink_var_names = PADDLE_GET_CONST(std::vector<std::string>,
                                           ctx->GetAttr("sink_var_names"));
F
fengjiayi 已提交
144
    const auto* sub_block =
R
Ruibiao Chen 已提交
145
        PADDLE_GET_CONST(framework::BlockDesc*, ctx->GetAttr("sub_block"));
146 147
    std::vector<framework::proto::VarType::Type> res_data_types;
    for (const std::string& var_name : sink_var_names) {
F
fengjiayi 已提交
148
      framework::VarDesc* var = sub_block->FindVar(var_name);
149
      PADDLE_ENFORCE_NOT_NULL(
150 151 152
          var,
          platform::errors::NotFound(
              "The sink variable is not found in CustomReader."));
153 154
      res_data_types.emplace_back(var->GetDataType());
    }
M
minqiyang 已提交
155
    ctx->SetDataTypes(out_var_name, res_data_types);
156 157 158
  }
};

159
void CustomReader::ReadNextImpl(std::vector<framework::LoDTensor>* out) {
160 161 162 163 164 165 166
  out->clear();
  std::vector<framework::LoDTensor> underlying_outs;
  reader_->ReadNext(&underlying_outs);
  if (underlying_outs.empty()) {
    // There is not next data.
    return;
  }
167
  PADDLE_ENFORCE_EQ(
168 169
      source_var_names_.size(),
      underlying_outs.size(),
170 171 172 173
      platform::errors::InvalidArgument(
          "The size of source_var_names(%d) and the size of "
          "underlying_outs(%d) are not consistent. Each feeding element "
          "must have its own source variable.",
174 175
          source_var_names_.size(),
          underlying_outs.size()));
F
fengjiayi 已提交
176 177 178
  // The scope for CustomReader's sub-block should be independent and shouldn't
  // be any other computation scope's child. Otherwise, data preprocessing and
  // compution cannot be concurrent.
F
fengjiayi 已提交
179
  framework::Scope* exe_scope = &scope_.NewScope();
180
  // 1. Copy LoDTensors from underlying reader's output to source variables.
F
fengjiayi 已提交
181
  for (size_t i = 0; i < source_var_names_.size(); ++i) {
F
fengjiayi 已提交
182
    framework::Variable* var = exe_scope->Var(source_var_names_[i]);
183 184 185
    framework::LoDTensor* tensor = var->GetMutable<framework::LoDTensor>();
    tensor->ShareDataWith(underlying_outs[i]);
    tensor->set_lod(underlying_outs[i].lod());
F
fengjiayi 已提交
186
  }
187
  // 2. Run the sub-block.
Z
Zeng Jinle 已提交
188
  exe_.Run(program_, exe_scope, sub_block_id_, false, true, {}, true);
189 190 191
  // 3. Copy LoDTensors from sink variables to out.
  out->resize(sink_var_names_.size());
  for (size_t i = 0; i < sink_var_names_.size(); ++i) {
192
    auto* var = exe_scope->FindVar(sink_var_names_[i]);
193 194 195 196
    PADDLE_ENFORCE_NOT_NULL(
        var,
        platform::errors::NotFound("The variable %s is not in current scope.",
                                   sink_var_names_[i]));
197
    const auto& tensor = var->Get<framework::LoDTensor>();
F
fengjiayi 已提交
198
    framework::TensorCopySync(tensor, platform::CPUPlace(), &(*out)[i]);
199
  }
F
fengjiayi 已提交
200
  scope_.DeleteScope(exe_scope);
F
fengjiayi 已提交
201 202 203 204 205
}

}  // namespace reader
}  // namespace operators
}  // namespace paddle
206 207

namespace ops = paddle::operators::reader;
H
hong 已提交
208
REGISTER_OPERATOR(
209 210 211 212
    create_custom_reader,
    ops::CreateCustomReaderOp,
    ops::CreateCustomReaderOpMaker,
    ops::CustomReaderInferShape,
H
hong 已提交
213 214 215
    ops::CustomReaderInferVarType,
    paddle::framework::EmptyGradOpMaker<paddle::framework::OpDesc>,
    paddle::framework::EmptyGradOpMaker<paddle::imperative::OpBase>)