OpenPose  1.0.0rc2
OpenPose: A Real-Time Multi-Person Key-Point Detection And Multi-Threading C++ Library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thread.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_THREAD_HPP
2 #define OPENPOSE_THREAD_THREAD_HPP
3 
4 #include <atomic>
5 #include <thread>
9 
10 namespace op
11 {
12  template<typename TDatums, typename TWorker = std::shared_ptr<Worker<TDatums>>>
13  class Thread
14  {
15  public:
16  explicit Thread(const std::shared_ptr<std::atomic<bool>>& isRunningSharedPtr = nullptr);
17 
18  // Move constructor
19  Thread(Thread&& t);
20 
21  // Move assignment
22  Thread& operator=(Thread&& t);
23 
24  // Destructor
25  virtual ~Thread();
26 
27  void add(const std::vector<std::shared_ptr<SubThread<TDatums, TWorker>>>& subThreads);
28 
29  void add(const std::shared_ptr<SubThread<TDatums, TWorker>>& subThread);
30 
31  void exec(const std::shared_ptr<std::atomic<bool>>& isRunningSharedPtr);
32 
33  void startInThread();
34 
35  void stopAndJoin();
36 
37  inline bool isRunning() const
38  {
39  return *spIsRunning;
40  }
41 
42  private:
43  std::shared_ptr<std::atomic<bool>> spIsRunning;
44  std::vector<std::shared_ptr<SubThread<TDatums, TWorker>>> mSubThreads;
45  std::thread mThread;
46 
47  void initializationOnThread();
48 
49  void threadFunction();
50 
51  void stop();
52 
53  void join();
54 
55  DELETE_COPY(Thread);
56  };
57 }
58 
59 
60 
61 
62 
63 // Implementation
64 namespace op
65 {
66  template<typename TDatums, typename TWorker>
67  Thread<TDatums, TWorker>::Thread(const std::shared_ptr<std::atomic<bool>>& isRunningSharedPtr) :
68  spIsRunning{(isRunningSharedPtr != nullptr ? isRunningSharedPtr : std::make_shared<std::atomic<bool>>(false))}
69  {
70  }
71 
72  template<typename TDatums, typename TWorker>
74  spIsRunning{std::make_shared<std::atomic<bool>>(t.spIsRunning->load())}
75  {
76  std::swap(mSubThreads, t.mSubThreads);
77  std::swap(mThread, t.mThread);
78  }
79 
80  template<typename TDatums, typename TWorker>
82  {
83  std::swap(mSubThreads, t.mSubThreads);
84  std::swap(mThread, t.mThread);
85  spIsRunning = {std::make_shared<std::atomic<bool>>(t.spIsRunning->load())};
86  return *this;
87  }
88 
89  template<typename TDatums, typename TWorker>
91  {
92  try
93  {
94  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
95  stopAndJoin();
96  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
97  }
98  catch (const std::exception& e)
99  {
100  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
101  }
102  }
103 
104  template<typename TDatums, typename TWorker>
106  {
107  for (const auto& subThread : subThreads)
108  mSubThreads.emplace_back(subThread);
109  }
110 
111  template<typename TDatums, typename TWorker>
112  void Thread<TDatums, TWorker>::add(const std::shared_ptr<SubThread<TDatums, TWorker>>& subThread)
113  {
114  add(std::vector<std::shared_ptr<SubThread<TDatums, TWorker>>>{subThread});
115  }
116 
117  template<typename TDatums, typename TWorker>
118  void Thread<TDatums, TWorker>::exec(const std::shared_ptr<std::atomic<bool>>& isRunningSharedPtr)
119  {
120  try
121  {
122  stopAndJoin();
123  spIsRunning = isRunningSharedPtr;
124  *spIsRunning = {true};
125  threadFunction();
126  }
127  catch (const std::exception& e)
128  {
129  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
130  }
131  }
132 
133  template<typename TDatums, typename TWorker>
135  {
136  try
137  {
138  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
139  stopAndJoin();
140  *spIsRunning = {true};
141  mThread = {std::thread{&Thread::threadFunction, this}};
142  }
143  catch (const std::exception& e)
144  {
145  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
146  }
147  }
148 
149  template<typename TDatums, typename TWorker>
151  {
152  try
153  {
154  stop();
155  join();
156  }
157  catch (const std::exception& e)
158  {
159  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
160  }
161  }
162 
163  template<typename TDatums, typename TWorker>
165  {
166  try
167  {
168  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
169  for (auto& subThread : mSubThreads)
170  subThread->initializationOnThread();
171  }
172  catch (const std::exception& e)
173  {
174  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
175  }
176  }
177 
178  template<typename TDatums, typename TWorker>
179  void Thread<TDatums, TWorker>::threadFunction()
180  {
181  try
182  {
183  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
184  initializationOnThread();
185 
186  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
187  while (isRunning())
188  {
189  bool allSubThreadsClosed = true;
190  for (auto& subThread : mSubThreads)
191  allSubThreadsClosed &= !subThread->work();
192 
193  if (allSubThreadsClosed)
194  {
195  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
196  stop();
197  break;
198  }
199  }
200  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
201  }
202  catch (const std::exception& e)
203  {
204  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
205  }
206  }
207 
208  template<typename TDatums, typename TWorker>
209  void Thread<TDatums, TWorker>::stop()
210  {
211  try
212  {
213  *spIsRunning = {false};
214  }
215  catch (const std::exception& e)
216  {
217  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
218  }
219  }
220 
221  template<typename TDatums, typename TWorker>
222  void Thread<TDatums, TWorker>::join()
223  {
224  try
225  {
226  if (mThread.joinable())
227  mThread.join();
228  }
229  catch (const std::exception& e)
230  {
231  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
232  }
233  }
234 
236 }
237 
238 #endif // OPENPOSE_THREAD_THREAD_HPP
void startInThread()
Definition: thread.hpp:134
Definition: subThread.hpp:10
void add(const std::vector< std::shared_ptr< SubThread< TDatums, TWorker >>> &subThreads)
Definition: thread.hpp:105
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
virtual ~Thread()
Definition: thread.hpp:90
void stopAndJoin()
Definition: thread.hpp:150
bool isRunning() const
Definition: thread.hpp:37
Thread & operator=(Thread &&t)
Definition: thread.hpp:81
void exec(const std::shared_ptr< std::atomic< bool >> &isRunningSharedPtr)
Definition: thread.hpp:118
OP_API void log(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
Definition: thread.hpp:13
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
std::vector< T, Alloc > vector
Definition: cl2.hpp:567
Thread(const std::shared_ptr< std::atomic< bool >> &isRunningSharedPtr=nullptr)
Definition: thread.hpp:67