From bf5393ae58f29659993170104024f20812863ffd Mon Sep 17 00:00:00 2001
From: kalistratovag
Date: Fri, 29 May 2015 21:58:45 +0300
Subject: [PATCH] parallel for on pthreads

initial commit

removing trailing whitespaces

Compilation error on Mac fix & warning on android

Warnings fixed on iOs

Review fixes: restored the #include targets and template arguments that
were missing from the patch text, made ThreadManager::initPool() detect
and report thread start failures, and made ThreadManager::run() release
the access mutex when the pool cannot be used.
---
 CMakeLists.txt                         |   2 +
 cmake/OpenCVFindLibsPerf.cmake         |  10 +
 modules/core/src/parallel.cpp          |  18 +
 modules/core/src/parallel_pthreads.cpp | 606 +++++++++++++++++++++++++
 modules/core/src/precomp.hpp           |   6 +
 5 files changed, 642 insertions(+)
 create mode 100644 modules/core/src/parallel_pthreads.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 939ee12674..6ab4e7f8e3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -188,6 +188,7 @@ OCV_OPTION(WITH_QUICKTIME "Use QuickTime for Video I/O insted of QTKit" OFF
 OCV_OPTION(WITH_TBB "Include Intel TBB support" OFF IF (NOT IOS AND NOT WINRT) )
 OCV_OPTION(WITH_OPENMP "Include OpenMP support" OFF)
 OCV_OPTION(WITH_CSTRIPES "Include C= support" OFF IF (WIN32 AND NOT WINRT) )
+OCV_OPTION(WITH_PTHREADS_PF "Use pthreads-based parallel_for" OFF IF (NOT WIN32) )
 OCV_OPTION(WITH_TIFF "Include TIFF support" ON IF (NOT IOS) )
 OCV_OPTION(WITH_UNICAP "Include Unicap support (GPL)" OFF IF (UNIX AND NOT APPLE AND NOT ANDROID) )
 OCV_OPTION(WITH_V4L "Include Video 4 Linux support" ON IF (UNIX AND NOT ANDROID) )
@@ -1067,6 +1068,7 @@ status(" Use OpenMP:" HAVE_OPENMP THEN YES ELSE NO)
 status(" Use GCD" HAVE_GCD THEN YES ELSE NO)
 status(" Use Concurrency" HAVE_CONCURRENCY THEN YES ELSE NO)
 status(" Use C=:" HAVE_CSTRIPES THEN YES ELSE NO)
+status(" Use pthreads for parallel for:" HAVE_PTHREADS_PF THEN YES ELSE NO)
 status(" Use Cuda:" HAVE_CUDA THEN "YES (ver ${CUDA_VERSION_STRING})" ELSE NO)
 status(" Use OpenCL:" HAVE_OPENCL THEN YES ELSE NO)
 
diff --git a/cmake/OpenCVFindLibsPerf.cmake b/cmake/OpenCVFindLibsPerf.cmake
index c7d0858451..bda5d792a3 100644
--- a/cmake/OpenCVFindLibsPerf.cmake
+++ b/cmake/OpenCVFindLibsPerf.cmake
@@ -119,3 +119,13 @@ if(WITH_OPENMP)
   endif()
   set(HAVE_OPENMP "${OPENMP_FOUND}")
 endif()
+
+if(UNIX OR ANDROID)
+if(NOT APPLE AND NOT HAVE_TBB AND NOT HAVE_OPENMP)
+  set(HAVE_PTHREADS_PF 1)
+else()
+  set(HAVE_PTHREADS_PF 0)
+endif()
+else()
+  set(HAVE_PTHREADS_PF 0)
+endif()
diff --git a/modules/core/src/parallel.cpp b/modules/core/src/parallel.cpp
index 582c7cd039..b1e7567818 100644
--- a/modules/core/src/parallel.cpp
+++ b/modules/core/src/parallel.cpp
@@ -125,6 +125,8 @@
 # define CV_PARALLEL_FRAMEWORK "winrt-concurrency"
 #elif defined HAVE_CONCURRENCY
 # define CV_PARALLEL_FRAMEWORK "ms-concurrency"
+#elif defined HAVE_PTHREADS
+# define CV_PARALLEL_FRAMEWORK "pthreads"
 #endif
 
 namespace cv
@@ -298,6 +300,10 @@ void cv::parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body,
         Concurrency::CurrentScheduler::Detach();
     }
 
+#elif defined HAVE_PTHREADS
+    void parallel_for_pthreads(const Range& range, const ParallelLoopBody& body, double nstripes);
+    parallel_for_pthreads(range, body, nstripes);
+
 #else
 
 #error You have hacked and compiling with unsupported parallel framework
@@ -353,6 +359,12 @@ int cv::getNumThreads(void)
         ? Concurrency::CurrentScheduler::Get()->GetNumberOfVirtualProcessors()
         : pplScheduler->GetNumberOfVirtualProcessors());
 
+#elif defined HAVE_PTHREADS
+
+    size_t parallel_pthreads_get_threads_num();
+
+    return parallel_pthreads_get_threads_num();
+
 #else
 
     return 1;
@@ -410,6 +422,12 @@ void cv::setNumThreads( int threads )
                        Concurrency::MaxConcurrency, threads-1));
     }
 
+#elif defined HAVE_PTHREADS
+
+    void parallel_pthreads_set_threads_num(int num);
+
+    parallel_pthreads_set_threads_num(threads);
+
 #endif
 }
 
diff --git a/modules/core/src/parallel_pthreads.cpp b/modules/core/src/parallel_pthreads.cpp
new file mode 100644
index 0000000000..f46515f37a
--- /dev/null
+++ b/modules/core/src/parallel_pthreads.cpp
@@ -0,0 +1,606 @@
+/*M///////////////////////////////////////////////////////////////////////////////////////
+//
+//  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
+//
+//  By downloading, copying, installing or using the software you agree to this license.
+//  If you do not agree to this license, do not download, install,
+//  copy or use the software.
+//
+//
+//                           License Agreement
+//                For Open Source Computer Vision Library
+//
+// Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
+// Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
+// Third party copyrights are property of their respective owners.
+//
+// Redistribution and use in source and binary forms, with or without modification,
+// are permitted provided that the following conditions are met:
+//
+//   * Redistribution's of source code must retain the above copyright notice,
+//     this list of conditions and the following disclaimer.
+//
+//   * Redistribution's in binary form must reproduce the above copyright notice,
+//     this list of conditions and the following disclaimer in the documentation
+//     and/or other materials provided with the distribution.
+//
+//   * The name of the copyright holders may not be used to endorse or promote products
+//     derived from this software without specific prior written permission.
+//
+// This software is provided by the copyright holders and contributors "as is" and
+// any express or implied warranties, including, but not limited to, the implied
+// warranties of merchantability and fitness for a particular purpose are disclaimed.
+// In no event shall the Intel Corporation or contributors be liable for any direct,
+// indirect, incidental, special, exemplary, or consequential damages
+// (including, but not limited to, procurement of substitute goods or services;
+// loss of use, data, or profits; or business interruption) however caused
+// and on any theory of liability, whether in contract, strict liability,
+// or tort (including negligence or otherwise) arising in any way out of
+// the use of this software, even if advised of the possibility of such damage.
+//
+//M*/
+
+#include "precomp.hpp"
+
+#if defined HAVE_PTHREADS && HAVE_PTHREADS
+
+#include <algorithm>
+#include <pthread.h>
+
+namespace cv
+{
+
+class ThreadManager;
+
+//lifecycle of a single worker thread (see ForThread::thread_body and ForThread::stop)
+enum ForThreadState
+{
+    eFTNotStarted = 0,
+    eFTStarted = 1,
+    eFTToStop = 2,
+    eFTStoped = 3
+};
+
+//lifecycle of the worker pool owned by ThreadManager
+enum ThreadManagerPoolState
+{
+    eTMNotInited = 0,
+    eTMFailedToInit = 1,
+    eTMInited = 2,
+    eTMSingleThreaded = 3
+};
+
+//description of one parallel_for call, read by every worker in ForThread::execute();
+//m_nstripes is used there as the length of one block of the range
+struct work_load
+{
+    work_load()
+    {
+        clear();
+    }
+
+    work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
+    {
+        set(range, body, nstripes);
+    }
+
+    void set(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
+    {
+        m_body = &body;
+        m_range = &range;
+        m_nstripes = nstripes;
+        m_blocks_count = ((m_range->end - m_range->start - 1)/m_nstripes) + 1;
+    }
+
+    const cv::ParallelLoopBody* m_body;
+    const cv::Range* m_range;
+    int m_nstripes;
+    unsigned int m_blocks_count;
+
+    void clear()
+    {
+        m_body = 0;
+        m_range = 0;
+        m_nstripes = 0;
+        m_blocks_count = 0;
+    }
+};
+
+//one pool worker: a posix thread that waits on m_cond_thread_task until run() or stop() is called
+class ForThread
+{
+public:
+
+    ForThread(): m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0)
+    {
+    }
+
+    //called from manager thread
+    bool init(size_t id, ThreadManager* parent);
+
+    //called from manager thread
+    void run();
+
+    //called from manager thread
+    void stop();
+
+    ~ForThread();
+
+private:
+
+    //called from worker thread
+    static void* thread_loop_wrapper(void* thread_object);
+
+    //called from worker thread
+    void execute();
+
+    //called from worker thread
+    void thread_body();
+
+    pthread_t m_posix_thread;
+    pthread_mutex_t m_thread_mutex;
+    pthread_cond_t m_cond_thread_task;
+    bool m_task_start;
+
+    ThreadManager* m_parent;
+    ForThreadState m_state;
+    size_t m_id;
+};
+
+//singleton that owns the ForThread workers and distributes a parallel_for range between them
+class ThreadManager
+{
+public:
+    friend class ForThread;
+
+    static ThreadManager& instance()
+    {
+        if(!m_instance.ptr)
+        {
+            pthread_mutex_lock(&m_manager_access_mutex);
+
+            if(!m_instance.ptr)
+            {
+                m_instance.ptr = new ThreadManager();
+            }
+
+            pthread_mutex_unlock(&m_manager_access_mutex);
+        }
+
+        return *m_instance.ptr;
+    }
+
+
+    static void stop()
+    {
+        ThreadManager& manager = instance();
+
+        if(manager.m_pool_state == eTMInited)
+        {
+            for(size_t i = 0; i < manager.m_num_threads; ++i)
+            {
+                manager.m_threads[i].stop();
+            }
+        }
+
+        manager.m_pool_state = eTMNotInited;
+    }
+
+    void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
+
+    size_t getNumOfThreads();
+
+    void setNumOfThreads(size_t n);
+
+private:
+
+    struct ptr_holder
+    {
+        ThreadManager* ptr;
+
+        ptr_holder(): ptr(NULL) { }
+
+        ~ptr_holder()
+        {
+            if(ptr)
+            {
+                delete ptr;
+            }
+        }
+    };
+
+    ThreadManager();
+
+    ~ThreadManager();
+
+    void wait_complete();
+
+    void notify_complete();
+
+    bool initPool();
+
+    size_t defaultNumberOfThreads();
+
+    std::vector<ForThread> m_threads;
+    size_t m_num_threads;
+
+    pthread_mutex_t m_manager_task_mutex;
+    pthread_cond_t m_cond_thread_task_complete;
+    bool m_task_complete;
+
+    unsigned int m_task_position;
+    unsigned int m_num_of_completed_tasks;
+
+    static pthread_mutex_t m_manager_access_mutex;
+    static ptr_holder m_instance;
+
+    static const char m_env_name[];
+    static const unsigned int m_default_number_of_threads;
+
+    work_load m_work_load;
+
+    struct work_thread_t
+    {
+        work_thread_t(): value(false) { }
+        bool value;
+    };
+
+    cv::TLSData<work_thread_t> m_is_work_thread;
+
+    ThreadManagerPoolState m_pool_state;
+};
+
+#ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
+#define PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP PTHREAD_RECURSIVE_MUTEX_INITIALIZER
+#endif
+
+pthread_mutex_t ThreadManager::m_manager_access_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
+
+ThreadManager::ptr_holder ThreadManager::m_instance;
+const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM";
+const unsigned int ThreadManager::m_default_number_of_threads = 8;
+
+ForThread::~ForThread()
+{
+    if(m_state == eFTStarted)
+    {
+        stop();
+
+        pthread_mutex_destroy(&m_thread_mutex);
+
+        pthread_cond_destroy(&m_cond_thread_task);
+    }
+}
+
+bool ForThread::init(size_t id, ThreadManager* parent)
+{
+    m_id = id;
+
+    m_parent = parent;
+
+    int res = 0;
+
+    res |= pthread_mutex_init(&m_thread_mutex, NULL);
+
+    res |= pthread_cond_init(&m_cond_thread_task, NULL);
+
+    if(!res)
+    {
+        res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this);
+    }
+
+
+    return res == 0;
+}
+
+void ForThread::stop()
+{
+    if(m_state == eFTStarted)
+    {
+        m_state = eFTToStop;
+
+        run();
+
+        pthread_join(m_posix_thread, NULL);
+    }
+
+    m_state = eFTStoped;
+}
+
+void ForThread::run()
+{
+    pthread_mutex_lock(&m_thread_mutex);
+
+    m_task_start = true;
+
+    pthread_cond_signal(&m_cond_thread_task);
+
+    pthread_mutex_unlock(&m_thread_mutex);
+}
+
+void* ForThread::thread_loop_wrapper(void* thread_object)
+{
+    ((ForThread*)thread_object)->thread_body();
+    return 0;
+}
+
+//takes block indices from the manager's shared counter (advanced with CV_XADD) until none are left
+void ForThread::execute()
+{
+    unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
+
+    work_load& load = m_parent->m_work_load;
+
+    while(m_current_pos < load.m_blocks_count)
+    {
+        int start = load.m_range->start + m_current_pos*load.m_nstripes;
+        int end = std::min(start + load.m_nstripes, load.m_range->end);
+
+        load.m_body->operator()(cv::Range(start, end));
+
+        m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
+    }
+}
+
+void ForThread::thread_body()
+{
+    m_parent->m_is_work_thread.get()->value = true;
+
+    pthread_mutex_lock(&m_thread_mutex);
+
+    m_state = eFTStarted;
+
+    while(m_state == eFTStarted)
+    {
+        //to handle spurious wakeups
+        while( !m_task_start && m_state != eFTToStop )
+            pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex);
+
+        if(m_state == eFTStarted)
+        {
+            execute();
+
+            m_task_start = false;
+
+            m_parent->notify_complete();
+        }
+    }
+
+    pthread_mutex_unlock(&m_thread_mutex);
+}
+
+ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited)
+{
+    int res = 0;
+
+    res |= pthread_mutex_init(&m_manager_task_mutex, NULL);
+
+    res |= pthread_cond_init(&m_cond_thread_task_complete, NULL);
+
+    if(!res)
+    {
+        setNumOfThreads(defaultNumberOfThreads());
+
+        m_task_position = 0;
+    }
+    else
+    {
+        m_num_threads = 1;
+        m_pool_state = eTMFailedToInit;
+        m_task_position = 0;
+
+        //print error;
+    }
+}
+
+ThreadManager::~ThreadManager()
+{
+    stop();
+
+    pthread_mutex_destroy(&m_manager_task_mutex);
+
+    pthread_cond_destroy(&m_cond_thread_task_complete);
+
+    pthread_mutex_destroy(&m_manager_access_mutex);
+}
+
+void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
+{
+    bool is_work_thread;
+
+    is_work_thread = m_is_work_thread.get()->value;
+
+    if( (getNumOfThreads() > 1) && !is_work_thread && (range.end - range.start > 1) )
+    {
+        int res = pthread_mutex_trylock(&m_manager_access_mutex);
+
+        if(!res)
+        {
+            if(initPool())
+            {
+                double min_stripes = double(range.end - range.start)/(4*m_threads.size());
+
+                nstripes = std::max(nstripes, min_stripes);
+
+                pthread_mutex_lock(&m_manager_task_mutex);
+
+                m_num_of_completed_tasks = 0;
+
+                m_task_position = 0;
+
+                m_task_complete = false;
+
+                m_work_load.set(range, body, std::ceil(nstripes));
+
+                for(size_t i = 0; i < m_threads.size(); ++i)
+                {
+                    m_threads[i].run();
+                }
+
+                wait_complete();
+            }
+            else
+            {
+                //print error
+                //trylock above succeeded and wait_complete() is skipped on this path, so unlock here
+                pthread_mutex_unlock(&m_manager_access_mutex);
+                body(range);
+            }
+        }
+        else
+        {
+            body(range);
+        }
+    }
+    else
+    {
+        body(range);
+    }
+}
+
+void ThreadManager::wait_complete()
+{
+    //to handle spurious wakeups
+    while(!m_task_complete)
+        pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex);
+
+    pthread_mutex_unlock(&m_manager_task_mutex);
+
+    pthread_mutex_unlock(&m_manager_access_mutex);
+}
+
+void ThreadManager::notify_complete()
+{
+
+    unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1);
+
+    if(comp == (m_num_threads - 1))
+    {
+        pthread_mutex_lock(&m_manager_task_mutex);
+
+        m_task_complete = true;
+
+        pthread_cond_signal(&m_cond_thread_task_complete);
+
+        pthread_mutex_unlock(&m_manager_task_mutex);
+    }
+}
+
+bool ThreadManager::initPool()
+{
+    //a pool that already failed to start must not be reported as usable
+    if(m_pool_state == eTMFailedToInit)
+        return false;
+
+    if(m_pool_state != eTMNotInited || m_num_threads == 1)
+        return true;
+
+    m_threads.resize(m_num_threads);
+
+    bool res = true;
+
+    for(size_t i = 0; i < m_threads.size(); ++i)
+    {
+        //accumulate with AND: a single failed init() must fail the whole pool
+        res &= m_threads[i].init(i, this);
+    }
+
+    if(res)
+    {
+        m_pool_state = eTMInited;
+    }
+    else
+    {
+        //TODO: join threads?
+        m_pool_state = eTMFailedToInit;
+    }
+
+    return res;
+}
+
+size_t ThreadManager::getNumOfThreads()
+{
+    return m_num_threads;
+}
+
+void ThreadManager::setNumOfThreads(size_t n)
+{
+    int res = pthread_mutex_lock(&m_manager_access_mutex);
+
+    if(!res)
+    {
+        if(n == 0)
+        {
+            n = defaultNumberOfThreads();
+        }
+
+        if(n != m_num_threads && m_pool_state != eTMFailedToInit)
+        {
+            if(m_pool_state == eTMInited)
+            {
+                stop();
+                m_threads.clear();
+            }
+
+            m_num_threads = n;
+
+            if(m_num_threads == 1)
+            {
+                m_pool_state = eTMSingleThreaded;
+            }
+            else
+            {
+                m_pool_state = eTMNotInited;
+            }
+        }
+
+        pthread_mutex_unlock(&m_manager_access_mutex);
+    }
+}
+
+size_t ThreadManager::defaultNumberOfThreads()
+{
+    unsigned int result = m_default_number_of_threads;
+
+    char * env = getenv(m_env_name);
+
+    if(env != NULL)
+    {
+        sscanf(env, "%u", &result);
+
+        result = std::max(1u, result);
+        //do we need upper limit of threads number?
+    }
+
+    return result;
+}
+
+void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
+size_t parallel_pthreads_get_threads_num();
+void parallel_pthreads_set_threads_num(int num);
+
+size_t parallel_pthreads_get_threads_num()
+{
+    return ThreadManager::instance().getNumOfThreads();
+}
+
+void parallel_pthreads_set_threads_num(int num)
+{
+    if(num < 0)
+    {
+        ThreadManager::instance().setNumOfThreads(0);
+    }
+    else
+    {
+        ThreadManager::instance().setNumOfThreads(size_t(num));
+    }
+}
+
+void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
+{
+    ThreadManager::instance().run(range, body, nstripes);
+}
+
+}
+
+#endif
diff --git a/modules/core/src/precomp.hpp b/modules/core/src/precomp.hpp
index 88b60e4713..d463126368 100644
--- a/modules/core/src/precomp.hpp
+++ b/modules/core/src/precomp.hpp
@@ -292,6 +292,12 @@ TLSData& getCoreTlsData();
 #define CL_RUNTIME_EXPORT
 #endif
 
+#ifndef HAVE_PTHREADS
+#if !(defined WIN32 || defined _WIN32 || defined WINCE || defined HAVE_WINRT)
+#define HAVE_PTHREADS 1
+#endif
+#endif
+
 extern bool __termination; // skip some cleanups, because process is terminating
                            // (for example, if ExitProcess() was already called)
 
-- 
GitLab