Commit 59d66f49 authored by W wxyu

Resolve conflicts


Former-commit-id: 3598c2e165c6f019ae291d5e3ae3fd4a19359f6b
......@@ -40,9 +40,11 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-394 - Update scheduler unittest
- MS-400 - Add timestamp record in task state change function
- MS-402 - Add dump implementation for TaskTableItem
- MS-406 - Add table flag for meta
- MS-403 - Add GpuCacheMgr
- MS-404 - Release index after search task is done to avoid continuous memory growth
- MS-405 - Add delete task support
- MS-407 - Reconstruct MetricsCollector
- MS-408 - Add device_id in resource construct function
- MS-409 - Using new scheduler
......
server_config:
address: 0.0.0.0
address: 0.0.0.0 # milvus server ip address
port: 19530 # the port milvus listens on, default: 19530, range: 1025 ~ 65534
gpu_index: 0 # the gpu milvus uses, default: 0, range: 0 ~ gpu number - 1
mode: single # milvus deployment type: single, cluster, read_only
......@@ -7,44 +7,35 @@ server_config:
db_config:
db_path: @MILVUS_DB_PATH@ # milvus data storage path
db_slave_path: # secondary data storage path, separated by semicolons
parallel_reduce: false # use multiple threads to reduce topk results
parallel_reduce: false # use multiple threads to reduce topk results
# URI format: dialect://username:password@host:port/database
# All parts except dialect are optional, but you MUST include the delimiters
# Currently dialect supports mysql or sqlite
db_backend_url: sqlite://:@:/
index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB
archive_disk_threshold: 0 # trigger archive action if storage size exceeds this value, 0 means no limit, unit: GB
archive_days_threshold: 0 # files older than x days will be archived, 0 means no limit, unit: day
insert_buffer_size: 4 # maximum insert buffer size allowed, default: 4, unit: GB, should be at least 1 GB.
# the sum of insert_buffer_size and cpu_cache_capacity should be less than total memory, unit: GB
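The URI format documented above admits two dialects. For illustration only (host, credentials, and database name are hypothetical), the fully-empty sqlite form and a fully-populated mysql form would look like:

db_backend_url: sqlite://:@:/
db_backend_url: mysql://root:123456@127.0.0.1:3306/milvus

Even when a part is omitted, the ://, :, @, and / delimiters must all be present, as the comment warns.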
metric_config:
is_startup: off # whether to start monitoring: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # prometheus configuration
collect_type: pull # method prometheus uses to collect data
port: 8080 # the port prometheus uses to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push mode: push gateway ip address
push_gateway_port: 9091 # push mode: push gateway port
license_config: # license configuration
license_path: "@MILVUS_DB_PATH@/system.license" # license file path
is_startup: off # whether to start monitoring: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # prometheus configuration
port: 8080 # the port prometheus uses to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push mode: push gateway ip address
push_gateway_port: 9091 # push mode: push gateway port
cache_config: # cache configuration
cpu_cache_capacity: 16 # how much memory to use as cache, unit: GB, range: 0 ~ less than total memory
cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full; this value specifies how much memory should be kept, range: greater than zero ~ 1.0
insert_cache_immediately: false # inserted data will be loaded into cache immediately for hot query
gpu_cache_capacity: 5 # how much gpu memory to use as cache, unit: GB, range: 0 ~ less than total memory
gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full; this value specifies how much memory should be kept, range: greater than zero ~ 1.0
gpu_ids: 0,1 # gpu id
cache_config:
cpu_cache_capacity: 16 # how much memory to use as cache, unit: GB, range: 0 ~ less than total memory
cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full; this value specifies how much memory should be kept, range: greater than zero ~ 1.0
insert_cache_immediately: false # inserted data will be loaded into cache immediately for hot query
gpu_cache_capacity: 5 # how much gpu memory to use as cache, unit: GB, range: 0 ~ less than total memory
gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full; this value specifies how much memory should be kept, range: greater than zero ~ 1.0
gpu_ids: 0,1 # gpu id
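A worked reading of the free-percent knobs, assuming free_percent is the fraction of capacity retained after eviction: with cpu_cache_capacity: 16 and cpu_cache_free_percent: 0.85, a full CPU cache is trimmed back to about 16 GB × 0.85 = 13.6 GB; likewise the 5 GB GPU cache is trimmed to about 4.25 GB on each device listed in gpu_ids.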
engine_config:
nprobe: 10
nlist: 16384
use_blas_threshold: 20
metric_type: L2 # compare vectors by Euclidean distance (L2) or inner product (IP), optional: L2 or IP
omp_thread_num: 0 # how many compute threads the engine uses; 0 means use all cpu cores
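Because metric_type flips the comparison (L2: smaller distance wins; IP: larger product wins), the two settings can rank the same candidates differently. A minimal self-contained sketch with made-up vectors:

#include <cstdio>

int main() {
    const float q[2] = {1.0f, 0.0f};   // query
    const float a[2] = {0.9f, 0.0f};   // close to q in L2, moderate projection
    const float b[2] = {2.0f, 2.0f};   // far from q in L2, large projection
    auto l2 = [](const float* x, const float* y) {
        float s = 0.0f;
        for (int i = 0; i < 2; ++i) { float d = x[i] - y[i]; s += d * d; }
        return s;                      // squared Euclidean distance: smaller is better
    };
    auto ip = [](const float* x, const float* y) {
        float s = 0.0f;
        for (int i = 0; i < 2; ++i) s += x[i] * y[i];
        return s;                      // inner product: larger is better
    };
    // L2 prefers a (0.01 < 5.0); IP prefers b (2.0 > 0.9)
    std::printf("L2: a=%.2f b=%.2f | IP: a=%.2f b=%.2f\n",
                l2(q, a), l2(q, b), ip(q, a), ip(q, b));
    return 0;
}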
......@@ -82,4 +73,4 @@ resource_config:
connections:
- ssda===cpu
- cpu===gtx1060
- cpu===gtx1660
\ No newline at end of file
- cpu===gtx1660
......@@ -29,6 +29,7 @@ public:
virtual Status AllTables(std::vector<meta::TableSchema>& table_schema_array) = 0;
virtual Status GetTableRowCount(const std::string& table_id, uint64_t& row_count) = 0;
virtual Status PreloadTable(const std::string& table_id) = 0;
virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
virtual Status InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) = 0;
......
......@@ -24,6 +24,7 @@
#include <cache/CpuCacheMgr.h>
#include <boost/filesystem.hpp>
#include "scheduler/SchedInst.h"
#include <src/cache/GpuCacheMgr.h>
namespace zilliz {
namespace milvus {
......@@ -35,32 +36,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (succeed) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
}
}
void CollectQueryMetrics(double total_time, size_t nq) {
for (int i = 0; i < nq; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
}
......@@ -157,6 +132,10 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
return Status::OK();
}
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
return meta_ptr_->UpdateTableFlag(table_id, flag);
}
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
return meta_ptr_->Count(table_id, row_count);
}
......@@ -165,29 +144,23 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
auto start_time = METRICS_NOW_TIME;
Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
Status status;
zilliz::milvus::server::CollectInsertMetrics metrics(n, status);
status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
CollectInsertMetrics(total_time, n, status.ok());
return status;
}
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float *vectors, QueryResults &results) {
auto start_time = METRICS_NOW_TIME;
server::CollectQueryMetrics metrics(nq);
meta::DatesT dates = {meta::Meta::GetDate()};
Status result = Query(table_id, k, nq, nprobe, vectors, dates, results);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return result;
}
......@@ -254,7 +227,8 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
auto start_time = METRICS_NOW_TIME;
server::CollectQueryMetrics metrics(nq);
server::TimeRecorder rc("");
//step 1: get files to search
......@@ -297,11 +271,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
results = context->GetResult();
rc.ElapseFromBegin("Engine query totally cost");
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return Status::OK();
}
......@@ -421,20 +390,16 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
long index_size = 0;
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
auto start_time = METRICS_NOW_TIME;
index->Merge(file.location_);
auto file_schema = file;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
index_size = index->Size();
if (index_size >= options_.index_trigger_size) break;
if (index_size >= file_schema.index_file_size_) break;
}
//step 3: serialize to disk
......@@ -584,6 +549,11 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
//step 2: drop old index files
DropIndex(table_id);
if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
ENGINE_LOG_DEBUG << "index type = IDMAP, no need to build index";
return Status::OK();
}
//step 3: update index info
status = meta_ptr_->UpdateTableIndexParam(table_id, index);
......@@ -644,11 +614,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
std::shared_ptr<ExecutionEngine> index;
try {
auto start_time = METRICS_NOW_TIME;
index = to_index->BuildIndex(table_file.location_);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
server::CollectBuildIndexMetrics metrics;
index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
} catch (std::exception& ex) {
//typical error: out of gpu memory
std::string msg = "BuildIndex encountered exception: " + std::string(ex.what());
......
......@@ -36,40 +36,32 @@ class DBImpl : public DB {
explicit DBImpl(const Options &options);
Status
CreateTable(meta::TableSchema &table_schema) override;
Status CreateTable(meta::TableSchema &table_schema) override;
Status
DeleteTable(const std::string &table_id, const meta::DatesT &dates) override;
Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override;
Status
DescribeTable(meta::TableSchema &table_schema) override;
Status DescribeTable(meta::TableSchema &table_schema) override;
Status
HasTable(const std::string &table_id, bool &has_or_not) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override;
Status
AllTables(std::vector<meta::TableSchema> &table_schema_array) override;
Status AllTables(std::vector<meta::TableSchema> &table_schema_array) override;
Status
PreloadTable(const std::string &table_id) override;
Status PreloadTable(const std::string &table_id) override;
Status
GetTableRowCount(const std::string &table_id, uint64_t &row_count) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag);
Status
InsertVectors(const std::string &table_id, uint64_t n, const float *vectors, IDNumbers &vector_ids) override;
Status GetTableRowCount(const std::string &table_id, uint64_t &row_count) override;
Status
Query(const std::string &table_id,
Status InsertVectors(const std::string &table_id, uint64_t n, const float *vectors, IDNumbers &vector_ids) override;
Status Query(const std::string &table_id,
uint64_t k,
uint64_t nq,
uint64_t nprobe,
const float *vectors,
QueryResults &results) override;
Status
Query(const std::string &table_id,
Status Query(const std::string &table_id,
uint64_t k,
uint64_t nq,
uint64_t nprobe,
......@@ -77,8 +69,7 @@ class DBImpl : public DB {
const meta::DatesT &dates,
QueryResults &results) override;
Status
Query(const std::string &table_id,
Status Query(const std::string &table_id,
const std::vector<std::string> &file_ids,
uint64_t k,
uint64_t nq,
......
......@@ -55,9 +55,8 @@ struct Options {
} MODE;
Options();
uint16_t memory_sync_interval = 1; //unit: second
uint16_t merge_trigger_number = 2;
size_t index_trigger_size = ONE_GB; //unit: byte
DBMetaOptions meta;
int mode = MODE::SINGLE;
......
......@@ -153,6 +153,10 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
&& index1.metric_type_ == index2.metric_type_;
}
bool UserDefinedId(int64_t flag) {
return flag & meta::FLAG_MASK_USERID;
}
} // namespace utils
} // namespace engine
} // namespace milvus
......
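The flag plumbing added in this commit (meta::FLAG_MASK_USERID, utils::UserDefinedId, and the UpdateTableFlag methods) is a plain bit mask over TableSchema::flag_. A standalone model of the pattern, mirroring the committed constant and predicate (the main() driver is illustrative only, not the committed call site):

#include <cassert>
#include <cstdint>

// Same value as meta::FLAG_MASK_USERID in this commit.
constexpr int64_t FLAG_MASK_USERID = 1;

// Mirrors engine::utils::UserDefinedId above.
bool UserDefinedId(int64_t flag) { return flag & FLAG_MASK_USERID; }

int main() {
    int64_t flag = 0;             // TableSchema::flag_ defaults to 0
    flag |= FLAG_MASK_USERID;     // what a caller would persist via UpdateTableFlag
    assert(UserDefinedId(flag));  // and later read back through the schema
    return 0;
}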
......@@ -27,6 +27,8 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2);
bool UserDefinedId(int64_t flag);
} // namespace utils
} // namespace engine
} // namespace milvus
......
......@@ -62,7 +62,7 @@ public:
float *distances,
long *labels) const = 0;
virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string&) = 0;
virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string& location, EngineType engine_type) = 0;
virtual Status Cache() = 0;
......
......@@ -118,9 +118,10 @@ Status ExecutionEngineImpl::Serialize() {
Status ExecutionEngineImpl::Load(bool to_cache) {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
double physical_size = PhysicalSize();
server::CollectExecutionEngineMetrics metrics(physical_size);
index_ = read_index(location_);
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
} catch (knowhere::KnowhereException &e) {
......@@ -133,14 +134,6 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
if (!already_in_cache && to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time));
}
return Status::OK();
}
......@@ -148,7 +141,6 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToGpu(device_id);
......@@ -163,12 +155,6 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
if (!already_in_cache) {
GpuCache(device_id);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
}
return Status::OK();
......@@ -177,7 +163,6 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
Status ExecutionEngineImpl::CopyToCpu() {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToCpu();
......@@ -192,14 +177,7 @@ Status ExecutionEngineImpl::CopyToCpu() {
if(!already_in_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
}
return Status::OK();
}
......@@ -219,6 +197,8 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
try {
double physical_size = server::CommonUtil::GetFileSize(location);
server::CollectExecutionEngineMetrics metrics(physical_size);
to_merge = read_index(location);
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
......@@ -241,11 +221,11 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
}
ExecutionEnginePtr
ExecutionEngineImpl::BuildIndex(const std::string &location) {
ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_type) {
ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;
auto from_index = std::dynamic_pointer_cast<BFIndex>(index_);
auto to_index = CreatetVecIndex(index_type_);
auto to_index = CreatetVecIndex(engine_type);
if (!to_index) {
throw Exception("Create Empty VecIndex");
}
......@@ -263,7 +243,7 @@ ExecutionEngineImpl::BuildIndex(const std::string &location) {
build_cfg);
if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); }
return std::make_shared<ExecutionEngineImpl>(to_index, location, index_type_, metric_type_, nlist_);
return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, nlist_);
}
Status ExecutionEngineImpl::Search(long n,
......
......@@ -61,7 +61,7 @@ public:
float *distances,
long *labels) const override;
ExecutionEnginePtr BuildIndex(const std::string &) override;
ExecutionEnginePtr BuildIndex(const std::string &location, EngineType engine_type) override;
Status Cache() override;
......
......@@ -80,21 +80,14 @@ bool MemTableFile::IsFull() {
}
Status MemTableFile::Serialize() {
auto start_time = METRICS_NOW_TIME;
auto size = GetCurrentMem();
size_t size = GetCurrentMem();
server::CollectSerializeMetrics metrics(size);
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.file_size_ = execution_engine_->PhysicalSize();
table_file_schema_.row_count_ = execution_engine_->Count();
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time);
table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ?
table_file_schema_.file_type_ = (size >= table_file_schema_.index_file_size_) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = meta_->UpdateTableFile(table_file_schema_);
......
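A concrete reading of the changed threshold: file_type_ now compares the memtable size against the per-table index_file_size_ instead of the global options_.index_trigger_size, so with the default 1 GB a 1.2 GB memtable would serialize as TO_INDEX while a 300 MB one stays RAW (sizes hypothetical).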
......@@ -24,7 +24,7 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
size_t &num_vectors_added,
IDNumbers &vector_ids) {
auto start_time = METRICS_NOW_TIME;
server::CollectAddMetrics metrics(n_, table_file_schema.dimension_);
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
num_vectors_to_add : n_ - current_num_vectors_added;
......@@ -49,12 +49,6 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(table_file_schema.dimension_),
total_time);
return status;
}
......
......@@ -42,6 +42,9 @@ class Meta {
virtual Status
UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0;
virtual Status
UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
virtual Status
DeleteTable(const std::string &table_id) = 0;
......
......@@ -19,9 +19,11 @@ namespace meta {
constexpr int32_t DEFAULT_ENGINE_TYPE = (int)EngineType::FAISS_IDMAP;
constexpr int32_t DEFAULT_NLIST = 16384;
constexpr int32_t DEFAULT_INDEX_FILE_SIZE = 1024*ONE_MB;
constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB;
constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2;
constexpr int64_t FLAG_MASK_USERID = 1;
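Note the rewritten constant is the same quantity: assuming the usual ONE_MB = 1024 × 1024, the old 1024*ONE_MB and the new ONE_GB are both 1,073,741,824 bytes, which still fits in int32_t. UpdateTableIndexParam continues to store index.index_file_size_*ONE_MB, so config and API values remain in MB.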
typedef int DateT;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;
......@@ -37,6 +39,7 @@ struct TableSchema {
int32_t state_ = (int)NORMAL;
uint16_t dimension_ = 0;
int64_t created_on_ = 0;
int64_t flag_ = 0;
int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
int32_t nlist_ = DEFAULT_NLIST;
int32_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE;
......@@ -68,6 +71,7 @@ struct TableFileSchema {
int64_t created_on_ = 0;
int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
int32_t nlist_ = DEFAULT_NLIST; //not persist to meta
int32_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE; //not persist to meta
int32_t metric_type_ = DEFAULT_METRIC_TYPE; //not persist to meta
}; // TableFileSchema
......
......@@ -39,24 +39,6 @@ Status HandleException(const std::string &desc, std::exception &e) {
return Status::DBTransactionError(desc, e.what());
}
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
......@@ -155,6 +137,7 @@ Status MySQLMetaImpl::Initialize() {
"state INT NOT NULL, " <<
"dimension SMALLINT NOT NULL, " <<
"created_on BIGINT NOT NULL, " <<
"flag BIGINT DEFAULT 0 NOT NULL, " <<
"engine_type INT DEFAULT 1 NOT NULL, " <<
"nlist INT DEFAULT 16384 NOT NULL, " <<
"index_file_size INT DEFAULT 1024 NOT NULL, " <<
......@@ -272,7 +255,7 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -390,7 +373,7 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -425,7 +408,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
"engine_type_ = " << index.engine_type_ << ", " <<
"nlist = " << index.nlist_ << ", " <<
"index_file_size = " << index.index_file_size_*ONE_MB << ", " <<
"metric_type = " << index.metric_type_ << ", " <<
"metric_type = " << index.metric_type_ << " " <<
"WHERE id = " << quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str();
......@@ -455,9 +438,49 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
return Status::OK();
}
Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
Query updateTableFlagQuery = connectionPtr->query();
updateTableFlagQuery << "UPDATE Tables " <<
"SET flag = " << flag << " " <<
"WHERE id = " << quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();
if (!updateTableFlagQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FLAG";
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FLAG",
updateTableFlagQuery.error());
}
} //Scoped Connection
} catch (const BadQuery &er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FLAG" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FLAG", er.what());
} catch (const Exception &er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FLAG" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FLAG", er.what());
}
return Status::OK();
}
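For a hypothetical table id tbl_1 and flag value 1, the statement built and logged above is: UPDATE Tables SET flag = 1 WHERE id = 'tbl_1';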
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -504,7 +527,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -560,7 +583,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -603,7 +626,7 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -642,7 +665,7 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -697,7 +720,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -737,7 +760,7 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -802,7 +825,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
try {
MetricCollector metric;
server::MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_;
......@@ -812,6 +835,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
file_schema.nlist_ = table_schema.nlist_;
file_schema.index_file_size_ = table_schema.index_file_size_;
file_schema.metric_type_ = table_schema.metric_type_;
utils::GetTableFilePath(options_, file_schema);
......@@ -870,7 +894,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -926,8 +950,9 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
groups[table_file.table_id_] = table_schema;
}
table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
table_file.nlist_ = groups[table_file.table_id_].nlist_;
table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
table_file.dimension_ = groups[table_file.table_id_].dimension_;
utils::GetTableFilePath(options_, table_file);
......@@ -953,7 +978,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1020,10 +1045,12 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
table_file.engine_type_ = resRow["engine_type"];
table_file.metric_type_ = table_schema.metric_type_;
table_file.nlist_ = table_schema.nlist_;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.metric_type_ = table_schema.metric_type_;
std::string file_id;
resRow["file_id"].to_string(file_id);
table_file.file_id_ = file_id;
......@@ -1067,7 +1094,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1132,10 +1159,12 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
table_file.engine_type_ = resRow["engine_type"];
table_file.metric_type_ = table_schema.metric_type_;
table_file.nlist_ = table_schema.nlist_;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.metric_type_ = table_schema.metric_type_;
std::string file_id;
resRow["file_id"].to_string(file_id);
table_file.file_id_ = file_id;
......@@ -1177,7 +1206,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
//check table existence
TableSchema table_schema;
......@@ -1232,10 +1261,12 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
table_file.engine_type_ = resRow["engine_type"];
table_file.metric_type_ = table_schema.metric_type_;
table_file.nlist_ = table_schema.nlist_;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.metric_type_ = table_schema.metric_type_;
table_file.created_on_ = resRow["created_on"];
table_file.dimension_ = table_schema.dimension_;
......@@ -1315,10 +1346,12 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
file_schema.engine_type_ = resRow["engine_type"];
file_schema.metric_type_ = table_schema.metric_type_;
file_schema.nlist_ = table_schema.nlist_;
file_schema.index_file_size_ = table_schema.index_file_size_;
file_schema.metric_type_ = table_schema.metric_type_;
std::string file_id;
resRow["file_id"].to_string(file_id);
file_schema.file_id_ = file_id;
......@@ -1457,7 +1490,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
server::MetricCollector metric;
bool status;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1529,7 +1562,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1643,7 +1676,7 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1733,7 +1766,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1812,7 +1845,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1870,7 +1903,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder
//don't remove table folder until all its files have been deleted
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1955,7 +1988,7 @@ Status MySQLMetaImpl::CleanUp() {
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::MetricCollector metric;
TableSchema table_schema;
table_schema.table_id_ = table_id;
......
......@@ -26,14 +26,19 @@ class MySQLMetaImpl : public Meta {
MySQLMetaImpl(const DBMetaOptions &options_, const int &mode);
Status CreateTable(TableSchema &table_schema) override;
Status DescribeTable(TableSchema &group_info_) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override;
Status AllTables(std::vector<TableSchema> &table_schema_array) override;
Status DeleteTable(const std::string &table_id) override;
Status DeleteTableFiles(const std::string &table_id) override;
Status CreateTableFile(TableFileSchema &file_schema) override;
Status DropPartitionsByDates(const std::string &table_id,
const DatesT &dates) override;
......@@ -45,6 +50,8 @@ class MySQLMetaImpl : public Meta {
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag);
Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
Status DropTableIndex(const std::string &table_id) override;
......
......@@ -34,24 +34,6 @@ Status HandleException(const std::string& desc, std::exception &e) {
return Status::DBTransactionError(desc, e.what());
}
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
inline auto StoragePrototype(const std::string &path) {
......@@ -62,6 +44,7 @@ inline auto StoragePrototype(const std::string &path) {
make_column("state", &TableSchema::state_),
make_column("dimension", &TableSchema::dimension_),
make_column("created_on", &TableSchema::created_on_),
make_column("flag", &TableSchema::flag_, default_value(0)),
make_column("engine_type", &TableSchema::engine_type_),
make_column("nlist", &TableSchema::nlist_),
make_column("index_file_size", &TableSchema::index_file_size_),
......@@ -170,7 +153,7 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -212,7 +195,7 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -236,7 +219,7 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -261,12 +244,13 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_,
&TableSchema::dimension_,
&TableSchema::created_on_,
&TableSchema::flag_,
&TableSchema::engine_type_,
&TableSchema::nlist_,
&TableSchema::index_file_size_,
......@@ -279,10 +263,11 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
table_schema.state_ = std::get<1>(groups[0]);
table_schema.dimension_ = std::get<2>(groups[0]);
table_schema.created_on_ = std::get<3>(groups[0]);
table_schema.engine_type_ = std::get<4>(groups[0]);
table_schema.nlist_ = std::get<5>(groups[0]);
table_schema.index_file_size_ = std::get<6>(groups[0]);
table_schema.metric_type_ = std::get<7>(groups[0]);
table_schema.flag_ = std::get<4>(groups[0]);
table_schema.engine_type_ = std::get<5>(groups[0]);
table_schema.nlist_ = std::get<6>(groups[0]);
table_schema.index_file_size_ = std::get<7>(groups[0]);
table_schema.metric_type_ = std::get<8>(groups[0]);
} else {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
......@@ -350,7 +335,7 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -358,7 +343,8 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_,
&TableSchema::dimension_,
&TableSchema::created_on_),
&TableSchema::created_on_,
&TableSchema::flag_),
where(c(&TableSchema::table_id_) == table_id
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
......@@ -369,6 +355,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
table_schema.state_ = std::get<1>(tables[0]);
table_schema.dimension_ = std::get<2>(tables[0]);
table_schema.created_on_ = std::get<3>(tables[0]);
table_schema.flag_ = std::get<4>(tables[0]);
table_schema.engine_type_ = index.engine_type_;
table_schema.nlist_ = index.nlist_;
table_schema.index_file_size_ = index.index_file_size_*ONE_MB;
......@@ -394,12 +381,34 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
std::string msg = "Encounter exception when update table index: table_id = " + table_id;
return HandleException(msg, e);
}
return Status::OK();
}
Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
server::MetricCollector metric;
//set the table flag
ConnectorPtr->update_all(
set(
c(&TableSchema::flag_) = flag
),
where(
c(&TableSchema::table_id_) == table_id
));
} catch (std::exception &e) {
std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
return HandleException(msg, e);
}
return Status::OK();
}
Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_,
&TableSchema::nlist_,
......@@ -426,7 +435,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -464,7 +473,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
has_or_not = false;
try {
MetricCollector metric;
server::MetricCollector metric;
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
where(c(&TableSchema::table_id_) == table_id
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
......@@ -483,12 +492,13 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_,
&TableSchema::dimension_,
&TableSchema::created_on_,
&TableSchema::flag_,
&TableSchema::engine_type_,
&TableSchema::nlist_,
&TableSchema::index_file_size_,
......@@ -498,12 +508,13 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
TableSchema schema;
schema.id_ = std::get<0>(table);
schema.table_id_ = std::get<1>(table);
schema.created_on_ = std::get<2>(table);
schema.dimension_ = std::get<3>(table);
schema.engine_type_ = std::get<4>(table);
schema.nlist_ = std::get<5>(table);
schema.index_file_size_ = std::get<6>(table);
schema.metric_type_ = std::get<7>(table);
schema.dimension_ = std::get<2>(table);
schema.created_on_ = std::get<3>(table);
schema.flag_ = std::get<4>(table);
schema.engine_type_ = std::get<5>(table);
schema.nlist_ = std::get<6>(table);
schema.index_file_size_ = std::get<7>(table);
schema.metric_type_ = std::get<8>(table);
table_schema_array.emplace_back(schema);
}
......@@ -527,7 +538,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
try {
MetricCollector metric;
server::MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_;
......@@ -537,6 +548,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
file_schema.nlist_ = table_schema.nlist_;
file_schema.index_file_size_ = table_schema.index_file_size_;
file_schema.metric_type_ = table_schema.metric_type_;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
......@@ -558,7 +570,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
......@@ -597,8 +609,9 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
}
groups[table_file.table_id_] = table_schema;
}
table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
table_file.nlist_ = groups[table_file.table_id_].nlist_;
table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
table_file.dimension_ = groups[table_file.table_id_].dimension_;
files.push_back(table_file);
}
......@@ -616,7 +629,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
if (partition.empty()) {
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
......@@ -692,9 +705,11 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.engine_type_ = std::get<7>(file);
table_file.metric_type_ = table_schema.metric_type_;
table_file.nlist_ = table_schema.nlist_;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.metric_type_ = table_schema.metric_type_;
table_file.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
......@@ -716,7 +731,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
files.clear();
MetricCollector metric;
server::MetricCollector metric;
try {
auto select_columns = columns(&TableFileSchema::id_,
......@@ -771,8 +786,10 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
table_file.date_ = std::get<6>(file);
table_file.engine_type_ = std::get<7>(file);
table_file.dimension_ = table_schema.dimension_;
table_file.metric_type_ = table_schema.metric_type_;
table_file.nlist_ = table_schema.nlist_;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.metric_type_ = table_schema.metric_type_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
......@@ -793,7 +810,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
//check table existence
TableSchema table_schema;
......@@ -831,8 +848,10 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
table_file.date_ = std::get<6>(file);
table_file.created_on_ = std::get<7>(file);
table_file.dimension_ = table_schema.dimension_;
table_file.metric_type_ = table_schema.metric_type_;
table_file.nlist_ = table_schema.nlist_;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.metric_type_ = table_schema.metric_type_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
......@@ -881,8 +900,9 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
file_schema.row_count_ = std::get<4>(file);
file_schema.date_ = std::get<5>(file);
file_schema.engine_type_ = std::get<6>(file);
file_schema.metric_type_ = table_schema.metric_type_;
file_schema.nlist_ = table_schema.nlist_;
file_schema.index_file_size_ = table_schema.index_file_size_;
file_schema.metric_type_ = table_schema.metric_type_;
file_schema.created_on_ = std::get<7>(file);
file_schema.dimension_ = table_schema.dimension_;
......@@ -967,7 +987,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1024,7 +1044,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1050,7 +1070,7 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1072,7 +1092,7 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1121,7 +1141,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1165,7 +1185,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1195,7 +1215,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder
//don't remove table folder until all its files have been deleted
try {
MetricCollector metric;
server::MetricCollector metric;
for(auto& table_id : table_ids) {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
......@@ -1214,7 +1234,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status SqliteMetaImpl::CleanUp() {
try {
MetricCollector metric;
server::MetricCollector metric;
//multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1245,7 +1265,7 @@ Status SqliteMetaImpl::CleanUp() {
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::MetricCollector metric;
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
......
......@@ -21,82 +21,64 @@ class SqliteMetaImpl : public Meta {
public:
explicit SqliteMetaImpl(const DBMetaOptions &options_);
Status
CreateTable(TableSchema &table_schema) override;
Status CreateTable(TableSchema &table_schema) override;
Status
DescribeTable(TableSchema &group_info_) override;
Status DescribeTable(TableSchema &group_info_) override;
Status
HasTable(const std::string &table_id, bool &has_or_not) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override;
Status
AllTables(std::vector<TableSchema> &table_schema_array) override;
Status AllTables(std::vector<TableSchema> &table_schema_array) override;
Status
DeleteTable(const std::string &table_id) override;
Status DeleteTable(const std::string &table_id) override;
Status
DeleteTableFiles(const std::string &table_id) override;
Status DeleteTableFiles(const std::string &table_id) override;
Status
CreateTableFile(TableFileSchema &file_schema) override;
Status CreateTableFile(TableFileSchema &file_schema) override;
Status
DropPartitionsByDates(const std::string &table_id, const DatesT &dates) override;
Status DropPartitionsByDates(const std::string &table_id, const DatesT &dates) override;
Status
GetTableFiles(const std::string &table_id, const std::vector<size_t> &ids, TableFilesSchema &table_files) override;
Status GetTableFiles(const std::string &table_id,
const std::vector<size_t> &ids,
TableFilesSchema &table_files) override;
Status HasNonIndexFiles(const std::string &table_id, bool &has) override;
Status
HasNonIndexFiles(const std::string &table_id, bool &has) override;
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status
UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag) override;
Status
DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
Status
DropTableIndex(const std::string &table_id) override;
Status DropTableIndex(const std::string &table_id) override;
Status
UpdateTableFilesToIndex(const std::string &table_id) override;
Status UpdateTableFilesToIndex(const std::string &table_id) override;
Status
UpdateTableFile(TableFileSchema &file_schema) override;
Status UpdateTableFile(TableFileSchema &file_schema) override;
Status
UpdateTableFiles(TableFilesSchema &files) override;
Status UpdateTableFiles(TableFilesSchema &files) override;
Status
FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) override;
Status FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) override;
Status FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids,
const DatesT &partition,
DatePartionedTableFilesSchema &files) override;
Status
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override;
Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override;
Status
FilesToIndex(TableFilesSchema &) override;
Status FilesToIndex(TableFilesSchema &) override;
Status
Archive() override;
Status Archive() override;
Status
Size(uint64_t &result) override;
Status Size(uint64_t &result) override;
Status
CleanUp() override;
Status CleanUp() override;
Status
CleanUpFilesWithTTL(uint16_t seconds) override;
Status CleanUpFilesWithTTL(uint16_t seconds) override;
Status
DropAll() override;
Status DropAll() override;
Status Count(const std::string &table_id, uint64_t &result) override;
......
......@@ -16,7 +16,7 @@ namespace engine {
namespace {
static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000;
static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 1000000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
bool NeedParallelReduce(uint64_t nq, uint64_t topk) {
......@@ -59,23 +59,6 @@ void ParallelReduce(std::function<void(size_t, size_t)>& reduce_function, size_t
}
}
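The body of NeedParallelReduce is elided in this diff. A plausible sketch, consistent with the constants above but an assumption rather than the committed code, is a simple work-volume check; the 100× threshold bump then makes parallel reduce engage only for very large nq × topk results:

#include <cstddef>
#include <cstdint>

static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 1000000;  // value after this commit

// Assumed form only; the real body is not shown in the diff.
bool NeedParallelReduce(uint64_t nq, uint64_t topk) {
    return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
}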
void CollectDurationMetrics(int index_type, double total_time) {
switch(index_type) {
case meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
}
SearchTask::SearchTask()
......@@ -92,7 +75,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
server::CollectSearchTaskMetrics metrics(file_type_);
bool metric_l2 = (index_engine_->IndexMetricType() == MetricType::L2);
......@@ -137,10 +120,6 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(file_type_, total_time);
rc.ElapseFromBegin("totally cost");
return nullptr;
......
......@@ -6,6 +6,7 @@
#pragma once
#include "MetricBase.h"
#include "db/meta/MetaTypes.h"
namespace zilliz {
......@@ -29,6 +30,233 @@ class Metrics {
static MetricsBase &CreateMetricsCollector();
};
class CollectInsertMetrics {
public:
CollectInsertMetrics(size_t n, engine::Status& status) : n_(n), status_(status) {
start_time_ = METRICS_NOW_TIME;
}
~CollectInsertMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
double avg_time = total_time / n_;
for (int i = 0; i < n_; ++i) {
Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (status_.ok()) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t n_;
engine::Status& status_;
};
class CollectQueryMetrics {
public:
CollectQueryMetrics(size_t nq) : nq_(nq) {
start_time_ = METRICS_NOW_TIME;
}
~CollectQueryMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
for (int i = 0; i < nq_; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq_;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq_) / total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t nq_;
};
class CollectMergeFilesMetrics {
public:
CollectMergeFilesMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectMergeFilesMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
class CollectBuildIndexMetrics {
public:
CollectBuildIndexMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectBuildIndexMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
class CollectExecutionEngineMetrics {
public:
CollectExecutionEngineMetrics(double physical_size) : physical_size_(physical_size) {
start_time_ = METRICS_NOW_TIME;
}
~CollectExecutionEngineMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size_);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size_ / double(total_time));
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
double physical_size_;
};
class CollectSerializeMetrics {
public:
CollectSerializeMetrics(size_t size) : size_(size) {
start_time_ = METRICS_NOW_TIME;
}
~CollectSerializeMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size_ / total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t size_;
};
class CollectAddMetrics {
public:
CollectAddMetrics(size_t n, uint16_t dimension) : n_(n), dimension_(dimension) {
start_time_ = METRICS_NOW_TIME;
}
~CollectAddMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(dimension_),
total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t n_;
uint16_t dimension_;
};
class CollectDurationMetrics {
public:
CollectDurationMetrics(int index_type) : index_type_(index_type) {
start_time_ = METRICS_NOW_TIME;
}
~CollectDurationMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
switch (index_type_) {
case engine::meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case engine::meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
int index_type_;
};
class CollectSearchTaskMetrics {
public:
CollectSearchTaskMetrics(int index_type) : index_type_(index_type) {
start_time_ = METRICS_NOW_TIME;
}
~CollectSearchTaskMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
switch(index_type_) {
case engine::meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case engine::meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
int index_type_;
};
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
}
......
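Every collector added above follows the same shape: the constructor stamps `start_time_`, the destructor computes the elapsed microseconds and feeds the matching histogram or gauge, so callers only need to keep the object alive for the span being measured. A usage sketch, where `DoInsert` is a hypothetical stand-in for the measured call:

    void InsertWithMetrics(size_t n, engine::Status& status) {
        server::CollectInsertMetrics metrics(n, status);  // clock starts here
        status = DoInsert(n);                             // hypothetical measured call
    }   // destructor fires here: duration histogram plus success/fail counters

Because `CollectInsertMetrics` holds the status by reference, the destructor reads its final value, not the value at construction time.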
......@@ -81,24 +81,6 @@ CollectFileMetrics(int file_type, size_t file_size) {
}
}
void
CollectDurationMetrics(int index_type, double total_time) {
switch (index_type) {
case meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) {
index_engine_ = EngineFactory::Build(file_->dimension_,
file_->location_,
......@@ -161,7 +143,7 @@ XSearchTask::Execute() {
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
server::CollectDurationMetrics metrics(index_type_);
std::vector<long> output_ids;
std::vector<float> output_distence;
......@@ -204,10 +186,6 @@ XSearchTask::Execute() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(index_type_, total_time);
rc.ElapseFromBegin("totally cost");
// release index in resource
......
......@@ -236,7 +236,6 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::vector<std::pair<int64_t, RowRecord>> search_record_array;
{//insert vectors
std::vector<int64_t> record_ids;
for (int i = 0; i < ADD_VECTOR_LOOP; i++) {//add vectors
std::vector<RowRecord> record_array;
int64_t begin_index = i * BATCH_ROW_COUNT;
......@@ -249,6 +248,12 @@ ClientTest::Test(const std::string& address, const std::string& port) {
}
#endif
std::vector<int64_t> record_ids;
//generate user defined ids
for (int k = 0; k < BATCH_ROW_COUNT; k++) {
    record_ids.push_back(i * BATCH_ROW_COUNT + k);
}
auto start = std::chrono::high_resolution_clock::now();
Status stat = conn->Insert(TABLE_NAME, record_array, record_ids);
......
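Since the batch above supplies its own ids, the contents of `record_ids` after `Insert` are fully determined by the generator loop. An illustrative sanity check (assuming the SDK returns user-provided ids unchanged, which the server-side flag checks in this commit are meant to guarantee):

    assert(record_ids.size() == (size_t)BATCH_ROW_COUNT);
    assert(record_ids.front() == (int64_t)i * BATCH_ROW_COUNT);
    assert(record_ids.back() == (int64_t)i * BATCH_ROW_COUNT + BATCH_ROW_COUNT - 1);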
......@@ -27,19 +27,6 @@ DBWrapper::DBWrapper() {
std::string db_slave_path = db_config.GetValue(CONFIG_DB_SLAVE_PATH);
StringHelpFunctions::SplitStringByDelimeter(db_slave_path, ";", opt.meta.slave_paths);
int64_t index_size = db_config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
}
int64_t insert_buffer_size = db_config.GetInt64Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4);
if (insert_buffer_size >= 1) {
opt.insert_buffer_size = insert_buffer_size * engine::ONE_GB;
}
else {
std::cout << "ERROR: insert_buffer_size should be at least 1 GB" << std::endl;
kill(0, SIGUSR1);
}
// cache config
ConfigNode& cache_config = ServerConfig::GetInstance().GetConfig(CONFIG_CACHE);
opt.insert_cache_immediately_ = cache_config.GetBoolValue(CONFIG_INSERT_CACHE_IMMEDIATELY, false);
......@@ -74,12 +61,6 @@ DBWrapper::DBWrapper() {
}
}
std::string metric_type = engine_config.GetValue(CONFIG_METRICTYPE, "L2");
if(metric_type != "L2" && metric_type != "IP") {
std::cout << "ERROR! Illegal metric type: " << metric_type << ", available options: L2 or IP" << std::endl;
kill(0, SIGUSR1);
}
//set archive config
engine::ArchiveConf::CriteriaT criterial;
int64_t disk = db_config.GetInt64Value(CONFIG_DB_ARCHIVE_DISK, 0);
......
......@@ -79,19 +79,6 @@ ServerError ServerConfig::ValidateConfig() const {
return SERVER_INVALID_ARGUMENT;
}
uint64_t index_building_threshold = (uint64_t)db_config.GetInt32Value(CONFIG_DB_INDEX_TRIGGER_SIZE, 1024);
index_building_threshold *= MB;
size_t gpu_mem = 0;
ValidationUtil::GetGpuMemory(gpu_index, gpu_mem);
if(index_building_threshold >= gpu_mem) {
std::cout << "Error: index_building_threshold execeed gpu memory" << std::endl;
return SERVER_INVALID_ARGUMENT;
} else if(index_building_threshold >= gpu_mem/3) {
std::cout << "Warnning: index_building_threshold is greater than 1/3 of gpu memory, "
<< "some index type(such as IVFLAT) may cause cuda::bad_alloc() error" << std::endl;
}
//cache config validation
ConfigNode cache_config = GetConfig(CONFIG_CACHE);
uint64_t cache_cap = (uint64_t)cache_config.GetInt64Value(CONFIG_CPU_CACHE_CAPACITY, 16);
......
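The removed block compared `index_building_threshold` against GPU memory. Together with the ConfigTest change near the end of this commit, which still expects an oversized `insert_buffer_size` to fail validation, this suggests the capacity checks are consolidating in `ValidateConfig`. A plausible sketch of such a check, where `GetTotalMemory` is an assumed helper mirroring `GetGpuMemory`, not something confirmed by this diff:

    uint64_t insert_buffer_size = (uint64_t)db_config.GetInt64Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4);
    insert_buffer_size *= GB;
    uint64_t total_mem = 0;
    ValidationUtil::GetTotalMemory(total_mem);  // assumed helper, not in this diff
    if (insert_buffer_size >= total_mem) {
        std::cout << "Error: insert_buffer_size exceeds total memory" << std::endl;
        return SERVER_INVALID_ARGUMENT;
    }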
......@@ -17,7 +17,6 @@ namespace server {
static const char* CONFIG_SERVER = "server_config";
static const char* CONFIG_SERVER_ADDRESS = "address";
static const char* CONFIG_SERVER_PORT = "port";
static const char* CONFIG_SERVER_PROTOCOL = "transfer_protocol";
static const char* CONFIG_CLUSTER_MODE = "mode";
static const char* CONFIG_GPU_INDEX = "gpu_index";
......@@ -25,7 +24,6 @@ static const char* CONFIG_DB = "db_config";
static const char* CONFIG_DB_URL = "db_backend_url";
static const char* CONFIG_DB_PATH = "db_path";
static const char* CONFIG_DB_SLAVE_PATH = "db_slave_path";
static const char* CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const char* CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold";
static const char* CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold";
static const char* CONFIG_DB_INSERT_BUFFER_SIZE = "insert_buffer_size";
......@@ -41,9 +39,6 @@ static const char* CONFIG_INSERT_CACHE_IMMEDIATELY = "insert_cache_immediately";
static const char* CONFIG_GPU_IDS = "gpu_ids";
static const char *GPU_CACHE_FREE_PERCENT = "gpu_cache_free_percent";
static const char* CONFIG_LICENSE = "license_config";
static const char* CONFIG_LICENSE_PATH = "license_path";
static const char* CONFIG_METRIC = "metric_config";
static const char* CONFIG_METRIC_IS_STARTUP = "is_startup";
static const char* CONFIG_METRIC_COLLECTOR = "collector";
......@@ -51,13 +46,8 @@ static const char* CONFIG_PROMETHEUS = "prometheus_config";
static const char* CONFIG_METRIC_PROMETHEUS_PORT = "port";
static const std::string CONFIG_ENGINE = "engine_config";
static const std::string CONFIG_NPROBE = "nprobe";
static const std::string CONFIG_NLIST = "nlist";
static const std::string CONFIG_DCBT = "use_blas_threshold";
static const std::string CONFIG_METRICTYPE = "metric_type";
static const std::string CONFIG_OMP_THREAD_NUM = "omp_thread_num";
static const std::string CONFIG_USE_HYBRID_INDEX = "use_hybrid_index";
static const std::string CONFIG_HYBRID_INDEX_GPU = "hybrid_index_gpu";
static const char* CONFIG_RESOURCE = "resource_config";
static const char* CONFIG_RESOURCES = "resources";
......
......@@ -12,9 +12,12 @@
#include "../DBWrapper.h"
#include "version.h"
#include "GrpcMilvusServer.h"
#include "db/Utils.h"
#include "src/server/Server.h"
#include <string.h>
namespace zilliz {
namespace milvus {
namespace server {
......@@ -435,6 +438,23 @@ InsertTask::OnExecute() {
}
}
//all ids are user-provided, or all are auto-generated
uint64_t row_count = 0;
DBWrapper::DB()->GetTableRowCount(table_info.table_id_, row_count);
bool empty_table = (row_count == 0);
bool user_provide_ids = !insert_param_.row_id_array().empty();
if(!empty_table) {
//user provided ids before, so every insert into this table must supply ids
if(engine::utils::UserDefinedId(table_info.flag_) && !user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch");
}
//user didn't provide ids before, so this batch must not supply ids
if(!engine::utils::UserDefinedId(table_info.flag_) && user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch");
}
}
rc.RecordSection("check validation");
#ifdef MILVUS_ENABLE_PROFILING
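`engine::utils::UserDefinedId(table_info.flag_)` gates the two error branches above; given the `FLAG_MASK_USERID` bit set in step 5 below, it is presumably a simple mask test. A sketch of the assumed helper (its definition is not shown in this diff):

    namespace engine { namespace utils {
    inline bool UserDefinedId(int64_t flag) {
        return (flag & meta::FLAG_MASK_USERID) != 0;  // assumed bit test
    }
    }}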
......@@ -469,8 +489,10 @@ InsertTask::OnExecute() {
//step 4: insert vectors
auto vec_count = (uint64_t) insert_param_.row_record_array_size();
std::vector<int64_t> vec_ids(insert_param_.row_id_array_size(), 0);
for (auto i = 0; i < insert_param_.row_id_array_size(); i++) {
vec_ids[i] = insert_param_.row_id_array(i);
if(!insert_param_.row_id_array().empty()) {
const int64_t* src_data = insert_param_.row_id_array().data();
int64_t* target_data = vec_ids.data();
memcpy(target_data, src_data, (size_t)(sizeof(int64_t)*insert_param_.row_id_array_size()));
}
stat = DBWrapper::DB()->InsertVectors(insert_param_.table_name(), vec_count, vec_f.data(), vec_ids);
......@@ -489,6 +511,12 @@ InsertTask::OnExecute() {
return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
}
//step 5: update table flag
if(empty_table && user_provide_ids) {
stat = DBWrapper::DB()->UpdateTableFlag(insert_param_.table_name(),
table_info.flag_ | engine::meta::FLAG_MASK_USERID);
}
#ifdef MILVUS_ENABLE_PROFILING
ProfilerStop();
#endif
......
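The id-copy hunk above swaps the element-wise loop for a single `memcpy` out of the protobuf repeated field, avoiding per-element accessor calls on large batches. An equivalent, type-safe alternative (illustrative only, relying on the repeated field's standard iterators):

    if (!insert_param_.row_id_array().empty()) {
        std::copy(insert_param_.row_id_array().begin(),
                  insert_param_.row_id_array().end(),
                  vec_ids.begin());
    }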
......@@ -21,7 +21,8 @@ TEST(MetricbaseTest, METRICBASE_TEST){
instance.RawFileSizeHistogramObserve(1.0);
instance.IndexFileSizeHistogramObserve(1.0);
instance.BuildIndexDurationSecondsHistogramObserve(1.0);
instance.CacheUsageGaugeSet(1.0);
instance.CpuCacheUsageGaugeSet(1.0);
instance.GpuCacheUsageGaugeSet(1.0);
instance.MetaAccessTotalIncrement();
instance.MetaAccessDurationSecondsHistogramObserve(1.0);
instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0);
......
......@@ -119,4 +119,33 @@ TEST_F(MetricTest, Metric_Tes) {
delete [] qxb;
};
TEST_F(MetricTest, Collector_Metrics_Test){
engine::Status status = engine::Status::OK();
server::CollectInsertMetrics insert_metrics0(0, status);
status = engine::Status::Error("error");
server::CollectInsertMetrics insert_metrics1(0, status);
server::CollectQueryMetrics query_metrics(10);
server::CollectMergeFilesMetrics merge_metrics;   // no parentheses: `merge_metrics()` would declare a function, not an object
server::CollectBuildIndexMetrics build_index_metrics;
server::CollectExecutionEngineMetrics execution_metrics(10);
server::CollectSerializeMetrics serialize_metrics(10);
server::CollectAddMetrics add_metrics(10, 128);
server::CollectDurationMetrics duration_metrics_raw(engine::meta::TableFileSchema::RAW);
server::CollectDurationMetrics duration_metrics_index(engine::meta::TableFileSchema::TO_INDEX);
server::CollectDurationMetrics duration_metrics_delete(engine::meta::TableFileSchema::TO_DELETE);
server::CollectSearchTaskMetrics search_metrics_raw(engine::meta::TableFileSchema::RAW);
server::CollectSearchTaskMetrics search_metrics_index(engine::meta::TableFileSchema::TO_INDEX);
server::CollectSearchTaskMetrics search_metrics_delete(engine::meta::TableFileSchema::TO_DELETE);
server::MetricCollector metric_collector;
}
......@@ -22,7 +22,8 @@ TEST(PrometheusTest, PROMETHEUS_TEST){
instance.RawFileSizeHistogramObserve(1.0);
instance.IndexFileSizeHistogramObserve(1.0);
instance.BuildIndexDurationSecondsHistogramObserve(1.0);
instance.CacheUsageGaugeSet(1.0);
instance.CpuCacheUsageGaugeSet(1.0);
instance.GpuCacheUsageGaugeSet(1.0);
instance.MetaAccessTotalIncrement();
instance.MetaAccessDurationSecondsHistogramObserve(1.0);
instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0);
......
......@@ -18,7 +18,6 @@ metric_config:
is_startup: off # if monitoring start: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # following are prometheus configure
collect_type: pull # prometheus collect data method
port: 8080 # the port prometheus use to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address
push_gateway_port: 9091 # push method configure: push gateway port
......
......@@ -135,12 +135,6 @@ TEST(ConfigTest, SERVER_CONFIG_TEST) {
err = config.ValidateConfig();
ASSERT_NE(err, server::SERVER_SUCCESS);
size_t index_building_threshold = (gpu_mem + 1*MB)/MB;
db_config.SetValue(server::CONFIG_DB_INDEX_TRIGGER_SIZE,
std::to_string(index_building_threshold));
err = config.ValidateConfig();
ASSERT_NE(err, server::SERVER_SUCCESS);
insert_buffer_size = total_mem/GB + 2;
db_config.SetValue(server::CONFIG_DB_INSERT_BUFFER_SIZE, std::to_string(insert_buffer_size));
err = config.ValidateConfig();
......