OpenPose  1.0.0rc2
OpenPose: A Real-Time Multi-Person Key-Point Detection And Multi-Threading C++ Library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
wQueueOrderer.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
2 #define OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
3 
4 #include <queue> // std::priority_queue
8 
9 namespace op
10 {
11  template<typename TDatums>
12  class WQueueOrderer : public Worker<TDatums>
13  {
14  public:
15  explicit WQueueOrderer(const unsigned int maxBufferSize = 64u);
16 
18 
19  void work(TDatums& tDatums);
20 
21  void tryStop();
22 
23  private:
24  const unsigned int mMaxBufferSize;
25  bool mStopWhenEmpty;
26  unsigned long long mNextExpectedId;
27  std::priority_queue<TDatums, std::vector<TDatums>, PointerContainerGreater<TDatums>> mPriorityQueueBuffer;
28 
29  DELETE_COPY(WQueueOrderer);
30  };
31 }
32 
33 
34 
35 
36 
37 // Implementation
38 #include <chrono>
39 #include <thread>
40 namespace op
41 {
42  template<typename TDatums>
43  WQueueOrderer<TDatums>::WQueueOrderer(const unsigned int maxBufferSize) :
44  mMaxBufferSize{maxBufferSize},
45  mStopWhenEmpty{false},
46  mNextExpectedId{0}
47  {
48  }
49 
50  template<typename TDatums>
52  {
53  }
54 
55  template<typename TDatums>
56  void WQueueOrderer<TDatums>::work(TDatums& tDatums)
57  {
58  try
59  {
60  // Profiling speed
61  const auto profilerKey = Profiler::timerInit(__LINE__, __FUNCTION__, __FILE__);
62  bool profileSpeed = (tDatums != nullptr);
63  // Input TDatum -> enqueue or return it back
64  if (checkNoNullNorEmpty(tDatums))
65  {
66  // T* to T
67  auto& tDatumsNoPtr = *tDatums;
68  // tDatums is the next expected, update counter
69  if (tDatumsNoPtr[0].id == mNextExpectedId)
70  mNextExpectedId++;
71  // Else push it to our buffered queue
72  else
73  {
74  // Enqueue current tDatums
75  mPriorityQueueBuffer.emplace(tDatums);
76  tDatums = nullptr;
77  // Else if buffer full -> remove one tDatums
78  if (mPriorityQueueBuffer.size() > mMaxBufferSize)
79  {
80  tDatums = mPriorityQueueBuffer.top();
81  mPriorityQueueBuffer.pop();
82  }
83  }
84  }
85  // If input TDatum enqueued -> check if previously enqueued next desired frame and pop it
86  if (!checkNoNullNorEmpty(tDatums))
87  {
88  // Retrieve frame if next is desired frame or if we want to stop this worker
89  if (!mPriorityQueueBuffer.empty() && ((*mPriorityQueueBuffer.top())[0].id == mNextExpectedId || mStopWhenEmpty))
90  {
91  tDatums = { mPriorityQueueBuffer.top() };
92  mPriorityQueueBuffer.pop();
93  }
94  }
95  // If TDatum ready to be returned -> updated next expected id
96  if (checkNoNullNorEmpty(tDatums))
97  {
98  const auto& tDatumsNoPtr = *tDatums;
99  mNextExpectedId = tDatumsNoPtr[0].id + 1;
100  }
101  // Sleep if no new tDatums to either pop
102  if (!checkNoNullNorEmpty(tDatums) && mPriorityQueueBuffer.size() < mMaxBufferSize / 2u)
103  std::this_thread::sleep_for(std::chrono::milliseconds{1});
104  // If TDatum popped and/or pushed
105  if (profileSpeed || tDatums != nullptr)
106  {
107  // Profiling speed
108  Profiler::timerEnd(profilerKey);
109  Profiler::printAveragedTimeMsOnIterationX(profilerKey, __LINE__, __FUNCTION__, __FILE__);
110  // Debugging log
111  dLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
112  }
113  }
114  catch (const std::exception& e)
115  {
116  this->stop();
117  tDatums = nullptr;
118  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
119  }
120  }
121 
122  template<typename TDatums>
124  {
125  try
126  {
127  // Close if all frames were retrieved from the queue
128  if (mPriorityQueueBuffer.empty())
129  this->stop();
130  mStopWhenEmpty = true;
131 
132  }
133  catch (const std::exception& e)
134  {
135  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
136  }
137  }
138 
140 }
141 
142 #endif // OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
void work(TDatums &tDatums)
Definition: wQueueOrderer.hpp:56
void initializationOnThread()
Definition: wQueueOrderer.hpp:51
Definition: worker.hpp:9
Definition: pointerContainer.hpp:13
static const std::string timerInit(const int line, const std::string &function, const std::string &file)
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
Definition: wQueueOrderer.hpp:12
void tryStop()
Definition: wQueueOrderer.hpp:123
void dLog(const T &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
Definition: errorAndLog.hpp:53
bool checkNoNullNorEmpty(const TPointerContainer &tPointerContainer)
Definition: pointerContainer.hpp:7
WQueueOrderer(const unsigned int maxBufferSize=64u)
Definition: wQueueOrderer.hpp:43
static void printAveragedTimeMsOnIterationX(const std::string &key, const int line, const std::string &function, const std::string &file, const unsigned long long x=DEFAULT_X)
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
static void timerEnd(const std::string &key)