From c8bcf53d81173b097bc8273667434d09d4683a92 Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Mon, 27 May 2019 20:41:22 +0800 Subject: [PATCH] refactor(db): refactor for db impl Former-commit-id: f7488d5189ed1c34d9b3e8e3da4eaa71766f9c11 --- cpp/src/db/DBImpl.h | 44 +++++++------- cpp/src/db/DBImpl.inl | 138 ++++++++++++++++++++---------------------- 2 files changed, 89 insertions(+), 93 deletions(-) diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 6d91a91c..6e596ff3 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -35,10 +35,10 @@ public: virtual Status add_group(meta::TableSchema& table_schema) override; virtual Status get_group(meta::TableSchema& table_schema) override; - virtual Status has_group(const std::string& table_id_, bool& has_or_not_) override; + virtual Status has_group(const std::string& table_id, bool& has_or_not) override; - virtual Status add_vectors(const std::string& table_id_, - size_t n, const float* vectors, IDNumbers& vector_ids_) override; + virtual Status add_vectors(const std::string& table_id, + size_t n, const float* vectors, IDNumbers& vector_ids) override; virtual Status search(const std::string& table_id, size_t k, size_t nq, const float* vectors, QueryResults& results) override; @@ -54,30 +54,30 @@ public: private: - void background_build_index(); - Status build_index(const meta::TableFileSchema&); - Status try_build_index(); - Status merge_files(const std::string& table_id, + void BackgroundBuildIndex(); + Status BuildIndex(const meta::TableFileSchema&); + Status TryBuildIndex(); + Status MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files); - Status background_merge_files(const std::string& table_id); + Status BackgroundMergeFiles(const std::string& table_id); - void try_schedule_compaction(); - void start_timer_task(int interval_); - void background_timer_task(int interval_); + void TrySchedule(); + void StartTimerTasks(int interval); + void BackgroundTimerTask(int interval); static void BGWork(void* db); - void background_call(); - void background_compaction(); + void BackgroundCall(); + void BackgroundCompaction(); - Env* const _env; - const Options _options; + Env* const env_; + const Options options_; - std::mutex _mutex; - std::condition_variable _bg_work_finish_signal; - bool _bg_compaction_scheduled; - Status _bg_error; - std::atomic _shutting_down; + std::mutex mutex_; + std::condition_variable bg_work_finish_signal_; + bool bg_compaction_scheduled_; + Status bg_error_; + std::atomic shutting_down_; std::mutex build_index_mutex_; bool bg_build_index_started_; @@ -85,8 +85,8 @@ private: std::thread bg_timer_thread_; - MetaPtr _pMeta; - MemManagerPtr _pMemMgr; + MetaPtr pMeta_; + MemManagerPtr pMemMgr_; }; // DBImpl diff --git a/cpp/src/db/DBImpl.inl b/cpp/src/db/DBImpl.inl index 7adf80d4..025d7c7b 100644 --- a/cpp/src/db/DBImpl.inl +++ b/cpp/src/db/DBImpl.inl @@ -23,35 +23,35 @@ namespace engine { template DBImpl::DBImpl(const Options& options) - : _env(options.env), - _options(options), - _bg_compaction_scheduled(false), - _shutting_down(false), + : env_(options.env), + options_(options), + bg_compaction_scheduled_(false), + shutting_down_(false), bg_build_index_started_(false), - _pMeta(new meta::DBMetaImpl(_options.meta)), - _pMemMgr(new MemManager(_pMeta, _options)) { - start_timer_task(_options.memory_sync_interval); + pMeta_(new meta::DBMetaImpl(options_.meta)), + pMemMgr_(new MemManager(pMeta_, options_)) { + StartTimerTasks(options_.memory_sync_interval); } template Status DBImpl::add_group(meta::TableSchema& table_schema) { - return _pMeta->CreateTable(table_schema); + return pMeta_->CreateTable(table_schema); } template Status DBImpl::get_group(meta::TableSchema& table_schema) { - return _pMeta->DescribeTable(table_schema); + return pMeta_->DescribeTable(table_schema); } template Status DBImpl::has_group(const std::string& table_id, bool& has_or_not) { - return _pMeta->HasTable(table_id, has_or_not); + return pMeta_->HasTable(table_id, has_or_not); } template Status DBImpl::add_vectors(const std::string& table_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) { - Status status = _pMemMgr->add_vectors(table_id_, n, vectors, vector_ids_); + Status status = pMemMgr_->add_vectors(table_id_, n, vectors, vector_ids_); if (!status.ok()) { return status; } @@ -69,7 +69,7 @@ Status DBImpl::search(const std::string& table_id, size_t k, size_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) { meta::DatePartionedTableFilesSchema files; - auto status = _pMeta->FilesToSearch(table_id, dates, files); + auto status = pMeta_->FilesToSearch(table_id, dates, files); if (!status.ok()) { return status; } LOG(DEBUG) << "Search DateT Size=" << files.size(); @@ -195,59 +195,59 @@ Status DBImpl::search(const std::string& table_id, size_t k, size_t nq, } template -void DBImpl::start_timer_task(int interval_) { - bg_timer_thread_ = std::thread(&DBImpl::background_timer_task, this, interval_); +void DBImpl::StartTimerTasks(int interval) { + bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval); } template -void DBImpl::background_timer_task(int interval_) { +void DBImpl::BackgroundTimerTask(int interval) { Status status; while (true) { - if (!_bg_error.ok()) break; - if (_shutting_down.load(std::memory_order_acquire)) break; + if (!bg_error_.ok()) break; + if (shutting_down_.load(std::memory_order_acquire)) break; - std::this_thread::sleep_for(std::chrono::seconds(interval_)); + std::this_thread::sleep_for(std::chrono::seconds(interval)); - try_schedule_compaction(); + TrySchedule(); } } template -void DBImpl::try_schedule_compaction() { - if (_bg_compaction_scheduled) return; - if (!_bg_error.ok()) return; +void DBImpl::TrySchedule() { + if (bg_compaction_scheduled_) return; + if (!bg_error_.ok()) return; - _bg_compaction_scheduled = true; - _env->Schedule(&DBImpl::BGWork, this); + bg_compaction_scheduled_ = true; + env_->Schedule(&DBImpl::BGWork, this); } template void DBImpl::BGWork(void* db_) { - reinterpret_cast(db_)->background_call(); + reinterpret_cast(db_)->BackgroundCall(); } template -void DBImpl::background_call() { - std::lock_guard lock(_mutex); - assert(_bg_compaction_scheduled); +void DBImpl::BackgroundCall() { + std::lock_guard lock(mutex_); + assert(bg_compaction_scheduled_); - if (!_bg_error.ok() || _shutting_down.load(std::memory_order_acquire)) + if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire)) return ; - background_compaction(); + BackgroundCompaction(); - _bg_compaction_scheduled = false; - _bg_work_finish_signal.notify_all(); + bg_compaction_scheduled_ = false; + bg_work_finish_signal_.notify_all(); } template -Status DBImpl::merge_files(const std::string& table_id, const meta::DateT& date, +Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) { meta::TableFileSchema table_file; table_file.table_id = table_id; table_file.date = date; - Status status = _pMeta->CreateTableFile(table_file); + Status status = pMeta_->CreateTableFile(table_file); if (!status.ok()) { LOG(INFO) << status.ToString() << std::endl; @@ -267,19 +267,19 @@ Status DBImpl::merge_files(const std::string& table_id, const meta::Dat LOG(DEBUG) << "Merging file " << file_schema.file_id; index_size = index.Size(); - if (index_size >= _options.index_trigger_size) break; + if (index_size >= options_.index_trigger_size) break; } index.Serialize(); - if (index_size >= _options.index_trigger_size) { + if (index_size >= options_.index_trigger_size) { table_file.file_type = meta::TableFileSchema::TO_INDEX; } else { table_file.file_type = meta::TableFileSchema::RAW; } table_file.size = index_size; updated.push_back(table_file); - status = _pMeta->UpdateTableFiles(updated); + status = pMeta_->UpdateTableFiles(updated); LOG(DEBUG) << "New merged file " << table_file.file_id << " of size=" << index.PhysicalSize()/(1024*1024) << " M"; @@ -289,43 +289,39 @@ Status DBImpl::merge_files(const std::string& table_id, const meta::Dat } template -Status DBImpl::background_merge_files(const std::string& table_id) { +Status DBImpl::BackgroundMergeFiles(const std::string& table_id) { meta::DatePartionedTableFilesSchema raw_files; - auto status = _pMeta->FilesToMerge(table_id, raw_files); + auto status = pMeta_->FilesToMerge(table_id, raw_files); if (!status.ok()) { return status; } - /* if (raw_files.size() == 0) { */ - /* return Status::OK(); */ - /* } */ - bool has_merge = false; for (auto& kv : raw_files) { auto files = kv.second; - if (files.size() <= _options.merge_trigger_number) { + if (files.size() <= options_.merge_trigger_number) { continue; } has_merge = true; - merge_files(table_id, kv.first, kv.second); + MergeFiles(table_id, kv.first, kv.second); } - _pMeta->Archive(); + pMeta_->Archive(); - try_build_index(); + TryBuildIndex(); - _pMeta->CleanUpFilesWithTTL(1); + pMeta_->CleanUpFilesWithTTL(1); return Status::OK(); } template -Status DBImpl::build_index(const meta::TableFileSchema& file) { +Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { meta::TableFileSchema table_file; table_file.table_id = file.table_id; table_file.date = file.date; - Status status = _pMeta->CreateTableFile(table_file); + Status status = pMeta_->CreateTableFile(table_file); if (!status.ok()) { return status; } @@ -342,30 +338,30 @@ Status DBImpl::build_index(const meta::TableFileSchema& file) { to_remove.file_type = meta::TableFileSchema::TO_DELETE; meta::TableFilesSchema update_files = {to_remove, table_file}; - _pMeta->UpdateTableFiles(update_files); + pMeta_->UpdateTableFiles(update_files); LOG(DEBUG) << "New index file " << table_file.file_id << " of size " << index->PhysicalSize()/(1024*1024) << " M" << " from file " << to_remove.file_id; index->Cache(); - _pMeta->Archive(); + pMeta_->Archive(); return Status::OK(); } template -void DBImpl::background_build_index() { +void DBImpl::BackgroundBuildIndex() { std::lock_guard lock(build_index_mutex_); assert(bg_build_index_started_); meta::TableFilesSchema to_index_files; - _pMeta->FilesToIndex(to_index_files); + pMeta_->FilesToIndex(to_index_files); Status status; for (auto& file : to_index_files) { /* LOG(DEBUG) << "Buiding index for " << file.location; */ - status = build_index(file); + status = BuildIndex(file); if (!status.ok()) { - _bg_error = status; + bg_error_ = status; return; } } @@ -376,25 +372,25 @@ void DBImpl::background_build_index() { } template -Status DBImpl::try_build_index() { +Status DBImpl::TryBuildIndex() { if (bg_build_index_started_) return Status::OK(); - if (_shutting_down.load(std::memory_order_acquire)) return Status::OK(); + if (shutting_down_.load(std::memory_order_acquire)) return Status::OK(); bg_build_index_started_ = true; - std::thread build_index_task(&DBImpl::background_build_index, this); + std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this); build_index_task.detach(); return Status::OK(); } template -void DBImpl::background_compaction() { +void DBImpl::BackgroundCompaction() { std::vector table_ids; - _pMemMgr->serialize(table_ids); + pMemMgr_->serialize(table_ids); Status status; for (auto table_id : table_ids) { - status = background_merge_files(table_id); + status = BackgroundMergeFiles(table_id); if (!status.ok()) { - _bg_error = status; + bg_error_ = status; return; } } @@ -402,21 +398,21 @@ void DBImpl::background_compaction() { template Status DBImpl::drop_all() { - return _pMeta->DropAll(); + return pMeta_->DropAll(); } template Status DBImpl::size(long& result) { - return _pMeta->Size(result); + return pMeta_->Size(result); } template DBImpl::~DBImpl() { { - std::unique_lock lock(_mutex); - _shutting_down.store(true, std::memory_order_release); - while (_bg_compaction_scheduled) { - _bg_work_finish_signal.wait(lock); + std::unique_lock lock(mutex_); + shutting_down_.store(true, std::memory_order_release); + while (bg_compaction_scheduled_) { + bg_work_finish_signal_.wait(lock); } } { @@ -427,8 +423,8 @@ DBImpl::~DBImpl() { } bg_timer_thread_.join(); std::vector ids; - _pMemMgr->serialize(ids); - _env->Stop(); + pMemMgr_->serialize(ids); + env_->Stop(); } } // namespace engine -- GitLab