未验证 提交 0fd4b244 编写于 作者: G groot 提交者: GitHub

refine dbimpl (#1869)

* #1827 Combine request target vectors exceed max nq
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* refine dbimpl
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* refine dbimpl
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* fix unittest failure
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
Co-authored-by: NJin Hai <hai.jin@zilliz.com>
上级 35276ffc
......@@ -47,9 +47,14 @@ server_config:
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 means disable the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: sqlite://:@:/
preload_table:
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -47,9 +47,14 @@ server_config:
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 means disable the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: sqlite://:@:/
preload_table:
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -53,10 +53,8 @@ namespace milvus {
namespace engine {
namespace {
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
......@@ -125,18 +123,26 @@ DBImpl::Start() {
// for distribute version, some nodes are read only
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
// background thread
bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalTask, this);
// background wal thread
bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this);
}
} else {
// for distribute version, some nodes are read only
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
// ENGINE_LOG_TRACE << "StartTimerTasks";
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
// background flush thread
bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this);
}
}
// for distribute version, some nodes are read only
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
// background build index thread
bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this);
}
// background metric thread
bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
return Status::OK();
}
......@@ -150,24 +156,30 @@ DBImpl::Stop() {
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
if (options_.wal_enable_) {
// wait flush merge/buildindex finish
bg_task_swn_.Notify();
// wait wal thread finish
swn_wal_.Notify();
bg_wal_thread_.join();
} else {
// flush all
// flush all without merge
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
ExecWalRecord(record);
// wait merge/buildindex finish
bg_task_swn_.Notify();
bg_timer_thread_.join();
// wait flush thread finish
swn_flush_.Notify();
bg_flush_thread_.join();
}
swn_index_.Notify();
bg_index_thread_.join();
meta_ptr_->CleanUpShadowFiles();
}
// wait metric thread exit
swn_metric_.Notify();
bg_metric_thread_.join();
// ENGINE_LOG_TRACE << "DB service stop";
return Status::OK();
}
......@@ -512,8 +524,7 @@ DBImpl::InsertVectors(const std::string& collection_id, const std::string& parti
} else if (!vectors.binary_data_.empty()) {
wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_);
}
bg_task_swn_.Notify();
swn_wal_.Notify();
} else {
wal::MXLogRecord record;
record.lsn = 0; // need to get from meta ?
......@@ -555,8 +566,7 @@ DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) {
Status status;
if (options_.wal_enable_) {
wal_mgr_->DeleteById(collection_id, vector_ids);
bg_task_swn_.Notify();
swn_wal_.Notify();
} else {
wal::MXLogRecord record;
record.lsn = 0; // need to get from meta ?
......@@ -593,19 +603,14 @@ DBImpl::Flush(const std::string& collection_id) {
if (options_.wal_enable_) {
ENGINE_LOG_DEBUG << "WAL flush";
auto lsn = wal_mgr_->Flush(collection_id);
ENGINE_LOG_DEBUG << "wal_mgr_->Flush";
if (lsn != 0) {
bg_task_swn_.Notify();
flush_task_swn_.Wait();
ENGINE_LOG_DEBUG << "flush_task_swn_.Wait()";
swn_wal_.Notify();
flush_req_swn_.Wait();
}
} else {
ENGINE_LOG_DEBUG << "MemTable flush";
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
record.collection_id = collection_id;
status = ExecWalRecord(record);
InternalFlush(collection_id);
}
ENGINE_LOG_DEBUG << "End flush collection: " << collection_id;
......@@ -626,14 +631,12 @@ DBImpl::Flush() {
ENGINE_LOG_DEBUG << "WAL flush";
auto lsn = wal_mgr_->Flush();
if (lsn != 0) {
bg_task_swn_.Notify();
flush_task_swn_.Wait();
swn_wal_.Notify();
flush_req_swn_.Wait();
}
} else {
ENGINE_LOG_DEBUG << "MemTable flush";
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
status = ExecWalRecord(record);
InternalFlush();
}
ENGINE_LOG_DEBUG << "End flush all collections";
......@@ -1236,7 +1239,7 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::
}
void
DBImpl::BackgroundTimerTask() {
DBImpl::BackgroundIndexThread() {
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
......@@ -1247,14 +1250,9 @@ DBImpl::BackgroundTimerTask() {
break;
}
if (options_.auto_flush_interval_ > 0) {
bg_task_swn_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
} else {
bg_task_swn_.Wait();
}
swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
StartMetricTask();
StartMergeTask();
WaitMergeFileFinish();
StartBuildIndexTask();
}
}
......@@ -1281,13 +1279,7 @@ DBImpl::WaitBuildIndexFinish() {
void
DBImpl::StartMetricTask() {
static uint64_t metric_clock_tick = 0;
++metric_clock_tick;
if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
return;
}
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);
......@@ -1317,16 +1309,6 @@ DBImpl::StartMetricTask() {
void
DBImpl::StartMergeTask() {
static uint64_t compact_clock_tick = 0;
++compact_clock_tick;
if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
return;
}
if (!options_.wal_enable_) {
Flush();
}
// ENGINE_LOG_DEBUG << "Begin StartMergeTask";
// merge task has been finished?
{
......@@ -1514,13 +1496,7 @@ DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
}
void
DBImpl::StartBuildIndexTask(bool force) {
static uint64_t index_clock_tick = 0;
++index_clock_tick;
if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
return;
}
DBImpl::StartBuildIndexTask() {
// build index has been finished?
{
std::lock_guard<std::mutex> lck(index_result_mutex_);
......@@ -1992,7 +1968,17 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
}
void
DBImpl::BackgroundWalTask() {
DBImpl::InternalFlush(const std::string& collection_id) {
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
record.collection_id = collection_id;
ExecWalRecord(record);
StartMergeTask();
}
void
DBImpl::BackgroundWalThread() {
server::SystemInfo::GetInstance().Init();
std::chrono::system_clock::time_point next_auto_flush_time;
......@@ -2003,26 +1989,15 @@ DBImpl::BackgroundWalTask() {
next_auto_flush_time = get_next_auto_flush_time();
}
wal::MXLogRecord record;
auto auto_flush = [&]() {
record.type = wal::MXLogType::Flush;
record.collection_id.clear();
ExecWalRecord(record);
StartMetricTask();
StartMergeTask();
StartBuildIndexTask();
};
while (true) {
if (options_.auto_flush_interval_ > 0) {
if (std::chrono::system_clock::now() >= next_auto_flush_time) {
auto_flush();
InternalFlush();
next_auto_flush_time = get_next_auto_flush_time();
}
}
wal::MXLogRecord record;
auto error_code = wal_mgr_->GetNextRecord(record);
if (error_code != WAL_SUCCESS) {
ENGINE_LOG_ERROR << "WAL background GetNextRecord error";
......@@ -2032,8 +2007,8 @@ DBImpl::BackgroundWalTask() {
if (record.type != wal::MXLogType::None) {
ExecWalRecord(record);
if (record.type == wal::MXLogType::Flush) {
// user req flush
flush_task_swn_.Notify();
// notify flush request to return
flush_req_swn_.Notify();
// if user flush all manually, update auto flush also
if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
......@@ -2043,7 +2018,8 @@ DBImpl::BackgroundWalTask() {
} else {
if (!initialized_.load(std::memory_order_acquire)) {
auto_flush();
InternalFlush();
flush_req_swn_.Notify();
WaitMergeFileFinish();
WaitBuildIndexFinish();
ENGINE_LOG_DEBUG << "WAL background thread exit";
......@@ -2051,14 +2027,46 @@ DBImpl::BackgroundWalTask() {
}
if (options_.auto_flush_interval_ > 0) {
bg_task_swn_.Wait_Until(next_auto_flush_time);
swn_wal_.Wait_Until(next_auto_flush_time);
} else {
bg_task_swn_.Wait();
swn_wal_.Wait();
}
}
}
}
void
DBImpl::BackgroundFlushThread() {
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
ENGINE_LOG_DEBUG << "DB background flush thread exit";
break;
}
InternalFlush();
if (options_.auto_flush_interval_ > 0) {
swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
} else {
swn_flush_.Wait();
}
}
}
void
DBImpl::BackgroundMetricThread() {
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
ENGINE_LOG_DEBUG << "DB background metric thread exit";
break;
}
swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
StartMetricTask();
}
}
void
DBImpl::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
......
......@@ -170,7 +170,19 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
const meta::SegmentsSchema& files);
void
BackgroundTimerTask();
InternalFlush(const std::string& collection_id = "");
void
BackgroundWalThread();
void
BackgroundFlushThread();
void
BackgroundMetricThread();
void
BackgroundIndexThread();
void
WaitMergeFileFinish();
......@@ -194,7 +206,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
BackgroundMerge(std::set<std::string> collection_ids);
void
StartBuildIndexTask(bool force = false);
StartBuildIndexTask();
void
BackgroundBuildIndex();
......@@ -240,22 +252,21 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
Status
ExecWalRecord(const wal::MXLogRecord& record);
void
BackgroundWalTask();
private:
DBOptions options_;
std::atomic<bool> initialized_;
std::thread bg_timer_thread_;
meta::MetaPtr meta_ptr_;
MemManagerPtr mem_mgr_;
std::shared_ptr<wal::WalManager> wal_mgr_;
std::thread bg_wal_thread_;
std::thread bg_flush_thread_;
std::thread bg_metric_thread_;
std::thread bg_index_thread_;
struct SimpleWaitNotify {
bool notified_ = false;
std::mutex mutex_;
......@@ -297,8 +308,12 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
}
};
SimpleWaitNotify bg_task_swn_;
SimpleWaitNotify flush_task_swn_;
SimpleWaitNotify swn_wal_;
SimpleWaitNotify swn_flush_;
SimpleWaitNotify swn_metric_;
SimpleWaitNotify swn_index_;
SimpleWaitNotify flush_req_swn_;
ThreadPool merge_thread_pool_;
std::mutex merge_result_mutex_;
......
......@@ -562,7 +562,7 @@ TEST_F(DeleteTest, delete_add_create_index) {
// stat = db_->Flush();
// ASSERT_TRUE(stat.ok());
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
index.extra_params_ = {{"nlist", 100}};
stat = db_->CreateIndex(collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
......
......@@ -188,8 +188,10 @@ DBTest::SetUp() {
void
DBTest::TearDown() {
db_->Stop();
db_->DropAll();
if (db_) {
db_->Stop();
db_->DropAll();
}
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
milvus::scheduler::SchedInst::GetInstance()->Stop();
......@@ -309,7 +311,9 @@ MySqlMetaTest::SetUp() {
void
MySqlMetaTest::TearDown() {
impl_->DropAll();
if (impl_) {
impl_->DropAll();
}
BaseTest::TearDown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册