1 #ifndef OPENPOSE_THREAD_THREAD_MANAGER_HPP
2 #define OPENPOSE_THREAD_THREAD_MANAGER_HPP
15 template<
typename TDatums,
typename TWorker = std::shared_ptr<Worker<TDatums>>,
typename TQueue = Queue<TDatums>>
24 void add(
const unsigned long long threadId,
const std::vector<TWorker>& tWorkers,
const unsigned long long queueInId,
const unsigned long long queueOutId);
26 void add(
const unsigned long long threadId,
const TWorker& tWorker,
const unsigned long long queueInId,
const unsigned long long queueOutId);
50 bool tryPush(
const TDatums& tDatums);
54 bool tryPop(TDatums& tDatums);
60 std::shared_ptr<std::atomic<bool>> spIsRunning;
61 long long mDefaultMaxSizeQueues;
62 std::multiset<std::tuple<unsigned long long, std::vector<TWorker>,
unsigned long long,
unsigned long long>> mThreadWorkerQueues;
63 std::vector<std::shared_ptr<Thread<TDatums, TWorker>>> mThreads;
64 std::vector<std::shared_ptr<TQueue>> mTQueues;
66 void add(
const std::vector<std::tuple<
unsigned long long, std::vector<TWorker>,
unsigned long long,
unsigned long long>>& threadWorkerQueues);
68 void add(
const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long, unsigned long long>>& threadWorkerQueues);
70 void multisetToThreads();
72 void checkAndCreateEmptyThreads();
74 void checkAndCreateQueues();
94 template<
typename TDatums,
typename TWorker,
typename TQueue>
96 mThreadManagerMode{threadManagerMode},
97 spIsRunning{std::make_shared<std::atomic<bool>>(
false)},
98 mDefaultMaxSizeQueues{-1ll}
102 template<
typename TDatums,
typename TWorker,
typename TQueue>
107 mDefaultMaxSizeQueues = {defaultMaxSizeQueues};
109 catch (
const std::exception& e)
111 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
115 template<
typename TDatums,
typename TWorker,
typename TQueue>
117 const unsigned long long queueOutId)
121 add({std::make_tuple(threadId, tWorkers, queueInId, queueOutId)});
123 catch (
const std::exception& e)
125 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
129 template<
typename TDatums,
typename TWorker,
typename TQueue>
131 const unsigned long long queueOutId)
135 add({std::make_tuple(threadId, std::vector<TWorker>{tWorker}, queueInId, queueOutId)});
137 catch (
const std::exception& e)
139 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
143 template<
typename TDatums,
typename TWorker,
typename TQueue>
148 mThreadWorkerQueues.clear();
152 catch (
const std::exception& e)
154 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
158 template<
typename TDatums,
typename TWorker,
typename TQueue>
166 if (!mThreads.empty())
170 for (
auto i = 0u; i < mThreads.size() - 1; i++)
171 mThreads.at(i)->startInThread();
172 (*mThreads.rbegin())->exec(spIsRunning);
178 catch (
const std::exception& e)
180 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
184 template<
typename TDatums,
typename TWorker,
typename TQueue>
193 for (
auto& thread : mThreads)
194 thread->startInThread();
197 catch (
const std::exception& e)
199 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
203 template<
typename TDatums,
typename TWorker,
typename TQueue>
209 for (
auto& tQueue : mTQueues)
212 *spIsRunning =
false;
213 for (
auto& thread : mThreads)
214 thread->stopAndJoin();
217 catch (
const std::exception& e)
219 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
223 template<
typename TDatums,
typename TWorker,
typename TQueue>
229 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
230 if (mTQueues.empty())
231 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
232 return mTQueues[0]->tryEmplace(tDatums);
234 catch (
const std::exception& e)
236 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
241 template<
typename TDatums,
typename TWorker,
typename TQueue>
247 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
248 if (mTQueues.empty())
249 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
250 return mTQueues[0]->waitAndEmplace(tDatums);
252 catch (
const std::exception& e)
254 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
259 template<
typename TDatums,
typename TWorker,
typename TQueue>
265 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
266 if (mTQueues.empty())
267 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
268 return mTQueues[0]->tryPush(tDatums);
270 catch (
const std::exception& e)
272 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
277 template<
typename TDatums,
typename TWorker,
typename TQueue>
283 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
284 if (mTQueues.empty())
285 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
286 return mTQueues[0]->waitAndPush(tDatums);
288 catch (
const std::exception& e)
290 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
295 template<
typename TDatums,
typename TWorker,
typename TQueue>
301 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
302 if (mTQueues.empty())
303 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
304 return (*mTQueues.rbegin())->tryPop(tDatums);
306 catch (
const std::exception& e)
308 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
313 template<
typename TDatums,
typename TWorker,
typename TQueue>
319 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
320 if (mTQueues.empty())
321 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
322 return (*mTQueues.rbegin())->waitAndPop(tDatums);
324 catch (
const std::exception& e)
326 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
331 template<
typename TDatums,
typename TWorker,
typename TQueue>
333 unsigned long long,
unsigned long long>>& threadWorkerQueues)
337 for (
const auto& threadWorkerQueue : threadWorkerQueues)
338 mThreadWorkerQueues.insert(threadWorkerQueue);
340 catch (
const std::exception& e)
342 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
346 template<
typename TDatums,
typename TWorker,
typename TQueue>
348 unsigned long long>>& threadWorkerQueues)
352 for (
const auto& threadWorkerQueue : threadWorkerQueues)
353 add({std::make_tuple(std::get<0>(threadWorkerQueue), std::vector<TWorker>{std::get<1>(threadWorkerQueue)},
354 std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue))});
356 catch (
const std::exception& e)
358 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
362 template<
typename TDatums,
typename TWorker,
typename TQueue>
363 void ThreadManager<TDatums, TWorker, TQueue>::multisetToThreads()
367 if (!mThreadWorkerQueues.empty())
370 checkAndCreateEmptyThreads();
373 checkAndCreateQueues();
376 const auto maxQueueIdSynchronous = mTQueues.size()+1;
379 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
381 auto& thread = mThreads[std::get<0>(threadWorkerQueue)];
382 const auto& tWorkers = std::get<1>(threadWorkerQueue);
383 const auto queueIn = std::get<2>(threadWorkerQueue);
384 const auto queueOut = std::get<3>(threadWorkerQueue);
385 std::shared_ptr<SubThread<TDatums, TWorker>> subThread;
390 subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueIn))};
392 subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueIn), mTQueues.at(queueOut))};
399 subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueIn-1), mTQueues.at(queueOut-1))};
402 subThread = {std::make_shared<SubThreadQueueOut<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueOut-1))};
405 else if (queueIn != 0)
406 subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(tWorkers, mTQueues.at(queueIn-1))};
409 subThread = {std::make_shared<SubThreadNoQueue<TDatums, TWorker>>(tWorkers)};
410 thread->add(subThread);
414 error(
"Empty, no TWorker(s) added.", __LINE__);
416 catch (
const std::exception& e)
418 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
422 template<
typename TDatums,
typename TWorker,
typename TQueue>
423 void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateEmptyThreads()
428 const auto maxThreadId = std::get<0>(*mThreadWorkerQueues.crbegin());
429 auto previousThreadId = std::get<0>(*mThreadWorkerQueues.cbegin());
430 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
432 const auto currentThreadId = std::get<0>(threadWorkerQueue);
433 if (currentThreadId - previousThreadId > 1)
434 error(
"Missing thread id " + std::to_string(currentThreadId) +
" of " + std::to_string(maxThreadId) +
".", __LINE__, __FUNCTION__, __FILE__);
435 previousThreadId = currentThreadId;
440 mThreads.resize(maxThreadId);
441 for (
auto& thread : mThreads)
442 thread = std::make_shared<Thread<TDatums, TWorker>>();
443 mThreads.emplace_back(std::make_shared<Thread<TDatums, TWorker>>(spIsRunning));
445 catch (
const std::exception& e)
447 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
451 template<
typename TDatums,
typename TWorker,
typename TQueue>
452 void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateQueues()
456 if (!mThreadWorkerQueues.empty())
459 auto maxQueueId = std::get<3>(*mThreadWorkerQueues.cbegin());
460 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
461 maxQueueId =
fastMax(maxQueueId,
fastMax(std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue)));
464 std::vector<std::pair<bool, bool>> usedQueueIds(maxQueueId+1, {
false,
false});
465 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
467 usedQueueIds.at(std::get<2>(threadWorkerQueue)).first =
true;
468 usedQueueIds.at(std::get<3>(threadWorkerQueue)).second =
true;
471 usedQueueIds.begin()->second =
true;
473 usedQueueIds.rbegin()->first =
true;
475 for (
auto i = 0ull ; i < usedQueueIds.size() ; i++)
477 if (!usedQueueIds[i].first)
478 error(
"Missing queue id " + std::to_string(i) +
" (of " + std::to_string(maxQueueId) +
") as input.", __LINE__, __FUNCTION__, __FILE__);
479 if (!usedQueueIds[i].second)
480 error(
"Missing queue id " + std::to_string(i) +
" (of " + std::to_string(maxQueueId) +
") as output.", __LINE__, __FUNCTION__, __FILE__);
485 mTQueues.resize(maxQueueId+1);
487 mTQueues.resize(maxQueueId-1);
489 mTQueues.resize(maxQueueId);
491 error(
"Unknown ThreadManagerMode", __LINE__, __FUNCTION__, __FILE__);
492 for (
auto& tQueue : mTQueues)
493 tQueue = std::make_shared<TQueue>(mDefaultMaxSizeQueues);
496 catch (
const std::exception& e)
498 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
505 #endif // OPENPOSE_THREAD_THREAD_MANAGER_HPP
ThreadManager(const ThreadManagerMode threadManagerMode=ThreadManagerMode::Synchronous)
Definition: threadManager.hpp:95
void stop()
Definition: threadManager.hpp:204
bool isRunning() const
Definition: threadManager.hpp:41
void start()
Definition: threadManager.hpp:185
void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues=-1)
Definition: threadManager.hpp:103
bool waitAndEmplace(TDatums &tDatums)
Definition: threadManager.hpp:242
std::shared_ptr< std::atomic< bool > > getIsRunningSharedPtr()
Definition: threadManager.hpp:36
T fastMax(const T a, const T b)
Definition: fastMath.hpp:68
bool tryPop(TDatums &tDatums)
Definition: threadManager.hpp:296
void add(const unsigned long long threadId, const std::vector< TWorker > &tWorkers, const unsigned long long queueInId, const unsigned long long queueOutId)
Definition: threadManager.hpp:116
void reset()
Definition: threadManager.hpp:144
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
bool waitAndPop(TDatums &tDatums)
Definition: threadManager.hpp:314
bool tryEmplace(TDatums &tDatums)
Definition: threadManager.hpp:224
Definition: threadManager.hpp:16
void exec()
Definition: threadManager.hpp:159
OP_API void log(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
ThreadManagerMode
Definition: enumClasses.hpp:9
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
std::vector< T, Alloc > vector
Definition: cl2.hpp:567
bool waitAndPush(const TDatums &tDatums)
Definition: threadManager.hpp:278
bool tryPush(const TDatums &tDatums)
Definition: threadManager.hpp:260