提交 54361633 编写于 作者: G groot

refine code


Former-commit-id: eae3ec0b3f87dbecf1e44d079d4a4a46794f9b9f
上级 31e0303d
......@@ -25,6 +25,10 @@ namespace engine {
namespace {
static constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
static constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
......@@ -130,7 +134,7 @@ DBImpl::DBImpl(const Options& options)
pMemMgr_(new MemManager(pMeta_, options_)),
compact_thread_pool_(1, 1),
index_thread_pool_(1, 1) {
StartTimerTasks(options_.memory_sync_interval);
StartTimerTasks();
}
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
......@@ -399,12 +403,11 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
return Status::OK();
}
void DBImpl::StartTimerTasks(int interval) {
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
void DBImpl::StartTimerTasks() {
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
void DBImpl::BackgroundTimerTask(int interval) {
void DBImpl::BackgroundTimerTask() {
Status status;
server::SystemInfo::GetInstance().Init();
while (true) {
......@@ -419,9 +422,22 @@ void DBImpl::BackgroundTimerTask(int interval) {
break;
}
std::this_thread::sleep_for(std::chrono::seconds(interval));
std::this_thread::sleep_for(std::chrono::seconds(1));
StartMetricTask();
StartCompactionTask();
StartBuildIndexTask();
}
}
void DBImpl::StartMetricTask() {
static uint64_t metric_clock_tick = 0;
metric_clock_tick++;
if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
return;
}
server::Metrics::GetInstance().KeepingAliveCounterIncrement(interval);
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
......@@ -433,13 +449,15 @@ void DBImpl::BackgroundTimerTask(int interval) {
server::Metrics::GetInstance().GPUPercentGaugeSet();
server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
server::Metrics::GetInstance().OctetsSet();
StartCompactionTask();
StartBuildIndexTask();
}
}
void DBImpl::StartCompactionTask() {
static uint64_t compact_clock_tick = 0;
compact_clock_tick++;
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
return;
}
//serialize memory data
std::vector<std::string> temp_table_ids;
pMemMgr_->Serialize(temp_table_ids);
......@@ -556,6 +574,12 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
}
void DBImpl::StartBuildIndexTask() {
static uint64_t index_clock_tick = 0;
index_clock_tick++;
if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) {
return;
}
//build index has been finished?
if(!index_thread_results_.empty()) {
std::chrono::milliseconds span(10);
......@@ -581,7 +605,11 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
}
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
if(to_index == nullptr) {
return Status::Error("Invalid engine type");
}
try {
to_index->Load();
auto start_time = METRICS_NOW_TIME;
auto index = to_index->BuildIndex(table_file.location_);
......@@ -603,7 +631,10 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
<< " from file " << to_remove.file_id_;
index->Cache();
pMeta_->Archive();
} catch (std::exception& ex) {
return Status::Error("Build index encounter exception", ex.what());
}
return Status::OK();
}
......
......@@ -70,8 +70,10 @@ private:
const meta::DatesT& dates, QueryResults& results);
void StartTimerTasks(int interval);
void BackgroundTimerTask(int interval);
void StartTimerTasks();
void BackgroundTimerTask();
void StartMetricTask();
void StartCompactionTask();
Status MergeFiles(const std::string& table_id,
......
......@@ -186,10 +186,18 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::Metrics::GetInstance().MetaAccessTotalIncrement();
if (table_schema.table_id_ == "") {
NextTableId(table_schema.table_id_);
} else {
auto table = ConnectorPtr->select(columns(&TableSchema::state_),
where(c(&TableSchema::table_id_) == table_schema.table_id_));
if (table.size() == 1) {
std::string msg = (TableSchema::TO_DELETE == std::get<0>(table[0])) ?
"Table already exists" : "Table already exists and it is in delete state, please wait a second";
return Status::Error(msg);
}
}
table_schema.files_cnt_ = 0;
table_schema.id_ = -1;
table_schema.created_on_ = utils::GetMicroSecTimeStamp();
......@@ -207,8 +215,8 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
assert(ret);
}
} catch (std::exception &e) {
......@@ -733,10 +741,12 @@ Status DBMetaImpl::Size(uint64_t &result) {
}
Status DBMetaImpl::DiscardFiles(long to_discard_size) {
LOG(DEBUG) << "About to discard size=" << to_discard_size;
if (to_discard_size <= 0) {
return Status::OK();
}
LOG(DEBUG) << "About to discard size=" << to_discard_size;
try {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::size_),
......
......@@ -6,7 +6,7 @@
#include "SearchScheduler.h"
#include "IndexLoaderQueue.h"
#include "SearchTaskQueue.h"
#include "SearchTask.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "metrics/Metrics.h"
......
......@@ -7,7 +7,7 @@
#include "SearchContext.h"
#include "IndexLoaderQueue.h"
#include "SearchTaskQueue.h"
#include "SearchTask.h"
namespace zilliz {
namespace milvus {
......@@ -35,6 +35,8 @@ private:
std::shared_ptr<std::thread> search_thread_;
IndexLoaderQueue index_load_queue_;
using SearchTaskQueue = server::BlockingQueue<SearchTaskPtr>;
SearchTaskQueue search_queue_;
bool stopped_ = true;
......
......@@ -3,7 +3,7 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "SearchTaskQueue.h"
#include "SearchTask.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
......
......@@ -27,7 +27,6 @@ public:
};
using SearchTaskPtr = std::shared_ptr<SearchTask>;
using SearchTaskQueue = server::BlockingQueue<SearchTaskPtr>;
}
......
......@@ -221,7 +221,7 @@ ServerError CreateTableTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeTableTask::DescribeTableTask(const std::string &table_name, thrift::TableSchema &schema)
: BaseTask(PING_TASK_GROUP),
: BaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name),
schema_(schema) {
schema_.table_name = table_name_;
......@@ -329,7 +329,7 @@ ServerError DeleteTableTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowTablesTask::ShowTablesTask(std::vector<std::string>& tables)
: BaseTask(DQL_TASK_GROUP),
: BaseTask(DDL_DML_TASK_GROUP),
tables_(tables) {
}
......@@ -575,7 +575,7 @@ ServerError SearchVectorTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count)
: BaseTask(DQL_TASK_GROUP),
: BaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name),
row_count_(row_count) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册