diff --git a/cpp/src/db/scheduler/TaskDispatchQueue.cpp b/cpp/src/db/scheduler/TaskDispatchQueue.cpp index 2ce0e933b46ea3e106f1d319e7254e7d905edc74..b728e925a9f00f79db3279468bab96f66dba7b20 100644 --- a/cpp/src/db/scheduler/TaskDispatchQueue.cpp +++ b/cpp/src/db/scheduler/TaskDispatchQueue.cpp @@ -24,14 +24,6 @@ TaskDispatchQueue::Put(const ScheduleContextPtr &context) { return; } - if (queue_.size() >= capacity_) { - std::string error_msg = - "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " + - std::to_string(queue_.size()); - SERVER_LOG_ERROR << error_msg; - throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); - } - TaskDispatchStrategy::Schedule(context, queue_); empty_.notify_all(); @@ -42,12 +34,6 @@ TaskDispatchQueue::Take() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); - if (queue_.empty()) { - std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; - throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); - } - ScheduleTaskPtr front(queue_.front()); queue_.pop_front(); full_.notify_all(); diff --git a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp index 7200f2584ff68177940240c8decbad307599d1c2..985f86cb0912684e5b8aab217995502ce7629e9b 100644 --- a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp +++ b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp @@ -74,20 +74,26 @@ public: } std::string table_id = context->table_id(); - for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) { + + //put delete task to proper position + //for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1 + //if user want to delete table1, the DeleteTask will be insert into No.6 position + for(std::list::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) { if((*iter)->type() != ScheduleTaskType::kIndexLoad) { continue; } - //put delete task to proper position IndexLoadTaskPtr loader = std::static_pointer_cast(*iter); - if(loader->file_->table_id_ == table_id) { - - task_list.insert(++iter, delete_task); - break; + if(loader->file_->table_id_ != table_id) { + continue; } + + task_list.insert(iter.base(), delete_task); + return true; } + //no task is searching this table, put DeleteTask to front of list so that the table will be delete asap + task_list.push_front(delete_task); return true; } }; diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index c9b7ea7dcc7071b3f599b36cc77719561700dda6..8427639ab95f11d5ec3cb95371fa925e4395c451 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -7,6 +7,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src) +aux_source_directory(./ test_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) @@ -30,9 +31,7 @@ set(db_test_src ${db_scheduler_srcs} ${wrapper_src} ${require_files} - utils.cpp - db_tests.cpp - meta_tests.cpp) + ${test_srcs}) cuda_add_executable(db_test ${db_test_src}) diff --git a/cpp/unittest/db/scheduler_test.cpp b/cpp/unittest/db/scheduler_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..01a7057e001d152cf6c5b6dfc836ce55c20c0224 --- /dev/null +++ b/cpp/unittest/db/scheduler_test.cpp @@ -0,0 +1,124 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. +//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include +#include + +#include "db/scheduler/TaskScheduler.h" +#include "db/scheduler/TaskDispatchStrategy.h" +#include "db/scheduler/TaskDispatchQueue.h" +#include "db/scheduler/task/SearchTask.h" +#include "db/scheduler/task/DeleteTask.h" +#include "db/scheduler/task/IndexLoadTask.h" + +using namespace zilliz::milvus; + +namespace { + +engine::TableFileSchemaPtr CreateTabileFileStruct(size_t id, const std::string& table_id) { + auto file = std::make_shared(); + file->id_ = id; + file->table_id_ = table_id; + return file; +} + +} + +TEST(DBSchedulerTest, TASK_QUEUE_TEST) { + engine::TaskDispatchQueue queue; + queue.SetCapacity(1000); + queue.Put(nullptr); + ASSERT_EQ(queue.Size(), 1UL); + + auto ptr = queue.Take(); + ASSERT_EQ(ptr, nullptr); + ASSERT_TRUE(queue.Empty()); + + engine::SearchContextPtr context_ptr = std::make_shared(1, 1, nullptr); + for(size_t i = 0; i < 10; i++) { + auto file = CreateTabileFileStruct(i, "tbl"); + context_ptr->AddIndexFile(file); + } + + queue.Put(context_ptr); + ASSERT_EQ(queue.Size(), 10); + + auto index_files = context_ptr->GetIndexMap(); + + ptr = queue.Front(); + ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad); + engine::IndexLoadTaskPtr load_task = std::static_pointer_cast(ptr); + ASSERT_EQ(load_task->file_->id_, index_files.begin()->first); + + ptr = queue.Back(); + ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad); +} + +TEST(DBSchedulerTest, SEARCH_SCHEDULER_TEST) { + std::list task_list; + bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list); + ASSERT_FALSE(ret); + + for(size_t i = 10; i < 30; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, "tbl"); + task_list.push_back(task_ptr); + } + + engine::SearchContextPtr context_ptr = std::make_shared(1, 1, nullptr); + for(size_t i = 0; i < 20; i++) { + auto file = CreateTabileFileStruct(i, "tbl"); + context_ptr->AddIndexFile(file); + } + + ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 30); +} + +TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) { + std::list task_list; + bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list); + ASSERT_FALSE(ret); + + const std::string table_id = "to_delete_table"; + for(size_t i = 0; i < 10; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, table_id); + task_list.push_back(task_ptr); + } + + for(size_t i = 0; i < 10; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, "other_table"); + task_list.push_back(task_ptr); + } + + engine::meta::Meta::Ptr meta_ptr; + engine::DeleteContextPtr context_ptr = std::make_shared(table_id, meta_ptr); + ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 21); + + auto temp_list = task_list; + for(size_t i = 0; ; i++) { + engine::ScheduleTaskPtr task_ptr = temp_list.front(); + temp_list.pop_front(); + if(task_ptr->type() == engine::ScheduleTaskType::kDelete) { + ASSERT_EQ(i, 10); + break; + } + } + + context_ptr = std::make_shared("no_task_table", meta_ptr); + ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 22); + + engine::ScheduleTaskPtr task_ptr = task_list.front(); + ASSERT_EQ(task_ptr->type(), engine::ScheduleTaskType::kDelete); +}