OpenPose  1.0.0rc2
OpenPose: A Real-Time Multi-Person Key-Point Detection And Multi-Threading C++ Library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
threadManager.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_THREAD_MANAGER_HPP
2 #define OPENPOSE_THREAD_THREAD_MANAGER_HPP
3 
4 #include <atomic>
5 #include <set> // std::multiset
6 #include <tuple>
12 
13 namespace op
14 {
15  template<typename TDatums, typename TWorker = std::shared_ptr<Worker<TDatums>>, typename TQueue = Queue<TDatums>>
17  {
18  public:
19  // Completely customizable case
20  explicit ThreadManager(const ThreadManagerMode threadManagerMode = ThreadManagerMode::Synchronous);
21 
22  void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues = -1);
23 
24  void add(const unsigned long long threadId, const std::vector<TWorker>& tWorkers, const unsigned long long queueInId, const unsigned long long queueOutId);
25 
26  void add(const unsigned long long threadId, const TWorker& tWorker, const unsigned long long queueInId, const unsigned long long queueOutId);
27 
28  void reset();
29 
30  void exec();
31 
32  void start();
33 
34  void stop();
35 
36  inline std::shared_ptr<std::atomic<bool>> getIsRunningSharedPtr()
37  {
38  return spIsRunning;
39  }
40 
41  inline bool isRunning() const
42  {
43  return *spIsRunning;
44  }
45 
46  bool tryEmplace(TDatums& tDatums);
47 
48  bool waitAndEmplace(TDatums& tDatums);
49 
50  bool tryPush(const TDatums& tDatums);
51 
52  bool waitAndPush(const TDatums& tDatums);
53 
54  bool tryPop(TDatums& tDatums);
55 
56  bool waitAndPop(TDatums& tDatums);
57 
58  private:
59  const ThreadManagerMode mThreadManagerMode;
60  std::shared_ptr<std::atomic<bool>> spIsRunning;
61  long long mDefaultMaxSizeQueues;
62  std::multiset<std::tuple<unsigned long long, std::vector<TWorker>, unsigned long long, unsigned long long>> mThreadWorkerQueues;
63  std::vector<std::shared_ptr<Thread<TDatums, TWorker>>> mThreads;
64  std::vector<std::shared_ptr<TQueue>> mTQueues;
65 
66  void add(const std::vector<std::tuple<unsigned long long, std::vector<TWorker>, unsigned long long, unsigned long long>>& threadWorkerQueues);
67 
68  void add(const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long, unsigned long long>>& threadWorkerQueues);
69 
70  void multisetToThreads();
71 
72  void checkAndCreateEmptyThreads();
73 
74  void checkAndCreateQueues();
75 
76  DELETE_COPY(ThreadManager);
77  };
78 }
79 
80 
81 
82 
83 
84 // Implementation
85 #include <utility> // std::pair
92 namespace op
93 {
94  template<typename TDatums, typename TWorker, typename TQueue>
96  mThreadManagerMode{threadManagerMode},
97  spIsRunning{std::make_shared<std::atomic<bool>>(false)},
98  mDefaultMaxSizeQueues{-1ll}
99  {
100  }
101 
102  template<typename TDatums, typename TWorker, typename TQueue>
103  void ThreadManager<TDatums, TWorker, TQueue>::setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues)
104  {
105  try
106  {
107  mDefaultMaxSizeQueues = {defaultMaxSizeQueues};
108  }
109  catch (const std::exception& e)
110  {
111  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
112  }
113  }
114 
115  template<typename TDatums, typename TWorker, typename TQueue>
116  void ThreadManager<TDatums, TWorker, TQueue>::add(const unsigned long long threadId, const std::vector<TWorker>& tWorkers, const unsigned long long queueInId,
117  const unsigned long long queueOutId)
118  {
119  try
120  {
121  add({std::make_tuple(threadId, tWorkers, queueInId, queueOutId)});
122  }
123  catch (const std::exception& e)
124  {
125  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
126  }
127  }
128 
129  template<typename TDatums, typename TWorker, typename TQueue>
130  void ThreadManager<TDatums, TWorker, TQueue>::add(const unsigned long long threadId, const TWorker& tWorker, const unsigned long long queueInId,
131  const unsigned long long queueOutId)
132  {
133  try
134  {
135  add({std::make_tuple(threadId, std::vector<TWorker>{tWorker}, queueInId, queueOutId)});
136  }
137  catch (const std::exception& e)
138  {
139  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
140  }
141  }
142 
143  template<typename TDatums, typename TWorker, typename TQueue>
145  {
146  try
147  {
148  mThreadWorkerQueues.clear();
149  mThreads.clear();
150  mTQueues.clear();
151  }
152  catch (const std::exception& e)
153  {
154  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
155  }
156  }
157 
158  template<typename TDatums, typename TWorker, typename TQueue>
160  {
161  try
162  {
163  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
164  // Set threads
165  multisetToThreads();
166  if (!mThreads.empty())
167  {
168  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
169  // Start threads
170  for (auto i = 0u; i < mThreads.size() - 1; i++)
171  mThreads.at(i)->startInThread();
172  (*mThreads.rbegin())->exec(spIsRunning);
173  // Stop threads - It will arrive here when the exec() command has finished
174  stop();
175  }
176  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
177  }
178  catch (const std::exception& e)
179  {
180  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
181  }
182  }
183 
184  template<typename TDatums, typename TWorker, typename TQueue>
186  {
187  try
188  {
189  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
190  // Set threads
191  multisetToThreads();
192  // Start threads
193  for (auto& thread : mThreads)
194  thread->startInThread();
195  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
196  }
197  catch (const std::exception& e)
198  {
199  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
200  }
201  }
202 
203  template<typename TDatums, typename TWorker, typename TQueue>
205  {
206  try
207  {
208  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
209  for (auto& tQueue : mTQueues)
210  tQueue->stop();
211  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
212  *spIsRunning = false;
213  for (auto& thread : mThreads)
214  thread->stopAndJoin();
215  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
216  }
217  catch (const std::exception& e)
218  {
219  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
220  }
221  }
222 
223  template<typename TDatums, typename TWorker, typename TQueue>
225  {
226  try
227  {
228  if (mThreadManagerMode != ThreadManagerMode::Asynchronous && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
229  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
230  if (mTQueues.empty())
231  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
232  return mTQueues[0]->tryEmplace(tDatums);
233  }
234  catch (const std::exception& e)
235  {
236  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
237  return false;
238  }
239  }
240 
241  template<typename TDatums, typename TWorker, typename TQueue>
243  {
244  try
245  {
246  if (mThreadManagerMode != ThreadManagerMode::Asynchronous && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
247  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
248  if (mTQueues.empty())
249  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
250  return mTQueues[0]->waitAndEmplace(tDatums);
251  }
252  catch (const std::exception& e)
253  {
254  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
255  return false;
256  }
257  }
258 
259  template<typename TDatums, typename TWorker, typename TQueue>
261  {
262  try
263  {
264  if (mThreadManagerMode != ThreadManagerMode::Asynchronous && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
265  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
266  if (mTQueues.empty())
267  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
268  return mTQueues[0]->tryPush(tDatums);
269  }
270  catch (const std::exception& e)
271  {
272  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
273  return false;
274  }
275  }
276 
277  template<typename TDatums, typename TWorker, typename TQueue>
279  {
280  try
281  {
282  if (mThreadManagerMode != ThreadManagerMode::Asynchronous && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
283  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
284  if (mTQueues.empty())
285  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
286  return mTQueues[0]->waitAndPush(tDatums);
287  }
288  catch (const std::exception& e)
289  {
290  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
291  return false;
292  }
293  }
294 
295  template<typename TDatums, typename TWorker, typename TQueue>
297  {
298  try
299  {
300  if (mThreadManagerMode != ThreadManagerMode::Asynchronous && mThreadManagerMode != ThreadManagerMode::AsynchronousOut)
301  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
302  if (mTQueues.empty())
303  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
304  return (*mTQueues.rbegin())->tryPop(tDatums);
305  }
306  catch (const std::exception& e)
307  {
308  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
309  return false;
310  }
311  }
312 
313  template<typename TDatums, typename TWorker, typename TQueue>
315  {
316  try
317  {
318  if (mThreadManagerMode != ThreadManagerMode::Asynchronous && mThreadManagerMode != ThreadManagerMode::AsynchronousOut)
319  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
320  if (mTQueues.empty())
321  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
322  return (*mTQueues.rbegin())->waitAndPop(tDatums);
323  }
324  catch (const std::exception& e)
325  {
326  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
327  return false;
328  }
329  }
330 
331  template<typename TDatums, typename TWorker, typename TQueue>
332  void ThreadManager<TDatums, TWorker, TQueue>::add(const std::vector<std::tuple<unsigned long long, std::vector<TWorker>,
333  unsigned long long, unsigned long long>>& threadWorkerQueues)
334  {
335  try
336  {
337  for (const auto& threadWorkerQueue : threadWorkerQueues)
338  mThreadWorkerQueues.insert(threadWorkerQueue);
339  }
340  catch (const std::exception& e)
341  {
342  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
343  }
344  }
345 
346  template<typename TDatums, typename TWorker, typename TQueue>
347  void ThreadManager<TDatums, TWorker, TQueue>::add(const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long,
348  unsigned long long>>& threadWorkerQueues)
349  {
350  try
351  {
352  for (const auto& threadWorkerQueue : threadWorkerQueues)
353  add({std::make_tuple(std::get<0>(threadWorkerQueue), std::vector<TWorker>{std::get<1>(threadWorkerQueue)},
354  std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue))});
355  }
356  catch (const std::exception& e)
357  {
358  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
359  }
360  }
361 
362  template<typename TDatums, typename TWorker, typename TQueue>
363  void ThreadManager<TDatums, TWorker, TQueue>::multisetToThreads()
364  {
365  try
366  {
367  if (!mThreadWorkerQueues.empty())
368  {
369  // Check threads
370  checkAndCreateEmptyThreads();
371 
372  // Check and create queues
373  checkAndCreateQueues();
374 
375  // Data
376  const auto maxQueueIdSynchronous = mTQueues.size()+1;
377 
378  // Set up threads
379  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
380  {
381  auto& thread = mThreads[std::get<0>(threadWorkerQueue)];
382  const auto& tWorkers = std::get<1>(threadWorkerQueue);
383  const auto queueIn = std::get<2>(threadWorkerQueue);
384  const auto queueOut = std::get<3>(threadWorkerQueue);
385  std::shared_ptr<SubThread<TDatums, TWorker>> subThread;
386  // If AsynchronousIn -> queue indexes are OK
387  if (mThreadManagerMode == ThreadManagerMode::Asynchronous || mThreadManagerMode == ThreadManagerMode::AsynchronousIn)
388  {
389  if (mThreadManagerMode == ThreadManagerMode::AsynchronousIn && queueOut == mTQueues.size())
390  subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueIn))};
391  else
392  subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueIn), mTQueues.at(queueOut))};
393  }
394  // If !AsynchronousIn -> queue indexes - 1
395  else if (queueOut != maxQueueIdSynchronous || mThreadManagerMode == ThreadManagerMode::AsynchronousOut)
396  {
397  // Queue in + out
398  if (queueIn != 0)
399  subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueIn-1), mTQueues.at(queueOut-1))};
400  // Case queue out (first TWorker(s))
401  else
402  subThread = {std::make_shared<SubThreadQueueOut<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueOut-1))};
403  }
404  // Case queue in (last TWorker(s))
405  else if (queueIn != 0) // && queueOut == maxQueueIdSynchronous
406  subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueIn-1))};
407  // Case no queue
408  else // if (queueIn == 0 && queueOut == maxQueueIdSynchronous)
409  subThread = {std::make_shared<SubThreadNoQueue<TDatums, TWorker>>(tWorkers)};
410  thread->add(subThread);
411  }
412  }
413  else
414  error("Empty, no TWorker(s) added.", __LINE__);
415  }
416  catch (const std::exception& e)
417  {
418  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
419  }
420  }
421 
422  template<typename TDatums, typename TWorker, typename TQueue>
423  void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateEmptyThreads()
424  {
425  try
426  {
427  // Check all thread ids from 0-maxThreadId are present
428  const auto maxThreadId = std::get<0>(*mThreadWorkerQueues.crbegin());
429  auto previousThreadId = std::get<0>(*mThreadWorkerQueues.cbegin());
430  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
431  {
432  const auto currentThreadId = std::get<0>(threadWorkerQueue);
433  if (currentThreadId - previousThreadId > 1)
434  error("Missing thread id " + std::to_string(currentThreadId) + " of " + std::to_string(maxThreadId) + ".", __LINE__, __FUNCTION__, __FILE__);
435  previousThreadId = currentThreadId;
436  }
437 
438  // Create Threads
439  // #threads = maxThreadId+1
440  mThreads.resize(maxThreadId);
441  for (auto& thread : mThreads)
442  thread = std::make_shared<Thread<TDatums, TWorker>>();
443  mThreads.emplace_back(std::make_shared<Thread<TDatums, TWorker>>(spIsRunning));
444  }
445  catch (const std::exception& e)
446  {
447  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
448  }
449  }
450 
451  template<typename TDatums, typename TWorker, typename TQueue>
452  void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateQueues()
453  {
454  try
455  {
456  if (!mThreadWorkerQueues.empty())
457  {
458  // Get max queue id to get queue size
459  auto maxQueueId = std::get<3>(*mThreadWorkerQueues.cbegin());
460  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
461  maxQueueId = fastMax(maxQueueId, fastMax(std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue)));
462 
463  // Check each queue id has at least a worker that uses it as input and another one as output. Special cases:
464  std::vector<std::pair<bool, bool>> usedQueueIds(maxQueueId+1, {false, false});
465  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
466  {
467  usedQueueIds.at(std::get<2>(threadWorkerQueue)).first = true;
468  usedQueueIds.at(std::get<3>(threadWorkerQueue)).second = true;
469  }
470  // Id 0 must only needs a worker using it as input.
471  usedQueueIds.begin()->second = true;
472  // Id maxQueueId only needs a worker using it as output.
473  usedQueueIds.rbegin()->first = true;
474  // Error if missing queue id
475  for (auto i = 0ull ; i < usedQueueIds.size() ; i++)
476  {
477  if (!usedQueueIds[i].first)
478  error("Missing queue id " + std::to_string(i) + " (of " + std::to_string(maxQueueId) + ") as input.", __LINE__, __FUNCTION__, __FILE__);
479  if (!usedQueueIds[i].second)
480  error("Missing queue id " + std::to_string(i) + " (of " + std::to_string(maxQueueId) + ") as output.", __LINE__, __FUNCTION__, __FILE__);
481  }
482 
483  // Create Queues
484  if (mThreadManagerMode == ThreadManagerMode::Asynchronous)
485  mTQueues.resize(maxQueueId+1); // First and last one are queues
486  else if (mThreadManagerMode == ThreadManagerMode::Synchronous)
487  mTQueues.resize(maxQueueId-1); // First and last one are not actually queues
488  else if (mThreadManagerMode == ThreadManagerMode::AsynchronousIn || mThreadManagerMode == ThreadManagerMode::AsynchronousOut)
489  mTQueues.resize(maxQueueId); // First or last one is queue
490  else
491  error("Unknown ThreadManagerMode", __LINE__, __FUNCTION__, __FILE__);
492  for (auto& tQueue : mTQueues)
493  tQueue = std::make_shared<TQueue>(mDefaultMaxSizeQueues);
494  }
495  }
496  catch (const std::exception& e)
497  {
498  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
499  }
500  }
501 
502  COMPILE_TEMPLATE_DATUM(ThreadManager);
503 }
504 
505 #endif // OPENPOSE_THREAD_THREAD_MANAGER_HPP
ThreadManager(const ThreadManagerMode threadManagerMode=ThreadManagerMode::Synchronous)
Definition: threadManager.hpp:95
void stop()
Definition: threadManager.hpp:204
bool isRunning() const
Definition: threadManager.hpp:41
void start()
Definition: threadManager.hpp:185
void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues=-1)
Definition: threadManager.hpp:103
bool waitAndEmplace(TDatums &tDatums)
Definition: threadManager.hpp:242
std::shared_ptr< std::atomic< bool > > getIsRunningSharedPtr()
Definition: threadManager.hpp:36
T fastMax(const T a, const T b)
Definition: fastMath.hpp:68
bool tryPop(TDatums &tDatums)
Definition: threadManager.hpp:296
void add(const unsigned long long threadId, const std::vector< TWorker > &tWorkers, const unsigned long long queueInId, const unsigned long long queueOutId)
Definition: threadManager.hpp:116
void reset()
Definition: threadManager.hpp:144
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
bool waitAndPop(TDatums &tDatums)
Definition: threadManager.hpp:314
bool tryEmplace(TDatums &tDatums)
Definition: threadManager.hpp:224
Definition: threadManager.hpp:16
void exec()
Definition: threadManager.hpp:159
OP_API void log(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
ThreadManagerMode
Definition: enumClasses.hpp:9
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
std::vector< T, Alloc > vector
Definition: cl2.hpp:567
bool waitAndPush(const TDatums &tDatums)
Definition: threadManager.hpp:278
bool tryPush(const TDatums &tDatums)
Definition: threadManager.hpp:260