diff --git a/util/thread_local.cc b/util/thread_local.cc index 9002571fd1de483d531ee7d5a27c1361506a0a6c..62aec84996f685b55bc4ef7305af6c241e891b7a 100644 --- a/util/thread_local.cc +++ b/util/thread_local.cc @@ -14,8 +14,141 @@ namespace rocksdb { +struct Entry { + Entry() : ptr(nullptr) {} + Entry(const Entry& e) : ptr(e.ptr.load(std::memory_order_relaxed)) {} + std::atomic ptr; +}; + +class StaticMeta; + +// This is the structure that is declared as "thread_local" storage. +// The vector keep list of atomic pointer for all instances for "current" +// thread. The vector is indexed by an Id that is unique in process and +// associated with one ThreadLocalPtr instance. The Id is assigned by a +// global StaticMeta singleton. So if we instantiated 3 ThreadLocalPtr +// instances, each thread will have a ThreadData with a vector of size 3: +// --------------------------------------------------- +// | | instance 1 | instance 2 | instnace 3 | +// --------------------------------------------------- +// | thread 1 | void* | void* | void* | <- ThreadData +// --------------------------------------------------- +// | thread 2 | void* | void* | void* | <- ThreadData +// --------------------------------------------------- +// | thread 3 | void* | void* | void* | <- ThreadData +// --------------------------------------------------- +struct ThreadData { + explicit ThreadData(ThreadLocalPtr::StaticMeta* _inst) : entries(), inst(_inst) {} + std::vector entries; + ThreadData* next; + ThreadData* prev; + ThreadLocalPtr::StaticMeta* inst; +}; + +class ThreadLocalPtr::StaticMeta { +public: + StaticMeta(); + + // Return the next available Id + uint32_t GetId(); + // Return the next available Id without claiming it + uint32_t PeekId() const; + // Return the given Id back to the free pool. This also triggers + // UnrefHandler for associated pointer value (if not NULL) for all threads. + void ReclaimId(uint32_t id); + + // Return the pointer value for the given id for the current thread. + void* Get(uint32_t id) const; + // Reset the pointer value for the given id for the current thread. + void Reset(uint32_t id, void* ptr); + // Atomically swap the supplied ptr and return the previous value + void* Swap(uint32_t id, void* ptr); + // Atomically compare and swap the provided value only if it equals + // to expected value. + bool CompareAndSwap(uint32_t id, void* ptr, void*& expected); + // Reset all thread local data to replacement, and return non-nullptr + // data for all existing threads + void Scrape(uint32_t id, autovector* ptrs, void* const replacement); + // Update res by applying func on each thread-local value. Holds a lock that + // prevents unref handler from running during this call, but clients must + // still provide external synchronization since the owning thread can + // access the values without internal locking, e.g., via Get() and Reset(). + void Fold(uint32_t id, FoldFunc func, void* res); + + // Register the UnrefHandler for id + void SetHandler(uint32_t id, UnrefHandler handler); + + // protect inst, next_instance_id_, free_instance_ids_, head_, + // ThreadData.entries + // + // Note that here we prefer function static variable instead of the usual + // global static variable. The reason is that c++ destruction order of + // static variables in the reverse order of their construction order. + // However, C++ does not guarantee any construction order when global + // static variables are defined in different files, while the function + // static variables are initialized when their function are first called. + // As a result, the construction order of the function static variables + // can be controlled by properly invoke their first function calls in + // the right order. + // + // For instance, the following function contains a function static + // variable. We place a dummy function call of this inside + // Env::Default() to ensure the construction order of the construction + // order. + static port::Mutex* Mutex(); + + // Returns the member mutex of the current StaticMeta. In general, + // Mutex() should be used instead of this one. However, in case where + // the static variable inside Instance() goes out of scope, MemberMutex() + // should be used. One example is OnThreadExit() function. + port::Mutex* MemberMutex() { return &mutex_; } + +private: + // Get UnrefHandler for id with acquiring mutex + // REQUIRES: mutex locked + UnrefHandler GetHandler(uint32_t id); + + // Triggered before a thread terminates + static void OnThreadExit(void* ptr); + + // Add current thread's ThreadData to the global chain + // REQUIRES: mutex locked + void AddThreadData(ThreadData* d); + + // Remove current thread's ThreadData from the global chain + // REQUIRES: mutex locked + void RemoveThreadData(ThreadData* d); + + static ThreadData* GetThreadLocal(); + + uint32_t next_instance_id_; + // Used to recycle Ids in case ThreadLocalPtr is instantiated and destroyed + // frequently. This also prevents it from blowing up the vector space. + autovector free_instance_ids_; + // Chain all thread local structure together. This is necessary since + // when one ThreadLocalPtr gets destroyed, we need to loop over each + // thread's version of pointer corresponding to that instance and + // call UnrefHandler for it. + ThreadData head_; + + std::unordered_map handler_map_; + + // The private mutex. Developers should always use Mutex() instead of + // using this variable directly. + port::Mutex mutex_; #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread ThreadLocalPtr::ThreadData* ThreadLocalPtr::StaticMeta::tls_ = nullptr; + // Thread local storage + static __thread ThreadData* tls_; +#endif + + // Used to make thread exit trigger possible if !defined(OS_MACOSX). + // Otherwise, used to retrieve thread data. + pthread_key_t pthread_key_; +}; + + +#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL +__thread ThreadData* ThreadLocalPtr::StaticMeta::tls_ = nullptr; #endif // Windows doesn't support a per-thread destructor with its @@ -205,7 +338,7 @@ ThreadLocalPtr::StaticMeta::StaticMeta() : next_instance_id_(0), head_(this) { #endif } -void ThreadLocalPtr::StaticMeta::AddThreadData(ThreadLocalPtr::ThreadData* d) { +void ThreadLocalPtr::StaticMeta::AddThreadData(ThreadData* d) { Mutex()->AssertHeld(); d->next = &head_; d->prev = head_.prev; @@ -214,14 +347,14 @@ void ThreadLocalPtr::StaticMeta::AddThreadData(ThreadLocalPtr::ThreadData* d) { } void ThreadLocalPtr::StaticMeta::RemoveThreadData( - ThreadLocalPtr::ThreadData* d) { + ThreadData* d) { Mutex()->AssertHeld(); d->next->prev = d->prev; d->prev->next = d->next; d->next = d->prev = d; } -ThreadLocalPtr::ThreadData* ThreadLocalPtr::StaticMeta::GetThreadLocal() { +ThreadData* ThreadLocalPtr::StaticMeta::GetThreadLocal() { #ifndef ROCKSDB_SUPPORT_THREAD_LOCAL // Make this local variable name look like a member variable so that we // can share all the code below @@ -318,6 +451,10 @@ void ThreadLocalPtr::StaticMeta::Fold(uint32_t id, FoldFunc func, void* res) { } } +uint32_t ThreadLocalPtr::TEST_PeekId() { + return Instance()->PeekId(); +} + void ThreadLocalPtr::StaticMeta::SetHandler(uint32_t id, UnrefHandler handler) { MutexLock l(Mutex()); handler_map_[id] = handler; diff --git a/util/thread_local.h b/util/thread_local.h index b935208f103ad3eb66fbc036bac5d4243c4e225b..17c51ebcf20db8bf629b7c6fa457ec592d9716da 100644 --- a/util/thread_local.h +++ b/util/thread_local.h @@ -74,6 +74,10 @@ class ThreadLocalPtr { // access the values without internal locking, e.g., via Get() and Reset(). void Fold(FoldFunc func, void* res); + // Add here for testing + // Return the next available Id without claiming it + static uint32_t TEST_PeekId(); + // Initialize the static singletons of the ThreadLocalPtr. // // If this function is not called, then the singletons will be @@ -83,138 +87,9 @@ class ThreadLocalPtr { // initialized will be no-op. static void InitSingletons(); - protected: - struct Entry { - Entry() : ptr(nullptr) {} - Entry(const Entry& e) : ptr(e.ptr.load(std::memory_order_relaxed)) {} - std::atomic ptr; - }; - class StaticMeta; - // This is the structure that is declared as "thread_local" storage. - // The vector keep list of atomic pointer for all instances for "current" - // thread. The vector is indexed by an Id that is unique in process and - // associated with one ThreadLocalPtr instance. The Id is assigned by a - // global StaticMeta singleton. So if we instantiated 3 ThreadLocalPtr - // instances, each thread will have a ThreadData with a vector of size 3: - // --------------------------------------------------- - // | | instance 1 | instance 2 | instnace 3 | - // --------------------------------------------------- - // | thread 1 | void* | void* | void* | <- ThreadData - // --------------------------------------------------- - // | thread 2 | void* | void* | void* | <- ThreadData - // --------------------------------------------------- - // | thread 3 | void* | void* | void* | <- ThreadData - // --------------------------------------------------- - struct ThreadData { - explicit ThreadData(StaticMeta* _inst) : entries(), inst(_inst) {} - std::vector entries; - ThreadData* next; - ThreadData* prev; - StaticMeta* inst; - }; - - class StaticMeta { - public: - StaticMeta(); - - // Return the next available Id - uint32_t GetId(); - // Return the next available Id without claiming it - uint32_t PeekId() const; - // Return the given Id back to the free pool. This also triggers - // UnrefHandler for associated pointer value (if not NULL) for all threads. - void ReclaimId(uint32_t id); - - // Return the pointer value for the given id for the current thread. - void* Get(uint32_t id) const; - // Reset the pointer value for the given id for the current thread. - void Reset(uint32_t id, void* ptr); - // Atomically swap the supplied ptr and return the previous value - void* Swap(uint32_t id, void* ptr); - // Atomically compare and swap the provided value only if it equals - // to expected value. - bool CompareAndSwap(uint32_t id, void* ptr, void*& expected); - // Reset all thread local data to replacement, and return non-nullptr - // data for all existing threads - void Scrape(uint32_t id, autovector* ptrs, void* const replacement); - // Update res by applying func on each thread-local value. Holds a lock that - // prevents unref handler from running during this call, but clients must - // still provide external synchronization since the owning thread can - // access the values without internal locking, e.g., via Get() and Reset(). - void Fold(uint32_t id, FoldFunc func, void* res); - - // Register the UnrefHandler for id - void SetHandler(uint32_t id, UnrefHandler handler); - - // protect inst, next_instance_id_, free_instance_ids_, head_, - // ThreadData.entries - // - // Note that here we prefer function static variable instead of the usual - // global static variable. The reason is that c++ destruction order of - // static variables in the reverse order of their construction order. - // However, C++ does not guarantee any construction order when global - // static variables are defined in different files, while the function - // static variables are initialized when their function are first called. - // As a result, the construction order of the function static variables - // can be controlled by properly invoke their first function calls in - // the right order. - // - // For instance, the following function contains a function static - // variable. We place a dummy function call of this inside - // Env::Default() to ensure the construction order of the construction - // order. - static port::Mutex* Mutex(); - - // Returns the member mutex of the current StaticMeta. In general, - // Mutex() should be used instead of this one. However, in case where - // the static variable inside Instance() goes out of scope, MemberMutex() - // should be used. One example is OnThreadExit() function. - port::Mutex* MemberMutex() { return &mutex_; } - - private: - // Get UnrefHandler for id with acquiring mutex - // REQUIRES: mutex locked - UnrefHandler GetHandler(uint32_t id); - - // Triggered before a thread terminates - static void OnThreadExit(void* ptr); - - // Add current thread's ThreadData to the global chain - // REQUIRES: mutex locked - void AddThreadData(ThreadData* d); - - // Remove current thread's ThreadData from the global chain - // REQUIRES: mutex locked - void RemoveThreadData(ThreadData* d); - - static ThreadData* GetThreadLocal(); - - uint32_t next_instance_id_; - // Used to recycle Ids in case ThreadLocalPtr is instantiated and destroyed - // frequently. This also prevents it from blowing up the vector space. - autovector free_instance_ids_; - // Chain all thread local structure together. This is necessary since - // when one ThreadLocalPtr gets destroyed, we need to loop over each - // thread's version of pointer corresponding to that instance and - // call UnrefHandler for it. - ThreadData head_; - - std::unordered_map handler_map_; - - // The private mutex. Developers should always use Mutex() instead of - // using this variable directly. - port::Mutex mutex_; -#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL - // Thread local storage - static __thread ThreadData* tls_; -#endif - - // Used to make thread exit trigger possible if !defined(OS_MACOSX). - // Otherwise, used to retrieve thread data. - pthread_key_t pthread_key_; - }; +private: static StaticMeta* Instance(); diff --git a/util/thread_local_test.cc b/util/thread_local_test.cc index 7bf008193ef7c37cb868f6c12f003c8c37b4685a..904628b5d48e5c73085eb732dde35eeb6c6773f4 100644 --- a/util/thread_local_test.cc +++ b/util/thread_local_test.cc @@ -51,8 +51,10 @@ struct Params { }; class IDChecker : public ThreadLocalPtr { - public: - static uint32_t PeekId() { return Instance()->PeekId(); } +public: + static uint32_t PeekId() { + return TEST_PeekId(); + } }; } // anonymous namespace