/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include #include #include #include #include namespace apache { namespace thrift { namespace concurrency { /** * Monitor implementation using the std thread library * * @version $Id:$ */ class Monitor::Impl { public: Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); } Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); } Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(&(monitor->mutex())); } Mutex& mutex() { return *mutex_; } void lock() { mutex_->lock(); } void unlock() { mutex_->unlock(); } /** * Exception-throwing version of waitForTimeRelative(), called simply * wait(int64) for historical reasons. Timeout is in milliseconds. * * If the condition occurs, this function returns cleanly; on timeout or * error an exception is thrown. */ void wait(int64_t timeout_ms) { int result = waitForTimeRelative(timeout_ms); if (result == THRIFT_ETIMEDOUT) { throw TimedOutException(); } else if (result != 0) { throw TException("Monitor::wait() failed"); } } /** * Waits until the specified timeout in milliseconds for the condition to * occur, or waits forever if timeout_ms == 0. * * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ int waitForTimeRelative(int64_t timeout_ms) { if (timeout_ms == 0LL) { return waitForever(); } assert(mutex_); std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); assert(mutexImpl); std::unique_lock lock(*mutexImpl, std::adopt_lock); bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) == std::cv_status::timeout); lock.release(); return (timedout ? THRIFT_ETIMEDOUT : 0); } /** * Waits until the absolute time specified using struct THRIFT_TIMESPEC. * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ int waitForTime(const THRIFT_TIMESPEC* abstime) { struct timeval temp; temp.tv_sec = static_cast(abstime->tv_sec); temp.tv_usec = static_cast(abstime->tv_nsec) / 1000; return waitForTime(&temp); } /** * Waits until the absolute time specified using struct timeval. * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ int waitForTime(const struct timeval* abstime) { assert(mutex_); std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); assert(mutexImpl); struct timeval currenttime; Util::toTimeval(currenttime, Util::currentTime()); long tv_sec = static_cast(abstime->tv_sec - currenttime.tv_sec); long tv_usec = static_cast(abstime->tv_usec - currenttime.tv_usec); if (tv_sec < 0) tv_sec = 0; if (tv_usec < 0) tv_usec = 0; std::unique_lock lock(*mutexImpl, std::adopt_lock); bool timedout = (conditionVariable_.wait_for(lock, std::chrono::seconds(tv_sec) + std::chrono::microseconds(tv_usec)) == std::cv_status::timeout); lock.release(); return (timedout ? THRIFT_ETIMEDOUT : 0); } /** * Waits forever until the condition occurs. * Returns 0 if condition occurs, or an error code otherwise. */ int waitForever() { assert(mutex_); std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); assert(mutexImpl); std::unique_lock lock(*mutexImpl, std::adopt_lock); conditionVariable_.wait(lock); lock.release(); return 0; } void notify() { conditionVariable_.notify_one(); } void notifyAll() { conditionVariable_.notify_all(); } private: void init(Mutex* mutex) { mutex_ = mutex; } const std::unique_ptr ownedMutex_; std::condition_variable_any conditionVariable_; Mutex* mutex_; }; Monitor::Monitor() : impl_(new Monitor::Impl()) { } Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) { } Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) { } Monitor::~Monitor() { delete impl_; } Mutex& Monitor::mutex() const { return const_cast(impl_)->mutex(); } void Monitor::lock() const { const_cast(impl_)->lock(); } void Monitor::unlock() const { const_cast(impl_)->unlock(); } void Monitor::wait(int64_t timeout) const { const_cast(impl_)->wait(timeout); } int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { return const_cast(impl_)->waitForTime(abstime); } int Monitor::waitForTime(const timeval* abstime) const { return const_cast(impl_)->waitForTime(abstime); } int Monitor::waitForTimeRelative(int64_t timeout_ms) const { return const_cast(impl_)->waitForTimeRelative(timeout_ms); } int Monitor::waitForever() const { return const_cast(impl_)->waitForever(); } void Monitor::notify() const { const_cast(impl_)->notify(); } void Monitor::notifyAll() const { const_cast(impl_)->notifyAll(); } } } } // apache::thrift::concurrency