1 #ifndef OPENPOSE_THREAD_QUEUE_BASE_HPP
2 #define OPENPOSE_THREAD_QUEUE_BASE_HPP
4 #include <condition_variable>
// Declaration fragment of the QueueBase class template. Many declaration lines
// (class head, access specifiers, most members, closing brace) are elided from
// this listing, so only the visible declarations are annotated.
// TDatums is the element type handed to push()/emplace(); TQueue is the type of
// the underlying container member mTQueue.
11 template<
typename TDatums,
typename TQueue>
// Constructor. maxSize defaults to -1; getMaxSize() treats any non-positive
// mMaxSize as "no explicit limit" and falls back to max(1, mMaxPoppersPushers).
15 explicit QueueBase(
const long long maxSize = -1);
// Push variant whose definition takes mMutex with a lock_guard (no condition
// variable wait) and compares the queue size against getMaxSize() before
// delegating to push().
27 bool tryPush(
const TDatums& tDatums);
// Pop variant taking an output argument; its definition is not identifiable
// with certainty in this listing.
31 bool tryPop(TDatums& tDatums);
// Pure virtual accessor returning a TDatums by value; must be provided by
// derived classes.
55 virtual TDatums
front()
const = 0;
// Pure virtual pop into an output argument; must be provided by derived classes.
67 virtual bool pop(TDatums& tDatums) = 0;
// Configured capacity; immutable after construction. Read by getMaxSize().
72 const long long mMaxSize;
// Unlocked insertion helpers: the force*/try*/waitAnd* variants call these
// after they have acquired mMutex.
74 bool emplace(TDatums& tDatums);
76 bool push(
const TDatums& tDatums);
// Recomputes mMaxPoppersPushers as the larger of mPoppers and mPushers.
80 void updateMaxPoppersPushers();
// Fragment of the QueueBase constructor definition (the listing's symbol index
// places QueueBase(const long long maxSize = -1) at queueBase.hpp:96). Only one
// member-initializer is visible; the signature, the other initializers and the
// body are elided from this listing.
95 template<
typename TDatums,
typename TQueue>
// Pushing starts out enabled.
100 mPushIsStopped{
false},
// Fragment of the virtual destructor definition (the listing's symbol index
// places ~QueueBase() at queueBase.hpp:107). The signature and the try body are
// elided from this listing; only the exception handler is visible.
106 template<
typename TDatums,
typename TQueue>
// Any std::exception raised by the (elided) destructor body is caught here and
// reported through error() together with the source location.
// NOTE(review): if error() itself throws, the exception would escape a
// destructor — confirm error()'s behavior.
115 catch (
const std::exception& e)
117 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of forceEmplace(TDatums&) (the listing's symbol index places it at
// queueBase.hpp:122; the signature line is elided here).
// Holds mMutex for the whole call and never waits on the condition variable.
121 template<
typename TDatums,
typename TQueue>
126 const std::lock_guard<std::mutex> lock{mMutex};
// Capacity check. The statement guarded by this condition (line 128) is elided
// from the listing — presumably it makes room in a full queue; confirm against
// the full file.
127 if (mTQueue.size() >= getMaxSize())
// Delegate the actual insertion (and waiter notification) to emplace().
129 return emplace(tDatums);
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
131 catch (
const std::exception& e)
133 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of tryEmplace(TDatums&) (the listing's symbol index places it at
// queueBase.hpp:139; the signature line is elided here).
// Holds mMutex for the whole call and never waits on the condition variable.
138 template<
typename TDatums,
typename TQueue>
143 const std::lock_guard<std::mutex> lock{mMutex};
// Capacity check. The statement guarded by this condition (line 145) is elided
// from the listing — presumably an early failure return when the queue is
// full; confirm against the full file.
144 if (mTQueue.size() >= getMaxSize())
// Delegate the actual insertion (and waiter notification) to emplace().
146 return emplace(tDatums);
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
148 catch (
const std::exception& e)
150 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of waitAndEmplace(TDatums&) (the listing's symbol index places it
// at queueBase.hpp:156; the signature line is elided here).
155 template<
typename TDatums,
typename TQueue>
// unique_lock (not lock_guard) because condition_variable::wait must be able
// to release and re-acquire the mutex.
160 std::unique_lock<std::mutex> lock{mMutex};
// Block until the queue has room (size < getMaxSize()) or pushing has been
// stopped. The predicate overload re-checks the condition after every wakeup.
161 mConditionVariable.wait(lock, [
this]{
return mTQueue.size() < getMaxSize() || mPushIsStopped; });
// emplace() is called on both wake-up reasons; how it behaves once pushing is
// stopped depends on lines elided from its definition in this listing.
162 return emplace(tDatums);
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
164 catch (
const std::exception& e)
166 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of forcePush(const TDatums&) (the listing's symbol index places it
// at queueBase.hpp:172; the signature line is elided here).
// Holds mMutex for the whole call and never waits on the condition variable.
171 template<
typename TDatums,
typename TQueue>
176 const std::lock_guard<std::mutex> lock{mMutex};
// Capacity check. The statement guarded by this condition (line 178) is elided
// from the listing — presumably it makes room in a full queue; confirm against
// the full file.
177 if (mTQueue.size() >= getMaxSize())
// Delegate the actual insertion (and waiter notification) to push().
179 return push(tDatums);
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
181 catch (
const std::exception& e)
183 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of tryPush(const TDatums&) (the listing's symbol index places it at
// queueBase.hpp:189; the signature line is elided here).
// Holds mMutex for the whole call and never waits on the condition variable.
188 template<
typename TDatums,
typename TQueue>
193 const std::lock_guard<std::mutex> lock{mMutex};
// Capacity check. The statement guarded by this condition (line 195) is elided
// from the listing — presumably an early failure return when the queue is
// full; confirm against the full file.
194 if (mTQueue.size() >= getMaxSize())
// Delegate the actual insertion (and waiter notification) to push().
196 return push(tDatums);
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
198 catch (
const std::exception& e)
200 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of waitAndPush(const TDatums&) (the listing's symbol index places
// it at queueBase.hpp:206; the signature line is elided here).
205 template<
typename TDatums,
typename TQueue>
// unique_lock (not lock_guard) because condition_variable::wait must be able
// to release and re-acquire the mutex.
210 std::unique_lock<std::mutex> lock{mMutex};
// Block until the queue has room (size < getMaxSize()) or pushing has been
// stopped. The predicate overload re-checks the condition after every wakeup.
211 mConditionVariable.wait(lock, [
this]{
return mTQueue.size() < getMaxSize() || mPushIsStopped; });
// push() is called on both wake-up reasons; how it behaves once pushing is
// stopped depends on lines elided from its definition in this listing.
212 return push(tDatums);
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
214 catch (
const std::exception& e)
216 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of a member function definition at queueBase.hpp:221-231. Its
// signature is elided and the listing's symbol index has no entry for it —
// presumably tryPop(TDatums&), which is declared in the class; confirm against
// the full file.
221 template<
typename TDatums,
typename TQueue>
// Holds mMutex for the whole call; no condition variable wait is visible.
// The statements between the lock and the handler are elided from this listing.
226 const std::lock_guard<std::mutex> lock{mMutex};
// Report any std::exception through error() with its message and location.
229 catch (
const std::exception& e)
231 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of tryPop() (the listing's symbol index places the no-argument
// overload at queueBase.hpp:237; the signature line is elided here).
236 template<
typename TDatums,
typename TQueue>
// Holds mMutex for the whole call; no condition variable wait is visible.
// The statements between the lock and the handler are elided from this
// listing — presumably a call to pop(); confirm against the full file.
241 const std::lock_guard<std::mutex> lock{mMutex};
// Report any std::exception through error() with its message and location.
244 catch (
const std::exception& e)
246 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of a member function definition at queueBase.hpp:251-262. Its
// signature is elided and the listing's symbol index has no entry for it —
// presumably waitAndPop(TDatums&); confirm against the full file.
251 template<
typename TDatums,
typename TQueue>
// unique_lock (not lock_guard) because condition_variable::wait must be able
// to release and re-acquire the mutex.
256 std::unique_lock<std::mutex> lock{mMutex};
// Block until the queue is non-empty or popping has been stopped. The
// predicate overload re-checks the condition after every wakeup.
// The statement that follows the wait (line 258) is elided from this listing.
257 mConditionVariable.wait(lock, [
this]{
return !mTQueue.empty() || mPopIsStopped; });
// Report any std::exception through error() with its message and location.
260 catch (
const std::exception& e)
262 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of waitAndPop() (the listing's symbol index places the no-argument
// overload at queueBase.hpp:268; the signature line is elided here).
267 template<
typename TDatums,
typename TQueue>
// unique_lock (not lock_guard) because condition_variable::wait must be able
// to release and re-acquire the mutex.
272 std::unique_lock<std::mutex> lock{mMutex};
// Block until the queue is non-empty or popping has been stopped. The
// predicate overload re-checks the condition after every wakeup.
// The statement that follows the wait (line 274) is elided from this listing —
// presumably a call to pop(); confirm against the full file.
273 mConditionVariable.wait(lock, [
this]{
return !mTQueue.empty() || mPopIsStopped; });
// Report any std::exception through error() with its message and location.
276 catch (
const std::exception& e)
278 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of empty() const (the listing's symbol index places it at
// queueBase.hpp:284; the signature line is elided here).
283 template<
typename TDatums,
typename TQueue>
// Take mMutex so the emptiness query does not race with concurrent push/pop.
// The answer is a snapshot: it may be stale as soon as the lock is released.
288 const std::lock_guard<std::mutex> lock{mMutex};
289 return mTQueue.empty();
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
291 catch (
const std::exception& e)
293 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of stop() (the listing's symbol index places it at
// queueBase.hpp:299; the signature line is elided here).
// Visible effect: marks both directions as stopped, loops while the queue is
// non-empty, and wakes every thread blocked on the condition variable.
298 template<
typename TDatums,
typename TQueue>
304 const std::lock_guard<std::mutex> lock{mMutex};
// Set both flags so the wait predicates of the push-side and pop-side waiters
// become true.
305 mPopIsStopped = {
true};
306 mPushIsStopped = {
true};
// Loop body (line 308) is elided from this listing — presumably it pops one
// element per iteration to drain the queue; confirm against the full file.
307 while (!mTQueue.empty())
// Wake all waiters so they re-evaluate their predicates and observe the flags.
309 mConditionVariable.notify_all();
// Report any std::exception through error() with its message and location.
311 catch (
const std::exception& e)
313 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of stopPusher() (the listing's symbol index places it at
// queueBase.hpp:318; the signature line is elided here).
// Lines 324-326, 328 and 331-332 are elided from this listing, so any
// conditions that guard the assignments below are not visible.
317 template<
typename TDatums,
typename TQueue>
323 const std::lock_guard<std::mutex> lock{mMutex};
// Disable further pushing.
327 mPushIsStopped = {
true};
// NOTE(review): this assignment presumably sits under a condition on the
// elided line 328 (e.g. only when nothing is left to pop) — confirm against
// the full file before relying on it being unconditional.
329 mPopIsStopped = {
true};
// Wake all waiters so they re-evaluate their predicates and observe the flags.
330 mConditionVariable.notify_all();
// Report any std::exception through error() with its message and location.
333 catch (
const std::exception& e)
335 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of addPopper() (the listing's symbol index places it at
// queueBase.hpp:340; the signature line is elided here).
339 template<
typename TDatums,
typename TQueue>
// Counter bookkeeping happens under mMutex.
345 const std::lock_guard<std::mutex> lock{mMutex};
// Line 346 is elided from this listing — presumably it increments mPoppers;
// confirm against the full file. Afterwards the cached maximum of
// mPoppers/mPushers is refreshed.
347 updateMaxPoppersPushers();
// Report any std::exception through error() with its message and location.
349 catch (
const std::exception& e)
351 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of addPusher() (the listing's symbol index places it at
// queueBase.hpp:356; the signature line is elided here).
355 template<
typename TDatums,
typename TQueue>
// Counter bookkeeping happens under mMutex.
361 const std::lock_guard<std::mutex> lock{mMutex};
// Line 362 is elided from this listing — presumably it increments mPushers;
// confirm against the full file. Afterwards the cached maximum of
// mPoppers/mPushers is refreshed.
363 updateMaxPoppersPushers();
// Report any std::exception through error() with its message and location.
365 catch (
const std::exception& e)
367 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of isRunning() const (the listing's symbol index places it at
// queueBase.hpp:372; the signature line is elided here).
371 template<
typename TDatums,
typename TQueue>
376 const std::lock_guard<std::mutex> lock{mMutex};
// The queue counts as "not running" only when pushing is stopped AND either
// popping is stopped too or there is nothing left to pop. So a queue whose
// pushers have stopped still reports running while it holds elements and
// popping is enabled.
377 return !(mPushIsStopped && (mPopIsStopped || mTQueue.empty()));
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
379 catch (
const std::exception& e)
381 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of size() const, which returns size_t (the listing's symbol index
// places it at queueBase.hpp:387; the signature line is elided here).
386 template<
typename TDatums,
typename TQueue>
// Take mMutex so the size query does not race with concurrent push/pop.
// The answer is a snapshot: it may be stale as soon as the lock is released.
391 const std::lock_guard<std::mutex> lock{mMutex};
392 return mTQueue.size();
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
394 catch (
const std::exception& e)
396 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of clear() (the listing's symbol index places it at
// queueBase.hpp:402; the signature line is elided here).
401 template<
typename TDatums,
typename TQueue>
406 const std::lock_guard<std::mutex> lock{mMutex};
// Loop body (line 408) is elided from this listing — presumably it pops one
// element per iteration until the queue is empty; confirm against the full
// file. No stop flag is modified and no notify call is visible in this fragment.
407 while (!mTQueue.empty())
// Report any std::exception through error() with its message and location.
410 catch (
const std::exception& e)
412 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of getMaxSize() const, which returns unsigned long long (the
// listing's symbol index places it at queueBase.hpp:417; the signature line
// is elided here).
// No lock is taken in the visible lines; the visible callers invoke it while
// already holding mMutex.
416 template<
typename TDatums,
typename TQueue>
// A positive mMaxSize is used as the capacity. Otherwise (the constructor
// default is -1) the capacity is the larger of 1 and mMaxPoppersPushers, so it
// is never below 1.
421 return (mMaxSize > 0 ? mMaxSize :
fastMax(1ll, mMaxPoppersPushers));
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
423 catch (
const std::exception& e)
425 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of a member function definition at queueBase.hpp:430-444. Its
// signature is elided and the listing's symbol index has no entry for it —
// presumably emplace(TDatums&), which is declared in the class; confirm
// against the full file.
// No lock is taken in the visible lines; the visible callers of emplace() hold
// mMutex when they call it.
430 template<
typename TDatums,
typename TQueue>
// Lines 431-437 are elided from this listing, so any precondition checked
// before the insertion (e.g. a stop flag) is not visible.
438 mTQueue.emplace(tDatums);
// Wake all waiters so they re-evaluate their predicates after the insertion.
439 mConditionVariable.notify_all();
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
442 catch (
const std::exception& e)
444 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of push(const TDatums&): the unlocked insertion helper behind
// forcePush/tryPush/waitAndPush.
// No lock is taken in the visible lines; the visible callers hold mMutex when
// they call it.
449 template<
typename TDatums,
typename TQueue>
450 bool QueueBase<TDatums, TQueue>::push(
const TDatums& tDatums)
// Lines 451-456 are elided from this listing, so any precondition checked
// before the insertion (e.g. a stop flag) is not visible.
457 mTQueue.push(tDatums);
// Wake all waiters so they re-evaluate their predicates after the insertion.
458 mConditionVariable.notify_all();
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
461 catch (
const std::exception& e)
463 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of the no-argument pop(). No lock is taken in the visible lines.
468 template<
typename TDatums,
typename TQueue>
469 bool QueueBase<TDatums, TQueue>::pop()
// Guard for "popping stopped" or "nothing to pop". The guarded statement
// (line 474) and the removal itself (lines 475-476) are elided from this
// listing — presumably an early failure return followed by mTQueue.pop();
// confirm against the full file.
473 if (mPopIsStopped || mTQueue.empty())
// Wake all waiters so they re-evaluate their predicates.
477 mConditionVariable.notify_all();
// Report any std::exception through error() with its message and location.
// The value returned after the handler is elided from this listing.
480 catch (
const std::exception& e)
482 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Fragment of updateMaxPoppersPushers(): refreshes the cached maximum that
// getMaxSize() falls back to when mMaxSize is not positive.
// No lock is taken in the visible lines; the visible callers (addPopper,
// addPusher) hold mMutex when they call it.
487 template<
typename TDatums,
typename TQueue>
488 void QueueBase<TDatums, TQueue>::updateMaxPoppersPushers()
// Keep the larger of the two registered counts.
492 mMaxPoppersPushers =
fastMax(mPoppers, mPushers);
// Report any std::exception through error() with its message and location.
494 catch (
const std::exception& e)
496 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Explicit instantiation declarations: translation units that include this
// header must not implicitly instantiate QueueBase for these two container
// types. The matching explicit instantiation definitions are expected in some
// other translation unit.
// DATUM_BASE is not defined in this listing — presumably a macro naming the
// default datum container type; confirm against the full project.
// The first form is a FIFO std::queue. The second is a std::priority_queue
// ordered with std::greater, i.e. the smallest element is on top.
500 extern template class QueueBase<DATUM_BASE, std::queue<DATUM_BASE>>;
501 extern template class QueueBase<DATUM_BASE, std::priority_queue<DATUM_BASE, std::vector<DATUM_BASE>, std::greater<DATUM_BASE>>>;
504 #endif // OPENPOSE_THREAD_QUEUE_BASE_HPP
Definition: queueBase.hpp:12
virtual bool pop(TDatums &tDatums)=0
size_t size() const
Definition: queueBase.hpp:387
QueueBase(const long long maxSize=-1)
Definition: queueBase.hpp:96
void addPusher()
Definition: queueBase.hpp:356
bool tryEmplace(TDatums &tDatums)
Definition: queueBase.hpp:139
virtual ~QueueBase()
Definition: queueBase.hpp:107
void stopPusher()
Definition: queueBase.hpp:318
std::mutex mMutex
Definition: queueBase.hpp:58
T fastMax(const T a, const T b)
Definition: fastMath.hpp:68
bool empty() const
Definition: queueBase.hpp:284
void stop()
Definition: queueBase.hpp:299
bool mPushIsStopped
Definition: queueBase.hpp:63
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
bool mPopIsStopped
Definition: queueBase.hpp:62
bool tryPop()
Definition: queueBase.hpp:237
std::condition_variable mConditionVariable
Definition: queueBase.hpp:64
bool waitAndPop()
Definition: queueBase.hpp:268
long long mPushers
Definition: queueBase.hpp:60
bool forceEmplace(TDatums &tDatums)
Definition: queueBase.hpp:122
TQueue mTQueue
Definition: queueBase.hpp:65
long long mMaxPoppersPushers
Definition: queueBase.hpp:61
bool tryPush(const TDatums &tDatums)
Definition: queueBase.hpp:189
bool isRunning() const
Definition: queueBase.hpp:372
long long mPoppers
Definition: queueBase.hpp:59
OP_API void log(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
void addPopper()
Definition: queueBase.hpp:340
unsigned long long getMaxSize() const
Definition: queueBase.hpp:417
bool waitAndEmplace(TDatums &tDatums)
Definition: queueBase.hpp:156
virtual TDatums front() const =0
void clear()
Definition: queueBase.hpp:402
bool waitAndPush(const TDatums &tDatums)
Definition: queueBase.hpp:206
bool forcePush(const TDatums &tDatums)
Definition: queueBase.hpp:172