buffered_channel.h 3.9 KB
Newer Older
1
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2 3 4 5 6 7 8 9 10 11 12 13 14 15

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once
16
#include <atomic>
17 18 19 20
#include <condition_variable>
#include <deque>
#include <mutex>

Y
Yi Wang 已提交
21 22
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/platform/enforce.h"
23 24 25 26 27

namespace paddle {
namespace framework {
namespace details {

C
chengduoZH 已提交
28 29
// Four of the properties of Buffered Channel:
// - A send to a full channel blocks temporarily until a receive from the
30
// channel or the channel is closed.
C
chengduoZH 已提交
31
// - A receive from an empty channel blocks temporarily until a send to the
32 33 34 35
// channel or the channel is closed.
// - A send to a closed channel returns false immediately.
// - A receive from a closed channel returns false immediately.

36 37 38 39 40 41
template <typename T>
class Buffered : public paddle::framework::Channel<T> {
  friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);
  friend void paddle::framework::CloseChannel<T>(Channel<T>*);

 public:
C
chengduo 已提交
42 43
  virtual bool Send(T*);
  virtual bool Receive(T*);
44
  virtual size_t Cap() { return cap_; }
C
chengduo 已提交
45 46
  virtual void Close();
  virtual ~Buffered();
47 48 49 50 51 52

 private:
  size_t cap_;
  std::mutex mu_;
  std::condition_variable empty_cond_var_;
  std::condition_variable full_cond_var_;
53
  std::condition_variable destructor_cond_var_;
54
  std::deque<T> channel_;
55
  std::atomic<bool> closed_{false};
56 57
  std::atomic<unsigned> send_ctr{0};
  std::atomic<unsigned> recv_ctr{0};
58

C
chengduo 已提交
59 60 61
  Buffered(size_t cap) : cap_(cap), closed_(false) {
    PADDLE_ENFORCE_GT(cap, 0);
  }
62

63
  void NotifyAllParticipants(std::unique_lock<std::mutex>*);
64 65 66
};

template <typename T>
C
chengduo 已提交
67
bool Buffered<T>::Send(T* item) {
68 69 70 71
  bool ret = false;
  if (closed_) {
    return ret;
  }
72
  send_ctr++;
73
  std::unique_lock<std::mutex> lock(mu_);
C
chengduo 已提交
74 75 76 77 78 79
  full_cond_var_.wait(lock,
                      [this]() { return channel_.size() < cap_ || closed_; });
  if (!closed_) {
    channel_.push_back(std::move(*item));
    lock.unlock();
    empty_cond_var_.notify_one();
C
chengduo 已提交
80
    ret = true;
C
chengduo 已提交
81
  }
82 83
  send_ctr--;
  destructor_cond_var_.notify_one();
C
chengduo 已提交
84
  return ret;
85 86 87
}

template <typename T>
C
chengduo 已提交
88
bool Buffered<T>::Receive(T* item) {
89 90 91 92 93 94 95
  bool ret = false;
  // Once the channel has been closed and all data has been consumed,
  // just return false. Don't even try acquiring the mutex.
  if (closed_ && channel_.empty()) {
    return false;
  }
  recv_ctr++;
96
  std::unique_lock<std::mutex> lock(mu_);
C
chengduo 已提交
97
  empty_cond_var_.wait(lock, [this]() { return !channel_.empty() || closed_; });
C
chengduoZH 已提交
98
  if (!channel_.empty()) {
C
chengduo 已提交
99 100
    *item = std::move(channel_.front());
    channel_.pop_front();
C
chengduo 已提交
101 102
    full_cond_var_.notify_one();
    ret = true;
C
chengduo 已提交
103
  }
104 105
  recv_ctr--;
  destructor_cond_var_.notify_one();
C
chengduo 已提交
106
  return ret;
C
chengduo 已提交
107 108 109 110
}

template <typename T>
void Buffered<T>::Close() {
111 112 113
  if (closed_) {
    return;
  }
C
chengduo 已提交
114 115
  std::unique_lock<std::mutex> lock(mu_);
  closed_ = true;
116
  NotifyAllParticipants(&lock);
117 118 119 120 121
}

template <typename T>
Buffered<T>::~Buffered() {
  std::unique_lock<std::mutex> lock(mu_);
C
chengduo 已提交
122
  closed_ = true;
123
  channel_.clear();
124
  NotifyAllParticipants(&lock);
125 126 127 128 129 130

  // The destructor must wait for all readers and writers to complete their task
  // The channel has been closed, so we will not accept new readers and writers
  lock.lock();
  destructor_cond_var_.wait(
      lock, [this]() { return send_ctr == 0 && recv_ctr == 0; });
131 132
}

133 134 135 136 137 138 139
template <typename T>
void Buffered<T>::NotifyAllParticipants(std::unique_lock<std::mutex>* lock) {
  lock->unlock();
  full_cond_var_.notify_all();
  empty_cond_var_.notify_all();
}

140 141 142
}  // namespace details
}  // namespace framework
}  // namespace paddle