提交 f64e4e77 编写于 作者: G groot 提交者: JinHai-CN

#2394 Drop collection timeout if too many partitions created on colle… (#2477)

* #2394 Drop collection timeout if too many partitions created on collection
Signed-off-by: groot <yihua.mo@zilliz.com>

* changelog
Signed-off-by: yhmo <yihua.mo@zilliz.com>
上级 0205ea45
......@@ -17,6 +17,7 @@ Please mark all change in change log and use the issue from GitHub
## Bug
- \#2367 Fix inconsistent reading and writing when using mishards
- \#2394 Drop collection timeout if too many partitions created on collection
## Feature
- \#2363 Update branch version
......
......@@ -269,11 +269,37 @@ DBImpl::DropCollection(const std::string& collection_id) {
return SHUTDOWN_ERROR;
}
// dates partly delete files of the collection but currently we don't support
LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;
Status status;
if (options_.wal_enable_) {
wal_mgr_->DropCollection(collection_id);
}
return DropCollectionRecursively(collection_id);
status = mem_mgr_->EraseMemVector(collection_id); // not allow insert
status = meta_ptr_->DropCollections({collection_id}); // soft delete collection
index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
std::vector<std::string> partition_id_array;
for (auto& schema : partition_array) {
if (options_.wal_enable_) {
wal_mgr_->DropCollection(schema.collection_id_);
}
status = mem_mgr_->EraseMemVector(schema.collection_id_);
index_failed_checker_.CleanFailedIndexFileOfCollection(schema.collection_id_);
partition_id_array.push_back(schema.collection_id_);
}
status = meta_ptr_->DropCollections(partition_id_array);
fiu_do_on("DBImpl.DropCollection.failed", status = Status(DB_ERROR, ""));
if (!status.ok()) {
return status;
}
return Status::OK();
}
Status
......@@ -2543,39 +2569,6 @@ DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<
return Status::OK();
}
// Soft-deletes a collection and then all of its partitions, depth-first.
// Steps: stop WAL replay for the collection, evict its in-memory vectors,
// mark the meta row TO_DELETE, clear failed-index bookkeeping, then hand a
// DeleteJob to the scheduler and block until it completes before recursing
// into each partition (partitions are stored as collections in the meta
// layer, keyed by schema.collection_id_).
// NOTE(review): each recursive call waits on its own DeleteJob, so a
// collection with many partitions drops one partition at a time — presumably
// the timeout this change set (#2394) is fixing.
Status
DBImpl::DropCollectionRecursively(const std::string& collection_id) {
// Only whole-collection drop is supported; partial (by-date) file deletion
// is not implemented.
LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;
Status status;
if (options_.wal_enable_) {
// prevent the WAL from replaying records for this collection
wal_mgr_->DropCollection(collection_id);
}
// NOTE(review): the statuses of the next two calls are overwritten without
// being checked — failures here are silently ignored.
status = mem_mgr_->EraseMemVector(collection_id); // not allow insert
status = meta_ptr_->DropCollection(collection_id); // soft delete collection
index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
// scheduler will determine when to delete collection files
auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(collection_id, meta_ptr_, nres);
scheduler::JobMgrInst::GetInstance()->Put(job);
// blocks until every compute resource has finished the delete job
job->WaitAndDelete();
// recurse into partitions; each one is itself a collection in the meta layer
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
status = DropCollectionRecursively(schema.collection_id_);
// fault-injection point used by unit tests to simulate a failed drop
fiu_do_on("DBImpl.DropCollectionRecursively.failed", status = Status(DB_ERROR, ""));
if (!status.ok()) {
return status;
}
}
return Status::OK();
}
Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
DropIndex(collection_id);
......
......@@ -268,9 +268,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
std::set<std::string>& partition_name_array);
Status
DropCollectionRecursively(const std::string& collection_id);
Status
UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index);
......
......@@ -71,10 +71,10 @@ class Meta {
GetCollectionFlushLSN(const std::string& collection_id, uint64_t& flush_lsn) = 0;
virtual Status
DropCollection(const std::string& collection_id) = 0;
DropCollections(const std::vector<std::string>& collection_id_array) = 0;
virtual Status
DeleteCollectionFiles(const std::string& collection_id) = 0;
DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) = 0;
virtual Status
CreateCollectionFile(SegmentSchema& file_schema) = 0;
......
......@@ -46,6 +46,26 @@ namespace meta {
namespace {
constexpr uint64_t SQL_BATCH_SIZE = 50;
// Split `id_array` into consecutive groups of at most `batch_size` ids so
// that SQL statements built from one group stay bounded in size (avoids an
// oversized IN(...) clause when a collection has many partitions).
//
// @param id_array    any range of elements convertible to std::string
// @param id_groups   output: ids partitioned into consecutive batches;
//                    existing contents are kept, new groups are appended
// @param batch_size  maximum number of ids per batch (default 50, matching
//                    SQL_BATCH_SIZE). Parameterized instead of re-declaring a
//                    local constant that shadowed the namespace-level one.
template <typename T>
void
DistributeBatch(const T& id_array, std::vector<std::vector<std::string>>& id_groups, uint64_t batch_size = 50) {
    std::vector<std::string> temp_group;
    for (const auto& id : id_array) {
        temp_group.push_back(id);
        if (temp_group.size() >= batch_size) {
            id_groups.emplace_back(std::move(temp_group));
            temp_group.clear();
        }
    }
    if (!temp_group.empty()) {
        id_groups.emplace_back(std::move(temp_group));
    }
}
Status
HandleException(const std::string& desc, const char* what = nullptr) {
if (what == nullptr) {
......@@ -636,10 +656,15 @@ MySQLMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_a
}
Status
MySQLMetaImpl::DropCollection(const std::string& collection_id) {
MySQLMetaImpl::DropCollections(const std::vector<std::string>& collection_id_array) {
try {
// distribute id array to batches
std::vector<std::vector<std::string>> id_groups;
DistributeBatch(collection_id_array, id_groups);
server::MetricCollector metric;
{
for (auto group : id_groups) {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
......@@ -650,26 +675,32 @@ MySQLMetaImpl::DropCollection(const std::string& collection_id) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// soft delete collection
mysqlpp::Query statement = connectionPtr->query();
//
statement << "UPDATE " << META_TABLES << " SET state = " << std::to_string(CollectionSchema::TO_DELETE)
<< " WHERE table_id = " << mysqlpp::quote << collection_id << ";";
<< " WHERE table_id in(";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {
statement << ",";
}
}
statement << ")";
LOG_ENGINE_DEBUG_ << "DeleteCollection: " << statement.str();
LOG_ENGINE_DEBUG_ << "DropCollections: " << statement.str();
if (!statement.exec()) {
return HandleException("Failed to drop collection", statement.error());
return HandleException("Failed to drop collections", statement.error());
}
} // Scoped Connection
bool is_writable_mode{mode_ == DBOptions::MODE::CLUSTER_WRITABLE};
fiu_do_on("MySQLMetaImpl.DropCollection.CLUSTER_WRITABLE_MODE", is_writable_mode = true);
if (is_writable_mode) {
DeleteCollectionFiles(collection_id);
}
LOG_ENGINE_DEBUG_ << "Successfully delete collection: " << collection_id;
auto status = DeleteCollectionFiles(collection_id_array);
LOG_ENGINE_DEBUG_ << "Successfully delete collections";
return status;
} catch (std::exception& e) {
return HandleException("Failed to drop collection", e.what());
}
......@@ -678,10 +709,14 @@ MySQLMetaImpl::DropCollection(const std::string& collection_id) {
}
Status
MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
MySQLMetaImpl::DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) {
try {
// distribute id array to batches
std::vector<std::vector<std::string>> id_groups;
DistributeBatch(collection_id_array, id_groups);
server::MetricCollector metric;
{
for (auto group : id_groups) {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
......@@ -699,9 +734,14 @@ MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
mysqlpp::Query statement = connectionPtr->query();
//
statement << "UPDATE " << META_TABLEFILES << " SET file_type = " << std::to_string(SegmentSchema::TO_DELETE)
<< " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp())
<< " WHERE table_id = " << mysqlpp::quote << collection_id << " AND file_type <> "
<< std::to_string(SegmentSchema::TO_DELETE) << ";";
<< " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " WHERE table_id in (";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {
statement << ",";
}
}
statement << ") AND file_type <> " << std::to_string(SegmentSchema::TO_DELETE) << ";";
LOG_ENGINE_DEBUG_ << "DeleteCollectionFiles: " << statement.str();
......@@ -710,7 +750,7 @@ MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
}
} // Scoped Connection
LOG_ENGINE_DEBUG_ << "Successfully delete collection files from " << collection_id;
LOG_ENGINE_DEBUG_ << "Successfully delete collection files";
} catch (std::exception& e) {
return HandleException("Failed to delete colletion files", e.what());
}
......@@ -1519,7 +1559,7 @@ MySQLMetaImpl::HasPartition(const std::string& collection_id, const std::string&
Status
MySQLMetaImpl::DropPartition(const std::string& partition_name) {
return DropCollection(partition_name);
return DropCollections({partition_name});
}
Status
......@@ -1717,20 +1757,8 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se
}
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
for (auto& id : partition_id_array) {
temp_group.push_back(id);
if (temp_group.size() >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
DistributeBatch(partition_id_array, id_groups);
// perform query batch by batch
int64_t files_count = 0;
......@@ -2130,14 +2158,13 @@ MySQLMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collecti
server::MetricCollector metric;
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
std::unordered_map<std::string, meta::CollectionSchema> map_collections;
for (auto& collection : collections) {
map_collections.insert(std::make_pair(collection.collection_id_, collection));
temp_group.push_back(collection.collection_id_);
if (temp_group.size() >= batch_size) {
if (temp_group.size() >= SQL_BATCH_SIZE) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
......
......@@ -45,10 +45,10 @@ class MySQLMetaImpl : public Meta {
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
Status
DropCollection(const std::string& collection_id) override;
DropCollections(const std::vector<std::string>& collection_id_array) override;
Status
DeleteCollectionFiles(const std::string& collection_id) override;
DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) override;
Status
CreateCollectionFile(SegmentSchema& file_schema) override;
......
......@@ -47,6 +47,26 @@ using namespace sqlite_orm;
namespace {
constexpr uint64_t SQL_BATCH_SIZE = 50;
// Split `id_array` into consecutive groups of at most `batch_size` ids so
// that SQL statements built from one group stay bounded in size (avoids an
// oversized IN(...) clause when a collection has many partitions).
//
// @param id_array    any range of elements convertible to std::string
// @param id_groups   output: ids partitioned into consecutive batches;
//                    existing contents are kept, new groups are appended
// @param batch_size  maximum number of ids per batch (default 50, matching
//                    SQL_BATCH_SIZE). Parameterized instead of re-declaring a
//                    local constant that shadowed the namespace-level one.
template <typename T>
void
DistributeBatch(const T& id_array, std::vector<std::vector<std::string>>& id_groups, uint64_t batch_size = 50) {
    std::vector<std::string> temp_group;
    for (const auto& id : id_array) {
        temp_group.push_back(id);
        if (temp_group.size() >= batch_size) {
            id_groups.emplace_back(std::move(temp_group));
            temp_group.clear();
        }
    }
    if (!temp_group.empty()) {
        id_groups.emplace_back(std::move(temp_group));
    }
}
Status
HandleException(const std::string& desc, const char* what = nullptr) {
if (what == nullptr) {
......@@ -367,22 +387,32 @@ SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_
}
Status
SqliteMetaImpl::DropCollection(const std::string& collection_id) {
SqliteMetaImpl::DropCollections(const std::vector<std::string>& collection_id_array) {
try {
fiu_do_on("SqliteMetaImpl.DropCollection.throw_exception", throw std::exception());
// distribute id array to batches
std::vector<std::vector<std::string>> id_groups;
DistributeBatch(collection_id_array, id_groups);
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// soft delete collection
ConnectorPtr->update_all(
set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE),
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
for (auto group : id_groups) {
// soft delete collection
ConnectorPtr->update_all(
set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE),
where(in(&CollectionSchema::collection_id_, group)
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
}
}
LOG_ENGINE_DEBUG_ << "Successfully delete collection, collection id = " << collection_id;
auto status = DeleteCollectionFiles(collection_id_array);
LOG_ENGINE_DEBUG_ << "Successfully delete collections";
return status;
} catch (std::exception& e) {
return HandleException("Encounter exception when delete collection", e.what());
}
......@@ -391,22 +421,28 @@ SqliteMetaImpl::DropCollection(const std::string& collection_id) {
}
Status
SqliteMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
SqliteMetaImpl::DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) {
try {
fiu_do_on("SqliteMetaImpl.DeleteCollectionFiles.throw_exception", throw std::exception());
// distribute id array to batches
std::vector<std::vector<std::string>> id_groups;
DistributeBatch(collection_id_array, id_groups);
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// soft delete collection files
ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::TO_DELETE,
c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(c(&SegmentSchema::collection_id_) == collection_id and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
for (auto group : id_groups) {
// soft delete collection files
ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::TO_DELETE,
c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(in(&SegmentSchema::collection_id_, group) and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
}
LOG_ENGINE_DEBUG_ << "Successfully delete collection files, collection id = " << collection_id;
LOG_ENGINE_DEBUG_ << "Successfully delete collection files";
} catch (std::exception& e) {
return HandleException("Encounter exception when delete collection files", e.what());
}
......@@ -979,7 +1015,7 @@ SqliteMetaImpl::HasPartition(const std::string& collection_id, const std::string
Status
SqliteMetaImpl::DropPartition(const std::string& partition_name) {
return DropCollection(partition_name);
return DropCollections({partition_name});
}
Status
......
......@@ -47,10 +47,10 @@ class SqliteMetaImpl : public Meta {
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
Status
DropCollection(const std::string& collection_id) override;
DropCollections(const std::vector<std::string>& collection_id_array) override;
Status
DeleteCollectionFiles(const std::string& collection_id) override;
DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) override;
Status
CreateCollectionFile(SegmentSchema& file_schema) override;
......
......@@ -27,7 +27,7 @@ void
DeleteJob::WaitAndDelete() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_resource == num_resource_; });
meta_ptr_->DeleteCollectionFiles(collection_id_);
meta_ptr_->DeleteCollectionFiles({collection_id_});
}
void
......
......@@ -968,10 +968,6 @@ TEST_F(DBTest2, DELETE_TEST) {
// fail drop collection
fiu_init(0);
FIU_ENABLE_FIU("DBImpl.DropCollectionRecursively.failed");
stat = db_->DropCollection(COLLECTION_NAME);
ASSERT_FALSE(stat.ok());
fiu_disable("DBImpl.DropCollectionRecursively.failed");
stat = db_->DropCollection(COLLECTION_NAME);
ASSERT_TRUE(stat.ok());
......
......@@ -45,7 +45,7 @@ TEST_F(MetaTest, COLLECTION_TEST) {
status = impl_->CreateCollection(collection);
ASSERT_EQ(status.code(), milvus::DB_ALREADY_EXIST);
status = impl_->DropCollection(collection.collection_id_);
status = impl_->DropCollections({collection.collection_id_});
ASSERT_TRUE(status.ok());
status = impl_->CreateCollection(collection);
......@@ -124,7 +124,7 @@ TEST_F(MetaTest, FALID_TEST) {
}
{
FIU_ENABLE_FIU("SqliteMetaImpl.DropCollection.throw_exception");
status = impl_->DropCollection(collection.collection_id_);
status = impl_->DropCollections({collection.collection_id_});
ASSERT_FALSE(status.ok());
fiu_disable("SqliteMetaImpl.DropCollection.throw_exception");
}
......@@ -142,7 +142,7 @@ TEST_F(MetaTest, FALID_TEST) {
}
{
FIU_ENABLE_FIU("SqliteMetaImpl.DeleteCollectionFiles.throw_exception");
status = impl_->DeleteCollectionFiles(collection.collection_id_);
status = impl_->DeleteCollectionFiles({collection.collection_id_});
ASSERT_FALSE(status.ok());
fiu_disable("SqliteMetaImpl.DeleteCollectionFiles.throw_exception");
}
......@@ -687,7 +687,7 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
to_index_files_cnt + index_files_cnt;
ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);
status = impl_->DeleteCollectionFiles(collection_id);
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_TRUE(status.ok());
status = impl_->CreateCollectionFile(table_file);
......@@ -712,7 +712,7 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
status = impl_->CleanUpFilesWithTTL(1UL);
ASSERT_TRUE(status.ok());
status = impl_->DropCollection(collection_id);
status = impl_->DropCollections({collection_id});
ASSERT_TRUE(status.ok());
}
......
......@@ -103,10 +103,6 @@ TEST_F(MySqlMetaTest, COLLECTION_TEST) {
ASSERT_FALSE(has_collection);
fiu_disable("MySQLMetaImpl.HasCollection.throw_exception");
FIU_ENABLE_FIU("MySQLMetaImpl.DropCollection.CLUSTER_WRITABLE_MODE");
stat = impl_->DropCollection(collection_id);
fiu_disable("MySQLMetaImpl.DropCollection.CLUSTER_WRITABLE_MODE");
FIU_ENABLE_FIU("MySQLMetaImpl.DropAll.null_connection");
status = impl_->DropAll();
ASSERT_FALSE(status.ok());
......@@ -298,7 +294,7 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
status = impl_->CleanUpFilesWithTTL(1UL);
ASSERT_TRUE(status.ok());
status = impl_->DropCollection(table_file.collection_id_);
status = impl_->DropCollections({table_file.collection_id_});
ASSERT_TRUE(status.ok());
status = impl_->UpdateCollectionFile(table_file);
ASSERT_TRUE(status.ok());
......@@ -714,19 +710,19 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);
FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
status = impl_->DeleteCollectionFiles(collection_id);
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.throw_exception");
status = impl_->DeleteCollectionFiles(collection_id);
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.DeleteCollectionFiles.throw_exception");
status = impl_->DeleteCollectionFiles(collection_id);
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_TRUE(status.ok());
status = impl_->DropCollection(collection_id);
status = impl_->DropCollections({collection_id});
ASSERT_TRUE(status.ok());
status = impl_->CleanUpFilesWithTTL(0UL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册