diff --git a/util/env_posix.cc b/util/env_posix.cc index 7cdf66cec996b2d2e1060693906f6e04d535c0f6..957c5b1052e5cd993875d00b2a8fc480957b4bc1 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -600,9 +600,9 @@ class PosixFileLock : public FileLock { class PosixEnv : public Env { public: PosixEnv(); - virtual ~PosixEnv() { - fprintf(stderr, "Destroying Env::Default()\n"); - exit(1); + + virtual ~PosixEnv(){ + WaitForBGThreads(); } void SetFD_CLOEXEC(int fd, const EnvOptions* options) { @@ -804,6 +804,8 @@ class PosixEnv : public Env { virtual void Schedule(void (*function)(void*), void* arg); + virtual void WaitForBGThreads(); + virtual void StartThread(void (*function)(void* arg), void* arg); virtual Status GetTestDirectory(std::string* result) { @@ -973,22 +975,43 @@ class PosixEnv : public Env { // Entry per Schedule() call struct BGItem { void* arg; void (*function)(void*); }; typedef std::deque BGQueue; + int queue_size_; // number of items in BGQueue + bool exit_all_threads_; BGQueue queue_; + std::vector threads_to_join_; }; PosixEnv::PosixEnv() : checkedDiskForMmap_(false), forceMmapOff(false), page_size_(getpagesize()), started_bgthread_(0), - num_threads_(1) { + num_threads_(1), + queue_size_(0), + exit_all_threads_(false) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); bgthread_.resize(num_threads_); } +// Signal and Join all background threads started by calls to Schedule +void PosixEnv::WaitForBGThreads() { + PthreadCall("lock", pthread_mutex_lock(&mu_)); + assert(! exit_all_threads_); + exit_all_threads_ = true; + PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + for (unsigned int i = 0; i < threads_to_join_.size(); i++) { + pthread_join(threads_to_join_[i], nullptr); + } +} + void PosixEnv::Schedule(void (*function)(void*), void* arg) { PthreadCall("lock", pthread_mutex_lock(&mu_)); + if (exit_all_threads_) { + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + return; + } // Start background thread if necessary for (; started_bgthread_ < num_threads_; started_bgthread_++) { PthreadCall( @@ -997,6 +1020,7 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) { nullptr, &PosixEnv::BGThreadWrapper, this)); + threads_to_join_.push_back(bgthread_[started_bgthread_]); fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]); } @@ -1015,10 +1039,13 @@ void PosixEnv::BGThread() { while (true) { // Wait until there is an item that is ready to run PthreadCall("lock", pthread_mutex_lock(&mu_)); - while (queue_.empty()) { + while (queue_.empty() && !exit_all_threads_) { PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); } - + if (exit_all_threads_) { // mechanism to let BG threads exit safely + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + break; + } void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); @@ -1048,17 +1075,15 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { state->arg = arg; PthreadCall("start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); + threads_to_join_.push_back(t); } } // namespace -static pthread_once_t once = PTHREAD_ONCE_INIT; -static Env* default_env; -static void InitDefaultEnv() { default_env = new PosixEnv; } +static PosixEnv default_env; Env* Env::Default() { - pthread_once(&once, InitDefaultEnv); - return default_env; + return &default_env; } } // namespace leveldb