提交 c8bcf53d 编写于 作者: X Xu Peng

refactor(db): rename DBImpl private members and methods to project naming convention (trailing-underscore members, e.g. `_pMeta` → `pMeta_`; PascalCase private methods, e.g. `background_call` → `BackgroundCall`)


Former-commit-id: f7488d5189ed1c34d9b3e8e3da4eaa71766f9c11
上级 541692c2
......@@ -35,10 +35,10 @@ public:
virtual Status add_group(meta::TableSchema& table_schema) override;
virtual Status get_group(meta::TableSchema& table_schema) override;
virtual Status has_group(const std::string& table_id_, bool& has_or_not_) override;
virtual Status has_group(const std::string& table_id, bool& has_or_not) override;
virtual Status add_vectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) override;
virtual Status add_vectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids) override;
virtual Status search(const std::string& table_id, size_t k, size_t nq,
const float* vectors, QueryResults& results) override;
......@@ -54,30 +54,30 @@ public:
private:
void background_build_index();
Status build_index(const meta::TableFileSchema&);
Status try_build_index();
Status merge_files(const std::string& table_id,
void BackgroundBuildIndex();
Status BuildIndex(const meta::TableFileSchema&);
Status TryBuildIndex();
Status MergeFiles(const std::string& table_id,
const meta::DateT& date,
const meta::TableFilesSchema& files);
Status background_merge_files(const std::string& table_id);
Status BackgroundMergeFiles(const std::string& table_id);
void try_schedule_compaction();
void start_timer_task(int interval_);
void background_timer_task(int interval_);
void TrySchedule();
void StartTimerTasks(int interval);
void BackgroundTimerTask(int interval);
static void BGWork(void* db);
void background_call();
void background_compaction();
void BackgroundCall();
void BackgroundCompaction();
Env* const _env;
const Options _options;
Env* const env_;
const Options options_;
std::mutex _mutex;
std::condition_variable _bg_work_finish_signal;
bool _bg_compaction_scheduled;
Status _bg_error;
std::atomic<bool> _shutting_down;
std::mutex mutex_;
std::condition_variable bg_work_finish_signal_;
bool bg_compaction_scheduled_;
Status bg_error_;
std::atomic<bool> shutting_down_;
std::mutex build_index_mutex_;
bool bg_build_index_started_;
......@@ -85,8 +85,8 @@ private:
std::thread bg_timer_thread_;
MetaPtr _pMeta;
MemManagerPtr _pMemMgr;
MetaPtr pMeta_;
MemManagerPtr pMemMgr_;
}; // DBImpl
......
......@@ -23,35 +23,35 @@ namespace engine {
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
: _env(options.env),
_options(options),
_bg_compaction_scheduled(false),
_shutting_down(false),
: env_(options.env),
options_(options),
bg_compaction_scheduled_(false),
shutting_down_(false),
bg_build_index_started_(false),
_pMeta(new meta::DBMetaImpl(_options.meta)),
_pMemMgr(new MemManager<EngineT>(_pMeta, _options)) {
start_timer_task(_options.memory_sync_interval);
pMeta_(new meta::DBMetaImpl(options_.meta)),
pMemMgr_(new MemManager<EngineT>(pMeta_, options_)) {
StartTimerTasks(options_.memory_sync_interval);
}
template<typename EngineT>
Status DBImpl<EngineT>::add_group(meta::TableSchema& table_schema) {
return _pMeta->CreateTable(table_schema);
return pMeta_->CreateTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::get_group(meta::TableSchema& table_schema) {
return _pMeta->DescribeTable(table_schema);
return pMeta_->DescribeTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& table_id, bool& has_or_not) {
return _pMeta->HasTable(table_id, has_or_not);
return pMeta_->HasTable(table_id, has_or_not);
}
template<typename EngineT>
Status DBImpl<EngineT>::add_vectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) {
Status status = _pMemMgr->add_vectors(table_id_, n, vectors, vector_ids_);
Status status = pMemMgr_->add_vectors(table_id_, n, vectors, vector_ids_);
if (!status.ok()) {
return status;
}
......@@ -69,7 +69,7 @@ Status DBImpl<EngineT>::search(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedTableFilesSchema files;
auto status = _pMeta->FilesToSearch(table_id, dates, files);
auto status = pMeta_->FilesToSearch(table_id, dates, files);
if (!status.ok()) { return status; }
LOG(DEBUG) << "Search DateT Size=" << files.size();
......@@ -195,59 +195,59 @@ Status DBImpl<EngineT>::search(const std::string& table_id, size_t k, size_t nq,
}
template<typename EngineT>
void DBImpl<EngineT>::start_timer_task(int interval_) {
bg_timer_thread_ = std::thread(&DBImpl<EngineT>::background_timer_task, this, interval_);
void DBImpl<EngineT>::StartTimerTasks(int interval) {
bg_timer_thread_ = std::thread(&DBImpl<EngineT>::BackgroundTimerTask, this, interval);
}
template<typename EngineT>
void DBImpl<EngineT>::background_timer_task(int interval_) {
void DBImpl<EngineT>::BackgroundTimerTask(int interval) {
Status status;
while (true) {
if (!_bg_error.ok()) break;
if (_shutting_down.load(std::memory_order_acquire)) break;
if (!bg_error_.ok()) break;
if (shutting_down_.load(std::memory_order_acquire)) break;
std::this_thread::sleep_for(std::chrono::seconds(interval_));
std::this_thread::sleep_for(std::chrono::seconds(interval));
try_schedule_compaction();
TrySchedule();
}
}
template<typename EngineT>
void DBImpl<EngineT>::try_schedule_compaction() {
if (_bg_compaction_scheduled) return;
if (!_bg_error.ok()) return;
void DBImpl<EngineT>::TrySchedule() {
if (bg_compaction_scheduled_) return;
if (!bg_error_.ok()) return;
_bg_compaction_scheduled = true;
_env->Schedule(&DBImpl<EngineT>::BGWork, this);
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl<EngineT>::BGWork, this);
}
template<typename EngineT>
void DBImpl<EngineT>::BGWork(void* db_) {
reinterpret_cast<DBImpl*>(db_)->background_call();
reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
}
template<typename EngineT>
void DBImpl<EngineT>::background_call() {
std::lock_guard<std::mutex> lock(_mutex);
assert(_bg_compaction_scheduled);
void DBImpl<EngineT>::BackgroundCall() {
std::lock_guard<std::mutex> lock(mutex_);
assert(bg_compaction_scheduled_);
if (!_bg_error.ok() || _shutting_down.load(std::memory_order_acquire))
if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
return ;
background_compaction();
BackgroundCompaction();
_bg_compaction_scheduled = false;
_bg_work_finish_signal.notify_all();
bg_compaction_scheduled_ = false;
bg_work_finish_signal_.notify_all();
}
template<typename EngineT>
Status DBImpl<EngineT>::merge_files(const std::string& table_id, const meta::DateT& date,
Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::DateT& date,
const meta::TableFilesSchema& files) {
meta::TableFileSchema table_file;
table_file.table_id = table_id;
table_file.date = date;
Status status = _pMeta->CreateTableFile(table_file);
Status status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
LOG(INFO) << status.ToString() << std::endl;
......@@ -267,19 +267,19 @@ Status DBImpl<EngineT>::merge_files(const std::string& table_id, const meta::Dat
LOG(DEBUG) << "Merging file " << file_schema.file_id;
index_size = index.Size();
if (index_size >= _options.index_trigger_size) break;
if (index_size >= options_.index_trigger_size) break;
}
index.Serialize();
if (index_size >= _options.index_trigger_size) {
if (index_size >= options_.index_trigger_size) {
table_file.file_type = meta::TableFileSchema::TO_INDEX;
} else {
table_file.file_type = meta::TableFileSchema::RAW;
}
table_file.size = index_size;
updated.push_back(table_file);
status = _pMeta->UpdateTableFiles(updated);
status = pMeta_->UpdateTableFiles(updated);
LOG(DEBUG) << "New merged file " << table_file.file_id <<
" of size=" << index.PhysicalSize()/(1024*1024) << " M";
......@@ -289,43 +289,39 @@ Status DBImpl<EngineT>::merge_files(const std::string& table_id, const meta::Dat
}
template<typename EngineT>
Status DBImpl<EngineT>::background_merge_files(const std::string& table_id) {
Status DBImpl<EngineT>::BackgroundMergeFiles(const std::string& table_id) {
meta::DatePartionedTableFilesSchema raw_files;
auto status = _pMeta->FilesToMerge(table_id, raw_files);
auto status = pMeta_->FilesToMerge(table_id, raw_files);
if (!status.ok()) {
return status;
}
/* if (raw_files.size() == 0) { */
/* return Status::OK(); */
/* } */
bool has_merge = false;
for (auto& kv : raw_files) {
auto files = kv.second;
if (files.size() <= _options.merge_trigger_number) {
if (files.size() <= options_.merge_trigger_number) {
continue;
}
has_merge = true;
merge_files(table_id, kv.first, kv.second);
MergeFiles(table_id, kv.first, kv.second);
}
_pMeta->Archive();
pMeta_->Archive();
try_build_index();
TryBuildIndex();
_pMeta->CleanUpFilesWithTTL(1);
pMeta_->CleanUpFilesWithTTL(1);
return Status::OK();
}
template<typename EngineT>
Status DBImpl<EngineT>::build_index(const meta::TableFileSchema& file) {
Status DBImpl<EngineT>::BuildIndex(const meta::TableFileSchema& file) {
meta::TableFileSchema table_file;
table_file.table_id = file.table_id;
table_file.date = file.date;
Status status = _pMeta->CreateTableFile(table_file);
Status status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
return status;
}
......@@ -342,30 +338,30 @@ Status DBImpl<EngineT>::build_index(const meta::TableFileSchema& file) {
to_remove.file_type = meta::TableFileSchema::TO_DELETE;
meta::TableFilesSchema update_files = {to_remove, table_file};
_pMeta->UpdateTableFiles(update_files);
pMeta_->UpdateTableFiles(update_files);
LOG(DEBUG) << "New index file " << table_file.file_id << " of size "
<< index->PhysicalSize()/(1024*1024) << " M"
<< " from file " << to_remove.file_id;
index->Cache();
_pMeta->Archive();
pMeta_->Archive();
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::background_build_index() {
void DBImpl<EngineT>::BackgroundBuildIndex() {
std::lock_guard<std::mutex> lock(build_index_mutex_);
assert(bg_build_index_started_);
meta::TableFilesSchema to_index_files;
_pMeta->FilesToIndex(to_index_files);
pMeta_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
/* LOG(DEBUG) << "Buiding index for " << file.location; */
status = build_index(file);
status = BuildIndex(file);
if (!status.ok()) {
_bg_error = status;
bg_error_ = status;
return;
}
}
......@@ -376,25 +372,25 @@ void DBImpl<EngineT>::background_build_index() {
}
template<typename EngineT>
Status DBImpl<EngineT>::try_build_index() {
Status DBImpl<EngineT>::TryBuildIndex() {
if (bg_build_index_started_) return Status::OK();
if (_shutting_down.load(std::memory_order_acquire)) return Status::OK();
if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
bg_build_index_started_ = true;
std::thread build_index_task(&DBImpl<EngineT>::background_build_index, this);
std::thread build_index_task(&DBImpl<EngineT>::BackgroundBuildIndex, this);
build_index_task.detach();
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::background_compaction() {
void DBImpl<EngineT>::BackgroundCompaction() {
std::vector<std::string> table_ids;
_pMemMgr->serialize(table_ids);
pMemMgr_->serialize(table_ids);
Status status;
for (auto table_id : table_ids) {
status = background_merge_files(table_id);
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
_bg_error = status;
bg_error_ = status;
return;
}
}
......@@ -402,21 +398,21 @@ void DBImpl<EngineT>::background_compaction() {
template<typename EngineT>
Status DBImpl<EngineT>::drop_all() {
return _pMeta->DropAll();
return pMeta_->DropAll();
}
template<typename EngineT>
Status DBImpl<EngineT>::size(long& result) {
return _pMeta->Size(result);
return pMeta_->Size(result);
}
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
{
std::unique_lock<std::mutex> lock(_mutex);
_shutting_down.store(true, std::memory_order_release);
while (_bg_compaction_scheduled) {
_bg_work_finish_signal.wait(lock);
std::unique_lock<std::mutex> lock(mutex_);
shutting_down_.store(true, std::memory_order_release);
while (bg_compaction_scheduled_) {
bg_work_finish_signal_.wait(lock);
}
}
{
......@@ -427,8 +423,8 @@ DBImpl<EngineT>::~DBImpl() {
}
bg_timer_thread_.join();
std::vector<std::string> ids;
_pMemMgr->serialize(ids);
_env->Stop();
pMemMgr_->serialize(ids);
env_->Stop();
}
} // namespace engine
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册