garbage_collector.h 5.1 KB
Newer Older
S
sneaxiy 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <algorithm>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>  // NOLINT
#include <utility>
S
fix bug  
sneaxiy 已提交
22 23 24
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_device_guard.h"
#endif
S
sneaxiy 已提交
25 26 27 28 29 30 31 32 33 34
#include "paddle/fluid/platform/device_context.h"

namespace paddle {
namespace framework {

// T should have memory_size() and clear() method
template <typename T>
class GarbageCollector {
 public:
  GarbageCollector(const platform::Place &place, size_t max_memory_size)
P
peizhilin 已提交
35
      : max_memory_size_((std::max)(max_memory_size, static_cast<size_t>(1))) {
S
sneaxiy 已提交
36 37 38 39 40 41
    garbages_.reset(new std::deque<T *>());
    dev_ctx_ = platform::DeviceContextPool::Instance().Get(place);
  }

  virtual ~GarbageCollector() {}

S
fix bug  
sneaxiy 已提交
42 43 44 45 46
  size_t NumOfGarbages() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return garbages_->size();
  }

S
sneaxiy 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59
  void Reset() {
    std::lock_guard<std::mutex> guard(mutex_);
    garbages_.reset(new std::deque<T *>());
    cur_memory_size_ = 0;
  }

  template <typename Container>
  void Add(const Container &objs) {
    Add(objs, []() {});
  }

  template <typename Container, typename Callback>
  void Add(const Container &objs, Callback &&callback) {
S
fix bug  
sneaxiy 已提交
60
    std::deque<T *> *clear_deque = nullptr;
S
sneaxiy 已提交
61 62 63 64 65 66 67 68
    {
      std::lock_guard<std::mutex> guard(mutex_);
      for (auto *obj : objs) {
        garbages_->push_back(obj);
        cur_memory_size_ += obj->memory_size();
      }
      if (cur_memory_size_ >= max_memory_size_) {
        cur_memory_size_ = 0;
S
fix bug  
sneaxiy 已提交
69
        clear_deque = garbages_.release();
S
sneaxiy 已提交
70 71 72 73 74 75
        garbages_.reset(new std::deque<T *>());
      }
    }

    if (clear_deque != nullptr) {
      callback();
S
sneaxiy 已提交
76
      ClearCallback([clear_deque]() {
S
sneaxiy 已提交
77
        for (auto *obj : *clear_deque) obj->clear();
S
fix bug  
sneaxiy 已提交
78
        delete clear_deque;
S
sneaxiy 已提交
79 80 81 82 83 84 85 86 87 88
      });
    }
  }

  virtual void Wait() const {}

 protected:
  virtual void ClearCallback(const std::function<void()> &callback) = 0;

  platform::DeviceContext *dev_ctx_;
S
fix bug  
sneaxiy 已提交
89
  std::unique_ptr<std::deque<T *>> garbages_;
S
sneaxiy 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
  mutable std::mutex mutex_;
  const size_t max_memory_size_;
  size_t cur_memory_size_ = 0;
};

template <typename T>
class CPUGarbageCollector : public GarbageCollector<T> {
 public:
  CPUGarbageCollector(const platform::CPUPlace &place, size_t max_memory_size)
      : GarbageCollector<T>(place, max_memory_size) {}

 protected:
  void ClearCallback(const std::function<void()> &callback) override {
    callback();
  }
};

#ifdef PADDLE_WITH_CUDA
S
fix bug  
sneaxiy 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120
template <typename T>
class UnsafeFastGPUGarbageCollector : public GarbageCollector<T> {
 public:
  UnsafeFastGPUGarbageCollector(const platform::CUDAPlace &place,
                                size_t max_memory_size)
      : GarbageCollector<T>(place, max_memory_size) {}

 protected:
  void ClearCallback(const std::function<void()> &callback) override {
    callback();
  }
};

S
sneaxiy 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133
template <typename T>
class DefaultStreamGarbageCollector : public GarbageCollector<T> {
 public:
  DefaultStreamGarbageCollector(const platform::CUDAPlace &place,
                                size_t max_memory_size)
      : GarbageCollector<T>(place, max_memory_size) {}

  cudaStream_t stream() const {
    return static_cast<const platform::CUDADeviceContext *>(this->dev_ctx_)
        ->stream();
  }

  void Wait() const override {
S
fix bug  
sneaxiy 已提交
134
    static_cast<platform::CUDADeviceContext *>(this->dev_ctx_)
S
sneaxiy 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
        ->WaitStreamCallback();
  }

 protected:
  void ClearCallback(const std::function<void()> &callback) override {
    static_cast<platform::CUDADeviceContext *>(this->dev_ctx_)
        ->AddStreamCallback(callback);
  }
};

template <typename T>
class StreamGarbageCollector : public GarbageCollector<T> {
 public:
  StreamGarbageCollector(const platform::CUDAPlace &place,
                         size_t max_memory_size)
      : GarbageCollector<T>(place, max_memory_size) {
S
fix bug  
sneaxiy 已提交
151
    platform::CUDADeviceGuard guard(place.device);
S
sneaxiy 已提交
152 153 154 155 156 157
    PADDLE_ENFORCE(cudaStreamCreate(&stream_));
    callback_manager_.reset(new platform::StreamCallbackManager(stream_));
  }

  ~StreamGarbageCollector() {
    auto place = boost::get<platform::CUDAPlace>(this->dev_ctx_->GetPlace());
S
fix bug  
sneaxiy 已提交
158
    platform::CUDADeviceGuard guard(place.device);
S
sneaxiy 已提交
159 160 161 162
    PADDLE_ENFORCE(cudaStreamSynchronize(stream_));
    PADDLE_ENFORCE(cudaStreamDestroy(stream_));
  }

S
fix bug  
sneaxiy 已提交
163
  void Wait() const override { callback_manager_->Wait(); }
S
sneaxiy 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179

  cudaStream_t stream() const { return stream_; }

 protected:
  void ClearCallback(const std::function<void()> &callback) override {
    callback_manager_->AddCallback(callback);
  }

 private:
  cudaStream_t stream_;
  std::unique_ptr<platform::StreamCallbackManager> callback_manager_;
};
#endif

}  // namespace framework
}  // namespace paddle