reader.h 3.7 KB
Newer Older
F
fengjiayi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

F
fengjiayi 已提交
17
#include <memory>
Y
yuyang18 已提交
18
#include <unordered_set>
F
fengjiayi 已提交
19 20
#include <vector>

Y
Yi Wang 已提交
21 22
#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/lod_tensor_array.h"
23 24
#include "paddle/fluid/platform/place.h"

F
fengjiayi 已提交
25 26 27
namespace paddle {
namespace framework {

28 29
enum ReaderStatus { kRunning, kStopped };

F
fengjiayi 已提交
30
class ReaderBase {
F
fengjiayi 已提交
31
 public:
32 33 34
  void ReadNext(std::vector<LoDTensor>* out);

  void Shutdown();
F
fengjiayi 已提交
35

36
  void Start();
F
fengjiayi 已提交
37

Y
yuyang18 已提交
38 39 40 41
  // Return the readers which are the end of decorating chain. Basically
  // they are readers just before read op.
  std::unordered_set<ReaderBase*> GetEndPoints();

Y
Yu Yang 已提交
42
  virtual ~ReaderBase();
43 44 45 46

 protected:
  virtual void ReadNextImpl(std::vector<LoDTensor>* out) = 0;

Y
yuyang18 已提交
47
  virtual void ShutdownImpl() {}
48

Y
yuyang18 已提交
49
  virtual void StartImpl() {}
50

Y
yuyang18 已提交
51
  ReaderStatus status_{kRunning};
52 53

  mutable std::mutex mu_;
54

Y
yuyang18 已提交
55 56 57 58
 private:
  friend class DecoratedReader;
  // These methods can be only invoked inside DecoratedReader to record the
  // decorating chain.
59 60
  void InsertDecoratedReader(
      const std::shared_ptr<ReaderBase>& decorated_reader);
Y
yuyang18 已提交
61
  // A set of which readers that decorated this reader.
62
  std::vector<std::weak_ptr<ReaderBase>> decorated_readers_;
F
fengjiayi 已提交
63 64
};

65 66
class DecoratedReader : public ReaderBase,
                        public std::enable_shared_from_this<DecoratedReader> {
F
fengjiayi 已提交
67
 public:
F
fengjiayi 已提交
68 69
  explicit DecoratedReader(const std::shared_ptr<ReaderBase>& reader)
      : ReaderBase(), reader_(reader) {
F
fengjiayi 已提交
70 71 72
    PADDLE_ENFORCE_NOT_NULL(reader_);
  }

73 74 75 76
  void RegisterDecorateChain() {
    reader_->InsertDecoratedReader(shared_from_this());
  }

F
fengjiayi 已提交
77
 protected:
78 79 80 81
  void ShutdownImpl() override { reader_->Shutdown(); }

  void StartImpl() override { reader_->Start(); }

F
fengjiayi 已提交
82
  std::shared_ptr<ReaderBase> reader_;
F
fengjiayi 已提交
83 84
};

Y
yuyang18 已提交
85 86
// FileReader is just a conceptual class.
class FileReader : public ReaderBase {};
87

88 89
// The ReaderHolder is used as reader' unified wrapper,
// making it easier to access different type reader in Variables.
F
fengjiayi 已提交
90 91
class ReaderHolder {
 public:
92 93 94 95 96 97
  template <typename T>
  void Reset(const std::shared_ptr<T>& reader) {
    auto reader_base = std::dynamic_pointer_cast<ReaderBase>(reader);
    PADDLE_ENFORCE_NOT_NULL(reader_base);
    reader_ = reader_base;
  }
F
fengjiayi 已提交
98

99
  const std::shared_ptr<ReaderBase>& Get() const { return reader_; }
F
fengjiayi 已提交
100

101 102 103 104
  void ReadNext(std::vector<LoDTensor>* out) {
    PADDLE_ENFORCE_NOT_NULL(reader_);
    reader_->ReadNext(out);
  }
105 106

  void ResetAll() {
F
fengjiayi 已提交
107 108 109 110 111 112 113
    auto end_readers = reader_->GetEndPoints();
    for (auto* reader : end_readers) {
      reader->Shutdown();
    }
    for (auto* reader : end_readers) {
      reader->Start();
    }
114 115 116 117 118 119 120 121
  }

  void Shutdown() {
    PADDLE_ENFORCE_NOT_NULL(reader_);
    reader_->Shutdown();
  }

  void Start() {
122
    PADDLE_ENFORCE_NOT_NULL(reader_);
123
    reader_->Start();
124
  }
F
fengjiayi 已提交
125

126 127
  operator const std::shared_ptr<ReaderBase>&() const { return this->reader_; }

F
fengjiayi 已提交
128
 private:
F
fengjiayi 已提交
129
  std::shared_ptr<ReaderBase> reader_;
F
fengjiayi 已提交
130 131
};

132 133 134 135 136 137 138
template <typename T, typename... ARGS>
inline std::shared_ptr<DecoratedReader> MakeDecoratedReader(ARGS&&... args) {
  std::shared_ptr<DecoratedReader> reader(new T(std::forward<ARGS>(args)...));
  reader->RegisterDecorateChain();
  return reader;
}

F
fengjiayi 已提交
139 140
}  // namespace framework
}  // namespace paddle