Commit 7f20eac0 authored by: W wxyu

Merge branch 'branch-0.4.0' of 192.168.1.105:wxyu/milvus into branch-0.4.0


Former-commit-id: 369dd0e5daba8dd3c61c6c8c754a01169026dade
timeout(time: 10, unit: 'MINUTES') {
timeout(time: 25, unit: 'MINUTES') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
......
......@@ -40,12 +40,16 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-394 - Update scheduler unittest
- MS-400 - Add timestamp record in task state change function
- MS-402 - Add dump implementation for TaskTableItem
- MS-403 - Add GpuCacheMgr
- MS-404 - Release index after search task is done to avoid continuous memory growth
- MS-405 - Add delete task support
- MS-408 - Add device_id in resource construct function
## New Feature
- MS-343 - Implement ResourceMgr
- MS-338 - NewAPI: refine code to support CreateIndex
- MS-339 - NewAPI: refine code to support DropIndex
- MS-340 - NewAPI: implement DescribeIndex
## Task
- MS-297 - disable mysql unit test
......@@ -70,6 +74,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-257 - Update bzip2 download url
- MS-288 - Update compile scripts
- MS-330 - Stability test failed caused by server core dumped
- MS-347 - Build index hangs again
- MS-382 - Fix MySQLMetaImpl::CleanUpFilesWithTTL unknown column bug
## Improvement
- MS-156 - Add unittest for merge result functions
......@@ -98,6 +104,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-324 - Show error when there is not enough gpu memory to build index
- MS-328 - Check metric type on server start
- MS-332 - Set grpc and thrift server run concurrently
- MS-352 - Add hybrid index
## New Feature
- MS-180 - Add new mem manager
......@@ -157,8 +164,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-130 - Add prometheus_test
- MS-144 - Add nprobe config
- MS-147 - Enable IVF
- MS-130 - Add prometheus_test
## Task
- MS-74 - Change README.md in cpp
- MS-88 - Add support for arm architecture
......
......@@ -86,7 +86,7 @@ if [[ ! -d cmake_build ]]; then
fi
cd cmake_build
git
CUDA_COMPILER=/usr/local/cuda/bin/nvcc
if [[ ${MAKE_CLEAN} == "ON" ]]; then
......
# Define a function that check last file modification
function(Check_Last_Modify cache_ignore_file_path working_dir last_modified_commit_id)
function(Check_Last_Modify cache_check_lists_file_path working_dir last_modified_commit_id)
if(EXISTS "${working_dir}")
if(EXISTS "${cache_ignore_file_path}")
if(EXISTS "${cache_check_lists_file_path}")
set(GIT_LOG_SKIP_NUM 0)
set(_MATCH_ALL ON CACHE BOOL "Match all")
set(_LOOP_STATUS ON CACHE BOOL "Whether out of loop")
file(STRINGS ${cache_ignore_file_path} CACHE_IGNORE_TXT)
file(STRINGS ${cache_check_lists_file_path} CACHE_IGNORE_TXT)
while(_LOOP_STATUS)
foreach(_IGNORE_ENTRY ${CACHE_IGNORE_TXT})
if(NOT _IGNORE_ENTRY MATCHES "^[^#]+")
......
......@@ -157,7 +157,6 @@ if (UNIX)
endif (APPLE)
endif (UNIX)
# ----------------------------------------------------------------------
# thirdparty directory
set(THIRDPARTY_DIR "${MILVUS_SOURCE_DIR}/thirdparty")
......@@ -167,7 +166,7 @@ set(THIRDPARTY_DIR "${MILVUS_SOURCE_DIR}/thirdparty")
if(NOT DEFINED USE_JFROG_CACHE)
set(USE_JFROG_CACHE "OFF")
endif()
if(USE_JFROG_CACHE STREQUAL "ON")
if(USE_JFROG_CACHE STREQUAL "ON")
set(JFROG_ARTFACTORY_CACHE_URL "http://192.168.1.201:80/artifactory/generic-local/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${MILVUS_BUILD_ARCH}/${BUILD_TYPE}")
set(JFROG_USER_NAME "test")
set(JFROG_PASSWORD "Fantast1c")
......@@ -308,9 +307,11 @@ set(EASYLOGGINGPP_MD5 "b78cd319db4be9b639927657b8aa7732")
if(DEFINED ENV{MILVUS_FAISS_URL})
set(FAISS_SOURCE_URL "$ENV{MILVUS_FAISS_URL}")
else()
set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz")
set(FAISS_SOURCE_URL "http://192.168.1.105:6060/jinhai/faiss/-/archive/${FAISS_VERSION}/faiss-${FAISS_VERSION}.tar.gz")
# set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz")
endif()
set(FAISS_MD5 "0bc12737b23def156f6a1eb782050135")
set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
if(DEFINED ENV{MILVUS_KNOWHERE_URL})
set(KNOWHERE_SOURCE_URL "$ENV{MILVUS_KNOWHERE_URL}")
......@@ -462,6 +463,7 @@ else()
endif()
set(GRPC_MD5 "7ec59ad54c85a12dcbbfede09bf413a9")
# ----------------------------------------------------------------------
# ARROW
......@@ -686,7 +688,7 @@ macro(build_bzip2)
set(BZIP2_STATIC_LIB
"${BZIP2_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}bz2${CMAKE_STATIC_LIBRARY_SUFFIX}")
if(USE_JFROG_CACHE STREQUAL "ON")
if(USE_JFROG_CACHE STREQUAL "ON")
set(BZIP2_CACHE_PACKAGE_NAME "bzip2_${BZIP2_MD5}.tar.gz")
set(BZIP2_CACHE_URL "${JFROG_ARTFACTORY_CACHE_URL}/${BZIP2_CACHE_PACKAGE_NAME}")
set(BZIP2_CACHE_PACKAGE_PATH "${THIRDPARTY_PACKAGE_CACHE}/${BZIP2_CACHE_PACKAGE_NAME}")
......@@ -1184,7 +1186,7 @@ macro(build_faiss)
INTERFACE_INCLUDE_DIRECTORIES "${FAISS_INCLUDE_DIR}"
INTERFACE_LINK_LIBRARIES "openblas;lapack" )
endif()
add_dependencies(faiss faiss_ep)
if(${BUILD_FAISS_WITH_MKL} STREQUAL "OFF")
......@@ -1321,7 +1323,7 @@ if (MILVUS_BUILD_TESTS)
if(NOT GTEST_VENDORED)
endif()
get_target_property(GTEST_INCLUDE_DIR gtest INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM "${GTEST_PREFIX}/lib")
include_directories(SYSTEM ${GTEST_INCLUDE_DIR})
......@@ -1828,7 +1830,7 @@ endmacro()
if(MILVUS_WITH_SNAPPY)
resolve_dependency(Snappy)
get_target_property(SNAPPY_INCLUDE_DIRS snappy INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM ${SNAPPY_PREFIX}/lib/)
include_directories(SYSTEM ${SNAPPY_INCLUDE_DIRS})
......@@ -2131,7 +2133,7 @@ endmacro()
if(MILVUS_WITH_YAMLCPP)
resolve_dependency(yaml-cpp)
get_target_property(YAMLCPP_INCLUDE_DIR yaml-cpp INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM ${YAMLCPP_PREFIX}/lib/)
include_directories(SYSTEM ${YAMLCPP_INCLUDE_DIR})
......@@ -2203,7 +2205,7 @@ endmacro()
if(MILVUS_WITH_ZLIB)
resolve_dependency(ZLIB)
get_target_property(ZLIB_INCLUDE_DIR zlib INTERFACE_INCLUDE_DIRECTORIES)
include_directories(SYSTEM ${ZLIB_INCLUDE_DIR})
endif()
......@@ -2301,7 +2303,7 @@ endmacro()
if(MILVUS_WITH_ZSTD)
resolve_dependency(ZSTD)
get_target_property(ZSTD_INCLUDE_DIR zstd INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM ${ZSTD_PREFIX}/lib)
include_directories(SYSTEM ${ZSTD_INCLUDE_DIR})
......@@ -2406,7 +2408,7 @@ endmacro()
if(MILVUS_WITH_AWS)
resolve_dependency(AWS)
link_directories(SYSTEM ${AWS_PREFIX}/lib)
get_target_property(AWS_CPP_SDK_S3_INCLUDE_DIR aws-cpp-sdk-s3 INTERFACE_INCLUDE_DIRECTORIES)
......
......@@ -36,8 +36,11 @@ license_config: # license configure
cache_config: # cache configure
cpu_cache_capacity: 16 # how much memory is used as cache, unit: GB, range: 0 ~ less than total memory
cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specifies how much memory should be kept, range: greater than zero ~ 1.0
cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specifies how much memory should be kept, range: greater than zero ~ 1.0
insert_cache_immediately: false # inserted data will be loaded into cache immediately for hot queries
gpu_cache_capacity: 5 # how much memory is used as cache on gpu, unit: GB, range: 0 ~ less than total memory
gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specifies how much memory should be kept, range: greater than zero ~ 1.0
gpu_ids: 0,1 # gpu ids used for cache
engine_config:
nprobe: 10
......
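For reference, the capacity values above are interpreted in GB inside the cache managers (multiplied by 1 GiB, as in the GpuCacheMgr constructor further down), and the free-percent value is the fraction of the cache kept when eviction kicks in. A quick arithmetic sketch under that interpretation; nothing below is code from this commit:
#include <cstdint>
#include <iostream>
int main() {
    constexpr int64_t unit = 1024LL * 1024 * 1024;   // 1 GiB, same constant GpuCacheMgr uses
    int64_t gpu_cache_capacity = 5;                  // from cache_config above, in GB
    double gpu_cache_free_percent = 0.85;
    int64_t capacity_bytes = gpu_cache_capacity * unit;
    std::cout << "gpu cache capacity (bytes): " << capacity_bytes << std::endl;   // 5368709120
    std::cout << "bytes kept when eviction triggers: "
              << static_cast<int64_t>(capacity_bytes * gpu_cache_free_percent)    // 4563402752
              << std::endl;
    return 0;
}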
......@@ -46,7 +46,7 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) {
return cache_->get(key);
}
engine::Index_ptr CacheMgr::GetIndex(const std::string& key) {
engine::VecIndexPtr CacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
if(obj != nullptr) {
return obj->data();
......@@ -65,7 +65,7 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index) {
void CacheMgr::InsertItem(const std::string& key, const engine::VecIndexPtr& index) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......
......@@ -19,10 +19,10 @@ public:
virtual bool ItemExists(const std::string& key);
virtual DataObjPtr GetItem(const std::string& key);
virtual engine::Index_ptr GetIndex(const std::string& key);
virtual engine::VecIndexPtr GetIndex(const std::string& key);
virtual void InsertItem(const std::string& key, const DataObjPtr& data);
virtual void InsertItem(const std::string& key, const engine::Index_ptr& index);
virtual void InsertItem(const std::string& key, const engine::VecIndexPtr& index);
virtual void EraseItem(const std::string& key);
......
......@@ -16,6 +16,7 @@ private:
CpuCacheMgr();
public:
//TODO: use smart pointer instead
static CacheMgr* GetInstance() {
static CpuCacheMgr s_mgr;
return &s_mgr;
......
......@@ -6,7 +6,7 @@
#pragma once
#include "wrapper/Index.h"
#include "wrapper/knowhere/vec_index.h"
#include <memory>
......@@ -16,17 +16,17 @@ namespace cache {
class DataObj {
public:
DataObj(const engine::Index_ptr& index)
DataObj(const engine::VecIndexPtr& index)
: index_(index)
{}
DataObj(const engine::Index_ptr& index, int64_t size)
DataObj(const engine::VecIndexPtr& index, int64_t size)
: index_(index),
size_(size)
{}
engine::Index_ptr data() { return index_; }
const engine::Index_ptr& data() const { return index_; }
engine::VecIndexPtr data() { return index_; }
const engine::VecIndexPtr& data() const { return index_; }
int64_t size() const {
if(index_ == nullptr) {
......@@ -41,7 +41,7 @@ public:
}
private:
engine::Index_ptr index_ = nullptr;
engine::VecIndexPtr index_ = nullptr;
int64_t size_ = 0;
};
......
......@@ -4,6 +4,8 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <sstream>
#include "utils/Log.h"
#include "GpuCacheMgr.h"
#include "server/ServerConfig.h"
......@@ -11,19 +13,62 @@ namespace zilliz {
namespace milvus {
namespace cache {
std::mutex GpuCacheMgr::mutex_;
std::unordered_map<uint64_t, GpuCacheMgrPtr> GpuCacheMgr::instance_;
namespace {
constexpr int64_t unit = 1024 * 1024 * 1024;
std::vector<uint64_t> load() {
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
std::string gpu_ids_str = config.GetValue(server::CONFIG_GPU_IDS, "0,1");
std::vector<uint64_t > gpu_ids;
std::stringstream ss(gpu_ids_str);
for (int i; ss >> i;) {
gpu_ids.push_back(i);
if (ss.peek() == ',') {
ss.ignore();
}
}
return gpu_ids;
}
}
bool GpuCacheMgr::GpuIdInConfig(uint64_t gpu_id) {
static std::vector<uint64_t > ids = load();
for (auto id : ids) {
if (gpu_id == id) return true;
}
return false;
}
GpuCacheMgr::GpuCacheMgr() {
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 1);
int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 2);
cap *= unit;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
double free_percent = config.GetDoubleValue(server::GPU_CACHE_FREE_PERCENT, 0.85);
if (free_percent > 0.0 && free_percent <= 1.0) {
cache_->set_freemem_percent(free_percent);
} else {
SERVER_LOG_ERROR << "Invalid gpu_cache_free_percent: " << free_percent <<
", setting it to the default " << cache_->freemem_percent();
}
}
void GpuCacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
//TODO: copy data to gpu
if (cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->insert(key, data);
}
}
......
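The load() helper added above turns the comma-separated gpu_ids string from cache_config into a list of ids. The same stringstream technique in a standalone, runnable form (ParseGpuIds and the main() driver are illustrative names, not part of this commit):
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
// Read one integer at a time and skip a trailing comma if present,
// exactly as GpuCacheMgr's load() helper does.
std::vector<uint64_t> ParseGpuIds(const std::string &gpu_ids_str) {
    std::vector<uint64_t> gpu_ids;
    std::stringstream ss(gpu_ids_str);
    for (int i; ss >> i;) {
        gpu_ids.push_back(i);
        if (ss.peek() == ',') {
            ss.ignore();
        }
    }
    return gpu_ids;
}
int main() {
    for (auto id : ParseGpuIds("0,1")) {
        std::cout << id << std::endl;   // prints 0 then 1
    }
    return 0;
}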
......@@ -5,22 +5,41 @@
////////////////////////////////////////////////////////////////////////////////
#include "CacheMgr.h"
#include <unordered_map>
#include <memory>
namespace zilliz {
namespace milvus {
namespace cache {
class GpuCacheMgr;
using GpuCacheMgrPtr = std::shared_ptr<GpuCacheMgr>;
class GpuCacheMgr : public CacheMgr {
private:
public:
GpuCacheMgr();
public:
static CacheMgr* GetInstance() {
static GpuCacheMgr s_mgr;
return &s_mgr;
static bool GpuIdInConfig(uint64_t gpu_id);
static CacheMgr* GetInstance(uint64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
std::lock_guard<std::mutex> lock(mutex_);
if (instance_.find(gpu_id) == instance_.end()) {
if (GpuIdInConfig(gpu_id)) {
instance_.insert(std::pair<uint64_t, GpuCacheMgrPtr>(gpu_id, std::make_shared<GpuCacheMgr>()));
} else {
return nullptr;
}
}
}
return instance_[gpu_id].get();
}
void InsertItem(const std::string& key, const DataObjPtr& data) override;
private:
static std::mutex mutex_;
static std::unordered_map<uint64_t, GpuCacheMgrPtr> instance_;
};
}
......
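Note that GetInstance() now takes a gpu_id and returns nullptr when that id is not listed in gpu_ids, so callers have to check the pointer before inserting. A hypothetical call site (CacheOnGpu and the include path are illustrative):
#include <cstdint>
#include <string>
#include "cache/GpuCacheMgr.h"   // include path assumed
using zilliz::milvus::cache::DataObjPtr;
using zilliz::milvus::cache::GpuCacheMgr;
void CacheOnGpu(uint64_t gpu_id, const std::string &key, const DataObjPtr &data) {
    auto *mgr = GpuCacheMgr::GetInstance(gpu_id);
    if (mgr == nullptr) {
        // this gpu_id is not configured in cache_config.gpu_ids, so there is no cache to fill
        return;
    }
    mgr->InsertItem(key, data);   // stores the object; the actual copy-to-GPU is still a TODO above
}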
......@@ -73,19 +73,19 @@ YamlConfigMgr::SetChildConfig(const YAML::Node& node,
return false;
}
bool
YamlConfigMgr::SetSequence(const YAML::Node &node,
const std::string &child_name,
ConfigNode &config) {
if(node[child_name].IsDefined ()) {
size_t cnt = node[child_name].size();
for(size_t i = 0; i < cnt; i++){
config.AddSequenceItem(child_name, node[child_name][i].as<std::string>());
}
return true;
}
return false;
}
//bool
//YamlConfigMgr::SetSequence(const YAML::Node &node,
// const std::string &child_name,
// ConfigNode &config) {
// if(node[child_name].IsDefined ()) {
// size_t cnt = node[child_name].size();
// for(size_t i = 0; i < cnt; i++){
// config.AddSequenceItem(child_name, node[child_name][i].as<std::string>());
// }
// return true;
// }
// return false;
//}
void
YamlConfigMgr::LoadConfigNode(const YAML::Node& node, ConfigNode& config) {
......@@ -98,8 +98,8 @@ YamlConfigMgr::LoadConfigNode(const YAML::Node& node, ConfigNode& config) {
SetConfigValue(node, key, config);
} else if(node[key].IsMap()){
SetChildConfig(node, key, config);
} else if(node[key].IsSequence()){
SetSequence(node, key, config);
// } else if(node[key].IsSequence()){
// SetSequence(node, key, config);
}
}
}
......
......@@ -33,10 +33,10 @@ class YamlConfigMgr : public IConfigMgr {
const std::string &name,
ConfigNode &config);
bool
SetSequence(const YAML::Node &node,
const std::string &child_name,
ConfigNode &config);
// bool
// SetSequence(const YAML::Node &node,
// const std::string &child_name,
// ConfigNode &config);
void LoadConfigNode(const YAML::Node& node, ConfigNode& config);
......
......@@ -5,19 +5,25 @@
******************************************************************************/
#pragma once
#include <stdint.h>
namespace zilliz {
namespace milvus {
namespace engine {
constexpr size_t K = 1024UL;
constexpr size_t M = K * K;
constexpr size_t G = K * M;
constexpr size_t T = K * G;
constexpr uint64_t K = 1024UL;
constexpr uint64_t M = K * K;
constexpr uint64_t G = K * M;
constexpr uint64_t T = K * G;
constexpr size_t MAX_TABLE_FILE_MEM = 128 * M;
constexpr uint64_t MAX_TABLE_FILE_MEM = 128 * M;
constexpr int VECTOR_TYPE_SIZE = sizeof(float);
static constexpr uint64_t ONE_KB = K;
static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB;
static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -46,6 +46,9 @@ public:
virtual Status Size(uint64_t& result) = 0;
virtual Status BuildIndex(const std::string& table_id) = 0;
virtual Status CreateIndex(const std::string& table_id, const TableIndex& index) = 0;
virtual Status DescribeIndex(const std::string& table_id, TableIndex& index) = 0;
virtual Status DropIndex(const std::string& table_id) = 0;
virtual Status DropAll() = 0;
......
......@@ -6,6 +6,7 @@
#include "DBImpl.h"
#include "src/db/meta/SqliteMetaImpl.h"
#include "Log.h"
#include "Utils.h"
#include "engine/EngineFactory.h"
#include "Factories.h"
#include "metrics/Metrics.h"
......@@ -59,25 +60,6 @@ void CollectQueryMetrics(double total_time, size_t nq) {
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
switch(file_type) {
case meta::TableFileSchema::RAW:
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
break;
}
}
}
}
......@@ -104,13 +86,18 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date
//dates can be used to partially delete table files, but this is not supported yet
ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
if (dates.empty()) {
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
scheduler.Schedule(context);
} else {
meta_ptr_->DropPartitionsByDates(table_id, dates);
}
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
scheduler.Schedule(context);
return Status::OK();
}
......@@ -143,7 +130,7 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
for(auto &day_files : files) {
for (auto &file : day_files.second) {
ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, (MetricType)file.metric_type_, file.nlist_);
if(engine == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status::Error("Invalid engine type");
......@@ -204,7 +191,7 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint6
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
ENGINE_LOG_DEBUG << "Query by vectors";
ENGINE_LOG_DEBUG << "Query by vectors " << table_id;
//get all table files from table
meta::DatePartionedTableFilesSchema files;
......@@ -355,7 +342,7 @@ void DBImpl::StartMetricTask() {
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
uint64_t size;
Size(size);
server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
......@@ -424,7 +411,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
//step 2: merge files
ExecutionEnginePtr index =
EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
(MetricType)table_file.metric_type_, table_file.nlist_);
meta::TableFilesSchema updated;
long index_size = 0;
......@@ -465,12 +453,9 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
}
//step 4: update table files state
if (index_size >= options_.index_trigger_size) {
table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
} else {
table_file.file_type_ = meta::TableFileSchema::RAW;
}
table_file.size_ = index_size;
table_file.file_type_ = meta::TableFileSchema::RAW;
table_file.file_size_ = index->PhysicalSize();
table_file.row_count_ = index->Count();
updated.push_back(table_file);
status = meta_ptr_->UpdateTableFiles(updated);
ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
......@@ -566,7 +551,7 @@ Status DBImpl::BuildIndex(const std::string& table_id) {
int times = 1;
while (has) {
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
ENGINE_LOG_DEBUG << "Non index files detected in " << table_id << "! Will build index " << times;
meta_ptr_->UpdateTableFilesToIndex(table_id);
/* StartBuildIndexTask(true); */
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
......@@ -574,11 +559,64 @@ Status DBImpl::BuildIndex(const std::string& table_id) {
times++;
}
return Status::OK();
/* return BuildIndexByTable(table_id); */
}
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
{
std::unique_lock<std::mutex> lock(build_index_mutex_);
//step 1: check index difference
TableIndex old_index;
auto status = DescribeIndex(table_id, old_index);
if(!status.ok()) {
ENGINE_LOG_ERROR << "Failed to get table index info";
return status;
}
if(utils::IsSameIndex(old_index, index)) {
ENGINE_LOG_DEBUG << "Same index setting, no need to create index again";
return Status::OK();
}
//step 2: drop old index files
DropIndex(table_id);
//step 3: update index info
status = meta_ptr_->UpdateTableIndexParam(table_id, index);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to update table index info";
return status;
}
}
bool has = false;
auto status = meta_ptr_->HasNonIndexFiles(table_id, has);
int times = 1;
while (has) {
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
status = meta_ptr_->UpdateTableFilesToIndex(table_id);
/* StartBuildIndexTask(true); */
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
status = meta_ptr_->HasNonIndexFiles(table_id, has);
times++;
}
return Status::OK();
}
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
return meta_ptr_->DescribeTableIndex(table_id, index);
}
Status DBImpl::DropIndex(const std::string& table_id) {
return meta_ptr_->DropTableIndex(table_id);
}
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
ExecutionEnginePtr to_index =
EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
(MetricType)file.metric_type_, file.nlist_);
if(to_index == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status::Error("Invalid engine type");
......@@ -650,26 +688,27 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
//step 6: update meta
table_file.file_type_ = meta::TableFileSchema::INDEX;
table_file.size_ = index->Size();
table_file.file_size_ = index->PhysicalSize();
table_file.row_count_ = index->Count();
auto to_remove = file;
to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
auto origin_file = file;
origin_file.file_type_ = meta::TableFileSchema::BACKUP;
meta::TableFilesSchema update_files = {table_file, to_remove};
meta::TableFilesSchema update_files = {table_file, origin_file};
status = meta_ptr_->UpdateTableFiles(update_files);
if(status.ok()) {
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
<< index->PhysicalSize() << " bytes"
<< " from file " << to_remove.file_id_;
<< " from file " << origin_file.file_id_;
if(options_.insert_cache_immediately_) {
index->Cache();
}
} else {
//failed to update meta, mark the new file as to_delete, don't delete old file
to_remove.file_type_ = meta::TableFileSchema::TO_INDEX;
status = meta_ptr_->UpdateTableFile(to_remove);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << to_remove.file_id_ << " to to_index";
origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
status = meta_ptr_->UpdateTableFile(origin_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file);
......@@ -685,30 +724,6 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
return Status::OK();
}
Status DBImpl::BuildIndexByTable(const std::string& table_id) {
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
status = BuildIndex(file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
return status;
}
ENGINE_LOG_DEBUG << "Sync building index for " << file.id_ << " passed";
if (shutting_down_.load(std::memory_order_acquire)){
ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action for table " << table_id;
break;
}
}
return status;
}
void DBImpl::BackgroundBuildIndex() {
ENGINE_LOG_TRACE << " Background build index thread start";
......
......@@ -93,6 +93,12 @@ class DBImpl : public DB {
Status BuildIndex(const std::string& table_id) override;
Status CreateIndex(const std::string& table_id, const TableIndex& index) override;
Status DescribeIndex(const std::string& table_id, TableIndex& index) override;
Status DropIndex(const std::string& table_id) override;
~DBImpl() override;
private:
......@@ -122,8 +128,6 @@ class DBImpl : public DB {
void StartBuildIndexTask(bool force=false);
void BackgroundBuildIndex();
Status
BuildIndexByTable(const std::string& table_id);
Status
BuildIndex(const meta::TableFileSchema &);
......
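With CreateIndex/DescribeIndex/DropIndex now on the DB interface, switching a table's index parameters looks roughly like the sketch below. This is a hedged example only: the header paths are assumed, FAISS_IVFFLAT is taken from the EngineType enum used elsewhere in this diff, and CreateIndex returns early when the stored parameters already match.
#include <string>
#include "db/DB.h"      // path assumed
#include "db/Types.h"   // TableIndex with the defaults shown later in this diff
using namespace zilliz::milvus::engine;
Status SwitchToIvfIndex(DB &db, const std::string &table_id) {
    TableIndex index;                                   // defaults: FAISS_IDMAP, nlist 16384, L2
    index.engine_type_ = (int)EngineType::FAISS_IVFFLAT;
    index.nlist_ = 16384;
    index.metric_type_ = (int)MetricType::L2;
    auto status = db.CreateIndex(table_id, index);      // drops old index files, updates meta,
    if (!status.ok()) {                                 // then waits until no non-index files remain
        return status;
    }
    TableIndex current;
    return db.DescribeIndex(table_id, current);         // read back what meta stores now
}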
......@@ -90,11 +90,11 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
}
}
std::shared_ptr<DB> DBFactory::Build() {
auto options = OptionsFactory::Build();
auto db = DBFactory::Build(options);
return std::shared_ptr<DB>(db);
}
//std::shared_ptr<DB> DBFactory::Build() {
// auto options = OptionsFactory::Build();
// auto db = DBFactory::Build(options);
// return std::shared_ptr<DB>(db);
//}
DB* DBFactory::Build(const Options& options) {
return new DBImpl(options);
......
......@@ -33,7 +33,7 @@ struct DBMetaImplFactory {
};
struct DBFactory {
static std::shared_ptr<DB> Build();
//static std::shared_ptr<DB> Build();
static DB *Build(const Options &);
};
......
......@@ -5,6 +5,8 @@
******************************************************************************/
#pragma once
#include "Constants.h"
#include <string>
#include <memory>
#include <map>
......@@ -16,10 +18,6 @@ namespace engine {
class Env;
static constexpr uint64_t ONE_KB = 1024;
static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB;
static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
static const char* ARCHIVE_CONF_DISK = "disk";
static const char* ARCHIVE_CONF_DAYS = "days";
......
......@@ -5,7 +5,10 @@
******************************************************************************/
#pragma once
#include "db/engine/ExecutionEngine.h"
#include <vector>
#include <stdint.h>
namespace zilliz {
namespace milvus {
......@@ -18,6 +21,12 @@ typedef std::vector<IDNumber> IDNumbers;
typedef std::vector<std::pair<IDNumber, double>> QueryResult;
typedef std::vector<QueryResult> QueryResults;
struct TableIndex {
int32_t engine_type_ = (int)EngineType::FAISS_IDMAP;
int32_t nlist_ = 16384;
int32_t index_file_size_ = 1024; //MB
int32_t metric_type_ = (int)MetricType::L2;
};
} // namespace engine
} // namespace milvus
......
......@@ -85,16 +85,20 @@ Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id
return Status::OK();
}
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id) {
std::string db_path = options.path;
std::string table_path = db_path + TABLES_FOLDER + table_id;
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
for(auto& path : options.slave_paths) {
table_path = path + TABLES_FOLDER + table_id;
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id, bool force) {
std::vector<std::string> paths = options.slave_paths;
paths.push_back(options.path);
for(auto& path : paths) {
std::string table_path = path + TABLES_FOLDER + table_id;
if(force) {
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
} else if(boost::filesystem::exists(table_path) &&
boost::filesystem::is_empty(table_path)) {
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
}
}
return Status::OK();
......@@ -142,6 +146,13 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
return Status::OK();
}
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
return index1.engine_type_ == index2.engine_type_
&& index1.nlist_ == index2.nlist_
&& index1.index_file_size_ == index2.index_file_size_
&& index1.metric_type_ == index2.metric_type_;
}
} // namespace utils
} // namespace engine
} // namespace milvus
......
......@@ -7,6 +7,7 @@
#include "Options.h"
#include "db/meta/MetaTypes.h"
#include "db/Types.h"
#include <string>
......@@ -18,12 +19,14 @@ namespace utils {
long GetMicroSecTimeStamp();
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id);
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id);
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id, bool force = true);
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2);
} // namespace utils
} // namespace engine
} // namespace milvus
......
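The new force parameter defaults to true, so existing call sites keep their unconditional removal, while a soft delete can pass false to remove only folders that are already empty. Hypothetical call sites (function name and include paths are illustrative):
#include <string>
#include "db/Options.h"   // path assumed
#include "db/Utils.h"     // path assumed
using namespace zilliz::milvus::engine;
void CleanupTableFolders(const DBMetaOptions &options, const std::string &table_id) {
    utils::DeleteTablePath(options, table_id, false);   // keep folders that still contain files
    utils::DeleteTablePath(options, table_id);          // force = true by default: remove unconditionally
}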
......@@ -4,7 +4,6 @@
* Proprietary and confidential.
******************************************************************************/
#include "EngineFactory.h"
//#include "FaissExecutionEngine.h"
#include "ExecutionEngineImpl.h"
#include "db/Log.h"
......@@ -12,61 +11,25 @@ namespace zilliz {
namespace milvus {
namespace engine {
#if 0
ExecutionEnginePtr
EngineFactory::Build(uint16_t dimension,
const std::string &location,
EngineType type) {
EngineType index_type,
MetricType metric_type,
int32_t nlist) {
ExecutionEnginePtr execution_engine_ptr;
switch (type) {
case EngineType::FAISS_IDMAP: {
execution_engine_ptr =
ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, BUILD_INDEX_TYPE_IDMAP, "IDMap,Flat"));
break;
}
case EngineType::FAISS_IVFFLAT_GPU: {
execution_engine_ptr =
ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, BUILD_INDEX_TYPE_IVF, "IDMap,Flat"));
break;
}
case EngineType::FAISS_IVFSQ8: {
execution_engine_ptr =
ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, BUILD_INDEX_TYPE_IVFSQ8, "IDMap,Flat"));
break;
}
default: {
ENGINE_LOG_ERROR << "Unsupported engine type";
return nullptr;
}
}
execution_engine_ptr->Init();
return execution_engine_ptr;
}
#else
ExecutionEnginePtr
EngineFactory::Build(uint16_t dimension,
const std::string &location,
EngineType type) {
if(type == EngineType::INVALID) {
if(index_type == EngineType::INVALID) {
ENGINE_LOG_ERROR << "Unsupported engine type";
return nullptr;
}
ENGINE_LOG_DEBUG << "EngineFactory EngineTypee: " << int(type);
ENGINE_LOG_DEBUG << "EngineFactory EngineType: " << (int)index_type;
ExecutionEnginePtr execution_engine_ptr =
std::make_shared<ExecutionEngineImpl>(dimension, location, type);
std::make_shared<ExecutionEngineImpl>(dimension, location, index_type, metric_type, nlist);
execution_engine_ptr->Init();
return execution_engine_ptr;
}
#endif
}
}
......
......@@ -16,7 +16,9 @@ class EngineFactory {
public:
static ExecutionEnginePtr Build(uint16_t dimension,
const std::string& location,
EngineType type);
EngineType index_type,
MetricType metric_type,
int32_t nlist);
};
}
......
......@@ -23,6 +23,11 @@ enum class EngineType {
MAX_VALUE = NSG_MIX,
};
enum class MetricType {
L2 = 1,
IP = 2,
};
class ExecutionEngine {
public:
......@@ -59,7 +64,13 @@ public:
virtual Status Cache() = 0;
virtual Status GpuCache(uint64_t gpu_id) = 0;
virtual Status Init() = 0;
virtual EngineType IndexEngineType() const = 0;
virtual MetricType IndexMetricType() const = 0;
};
using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>;
......
......@@ -4,8 +4,8 @@
* Proprietary and confidential.
******************************************************************************/
#include <stdexcept>
#include "src/cache/GpuCacheMgr.h"
#include "src/server/ServerConfig.h"
#include "src/metrics/Metrics.h"
#include "db/Log.h"
#include "utils/CommonUtil.h"
......@@ -22,26 +22,23 @@ namespace zilliz {
namespace milvus {
namespace engine {
namespace {
std::string GetMetricType() {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode engine_config = config.GetConfig(server::CONFIG_ENGINE);
return engine_config.GetValue(server::CONFIG_METRICTYPE, "L2");
}
}
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension,
const std::string &location,
EngineType type)
: location_(location), dim(dimension), build_type(type) {
current_type = EngineType::FAISS_IDMAP;
EngineType index_type,
MetricType metric_type,
int32_t nlist)
: location_(location),
dim_(dimension),
index_type_(index_type),
metric_type_(metric_type),
nlist_(nlist) {
index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
if (!index_) throw Exception("Create Empty VecIndex");
Config build_cfg;
build_cfg["dim"] = dimension;
build_cfg["metric_type"] = GetMetricType();
build_cfg["metric_type"] = (metric_type_ == MetricType::IP) ? "IP" : "L2";
AutoGenParams(index_->GetType(), 0, build_cfg);
auto ec = std::static_pointer_cast<BFIndex>(index_)->Build(build_cfg);
if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); }
......@@ -49,9 +46,14 @@ ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension,
ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index,
const std::string &location,
EngineType type)
: index_(std::move(index)), location_(location), build_type(type) {
current_type = type;
EngineType index_type,
MetricType metric_type,
int32_t nlist)
: index_(std::move(index)),
location_(location),
index_type_(index_type),
metric_type_(metric_type),
nlist_(nlist) {
}
VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
......@@ -144,28 +146,60 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
}
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
try {
index_ = index_->CopyToGpu(device_id);
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status::Error(e.what());
} catch (std::exception &e) {
return Status::Error(e.what());
index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToGpu(device_id);
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status::Error(e.what());
} catch (std::exception &e) {
return Status::Error(e.what());
}
}
if (!already_in_cache) {
GpuCache(device_id);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
}
return Status::OK();
}
Status ExecutionEngineImpl::CopyToCpu() {
try {
index_ = index_->CopyToCpu();
ENGINE_LOG_DEBUG << "GPU to CPU";
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status::Error(e.what());
} catch (std::exception &e) {
return Status::Error(e.what());
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToCpu();
ENGINE_LOG_DEBUG << "GPU to CPU";
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status::Error(e.what());
} catch (std::exception &e) {
return Status::Error(e.what());
}
}
if(!already_in_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
}
return Status::OK();
}
......@@ -204,15 +238,15 @@ ExecutionEngineImpl::BuildIndex(const std::string &location) {
ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;
auto from_index = std::dynamic_pointer_cast<BFIndex>(index_);
auto to_index = CreatetVecIndex(build_type);
auto to_index = CreatetVecIndex(index_type_);
if (!to_index) {
throw Exception("Create Empty VecIndex");
}
Config build_cfg;
build_cfg["dim"] = Dimension();
build_cfg["metric_type"] = GetMetricType();
build_cfg["gpu_id"] = gpu_num;
build_cfg["metric_type"] = (metric_type_ == MetricType::IP) ? "IP" : "L2";
build_cfg["gpu_id"] = gpu_num_;
build_cfg["nlist"] = nlist_;
AutoGenParams(to_index->GetType(), Count(), build_cfg);
......@@ -222,7 +256,7 @@ ExecutionEngineImpl::BuildIndex(const std::string &location) {
build_cfg);
if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); }
return std::make_shared<ExecutionEngineImpl>(to_index, location, build_type);
return std::make_shared<ExecutionEngineImpl>(to_index, location, index_type_, metric_type_, nlist_);
}
Status ExecutionEngineImpl::Search(long n,
......@@ -246,21 +280,16 @@ Status ExecutionEngineImpl::Cache() {
return Status::OK();
}
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, index_);
}
// TODO(linxj): remove.
Status ExecutionEngineImpl::Init() {
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
gpu_num = server_config.GetInt32Value("gpu_index", 0);
switch (build_type) {
case EngineType::FAISS_IVFSQ8:
case EngineType::FAISS_IVFFLAT: {
ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
nlist_ = engine_config.GetInt32Value(CONFIG_NLIST, 16384);
break;
}
}
gpu_num_ = server_config.GetInt32Value("gpu_index", 0);
return Status::OK();
}
......
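CopyToGpu and CopyToCpu above now share the same load-through shape: check the matching cache first, convert only on a miss, then publish the result (and record disk-load metrics) once. A condensed sketch of that shape; LoadThrough and its std::function parameter are illustrative, not code from this commit:
#include <functional>
#include <string>
#include "cache/CacheMgr.h"   // include path assumed
using zilliz::milvus::cache::CacheMgr;
using zilliz::milvus::engine::VecIndexPtr;
VecIndexPtr LoadThrough(CacheMgr *cache, const std::string &key,
                        const std::function<VecIndexPtr()> &convert) {
    VecIndexPtr index = cache->GetIndex(key);    // hit: reuse the cached copy
    bool already_in_cache = (index != nullptr);
    if (!index) {
        index = convert();                       // miss: e.g. index_->CopyToGpu(device_id)
    }
    if (!already_in_cache && index != nullptr) {
        cache->InsertItem(key, index);           // publish for later hits; the real code also records metrics here
    }
    return index;
}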
......@@ -22,11 +22,15 @@ public:
ExecutionEngineImpl(uint16_t dimension,
const std::string &location,
EngineType type);
EngineType index_type,
MetricType metric_type,
int32_t nlist);
ExecutionEngineImpl(VecIndexPtr index,
const std::string &location,
EngineType type);
EngineType index_type,
MetricType metric_type,
int32_t nlist);
Status AddWithIds(long n, const float *xdata, const long *xids) override;
......@@ -59,8 +63,14 @@ public:
Status Cache() override;
Status GpuCache(uint64_t gpu_id) override;
Status Init() override;
EngineType IndexEngineType() const override { return index_type_; }
MetricType IndexMetricType() const override { return metric_type_; }
private:
VecIndexPtr CreatetVecIndex(EngineType type);
......@@ -68,14 +78,14 @@ private:
protected:
VecIndexPtr index_ = nullptr;
EngineType build_type;
EngineType current_type;
EngineType index_type_;
MetricType metric_type_;
int64_t dim;
int64_t dim_;
std::string location_;
size_t nlist_ = 0;
int64_t gpu_num = 0;
int32_t nlist_ = 0;
int64_t gpu_num_ = 0;
};
......
......@@ -42,9 +42,11 @@ Status MemManagerImpl::InsertVectorsNoLock(const std::string &table_id,
MemTablePtr mem = GetMemByTable(table_id);
VectorSource::Ptr source = std::make_shared<VectorSource>(n, vectors);
auto status = mem->Add(source);
auto status = mem->Add(source, vector_ids);
if (status.ok()) {
vector_ids = source->GetVectorIds();
if (vector_ids.empty()) {
vector_ids = source->GetVectorIds();
}
}
return status;
}
......
......@@ -15,7 +15,7 @@ MemTable::MemTable(const std::string &table_id,
}
Status MemTable::Add(VectorSource::Ptr &source) {
Status MemTable::Add(VectorSource::Ptr &source, IDNumbers &vector_ids) {
while (!source->AllAdded()) {
......@@ -27,12 +27,12 @@ Status MemTable::Add(VectorSource::Ptr &source) {
Status status;
if (mem_table_file_list_.empty() || current_mem_table_file->IsFull()) {
MemTableFile::Ptr new_mem_table_file = std::make_shared<MemTableFile>(table_id_, meta_, options_);
status = new_mem_table_file->Add(source);
status = new_mem_table_file->Add(source, vector_ids);
if (status.ok()) {
mem_table_file_list_.emplace_back(new_mem_table_file);
}
} else {
status = current_mem_table_file->Add(source);
status = current_mem_table_file->Add(source, vector_ids);
}
if (!status.ok()) {
......
......@@ -21,7 +21,7 @@ class MemTable {
MemTable(const std::string &table_id, const std::shared_ptr<meta::Meta> &meta, const Options &options);
Status Add(VectorSource::Ptr &source);
Status Add(VectorSource::Ptr &source, IDNumbers &vector_ids);
void GetCurrentMemTableFile(MemTableFile::Ptr &mem_table_file);
......
......@@ -23,7 +23,9 @@ MemTableFile::MemTableFile(const std::string &table_id,
if (status.ok()) {
execution_engine_ = EngineFactory::Build(table_file_schema_.dimension_,
table_file_schema_.location_,
(EngineType) table_file_schema_.engine_type_);
(EngineType) table_file_schema_.engine_type_,
(MetricType)table_file_schema_.metric_type_,
table_file_schema_.nlist_);
}
}
......@@ -41,7 +43,7 @@ Status MemTableFile::CreateTableFile() {
return status;
}
Status MemTableFile::Add(const VectorSource::Ptr &source) {
Status MemTableFile::Add(const VectorSource::Ptr &source, IDNumbers& vector_ids) {
if (table_file_schema_.dimension_ <= 0) {
std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " +
......@@ -55,7 +57,7 @@ Status MemTableFile::Add(const VectorSource::Ptr &source) {
if (mem_left >= single_vector_mem_size) {
size_t num_vectors_to_add = std::ceil(mem_left / single_vector_mem_size);
size_t num_vectors_added;
auto status = source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added);
auto status = source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added, vector_ids);
if (status.ok()) {
current_mem_ += (num_vectors_added * single_vector_mem_size);
}
......@@ -86,7 +88,9 @@ Status MemTableFile::Serialize() {
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.size_ = size;
table_file_schema_.file_size_ = execution_engine_->PhysicalSize();
table_file_schema_.row_count_ = execution_engine_->Count();
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time);
......
......@@ -19,7 +19,7 @@ class MemTableFile {
MemTableFile(const std::string &table_id, const std::shared_ptr<meta::Meta> &meta, const Options &options);
Status Add(const VectorSource::Ptr &source);
Status Add(const VectorSource::Ptr &source, IDNumbers& vector_ids);
size_t GetCurrentMem();
......
......@@ -12,23 +12,31 @@ namespace engine {
VectorSource::VectorSource(const size_t &n,
const float *vectors) :
n_(n),
vectors_(vectors),
id_generator_(new SimpleIDGenerator()) {
n_(n),
vectors_(vectors),
id_generator_(std::make_shared<SimpleIDGenerator>()) {
current_num_vectors_added = 0;
}
Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
const meta::TableFileSchema &table_file_schema,
const size_t &num_vectors_to_add,
size_t &num_vectors_added) {
size_t &num_vectors_added,
IDNumbers &vector_ids) {
auto start_time = METRICS_NOW_TIME;
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
num_vectors_to_add : n_ - current_num_vectors_added;
IDNumbers vector_ids_to_add;
id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add);
if (vector_ids.empty()) {
id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add);
} else {
vector_ids_to_add.resize(num_vectors_added);
for (int pos = current_num_vectors_added; pos < current_num_vectors_added + num_vectors_added; pos++) {
vector_ids_to_add[pos-current_num_vectors_added] = vector_ids[pos];
}
}
Status status = execution_engine->AddWithIds(num_vectors_added,
vectors_ + current_num_vectors_added * table_file_schema.dimension_,
vector_ids_to_add.data());
......
......@@ -21,7 +21,8 @@ class VectorSource {
Status Add(const ExecutionEnginePtr &execution_engine,
const meta::TableFileSchema &table_file_schema,
const size_t &num_vectors_to_add,
size_t &num_vectors_added);
size_t &num_vectors_added,
IDNumbers &vector_ids);
size_t GetNumVectorsAdded();
......@@ -37,7 +38,7 @@ class VectorSource {
size_t current_num_vectors_added;
IDGenerator *id_generator_;
std::shared_ptr<IDGenerator> id_generator_;
}; //VectorSource
......
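VectorSource::Add now honors caller-supplied ids: when vector_ids is non-empty, the ids for the current batch are copied out of it instead of being generated by SimpleIDGenerator. The slicing logic in isolation (SliceBatchIds is an illustrative helper; IDNumber is assumed to be a 64-bit id as in db/Types.h):
#include <cstddef>
#include <cstdint>
#include <vector>
using IDNumber = int64_t;                 // assumption: matches the typedef in db/Types.h
using IDNumbers = std::vector<IDNumber>;
// Return the ids for vectors [offset, offset + count); reuse caller ids when present.
IDNumbers SliceBatchIds(const IDNumbers &caller_ids, size_t offset, size_t count) {
    IDNumbers batch;
    if (caller_ids.empty()) {
        return batch;                     // VectorSource::Add asks SimpleIDGenerator instead
    }
    batch.resize(count);
    for (size_t pos = offset; pos < offset + count; pos++) {
        batch[pos - offset] = caller_ids[pos];
    }
    return batch;
}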
......@@ -8,6 +8,7 @@
#include "MetaTypes.h"
#include "db/Options.h"
#include "db/Status.h"
#include "db/Types.h"
#include <cstddef>
#include <ctime>
......@@ -38,6 +39,9 @@ class Meta {
virtual Status
AllTables(std::vector<TableSchema> &table_schema_array) = 0;
virtual Status
UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0;
virtual Status
DeleteTable(const std::string &table_id) = 0;
......@@ -83,6 +87,12 @@ class Meta {
virtual Status
HasNonIndexFiles(const std::string &table_id, bool &has) = 0;
virtual Status
DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0;
virtual Status
DropTableIndex(const std::string &table_id) = 0;
virtual Status
CleanUp() = 0;
......
......@@ -6,6 +6,7 @@
#pragma once
#include "db/engine/ExecutionEngine.h"
#include "db/Constants.h"
#include <vector>
#include <map>
......@@ -16,6 +17,11 @@ namespace milvus {
namespace engine {
namespace meta {
constexpr int32_t DEFAULT_ENGINE_TYPE = (int)EngineType::FAISS_IDMAP;
constexpr int32_t DEFAULT_NLIST = 16384;
constexpr int32_t DEFAULT_INDEX_FILE_SIZE = 1024*ONE_MB;
constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2;
typedef int DateT;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;
......@@ -28,12 +34,13 @@ struct TableSchema {
size_t id_ = 0;
std::string table_id_;
int state_ = (int)NORMAL;
size_t files_cnt_ = 0;
int32_t state_ = (int)NORMAL;
uint16_t dimension_ = 0;
long created_on_ = 0;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
bool store_raw_data_ = false;
int64_t created_on_ = 0;
int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
int32_t nlist_ = DEFAULT_NLIST;
int32_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE;
int32_t metric_type_ = DEFAULT_METRIC_TYPE;
}; // TableSchema
struct TableFileSchema {
......@@ -45,19 +52,23 @@ struct TableFileSchema {
TO_DELETE,
NEW_MERGE,
NEW_INDEX,
BACKUP,
} FILE_TYPE;
size_t id_ = 0;
std::string table_id_;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
std::string file_id_;
int file_type_ = NEW;
size_t size_ = 0;
int32_t file_type_ = NEW;
size_t file_size_ = 0;
size_t row_count_ = 0;
DateT date_ = EmptyDate;
uint16_t dimension_ = 0;
std::string location_;
long updated_time_ = 0;
long created_on_ = 0;
int64_t updated_time_ = 0;
int64_t created_on_ = 0;
int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
int32_t nlist_ = DEFAULT_NLIST; //not persist to meta
int32_t metric_type_ = DEFAULT_METRIC_TYPE; //not persist to meta
}; // TableFileSchema
typedef std::vector<TableFileSchema> TableFilesSchema;
......
......@@ -30,13 +30,13 @@ namespace meta {
}
}
int MySQLConnectionPool::getConnectionsInUse() {
return conns_in_use_;
}
void MySQLConnectionPool::set_max_idle_time(int max_idle) {
max_idle_time_ = max_idle;
}
// int MySQLConnectionPool::getConnectionsInUse() {
// return conns_in_use_;
// }
//
// void MySQLConnectionPool::set_max_idle_time(int max_idle) {
// max_idle_time_ = max_idle;
// }
std::string MySQLConnectionPool::getDB() {
return db_;
......
......@@ -44,9 +44,9 @@ public:
// Other half of in-use conn count limit
void release(const mysqlpp::Connection *pc) override;
int getConnectionsInUse();
void set_max_idle_time(int max_idle);
// int getConnectionsInUse();
//
// void set_max_idle_time(int max_idle);
std::string getDB();
......
This diff has been collapsed.
......@@ -43,6 +43,12 @@ class MySQLMetaImpl : public Meta {
Status HasNonIndexFiles(const std::string &table_id, bool &has) override;
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
Status DropTableIndex(const std::string &table_id) override;
Status UpdateTableFile(TableFileSchema &file_schema) override;
Status UpdateTableFilesToIndex(const std::string &table_id) override;
......
This diff has been collapsed.
......@@ -51,6 +51,15 @@ class SqliteMetaImpl : public Meta {
Status
HasNonIndexFiles(const std::string &table_id, bool &has) override;
Status
UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status
DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
Status
DropTableIndex(const std::string &table_id) override;
Status
UpdateTableFilesToIndex(const std::string &table_id) override;
......
......@@ -45,7 +45,9 @@ std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
//step 1: load index
ExecutionEnginePtr index_ptr = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType)file_->engine_type_);
(EngineType)file_->engine_type_,
(MetricType)file_->metric_type_,
file_->nlist_);
try {
index_ptr->Load();
......@@ -75,7 +77,7 @@ std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
//step 2: return search task for later execution
SearchTaskPtr task_ptr = std::make_shared<SearchTask>();
task_ptr->index_id_ = file_->id_;
task_ptr->index_type_ = file_->file_type_;
task_ptr->file_type_ = file_->file_type_;
task_ptr->index_engine_ = index_ptr;
task_ptr->search_contexts_.swap(search_contexts_);
return std::static_pointer_cast<IScheduleTask>(task_ptr);
......
......@@ -76,20 +76,10 @@ void CollectDurationMetrics(int index_type, double total_time) {
}
}
std::string GetMetricType() {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode& engine_config = config.GetConfig(server::CONFIG_ENGINE);
return engine_config.GetValue(server::CONFIG_METRICTYPE, "L2");
}
}
SearchTask::SearchTask()
: IScheduleTask(ScheduleTaskType::kSearch) {
std::string metric_type = GetMetricType();
if(metric_type != "L2") {
metric_l2 = false;
}
}
std::shared_ptr<IScheduleTask> SearchTask::Execute() {
......@@ -104,6 +94,8 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
auto start_time = METRICS_NOW_TIME;
bool metric_l2 = (index_engine_->IndexMetricType() == MetricType::L2);
std::vector<long> output_ids;
std::vector<float> output_distence;
for(auto& context : search_contexts_) {
......@@ -147,7 +139,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(index_type_, total_time);
CollectDurationMetrics(file_type_, total_time);
rc.ElapseFromBegin("totally cost");
......
......@@ -37,10 +37,9 @@ public:
public:
size_t index_id_ = 0;
int index_type_ = 0; //for metrics
int file_type_ = 0; //for metrics
ExecutionEnginePtr index_engine_;
std::vector<SearchContextPtr> search_contexts_;
bool metric_l2 = true;
};
using SearchTaskPtr = std::shared_ptr<SearchTask>;
......
#!/bin/bash
/home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --grpc_out=./gen-status --plugin=protoc-gen-grpc="/home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/grpc_cpp_plugin" status.proto
../../cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --grpc_out=./gen-status --plugin=protoc-gen-grpc="../../cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/grpc_cpp_plugin" status.proto
/home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --cpp_out=./gen-status status.proto
../../cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --cpp_out=./gen-status status.proto
/home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="/home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/grpc_cpp_plugin" milvus.proto
../../cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/grpc_cpp_plugin" milvus.proto
/home/yukun/test/milvus/cpp/cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --cpp_out=./gen-milvus milvus.proto
\ No newline at end of file
../../cmake-build-debug/grpc_ep-prefix/src/grpc_ep/bins/opt/protobuf/protoc -I . --cpp_out=./gen-milvus milvus.proto
\ No newline at end of file
......@@ -395,9 +395,7 @@ class TableSchema :
enum : int {
kTableNameFieldNumber = 1,
kDimensionFieldNumber = 3,
kIndexTypeFieldNumber = 2,
kStoreRawVectorFieldNumber = 4,
kDimensionFieldNumber = 2,
};
// .milvus.grpc.TableName table_name = 1;
bool has_table_name() const;
......@@ -407,21 +405,11 @@ class TableSchema :
::milvus::grpc::TableName* mutable_table_name();
void set_allocated_table_name(::milvus::grpc::TableName* table_name);
// int64 dimension = 3;
// int64 dimension = 2;
void clear_dimension();
::PROTOBUF_NAMESPACE_ID::int64 dimension() const;
void set_dimension(::PROTOBUF_NAMESPACE_ID::int64 value);
// int32 index_type = 2;
void clear_index_type();
::PROTOBUF_NAMESPACE_ID::int32 index_type() const;
void set_index_type(::PROTOBUF_NAMESPACE_ID::int32 value);
// bool store_raw_vector = 4;
void clear_store_raw_vector();
bool store_raw_vector() const;
void set_store_raw_vector(bool value);
// @@protoc_insertion_point(class_scope:milvus.grpc.TableSchema)
private:
class _Internal;
......@@ -429,8 +417,6 @@ class TableSchema :
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::milvus::grpc::TableName* table_name_;
::PROTOBUF_NAMESPACE_ID::int64 dimension_;
::PROTOBUF_NAMESPACE_ID::int32 index_type_;
bool store_raw_vector_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
};
......@@ -2330,21 +2316,21 @@ class Index :
// accessors -------------------------------------------------------
enum : int {
kNlistFieldNumber = 2,
kIndexTypeFieldNumber = 1,
kNlistFieldNumber = 2,
kIndexFileSizeFieldNumber = 3,
kMetricTypeFieldNumber = 4,
};
// int64 nlist = 2;
void clear_nlist();
::PROTOBUF_NAMESPACE_ID::int64 nlist() const;
void set_nlist(::PROTOBUF_NAMESPACE_ID::int64 value);
// int32 index_type = 1;
void clear_index_type();
::PROTOBUF_NAMESPACE_ID::int32 index_type() const;
void set_index_type(::PROTOBUF_NAMESPACE_ID::int32 value);
// int32 nlist = 2;
void clear_nlist();
::PROTOBUF_NAMESPACE_ID::int32 nlist() const;
void set_nlist(::PROTOBUF_NAMESPACE_ID::int32 value);
// int32 index_file_size = 3;
void clear_index_file_size();
::PROTOBUF_NAMESPACE_ID::int32 index_file_size() const;
......@@ -2360,8 +2346,8 @@ class Index :
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::int64 nlist_;
::PROTOBUF_NAMESPACE_ID::int32 index_type_;
::PROTOBUF_NAMESPACE_ID::int32 nlist_;
::PROTOBUF_NAMESPACE_ID::int32 index_file_size_;
::PROTOBUF_NAMESPACE_ID::int32 metric_type_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
......@@ -2820,21 +2806,7 @@ inline void TableSchema::set_allocated_table_name(::milvus::grpc::TableName* tab
// @@protoc_insertion_point(field_set_allocated:milvus.grpc.TableSchema.table_name)
}
// int32 index_type = 2;
inline void TableSchema::clear_index_type() {
index_type_ = 0;
}
inline ::PROTOBUF_NAMESPACE_ID::int32 TableSchema::index_type() const {
// @@protoc_insertion_point(field_get:milvus.grpc.TableSchema.index_type)
return index_type_;
}
inline void TableSchema::set_index_type(::PROTOBUF_NAMESPACE_ID::int32 value) {
index_type_ = value;
// @@protoc_insertion_point(field_set:milvus.grpc.TableSchema.index_type)
}
// int64 dimension = 3;
// int64 dimension = 2;
inline void TableSchema::clear_dimension() {
dimension_ = PROTOBUF_LONGLONG(0);
}
......@@ -2848,20 +2820,6 @@ inline void TableSchema::set_dimension(::PROTOBUF_NAMESPACE_ID::int64 value) {
// @@protoc_insertion_point(field_set:milvus.grpc.TableSchema.dimension)
}
// bool store_raw_vector = 4;
inline void TableSchema::clear_store_raw_vector() {
store_raw_vector_ = false;
}
inline bool TableSchema::store_raw_vector() const {
// @@protoc_insertion_point(field_get:milvus.grpc.TableSchema.store_raw_vector)
return store_raw_vector_;
}
inline void TableSchema::set_store_raw_vector(bool value) {
store_raw_vector_ = value;
// @@protoc_insertion_point(field_set:milvus.grpc.TableSchema.store_raw_vector)
}
// -------------------------------------------------------------------
// Range
......@@ -3869,15 +3827,15 @@ inline void Index::set_index_type(::PROTOBUF_NAMESPACE_ID::int32 value) {
// @@protoc_insertion_point(field_set:milvus.grpc.Index.index_type)
}
// int64 nlist = 2;
// int32 nlist = 2;
inline void Index::clear_nlist() {
nlist_ = PROTOBUF_LONGLONG(0);
nlist_ = 0;
}
inline ::PROTOBUF_NAMESPACE_ID::int64 Index::nlist() const {
inline ::PROTOBUF_NAMESPACE_ID::int32 Index::nlist() const {
// @@protoc_insertion_point(field_get:milvus.grpc.Index.nlist)
return nlist_;
}
inline void Index::set_nlist(::PROTOBUF_NAMESPACE_ID::int64 value) {
inline void Index::set_nlist(::PROTOBUF_NAMESPACE_ID::int32 value) {
nlist_ = value;
// @@protoc_insertion_point(field_set:milvus.grpc.Index.nlist)
......
......@@ -61,7 +61,7 @@ static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] =
const char descriptor_table_protodef_status_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) =
"\n\014status.proto\022\013milvus.grpc\"D\n\006Status\022*\n"
"\nerror_code\030\001 \001(\0162\026.milvus.grpc.ErrorCod"
"e\022\016\n\006reason\030\002 \001(\t*\354\003\n\tErrorCode\022\013\n\007SUCCE"
"e\022\016\n\006reason\030\002 \001(\t*\230\004\n\tErrorCode\022\013\n\007SUCCE"
"SS\020\000\022\024\n\020UNEXPECTED_ERROR\020\001\022\022\n\016CONNECT_FA"
"ILED\020\002\022\025\n\021PERMISSION_DENIED\020\003\022\024\n\020TABLE_N"
"OT_EXISTS\020\004\022\024\n\020ILLEGAL_ARGUMENT\020\005\022\021\n\rILL"
......@@ -73,7 +73,9 @@ const char descriptor_table_protodef_status_2eproto[] PROTOBUF_SECTION_VARIABLE(
"TA_FAILED\020\017\022\020\n\014CACHE_FAILED\020\020\022\030\n\024CANNOT_"
"CREATE_FOLDER\020\021\022\026\n\022CANNOT_CREATE_FILE\020\022\022"
"\030\n\024CANNOT_DELETE_FOLDER\020\023\022\026\n\022CANNOT_DELE"
"TE_FILE\020\024\022\025\n\021BUILD_INDEX_ERROR\020\025b\006proto3"
"TE_FILE\020\024\022\025\n\021BUILD_INDEX_ERROR\020\025\022\021\n\rILLE"
"GAL_NLIST\020\026\022\027\n\023ILLEGAL_METRIC_TYPE\020\027b\006pr"
"oto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_status_2eproto_deps[1] = {
};
......@@ -83,7 +85,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_sta
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_status_2eproto_once;
static bool descriptor_table_status_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_status_2eproto = {
&descriptor_table_status_2eproto_initialized, descriptor_table_protodef_status_2eproto, "status.proto", 600,
&descriptor_table_status_2eproto_initialized, descriptor_table_protodef_status_2eproto, "status.proto", 644,
&descriptor_table_status_2eproto_once, descriptor_table_status_2eproto_sccs, descriptor_table_status_2eproto_deps, 1, 0,
schemas, file_default_instances, TableStruct_status_2eproto::offsets,
file_level_metadata_status_2eproto, 1, file_level_enum_descriptors_status_2eproto, file_level_service_descriptors_status_2eproto,
......@@ -121,6 +123,8 @@ bool ErrorCode_IsValid(int value) {
case 19:
case 20:
case 21:
case 22:
case 23:
return true;
default:
return false;
......
......@@ -91,12 +91,14 @@ enum ErrorCode : int {
CANNOT_DELETE_FOLDER = 19,
CANNOT_DELETE_FILE = 20,
BUILD_INDEX_ERROR = 21,
ILLEGAL_NLIST = 22,
ILLEGAL_METRIC_TYPE = 23,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
ErrorCode_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool ErrorCode_IsValid(int value);
constexpr ErrorCode ErrorCode_MIN = SUCCESS;
constexpr ErrorCode ErrorCode_MAX = BUILD_INDEX_ERROR;
constexpr ErrorCode ErrorCode_MAX = ILLEGAL_METRIC_TYPE;
constexpr int ErrorCode_ARRAYSIZE = ErrorCode_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ErrorCode_descriptor();
......
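For reference, a hedged sketch of how a caller might react to the two new error codes when inspecting a returned ::milvus::grpc::Status; the helper function and variable names below are assumptions, not code from this commit:

void CheckIndexStatus(const ::milvus::grpc::Status &response) {   // hypothetical helper
    if (response.error_code() == ::milvus::grpc::ILLEGAL_NLIST ||
        response.error_code() == ::milvus::grpc::ILLEGAL_METRIC_TYPE) {
        // the server rejected the index parameters
        std::cerr << "invalid index parameters: " << response.reason() << std::endl;
    }
}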
......@@ -17,9 +17,7 @@ message TableName {
*/
message TableSchema {
TableName table_name = 1;
int32 index_type = 2;
int64 dimension = 3;
bool store_raw_vector = 4;
int64 dimension = 2;
}
/**
......@@ -122,10 +120,12 @@ message Command {
/**
* @brief Index
* @index_type: 0-invalid, 1-idmap, 2-ivflat, 3-ivfsq8, 4-nsgmix
* @metric_type: 1-L2, 2-IP
*/
message Index {
int32 index_type = 1;
int64 nlist = 2;
int32 nlist = 2;
int32 index_file_size = 3;
int32 metric_type = 4;
}
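A hedged client-side sketch of populating the reshaped Index message through the generated setters; the values are illustrative (they mirror the SDK test further down) and the table name is hypothetical:

::milvus::grpc::IndexParam grpc_index_param;
grpc_index_param.mutable_table_name()->set_table_name("example_table");  // hypothetical table name
grpc_index_param.mutable_index()->set_index_type(2);     // 2 = ivflat, per the comment above
grpc_index_param.mutable_index()->set_nlist(1000);
grpc_index_param.mutable_index()->set_index_file_size(1024);
grpc_index_param.mutable_index()->set_metric_type(1);    // 1 = L2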
......
......@@ -25,6 +25,8 @@ enum ErrorCode {
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
}
message Status {
......
......@@ -31,7 +31,8 @@ class MetricsBase{
virtual void IndexFileSizeHistogramObserve(double value) {};
virtual void BuildIndexDurationSecondsHistogramObserve(double value) {};
virtual void CacheUsageGaugeSet(double value) {};
virtual void CpuCacheUsageGaugeSet(double value) {};
virtual void GpuCacheUsageGaugeSet(double value) {};
virtual void MetaAccessTotalIncrement(double value = 1) {};
virtual void MetaAccessDurationSecondsHistogramObserve(double value) {};
......
......@@ -3,36 +3,29 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Metrics.h"
#include "PrometheusMetrics.h"
namespace zilliz {
namespace milvus {
namespace server {
MetricsBase &
Metrics::CreateMetricsCollector(MetricCollectorType collector_type) {
switch (collector_type) {
case MetricCollectorType::PROMETHEUS:
static PrometheusMetrics instance = PrometheusMetrics::GetInstance();
return instance;
default:return MetricsBase::GetInstance();
}
Metrics::GetInstance() {
static MetricsBase &instance = CreateMetricsCollector();
return instance;
}
MetricsBase &
Metrics::GetInstance() {
Metrics::CreateMetricsCollector() {
ConfigNode &config = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
std::string collector_typr_str = config.GetValue(CONFIG_METRIC_COLLECTOR);
std::string collector_type_str = config.GetValue(CONFIG_METRIC_COLLECTOR);
if (collector_typr_str == "prometheus") {
return CreateMetricsCollector(MetricCollectorType::PROMETHEUS);
} else if (collector_typr_str == "zabbix") {
return CreateMetricsCollector(MetricCollectorType::ZABBIX);
if (collector_type_str == "prometheus") {
return PrometheusMetrics::GetInstance();
} else {
return CreateMetricsCollector(MetricCollectorType::INVALID);
return MetricsBase::GetInstance();
}
}
......
......@@ -5,22 +5,14 @@
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include <memory>
#include <vector>
#pragma once
#include "MetricBase.h"
//#include "PrometheusMetrics.h"
namespace zilliz {
namespace milvus {
namespace server {
#define METRICS_NOW_TIME std::chrono::system_clock::now()
//#define server::Metrics::GetInstance() server::Metrics::GetInstance()
#define METRICS_MICROSECONDS(a, b) (std::chrono::duration_cast<std::chrono::microseconds> (b-a)).count();
enum class MetricCollectorType {
......@@ -31,15 +23,13 @@ enum class MetricCollectorType {
class Metrics {
public:
static MetricsBase &
CreateMetricsCollector(MetricCollectorType collector_type);
static MetricsBase &GetInstance();
static MetricsBase &
GetInstance();
private:
static MetricsBase &CreateMetricsCollector();
};
}
}
}
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <cache/GpuCacheMgr.h>
#include "PrometheusMetrics.h"
#include "utils/Log.h"
#include "SystemInfo.h"
......@@ -166,6 +167,18 @@ void PrometheusMetrics::CPUTemperature() {
}
}
void PrometheusMetrics::GpuCacheUsageGaugeSet(double value) {
if(!startup_) return;
int64_t num_processors = server::SystemInfo::GetInstance().num_processor();
for (auto i = 0; i < num_processors; ++i) {
// int gpu_cache_usage = cache::GpuCacheMgr::GetInstance(i)->CacheUsage();
// int gpu_cache_total = cache::GpuCacheMgr::GetInstance(i)->CacheCapacity();
// prometheus::Gauge &gpu_cache = gpu_cache_usage_.Add({{"GPU_Cache", std::to_string(i)}});
// gpu_cache.Set(gpu_cache_usage * 100 / gpu_cache_total);
}
}
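The metric body above is left commented out in this commit; a hedged sketch of the intended update, assuming cache::GpuCacheMgr::GetInstance(i) exposes CacheUsage() and CacheCapacity() as the commented lines suggest, and that num_processor() stands in for the GPU device count:

void PrometheusMetrics::GpuCacheUsageGaugeSet(double value) {
    if (!startup_) return;
    int64_t device_count = server::SystemInfo::GetInstance().num_processor();  // placeholder for GPU count
    for (int64_t i = 0; i < device_count; ++i) {
        int64_t usage = cache::GpuCacheMgr::GetInstance(i)->CacheUsage();       // assumed accessor
        int64_t total = cache::GpuCacheMgr::GetInstance(i)->CacheCapacity();    // assumed accessor
        prometheus::Gauge &gpu_cache = gpu_cache_usage_.Add({{"GPU_Cache", std::to_string(i)}});
        gpu_cache.Set(total > 0 ? 100.0 * usage / total : 0.0);                 // usage as a percentage
    }
}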
}
}
}
......@@ -54,7 +54,8 @@ class PrometheusMetrics: public MetricsBase {
void RawFileSizeHistogramObserve(double value) override { if(startup_) raw_files_size_histogram_.Observe(value);};
void IndexFileSizeHistogramObserve(double value) override { if(startup_) index_files_size_histogram_.Observe(value);};
void BuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) build_index_duration_seconds_histogram_.Observe(value);};
void CacheUsageGaugeSet(double value) override { if(startup_) cache_usage_gauge_.Set(value);};
void CpuCacheUsageGaugeSet(double value) override { if(startup_) cpu_cache_usage_gauge_.Set(value);};
void GpuCacheUsageGaugeSet(double value) override;
void MetaAccessTotalIncrement(double value = 1) override { if(startup_) meta_access_total_.Increment(value);};
void MetaAccessDurationSecondsHistogramObserve(double value) override { if(startup_) meta_access_duration_seconds_histogram_.Observe(value);};
......@@ -336,12 +337,18 @@ class PrometheusMetrics: public MetricsBase {
.Register(*registry_);
prometheus::Counter &cache_access_total_ = cache_access_.Add({});
// record cache usage and %
prometheus::Family<prometheus::Gauge> &cache_usage_ = prometheus::BuildGauge()
// record CPU cache usage and %
prometheus::Family<prometheus::Gauge> &cpu_cache_usage_ = prometheus::BuildGauge()
.Name("cache_usage_bytes")
.Help("current cache usage by bytes")
.Register(*registry_);
prometheus::Gauge &cache_usage_gauge_ = cache_usage_.Add({});
prometheus::Gauge &cpu_cache_usage_gauge_ = cpu_cache_usage_.Add({});
//record GPU cache usage and %
prometheus::Family<prometheus::Gauge> &gpu_cache_usage_ = prometheus::BuildGauge()
.Name("gpu_cache_usage_bytes")
.Help("current gpu cache usage by bytes")
.Register(*registry_);
// record query response
using Quantiles = std::vector<prometheus::detail::CKMSQuantiles::Quantile>;
......@@ -360,8 +367,7 @@ class PrometheusMetrics: public MetricsBase {
prometheus::Family<prometheus::Gauge> &query_vector_response_per_second_ = prometheus::BuildGauge()
.Name("query_vector_response_per_microsecond")
.Help("the number of vectors can be queried every second ")
.Register(*registry_);
prometheus::Gauge &query_vector_response_per_second_gauge_ = query_vector_response_per_second_.Add({});
.Register(*registry_); prometheus::Gauge &query_vector_response_per_second_gauge_ = query_vector_response_per_second_.Add({});
prometheus::Family<prometheus::Gauge> &query_response_per_second_ = prometheus::BuildGauge()
.Name("query_response_per_microsecond")
......
......@@ -15,6 +15,8 @@
using namespace milvus;
//#define SET_VECTOR_IDS;
namespace {
std::string GetTableName();
......@@ -24,7 +26,7 @@ namespace {
constexpr int64_t NQ = 10;
constexpr int64_t TOP_K = 10;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t ADD_VECTOR_LOOP = 10;
constexpr int64_t SECONDS_EACH_HOUR = 3600;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
......@@ -32,9 +34,7 @@ namespace {
void PrintTableSchema(const TableSchema& tb_schema) {
BLOCK_SPLITER
std::cout << "Table name: " << tb_schema.table_name << std::endl;
std::cout << "Table index type: " << (int)tb_schema.index_type << std::endl;
std::cout << "Table dimension: " << tb_schema.dimension << std::endl;
std::cout << "Table store raw data: " << (tb_schema.store_raw_vector ? "true" : "false") << std::endl;
BLOCK_SPLITER
}
......@@ -93,9 +93,7 @@ namespace {
TableSchema BuildTableSchema() {
TableSchema tb_schema;
tb_schema.table_name = TABLE_NAME;
tb_schema.index_type = IndexType::gpu_ivfflat;
tb_schema.dimension = TABLE_DIMENSION;
tb_schema.store_raw_vector = true;
return tb_schema;
}
......@@ -235,59 +233,21 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "DescribeTable function call status: " << stat.ToString() << std::endl;
PrintTableSchema(tb_schema);
}
//
// Connection::Destroy(conn);
// pid_t pid;
// for (int i = 0; i < 5; ++i) {
// pid = fork();
// if (pid == 0 || pid == -1) {
// break;
// }
// }
// if (pid == -1) {
// std::cout << "fail to fork!\n";
// exit(1);
// } else if (pid == 0) {
// std::shared_ptr<Connection> conn = Connection::Create();
//
// {//connect server
// ConnectParam param = {address, port};
// Status stat = conn->Connect(param);
// std::cout << "Connect function call status: " << stat.ToString() << std::endl;
// }
//
// {//server version
// std::string version = conn->ServerVersion();
// std::cout << "Server version: " << version << std::endl;
// }
// Connection::Destroy(conn);
// exit(0);
// } else {
// std::shared_ptr<Connection> conn = Connection::Create();
//
// {//connect server
// ConnectParam param = {address, port};
// Status stat = conn->Connect(param);
// std::cout << "Connect function call status: " << stat.ToString() << std::endl;
// }
//
// {//server version
// std::string version = conn->ServerVersion();
// std::cout << "Server version: " << version << std::endl;
// }
// Connection::Destroy(conn);
// std::cout << "in main process\n";
// exit(0);
// }
std::vector<std::pair<int64_t, RowRecord>> search_record_array;
{//insert vectors
std::vector<int64_t> record_ids;
for (int i = 0; i < ADD_VECTOR_LOOP; i++) {//add vectors
std::vector<RowRecord> record_array;
int64_t begin_index = i * BATCH_ROW_COUNT;
BuildVectors(begin_index, begin_index + BATCH_ROW_COUNT, record_array);
std::vector<int64_t> record_ids;
#ifdef SET_VECTOR_IDS
record_ids.resize(ADD_VECTOR_LOOP * BATCH_ROW_COUNT);
for (auto j = begin_index; j <begin_index + BATCH_ROW_COUNT; j++) {
record_ids[i * BATCH_ROW_COUNT + j] = i * BATCH_ROW_COUNT + j;
}
#endif
auto start = std::chrono::high_resolution_clock::now();
......@@ -308,13 +268,27 @@ ClientTest::Test(const std::string& address, const std::string& port) {
{//search vectors without index
Sleep(2);
int64_t row_count = 0;
Status stat = conn->CountTable(TABLE_NAME, row_count);
std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
DoSearch(conn, search_record_array, "Search without index");
}
{//wait until build index finish
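// (A block that creates the index explicitly follows below.)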
// std::cout << "Wait until build all index done" << std::endl;
// Status stat = conn->CreateIndex();
// std::cout << "BuildIndex function call status: " << stat.ToString() << std::endl;
std::cout << "Wait until create all index done" << std::endl;
IndexParam index;
index.table_name = TABLE_NAME;
index.index_type = IndexType::gpu_ivfflat;
index.nlist = 1000;
index.index_file_size = 1024;
index.metric_type = 1;
Status stat = conn->CreateIndex(index);
std::cout << "CreateIndex function call status: " << stat.ToString() << std::endl;
IndexParam index2;
stat = conn->DescribeIndex(TABLE_NAME, index2);
std::cout << "DescribeIndex function call status: " << stat.ToString() << std::endl;
}
{//preload table
......@@ -326,6 +300,24 @@ ClientTest::Test(const std::string& address, const std::string& port) {
DoSearch(conn, search_record_array, "Search after build index finish");
}
{//delete index
Status stat = conn->DropIndex(TABLE_NAME);
std::cout << "DropIndex function call status: " << stat.ToString() << std::endl;
int64_t row_count = 0;
stat = conn->CountTable(TABLE_NAME, row_count);
std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
}
{//delete by range
Range rg;
rg.start_value = CurrentTmDate(-2);
rg.end_value = CurrentTmDate(-3);
Status stat = conn->DeleteByRange(rg, TABLE_NAME);
std::cout << "DeleteByRange function call status: " << stat.ToString() << std::endl;
}
{//delete table
Status stat = conn->DropTable(TABLE_NAME);
std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
......
......@@ -82,9 +82,7 @@ ClientProxy::CreateTable(const TableSchema &param) {
try {
::milvus::grpc::TableSchema schema;
schema.mutable_table_name()->set_table_name(param.table_name);
schema.set_index_type((int) param.index_type);
schema.set_dimension(param.dimension);
schema.set_store_raw_vector(param.store_raw_vector);
return client_ptr_->CreateTable(schema);
} catch (std::exception &ex) {
......@@ -119,6 +117,10 @@ ClientProxy::CreateIndex(const IndexParam &index_param) {
::milvus::grpc::IndexParam grpc_index_param;
grpc_index_param.mutable_table_name()->set_table_name(
index_param.table_name);
grpc_index_param.mutable_index()->set_index_type((int32_t)index_param.index_type);
grpc_index_param.mutable_index()->set_nlist(index_param.nlist);
grpc_index_param.mutable_index()->set_index_file_size(index_param.index_file_size);
grpc_index_param.mutable_index()->set_metric_type(index_param.metric_type);
return client_ptr_->CreateIndex(grpc_index_param);
} catch (std::exception &ex) {
......@@ -187,15 +189,20 @@ ClientProxy::Insert(const std::string &table_name,
}
}
::milvus::grpc::VectorIds vector_ids;
//Single thread
client_ptr_->Insert(vector_ids, insert_param, status);
auto finish = std::chrono::high_resolution_clock::now();
for (size_t i = 0; i < vector_ids.vector_id_array_size(); i++) {
id_array.push_back(vector_ids.vector_id_array(i));
::milvus::grpc::VectorIds vector_ids;
if (!id_array.empty()) {
for (auto i = 0; i < id_array.size(); i++) {
insert_param.add_row_id_array(id_array[i]);
}
client_ptr_->Insert(vector_ids, insert_param, status);
} else {
client_ptr_->Insert(vector_ids, insert_param, status);
for (size_t i = 0; i < vector_ids.vector_id_array_size(); i++) {
id_array.push_back(vector_ids.vector_id_array(i));
}
}
#endif
} catch (std::exception &ex) {
......@@ -264,9 +271,7 @@ ClientProxy::DescribeTable(const std::string &table_name, TableSchema &table_sch
Status status = client_ptr_->DescribeTable(grpc_schema, table_name);
table_schema.table_name = grpc_schema.table_name().table_name();
table_schema.index_type = (IndexType) grpc_schema.index_type();
table_schema.dimension = grpc_schema.dimension();
table_schema.store_raw_vector = grpc_schema.store_raw_vector();
return status;
} catch (std::exception &ex) {
......@@ -325,7 +330,15 @@ ClientProxy::ServerStatus() const {
Status
ClientProxy::DeleteByRange(milvus::Range &range, const std::string &table_name) {
try {
::milvus::grpc::DeleteByRangeParam delete_by_range_param;
delete_by_range_param.set_table_name(table_name);
delete_by_range_param.mutable_range()->set_start_value(range.start_value);
delete_by_range_param.mutable_range()->set_end_value(range.end_value);
return client_ptr_->DeleteByRange(delete_by_range_param);
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to delete by range: " + std::string(ex.what()));
}
}
Status
......@@ -336,18 +349,39 @@ ClientProxy::PreloadTable(const std::string &table_name) const {
Status status = client_ptr_->PreloadTable(grpc_table_name);
return status;
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
return Status(StatusCode::UnknownError, "fail to preload tables: " + std::string(ex.what()));
}
}
IndexParam
ClientProxy::DescribeIndex(const std::string &table_name) const {
Status
ClientProxy::DescribeIndex(const std::string &table_name, IndexParam &index_param) const {
try {
::milvus::grpc::TableName grpc_table_name;
grpc_table_name.set_table_name(table_name);
::milvus::grpc::IndexParam grpc_index_param;
Status status = client_ptr_->DescribeIndex(grpc_table_name, grpc_index_param);
index_param.index_type = (IndexType)(grpc_index_param.mutable_index()->index_type());
index_param.nlist = grpc_index_param.mutable_index()->nlist();
index_param.index_file_size = grpc_index_param.mutable_index()->index_file_size();
index_param.metric_type = grpc_index_param.mutable_index()->metric_type();
return status;
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to describe index: " + std::string(ex.what()));
}
}
Status
ClientProxy::DropIndex(const std::string &table_name) const {
try {
::milvus::grpc::TableName grpc_table_name;
grpc_table_name.set_table_name(table_name);
Status status = client_ptr_->DropIndex(grpc_table_name);
return status;
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to drop index: " + std::string(ex.what()));
}
}
}
......@@ -75,8 +75,8 @@ public:
virtual Status
PreloadTable(const std::string &table_name) const override;
virtual IndexParam
DescribeIndex(const std::string &table_name) const override;
virtual Status
DescribeIndex(const std::string &table_name, IndexParam &index_param) const override;
virtual Status
DropIndex(const std::string &table_name) const override;
......
......@@ -264,10 +264,63 @@ GrpcClient::PreloadTable(milvus::grpc::TableName &table_name) {
return Status::OK();
}
Status
GrpcClient::DeleteByRange(grpc::DeleteByRangeParam &delete_by_range_param) {
ClientContext context;
::milvus::grpc::Status response;
::grpc::Status grpc_status = stub_->DeleteByRange(&context, delete_by_range_param, &response);
if (!grpc_status.ok()) {
std::cerr << "DeleteByRange gRPC failed!" << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {
std::cerr << response.reason() << std::endl;
return Status(StatusCode::ServerFailed, response.reason());
}
return Status::OK();
}
Status
GrpcClient::Disconnect() {
stub_.release();
return Status::OK();
}
Status
GrpcClient::DescribeIndex(grpc::TableName &table_name, grpc::IndexParam &index_param) {
ClientContext context;
::grpc::Status grpc_status = stub_->DescribeIndex(&context, table_name, &index_param);
if (!grpc_status.ok()) {
std::cerr << "DescribeIndex rpc failed!" << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (index_param.mutable_table_name()->status().error_code() != grpc::SUCCESS) {
std::cerr << index_param.mutable_table_name()->status().reason() << std::endl;
return Status(StatusCode::ServerFailed, index_param.mutable_table_name()->status().reason());
}
return Status::OK();
}
Status
GrpcClient::DropIndex(grpc::TableName &table_name) {
ClientContext context;
::milvus::grpc::Status response;
::grpc::Status grpc_status = stub_->DropIndex(&context, table_name, &response);
if (!grpc_status.ok()) {
std::cerr << "DropIndex gRPC failed!" << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {
std::cerr << response.reason() << std::endl;
return Status(StatusCode::ServerFailed, response.reason());
}
return Status::OK();
}
}
\ No newline at end of file
......@@ -76,9 +76,10 @@ struct TopKQueryResult {
*/
struct IndexParam {
std::string table_name;
int32_t index_type;
int64_t nlist;
IndexType index_type;
int32_t nlist;
int32_t index_file_size;
int32_t metric_type;
};
/**
......@@ -354,8 +355,8 @@ class Connection {
*
* @return index informations and indicate if this operation is successful.
*/
virtual IndexParam
DescribeIndex(const std::string &table_name) const = 0;
virtual Status
DescribeIndex(const std::string &table_name, IndexParam &index_param) const = 0;
/**
* @brief drop index
......
......@@ -117,7 +117,7 @@ ConnectionImpl::ServerStatus() const {
Status
ConnectionImpl::DeleteByRange(Range &range,
const std::string &table_name) {
return client_proxy_->DeleteByRange(range, table_name);
}
Status
......@@ -125,14 +125,14 @@ ConnectionImpl::PreloadTable(const std::string &table_name) const {
return client_proxy_->PreloadTable(table_name);
}
IndexParam
ConnectionImpl::DescribeIndex(const std::string &table_name) const {
Status
ConnectionImpl::DescribeIndex(const std::string &table_name, IndexParam& index_param) const {
return client_proxy_->DescribeIndex(table_name, index_param);
}
Status
ConnectionImpl::DropIndex(const std::string &table_name) const {
return client_proxy_->DropIndex(table_name);
}
}
......@@ -81,8 +81,8 @@ public:
virtual Status
PreloadTable(const std::string &table_name) const override;
virtual IndexParam
DescribeIndex(const std::string &table_name) const override;
virtual Status
DescribeIndex(const std::string &table_name, IndexParam& index_param) const override;
virtual Status
DropIndex(const std::string &table_name) const override;
......
......@@ -334,8 +334,7 @@ Status ClientProxy::PreloadTable(const std::string &table_name) const {
return Status::OK();
}
IndexParam ClientProxy::DescribeIndex(const std::string &table_name) const {
IndexParam index_param;
Status ClientProxy::DescribeIndex(const std::string &table_name, IndexParam &index_param) const {
index_param.table_name = table_name;
return index_param;
}
......
......@@ -55,7 +55,7 @@ public:
virtual Status PreloadTable(const std::string &table_name) const override;
virtual IndexParam DescribeIndex(const std::string &table_name) const override;
virtual Status DescribeIndex(const std::string &table_name, IndexParam &index_param) const override;
virtual Status DropIndex(const std::string &table_name) const override;
......
......@@ -36,8 +36,10 @@ static const char* CONFIG_LOG = "log_config";
static const char* CONFIG_CACHE = "cache_config";
static const char* CONFIG_CPU_CACHE_CAPACITY = "cpu_cache_capacity";
static const char* CONFIG_GPU_CACHE_CAPACITY = "gpu_cache_capacity";
static const char* CACHE_FREE_PERCENT = "cache_free_percent";
static const char* CACHE_FREE_PERCENT = "cpu_cache_free_percent";
static const char* CONFIG_INSERT_CACHE_IMMEDIATELY = "insert_cache_immediately";
static const char* CONFIG_GPU_IDS = "gpu_ids";
static const char *GPU_CACHE_FREE_PERCENT = "gpu_cache_free_percent";
static const char* CONFIG_LICENSE = "license_config";
static const char* CONFIG_LICENSE_PATH = "license_path";
......@@ -48,12 +50,14 @@ static const char* CONFIG_METRIC_COLLECTOR = "collector";
static const char* CONFIG_PROMETHEUS = "prometheus_config";
static const char* CONFIG_METRIC_PROMETHEUS_PORT = "port";
static const char* CONFIG_ENGINE = "engine_config";
static const char* CONFIG_NPROBE = "nprobe";
static const char* CONFIG_NLIST = "nlist";
static const char* CONFIG_DCBT = "use_blas_threshold";
static const char* CONFIG_METRICTYPE = "metric_type";
static const char* CONFIG_OMP_THREAD_NUM = "omp_thread_num";
static const std::string CONFIG_ENGINE = "engine_config";
static const std::string CONFIG_NPROBE = "nprobe";
static const std::string CONFIG_NLIST = "nlist";
static const std::string CONFIG_DCBT = "use_blas_threshold";
static const std::string CONFIG_METRICTYPE = "metric_type";
static const std::string CONFIG_OMP_THREAD_NUM = "omp_thread_num";
static const std::string CONFIG_USE_HYBRID_INDEX = "use_hybrid_index";
static const std::string CONFIG_HYBRID_INDEX_GPU = "hybrid_index_gpu";
class ServerConfig {
public:
......
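A minimal sketch of reading the new engine_config and cache_config keys through the existing ServerConfig/ConfigNode pattern (as used in Metrics.cpp above); the string-valued lookups and their interpretation are assumptions:

ConfigNode &engine_config = ServerConfig::GetInstance().GetConfig(CONFIG_ENGINE);
std::string use_hybrid_index = engine_config.GetValue(CONFIG_USE_HYBRID_INDEX);  // e.g. "true"/"false"
std::string hybrid_index_gpu = engine_config.GetValue(CONFIG_HYBRID_INDEX_GPU);  // GPU id for the hybrid index
ConfigNode &cache_config = ServerConfig::GetInstance().GetConfig(CONFIG_CACHE);
std::string gpu_free_percent = cache_config.GetValue(GPU_CACHE_FREE_PERCENT);    // fraction of GPU cache to free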
......@@ -42,7 +42,6 @@ GrpcRequestHandler::HasTable(::grpc::ServerContext *context,
GrpcRequestHandler::DropTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name());
GrpcRequestScheduler::ExecTask(task_ptr, response);
return ::grpc::Status::OK;
......@@ -168,7 +167,12 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context,
GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context,
const ::milvus::grpc::DeleteByRangeParam *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DeleteByRangeTask::Create(*request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
response->set_error_code(grpc_status.error_code());
response->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
}
::grpc::Status
......@@ -187,14 +191,24 @@ GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context,
GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::IndexParam *response) {
BaseTaskPtr task_ptr = DescribeIndexTask::Create(request->table_name(), *response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
response->mutable_table_name()->mutable_status()->set_reason(grpc_status.reason());
response->mutable_table_name()->mutable_status()->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DropIndex(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DropIndexTask::Create(request->table_name());
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
}
......
......@@ -260,6 +260,9 @@ public:
protected:
DropIndexTask(const std::string &table_name);
ServerError
OnExecute() override;
private:
std::string table_name_;
......
......@@ -51,6 +51,9 @@ constexpr ServerError SERVER_ILLEGAL_SEARCH_RESULT = ToGlobalServerErrorCode(110
constexpr ServerError SERVER_CACHE_ERROR = ToGlobalServerErrorCode(111);
constexpr ServerError SERVER_WRITE_ERROR = ToGlobalServerErrorCode(112);
constexpr ServerError SERVER_INVALID_NPROBE = ToGlobalServerErrorCode(113);
constexpr ServerError SERVER_INVALID_INDEX_NLIST = ToGlobalServerErrorCode(114);
constexpr ServerError SERVER_INVALID_INDEX_METRIC_TYPE = ToGlobalServerErrorCode(115);
constexpr ServerError SERVER_INVALID_INDEX_FILE_SIZE = ToGlobalServerErrorCode(116);
constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(500);
......
......@@ -10,6 +10,7 @@ namespace server {
constexpr size_t table_name_size_limit = 255;
constexpr int64_t table_dimension_limit = 16384;
constexpr int32_t index_file_size_limit = 4096; //index trigger size max = 4096 MB
ServerError
ValidationUtil::ValidateTableName(const std::string &table_name) {
......@@ -65,6 +66,32 @@ ValidationUtil::ValidateTableIndexType(int32_t index_type) {
return SERVER_SUCCESS;
}
ServerError
ValidationUtil::ValidateTableIndexNlist(int32_t nlist) {
if(nlist <= 0) {
return SERVER_INVALID_INDEX_NLIST;
}
return SERVER_SUCCESS;
}
ServerError
ValidationUtil::ValidateTableIndexFileSize(int32_t index_file_size) {
if(index_file_size <= 0 || index_file_size > index_file_size_limit) {
return SERVER_INVALID_INDEX_FILE_SIZE;
}
return SERVER_SUCCESS;
}
ServerError
ValidationUtil::ValidateTableIndexMetricType(int32_t metric_type) {
if(metric_type != (int32_t)engine::MetricType::L2 && metric_type != (int32_t)engine::MetricType::IP) {
return SERVER_INVALID_INDEX_METRIC_TYPE;
}
return SERVER_SUCCESS;
}
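A hedged sketch of how a server-side CreateIndex path might chain the three new validators; the wrapper function below is an assumption, not part of this commit:

ServerError
ValidateIndexParam(const ::milvus::grpc::Index &index) {   // hypothetical helper
    ServerError err = ValidationUtil::ValidateTableIndexType(index.index_type());
    if (err != SERVER_SUCCESS) { return err; }
    err = ValidationUtil::ValidateTableIndexNlist(index.nlist());
    if (err != SERVER_SUCCESS) { return err; }
    err = ValidationUtil::ValidateTableIndexFileSize(index.index_file_size());
    if (err != SERVER_SUCCESS) { return err; }
    return ValidationUtil::ValidateTableIndexMetricType(index.metric_type());
}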
ServerError
ValidationUtil::ValidateGpuIndex(uint32_t gpu_index) {
int num_devices = 0;
......
......@@ -17,6 +17,15 @@ public:
static ServerError
ValidateTableIndexType(int32_t index_type);
static ServerError
ValidateTableIndexNlist(int32_t nlist);
static ServerError
ValidateTableIndexFileSize(int32_t index_file_size);
static ServerError
ValidateTableIndexMetricType(int32_t metric_type);
static ServerError
ValidateGpuIndex(uint32_t gpu_index);
......
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#if 0
// TODO: maybe support static search
#ifdef GPU_VERSION
#include "faiss/gpu/GpuAutoTune.h"
#include "faiss/gpu/StandardGpuResources.h"
#include "faiss/gpu/utils/DeviceUtils.h"
#endif
#include "Index.h"
#include "faiss/index_io.h"
#include "faiss/IndexIVF.h"
#include "faiss/IVFlib.h"
#include "server/ServerConfig.h"
namespace zilliz {
namespace milvus {
namespace engine {
using std::string;
using std::unordered_map;
using std::vector;
Index::Index(const std::shared_ptr<faiss::Index> &raw_index) {
index_ = raw_index;
dim = index_->d;
ntotal = index_->ntotal;
store_on_gpu = false;
}
bool Index::reset() {
try {
index_->reset();
ntotal = index_->ntotal;
}
catch (std::exception &e) {
// LOG(ERROR) << e.what();
return false;
}
return true;
}
bool Index::add_with_ids(idx_t n, const float *xdata, const long *xids) {
try {
index_->add_with_ids(n, xdata, xids);
ntotal += n;
}
catch (std::exception &e) {
// LOG(ERROR) << e.what();
return false;
}
return true;
}
bool Index::search(idx_t n, const float *data, idx_t k, float *distances, long *labels) const {
try {
index_->search(n, data, k, distances, labels);
}
catch (std::exception &e) {
// LOG(ERROR) << e.what();
return false;
}
return true;
}
void write_index(const Index_ptr &index, const std::string &file_name) {
write_index(index->index_.get(), file_name.c_str());
}
Index_ptr read_index(const std::string &file_name) {
std::shared_ptr<faiss::Index> raw_index = nullptr;
raw_index.reset(faiss::read_index(file_name.c_str()));
return std::make_shared<Index>(raw_index);
}
}
}
}
#endif
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#if 0
#pragma once
#include <string>
#include <memory>
#include <iostream>
#include <sstream>
namespace zilliz {
namespace milvus {
namespace engine {
struct Operand {
friend std::ostream &operator<<(std::ostream &os, const Operand &obj);
friend std::istream &operator>>(std::istream &is, Operand &obj);
int d;
std::string index_type = "IVF";
std::string metric_type = "L2"; //> L2 / IP(Inner Product)
std::string preproc;
std::string postproc = "Flat";
std::string index_str;
int ncent = 0;
std::string get_index_type(const int &nb);
};
using Operand_ptr = std::shared_ptr<Operand>;
extern std::string operand_to_str(const Operand_ptr &opd);
extern Operand_ptr str_to_operand(const std::string &input);
}
}
}
#endif
......@@ -2,7 +2,7 @@ ARROW_VERSION=zilliz
BOOST_VERSION=1.70.0
BZIP2_VERSION=1.0.6
EASYLOGGINGPP_VERSION=v9.96.7
FAISS_VERSION=v1.5.3
FAISS_VERSION=branch-0.1.0
MKL_VERSION=2019.4.243
GTEST_VERSION=1.8.1
JSONCONS_VERSION=0.126.0
......