提交 0df4068c 编写于 作者: Y yu yunfeng

add metrics


Former-commit-id: 357b4f65b49d91c85a83d00513dfea830372341f
上级 ec579df5
......@@ -4,32 +4,10 @@
# Proprietary and confidential.
#-------------------------------------------------------------------------------
cmake_minimum_required(VERSION 3.14)
message(STATUS "Building using CMake version: ${CMAKE_VERSION}")
cmake_minimum_required(VERSION 3.12)
set(MEGASEARCH_VERSION "0.1.0")
string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" MEGASEARCH_BASE_VERSION "${MEGASEARCH_VERSION}")
project(megasearch VERSION "${MEGASEARCH_BASE_VERSION}")
project(vecwise_engine LANGUAGES CUDA CXX)
set(MEGASEARCH_VERSION_MAJOR "${megasearch_VERSION_MAJOR}")
set(MEGASEARCH_VERSION_MINOR "${megasearch_VERSION_MINOR}")
set(MEGASEARCH_VERSION_PATCH "${megasearch_VERSION_PATCH}")
if(MEGASEARCH_VERSION_MAJOR STREQUAL ""
OR MEGASEARCH_VERSION_MINOR STREQUAL ""
OR MEGASEARCH_VERSION_PATCH STREQUAL "")
message(FATAL_ERROR "Failed to determine MegaSearch version from '${MEGASEARCH_VERSION}'")
endif()
message(STATUS "MegaSearch version: "
"${MEGASEARCH_VERSION_MAJOR}.${MEGASEARCH_VERSION_MINOR}.${MEGASEARCH_VERSION_PATCH} "
"(full: '${MEGASEARCH_VERSION}')")
set(MEGASEARCH_SOURCE_DIR ${PROJECT_SOURCE_DIR})
set(MEGASEARCH_BINARY_DIR ${PROJECT_BINARY_DIR})
find_package(CUDA)
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -Xcompiler -fPIC -std=c++11 -D_FORCE_INLINES -arch sm_60 --expt-extended-lambda")
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O0 -g")
......@@ -54,31 +32,28 @@ else()
set(VECWISE_BUILD_ARCH unknown)
endif()
if(DEFINED UNIX)
message("building vecwise on Unix")
set(VECWISE_BUILD_SYSTEM macos)
elseif(DEFINED APPLE)
message("building vecwise on MacOS")
set(VECWISE_BUILD_SYSTEM unix)
else()
message("unknown OS")
set(VECWISE_BUILD_SYSTEM unknown)
endif ()
if(CMAKE_BUILD_TYPE STREQUAL "Release")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -fPIC -DELPP_THREAD_SAFE")
if (GPU_VERSION STREQUAL "ON")
set(ENABLE_LICENSE "ON")
add_definitions("-DENABLE_LICENSE")
endif ()
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0 -g -fPIC -DELPP_THREAD_SAFE")
endif()
if (GPU_VERSION STREQUAL "ON")
set(ENABLE_LICENSE "ON")
add_definitions("-DENABLE_LICENSE")
endif ()
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
if (BUILD_UNIT_TEST)
option(MEGASEARCH_BUILD_TESTS "Build the megasearch test suite" ON)
endif(BUILD_UNIT_TEST)
include(ExternalProject)
include(ThirdPartyPackages)
include_directories(${MEGASEARCH_SOURCE_DIR})
link_directories(${MEGASEARCH_BINARY_DIR})
## Following should be check
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/" ${CMAKE_MODULE_PATH})
set(VECWISE_ENGINE_INCLUDE ${PROJECT_SOURCE_DIR}/include)
set(VECWISE_ENGINE_SRC ${PROJECT_SOURCE_DIR}/src)
......@@ -97,9 +72,11 @@ link_directories(${VECWISE_THIRD_PARTY_BUILD}/lib64)
#execute_process(COMMAND bash build.sh
# WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party)
add_subdirectory(src)
add_subdirectory(test_client)
#add_subdirectory(unittest)
add_subdirectory(unittest)
if (BUILD_UNIT_TEST)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/unittest)
......
......@@ -4,22 +4,25 @@
# Proprietary and confidential.
#-------------------------------------------------------------------------------
aux_source_directory(cache cache_files)
aux_source_directory(config config_files)
aux_source_directory(server server_files)
aux_source_directory(utils utils_files)
aux_source_directory(db db_files)
aux_source_directory(wrapper wrapper_files)
aux_source_directory(metrics metrics_files)
set(license_check_files
license/LicenseLibrary.cpp
license/LicenseCheck.cpp
)
set(license_generator_src
set(license_generator_files
license/LicenseGenerator.cpp
license/LicenseLibrary.cpp
)
../unittest/metrics/counter_test.cpp ../unittest/metrics/metrics_test.cpp)
set(service_files
thrift/gen-cpp/VecService.cpp
......@@ -27,16 +30,18 @@ set(service_files
thrift/gen-cpp/megasearch_types.cpp
)
set(vecwise_engine_src
set(vecwise_engine_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
${cache_files}
${db_files}
${wrapper_files}
${metrics_files}
)
set(get_sys_info_src
set(get_sys_info_files
license/GetSysInfo.cpp)
include_directories(/usr/include)
include_directories(/usr/local/cuda/include)
......@@ -52,6 +57,9 @@ if (GPU_VERSION STREQUAL "ON")
cudart
cublas
libsqlite3.a
libprometheus-cpp-push.a
libprometheus-cpp-pull.a
libprometheus-cpp-core.a
)
else()
set(engine_libs
......@@ -62,6 +70,9 @@ else()
libgfortran.a
libquadmath.a
libsqlite3.a
libprometheus-cpp-push.a
libprometheus-cpp-pull.a
libprometheus-cpp-core.a
)
endif ()
......@@ -80,7 +91,8 @@ if (ENABLE_LICENSE STREQUAL "ON")
endif ()
cuda_add_library(vecwise_engine STATIC ${vecwise_engine_src})
cuda_add_library(vecwise_engine STATIC ${vecwise_engine_files})
target_link_libraries(vecwise_engine ${engine_libs})
if (ENABLE_LICENSE STREQUAL "ON")
......@@ -88,14 +100,26 @@ if (ENABLE_LICENSE STREQUAL "ON")
target_link_libraries(vecwise_license ${license_libs})
endif ()
#set(metrics_lib
# libprometheus-cpp-push.a
# libprometheus-cpp-pull.a
# libprometheus-cpp-core.a
# )
add_library(metrics STATIC ${metrics_files})
#target_link_libraries(metrics ${metrics_lib})
add_executable(vecwise_server
${config_files}
${server_files}
${utils_files}
${service_files}
# ${metrics_files}
${VECWISE_THIRD_PARTY_BUILD}/include/easylogging++.cc
)
set(server_libs
vecwise_engine
librocksdb.a
......@@ -119,11 +143,17 @@ else ()
endif()
if (ENABLE_LICENSE STREQUAL "ON")
add_executable(license_generator ${license_generator_src})
add_executable(get_sys_info ${get_sys_info_src})
add_executable(license_generator ${license_generator_files})
target_link_libraries(get_sys_info ${license_libs} vecwise_license)
target_link_libraries(license_generator ${license_libs})
install(TARGETS get_sys_info DESTINATION bin)
endif ()
install(TARGETS vecwise_server DESTINATION bin)
\ No newline at end of file
install(TARGETS vecwise_server DESTINATION bin)
#target_link_libraries(
# libprometheus-cpp-push.a
# libprometheus-cpp-pull.a
# libprometheus-cpp-core.a
# pthread
# z
# ${CURL_LIBRARIES})
\ No newline at end of file
......@@ -5,6 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include "CacheMgr.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
......@@ -37,7 +38,7 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) {
if(cache_ == nullptr) {
return nullptr;
}
METRICS_INSTANCE.CacheAccessTotalIncrement();
return cache_->get(key);
}
......@@ -56,6 +57,7 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
}
cache_->insert(key, data);
METRICS_INSTANCE.CacheAccessTotalIncrement();
}
void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index) {
......@@ -65,6 +67,7 @@ void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index
DataObjPtr obj = std::make_shared<DataObj>(index);
cache_->insert(key, obj);
METRICS_INSTANCE.CacheAccessTotalIncrement();
}
void CacheMgr::EraseItem(const std::string& key) {
......@@ -73,6 +76,7 @@ void CacheMgr::EraseItem(const std::string& key) {
}
cache_->erase(key);
METRICS_INSTANCE.CacheAccessTotalIncrement();
}
void CacheMgr::PrintInfo() {
......
......@@ -13,15 +13,18 @@
#include <cstring>
#include <easylogging++.h>
#include <cache/CpuCacheMgr.h>
#include "../utils/Log.h"
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
namespace engine {
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
: _env(options.env),
......@@ -36,41 +39,92 @@ DBImpl<EngineT>::DBImpl(const Options& options)
template<typename EngineT>
Status DBImpl<EngineT>::add_group(meta::GroupSchema& group_info) {
return _pMeta->add_group(group_info);
Status result = _pMeta->add_group(group_info);
if(result.ok()){
// SERVER_LOG_INFO << "add_group request successed";
// server::Metrics::GetInstance().add_group_success_total().Increment();
} else{
// SERVER_LOG_INFO << "add_group request failed";
// server::Metrics::GetInstance().add_group_fail_total().Increment();
}
return result;
}
template<typename EngineT>
Status DBImpl<EngineT>::get_group(meta::GroupSchema& group_info) {
return _pMeta->get_group(group_info);
Status result = _pMeta->get_group(group_info);
if(result.ok()){
// SERVER_LOG_INFO << "get_group request successed";
// server::Metrics::GetInstance().get_group_success_total().Increment();
} else{
// SERVER_LOG_INFO << "get_group request failed";
// server::Metrics::GetInstance().get_group_fail_total().Increment();
}
return result;
}
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not_) {
return _pMeta->has_group(group_id_, has_or_not_);
Status result = _pMeta->has_group(group_id_, has_or_not_);
if(result.ok()){
// SERVER_LOG_INFO << "has_group request successed";
// server::Metrics::GetInstance().has_group_success_total().Increment();
} else{
// SERVER_LOG_INFO << "has_group request failed";
// server::Metrics::GetInstance().has_group_fail_total().Increment();
}
return result;
}
template<typename EngineT>
Status DBImpl<EngineT>::get_group_files(const std::string& group_id,
const int date_delta,
meta::GroupFilesSchema& group_files_info) {
return _pMeta->get_group_files(group_id, date_delta, group_files_info);
Status result = _pMeta->get_group_files(group_id, date_delta, group_files_info);
if(result.ok()){
// SERVER_LOG_INFO << "get_group_files request successed";
// server::Metrics::GetInstance().get_group_files_success_total().Increment();
} else{
// SERVER_LOG_INFO << "get_group_files request failed";
// server::Metrics::GetInstance().get_group_files_fail_total().Increment();
}
return result;
}
template<typename EngineT>
Status DBImpl<EngineT>::add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) {
auto start_time = METRICS_NOW_TIME;
Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
METRICS_INSTANCE.AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (!status.ok()) {
METRICS_INSTANCE.AddVectorsFailTotalIncrement(n);
return status;
}
METRICS_INSTANCE.AddVectorsSuccessTotalIncrement(n);
}
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string &group_id, size_t k, size_t nq,
const float *vectors, QueryResults &results) {
meta::DatesT dates = {meta::Meta::GetDate()};
return search(group_id, k, nq, vectors, dates, results);
}
template<typename EngineT>
......@@ -132,11 +186,33 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
index.Load();
auto file_size = index.PhysicalSize()/(1024*1024);
search_set_size += file_size;
LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
<< file_size << " M";
int inner_k = index.Count() < k ? index.Count() : k;
auto start_time = METRICS_NOW_TIME;
index.Search(nq, vectors, inner_k, output_distence, output_ids);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
if(file.file_type == meta::GroupFileSchema::RAW) {
METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time);
METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024);
METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024);
METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024);
} else if(file.file_type == meta::GroupFileSchema::TO_INDEX) {
METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time);
METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024);
METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024);
METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024);
} else {
METRICS_INSTANCE.SearchIndexDataDurationSecondsHistogramObserve(total_time);
METRICS_INSTANCE.IndexFileSizeHistogramObserve(file_size*1024*1024);
METRICS_INSTANCE.IndexFileSizeTotalIncrement(file_size*1024*1024);
METRICS_INSTANCE.IndexFileSizeGaugeSet(file_size*1024*1024);
}
cluster(output_ids, output_distence, inner_k); // cluster to each query
memset(output_distence, 0, k * nq * sizeof(float));
memset(output_ids, 0, k * nq * sizeof(long));
......@@ -269,8 +345,14 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
long index_size = 0;
for (auto& file : files) {
auto start_time = METRICS_NOW_TIME;
index.Merge(file.location);
auto file_schema = file;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MemTableMergeDurationSecondsHistogramObserve(total_time);
file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
updated.push_back(file_schema);
LOG(DEBUG) << "Merging file " << file_schema.file_id;
......@@ -279,6 +361,7 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
if (index_size >= _options.index_trigger_size) break;
}
index.Serialize();
if (index_size >= _options.index_trigger_size) {
......@@ -340,7 +423,11 @@ Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
EngineT to_index(file.dimension, file.location);
to_index.Load();
auto start_time = METRICS_NOW_TIME;
auto index = to_index.BuildIndex(group_file.location);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
METRICS_INSTANCE.BuildIndexDurationSecondsHistogramObserve(total_time);
group_file.file_type = meta::GroupFileSchema::INDEX;
group_file.rows = index->Size();
......
......@@ -14,6 +14,7 @@
#include <easylogging++.h>
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
......@@ -105,6 +106,7 @@ Status DBMetaImpl::initialize() {
}
Status DBMetaImpl::add_group(GroupSchema& group_info) {
METRICS_INSTANCE.MetaAccessTotalIncrement();
if (group_info.group_id == "") {
std::stringstream ss;
SimpleIDGenerator g;
......@@ -113,7 +115,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
}
group_info.files_cnt = 0;
group_info.id = -1;
auto start_time = METRICS_NOW_TIME;
{
try {
auto id = ConnectorPtr->insert(group_info);
......@@ -123,6 +125,9 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
return Status::DBTransactionError("Add Group Error");
}
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
auto group_path = GetGroupPath(group_info.group_id);
......@@ -143,11 +148,16 @@ Status DBMetaImpl::get_group(GroupSchema& group_info) {
Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto groups = ConnectorPtr->select(columns(&GroupSchema::id,
&GroupSchema::group_id,
&GroupSchema::files_cnt,
&GroupSchema::dimension),
where(c(&GroupSchema::group_id) == group_info.group_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
assert(groups.size() <= 1);
if (groups.size() == 1) {
group_info.id = std::get<0>(groups[0]);
......@@ -166,8 +176,13 @@ Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto groups = ConnectorPtr->select(columns(&GroupSchema::id),
where(c(&GroupSchema::group_id) == group_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
assert(groups.size() <= 1);
if (groups.size() == 1) {
has_or_not = true;
......@@ -204,7 +219,12 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
{
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto id = ConnectorPtr->insert(group_file);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
group_file.id = id;
/* LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id; */
} catch (...) {
......@@ -229,6 +249,8 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
files.clear();
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time =METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
......@@ -236,6 +258,9 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
&GroupFileSchema::rows,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
std::map<std::string, GroupSchema> groups;
......@@ -277,6 +302,8 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
const DatesT& dates = (partition.empty() == true) ? today : partition;
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
......@@ -288,7 +315,9 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
(c(&GroupFileSchema::file_type) == (int) GroupFileSchema::RAW or
c(&GroupFileSchema::file_type) == (int) GroupFileSchema::TO_INDEX or
c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX)));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
......@@ -325,6 +354,8 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
files.clear();
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
......@@ -333,7 +364,9 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and
c(&GroupFileSchema::group_id) == group_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
......@@ -389,7 +422,12 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_,
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
group_file.updated_time = GetMicroSecTimeStamp();
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
ConnectorPtr->update(group_file);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
/* auto commited = ConnectorPtr->transaction([&] () mutable { */
/* ConnectorPtr->update(group_file); */
/* return true; */
......@@ -407,11 +445,16 @@ Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
Status DBMetaImpl::update_files(GroupFilesSchema& files) {
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto commited = ConnectorPtr->transaction([&] () mutable {
for (auto& file : files) {
file.updated_time = GetMicroSecTimeStamp();
ConnectorPtr->update(file);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
return true;
});
if (!commited) {
......@@ -500,13 +543,17 @@ Status DBMetaImpl::cleanup() {
Status DBMetaImpl::count(const std::string& group_id, long& result) {
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::rows,
&GroupFileSchema::date),
where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or
c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or
c(&GroupFileSchema::file_type) == (int)GroupFileSchema::INDEX) and
c(&GroupFileSchema::group_id) == group_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
......
......@@ -16,6 +16,7 @@
#include <cache/CpuCacheMgr.h>
#include "FaissExecutionEngine.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
......@@ -65,6 +66,7 @@ template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::Load() {
auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool to_cache = false;
auto start_time = METRICS_NOW_TIME;
if (!index) {
index = read_index(location_);
to_cache = true;
......@@ -74,6 +76,16 @@ Status FaissExecutionEngine<IndexTrait>::Load() {
pIndex_ = index->data();
if (to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
METRICS_INSTANCE.FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double total_size = (pIndex_->d) * (pIndex_->ntotal) * 4;
METRICS_INSTANCE.FaissDiskLoadSizeBytesHistogramObserve(total_size);
METRICS_INSTANCE.FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time));
}
return Status::OK();
}
......
......@@ -74,7 +74,7 @@ LicenseCheck::AlterFile(const std::string &license_file_path,
{
exit(1);
}
// printf("---runing---\n");
printf("---runing---\n");
pt->expires_at(pt->expires_at() + boost::posix_time::hours(1));
pt->async_wait(boost::bind(AlterFile, license_file_path, boost::asio::placeholders::error, pt));
return SERVER_SUCCESS;
......@@ -83,7 +83,8 @@ LicenseCheck::AlterFile(const std::string &license_file_path,
ServerError
LicenseCheck::StartCountingDown(const std::string &license_file_path) {
if (!LicenseLibrary::IsFileExistent(license_file_path)) exit(1);
if (!LicenseLibrary::IsFileExistent(license_file_path)) return SERVER_LICENSE_FILE_NOT_EXIST;
boost::asio::io_service io;
boost::asio::deadline_timer t(io, boost::posix_time::hours(1));
t.async_wait(boost::bind(AlterFile, license_file_path, boost::asio::placeholders::error, &t));
......
......@@ -36,6 +36,7 @@ class LicenseCheck {
static ServerError
StartCountingDown(const std::string &license_file_path);
private:
};
......
......@@ -324,7 +324,7 @@ LicenseLibrary::GPUinfoFileDeserialization(const std::string &path,
}
ServerError
LicenseLibrary::GetDateTime(const char *cha, time_t &data_time) {
LicenseLibrary::GetDateTime(char *cha, time_t &data_time) {
tm tm_;
int year, month, day;
sscanf(cha, "%d-%d-%d", &year, &month, &day);
......
......@@ -92,7 +92,7 @@ class LicenseLibrary {
std::map<int, std::string> &uuid_encrption_map);
static ServerError
GetDateTime(const char *cha, time_t &data_time);
GetDateTime(char *cha, time_t &data_time);
private:
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Metrics.h"
namespace zilliz {
namespace vecwise {
namespace server {
ServerError
PrometheusMetrics::Init() {
ConfigNode& configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true:false;
// Following should be read from config file.
const std::string bind_address = "8080";
const std::string uri = std::string("/metrics");
const std::size_t num_threads = 2;
// Init Exposer
exposer_ptr_ = std::make_shared<prometheus::Exposer>(bind_address, uri, num_threads);
// Exposer Registry
exposer_ptr_->RegisterCollectable(registry_);
return SERVER_SUCCESS;
}
}
}
}
此差异已折叠。
#include "MegaSearch.h"
namespace megasearch {
std::shared_ptr<Connection>
Create() {
return nullptr;
}
Status
Destroy(std::shared_ptr<Connection> &connection_ptr) {
return Status::OK();
}
/**
Status
Connection::Connect(const ConnectParam &param) {
return Status::NotSupported("Connect interface is not supported.");
}
Status
Connection::Connect(const std::string &uri) {
return Status::NotSupported("Connect interface is not supported.");
}
Status
Connection::Connected() const {
return Status::NotSupported("Connected interface is not supported.");
}
Status
Connection::Disconnect() {
return Status::NotSupported("Disconnect interface is not supported.");
}
std::string
Connection::ClientVersion() const {
return std::string("Current Version");
}
Status
Connection::CreateTable(const TableSchema &param) {
return Status::NotSupported("Create table interface interface is not supported.");
}
Status
Connection::CreateTablePartition(const CreateTablePartitionParam &param) {
return Status::NotSupported("Create table partition interface is not supported.");
}
Status
Connection::DeleteTablePartition(const DeleteTablePartitionParam &param) {
return Status::NotSupported("Delete table partition interface is not supported.");
}
Status
Connection::DeleteTable(const std::string &table_name) {
return Status::NotSupported("Create table interface is not supported.");
}
Status
Connection::AddVector(const std::string &table_name,
const std::vector<RowRecord> &record_array,
std::vector<int64_t> &id_array) {
return Status::NotSupported("Add vector array interface is not supported.");
}
Status
Connection::SearchVector(const std::string &table_name,
const std::vector<QueryRecord> &query_record_array,
std::vector<TopKQueryResult> &topk_query_result_array,
int64_t topk) {
return Status::NotSupported("Query vector array interface is not supported.");
}
Status
Connection::DescribeTable(const std::string &table_name, TableSchema &table_schema) {
return Status::NotSupported("Show table interface is not supported.");
}
Status
Connection::ShowTables(std::vector<std::string> &table_array) {
return Status::NotSupported("List table array interface is not supported.");
}
std::string
Connection::ServerVersion() const {
return std::string("Server version.");
}
std::string
Connection::ServerStatus() const {
return std::string("Server status");
}
**/
}
\ No newline at end of file
#pragma once
#include "Status.h"
#include <string>
#include <vector>
#include <map>
#include <memory>
/** \brief MegaSearch SDK namespace
*/
namespace megasearch {
/**
* @brief Column Type
*/
enum class ColumnType {
invalid,
int8,
int16,
int32,
int64,
float32,
float64,
date,
vector
};
/**
* @brief Index Type
*/
enum class IndexType {
raw,
ivfflat
};
/**
* @brief Connect API parameter
*/
struct ConnectParam {
std::string ip_address; ///< Server IP address
std::string port; ///< Server PORT
};
/**
* @brief Table column description
*/
struct Column {
ColumnType type = ColumnType::invalid; ///< Column Type: enum ColumnType
std::string name; ///< Column name
};
/**
* @brief Table vector column description
*/
struct VectorColumn : public Column {
VectorColumn() { type = ColumnType::vector; }
int64_t dimension = 0; ///< Vector dimension
IndexType index_type = IndexType::raw; ///< Index type
bool store_raw_vector = false; ///< Is vector self stored in the table
};
/**
* @brief Table Schema
*/
struct TableSchema {
std::string table_name; ///< Table name
std::vector<VectorColumn> vector_column_array; ///< Vector column description
std::vector<Column> attribute_column_array; ///< Columns description
std::vector<std::string> partition_column_name_array; ///< Partition column name
};
/**
* @brief Range information
*/
struct Range {
std::string start_value; ///< Range start
std::string end_value; ///< Range stop
};
/**
* @brief Create table partition parameters
*/
struct CreateTablePartitionParam {
std::string table_name; ///< Table name, vector/float32/float64 type column is not allowed for partition
std::string partition_name; ///< Partition name, created partition name
std::map<std::string, Range> range_map; ///< Column name to PartitionRange map
};
/**
* @brief Delete table partition parameters
*/
struct DeleteTablePartitionParam {
std::string table_name; ///< Table name
std::vector<std::string> partition_name_array; ///< Partition name array
};
/**
* @brief Record inserted
*/
struct RowRecord {
std::map<std::string, std::vector<float>> vector_map; ///< Vector columns
std::map<std::string, std::string> attribute_map; ///< Other attribute columns
};
/**
* @brief Query record
*/
struct QueryRecord {
std::map<std::string, std::vector<float>> vector_map; ///< Query vectors
std::vector<std::string> selected_column_array; ///< Output column array
std::map<std::string, std::vector<Range>> partition_filter_column_map; ///< Range used to select partitions
};
/**
* @brief Query result
*/
struct QueryResult {
int64_t id; ///< Output result
double score; ///< Vector similarity score: 0 ~ 100
std::map<std::string, std::string> column_map; ///< Other column
};
/**
* @brief TopK query result
*/
struct TopKQueryResult {
std::vector<QueryResult> query_result_arrays; ///< TopK query result
};
/**
* @brief SDK main class
*/
class Connection {
public:
/**
* @brief CreateConnection
*
* Create a connection instance and return it's shared pointer
*
* @return Connection instance pointer
*/
static std::shared_ptr<Connection>
Create();
/**
* @brief DestroyConnection
*
* Destroy the connection instance
*
* @param connection, the shared pointer to the instance to be destroyed
*
* @return if destroy is successful
*/
static Status
Destroy(std::shared_ptr<Connection> connection_ptr);
/**
* @brief Connect
*
* Connect function should be called before any operations
* Server will be connected after Connect return OK
*
* @param param, use to provide server information
*
* @return Indicate if connect is successful
*/
virtual Status Connect(const ConnectParam &param) = 0;
/**
* @brief Connect
*
* Connect function should be called before any operations
* Server will be connected after Connect return OK
*
* @param uri, use to provide server information, example: megasearch://ipaddress:port
*
* @return Indicate if connect is successful
*/
virtual Status Connect(const std::string &uri) = 0;
/**
* @brief connected
*
* Connection status.
*
* @return Indicate if connection status
*/
virtual Status Connected() const = 0;
/**
* @brief Disconnect
*
* Server will be disconnected after Disconnect return OK
*
* @return Indicate if disconnect is successful
*/
virtual Status Disconnect() = 0;
/**
* @brief Create table method
*
* This method is used to create table
*
* @param param, use to provide table information to be created.
*
* @return Indicate if table is created successfully
*/
virtual Status CreateTable(const TableSchema &param) = 0;
/**
* @brief Delete table method
*
* This method is used to delete table.
*
* @param table_name, table name is going to be deleted.
*
* @return Indicate if table is delete successfully.
*/
virtual Status DeleteTable(const std::string &table_name) = 0;
/**
* @brief Create table partition
*
* This method is used to create table partition.
*
* @param param, use to provide partition information to be created.
*
* @return Indicate if table partition is created successfully.
*/
virtual Status CreateTablePartition(const CreateTablePartitionParam &param) = 0;
/**
* @brief Delete table partition
*
* This method is used to delete table partition.
*
* @param param, use to provide partition information to be deleted.
*
* @return Indicate if table partition is delete successfully.
*/
virtual Status DeleteTablePartition(const DeleteTablePartitionParam &param) = 0;
/**
* @brief Add vector to table
*
* This method is used to add vector array to table.
*
* @param table_name, table_name is inserted.
* @param record_array, vector array is inserted.
* @param id_array, after inserted every vector is given a id.
*
* @return Indicate if vector array are inserted successfully
*/
virtual Status AddVector(const std::string &table_name,
const std::vector<RowRecord> &record_array,
std::vector<int64_t> &id_array) = 0;
/**
* @brief Search vector
*
* This method is used to query vector in table.
*
* @param table_name, table_name is queried.
* @param query_record_array, all vector are going to be queried.
* @param topk_query_result_array, result array.
* @param topk, how many similarity vectors will be searched.
*
* @return Indicate if query is successful.
*/
virtual Status SearchVector(const std::string &table_name,
const std::vector<QueryRecord> &query_record_array,
std::vector<TopKQueryResult> &topk_query_result_array,
int64_t topk) = 0;
/**
* @brief Show table description
*
* This method is used to show table information.
*
* @param table_name, which table is show.
* @param table_schema, table_schema is given when operation is successful.
*
* @return Indicate if this operation is successful.
*/
virtual Status DescribeTable(const std::string &table_name, TableSchema &table_schema) = 0;
/**
* @brief Show all tables in database
*
* This method is used to list all tables.
*
* @param table_array, all tables are push into the array.
*
* @return Indicate if this operation is successful.
*/
virtual Status ShowTables(std::vector<std::string> &table_array) = 0;
/**
* @brief Give the client version
*
* This method is used to give the client version.
*
* @return Client version.
*/
virtual std::string ClientVersion() const = 0;
/**
* @brief Give the server version
*
* This method is used to give the server version.
*
* @return Server version.
*/
virtual std::string ServerVersion() const = 0;
/**
* @brief Give the server status
*
* This method is used to give the server status.
*
* @return Server status.
*/
virtual std::string ServerStatus() const = 0;
};
}
\ No newline at end of file
#include "Status.h"
namespace megasearch {
Status::~Status() noexcept {
if (state_ != nullptr) {
delete state_;
state_ = nullptr;
}
}
static inline std::ostream &operator<<(std::ostream &os, const Status &x) {
os << x.ToString();
return os;
}
void Status::MoveFrom(Status &s) {
delete state_;
state_ = s.state_;
s.state_ = nullptr;
}
Status::Status(const Status &s)
: state_((s.state_ == nullptr) ? nullptr : new State(*s.state_)) {}
Status &Status::operator=(const Status &s) {
if (state_ != s.state_) {
CopyFrom(s);
}
return *this;
}
Status &Status::operator=(Status &&s) noexcept {
MoveFrom(s);
return *this;
}
Status Status::operator&(const Status &status) const noexcept {
if (ok()) {
return status;
} else {
return *this;
}
}
Status Status::operator&(Status &&s) const noexcept {
if (ok()) {
return std::move(s);
} else {
return *this;
}
}
Status &Status::operator&=(const Status &s) noexcept {
if (ok() && !s.ok()) {
CopyFrom(s);
}
return *this;
}
Status &Status::operator&=(Status &&s) noexcept {
if (ok() && !s.ok()) {
MoveFrom(s);
}
return *this;
}
Status::Status(StatusCode code, const std::string &message) {
state_ = new State;
state_->code = code;
state_->message = message;
}
void Status::CopyFrom(const Status &status) {
delete state_;
if (status.state_ == nullptr) {
state_ = nullptr;
} else {
state_ = new State(*status.state_);
}
}
std::string Status::CodeAsString() const {
if (state_ == nullptr) {
return "OK";
}
const char *type = nullptr;
switch (code()) {
case StatusCode::OK: type = "OK";
break;
case StatusCode::Invalid: type = "Invalid";
break;
case StatusCode::UnknownError: type = "Unknown error";
break;
case StatusCode::NotSupported: type = "Not Supported";
break;
default: type = "Unknown";
break;
}
return std::string(type);
}
std::string Status::ToString() const {
std::string result(CodeAsString());
if (state_ == nullptr) {
return result;
}
result += ": ";
result += state_->message;
return result;
}
}
\ No newline at end of file
#pragma once
#include <string>
#include <sstream>
/** \brief MegaSearch SDK namespace
*/
namespace megasearch {
/**
* @brief Status Code for SDK interface return
*/
enum class StatusCode {
OK = 0,
Invalid = 1,
UnknownError = 2,
NotSupported = 3
};
/**
* @brief Status for SDK interface return
*/
class Status {
public:
/**
* @brief Status
*
* Default constructor.
*
*/
Status() = default;
/**
* @brief Status
*
* Destructor.
*
*/
~Status() noexcept;
/**
* @brief Status
*
* Constructor
*
* @param code, status code.
* @param message, status message.
*
*/
Status(StatusCode code, const std::string &message);
/**
* @brief Status
*
* Copy constructor
*
* @param status, status to be copied.
*
*/
inline Status(const Status &status);
/**
* @brief Status
*
* Assignment operator
*
* @param status, status to be copied.
* @return, the status is assigned.
*
*/
inline Status &operator=(const Status &s);
/**
* @brief Status
*
* Move constructor
*
* @param status, status to be moved.
*
*/
inline Status(Status &&s) noexcept : state_(s.state_) {};
/**
* @brief Status
*
* Move assignment operator
*
* @param status, status to be moved.
* @return, the status is moved.
*
*/
inline Status &operator=(Status &&s) noexcept;
/**
* @brief Status
*
* AND operator
*
* @param status, status to be AND.
* @return, the status after AND operation.
*
*/
inline Status operator&(const Status &s) const noexcept;
/**
* @brief Status
*
* AND operator
*
* @param status, status to be AND.
* @return, the status after AND operation.
*
*/
inline Status operator&(Status &&s) const noexcept;
/**
* @brief Status
*
* AND operator
*
* @param status, status to be AND.
* @return, the status after AND operation.
*
*/
inline Status &operator&=(const Status &s) noexcept;
/**
* @brief Status
*
* AND operator
*
* @param status, status to be AND.
* @return, the status after AND operation.
*
*/
inline Status &operator&=(Status &&s) noexcept;
/**
* @brief OK
*
* static OK status constructor
*
* @return, the status with OK.
*
*/
static Status OK() { return Status(); }
/**
* @brief OK
*
* static OK status constructor with a specific message
*
* @param, serveral specific messages
* @return, the status with OK.
*
*/
template<typename... Args>
static Status OK(Args &&... args) {
return Status(StatusCode::OK, MessageBuilder(std::forward<Args>(args)...));
}
/**
* @brief Invalid
*
* static Invalid status constructor with a specific message
*
* @param, serveral specific messages
* @return, the status with Invalid.
*
*/
template<typename... Args>
static Status Invalid(Args &&... args) {
return Status(StatusCode::Invalid,
MessageBuilder(std::forward<Args>(args)...));
}
/**
* @brief Unknown Error
*
* static unknown error status constructor with a specific message
*
* @param, serveral specific messages
* @return, the status with unknown error.
*
*/
template<typename... Args>
static Status UnknownError(Args &&... args) {
return Status(StatusCode::UnknownError, MessageBuilder(std::forward<Args>(args)...));
}
/**
* @brief not supported Error
*
* static not supported status constructor with a specific message
*
* @param, serveral specific messages
* @return, the status with not supported error.
*
*/
template<typename... Args>
static Status NotSupported(Args &&... args) {
return Status(StatusCode::NotSupported, MessageBuilder(std::forward<Args>(args)...));
}
/**
* @brief ok
*
* Return true iff the status indicates success.
*
* @return, if the status indicates success.
*
*/
bool ok() const { return (state_ == nullptr); }
/**
* @brief IsInvalid
*
* Return true iff the status indicates invalid.
*
* @return, if the status indicates invalid.
*
*/
bool IsInvalid() const { return code() == StatusCode::Invalid; }
/**
* @brief IsUnknownError
*
* Return true iff the status indicates unknown error.
*
* @return, if the status indicates unknown error.
*
*/
bool IsUnknownError() const { return code() == StatusCode::UnknownError; }
/**
* @brief IsNotSupported
*
* Return true iff the status indicates not supported.
*
* @return, if the status indicates not supported.
*
*/
bool IsNotSupported() const { return code() == StatusCode::NotSupported; }
/**
* @brief ToString
*
* Return error message string.
*
* @return, error message string.
*
*/
std::string ToString() const;
/**
* @brief CodeAsString
*
* Return a string representation of the status code.
*
* @return, a string representation of the status code.
*
*/
std::string CodeAsString() const;
/**
* @brief code
*
* Return the StatusCode value attached to this status.
*
* @return, the status code value attached to this status.
*
*/
StatusCode code() const { return ok() ? StatusCode::OK : state_->code; }
/**
* @brief message
*
* Return the specific error message attached to this status.
*
* @return, the specific error message attached to this status.
*
*/
std::string message() const { return ok() ? "" : state_->message; }
private:
struct State {
StatusCode code;
std::string message;
};
// OK status has a `nullptr` state_. Otherwise, `state_` points to
// a `State` structure containing the error code and message.
State *state_ = nullptr;
void DeleteState() {
delete state_;
state_ = nullptr;
}
void CopyFrom(const Status &s);
inline void MoveFrom(Status &s);
template<typename Head>
static void MessageBuilderRecursive(std::stringstream &stream, Head &&head) {
stream << head;
}
template<typename Head, typename... Tail>
static void MessageBuilderRecursive(std::stringstream &stream, Head &&head, Tail &&... tail) {
MessageBuilderRecursive(stream, std::forward<Head>(head));
MessageBuilderRecursive(stream, std::forward<Tail>(tail)...);
}
template<typename... Args>
static std::string MessageBuilder(Args &&... args) {
std::stringstream stream;
MessageBuilderRecursive(stream, std::forward<Args>(args)...);
return stream.str();
}
};
}
\ No newline at end of file
......@@ -18,8 +18,6 @@ namespace zilliz {
namespace vecwise {
namespace server {
static const std::string ROCKSDB_DEFAULT_GROUP = "default";
RocksIdMapper::RocksIdMapper()
: db_(nullptr) {
OpenDb();
......@@ -30,8 +28,6 @@ RocksIdMapper::~RocksIdMapper() {
}
void RocksIdMapper::OpenDb() {
std::lock_guard<std::mutex> lck(db_mutex_);
if(db_) {
return;
}
......@@ -83,8 +79,6 @@ void RocksIdMapper::OpenDb() {
}
void RocksIdMapper::CloseDb() {
std::lock_guard<std::mutex> lck(db_mutex_);
for(auto& iter : column_handles_) {
delete iter.second;
}
......@@ -96,117 +90,7 @@ void RocksIdMapper::CloseDb() {
}
}
ServerError RocksIdMapper::AddGroup(const std::string& group) {
std::lock_guard<std::mutex> lck(db_mutex_);
return AddGroupInternal(group);
}
bool RocksIdMapper::IsGroupExist(const std::string& group) const {
std::lock_guard<std::mutex> lck(db_mutex_);
return IsGroupExistInternal(group);
}
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
std::lock_guard<std::mutex> lck(db_mutex_);
return PutInternal(nid, sid, group);
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
std::lock_guard<std::mutex> lck(db_mutex_);
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = PutInternal(nid[i], sid[i], group);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
std::lock_guard<std::mutex> lck(db_mutex_);
return GetInternal(nid, sid, group);
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
std::lock_guard<std::mutex> lck(db_mutex_);
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = GetInternal(nid[i], str_id, group);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
std::lock_guard<std::mutex> lck(db_mutex_);
return DeleteInternal(nid, group);
}
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
std::lock_guard<std::mutex> lck(db_mutex_);
return DeleteGroupInternal(group);
}
//internal methods(whitout lock)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ServerError RocksIdMapper::AddGroupInternal(const std::string& group) {
if(!IsGroupExistInternal(group)) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
try {//add group
rocksdb::ColumnFamilyHandle *cfh = nullptr;
rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
} else {
column_handles_.insert(std::make_pair(group, cfh));
}
} catch(std::exception& ex) {
SERVER_LOG_ERROR << "ID mapper failed to create group: " << ex.what();
return SERVER_UNEXPECTED_ERROR;
}
}
return SERVER_SUCCESS;
}
bool RocksIdMapper::IsGroupExistInternal(const std::string& group) const {
std::string group_name = group;
if(group_name.empty()){
group_name = ROCKSDB_DEFAULT_GROUP;
}
return (column_handles_.count(group_name) > 0 && column_handles_[group_name] != nullptr);
}
ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string& sid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
......@@ -220,12 +104,22 @@ ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string
return SERVER_UNEXPECTED_ERROR;
}
} else {
//try create group
if(AddGroupInternal(group) != SERVER_SUCCESS){
return SERVER_UNEXPECTED_ERROR;
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) == 0) {
try {//add group
rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString();
} else {
column_handles_.insert(std::make_pair(group, cfh));
}
} catch(std::exception& ex) {
std::cout << ex.what() << std::endl;
}
} else {
cfh = column_handles_[group];
}
rocksdb::ColumnFamilyHandle *cfh = column_handles_[group];
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), cfh, key, value);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
......@@ -236,7 +130,23 @@ ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid, const std::string& group) const {
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = Put(nid[i], sid[i], group);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
sid = "";
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
......@@ -263,8 +173,28 @@ ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid,
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::string& group) {
if(db_ == nullptr) {
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = Get(nid[i], str_id, group);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
......@@ -288,7 +218,7 @@ ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::str
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::DeleteGroupInternal(const std::string& group) {
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
......
......@@ -13,7 +13,6 @@
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
namespace zilliz {
namespace vecwise {
......@@ -24,9 +23,6 @@ public:
RocksIdMapper();
~RocksIdMapper();
ServerError AddGroup(const std::string& group) override;
bool IsGroupExist(const std::string& group) const override;
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
......@@ -40,22 +36,9 @@ private:
void OpenDb();
void CloseDb();
ServerError AddGroupInternal(const std::string& group);
bool IsGroupExistInternal(const std::string& group) const;
ServerError PutInternal(const std::string& nid, const std::string& sid, const std::string& group);
ServerError GetInternal(const std::string& nid, std::string& sid, const std::string& group) const;
ServerError DeleteInternal(const std::string& nid, const std::string& group);
ServerError DeleteGroupInternal(const std::string& group);
private:
rocksdb::DB* db_;
mutable std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
mutable std::mutex db_mutex_;
std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
};
}
......
......@@ -19,6 +19,8 @@
#include <unistd.h>
#include <string.h>
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
namespace server {
......@@ -133,6 +135,10 @@ Server::Daemonize() {
int
Server::Start() {
// server::Metrics::GetInstance().Init();
// server::Metrics::GetInstance().exposer_ptr()->RegisterCollectable(server::Metrics::GetInstance().registry_ptr());
METRICS_INSTANCE.Init();
if (daemonized_) {
Daemonize();
}
......@@ -160,8 +166,10 @@ Server::Start() {
exit(1);
}
std::thread counting_down(&server::LicenseCheck::StartCountingDown, license_file_path);
counting_down.detach();
if(server::LicenseCheck::StartCountingDown(license_file_path) != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "License counter start error";
exit(1);
}
#endif
// Handle Signal
......
......@@ -35,6 +35,10 @@ static const std::string CONFIG_GPU_CACHE_CAPACITY = "gpu_cache_capacity";
static const std::string CONFIG_LICENSE = "license_config";
static const std::string CONFIG_LICENSE_PATH = "license_path";
static const std::string CONFIG_METRIC = "metric_config";
static const std::string CONFIG_METRIC_IS_STARTUP = "is_startup";
static const std::string CONFIG_METRIC_COLLECTOR = "collector";
class ServerConfig {
public:
static ServerConfig &GetInstance();
......
......@@ -39,17 +39,6 @@ SimpleIdMapper::~SimpleIdMapper() {
}
ServerError SimpleIdMapper::AddGroup(const std::string& group) {
if(id_groups_.count(group) == 0) {
id_groups_.insert(std::make_pair(group, ID_MAPPING()));
}
}
//not thread-safe
bool SimpleIdMapper::IsGroupExist(const std::string& group) const {
return id_groups_.count(group) > 0;
}
//not thread-safe
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
......
......@@ -25,9 +25,6 @@ public:
virtual ~IVecIdMapper(){}
virtual ServerError AddGroup(const std::string& group) = 0;
virtual bool IsGroupExist(const std::string& group) const = 0;
virtual ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") = 0;
......@@ -44,9 +41,6 @@ public:
SimpleIdMapper();
~SimpleIdMapper();
ServerError AddGroup(const std::string& group) override;
bool IsGroupExist(const std::string& group) const override;
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
......
......@@ -87,7 +87,6 @@ BaseTaskPtr AddGroupTask::Create(int32_t dimension,
ServerError AddGroupTask::OnExecute() {
try {
IVecIdMapper::GetInstance()->AddGroup(group_id_);
engine::meta::GroupSchema group_info;
group_info.dimension = (size_t)dimension_;
group_info.group_id = group_id_;
......@@ -244,13 +243,6 @@ const AttribMap& AddVectorTask::GetVecAttrib() const {
ServerError AddVectorTask::OnExecute() {
try {
if(!IVecIdMapper::GetInstance()->IsGroupExist(group_id_)) {
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "group not exist";
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
uint64_t vec_dim = GetVecDimension();
std::vector<float> vec_f;
vec_f.resize(vec_dim);
......@@ -488,14 +480,10 @@ ServerError AddBatchVectorTask::OnExecute() {
std::list<std::future<void>> threads_list;
uint64_t begin_index = 0, end_index = USE_MT;
while(true) {
while(end_index < vec_count) {
threads_list.push_back(
GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping,
this, vector_ids, begin_index, end_index, tensor_ids_));
if(end_index >= vec_count) {
break;
}
begin_index = end_index;
end_index += USE_MT;
if(end_index > vec_count) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册