提交 55754a0e 编写于 作者: J jinhai

Merge branch 'branch-0.3.0' into 'branch-0.3.0'

MS-45 Implement DeleteTable interface

See merge request megasearch/vecwise_engine!108

Former-commit-id: cf31fdb1cc48c63369cdc0a2c8dea27b66aab8a5
......@@ -8,7 +8,6 @@ server_config:
db_config:
db_path: /tmp/milvus
db_backend_url: http://127.0.0.1
db_flush_interval: 5 #flush cache data into disk at intervals, unit: second
index_building_threshold: 1024 #build index file when raw data file size larger than this value, unit: MB
metric_config:
......
......@@ -10,10 +10,18 @@ aux_source_directory(config config_files)
aux_source_directory(server server_files)
aux_source_directory(utils utils_files)
aux_source_directory(db db_files)
aux_source_directory(db/scheduler db_scheduler_files)
aux_source_directory(wrapper wrapper_files)
aux_source_directory(metrics metrics_files)
aux_source_directory(db/scheduler scheduler_files)
aux_source_directory(db/scheduler/context scheduler_context_files)
aux_source_directory(db/scheduler/task scheduler_task_files)
set(db_scheduler_files
${scheduler_files}
${scheduler_context_files}
${scheduler_task_files}
)
set(license_check_files
license/LicenseLibrary.cpp
license/LicenseCheck.cpp
......
......@@ -6,7 +6,6 @@
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "Factories.h"
namespace zilliz {
......
......@@ -5,11 +5,12 @@
******************************************************************************/
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "Log.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
#include "scheduler/TaskScheduler.h"
#include "scheduler/context/SearchContext.h"
#include "scheduler/context/DeleteContext.h"
#include "utils/TimeRecorder.h"
#include <assert.h>
......@@ -26,6 +27,10 @@ namespace engine {
namespace {
static constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
static constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
......@@ -125,69 +130,54 @@ void CalcScore(uint64_t vector_count,
DBImpl::DBImpl(const Options& options)
: env_(options.env),
options_(options),
bg_compaction_scheduled_(false),
: options_(options),
shutting_down_(false),
bg_build_index_started_(false),
pMeta_(new meta::DBMetaImpl(options_.meta)),
pMemMgr_(new MemManager(pMeta_, options_)) {
StartTimerTasks(options_.memory_sync_interval);
meta_ptr_(new meta::DBMetaImpl(options_.meta)),
mem_mgr_(new MemManager(meta_ptr_, options_)),
compact_thread_pool_(1, 1),
index_thread_pool_(1, 1) {
StartTimerTasks();
}
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
return pMeta_->CreateTable(table_schema);
return meta_ptr_->CreateTable(table_schema);
}
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToDelete(table_id, dates, files);
if (!status.ok()) { return status; }
for (auto &day_files : files) {
for (auto &file : day_files.second) {
boost::filesystem::remove(file.location_);
}
}
//deleting only the files of specific dates is not supported yet;
//an empty dates list means delete all files of the table
if(dates.empty()) {
meta::TableSchema table_schema;
table_schema.table_id_ = table_id;
status = DescribeTable(table_schema);
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
pMeta_->DeleteTable(table_id);
boost::system::error_code ec;
boost::filesystem::remove_all(table_schema.location_, ec);
if(ec.failed()) {
ENGINE_LOG_WARNING << "Failed to remove table folder";
}
}
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
scheduler.Schedule(context);
return Status::OK();
}
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
return pMeta_->DescribeTable(table_schema);
return meta_ptr_->DescribeTable(table_schema);
}
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
return pMeta_->HasTable(table_id, has_or_not);
return meta_ptr_->HasTable(table_id, has_or_not);
}
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
return pMeta_->AllTables(table_schema_array);
return meta_ptr_->AllTables(table_schema_array);
}
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
return pMeta_->Count(table_id, row_count);
return meta_ptr_->Count(table_id, row_count);
}
Status DBImpl::InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
auto start_time = METRICS_NOW_TIME;
Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
......@@ -219,7 +209,7 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
//get all table files from table
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToSearch(table_id, dates, files);
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
if (!status.ok()) { return status; }
meta::TableFilesSchema file_id_array;
......@@ -241,7 +231,7 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
for (auto &id : file_ids) {
meta::TableFileSchema table_file;
table_file.table_id_ = id;
auto status = pMeta_->GetTableFile(table_file);
auto status = meta_ptr_->GetTableFile(table_file);
if (!status.ok()) {
return status;
}
......@@ -254,7 +244,7 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToSearch(table_id, dates, files);
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
if (!status.ok()) { return status; }
ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
......@@ -403,8 +393,8 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
}
//step 2: put search task to scheduler
SearchScheduler& scheduler = SearchScheduler::GetInstance();
scheduler.ScheduleSearchTask(context);
TaskScheduler& scheduler = TaskScheduler::GetInstance();
scheduler.Schedule(context);
context->WaitResult();
......@@ -412,66 +402,89 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
auto& context_result = context->GetResult();
meta::TableSchema table_schema;
table_schema.table_id_ = table_id;
pMeta_->DescribeTable(table_schema);
meta_ptr_->DescribeTable(table_schema);
CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);
return Status::OK();
}
void DBImpl::StartTimerTasks(int interval) {
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
void DBImpl::StartTimerTasks() {
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
void DBImpl::BackgroundTimerTask(int interval) {
void DBImpl::BackgroundTimerTask() {
Status status;
server::SystemInfo::GetInstance().Init();
while (true) {
if (!bg_error_.ok()) break;
if (shutting_down_.load(std::memory_order_acquire)) break;
std::this_thread::sleep_for(std::chrono::seconds(interval));
server::Metrics::GetInstance().KeepingAliveCounterIncrement(interval);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
uint64_t size;
Size(size);
server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
server::Metrics::GetInstance().CPUUsagePercentSet();
server::Metrics::GetInstance().RAMUsagePercentSet();
server::Metrics::GetInstance().GPUPercentGaugeSet();
server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
server::Metrics::GetInstance().OctetsSet();
TrySchedule();
}
}
if (shutting_down_.load(std::memory_order_acquire)){
for(auto& iter : compact_thread_results_) {
iter.wait();
}
for(auto& iter : index_thread_results_) {
iter.wait();
}
break;
}
void DBImpl::TrySchedule() {
if (bg_compaction_scheduled_) return;
if (!bg_error_.ok()) return;
std::this_thread::sleep_for(std::chrono::seconds(1));
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
StartMetricTask();
StartCompactionTask();
StartBuildIndexTask();
}
}
void DBImpl::BGWork(void* db_) {
reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
}
void DBImpl::StartMetricTask() {
static uint64_t metric_clock_tick = 0;
metric_clock_tick++;
if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
return;
}
void DBImpl::BackgroundCall() {
std::lock_guard<std::mutex> lock(mutex_);
assert(bg_compaction_scheduled_);
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
uint64_t size;
Size(size);
server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
server::Metrics::GetInstance().CPUUsagePercentSet();
server::Metrics::GetInstance().RAMUsagePercentSet();
server::Metrics::GetInstance().GPUPercentGaugeSet();
server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
server::Metrics::GetInstance().OctetsSet();
}
void DBImpl::StartCompactionTask() {
static uint64_t compact_clock_tick = 0;
compact_clock_tick++;
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
return;
}
if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
return ;
//serialize memory data
std::vector<std::string> temp_table_ids;
mem_mgr_->Serialize(temp_table_ids);
for(auto& id : temp_table_ids) {
compact_table_ids_.insert(id);
}
BackgroundCompaction();
//has the previous compaction finished?
if(!compact_thread_results_.empty()) {
std::chrono::milliseconds span(10);
if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
compact_thread_results_.pop_back();
}
}
bg_compaction_scheduled_ = false;
bg_work_finish_signal_.notify_all();
//add new compaction task
if(compact_thread_results_.empty()) {
compact_thread_results_.push_back(
compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
compact_table_ids_.clear();
}
}
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
......@@ -479,10 +492,10 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
meta::TableFileSchema table_file;
table_file.table_id_ = table_id;
table_file.date_ = date;
Status status = pMeta_->CreateTableFile(table_file);
Status status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) {
LOG(INFO) << status.ToString() << std::endl;
ENGINE_LOG_INFO << status.ToString() << std::endl;
return status;
}
......@@ -503,7 +516,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
LOG(DEBUG) << "Merging file " << file_schema.file_id_;
ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
index_size = index->Size();
if (index_size >= options_.index_trigger_size) break;
......@@ -519,8 +532,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
}
table_file.size_ = index_size;
updated.push_back(table_file);
status = pMeta_->UpdateTableFiles(updated);
LOG(DEBUG) << "New merged file " << table_file.file_id_ <<
status = meta_ptr_->UpdateTableFiles(updated);
ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
" of size=" << index->PhysicalSize()/(1024*1024) << " M";
index->Cache();
......@@ -530,13 +543,12 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
meta::DatePartionedTableFilesSchema raw_files;
auto status = pMeta_->FilesToMerge(table_id, raw_files);
auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
if (!status.ok()) {
return status;
}
bool has_merge = false;
for (auto& kv : raw_files) {
auto files = kv.second;
if (files.size() <= options_.merge_trigger_number) {
......@@ -544,123 +556,143 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
has_merge = true;
MergeFiles(table_id, kv.first, kv.second);
if (shutting_down_.load(std::memory_order_acquire)){
break;
}
}
pMeta_->Archive();
return Status::OK();
}
TryBuildIndex();
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
Status status;
for (auto table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
bg_error_ = status;
return;
}
}
pMeta_->CleanUpFilesWithTTL(1);
meta_ptr_->Archive();
meta_ptr_->CleanUpFilesWithTTL(1);
}
return Status::OK();
// Periodic hook that keeps at most one BackgroundBuildIndex job outstanding on
// index_thread_pool_. It runs the real work only on every
// INDEX_ACTION_INTERVAL-th invocation.
void DBImpl::StartBuildIndexTask() {
    // Invocation counter used for throttling; function-local static, so it is
    // shared by every call of this function.
    static uint64_t tick_count = 0;
    ++tick_count;
    if (tick_count % INDEX_ACTION_INTERVAL != 0) {
        return;
    }

    // Give the most recently queued job a short window to complete and, if it
    // did, drop its future from the list.
    if (!index_thread_results_.empty()) {
        const std::chrono::milliseconds poll_window(10);
        const bool finished =
            index_thread_results_.back().wait_for(poll_window) == std::future_status::ready;
        if (finished) {
            index_thread_results_.pop_back();
        }
    }

    // A job is still pending: do not queue another one.
    if (!index_thread_results_.empty()) {
        return;
    }

    // Nothing outstanding, so submit a fresh build-index job.
    index_thread_results_.push_back(
        index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
}
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
meta::TableFileSchema table_file;
table_file.table_id_ = file.table_id_;
table_file.date_ = file.date_;
Status status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
return status;
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
if(to_index == nullptr) {
return Status::Error("Invalid engine type");
}
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
try {
//step 1: load index
to_index->Load();
to_index->Load();
auto start_time = METRICS_NOW_TIME;
auto index = to_index->BuildIndex(table_file.location_);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
//step 2: create table file
meta::TableFileSchema table_file;
table_file.table_id_ = file.table_id_;
table_file.date_ = file.date_;
Status status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) {
return status;
}
table_file.file_type_ = meta::TableFileSchema::INDEX;
table_file.size_ = index->Size();
//step 3: build index
auto start_time = METRICS_NOW_TIME;
auto index = to_index->BuildIndex(table_file.location_);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
//step 4: if the table has been deleted, don't save the index file
bool has_table = false;
meta_ptr_->HasTable(file.table_id_, has_table);
if(!has_table) {
meta_ptr_->DeleteTableFiles(file.table_id_);
return Status::OK();
}
auto to_remove = file;
to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
//step 5: save index file
index->Serialize();
meta::TableFilesSchema update_files = {to_remove, table_file};
pMeta_->UpdateTableFiles(update_files);
//step 6: update meta
table_file.file_type_ = meta::TableFileSchema::INDEX;
table_file.size_ = index->Size();
LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size "
<< index->PhysicalSize()/(1024*1024) << " M"
<< " from file " << to_remove.file_id_;
auto to_remove = file;
to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
index->Cache();
pMeta_->Archive();
meta::TableFilesSchema update_files = {to_remove, table_file};
meta_ptr_->UpdateTableFiles(update_files);
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
<< index->PhysicalSize()/(1024*1024) << " M"
<< " from file " << to_remove.file_id_;
index->Cache();
} catch (std::exception& ex) {
return Status::Error("Build index encounter exception", ex.what());
}
return Status::OK();
}
void DBImpl::BackgroundBuildIndex() {
std::lock_guard<std::mutex> lock(build_index_mutex_);
assert(bg_build_index_started_);
meta::TableFilesSchema to_index_files;
pMeta_->FilesToIndex(to_index_files);
meta_ptr_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
/* LOG(DEBUG) << "Buiding index for " << file.location; */
/* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
status = BuildIndex(file);
if (!status.ok()) {
bg_error_ = status;
return;
}
}
/* LOG(DEBUG) << "All Buiding index Done"; */
bg_build_index_started_ = false;
bg_build_index_finish_signal_.notify_all();
}
Status DBImpl::TryBuildIndex() {
if (bg_build_index_started_) return Status::OK();
if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
bg_build_index_started_ = true;
std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
build_index_task.detach();
return Status::OK();
}
void DBImpl::BackgroundCompaction() {
std::vector<std::string> table_ids;
pMemMgr_->Serialize(table_ids);
Status status;
for (auto table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
bg_error_ = status;
return;
if (shutting_down_.load(std::memory_order_acquire)){
break;
}
}
/* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
}
Status DBImpl::DropAll() {
return pMeta_->DropAll();
return meta_ptr_->DropAll();
}
Status DBImpl::Size(uint64_t& result) {
return pMeta_->Size(result);
return meta_ptr_->Size(result);
}
DBImpl::~DBImpl() {
{
std::unique_lock<std::mutex> lock(mutex_);
shutting_down_.store(true, std::memory_order_release);
while (bg_compaction_scheduled_) {
bg_work_finish_signal_.wait(lock);
}
}
{
std::unique_lock<std::mutex> lock(build_index_mutex_);
while (bg_build_index_started_) {
bg_build_index_finish_signal_.wait(lock);
}
}
shutting_down_.store(true, std::memory_order_release);
bg_timer_thread_.join();
std::vector<std::string> ids;
pMemMgr_->Serialize(ids);
env_->Stop();
mem_mgr_->Serialize(ids);
}
} // namespace engine
......
......@@ -8,12 +8,15 @@
#include "DB.h"
#include "MemManager.h"
#include "Types.h"
#include "utils/ThreadPool.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
#include <thread>
#include <list>
#include <set>
namespace zilliz {
namespace milvus {
......@@ -67,39 +70,38 @@ private:
const meta::DatesT& dates, QueryResults& results);
void BackgroundBuildIndex();
Status BuildIndex(const meta::TableFileSchema&);
Status TryBuildIndex();
void StartTimerTasks();
void BackgroundTimerTask();
void StartMetricTask();
void StartCompactionTask();
Status MergeFiles(const std::string& table_id,
const meta::DateT& date,
const meta::TableFilesSchema& files);
const meta::DateT& date,
const meta::TableFilesSchema& files);
Status BackgroundMergeFiles(const std::string& table_id);
void BackgroundCompaction(std::set<std::string> table_ids);
void TrySchedule();
void StartTimerTasks(int interval);
void BackgroundTimerTask(int interval);
static void BGWork(void* db);
void BackgroundCall();
void BackgroundCompaction();
void StartBuildIndexTask();
void BackgroundBuildIndex();
Status BuildIndex(const meta::TableFileSchema&);
Env* const env_;
const Options options_;
std::mutex mutex_;
std::condition_variable bg_work_finish_signal_;
bool bg_compaction_scheduled_;
Status bg_error_;
std::atomic<bool> shutting_down_;
std::mutex build_index_mutex_;
bool bg_build_index_started_;
std::condition_variable bg_build_index_finish_signal_;
std::thread bg_timer_thread_;
MetaPtr pMeta_;
MemManagerPtr pMemMgr_;
MetaPtr meta_ptr_;
MemManagerPtr mem_mgr_;
server::ThreadPool compact_thread_pool_;
std::list<std::future<void>> compact_thread_results_;
std::set<std::string> compact_table_ids_;
server::ThreadPool index_thread_pool_;
std::list<std::future<void>> index_thread_results_;
}; // DBImpl
......
此差异已折叠。
......@@ -20,11 +20,13 @@ public:
DBMetaImpl(const DBMetaOptions& options_);
virtual Status CreateTable(TableSchema& table_schema) override;
virtual Status DeleteTable(const std::string& table_id) override;
virtual Status DescribeTable(TableSchema& group_info_) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status AllTables(std::vector<TableSchema>& table_schema_array) override;
virtual Status DeleteTable(const std::string& table_id) override;
virtual Status DeleteTableFiles(const std::string& table_id) override;
virtual Status CreateTableFile(TableFileSchema& file_schema) override;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) override;
......@@ -42,10 +44,6 @@ public:
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) override;
virtual Status FilesToDelete(const std::string& table_id,
const DatesT& partition,
DatePartionedTableFilesSchema& files) override;
virtual Status FilesToIndex(TableFilesSchema&) override;
virtual Status Archive() override;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <easylogging++.h>
#include <assert.h>
#include <atomic>
#include "Env.h"
namespace zilliz {
namespace milvus {
namespace engine {
Env::Env()
: bg_work_started_(false),
shutting_down_(false) {
}
void Env::Schedule(void (*function)(void* arg), void* arg) {
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (shutting_down_) return;
if (!bg_work_started_) {
bg_work_started_ = true;
std::thread bg_thread(Env::BackgroundThreadEntryPoint, this);
bg_thread.detach();
}
if (bg_work_queue_.empty()) {
bg_work_cv_.notify_one();
}
bg_work_queue_.emplace(function, arg);
}
void Env::BackgroundThreadMain() {
while (!shutting_down_) {
std::unique_lock<std::mutex> lock(bg_work_mutex_);
while (bg_work_queue_.empty() && !shutting_down_) {
bg_work_cv_.wait(lock);
}
if (shutting_down_) break;
assert(!bg_work_queue_.empty());
auto bg_function = bg_work_queue_.front().function_;
void* bg_arg = bg_work_queue_.front().arg_;
bg_work_queue_.pop();
lock.unlock();
bg_function(bg_arg);
}
std::unique_lock<std::mutex> lock(bg_work_mutex_);
bg_work_started_ = false;
bg_work_cv_.notify_all();
}
// Asks the background worker thread to exit and blocks until it has done so.
// Returns immediately when a stop is already in progress or when no worker
// thread is running. Work items still in bg_work_queue_ are not removed here.
void Env::Stop() {
{
std::unique_lock<std::mutex> lock(bg_work_mutex_);
// Nothing to do: a stop is already under way, or the worker was never started.
if (shutting_down_ || !bg_work_started_) return;
}
// The flag is raised after the mutex above has been released.
shutting_down_ = true;
{
std::unique_lock<std::mutex> lock(bg_work_mutex_);
// With an empty queue the worker may be blocked on bg_work_cv_; wake it so
// that it can observe shutting_down_.
if (bg_work_queue_.empty()) {
bg_work_cv_.notify_one();
}
// Wait until bg_work_started_ has been cleared.
while (bg_work_started_) {
bg_work_cv_.wait(lock);
}
}
// Lower the flag again so that Schedule() accepts work after this call.
shutting_down_ = false;
}
// Destructor performs no work of its own.
Env::~Env() = default;
// Returns the address of a function-local static Env, constructed on the
// first call and shared by all later calls.
Env* Env::Default() {
    static Env default_env;
    return &default_env;
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <condition_variable>
#include <thread>
#include <mutex>
#include <queue>
#include <atomic>
namespace zilliz {
namespace milvus {
namespace engine {
// Minimal background-execution environment: callers hand over a C-style
// callback plus an opaque argument via Schedule(), and a worker thread runs
// the queued callbacks. Instances are neither copyable nor copy-assignable.
class Env {
public:
    Env();

    Env(const Env&) = delete;
    Env& operator=(const Env&) = delete;

    // Enqueues `function(arg)` for execution on the background thread.
    void Schedule(void (*function)(void* arg), void* arg);

    // Stops the background thread.
    virtual void Stop();

    virtual ~Env();

    // Accessor for a shared Env instance.
    static Env* Default();

protected:
    // Worker loop executed by the background thread.
    void BackgroundThreadMain();

    // Thread entry trampoline: forwards to the member worker loop of `env`.
    static void BackgroundThreadEntryPoint(Env* env) {
        env->BackgroundThreadMain();
    }

    // One queued unit of work: an immutable callback and its argument.
    struct BGWork {
        using Callback = void (*)(void*);

        explicit BGWork(Callback function, void* arg)
            : function_(function), arg_(arg) {}

        Callback const function_;
        void* const arg_;
    };

    std::mutex bg_work_mutex_;            // held while touching the queue and bg_work_started_
    std::condition_variable bg_work_cv_;  // used together with bg_work_mutex_
    std::queue<BGWork> bg_work_queue_;    // pending work items
    bool bg_work_started_;                // whether a worker thread is currently running
    std::atomic<bool> shutting_down_;     // set while a stop is in progress
}; // Env
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -4,8 +4,8 @@
* Proprietary and confidential.
******************************************************************************/
#include "FaissExecutionEngine.h"
#include "Log.h"
#include <easylogging++.h>
#include <faiss/AutoTune.h>
#include <faiss/MetaIndexes.h>
#include <faiss/IndexFlat.h>
......@@ -74,7 +74,7 @@ Status FaissExecutionEngine::Load() {
if (!index) {
index = read_index(location_);
to_cache = true;
LOG(DEBUG) << "Disk io from: " << location_;
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
}
pIndex_ = index->data();
......@@ -98,6 +98,8 @@ Status FaissExecutionEngine::Merge(const std::string& location) {
if (location == location_) {
return Status::Error("Cannot Merge Self");
}
ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
to_merge = read_index(location);
......@@ -110,6 +112,8 @@ Status FaissExecutionEngine::Merge(const std::string& location) {
ExecutionEnginePtr
FaissExecutionEngine::BuildIndex(const std::string& location) {
ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;
auto opd = std::make_shared<Operand>();
opd->d = pIndex_->d;
opd->index_type = build_index_type_;
......@@ -122,7 +126,6 @@ FaissExecutionEngine::BuildIndex(const std::string& location) {
from_index->id_map.data());
ExecutionEnginePtr new_ee(new FaissExecutionEngine(index->data(), location, build_index_type_, raw_index_type_));
new_ee->Serialize();
return new_ee;
}
......
......@@ -62,7 +62,7 @@ Status MemVectors::Serialize(std::string& table_id) {
auto status = pMeta_->UpdateTableFile(schema_);
LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id_ << " of size " << pEE_->Size() / meta::M << " M";
<< " file " << schema_.file_id_ << " of size " << (double)(pEE_->Size()) / (double)meta::M << " M";
pEE_->Cache();
......@@ -142,6 +142,13 @@ Status MemManager::Serialize(std::vector<std::string>& table_ids) {
return Status::OK();
}
// Removes the memMap_ entry keyed by `table_id` while holding mutex_.
// Always reports success, whether or not an entry existed.
Status MemManager::EraseMemVector(const std::string& table_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    memMap_.erase(table_id);
    return Status::OK();
}
} // namespace engine
} // namespace milvus
......
......@@ -75,6 +75,8 @@ public:
Status Serialize(std::vector<std::string>& table_ids);
Status EraseMemVector(const std::string& table_id);
private:
Status InsertVectorsNoLock(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
......
......@@ -24,11 +24,13 @@ public:
using Ptr = std::shared_ptr<Meta>;
virtual Status CreateTable(TableSchema& table_schema) = 0;
virtual Status DeleteTable(const std::string& table_id) = 0;
virtual Status DescribeTable(TableSchema& table_schema) = 0;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) = 0;
virtual Status AllTables(std::vector<TableSchema>& table_schema_array) = 0;
virtual Status DeleteTable(const std::string& table_id) = 0;
virtual Status DeleteTableFiles(const std::string& table_id) = 0;
virtual Status CreateTableFile(TableFileSchema& file_schema) = 0;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) = 0;
......@@ -45,10 +47,6 @@ public:
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) = 0;
virtual Status FilesToDelete(const std::string& table_id,
const DatesT& partition,
DatePartionedTableFilesSchema& files) = 0;
virtual Status Size(uint64_t& result) = 0;
virtual Status Archive() = 0;
......
......@@ -21,12 +21,18 @@ const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;
struct TableSchema {
size_t id_;
typedef enum {
NORMAL,
TO_DELETE,
} TABLE_STATE;
size_t id_ = 0;
std::string table_id_;
int state_ = (int)NORMAL;
size_t files_cnt_ = 0;
uint16_t dimension_;
uint16_t dimension_ = 0;
std::string location_;
long created_on_;
long created_on_ = 0;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
bool store_raw_data_ = false;
}; // TableSchema
......@@ -40,17 +46,17 @@ struct TableFileSchema {
TO_DELETE,
} FILE_TYPE;
size_t id_;
size_t id_ = 0;
std::string table_id_;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
std::string file_id_;
int file_type_ = NEW;
size_t size_;
size_t size_ = 0;
DateT date_ = EmptyDate;
uint16_t dimension_;
uint16_t dimension_ = 0;
std::string location_;
long updated_time_;
long created_on_;
long updated_time_ = 0;
long created_on_ = 0;
}; // TableFileSchema
typedef std::vector<TableFileSchema> TableFilesSchema;
......
......@@ -9,7 +9,6 @@
#include <boost/algorithm/string.hpp>
#include "Options.h"
#include "Env.h"
#include "DBMetaImpl.h"
#include "Exception.h"
......@@ -17,8 +16,7 @@ namespace zilliz {
namespace milvus {
namespace engine {
Options::Options()
: env(Env::Default()) {
Options::Options() {
}
ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) {
......
......@@ -47,7 +47,6 @@ struct Options {
uint16_t memory_sync_interval = 1; //unit: second
uint16_t merge_trigger_number = 2;
size_t index_trigger_size = ONE_GB; //unit: byte
Env* env;
DBMetaOptions meta;
}; // Options
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "SearchScheduler.h"
#include "IndexLoaderQueue.h"
#include "SearchTaskQueue.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "metrics/Metrics.h"
#include "db/EngineFactory.h"
namespace zilliz {
namespace milvus {
namespace engine {
namespace {
// Reports the size of a loaded file to the metrics backend. RAW and TO_INDEX
// files are counted under the "raw file" metrics; every other file type is
// counted under the "index file" metrics.
void CollectFileMetrics(int file_type, size_t file_size) {
    const bool counts_as_raw = file_type == meta::TableFileSchema::RAW ||
                               file_type == meta::TableFileSchema::TO_INDEX;

    if (counts_as_raw) {
        server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
        server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
        server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
        return;
    }

    server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
    server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
    server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
}
// Records a search duration in the histogram matching the file type: RAW and
// TO_INDEX files go to the raw-data histogram, anything else goes to the
// index-data histogram.
void CollectDurationMetrics(int index_type, double total_time) {
    if (index_type == meta::TableFileSchema::RAW ||
        index_type == meta::TableFileSchema::TO_INDEX) {
        server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
    } else {
        server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
    }
}
}
SearchScheduler::SearchScheduler()
: stopped_(true) {
Start();
}
// Shuts the scheduler down on destruction by delegating to Stop().
SearchScheduler::~SearchScheduler() {
    this->Stop();
}
// Returns a reference to a function-local static scheduler, constructed on
// the first call.
SearchScheduler& SearchScheduler::GetInstance() {
    static SearchScheduler scheduler;
    return scheduler;
}
//Starts the index-load thread and the search thread unless the scheduler is
//already running. Always returns true.
bool
SearchScheduler::Start() {
    if(stopped_) {
        stopped_ = false;

        //bound the number of loaded-but-not-yet-searched indexes
        search_queue_.SetCapacity(2);

        index_load_thread_ = std::make_shared<std::thread>(&SearchScheduler::IndexLoadWorker, this);
        search_thread_ = std::make_shared<std::thread>(&SearchScheduler::SearchWorker, this);
    }

    return true;
}
//Stops both worker threads unless the scheduler is already stopped.
//Each worker is woken with a nullptr entry pushed into its queue (the workers
//exit their loop when they take a nullptr) and is then joined.
//Always returns true.
bool
SearchScheduler::Stop() {
    if(!stopped_) {
        if(index_load_thread_ != nullptr) {
            index_load_queue_.Put(nullptr);
            index_load_thread_->join();
            index_load_thread_.reset();
        }

        if(search_thread_ != nullptr) {
            search_queue_.Put(nullptr);
            search_thread_->join();
            search_thread_.reset();
        }

        stopped_ = true;
    }

    return true;
}
//Hands a search context to the index-load queue; the index-load worker picks
//it up from there. Always returns true.
bool
SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) {
    index_load_queue_.Put(search_context);

    return true;
}
bool
SearchScheduler::IndexLoadWorker() {
while(true) {
IndexLoaderContextPtr context = index_load_queue_.Take();
if(context == nullptr) {
SERVER_LOG_INFO << "Stop thread for index loading";
break;//exit
}
SERVER_LOG_INFO << "Loading index(" << context->file_->id_ << ") from location: " << context->file_->location_;
server::TimeRecorder rc("Load index");
//step 1: load index
ExecutionEnginePtr index_ptr = EngineFactory::Build(context->file_->dimension_,
context->file_->location_,
(EngineType)context->file_->engine_type_);
index_ptr->Load();
rc.Record("load index file to memory");
size_t file_size = index_ptr->PhysicalSize();
LOG(DEBUG) << "Index file type " << context->file_->file_type_ << " Of Size: "
<< file_size/(1024*1024) << " M";
CollectFileMetrics(context->file_->file_type_, file_size);
//step 2: put search task into another queue
SearchTaskPtr task_ptr = std::make_shared<SearchTask>();
task_ptr->index_id_ = context->file_->id_;
task_ptr->index_type_ = context->file_->file_type_;
task_ptr->index_engine_ = index_ptr;
task_ptr->search_contexts_.swap(context->search_contexts_);
search_queue_.Put(task_ptr);
}
return true;
}
bool
SearchScheduler::SearchWorker() {
while(true) {
SearchTaskPtr task_ptr = search_queue_.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop thread for searching";
break;//exit
}
SERVER_LOG_INFO << "Searching in index(" << task_ptr->index_id_<< ") with "
<< task_ptr->search_contexts_.size() << " tasks";
//do search
auto start_time = METRICS_NOW_TIME;
task_ptr->DoSearch();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(task_ptr->index_type_, total_time);
}
return true;
}
}
}
}
\ No newline at end of file
......@@ -4,8 +4,8 @@
* Proprietary and confidential.
******************************************************************************/
#include "IndexLoaderQueue.h"
#include "ScheduleStrategy.h"
#include "TaskDispatchQueue.h"
#include "TaskDispatchStrategy.h"
#include "utils/Error.h"
#include "utils/Log.h"
......@@ -14,12 +14,12 @@ namespace milvus {
namespace engine {
void
IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
TaskDispatchQueue::Put(const ScheduleContextPtr &context) {
std::unique_lock <std::mutex> lock(mtx);
full_.wait(lock, [this] { return (queue_.size() < capacity_); });
if(search_context == nullptr) {
queue_.push_back(nullptr);
if(context == nullptr) {
queue_.push_front(nullptr);
empty_.notify_all();
return;
}
......@@ -32,14 +32,13 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
ScheduleStrategyPtr strategy = StrategyFactory::CreateMemStrategy();
strategy->Schedule(search_context, queue_);
TaskDispatchStrategy::Schedule(context, queue_);
empty_.notify_all();
}
IndexLoaderContextPtr
IndexLoaderQueue::Take() {
ScheduleTaskPtr
TaskDispatchQueue::Take() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
......@@ -49,20 +48,20 @@ IndexLoaderQueue::Take() {
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
IndexLoaderContextPtr front(queue_.front());
ScheduleTaskPtr front(queue_.front());
queue_.pop_front();
full_.notify_all();
return front;
}
size_t
IndexLoaderQueue::Size() {
TaskDispatchQueue::Size() {
std::lock_guard <std::mutex> lock(mtx);
return queue_.size();
}
IndexLoaderContextPtr
IndexLoaderQueue::Front() {
ScheduleTaskPtr
TaskDispatchQueue::Front() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
if (queue_.empty()) {
......@@ -70,12 +69,12 @@ IndexLoaderQueue::Front() {
SERVER_LOG_ERROR << error_msg;
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
IndexLoaderContextPtr front(queue_.front());
ScheduleTaskPtr front(queue_.front());
return front;
}
IndexLoaderContextPtr
IndexLoaderQueue::Back() {
ScheduleTaskPtr
TaskDispatchQueue::Back() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
......@@ -85,18 +84,18 @@ IndexLoaderQueue::Back() {
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
IndexLoaderContextPtr back(queue_.back());
ScheduleTaskPtr back(queue_.back());
return back;
}
bool
IndexLoaderQueue::Empty() {
TaskDispatchQueue::Empty() {
std::unique_lock <std::mutex> lock(mtx);
return queue_.empty();
}
void
IndexLoaderQueue::SetCapacity(const size_t capacity) {
TaskDispatchQueue::SetCapacity(const size_t capacity) {
capacity_ = (capacity > 0 ? capacity : capacity_);
}
......
......@@ -5,7 +5,8 @@
******************************************************************************/
#pragma once
#include "SearchContext.h"
#include "context/IScheduleContext.h"
#include "task/IScheduleTask.h"
#include <condition_variable>
#include <iostream>
......@@ -17,31 +18,23 @@ namespace zilliz {
namespace milvus {
namespace engine {
class IndexLoaderContext {
public:
TableFileSchemaPtr file_;
std::vector<SearchContextPtr> search_contexts_;
};
using IndexLoaderContextPtr = std::shared_ptr<IndexLoaderContext>;
class IndexLoaderQueue {
class TaskDispatchQueue {
public:
IndexLoaderQueue() : mtx(), full_(), empty_() {}
TaskDispatchQueue() : mtx(), full_(), empty_() {}
IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete;
TaskDispatchQueue(const TaskDispatchQueue &rhs) = delete;
IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete;
TaskDispatchQueue &operator=(const TaskDispatchQueue &rhs) = delete;
using LoaderQueue = std::list<IndexLoaderContextPtr>;
using TaskList = std::list<ScheduleTaskPtr>;
void Put(const SearchContextPtr &search_context);
void Put(const ScheduleContextPtr &context);
IndexLoaderContextPtr Take();
ScheduleTaskPtr Take();
IndexLoaderContextPtr Front();
ScheduleTaskPtr Front();
IndexLoaderContextPtr Back();
ScheduleTaskPtr Back();
size_t Size();
......@@ -54,7 +47,7 @@ private:
std::condition_variable full_;
std::condition_variable empty_;
LoaderQueue queue_;
TaskList queue_;
size_t capacity_ = 1000000;
};
......
......@@ -3,48 +3,88 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ScheduleStrategy.h"
#include "TaskDispatchStrategy.h"
#include "context/SearchContext.h"
#include "context/DeleteContext.h"
#include "task/IndexLoadTask.h"
#include "task/DeleteTask.h"
#include "cache/CpuCacheMgr.h"
#include "utils/Error.h"
#include "utils/Log.h"
#include "db/Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
class MemScheduleStrategy : public IScheduleStrategy {
class ReuseCacheIndexStrategy {
public:
bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override {
if(search_context == nullptr) {
bool Schedule(const SearchContextPtr &context, std::list<ScheduleTaskPtr>& task_list) {
if(context == nullptr) {
return false;
}
SearchContext::Id2IndexMap index_files = search_context->GetIndexMap();
SearchContext::Id2IndexMap index_files = context->GetIndexMap();
//some index loader alread exists
for(auto& loader : loader_list) {
for(auto& task : task_list) {
if(task->type() != ScheduleTaskType::kIndexLoad) {
continue;
}
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
if(index_files.find(loader->file_->id_) != index_files.end()){
SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
ENGINE_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
index_files.erase(loader->file_->id_);
loader->search_contexts_.push_back(search_context);
loader->search_contexts_.push_back(context);
}
}
//index_files still contains some index files, create new loader
for(auto& pair : index_files) {
SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_;
IndexLoaderContextPtr new_loader = std::make_shared<IndexLoaderContext>();
new_loader->search_contexts_.push_back(search_context);
ENGINE_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_;
IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
new_loader->search_contexts_.push_back(context);
new_loader->file_ = pair.second;
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_);
if(index != nullptr) {
//if the index file has been in memory, increase its priority
loader_list.push_front(new_loader);
task_list.push_front(new_loader);
} else {
//index file not in memory, put it to tail
loader_list.push_back(new_loader);
task_list.push_back(new_loader);
}
}
return true;
}
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteTableStrategy {
public:
bool Schedule(const DeleteContextPtr &context, std::list<ScheduleTaskPtr> &task_list) {
if (context == nullptr) {
return false;
}
DeleteTaskPtr delete_task = std::make_shared<DeleteTask>(context);
if(task_list.empty()) {
task_list.push_back(delete_task);
return true;
}
std::string table_id = context->table_id();
for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) {
if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
continue;
}
//put delete task to proper position
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
if(loader->file_->table_id_ == table_id) {
task_list.insert(++iter, delete_task);
break;
}
}
......@@ -54,9 +94,27 @@ public:
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ScheduleStrategyPtr StrategyFactory::CreateMemStrategy() {
ScheduleStrategyPtr strategy(new MemScheduleStrategy());
return strategy;
bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr,
std::list<zilliz::milvus::engine::ScheduleTaskPtr> &task_list) {
if(context_ptr == nullptr) {
return false;
}
switch(context_ptr->type()) {
case ScheduleContextType::kSearch: {
SearchContextPtr search_context = std::static_pointer_cast<SearchContext>(context_ptr);
ReuseCacheIndexStrategy strategy;
return strategy.Schedule(search_context, task_list);
}
case ScheduleContextType::kDelete: {
DeleteContextPtr delete_context = std::static_pointer_cast<DeleteContext>(context_ptr);
DeleteTableStrategy strategy;
return strategy.Schedule(delete_context, task_list);
}
default:
ENGINE_LOG_ERROR << "Invalid schedule task type";
return false;
}
}
}
......
......@@ -5,18 +5,18 @@
******************************************************************************/
#pragma once
#include "IScheduleStrategy.h"
#include "context/IScheduleContext.h"
#include "task/IScheduleTask.h"
#include <list>
namespace zilliz {
namespace milvus {
namespace engine {
class StrategyFactory {
private:
StrategyFactory() {}
class TaskDispatchStrategy {
public:
static ScheduleStrategyPtr CreateMemStrategy();
static bool Schedule(const ScheduleContextPtr &context_ptr, std::list<ScheduleTaskPtr>& task_list);
};
}
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "TaskScheduler.h"
#include "TaskDispatchQueue.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "db/EngineFactory.h"
namespace zilliz {
namespace milvus {
namespace engine {
TaskScheduler::TaskScheduler()
: stopped_(true) {
Start();
}
//Shuts the scheduler down by delegating to Stop(), which joins the worker threads.
TaskScheduler::~TaskScheduler() {
Stop();
}
//Returns the process-wide scheduler; it is a function-local static, so it is
//constructed (and started) on the first call.
TaskScheduler& TaskScheduler::GetInstance() {
    static TaskScheduler instance;
    return instance;
}
//Starts the dispatch thread and the task thread unless the scheduler is
//already running. Always returns true.
bool
TaskScheduler::Start() {
    if(stopped_) {
        stopped_ = false;

        //bound the number of tasks waiting between the two threads
        task_queue_.SetCapacity(2);

        task_dispatch_thread_ = std::make_shared<std::thread>(&TaskScheduler::TaskDispatchWorker, this);
        task_thread_ = std::make_shared<std::thread>(&TaskScheduler::TaskWorker, this);
    }

    return true;
}
//Stops both worker threads unless the scheduler is already stopped.
//Each worker is woken with a nullptr entry pushed into its queue (the workers
//exit their loop when they take a nullptr) and is then joined; the dispatch
//thread is shut down before the task thread. Always returns true.
bool
TaskScheduler::Stop() {
    if(!stopped_) {
        if(task_dispatch_thread_ != nullptr) {
            task_dispatch_queue_.Put(nullptr);
            task_dispatch_thread_->join();
            task_dispatch_thread_.reset();
        }

        if(task_thread_ != nullptr) {
            task_queue_.Put(nullptr);
            task_thread_->join();
            task_thread_.reset();
        }

        stopped_ = true;
    }

    return true;
}
//Hands a schedule context (search, delete, ...) to the dispatch queue; the
//dispatch worker takes the resulting tasks from there. Always returns true.
bool
TaskScheduler::Schedule(ScheduleContextPtr context) {
    task_dispatch_queue_.Put(context);

    return true;
}
bool
TaskScheduler::TaskDispatchWorker() {
while(true) {
ScheduleTaskPtr task_ptr = task_dispatch_queue_.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop db task dispatch thread";
break;//exit
}
//execute task
ScheduleTaskPtr next_task = task_ptr->Execute();
if(next_task != nullptr) {
task_queue_.Put(next_task);
}
}
return true;
}
//Thread body of the task thread.
//Repeatedly takes a task from task_queue_ and executes it together with every
//follow-up task it yields. A nullptr entry ends the loop.
//
//Follow-up tasks are executed inline instead of being pushed back into
//task_queue_: this thread is the only consumer of that queue and its capacity
//is limited in Start(), so re-enqueueing while the queue is full would leave
//this thread waiting for itself.
//NOTE(review): this assumes BlockingQueue::Put blocks while the queue is at
//capacity, as TaskDispatchQueue::Put does - confirm.
bool
TaskScheduler::TaskWorker() {
    while(true) {
        ScheduleTaskPtr task_ptr = task_queue_.Take();
        if(task_ptr == nullptr) {
            SERVER_LOG_INFO << "Stop db task thread";
            break;//exit
        }

        //execute the task, then whatever it chains to, until the chain ends
        while(task_ptr != nullptr) {
            task_ptr = task_ptr->Execute();
        }
    }

    return true;
}
}
}
}
\ No newline at end of file
......@@ -5,37 +5,40 @@
******************************************************************************/
#pragma once
#include "SearchContext.h"
#include "IndexLoaderQueue.h"
#include "SearchTaskQueue.h"
#include "context/IScheduleContext.h"
#include "task/IScheduleTask.h"
#include "TaskDispatchQueue.h"
#include "utils/BlockingQueue.h"
namespace zilliz {
namespace milvus {
namespace engine {
class SearchScheduler {
class TaskScheduler {
private:
SearchScheduler();
virtual ~SearchScheduler();
TaskScheduler();
virtual ~TaskScheduler();
public:
static SearchScheduler& GetInstance();
static TaskScheduler& GetInstance();
bool ScheduleSearchTask(SearchContextPtr& search_context);
bool Schedule(ScheduleContextPtr context);
private:
bool Start();
bool Stop();
bool IndexLoadWorker();
bool SearchWorker();
bool TaskDispatchWorker();
bool TaskWorker();
private:
std::shared_ptr<std::thread> index_load_thread_;
std::shared_ptr<std::thread> search_thread_;
std::shared_ptr<std::thread> task_dispatch_thread_;
std::shared_ptr<std::thread> task_thread_;
IndexLoaderQueue index_load_queue_;
SearchTaskQueue search_queue_;
TaskDispatchQueue task_dispatch_queue_;
using TaskQueue = server::BlockingQueue<ScheduleTaskPtr>;
TaskQueue task_queue_;
bool stopped_ = true;
};
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "DeleteContext.h"
namespace zilliz {
namespace milvus {
namespace engine {
DeleteContext::DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr)
: IScheduleContext(ScheduleContextType::kDelete),
table_id_(table_id),
meta_ptr_(meta_ptr) {
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "IScheduleContext.h"
#include "db/Meta.h"
namespace zilliz {
namespace milvus {
namespace engine {
//Schedule context describing a "delete table" request: the id of the table
//and the meta object used to delete its files.
class DeleteContext : public IScheduleContext {
public:
    DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr);

    //id of the table this request refers to (returned by value)
    std::string table_id() const {
        return table_id_;
    }

    //meta object handed in at construction
    meta::Meta::Ptr meta() const {
        return meta_ptr_;
    }

private:
    std::string table_id_;
    meta::Meta::Ptr meta_ptr_;
};
using DeleteContextPtr = std::shared_ptr<DeleteContext>;
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
//Kinds of request that can be handed to the scheduler.
enum class ScheduleContextType {
    kUnknown = 0,
    kSearch,
    kDelete,
};

//Base class of every schedule context; it only carries the type tag that the
//dispatcher switches on to pick a concrete context class.
class IScheduleContext {
public:
    //explicit: a bare ScheduleContextType must not silently convert into a context
    explicit IScheduleContext(ScheduleContextType type)
        : type_(type) {
    }

    virtual ~IScheduleContext() = default;

    //type tag given at construction
    ScheduleContextType type() const { return type_; }

protected:
    ScheduleContextType type_;
};

using ScheduleContextPtr = std::shared_ptr<IScheduleContext>;
}
}
}
......@@ -14,7 +14,8 @@ namespace milvus {
namespace engine {
SearchContext::SearchContext(uint64_t topk, uint64_t nq, const float* vectors)
: topk_(topk),
: IScheduleContext(ScheduleContextType::kSearch),
topk_(topk),
nq_(nq),
vectors_(vectors) {
//use current time to identify this context
......
......@@ -5,6 +5,7 @@
******************************************************************************/
#pragma once
#include "IScheduleContext.h"
#include "db/MetaTypes.h"
#include <unordered_map>
......@@ -18,7 +19,7 @@ namespace engine {
using TableFileSchemaPtr = std::shared_ptr<meta::TableFileSchema>;
class SearchContext {
class SearchContext : public IScheduleContext {
public:
SearchContext(uint64_t topk, uint64_t nq, const float* vectors);
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "DeleteTask.h"
namespace zilliz {
namespace milvus {
namespace engine {
DeleteTask::DeleteTask(const DeleteContextPtr& context)
: IScheduleTask(ScheduleTaskType::kDelete),
context_(context) {
}
//Asks the context's meta object to delete the files of the context's table.
//Does nothing when the context or its meta object is missing.
//Always returns nullptr: a delete task has no follow-up task.
//NOTE(review): whatever DeleteTableFiles returns is discarded - confirm that
//failures do not need to be reported.
std::shared_ptr<IScheduleTask> DeleteTask::Execute() {
    if(context_ == nullptr) {
        return nullptr;
    }

    auto meta_ptr = context_->meta();
    if(meta_ptr == nullptr) {
        return nullptr;
    }

    meta_ptr->DeleteTableFiles(context_->table_id());
    return nullptr;
}
}
}
}
\ No newline at end of file
......@@ -5,22 +5,25 @@
******************************************************************************/
#pragma once
#include "IndexLoaderQueue.h"
#include "SearchContext.h"
#include "IScheduleTask.h"
#include "db/scheduler/context/DeleteContext.h"
namespace zilliz {
namespace milvus {
namespace engine {
class IScheduleStrategy {
class DeleteTask : public IScheduleTask {
public:
virtual ~IScheduleStrategy() {}
DeleteTask(const DeleteContextPtr& context);
virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0;
virtual std::shared_ptr<IScheduleTask> Execute() override;
private:
DeleteContextPtr context_;
};
using ScheduleStrategyPtr = std::shared_ptr<IScheduleStrategy>;
using DeleteTaskPtr = std::shared_ptr<DeleteTask>;
}
}
}
\ No newline at end of file
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
//Kinds of task the scheduler threads can execute.
enum class ScheduleTaskType {
    kUnknown = 0,
    kIndexLoad,
    kSearch,
    kDelete,
};

//Base class of every schedulable task: a type tag plus the Execute() hook.
class IScheduleTask {
public:
    //explicit: a bare ScheduleTaskType must not act as a converting constructor
    explicit IScheduleTask(ScheduleTaskType type)
        : type_(type) {
    }

    virtual ~IScheduleTask() = default;

    //type tag given at construction
    ScheduleTaskType type() const { return type_; }

    //Runs the task. A non-null result is a follow-up task for the scheduler to
    //process next; nullptr means there is nothing further to do.
    virtual std::shared_ptr<IScheduleTask> Execute() = 0;

protected:
    ScheduleTaskType type_;
};

using ScheduleTaskPtr = std::shared_ptr<IScheduleTask>;
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "IndexLoadTask.h"
#include "SearchTask.h"
#include "db/Log.h"
#include "db/EngineFactory.h"
#include "utils/TimeRecorder.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
namespace engine {
namespace {
//Reports the size of a loaded file to the metrics singleton.
//RAW and TO_INDEX files feed the raw-file metrics (histogram, running total, gauge);
//every other file type feeds the index-file metrics.
void CollectFileMetrics(int file_type, size_t file_size) {
    const bool is_raw_file = (file_type == meta::TableFileSchema::RAW)
                          || (file_type == meta::TableFileSchema::TO_INDEX);

    if(!is_raw_file) {
        server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
        server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
        server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
        return;
    }

    server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
    server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
    server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
}
}
IndexLoadTask::IndexLoadTask()
: IScheduleTask(ScheduleTaskType::kIndexLoad) {
}
std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
ENGINE_LOG_INFO << "Loading index(" << file_->id_ << ") from location: " << file_->location_;
server::TimeRecorder rc("Load index");
//step 1: load index
ExecutionEnginePtr index_ptr = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType)file_->engine_type_);
index_ptr->Load();
rc.Record("load index file to memory");
size_t file_size = index_ptr->PhysicalSize();
LOG(DEBUG) << "Index file type " << file_->file_type_ << " Of Size: "
<< file_size/(1024*1024) << " M";
CollectFileMetrics(file_->file_type_, file_size);
//step 2: return search task for later execution
SearchTaskPtr task_ptr = std::make_shared<SearchTask>();
task_ptr->index_id_ = file_->id_;
task_ptr->index_type_ = file_->file_type_;
task_ptr->index_engine_ = index_ptr;
task_ptr->search_contexts_.swap(search_contexts_);
return std::static_pointer_cast<IScheduleTask>(task_ptr);
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "IScheduleTask.h"
#include "db/scheduler/context/SearchContext.h"
namespace zilliz {
namespace milvus {
namespace engine {
//Task that loads one table file into memory; Execute() yields the search task
//that runs the collected search contexts against the loaded file.
class IndexLoadTask : public IScheduleTask {
public:
    IndexLoadTask();

    virtual std::shared_ptr<IScheduleTask> Execute() override;

public:
    //file to load
    TableFileSchemaPtr file_;

    //searches that want to query this file
    std::vector<SearchContextPtr> search_contexts_;
};
using IndexLoadTaskPtr = std::shared_ptr<IndexLoadTask>;
}
}
}
......@@ -3,7 +3,8 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "SearchTaskQueue.h"
#include "SearchTask.h"
#include "metrics/Metrics.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
......@@ -54,7 +55,7 @@ void MergeResult(SearchContext::Id2ScoreMap &score_src,
while(true) {
//all score_src items are merged, if score_merged.size() still less than topk
//move items from score_target to score_merged until score_merged.size() equal topk
if(src_index >= src_count - 1) {
if(src_index >= src_count) {
for(size_t i = target_index; i < target_count && score_merged.size() < topk; ++i) {
score_merged.push_back(score_target[i]);
}
......@@ -63,7 +64,7 @@ void MergeResult(SearchContext::Id2ScoreMap &score_src,
//all score_target items are merged, if score_merged.size() still less than topk
//move items from score_src to score_merged until score_merged.size() equal topk
if(target_index >= target_count - 1) {
if(target_index >= target_count) {
for(size_t i = src_index; i < src_count && score_merged.size() < topk; ++i) {
score_merged.push_back(score_src[i]);
}
......@@ -110,15 +111,42 @@ void TopkResult(SearchContext::ResultSet &result_src,
}
}
//Reports how long one search took.
//RAW and TO_INDEX files are observed by the raw-data duration histogram,
//all other file types by the index-data duration histogram.
//NOTE(review): the visible caller passes a METRICS_MICROSECONDS value while the
//histograms are named "...DurationSeconds" - confirm the expected unit.
void CollectDurationMetrics(int index_type, double total_time) {
    const bool searched_raw_data = (index_type == meta::TableFileSchema::RAW)
                                || (index_type == meta::TableFileSchema::TO_INDEX);

    if(searched_raw_data) {
        server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
    } else {
        server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
    }
}
bool SearchTask::DoSearch() {
}
SearchTask::SearchTask()
: IScheduleTask(ScheduleTaskType::kSearch) {
}
std::shared_ptr<IScheduleTask> SearchTask::Execute() {
if(index_engine_ == nullptr) {
return false;
return nullptr;
}
SERVER_LOG_INFO << "Searching in index(" << index_id_<< ") with "
<< search_contexts_.size() << " tasks";
server::TimeRecorder rc("DoSearch index(" + std::to_string(index_id_) + ")");
auto start_time = METRICS_NOW_TIME;
std::vector<long> output_ids;
std::vector<float> output_distence;
for(auto& context : search_contexts_) {
......@@ -153,9 +181,13 @@ bool SearchTask::DoSearch() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(index_type_, total_time);
rc.Elapse("totally cost");
return true;
return nullptr;
}
}
......
......@@ -5,19 +5,19 @@
******************************************************************************/
#pragma once
#include "SearchContext.h"
#include "utils/BlockingQueue.h"
#include "IScheduleTask.h"
#include "db/scheduler/context/SearchContext.h"
#include "db/ExecutionEngine.h"
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
class SearchTask {
class SearchTask : public IScheduleTask {
public:
bool DoSearch();
SearchTask();
virtual std::shared_ptr<IScheduleTask> Execute() override;
public:
size_t index_id_ = 0;
......@@ -27,7 +27,6 @@ public:
};
using SearchTaskPtr = std::shared_ptr<SearchTask>;
using SearchTaskQueue = server::BlockingQueue<SearchTaskPtr>;
}
......
......@@ -17,10 +17,11 @@ namespace {
static const std::string TABLE_NAME = GetTableName();
static constexpr int64_t TABLE_DIMENSION = 512;
static constexpr int64_t TOTAL_ROW_COUNT = 100000;
static constexpr int64_t BATCH_ROW_COUNT = 100000;
static constexpr int64_t NQ = 10;
static constexpr int64_t TOP_K = 10;
static constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
static constexpr int64_t ADD_VECTOR_LOOP = 1;
static constexpr int64_t ADD_VECTOR_LOOP = 5;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
......@@ -96,7 +97,7 @@ namespace {
TableSchema BuildTableSchema() {
TableSchema tb_schema;
tb_schema.table_name = TABLE_NAME;
tb_schema.index_type = IndexType::gpu_ivfflat;
tb_schema.index_type = IndexType::cpu_idmap;
tb_schema.dimension = TABLE_DIMENSION;
tb_schema.store_raw_vector = true;
......@@ -110,17 +111,21 @@ namespace {
}
vector_record_array.clear();
for (int64_t k = from; k < to; k++) {
RowRecord record;
record.data.resize(TABLE_DIMENSION);
for(int64_t i = 0; i < TABLE_DIMENSION; i++) {
record.data[i] = (float)(i + k);
record.data[i] = (float)(k%(i+1));
}
vector_record_array.emplace_back(record);
}
}
//Prints a notice to stdout, then blocks the calling thread for the given number of seconds.
void Sleep(int seconds) {
    std::cout << "Waiting " << seconds << " seconds ..." << '\n' << std::flush;
    sleep(seconds);
}
}
void
......@@ -171,7 +176,7 @@ ClientTest::Test(const std::string& address, const std::string& port) {
for(int i = 0; i < ADD_VECTOR_LOOP; i++){//add vectors
std::vector<RowRecord> record_array;
BuildVectors(i*TOTAL_ROW_COUNT, (i+1)*TOTAL_ROW_COUNT, record_array);
BuildVectors(i*BATCH_ROW_COUNT, (i+1)*BATCH_ROW_COUNT, record_array);
std::vector<int64_t> record_ids;
Status stat = conn->AddVector(TABLE_NAME, record_array, record_ids);
std::cout << "AddVector function call status: " << stat.ToString() << std::endl;
......@@ -179,10 +184,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
}
{//search vectors
std::cout << "Waiting data persist. Sleep 10 seconds ..." << std::endl;
sleep(10);
Sleep(2);
std::vector<RowRecord> record_array;
BuildVectors(SEARCH_TARGET, SEARCH_TARGET + 10, record_array);
BuildVectors(SEARCH_TARGET, SEARCH_TARGET + NQ, record_array);
std::vector<Range> query_range_array;
Range rg;
......@@ -195,10 +200,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
PrintSearchResult(topk_query_result_array);
}
// {//delete table
// Status stat = conn->DeleteTable(TABLE_NAME);
// std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
// }
{//delete table
Status stat = conn->DeleteTable(TABLE_NAME);
std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
}
{//server status
std::string status = conn->ServerStatus();
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "DBWrapper.h"
#include "ServerConfig.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace server {
//Builds the engine options from the db section of the server configuration,
//creates the meta directory and opens the database.
//Throws ServerException(SERVER_NULL_POINTER) when opening leaves db_ unset.
DBWrapper::DBWrapper() {
    zilliz::milvus::engine::Options options;
    ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);

    options.meta.backend_uri = db_config.GetValue(CONFIG_DB_URL);

    //meta data lives in the "db" sub-folder of the configured path
    std::string root_path = db_config.GetValue(CONFIG_DB_PATH);
    options.meta.path = root_path + "/db";

    //configured value is in MB; a non-positive value keeps the Options default
    int64_t trigger_size_mb = db_config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
    if(trigger_size_mb > 0) {
        options.index_trigger_size = (size_t)trigger_size_mb * engine::ONE_MB;
    }

    CommonUtil::CreateDirectory(options.meta.path);

    zilliz::milvus::engine::DB::Open(options, &db_);
    if(db_ != nullptr) {
        return;
    }

    SERVER_LOG_ERROR << "Failed to open db";
    throw ServerException(SERVER_NULL_POINTER, "Failed to open db");
}
//Releases the database object obtained from DB::Open in the constructor.
DBWrapper::~DBWrapper() {
delete db_;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "db/DB.h"
#include "db/Meta.h"
namespace zilliz {
namespace milvus {
namespace server {
//Owner of the single engine DB object used by the server.
//Construction is private; the only instance is the function-local static
//inside DB(), created on the first call.
class DBWrapper {
private:
    DBWrapper();
    ~DBWrapper();

public:
    //Returns the shared database object, opening it on first use.
    static zilliz::milvus::engine::DB* DB() {
        static DBWrapper instance;
        return instance.db_;
    }

    //Returns the database object held by this wrapper.
    zilliz::milvus::engine::DB* db() {
        return db_;
    }

private:
    zilliz::milvus::engine::DB* db_ = nullptr;
};
}
}
}
......@@ -7,6 +7,7 @@
#include "RequestHandler.h"
#include "ServerConfig.h"
#include "ThreadPoolServer.h"
#include "DBWrapper.h"
#include "milvus_types.h"
#include "milvus_constants.h"
......@@ -51,6 +52,8 @@ MilvusServer::StartService() {
std::string mode = server_config.GetValue(CONFIG_SERVER_MODE, "thread_pool");
try {
DBWrapper::DB();//initialize db
stdcxx::shared_ptr<RequestHandler> handler(new RequestHandler());
stdcxx::shared_ptr<TProcessor> processor(new MilvusServiceProcessor(handler));
stdcxx::shared_ptr<TServerTransport> server_transport(new TServerSocket(address, port));
......
......@@ -8,9 +8,7 @@
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "db/DB.h"
#include "db/Env.h"
#include "db/Meta.h"
#include "DBWrapper.h"
#include "version.h"
namespace zilliz {
......@@ -27,44 +25,6 @@ using DB_META = zilliz::milvus::engine::meta::Meta;
using DB_DATE = zilliz::milvus::engine::meta::DateT;
namespace {
// File-local owner of the engine DB handle: the constructor opens the database
// from the CONFIG_DB section of the server configuration and the destructor
// deletes it.
class DBWrapper {
public:
// Builds engine::Options from the server configuration and opens the DB.
// Throws ServerException(SERVER_NULL_POINTER) when DB::Open leaves db_ null.
DBWrapper() {
zilliz::milvus::engine::Options opt;
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
// Second argument (10) is presumably the fallback when the key is absent - TODO confirm against ConfigNode.
opt.memory_sync_interval = (uint16_t)config.GetInt32Value(CONFIG_DB_FLUSH_INTERVAL, 10);
// Meta data is kept in the "db" sub-directory of the configured db path.
opt.meta.path = db_path + "/db";
int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
}
// Make sure the meta directory exists before the DB is opened.
CommonUtil::CreateDirectory(opt.meta.path);
zilliz::milvus::engine::DB::Open(opt, &db_);
// The return value of Open is not inspected; failure is detected by db_ still being null.
if(db_ == nullptr) {
SERVER_LOG_ERROR << "Failed to open db";
throw ServerException(SERVER_NULL_POINTER, "Failed to open db");
}
}
// Deletes the DB instance assigned by the constructor.
~DBWrapper() {
delete db_;
}
// Non-owning access to the wrapped engine DB pointer.
zilliz::milvus::engine::DB* DB() { return db_; }
private:
// Engine DB handle; nullptr until DB::Open assigns it.
zilliz::milvus::engine::DB* db_ = nullptr;
};
// Returns the engine DB held by a function-local static DBWrapper, so the
// wrapper (and with it the DB) is constructed on the first call.
zilliz::milvus::engine::DB* DB() {
static DBWrapper db_wrapper;
return db_wrapper.DB();
}
engine::EngineType EngineType(int type) {
static std::map<int, engine::EngineType> map_type = {
{0, engine::EngineType::INVALID},
......@@ -201,7 +161,7 @@ ServerError CreateTableTask::OnExecute() {
table_info.store_raw_data_ = schema_.store_raw_vector;
//step 3: create table
engine::Status stat = DB()->CreateTable(table_info);
engine::Status stat = DBWrapper::DB()->CreateTable(table_info);
if(!stat.ok()) {//table could exist
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
......@@ -223,7 +183,7 @@ ServerError CreateTableTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeTableTask::DescribeTableTask(const std::string &table_name, thrift::TableSchema &schema)
: BaseTask(PING_TASK_GROUP),
: BaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name),
schema_(schema) {
schema_.table_name = table_name_;
......@@ -248,7 +208,7 @@ ServerError DescribeTableTask::OnExecute() {
//step 2: get table info
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name_;
engine::Status stat = DB()->DescribeTable(table_info);
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
if(!stat.ok()) {
error_code_ = SERVER_TABLE_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
......@@ -299,7 +259,7 @@ ServerError DeleteTableTask::OnExecute() {
//step 2: check table existence
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name_;
engine::Status stat = DB()->DescribeTable(table_info);
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
if(!stat.ok()) {
error_code_ = SERVER_TABLE_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
......@@ -311,7 +271,7 @@ ServerError DeleteTableTask::OnExecute() {
//step 3: delete table
std::vector<DB_DATE> dates;
stat = DB()->DeleteTable(table_name_, dates);
stat = DBWrapper::DB()->DeleteTable(table_name_, dates);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
......@@ -331,7 +291,7 @@ ServerError DeleteTableTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowTablesTask::ShowTablesTask(std::vector<std::string>& tables)
: BaseTask(DQL_TASK_GROUP),
: BaseTask(DDL_DML_TASK_GROUP),
tables_(tables) {
}
......@@ -342,7 +302,7 @@ BaseTaskPtr ShowTablesTask::Create(std::vector<std::string>& tables) {
ServerError ShowTablesTask::OnExecute() {
std::vector<engine::meta::TableSchema> schema_array;
engine::Status stat = DB()->AllTables(schema_array);
engine::Status stat = DBWrapper::DB()->AllTables(schema_array);
if(!stat.ok()) {
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
......@@ -397,7 +357,7 @@ ServerError AddVectorTask::OnExecute() {
//step 2: check table existence
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name_;
engine::Status stat = DB()->DescribeTable(table_info);
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
if(!stat.ok()) {
error_code_ = SERVER_TABLE_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
......@@ -419,7 +379,7 @@ ServerError AddVectorTask::OnExecute() {
//step 4: insert vectors
uint64_t vec_count = (uint64_t)record_array_.size();
stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
stat = DBWrapper::DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
rc.Record("add vectors to engine");
if(!stat.ok()) {
error_code_ = SERVER_UNEXPECTED_ERROR;
......@@ -453,13 +413,13 @@ SearchVectorTask::SearchVectorTask(const std::string &table_name,
const std::vector<thrift::Range> &query_range_array,
const int64_t top_k,
std::vector<thrift::TopKQueryResult> &result_array)
: BaseTask(DQL_TASK_GROUP),
table_name_(table_name),
file_id_array_(file_id_array),
record_array_(query_record_array),
range_array_(query_range_array),
top_k_(top_k),
result_array_(result_array) {
: BaseTask(DQL_TASK_GROUP),
table_name_(table_name),
file_id_array_(file_id_array),
record_array_(query_record_array),
range_array_(query_range_array),
top_k_(top_k),
result_array_(result_array) {
}
......@@ -495,7 +455,7 @@ ServerError SearchVectorTask::OnExecute() {
//step 2: check table existence
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name_;
engine::Status stat = DB()->DescribeTable(table_info);
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
if(!stat.ok()) {
error_code_ = SERVER_TABLE_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
......@@ -528,9 +488,9 @@ ServerError SearchVectorTask::OnExecute() {
uint64_t record_count = (uint64_t)record_array_.size();
if(file_id_array_.empty()) {
stat = DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
} else {
stat = DB()->Query(table_name_, file_id_array_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
stat = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
}
rc.Record("search vectors from engine");
......@@ -577,7 +537,7 @@ ServerError SearchVectorTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count)
: BaseTask(DQL_TASK_GROUP),
: BaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name),
row_count_(row_count) {
......@@ -601,7 +561,7 @@ ServerError GetTableRowCountTask::OnExecute() {
//step 2: get row count
uint64_t row_count = 0;
engine::Status stat = DB()->GetTableRowCount(table_name_, row_count);
engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
if (!stat.ok()) {
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
......
......@@ -23,7 +23,6 @@ static const std::string CONFIG_SERVER_MODE = "server_mode";
static const std::string CONFIG_DB = "db_config";
static const std::string CONFIG_DB_URL = "db_backend_url";
static const std::string CONFIG_DB_PATH = "db_path";
static const std::string CONFIG_DB_FLUSH_INTERVAL = "db_flush_interval";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_LOG = "log_config";
......
......@@ -4,11 +4,19 @@
# Proprietary and confidential.
#-------------------------------------------------------------------------------
aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler db_scheduler_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
set(db_scheduler_srcs
${scheduler_files}
${scheduler_context_files}
${scheduler_task_files}
)
include_directories(/usr/local/cuda/include)
link_directories("/usr/local/cuda/lib64")
......
......@@ -6,6 +6,7 @@
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <boost/filesystem.hpp>
#include "utils.h"
#include "db/DB.h"
......@@ -14,6 +15,31 @@
using namespace zilliz::milvus;
namespace {

// Table name and vector dimension shared by the DB tests in this file.
static const std::string TABLE_NAME = "test_group";
static constexpr int64_t TABLE_DIM = 256;

// Builds the table schema used by the tests: TABLE_NAME as table id,
// TABLE_DIM as dimension and FAISS_IDMAP as engine type.
engine::meta::TableSchema BuildTableSchema() {
    engine::meta::TableSchema table_info;
    table_info.dimension_ = TABLE_DIM;
    table_info.table_id_ = TABLE_NAME;
    table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP;
    return table_info;
}

// Fills `vectors` with n rows of TABLE_DIM floats drawn from drand48().
// The first component of row i is additionally shifted by i / 2000.
// Any previous content of `vectors` is discarded; a non-positive n yields an
// empty vector.
void BuildVectors(int64_t n, std::vector<float>& vectors) {
    vectors.clear();
    if (n <= 0) {
        // A negative count would otherwise convert to a huge unsigned size in resize().
        return;
    }

    vectors.resize(static_cast<size_t>(n * TABLE_DIM));
    float* data = vectors.data();
    // Counters use the same 64-bit type as n so large counts cannot overflow them.
    for (int64_t i = 0; i < n; i++) {
        for (int64_t j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48();
        data[TABLE_DIM * i] += i / 2000.;
    }
}

}
TEST_F(DBTest, CONFIG_TEST) {
{
ASSERT_ANY_THROW(engine::ArchiveConf conf("wrong"));
......@@ -60,89 +86,33 @@ TEST_F(DBTest, CONFIG_TEST) {
}
}
// Creates a table, inserts 100000 batches of 20 random vectors and then checks
// that the size reported by the DB is below 1 * engine::meta::G.
TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
    static const std::string group_name = "test_group";
    static const int group_dim = 256;
    // Zero-initialized so the value is defined even if Size() does not write to it.
    uint64_t size = 0;

    engine::meta::TableSchema group_info;
    group_info.dimension_ = group_dim;
    group_info.table_id_ = group_name;
    group_info.engine_type_ = (int)engine::EngineType::FAISS_IVFFLAT;
    engine::Status stat = db_->CreateTable(group_info);

    // Read the schema back to confirm the table exists with the expected dimension.
    engine::meta::TableSchema group_info_get;
    group_info_get.table_id_ = group_name;
    stat = db_->DescribeTable(group_info_get);
    ASSERT_STATS(stat);
    ASSERT_EQ(group_info_get.dimension_, group_dim);

    engine::IDNumbers vector_ids;
    db_->Size(size);

    int d = 256;
    int nb = 20;
    // Owned by a vector so the buffer is released even when a fatal assertion
    // below returns from the test body early.
    std::vector<float> xb(d * nb);
    for(int i = 0; i < nb; i++) {
        for(int j = 0; j < d; j++) xb[d * i + j] = drand48();
        xb[d * i] += i / 2000.;
    }

    int loop = 100000;
    for (auto i=0; i<loop; ++i) {
        db_->InsertVectors(group_name, nb, xb.data(), vector_ids);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    // Give background work a moment before measuring the size again.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    db_->Size(size);
    LOG(DEBUG) << "size=" << size;
    ASSERT_TRUE(size < 1 * engine::meta::G);
};
TEST_F(DBTest, DB_TEST) {
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::TableSchema group_info;
group_info.dimension_ = group_dim;
group_info.table_id_ = group_name;
group_info.engine_type_ = (int)engine::EngineType::FAISS_IVFFLAT;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::TableSchema group_info_get;
group_info_get.table_id_ = group_name;
stat = db_->DescribeTable(group_info_get);
static const std::string table_name = "test_group";
static const int table_dim = 256;
engine::meta::TableSchema table_info;
table_info.dimension_ = table_dim;
table_info.table_id_ = table_name;
table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP;
engine::Status stat = db_->CreateTable(table_info);
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = table_name;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension_, group_dim);
ASSERT_EQ(table_info_get.dimension_, table_dim);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
int d = 256;
int nb = 50;
float *xb = new float[d * nb];
for(int i = 0; i < nb; i++) {
for(int j = 0; j < d; j++) xb[d * i + j] = drand48();
xb[d * i] += i / 2000.;
}
int64_t nb = 50;
std::vector<float> xb;
BuildVectors(nb, xb);
int qb = 5;
float *qxb = new float[d * qb];
for(int i = 0; i < qb; i++) {
for(int j = 0; j < d; j++) qxb[d * i + j] = drand48();
qxb[d * i] += i / 2000.;
}
int64_t qb = 5;
std::vector<float> qxb;
BuildVectors(qb, qxb);
std::thread search([&]() {
engine::QueryResults results;
......@@ -160,7 +130,7 @@ TEST_F(DBTest, DB_TEST) {
prev_count = count;
START_TIMER;
stat = db_->Query(group_name, k, qb, qxb, results);
stat = db_->Query(table_name, k, qb, qxb.data(), results);
ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
STOP_TIMER(ss.str());
......@@ -183,54 +153,45 @@ TEST_F(DBTest, DB_TEST) {
for (auto i=0; i<loop; ++i) {
if (i==40) {
db_->InsertVectors(group_name, qb, qxb, target_ids);
db_->InsertVectors(table_name, qb, qxb.data(), target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->InsertVectors(group_name, nb, xb, vector_ids);
db_->InsertVectors(table_name, nb, xb.data(), vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
search.join();
delete [] xb;
delete [] qxb;
};
TEST_F(DBTest, SEARCH_TEST) {
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::TableSchema group_info;
group_info.dimension_ = group_dim;
group_info.table_id_ = group_name;
group_info.engine_type_ = (int)engine::EngineType::FAISS_IVFFLAT;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::TableSchema group_info_get;
group_info_get.table_id_ = group_name;
stat = db_->DescribeTable(group_info_get);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension_, group_dim);
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
// prepare raw data
size_t nb = 250000;
size_t nq = 10;
size_t k = 5;
std::vector<float> xb(nb*group_dim);
std::vector<float> xq(nq*group_dim);
std::vector<float> xb(nb*TABLE_DIM);
std::vector<float> xq(nq*TABLE_DIM);
std::vector<long> ids(nb);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_real_distribution<> dis_xt(-1.0, 1.0);
for (size_t i = 0; i < nb*group_dim; i++) {
for (size_t i = 0; i < nb*TABLE_DIM; i++) {
xb[i] = dis_xt(gen);
if (i < nb){
ids[i] = i;
}
}
for (size_t i = 0; i < nq*group_dim; i++) {
for (size_t i = 0; i < nq*TABLE_DIM; i++) {
xq[i] = dis_xt(gen);
}
......@@ -243,7 +204,7 @@ TEST_F(DBTest, SEARCH_TEST) {
// insert data
const int batch_size = 100;
for (int j = 0; j < nb / batch_size; ++j) {
stat = db_->InsertVectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids);
stat = db_->InsertVectors(TABLE_NAME, batch_size, xb.data()+batch_size*j*TABLE_DIM, ids);
if (j == 200){ sleep(1);}
ASSERT_STATS(stat);
}
......@@ -251,9 +212,77 @@ TEST_F(DBTest, SEARCH_TEST) {
sleep(2); // wait until build index finish
engine::QueryResults results;
stat = db_->Query(group_name, k, nq, xq.data(), results);
stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results);
ASSERT_STATS(stat);
// TODO(linxj): add groundTruth assert
};
// Creates the shared test table, inserts 100000 batches of 10 random vectors
// and then checks that the size reported by the DB does not exceed
// 1 * engine::meta::G.
TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
    engine::meta::TableSchema table_info = BuildTableSchema();
    engine::Status stat = db_->CreateTable(table_info);

    // Read the schema back to confirm the table exists with the expected dimension.
    engine::meta::TableSchema table_info_get;
    table_info_get.table_id_ = TABLE_NAME;
    stat = db_->DescribeTable(table_info_get);
    ASSERT_STATS(stat);
    ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);

    engine::IDNumbers vector_ids;

    // Zero-initialized so the value logged and asserted on below is defined
    // even if Size() does not write to it.
    uint64_t size = 0;
    db_->Size(size);

    int64_t nb = 10;
    std::vector<float> xb;
    BuildVectors(nb, xb);

    int loop = 100000;
    for (auto i=0; i<loop; ++i) {
        db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    // Give background work a moment before measuring the size again.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    db_->Size(size);
    LOG(DEBUG) << "size=" << size;
    ASSERT_LE(size, 1 * engine::meta::G);
};
// Creates the shared test table, inserts 20 batches of 100000 random vectors,
// deletes the table and then checks that the table location reported by
// DescribeTable no longer exists on disk.
TEST_F(DBTest2, DELETE_TEST) {
    engine::meta::TableSchema table_info = BuildTableSchema();
    engine::Status stat = db_->CreateTable(table_info);

    engine::meta::TableSchema table_info_get;
    table_info_get.table_id_ = TABLE_NAME;
    stat = db_->DescribeTable(table_info_get);
    ASSERT_STATS(stat);

    // The table directory must exist before the delete for the final check to be meaningful.
    ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_));

    engine::IDNumbers vector_ids;

    // Zero-initialized so the variable holds a defined value even if Size()
    // does not write to it.
    uint64_t size = 0;
    db_->Size(size);

    int64_t nb = 100000;
    std::vector<float> xb;
    BuildVectors(nb, xb);

    int loop = 20;
    for (auto i=0; i<loop; ++i) {
        db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    // An empty date list is passed to DeleteTable.
    std::vector<engine::meta::DateT> dates;
    stat = db_->DeleteTable(TABLE_NAME, dates);

    // Wait before checking the file system; presumably the files are removed
    // asynchronously - TODO confirm against DBImpl::DeleteTable.
    std::this_thread::sleep_for(std::chrono::seconds(2));
    ASSERT_TRUE(stat.ok());
    ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
};
......@@ -13,12 +13,19 @@ include_directories(../../src)
aux_source_directory(../../src/db db_srcs)
aux_source_directory(../../src/db/scheduler db_scheduler_srcs)
aux_source_directory(../../src/config config_files)
aux_source_directory(../../src/cache cache_srcs)
aux_source_directory(../../src/wrapper wrapper_src)
aux_source_directory(../../src/metrics metrics_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
set(db_scheduler_srcs
${scheduler_files}
${scheduler_context_files}
${scheduler_task_files}
)
include_directories(/usr/include)
include_directories(../../third_party/build/include)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册