From 01cab6f3659ee00fe56d8f66619fa61f0c3e9811 Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 20 May 2020 21:13:46 -0500 Subject: [PATCH] #2378 (#2388) * return partition lsn Signed-off-by: yhmo * fix wal lsn Signed-off-by: shengjun.li * fix wal issue Signed-off-by: yhmo * changelog Signed-off-by: yhmo * typo Signed-off-by: yhmo * all collection include partition Signed-off-by: yhmo * fix build error Signed-off-by: yhmo * fix flush Signed-off-by: shengjun.li Co-authored-by: shengjun.li --- CHANGELOG.md | 1 + core/src/db/DBImpl.cpp | 60 +++++--- core/src/db/meta/Meta.h | 2 +- core/src/db/meta/MySQLMetaImpl.cpp | 22 +-- core/src/db/meta/MySQLMetaImpl.h | 2 +- core/src/db/meta/SqliteMetaImpl.cpp | 48 ++++--- core/src/db/meta/SqliteMetaImpl.h | 2 +- core/src/db/wal/WalManager.cpp | 211 ++++++++++++++++++++-------- core/src/db/wal/WalManager.h | 50 ++++++- core/unittest/db/test_wal.cpp | 4 +- 10 files changed, 289 insertions(+), 113 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 735c840f..fc58c93c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Please mark all change in change log and use the issue from GitHub # Milvus 0.10.0 (TBD) ## Bug +- \#2378 Duplicate data after server restart ## Feature diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index ada30fff..b6871b8e 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -487,7 +487,11 @@ DBImpl::CreatePartition(const std::string& collection_id, const std::string& par } uint64_t lsn = 0; - meta_ptr_->GetCollectionFlushLSN(collection_id, lsn); + if (options_.wal_enable_) { + lsn = wal_mgr_->CreatePartition(collection_id, partition_tag); + } else { + meta_ptr_->GetCollectionFlushLSN(collection_id, lsn); + } return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, lsn); } @@ -545,6 +549,10 @@ DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string& return status; } + if (options_.wal_enable_) { + wal_mgr_->DropPartition(collection_id, partition_tag); + } + return DropPartition(partition_name); } @@ -891,7 +899,7 @@ DBImpl::Flush(const std::string& collection_id) { swn_wal_.Notify(); flush_req_swn_.Wait(); } - + StartMergeTask(); } else { LOG_ENGINE_DEBUG_ << "MemTable flush"; InternalFlush(collection_id); @@ -918,6 +926,7 @@ DBImpl::Flush() { swn_wal_.Notify(); flush_req_swn_.Wait(); } + StartMergeTask(); } else { LOG_ENGINE_DEBUG_ << "MemTable flush"; InternalFlush(); @@ -1652,7 +1661,9 @@ DBImpl::DropIndex(const std::string& collection_id) { } LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id; - return DropCollectionIndexRecursively(collection_id); + auto status = DropCollectionIndexRecursively(collection_id); + StartMergeTask(); // merge small files after drop index + return status; } Status @@ -2660,30 +2671,39 @@ Status DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { fiu_return_on("DBImpl.ExexWalRecord.return", Status();); - auto collections_flushed = [&](const std::set& collection_ids) -> uint64_t { - if (collection_ids.empty()) { - return 0; - } - + auto collections_flushed = [&](const std::string collection_id, + const std::set& target_collection_names) -> uint64_t { uint64_t max_lsn = 0; if (options_.wal_enable_) { - for (auto& collection : collection_ids) { - uint64_t lsn = 0; + uint64_t lsn = 0; + for (auto& collection : target_collection_names) { meta_ptr_->GetCollectionFlushLSN(collection, lsn); - wal_mgr_->CollectionFlushed(collection, lsn); if (lsn > max_lsn) { max_lsn = lsn; } } + wal_mgr_->CollectionFlushed(collection_id, lsn); } std::lock_guard lck(merge_result_mutex_); - for (auto& collection : collection_ids) { + for (auto& collection : target_collection_names) { merge_collection_ids_.insert(collection); } return max_lsn; }; + auto partition_flushed = [&](const std::string& collection_id, const std::string& partition, + const std::string& target_collection_name) { + if (options_.wal_enable_) { + uint64_t lsn = 0; + meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn); + wal_mgr_->PartitionFlushed(collection_id, partition, lsn); + } + + std::lock_guard lck(merge_result_mutex_); + merge_collection_ids_.insert(target_collection_name); + }; + Status status; switch (record.type) { @@ -2700,7 +2720,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { (record.data_size / record.length / sizeof(float)), (const float*)record.data, record.attr_nbytes, record.attr_data_size, record.attr_data, record.lsn, flushed_collections); - collections_flushed(flushed_collections); + if (!flushed_collections.empty()) { + partition_flushed(record.collection_id, record.partition_tag, target_collection_name); + } milvus::server::CollectInsertMetrics metrics(record.length, status); break; @@ -2718,7 +2740,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { (record.data_size / record.length / sizeof(uint8_t)), (const u_int8_t*)record.data, record.lsn, flushed_collections); // even though !status.ok, run - collections_flushed(flushed_collections); + if (!flushed_collections.empty()) { + partition_flushed(record.collection_id, record.partition_tag, target_collection_name); + } // metrics milvus::server::CollectInsertMetrics metrics(record.length, status); @@ -2738,7 +2762,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { (record.data_size / record.length / sizeof(float)), (const float*)record.data, record.lsn, flushed_collections); // even though !status.ok, run - collections_flushed(flushed_collections); + if (!flushed_collections.empty()) { + partition_flushed(record.collection_id, record.partition_tag, target_collection_name); + } // metrics milvus::server::CollectInsertMetrics metrics(record.length, status); @@ -2801,7 +2827,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { flushed_collections.insert(collection_id); } - collections_flushed(flushed_collections); + collections_flushed(record.collection_id, flushed_collections); } else { // flush all collections @@ -2811,7 +2837,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { status = mem_mgr_->Flush(collection_ids); } - uint64_t lsn = collections_flushed(collection_ids); + uint64_t lsn = collections_flushed("", collection_ids); if (options_.wal_enable_) { wal_mgr_->RemoveOldFiles(lsn); } diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index 02d46463..b8be49e4 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -59,7 +59,7 @@ class Meta { HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) = 0; virtual Status - AllCollections(std::vector& table_schema_array) = 0; + AllCollections(std::vector& table_schema_array, bool is_root = false) = 0; virtual Status UpdateCollectionFlag(const std::string& collection_id, int64_t flag) = 0; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index bb2ae20b..3031d738 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -582,7 +582,7 @@ MySQLMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not, } Status -MySQLMetaImpl::AllCollections(std::vector& collection_schema_array) { +MySQLMetaImpl::AllCollections(std::vector& collection_schema_array, bool is_root) { try { server::MetricCollector metric; mysqlpp::StoreQueryResult res; @@ -599,8 +599,12 @@ MySQLMetaImpl::AllCollections(std::vector& collection_schema_a mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT id, table_id, dimension, engine_type, index_params, index_file_size, metric_type" << " ,owner_table, partition_tag, version, flush_lsn" - << " FROM " << META_TABLES << " WHERE state <> " << std::to_string(CollectionSchema::TO_DELETE) - << " AND owner_table = \"\";"; + << " FROM " << META_TABLES << " WHERE state <> " << std::to_string(CollectionSchema::TO_DELETE); + if (is_root) { + statement << " AND owner_table = \"\";"; + } else { + statement << ";"; + } LOG_ENGINE_DEBUG_ << "AllCollections: " << statement.str(); @@ -1535,8 +1539,8 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id, mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT table_id, id, state, dimension, created_on, flag, index_file_size," - << " engine_type, index_params, metric_type, partition_tag, version FROM " << META_TABLES - << " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> " + << " engine_type, index_params, metric_type, partition_tag, version, flush_lsn FROM " + << META_TABLES << " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> " << std::to_string(CollectionSchema::TO_DELETE) << ";"; LOG_ENGINE_DEBUG_ << "ShowPartitions: " << statement.str(); @@ -1559,6 +1563,7 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id, partition_schema.owner_collection_ = collection_id; resRow["partition_tag"].to_string(partition_schema.partition_tag_); resRow["version"].to_string(partition_schema.version_); + partition_schema.flush_lsn_ = resRow["flush_lsn"]; partition_schema_array.emplace_back(partition_schema); } @@ -2755,6 +2760,7 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) { } bool first_create = false; + uint64_t last_lsn = 0; { mysqlpp::StoreQueryResult res; mysqlpp::Query statement = connectionPtr->query(); @@ -2762,6 +2768,8 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) { res = statement.store(); if (res.num_rows() == 0) { first_create = true; + } else { + last_lsn = res[0]["global_lsn"]; } } @@ -2773,7 +2781,7 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) { if (!statement.exec()) { return HandleException("QUERY ERROR WHEN SET GLOBAL LSN", statement.error()); } - } else { + } else if (lsn > last_lsn) { mysqlpp::Query statement = connectionPtr->query(); statement << "UPDATE " << META_ENVIRONMENT << " SET global_lsn = " << lsn << ";"; LOG_ENGINE_DEBUG_ << "SetGlobalLastLSN: " << statement.str(); @@ -2783,8 +2791,6 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) { } } } // Scoped Connection - - LOG_ENGINE_DEBUG_ << "Successfully update global_lsn: " << lsn; } catch (std::exception& e) { return HandleException("Failed to set global lsn", e.what()); } diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index 708063e4..e327a637 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -42,7 +42,7 @@ class MySQLMetaImpl : public Meta { HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override; Status - AllCollections(std::vector& collection_schema_array) override; + AllCollections(std::vector& collection_schema_array, bool is_root = false) override; Status DropCollection(const std::string& collection_id) override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 86a4bb38..a3e9f273 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -309,29 +309,37 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not } Status -SqliteMetaImpl::AllCollections(std::vector& collection_schema_array) { +SqliteMetaImpl::AllCollections(std::vector& collection_schema_array, bool is_root) { try { fiu_do_on("SqliteMetaImpl.AllCollections.throw_exception", throw std::exception()); server::MetricCollector metric; // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); - auto selected = ConnectorPtr->select( - columns(&CollectionSchema::id_, - &CollectionSchema::collection_id_, - &CollectionSchema::dimension_, - &CollectionSchema::created_on_, - &CollectionSchema::flag_, - &CollectionSchema::index_file_size_, - &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, - &CollectionSchema::metric_type_, - &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, - &CollectionSchema::version_, - &CollectionSchema::flush_lsn_), - where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE - and c(&CollectionSchema::owner_collection_) == "")); + auto select_columns = columns(&CollectionSchema::id_, + &CollectionSchema::collection_id_, + &CollectionSchema::dimension_, + &CollectionSchema::created_on_, + &CollectionSchema::flag_, + &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, + &CollectionSchema::index_params_, + &CollectionSchema::metric_type_, + &CollectionSchema::owner_collection_, + &CollectionSchema::partition_tag_, + &CollectionSchema::version_, + &CollectionSchema::flush_lsn_); + decltype(ConnectorPtr->select(select_columns)) selected; + + if (is_root) { + selected = ConnectorPtr->select(select_columns, + where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE + and c(&CollectionSchema::owner_collection_) == "")); + } else { + selected = ConnectorPtr->select(select_columns, + where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + } + for (auto& collection : selected) { CollectionSchema schema; schema.id_ = std::get<0>(collection); @@ -992,7 +1000,8 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id, &CollectionSchema::metric_type_, &CollectionSchema::partition_tag_, &CollectionSchema::version_, - &CollectionSchema::collection_id_), + &CollectionSchema::collection_id_, + &CollectionSchema::flush_lsn_), where(c(&CollectionSchema::owner_collection_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -1011,6 +1020,7 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id, partition_schema.partition_tag_ = std::get<9>(partitions[i]); partition_schema.version_ = std::get<10>(partitions[i]); partition_schema.collection_id_ = std::get<11>(partitions[i]); + partition_schema.flush_lsn_ = std::get<12>(partitions[i]); partition_schema_array.emplace_back(partition_schema); } } catch (std::exception& e) { @@ -1939,7 +1949,7 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) { ConnectorPtr->insert(env); } else { uint64_t last_lsn = std::get<0>(selected[0]); - if (lsn == last_lsn) { + if (lsn <= last_lsn) { return Status::OK(); } diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index d10a7599..ba46351a 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -44,7 +44,7 @@ class SqliteMetaImpl : public Meta { HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override; Status - AllCollections(std::vector& collection_schema_array) override; + AllCollections(std::vector& collection_schema_array, bool is_root = false) override; Status DropCollection(const std::string& collection_id) override; diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index 246e114a..4c7d5f8e 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -62,35 +62,55 @@ WalManager::Init(const meta::MetaPtr& meta) { if (meta != nullptr) { meta->GetGlobalLastLSN(recovery_start); - std::vector table_schema_array; - auto status = meta->AllCollections(table_schema_array); + std::vector collention_schema_array; + auto status = meta->AllCollections(collention_schema_array); if (!status.ok()) { return WAL_META_ERROR; } - if (!table_schema_array.empty()) { - // get min and max flushed lsn - uint64_t min_flused_lsn = table_schema_array[0].flush_lsn_; - uint64_t max_flused_lsn = table_schema_array[0].flush_lsn_; - for (size_t i = 1; i < table_schema_array.size(); i++) { - if (min_flused_lsn > table_schema_array[i].flush_lsn_) { - min_flused_lsn = table_schema_array[i].flush_lsn_; - } else if (max_flused_lsn < table_schema_array[i].flush_lsn_) { - max_flused_lsn = table_schema_array[i].flush_lsn_; + if (!collention_schema_array.empty()) { + u_int64_t min_flushed_lsn = ~(u_int64_t)0; + u_int64_t max_flushed_lsn = 0; + auto update_limit_lsn = [&](u_int64_t lsn) { + if (min_flushed_lsn > lsn) { + min_flushed_lsn = lsn; + } + if (max_flushed_lsn < lsn) { + max_flushed_lsn = lsn; + } + }; + + for (auto& col_schema : collention_schema_array) { + auto& collection = collections_[col_schema.collection_id_]; + auto& default_part = collection[""]; + default_part.flush_lsn = col_schema.flush_lsn_; + update_limit_lsn(default_part.flush_lsn); + + std::vector partition_schema_array; + status = meta->ShowPartitions(col_schema.collection_id_, partition_schema_array); + if (!status.ok()) { + return WAL_META_ERROR; + } + for (auto& par_schema : partition_schema_array) { + auto& partition = collection[par_schema.partition_tag_]; + partition.flush_lsn = par_schema.flush_lsn_; + update_limit_lsn(partition.flush_lsn); } } - if (applied_lsn < max_flused_lsn) { + + if (applied_lsn < max_flushed_lsn) { // a new WAL folder? - applied_lsn = max_flused_lsn; + applied_lsn = max_flushed_lsn; } - if (recovery_start < min_flused_lsn) { + if (recovery_start < min_flushed_lsn) { // not flush all yet - recovery_start = min_flused_lsn; + recovery_start = min_flushed_lsn; } - for (auto& schema : table_schema_array) { - TableLsn tb_lsn = {schema.flush_lsn_, applied_lsn}; - tables_[schema.collection_id_] = tb_lsn; + for (auto& col : collections_) { + for (auto& part : col.second) { + part.second.wal_lsn = applied_lsn; + } } } } @@ -141,9 +161,10 @@ WalManager::GetNextRecovery(MXLogRecord& record) { // background thread has not started. // so, needn't lock here. - auto it = tables_.find(record.collection_id); - if (it != tables_.end()) { - if (it->second.flush_lsn < record.lsn) { + auto it_col = collections_.find(record.collection_id); + if (it_col != collections_.end()) { + auto it_part = it_col->second.find(record.partition_tag); + if (it_part->second.flush_lsn < record.lsn) { break; } } @@ -179,9 +200,10 @@ WalManager::GetNextEntityRecovery(milvus::engine::wal::MXLogRecord& record) { // background thread has not started. // so, needn't lock here. - auto it = tables_.find(record.collection_id); - if (it != tables_.end()) { - if (it->second.flush_lsn < record.lsn) { + auto it_col = collections_.find(record.collection_id); + if (it_col != collections_.end()) { + auto it_part = it_col->second.find(record.partition_tag); + if (it_part->second.flush_lsn < record.lsn) { break; } } @@ -229,9 +251,10 @@ WalManager::GetNextRecord(MXLogRecord& record) { } std::lock_guard lck(mutex_); - auto it = tables_.find(record.collection_id); - if (it != tables_.end()) { - if (it->second.flush_lsn < record.lsn) { + auto it_col = collections_.find(record.collection_id); + if (it_col != collections_.end()) { + auto it_part = it_col->second.find(record.partition_tag); + if (it_part->second.flush_lsn < record.lsn) { break; } } @@ -275,9 +298,10 @@ WalManager::GetNextEntityRecord(milvus::engine::wal::MXLogRecord& record) { } std::lock_guard lck(mutex_); - auto it = tables_.find(record.collection_id); - if (it != tables_.end()) { - if (it->second.flush_lsn < record.lsn) { + auto it_col = collections_.find(record.collection_id); + if (it_col != collections_.end()) { + auto it_part = it_col->second.find(record.partition_tag); + if (it_part->second.flush_lsn < record.lsn) { break; } } @@ -293,7 +317,16 @@ WalManager::CreateCollection(const std::string& collection_id) { LOG_WAL_INFO_ << "create collection " << collection_id << " " << last_applied_lsn_; std::lock_guard lck(mutex_); uint64_t applied_lsn = last_applied_lsn_; - tables_[collection_id] = {applied_lsn, applied_lsn}; + collections_[collection_id][""] = {applied_lsn, applied_lsn}; + return applied_lsn; +} + +uint64_t +WalManager::CreatePartition(const std::string& collection_id, const std::string& partition_tag) { + LOG_WAL_INFO_ << "create collection " << collection_id << " " << partition_tag << " " << last_applied_lsn_; + std::lock_guard lck(mutex_); + uint64_t applied_lsn = last_applied_lsn_; + collections_[collection_id][partition_tag] = {applied_lsn, applied_lsn}; return applied_lsn; } @@ -302,7 +335,7 @@ WalManager::CreateHybridCollection(const std::string& collection_id) { LOG_WAL_INFO_ << "create hybrid collection " << collection_id << " " << last_applied_lsn_; std::lock_guard lck(mutex_); uint64_t applied_lsn = last_applied_lsn_; - tables_[collection_id] = {applied_lsn, applied_lsn}; + collections_[collection_id][""] = {applied_lsn, applied_lsn}; return applied_lsn; } @@ -310,21 +343,84 @@ void WalManager::DropCollection(const std::string& collection_id) { LOG_WAL_INFO_ << "drop collection " << collection_id; std::lock_guard lck(mutex_); - tables_.erase(collection_id); + collections_.erase(collection_id); +} + +void +WalManager::DropPartition(const std::string& collection_id, const std::string& partition_tag) { + LOG_WAL_INFO_ << collection_id << " drop partition " << partition_tag; + std::lock_guard lck(mutex_); + auto it = collections_.find(collection_id); + if (it != collections_.end()) { + it->second.erase(partition_tag); + } } void WalManager::CollectionFlushed(const std::string& collection_id, uint64_t lsn) { std::unique_lock lck(mutex_); - auto it = tables_.find(collection_id); - if (it != tables_.end()) { - it->second.flush_lsn = lsn; + if (collection_id.empty()) { + // all collections + for (auto& col : collections_) { + for (auto& part : col.second) { + part.second.flush_lsn = lsn; + } + } + + } else { + // one collection + auto it_col = collections_.find(collection_id); + if (it_col != collections_.end()) { + for (auto& part : it_col->second) { + part.second.flush_lsn = lsn; + } + } } lck.unlock(); LOG_WAL_INFO_ << collection_id << " is flushed by lsn " << lsn; } +void +WalManager::PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) { + std::unique_lock lck(mutex_); + auto it_col = collections_.find(collection_id); + if (it_col != collections_.end()) { + auto it_part = it_col->second.find(partition_tag); + if (it_part != it_col->second.end()) { + it_part->second.flush_lsn = lsn; + } + } + lck.unlock(); + + LOG_WAL_INFO_ << collection_id << " " << partition_tag << " is flushed by lsn " << lsn; +} + +void +WalManager::CollectionUpdated(const std::string& collection_id, uint64_t lsn) { + std::unique_lock lck(mutex_); + auto it_col = collections_.find(collection_id); + if (it_col != collections_.end()) { + for (auto& part : it_col->second) { + part.second.wal_lsn = lsn; + } + } + lck.unlock(); +} + +void +WalManager::PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) { + std::unique_lock lck(mutex_); + auto it_col = collections_.find(collection_id); + if (it_col != collections_.end()) { + auto it_part = it_col->second.find(partition_tag); + if (it_part != it_col->second.end()) { + it_part->second.wal_lsn = lsn; + } + } + lck.unlock(); +} + template bool WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids, @@ -380,13 +476,8 @@ WalManager::Insert(const std::string& collection_id, const std::string& partitio new_lsn = record.lsn; } - std::unique_lock lck(mutex_); last_applied_lsn_ = new_lsn; - auto it = tables_.find(collection_id); - if (it != tables_.end()) { - it->second.wal_lsn = new_lsn; - } - lck.unlock(); + PartitionUpdated(collection_id, partition_tag, new_lsn); LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag << " with lsn " << new_lsn; @@ -472,13 +563,8 @@ WalManager::InsertEntities(const std::string& collection_id, const std::string& new_lsn = record.lsn; } - std::unique_lock lck(mutex_); last_applied_lsn_ = new_lsn; - auto it = tables_.find(collection_id); - if (it != tables_.end()) { - it->second.wal_lsn = new_lsn; - } - lck.unlock(); + PartitionUpdated(collection_id, partition_tag, new_lsn); LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag << " with lsn " << new_lsn; @@ -525,13 +611,8 @@ WalManager::DeleteById(const std::string& collection_id, const IDNumbers& vector new_lsn = record.lsn; } - std::unique_lock lck(mutex_); last_applied_lsn_ = new_lsn; - auto it = tables_.find(collection_id); - if (it != tables_.end()) { - it->second.wal_lsn = new_lsn; - } - lck.unlock(); + CollectionUpdated(collection_id, new_lsn); LOG_WAL_INFO_ << collection_id << " delete rows by id, lsn " << new_lsn; @@ -548,19 +629,25 @@ WalManager::Flush(const std::string& collection_id) { uint64_t lsn = 0; if (collection_id.empty()) { // flush all tables - for (auto& it : tables_) { - if (it.second.wal_lsn > it.second.flush_lsn) { - lsn = last_applied_lsn_; - break; + for (auto& col : collections_) { + for (auto& part : col.second) { + if (part.second.wal_lsn > part.second.flush_lsn) { + lsn = last_applied_lsn_; + break; + } } } } else { // flush one collection - auto it = tables_.find(collection_id); - if (it != tables_.end()) { - if (it->second.wal_lsn > it->second.flush_lsn) { - lsn = it->second.wal_lsn; + auto it_col = collections_.find(collection_id); + if (it_col != collections_.end()) { + for (auto& part : it_col->second) { + auto wal_lsn = part.second.wal_lsn; + auto flush_lsn = part.second.flush_lsn; + if (wal_lsn > flush_lsn && wal_lsn > lsn) { + lsn = wal_lsn; + } } } } diff --git a/core/src/db/wal/WalManager.h b/core/src/db/wal/WalManager.h index a8be297b..04fd8495 100644 --- a/core/src/db/wal/WalManager.h +++ b/core/src/db/wal/WalManager.h @@ -62,6 +62,7 @@ class WalManager { ErrorCode GetNextEntityRecord(MXLogRecord& record); + /* * Create collection * @param collection_id: collection id @@ -70,6 +71,15 @@ class WalManager { uint64_t CreateCollection(const std::string& collection_id); + /* + * Create partition + * @param collection_id: collection id + * @param partition_tag: partition tag + * @retval lsn + */ + uint64_t + CreatePartition(const std::string& collection_id, const std::string& partition_tag); + /* * Create hybrid collection * @param collection_id: collection id @@ -87,13 +97,49 @@ class WalManager { DropCollection(const std::string& collection_id); /* - * Collection is flushed + * Drop partition + * @param collection_id: collection id + * @param partition_tag: partition tag + * @retval none + */ + void + DropPartition(const std::string& collection_id, const std::string& partition_tag); + + /* + * Collection is flushed (update flushed_lsn) * @param collection_id: collection id * @param lsn: flushed lsn */ void CollectionFlushed(const std::string& collection_id, uint64_t lsn); + /* + * Partition is flushed (update flushed_lsn) + * @param collection_id: collection id + * @param partition_tag: partition_tag + * @param lsn: flushed lsn + */ + void + PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn); + + /* + * Collection is updated (update wal_lsn) + * @param collection_id: collection id + * @param partition_tag: partition_tag + * @param lsn: flushed lsn + */ + void + CollectionUpdated(const std::string& collection_id, uint64_t lsn); + + /* + * Partition is updated (update wal_lsn) + * @param collection_id: collection id + * @param partition_tag: partition_tag + * @param lsn: flushed lsn + */ + void + PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn); + /* * Insert * @param collection_id: collection id @@ -155,7 +201,7 @@ class WalManager { uint64_t wal_lsn; }; std::mutex mutex_; - std::map tables_; + std::map> collections_; std::atomic last_applied_lsn_; // if multi-thread call Flush(), use list diff --git a/core/unittest/db/test_wal.cpp b/core/unittest/db/test_wal.cpp index a4bdc34e..c9143f14 100644 --- a/core/unittest/db/test_wal.cpp +++ b/core/unittest/db/test_wal.cpp @@ -60,7 +60,7 @@ class TestWalMeta : public SqliteMetaImpl { } Status - AllCollections(std::vector& table_schema_array) override { + AllCollections(std::vector& table_schema_array, bool is_root) override { table_schema_array = tables_; return Status::OK(); } @@ -88,7 +88,7 @@ class TestWalMetaError : public SqliteMetaImpl { } Status - AllCollections(std::vector& table_schema_array) override { + AllCollections(std::vector& table_schema_array, bool is_root) override { return Status(DB_ERROR, "error"); } }; -- GitLab