OpenPose  1.0.0rc2
OpenPose: A Real-Time Multi-Person Key-Point Detection And Multi-Threading C++ Library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
queueBase.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_QUEUE_BASE_HPP
2 #define OPENPOSE_THREAD_QUEUE_BASE_HPP
3 
4 #include <condition_variable>
5 #include <mutex>
6 #include <queue> // std::queue & std::priority_queue
8 
9 namespace op
10 {
11  template<typename TDatums, typename TQueue>
12  class QueueBase
13  {
14  public:
15  explicit QueueBase(const long long maxSize = -1);
16 
17  virtual ~QueueBase();
18 
19  bool forceEmplace(TDatums& tDatums);
20 
21  bool tryEmplace(TDatums& tDatums);
22 
23  bool waitAndEmplace(TDatums& tDatums);
24 
25  bool forcePush(const TDatums& tDatums);
26 
27  bool tryPush(const TDatums& tDatums);
28 
29  bool waitAndPush(const TDatums& tDatums);
30 
31  bool tryPop(TDatums& tDatums);
32 
33  bool tryPop();
34 
35  bool waitAndPop(TDatums& tDatums);
36 
37  bool waitAndPop();
38 
39  bool empty() const;
40 
41  void stop();
42 
43  void stopPusher();
44 
45  void addPopper();
46 
47  void addPusher();
48 
49  bool isRunning() const;
50 
51  size_t size() const;
52 
53  void clear();
54 
55  virtual TDatums front() const = 0;
56 
57  protected:
58  mutable std::mutex mMutex;
59  long long mPoppers;
60  long long mPushers;
61  long long mMaxPoppersPushers;
64  std::condition_variable mConditionVariable;
65  TQueue mTQueue;
66 
67  virtual bool pop(TDatums& tDatums) = 0;
68 
69  unsigned long long getMaxSize() const;
70 
71  private:
72  const long long mMaxSize;
73 
74  bool emplace(TDatums& tDatums);
75 
76  bool push(const TDatums& tDatums);
77 
78  bool pop();
79 
80  void updateMaxPoppersPushers();
81 
82  DELETE_COPY(QueueBase);
83  };
84 }
85 
86 
87 
88 
89 
90 // Implementation
91 #include <openpose/core/datum.hpp>
93 namespace op
94 {
95  template<typename TDatums, typename TQueue>
96  QueueBase<TDatums, TQueue>::QueueBase(const long long maxSize) :
97  mPoppers{0ll},
98  mPushers{0ll},
99  mPopIsStopped{false},
100  mPushIsStopped{false},
101  mMaxSize{maxSize}
102  {
103  }
104 
105  // Virutal destructor
106  template<typename TDatums, typename TQueue>
108  {
109  try
110  {
111  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
112  stop();
113  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
114  }
115  catch (const std::exception& e)
116  {
117  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
118  }
119  }
120 
121  template<typename TDatums, typename TQueue>
123  {
124  try
125  {
126  const std::lock_guard<std::mutex> lock{mMutex};
127  if (mTQueue.size() >= getMaxSize())
128  mTQueue.pop();
129  return emplace(tDatums);
130  }
131  catch (const std::exception& e)
132  {
133  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
134  return false;
135  }
136  }
137 
138  template<typename TDatums, typename TQueue>
140  {
141  try
142  {
143  const std::lock_guard<std::mutex> lock{mMutex};
144  if (mTQueue.size() >= getMaxSize())
145  return false;
146  return emplace(tDatums);
147  }
148  catch (const std::exception& e)
149  {
150  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
151  return false;
152  }
153  }
154 
155  template<typename TDatums, typename TQueue>
157  {
158  try
159  {
160  std::unique_lock<std::mutex> lock{mMutex};
161  mConditionVariable.wait(lock, [this]{return mTQueue.size() < getMaxSize() || mPushIsStopped; });
162  return emplace(tDatums);
163  }
164  catch (const std::exception& e)
165  {
166  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
167  return false;
168  }
169  }
170 
171  template<typename TDatums, typename TQueue>
172  bool QueueBase<TDatums, TQueue>::forcePush(const TDatums& tDatums)
173  {
174  try
175  {
176  const std::lock_guard<std::mutex> lock{mMutex};
177  if (mTQueue.size() >= getMaxSize())
178  mTQueue.pop();
179  return push(tDatums);
180  }
181  catch (const std::exception& e)
182  {
183  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
184  return false;
185  }
186  }
187 
188  template<typename TDatums, typename TQueue>
189  bool QueueBase<TDatums, TQueue>::tryPush(const TDatums& tDatums)
190  {
191  try
192  {
193  const std::lock_guard<std::mutex> lock{mMutex};
194  if (mTQueue.size() >= getMaxSize())
195  return false;
196  return push(tDatums);
197  }
198  catch (const std::exception& e)
199  {
200  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
201  return false;
202  }
203  }
204 
205  template<typename TDatums, typename TQueue>
206  bool QueueBase<TDatums, TQueue>::waitAndPush(const TDatums& tDatums)
207  {
208  try
209  {
210  std::unique_lock<std::mutex> lock{mMutex};
211  mConditionVariable.wait(lock, [this]{return mTQueue.size() < getMaxSize() || mPushIsStopped; });
212  return push(tDatums);
213  }
214  catch (const std::exception& e)
215  {
216  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
217  return false;
218  }
219  }
220 
221  template<typename TDatums, typename TQueue>
222  bool QueueBase<TDatums, TQueue>::tryPop(TDatums& tDatums)
223  {
224  try
225  {
226  const std::lock_guard<std::mutex> lock{mMutex};
227  return pop(tDatums);
228  }
229  catch (const std::exception& e)
230  {
231  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
232  return false;
233  }
234  }
235 
236  template<typename TDatums, typename TQueue>
238  {
239  try
240  {
241  const std::lock_guard<std::mutex> lock{mMutex};
242  return pop();
243  }
244  catch (const std::exception& e)
245  {
246  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
247  return false;
248  }
249  }
250 
251  template<typename TDatums, typename TQueue>
253  {
254  try
255  {
256  std::unique_lock<std::mutex> lock{mMutex};
257  mConditionVariable.wait(lock, [this]{return !mTQueue.empty() || mPopIsStopped; });
258  return pop(tDatums);
259  }
260  catch (const std::exception& e)
261  {
262  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
263  return false;
264  }
265  }
266 
267  template<typename TDatums, typename TQueue>
269  {
270  try
271  {
272  std::unique_lock<std::mutex> lock{mMutex};
273  mConditionVariable.wait(lock, [this]{return !mTQueue.empty() || mPopIsStopped; });
274  return pop();
275  }
276  catch (const std::exception& e)
277  {
278  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
279  return false;
280  }
281  }
282 
283  template<typename TDatums, typename TQueue>
285  {
286  try
287  {
288  const std::lock_guard<std::mutex> lock{mMutex};
289  return mTQueue.empty();
290  }
291  catch (const std::exception& e)
292  {
293  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
294  return false;
295  }
296  }
297 
298  template<typename TDatums, typename TQueue>
300  {
301  try
302  {
303  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
304  const std::lock_guard<std::mutex> lock{mMutex};
305  mPopIsStopped = {true};
306  mPushIsStopped = {true};
307  while (!mTQueue.empty())
308  mTQueue.pop();
309  mConditionVariable.notify_all();
310  }
311  catch (const std::exception& e)
312  {
313  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
314  }
315  }
316 
317  template<typename TDatums, typename TQueue>
319  {
320  try
321  {
322  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
323  const std::lock_guard<std::mutex> lock{mMutex};
324  mPushers--;
325  if (mPushers == 0)
326  {
327  mPushIsStopped = {true};
328  if (mTQueue.empty())
329  mPopIsStopped = {true};
330  mConditionVariable.notify_all();
331  }
332  }
333  catch (const std::exception& e)
334  {
335  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
336  }
337  }
338 
339  template<typename TDatums, typename TQueue>
341  {
342  try
343  {
344  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
345  const std::lock_guard<std::mutex> lock{mMutex};
346  mPoppers++;
347  updateMaxPoppersPushers();
348  }
349  catch (const std::exception& e)
350  {
351  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
352  }
353  }
354 
355  template<typename TDatums, typename TQueue>
357  {
358  try
359  {
360  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
361  const std::lock_guard<std::mutex> lock{mMutex};
362  mPushers++;
363  updateMaxPoppersPushers();
364  }
365  catch (const std::exception& e)
366  {
367  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
368  }
369  }
370 
371  template<typename TDatums, typename TQueue>
373  {
374  try
375  {
376  const std::lock_guard<std::mutex> lock{mMutex};
377  return !(mPushIsStopped && (mPopIsStopped || mTQueue.empty()));
378  }
379  catch (const std::exception& e)
380  {
381  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
382  return true;
383  }
384  }
385 
386  template<typename TDatums, typename TQueue>
388  {
389  try
390  {
391  const std::lock_guard<std::mutex> lock{mMutex};
392  return mTQueue.size();
393  }
394  catch (const std::exception& e)
395  {
396  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
397  return 0;
398  }
399  }
400 
401  template<typename TDatums, typename TQueue>
403  {
404  try
405  {
406  const std::lock_guard<std::mutex> lock{mMutex};
407  while (!mTQueue.empty())
408  mTQueue.pop();
409  }
410  catch (const std::exception& e)
411  {
412  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
413  }
414  }
415 
416  template<typename TDatums, typename TQueue>
417  unsigned long long QueueBase<TDatums, TQueue>::getMaxSize() const
418  {
419  try
420  {
421  return (mMaxSize > 0 ? mMaxSize : fastMax(1ll, mMaxPoppersPushers));
422  }
423  catch (const std::exception& e)
424  {
425  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
426  return false;
427  }
428  }
429 
430  template<typename TDatums, typename TQueue>
431  bool QueueBase<TDatums, TQueue>::emplace(TDatums& tDatums)
432  {
433  try
434  {
435  if (mPushIsStopped)
436  return false;
437 
438  mTQueue.emplace(tDatums);
439  mConditionVariable.notify_all();
440  return true;
441  }
442  catch (const std::exception& e)
443  {
444  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
445  return false;
446  }
447  }
448 
449  template<typename TDatums, typename TQueue>
450  bool QueueBase<TDatums, TQueue>::push(const TDatums& tDatums)
451  {
452  try
453  {
454  if (mPushIsStopped)
455  return false;
456 
457  mTQueue.push(tDatums);
458  mConditionVariable.notify_all();
459  return true;
460  }
461  catch (const std::exception& e)
462  {
463  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
464  return false;
465  }
466  }
467 
468  template<typename TDatums, typename TQueue>
469  bool QueueBase<TDatums, TQueue>::pop()
470  {
471  try
472  {
473  if (mPopIsStopped || mTQueue.empty())
474  return false;
475 
476  mTQueue.pop();
477  mConditionVariable.notify_all();
478  return true;
479  }
480  catch (const std::exception& e)
481  {
482  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
483  return false;
484  }
485  }
486 
487  template<typename TDatums, typename TQueue>
488  void QueueBase<TDatums, TQueue>::updateMaxPoppersPushers()
489  {
490  try
491  {
492  mMaxPoppersPushers = fastMax(mPoppers, mPushers);
493  }
494  catch (const std::exception& e)
495  {
496  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
497  }
498  }
499 
500  extern template class QueueBase<DATUM_BASE, std::queue<DATUM_BASE>>;
501  extern template class QueueBase<DATUM_BASE, std::priority_queue<DATUM_BASE, std::vector<DATUM_BASE>, std::greater<DATUM_BASE>>>;
502 }
503 
504 #endif // OPENPOSE_THREAD_QUEUE_BASE_HPP
Definition: queueBase.hpp:12
virtual bool pop(TDatums &tDatums)=0
size_t size() const
Definition: queueBase.hpp:387
QueueBase(const long long maxSize=-1)
Definition: queueBase.hpp:96
void addPusher()
Definition: queueBase.hpp:356
bool tryEmplace(TDatums &tDatums)
Definition: queueBase.hpp:139
virtual ~QueueBase()
Definition: queueBase.hpp:107
void stopPusher()
Definition: queueBase.hpp:318
std::mutex mMutex
Definition: queueBase.hpp:58
T fastMax(const T a, const T b)
Definition: fastMath.hpp:68
bool empty() const
Definition: queueBase.hpp:284
void stop()
Definition: queueBase.hpp:299
bool mPushIsStopped
Definition: queueBase.hpp:63
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
bool mPopIsStopped
Definition: queueBase.hpp:62
bool tryPop()
Definition: queueBase.hpp:237
std::condition_variable mConditionVariable
Definition: queueBase.hpp:64
bool waitAndPop()
Definition: queueBase.hpp:268
long long mPushers
Definition: queueBase.hpp:60
bool forceEmplace(TDatums &tDatums)
Definition: queueBase.hpp:122
TQueue mTQueue
Definition: queueBase.hpp:65
long long mMaxPoppersPushers
Definition: queueBase.hpp:61
bool tryPush(const TDatums &tDatums)
Definition: queueBase.hpp:189
bool isRunning() const
Definition: queueBase.hpp:372
long long mPoppers
Definition: queueBase.hpp:59
OP_API void log(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
void addPopper()
Definition: queueBase.hpp:340
unsigned long long getMaxSize() const
Definition: queueBase.hpp:417
bool waitAndEmplace(TDatums &tDatums)
Definition: queueBase.hpp:156
virtual TDatums front() const =0
void clear()
Definition: queueBase.hpp:402
bool waitAndPush(const TDatums &tDatums)
Definition: queueBase.hpp:206
bool forcePush(const TDatums &tDatums)
Definition: queueBase.hpp:172