Commit 863cc5db, author: xiaojun.lin

Merge branch '0.6.0' into 0.6.0-#227

......@@ -10,21 +10,35 @@ Please mark all change in change log and use the ticket from JIRA.
- \#316 - Some files not merged after vectors added
- \#327 - Search does not use GPU when index type is FLAT
- \#340 - Test cases run failed on 0.6.0
- \#353 - Rename config.h.in to version.h.in
- \#374 - sdk_simple return empty result
- \#377 - Create partition succeeds if tag name contains only spaces
- \#397 - sdk_simple return incorrect result
- \#399 - Create partition should fail if partition tag already exists
- \#412 - Message returned is confusing when partition is created with null partition name
- \#416 - Dropping the same partition repeatedly succeeds
## Feature
- \#12 - Pure CPU version for Milvus
- \#77 - Support table partition
- \#226 - Experimental shards middleware for Milvus
- \#127 - Support new Index type IVFPQ
- \#226 - Experimental shards middleware for Milvus
- \#227 - Support new index types SPTAG-KDT and SPTAG-BKT
- \#346 - Support build index with multiple gpu
## Improvement
- \#255 - Add ivfsq8 test report detailed version
- \#260 - C++ SDK README
- \#266 - Rpc request source code refactor
- \#275 - Rename C++ SDK IndexType
- \#284 - Change C++ SDK to shared library
- \#260 - C++ SDK README
- \#314 - add Find FAISS in CMake
- \#306 - Use int64 for all config integer
- \#310 - Add Q&A for 'protocol https not supported or disable in libcurl' issue
- \#314 - add Find FAISS in CMake
- \#322 - Add option to enable / disable prometheus
- \#358 - Add more information in build.sh and install.md
- \#404 - Add virtual method Init() in Pass abstract class
- \#409 - Add a Fallback pass in optimizer
## Task
......
......@@ -16,7 +16,7 @@ Milvus is the world's fastest similarity search engine for massive-scale feature
For more detailed introduction of Milvus and its architecture, see [Milvus overview](https://www.milvus.io/docs/en/aboutmilvus/overview/).
Milvus provides stable [Python](https://github.com/milvus-io/pymilvus), [Java](https://github.com/milvus-io/milvus-sdk-java) and C++ APIs.
Milvus provides stable [Python](https://github.com/milvus-io/pymilvus), [Java](https://github.com/milvus-io/milvus-sdk-java) and [C++](https://github.com/milvus-io/milvus/tree/master/core/src/sdk) APIs.
Keep up-to-date with newest releases and latest updates by reading Milvus [release notes](https://www.milvus.io/docs/en/release/v0.5.0/).
......
......@@ -20,7 +20,7 @@
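Milvus is an open-source similarity search engine for massive-scale feature vectors. Built on a heterogeneous many-core computing framework, it offers lower cost and better performance. With limited computing resources, a search over a billion vectors returns in milliseconds.
Milvus provides stable Python, Java and C++ APIs.
Milvus provides stable [Python](https://github.com/milvus-io/pymilvus), [Java](https://github.com/milvus-io/milvus-sdk-java) and [C++](https://github.com/milvus-io/milvus/tree/master/core/src/sdk) APIs.
Get the latest Milvus release from the [release notes](https://milvus.io/docs/zh-CN/release/v0.5.0/).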
......
......@@ -2,7 +2,7 @@
String cron_timezone = "TZ=Asia/Shanghai"
String cron_string = BRANCH_NAME == "master" ? "H 0 * * * " : ""
cron_string = BRANCH_NAME == "0.5.1" ? "H 1 * * * " : cron_string
cron_string = BRANCH_NAME == "0.6.0" ? "H 1 * * * " : cron_string
pipeline {
agent none
......@@ -50,7 +50,7 @@ pipeline {
}
stages {
stage("Run Build") {
stage("Run GPU Version Build") {
agent {
kubernetes {
label "${BINRARY_VERSION}-build"
......@@ -60,7 +60,7 @@ pipeline {
}
stages {
stage('Build') {
stage('GPU Version Build') {
steps {
container('milvus-build-env') {
script {
......@@ -69,7 +69,7 @@ pipeline {
}
}
}
stage('Code Coverage') {
stage('GPU Version Code Coverage') {
steps {
container('milvus-build-env') {
script {
......@@ -78,7 +78,7 @@ pipeline {
}
}
}
stage('Upload Package') {
stage('Upload GPU Version Package') {
steps {
container('milvus-build-env') {
script {
......@@ -90,7 +90,7 @@ pipeline {
}
}
stage("Publish docker images") {
stage("Publish GPU Version docker images") {
agent {
kubernetes {
label "${BINRARY_VERSION}-publish"
......@@ -100,7 +100,7 @@ pipeline {
}
stages {
stage('Publish') {
stage('Publish GPU Version') {
steps {
container('publish-images'){
script {
......@@ -112,7 +112,7 @@ pipeline {
}
}
stage("Deploy to Development") {
stage("Deploy GPU Version to Development") {
agent {
kubernetes {
label "${BINRARY_VERSION}-dev-test"
......@@ -122,7 +122,7 @@ pipeline {
}
stages {
stage("Deploy to Dev") {
stage("Deploy GPU Version to Dev") {
steps {
container('milvus-test-env') {
script {
......@@ -132,7 +132,7 @@ pipeline {
}
}
stage("Dev Test") {
stage("GPU Version Dev Test") {
steps {
container('milvus-test-env') {
script {
......@@ -147,7 +147,7 @@ pipeline {
}
}
stage ("Cleanup Dev") {
stage ("Cleanup GPU Version Dev") {
steps {
container('milvus-test-env') {
script {
......@@ -180,7 +180,7 @@ pipeline {
}
stages {
stage("Run Build") {
stage("Run CPU Version Build") {
agent {
kubernetes {
label "${BINRARY_VERSION}-build"
......@@ -190,7 +190,7 @@ pipeline {
}
stages {
stage('Build') {
stage('Build CPU Version') {
steps {
container('milvus-build-env') {
script {
......@@ -199,7 +199,7 @@ pipeline {
}
}
}
stage('Code Coverage') {
stage('CPU Version Code Coverage') {
steps {
container('milvus-build-env') {
script {
......@@ -208,7 +208,7 @@ pipeline {
}
}
}
stage('Upload Package') {
stage('Upload CPU Version Package') {
steps {
container('milvus-build-env') {
script {
......@@ -220,7 +220,7 @@ pipeline {
}
}
stage("Publish docker images") {
stage("Publish CPU Version docker images") {
agent {
kubernetes {
label "${BINRARY_VERSION}-publish"
......@@ -230,7 +230,7 @@ pipeline {
}
stages {
stage('Publish') {
stage('Publish CPU Version') {
steps {
container('publish-images'){
script {
......@@ -242,7 +242,7 @@ pipeline {
}
}
stage("Deploy to Development") {
stage("Deploy CPU Version to Development") {
agent {
kubernetes {
label "${BINRARY_VERSION}-dev-test"
......@@ -252,7 +252,7 @@ pipeline {
}
stages {
stage("Deploy to Dev") {
stage("Deploy CPU Version to Dev") {
steps {
container('milvus-test-env') {
script {
......@@ -262,7 +262,7 @@ pipeline {
}
}
stage("Dev Test") {
stage("CPU Version Dev Test") {
steps {
container('milvus-test-env') {
script {
......@@ -277,7 +277,7 @@ pipeline {
}
}
stage ("Cleanup Dev") {
stage ("Cleanup CPU Version Dev") {
steps {
container('milvus-test-env') {
script {
......
......@@ -3,11 +3,7 @@ sh 'helm repo update'
dir ('milvus-helm') {
checkout([$class: 'GitSCM', branches: [[name: "0.6.0"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/0.6.0:refs/remotes/origin/0.6.0"]]])
dir ("milvus") {
if ("${env.BINRARY_VERSION}" == "gpu") {
sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f gpu_values.yaml -f ci/filebeat/values.yaml --namespace milvus ."
} else {
sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f ci/filebeat/values.yaml --namespace milvus ."
}
sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f ci/db_backend/sqlite_${env.BINRARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ."
}
}
......@@ -13,11 +13,7 @@ timeout(time: 90, unit: 'MINUTES') {
}
dir ("milvus-helm") {
dir ("milvus") {
if ("${env.BINRARY_VERSION}" == "gpu") {
sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f gpu_values.yaml -f ci/db_backend/mysql_values.yaml -f ci/filebeat/values.yaml --namespace milvus ."
} else {
sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f ci/db_backend/mysql_values.yaml -f ci/filebeat/values.yaml --namespace milvus ."
}
sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f ci/db_backend/mysql_${env.BINRARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ."
}
}
dir ("tests/milvus_python_test") {
......
......@@ -14,11 +14,7 @@ timeout(time: 60, unit: 'MINUTES') {
// }
// dir ("milvus-helm") {
// dir ("milvus") {
// if ("${env.BINRARY_VERSION}" == "gpu") {
// sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f gpu_values.yaml -f ci/db_backend/mysql_values.yaml -f ci/filebeat/values.yaml --namespace milvus ."
// } else {
// sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f ci/db_backend/mysql_values.yaml -f ci/filebeat/values.yaml --namespace milvus ."
// }
// sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.PIPELINE_NAME}-${env.BUILD_NUMBER}-single-${env.BINRARY_VERSION} -f ci/db_backend/mysql_${env.BINRARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ."
// }
// }
// dir ("tests/milvus_python_test") {
......
......@@ -16,6 +16,7 @@ BUILD_TYPE="Debug"
BUILD_UNITTEST="OFF"
INSTALL_PREFIX="/opt/milvus"
FAISS_ROOT=""
CUSTOMIZATION="OFF" # use the original faiss by default
BUILD_COVERAGE="OFF"
USE_JFROG_CACHE="OFF"
RUN_CPPLINT="OFF"
......@@ -23,7 +24,7 @@ GPU_VERSION="OFF"
WITH_MKL="OFF"
CUDA_COMPILER=/usr/local/cuda/bin/nvcc
while getopts "o:t:b:f:gulcjmh" arg
while getopts "o:t:b:f:gxulcjmh" arg
do
case $arg in
o)
......@@ -41,6 +42,9 @@ do
g)
GPU_VERSION="ON";
;;
x)
CUSTOMIZATION="ON";
;;
u)
echo "Build and run unittest cases" ;
BUILD_UNITTEST="ON";
......@@ -66,6 +70,7 @@ parameter:
-b: core code build directory
-f: faiss root path
-g: gpu version
-x: milvus customization (default: OFF)
-u: building unit test options(default: OFF)
-l: run cpplint, clang-format and clang-tidy(default: OFF)
-c: code coverage(default: OFF)
......@@ -74,7 +79,7 @@ parameter:
-h: help
usage:
./build.sh -o \${INSTALL_PREFIX} -t \${BUILD_TYPE} -b \${CORE_BUILD_DIR} -f \${FAISS_ROOT} [-u] [-l] [-c] [-j] [-m] [-h]
./build.sh -o \${INSTALL_PREFIX} -t \${BUILD_TYPE} -b \${CORE_BUILD_DIR} -f \${FAISS_ROOT} [-g] [-x] [-u] [-l] [-c] [-j] [-m] [-h]
"
exit 0
;;
......@@ -96,6 +101,7 @@ CMAKE_CMD="cmake \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
-DCMAKE_CUDA_COMPILER=${CUDA_COMPILER} \
-DMILVUS_GPU_VERSION=${GPU_VERSION} \
-DCUSTOMIZATION=${CUSTOMIZATION} \
-DBUILD_UNIT_TEST=${BUILD_UNITTEST} \
-DBUILD_COVERAGE=${BUILD_COVERAGE} \
-DUSE_JFROG_CACHE=${USE_JFROG_CACHE} \
......
......@@ -9,3 +9,5 @@ output.info
output_new.info
server.info
*.pyc
src/grpc/python_gen.h
src/grpc/python/
......@@ -70,7 +70,7 @@ if (MILVUS_VERSION_MAJOR STREQUAL ""
endif()
message(STATUS "Build version = ${MILVUS_VERSION}")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/config.h.in ${CMAKE_CURRENT_SOURCE_DIR}/src/config.h @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/version.h.in ${CMAKE_CURRENT_SOURCE_DIR}/src/version.h @ONLY)
message(STATUS "Milvus version: "
"${MILVUS_VERSION_MAJOR}.${MILVUS_VERSION_MINOR}.${MILVUS_VERSION_PATCH} "
......
......@@ -27,9 +27,7 @@ metric_config:
port: 8080 # port prometheus uses to fetch metrics, must be in range [1025, 65534]
cache_config:
cpu_cache_capacity: 16 # GB, CPU memory used for cache, must be a positive integer
cpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered, must be in range (0.0, 1.0]
cpu_cache_capacity: 16 # GB, size of CPU memory used for cache, must be a positive integer
cache_insert_data: false # whether to load inserted data into cache, must be a boolean
engine_config:
......@@ -37,7 +35,10 @@ engine_config:
# if nq >= use_blas_threshold, use OpenBlas, slower with stable response times
gpu_search_threshold: 1000 # threshold beyond which the search computation is executed on GPUs only
resource_config:
search_resources: # define the device used for search computation
- cpu
index_build_device: cpu # CPU used for building index
gpu_resource_config:
enable: false # whether to enable GPU resources
cache_capacity: 4 # GB, size of GPU memory per card used for cache, must be a positive integer
search_resources: # define the GPU devices used for search computation, must be in format gpux
- gpu0
build_index_resources: # define the GPU devices used for index building, must be in format gpux
- gpu0
......@@ -27,10 +27,7 @@ metric_config:
port: 8080 # port prometheus uses to fetch metrics, must be in range [1025, 65534]
cache_config:
cpu_cache_capacity: 16 # GB, CPU memory used for cache, must be a positive integer
cpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered, must be in range (0.0, 1.0]
gpu_cache_capacity: 4 # GB, GPU memory used for cache, must be a positive integer
gpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered, must be in range (0.0, 1.0]
cpu_cache_capacity: 16 # GB, size of CPU memory used for cache, must be a positive integer
cache_insert_data: false # whether to load inserted data into cache, must be a boolean
engine_config:
......@@ -38,8 +35,10 @@ engine_config:
# if nq >= use_blas_threshold, use OpenBlas, slower with stable response times
gpu_search_threshold: 1000 # threshold beyond which the search computation is executed on GPUs only
resource_config:
search_resources: # define the devices used for search computation, must be in format: cpu or gpux
- cpu
gpu_resource_config:
enable: true # whether to enable GPU resources
cache_capacity: 4 # GB, size of GPU memory per card used for cache, must be a positive integer
search_resources: # define the GPU devices used for search computation, must be in format gpux
- gpu0
build_index_resources: # define the GPU devices used for index building, must be in format gpux
- gpu0
index_build_device: gpu0 # CPU / GPU used for building index, must be in format: cpu or gpux
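The new `gpu_resource_config` entries must be "in format gpux". A minimal sketch of how such a device name could be turned into a numeric device id is shown below. `ParseGpuDevice` is a hypothetical helper written for illustration and is not taken from the Milvus source; the nine-digit cap is an assumption added only to avoid integer overflow.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: parses "gpu<N>" (for example "gpu0") into N.
// Returns false, leaving device_id untouched, for any other input.
bool ParseGpuDevice(const std::string& name, int64_t& device_id) {
    const std::string prefix = "gpu";
    // Require the prefix plus at least one character after it.
    if (name.size() <= prefix.size() || name.compare(0, prefix.size(), prefix) != 0) {
        return false;
    }
    // Assumed cap: reject absurdly long suffixes so the sum below cannot overflow.
    if (name.size() - prefix.size() > 9) {
        return false;
    }
    int64_t id = 0;
    for (std::size_t i = prefix.size(); i < name.size(); ++i) {
        const unsigned char c = static_cast<unsigned char>(name[i]);
        if (!std::isdigit(c)) {
            return false;
        }
        id = id * 10 + (c - '0');
    }
    device_id = id;
    return true;
}
```

With this sketch, `gpu0` and `gpu12` are accepted, while `cpu`, `gpu` and `gpux` are rejected.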
......@@ -75,7 +75,13 @@ set(thirdparty_files
)
aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_server_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl/request grpc_request_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_impl_files)
set(grpc_server_files
${grpc_request_files}
${grpc_impl_files}
)
aux_source_directory(${MILVUS_ENGINE_SRC}/utils utils_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_files)
......
......@@ -37,7 +37,7 @@ GpuCacheMgr::GpuCacheMgr() {
Status s;
int64_t gpu_cache_cap;
s = config.GetCacheConfigGpuCacheCapacity(gpu_cache_cap);
s = config.GetGpuResourceConfigCacheCapacity(gpu_cache_cap);
if (!s.ok()) {
SERVER_LOG_ERROR << s.message();
}
......@@ -45,7 +45,7 @@ GpuCacheMgr::GpuCacheMgr() {
cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL << 32);
float gpu_mem_threshold;
s = config.GetCacheConfigGpuCacheThreshold(gpu_mem_threshold);
s = config.GetGpuResourceConfigCacheThreshold(gpu_mem_threshold);
if (!s.ok()) {
SERVER_LOG_ERROR << s.message();
}
......
......@@ -84,12 +84,12 @@ DBImpl::Start() {
return Status::OK();
}
ENGINE_LOG_TRACE << "DB service start";
// ENGINE_LOG_TRACE << "DB service start";
shutting_down_.store(false, std::memory_order_release);
// for distribute version, some nodes are read only
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
ENGINE_LOG_TRACE << "StartTimerTasks";
// ENGINE_LOG_TRACE << "StartTimerTasks";
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
......@@ -114,7 +114,7 @@ DBImpl::Stop() {
meta_ptr_->CleanUp();
}
ENGINE_LOG_TRACE << "DB service stop";
// ENGINE_LOG_TRACE << "DB service stop";
return Status::OK();
}
......@@ -279,6 +279,11 @@ DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& parti
std::string partition_name;
auto status = meta_ptr_->GetPartitionName(table_id, partition_tag, partition_name);
if (!status.ok()) {
ENGINE_LOG_ERROR << status.message();
return status;
}
return DropPartition(partition_name);
}
......@@ -553,7 +558,7 @@ DBImpl::StartMetricTask() {
return;
}
ENGINE_LOG_TRACE << "Start metric task";
// ENGINE_LOG_TRACE << "Start metric task";
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
......@@ -579,7 +584,7 @@ DBImpl::StartMetricTask() {
server::Metrics::GetInstance().GPUTemperature();
server::Metrics::GetInstance().CPUTemperature();
ENGINE_LOG_TRACE << "Metric task finished";
// ENGINE_LOG_TRACE << "Metric task finished";
}
Status
......@@ -751,7 +756,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_TRACE << "Background compaction thread start";
// ENGINE_LOG_TRACE << " Background compaction thread start";
Status status;
for (auto& table_id : table_ids) {
......@@ -774,7 +779,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
ENGINE_LOG_TRACE << "Background compaction thread exit";
// ENGINE_LOG_TRACE << " Background compaction thread exit";
}
void
......@@ -807,7 +812,7 @@ DBImpl::StartBuildIndexTask(bool force) {
void
DBImpl::BackgroundBuildIndex() {
ENGINE_LOG_TRACE << "Background build index thread start";
// ENGINE_LOG_TRACE << "Background build index thread start";
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
......@@ -830,7 +835,7 @@ DBImpl::BackgroundBuildIndex() {
}
}
ENGINE_LOG_TRACE << "Background build index thread exit";
// ENGINE_LOG_TRACE << "Background build index thread exit";
}
Status
......@@ -853,8 +858,12 @@ DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::
auto status = meta_ptr_->ShowPartitions(table_id, partiton_array);
for (auto& tag : partition_tags) {
// trim leading and trailing blanks from the tag; only the remaining characters are compared
// for example: " ab cd " is treated as "ab cd"
std::string valid_tag = tag;
server::StringHelpFunctions::TrimStringBlank(valid_tag);
for (auto& schema : partiton_array) {
if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, tag)) {
if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
partition_name_array.insert(schema.table_id_);
}
}
......
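The `GetPartitionsByTags` change trims each requested tag before matching it against the stored partition tags. The sketch below illustrates that idea in isolation. `TrimBlank` and `MatchTags` are hypothetical stand-ins, not the Milvus `StringHelpFunctions` API, and the sketch assumes full-match regex semantics and that only spaces count as blanks.

```cpp
#include <cassert>
#include <regex>
#include <string>
#include <vector>

// Hypothetical stand-in for a trim helper: removes leading and trailing spaces in place.
void TrimBlank(std::string& s) {
    const auto first = s.find_first_not_of(' ');
    if (first == std::string::npos) {
        s.clear();  // the string was empty or contained only spaces
        return;
    }
    const auto last = s.find_last_not_of(' ');
    s = s.substr(first, last - first + 1);
}

// Returns the stored tags that fully match the trimmed query pattern.
// Note: std::regex throws std::regex_error if the pattern is not a valid expression.
std::vector<std::string> MatchTags(const std::vector<std::string>& stored, const std::string& query) {
    std::string pattern = query;
    TrimBlank(pattern);
    std::vector<std::string> hits;
    if (pattern.empty()) {
        return hits;  // a blank-only query matches nothing in this sketch
    }
    const std::regex re(pattern);
    for (const auto& tag : stored) {
        if (std::regex_match(tag, re)) {
            hits.push_back(tag);
        }
    }
    return hits;
}
```

Trimming a copy of the query, rather than the caller's string, mirrors the `valid_tag` local used in the diff and leaves the input untouched.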
......@@ -33,7 +33,7 @@ static const char* ARCHIVE_CONF_DISK = "disk";
static const char* ARCHIVE_CONF_DAYS = "days";
struct ArchiveConf {
using CriteriaT = std::map<std::string, int>;
using CriteriaT = std::map<std::string, int64_t>;
explicit ArchiveConf(const std::string& type, const std::string& criterias = std::string());
......
......@@ -152,7 +152,14 @@ ExecutionEngineImpl::HybridLoad() const {
}
const std::string key = location_ + ".quantizer";
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
server::Config& config = server::Config::GetInstance();
std::vector<int64_t> gpus;
Status s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
ENGINE_LOG_ERROR << s.message();
return;
}
// cache hit
{
......@@ -363,6 +370,7 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
gpu_num_ = device_id;
auto to_index_data = std::make_shared<ToIndexData>(PhysicalSize());
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(to_index_data);
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj);
......@@ -586,12 +594,16 @@ ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
Status
ExecutionEngineImpl::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetResourceConfigIndexBuildDevice(gpu_num_);
if (!s.ok()) {
return s;
std::vector<int64_t> gpu_ids;
Status s = config.GetGpuResourceConfigBuildIndexResources(gpu_ids);
for (auto id : gpu_ids) {
if (gpu_num_ == id) {
return Status::OK();
}
}
return Status::OK();
std::string msg = "Invalid gpu_num";
return Status(SERVER_INVALID_ARGUMENT, msg);
}
} // namespace engine
......
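The reworked `Init()` accepts the engine's `gpu_num_` only if it appears in the configured list of build-index GPUs. A self-contained sketch of that membership check follows. `CheckResult` and `ValidateGpuNum` are hypothetical names for illustration; the real code returns a Milvus `Status`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical minimal result type standing in for a Status object.
struct CheckResult {
    bool ok;
    std::string message;
};

// Accepts gpu_num only if it is one of the configured build-index devices.
CheckResult ValidateGpuNum(int64_t gpu_num, const std::vector<int64_t>& build_index_gpus) {
    const bool found =
        std::find(build_index_gpus.begin(), build_index_gpus.end(), gpu_num) != build_index_gpus.end();
    if (found) {
        return {true, ""};
    }
    return {false, "Invalid gpu_num"};
}
```

An empty device list rejects every id in this sketch, which is one reason a caller might also want to check the status of the config lookup before validating.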
......@@ -122,8 +122,8 @@ class ExecutionEngineImpl : public ExecutionEngine {
int64_t dim_;
std::string location_;
int32_t nlist_ = 0;
int32_t gpu_num_ = 0;
int64_t nlist_ = 0;
int64_t gpu_num_ = 0;
};
} // namespace engine
......
......@@ -19,7 +19,7 @@
#include "db/Constants.h"
#include "db/engine/ExecutionEngine.h"
#include "src/config.h"
#include "src/version.h"
#include <map>
#include <memory>
......
......@@ -22,6 +22,7 @@
#include "metrics/Metrics.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
#include <mysql++/mysql++.h>
#include <string.h>
......@@ -1162,17 +1163,23 @@ MySQLMetaImpl::CreatePartition(const std::string& table_id, const std::string& p
// not allow create partition under partition
if (!table_schema.owner_table_.empty()) {
return Status(DB_ERROR, "Nested partition is not allow");
return Status(DB_ERROR, "Nested partition is not allowed");
}
if (partition_name == "") {
// not allow duplicated partition
std::string exist_partition;
GetPartitionName(table_id, tag, exist_partition);
if (!exist_partition.empty()) {
return Status(DB_ERROR, "Duplicated partition is not allow");
}
// trim leading and trailing blanks from the tag; only the remaining characters are compared
// for example: " ab cd " is treated as "ab cd"
std::string valid_tag = tag;
server::StringHelpFunctions::TrimStringBlank(valid_tag);
// not allow duplicated partition
std::string exist_partition;
GetPartitionName(table_id, valid_tag, exist_partition);
if (!exist_partition.empty()) {
return Status(DB_ERROR, "Duplicate partition is not allowed");
}
if (partition_name == "") {
// generate unique partition name
NextTableId(table_schema.table_id_);
} else {
table_schema.table_id_ = partition_name;
......@@ -1182,9 +1189,14 @@ MySQLMetaImpl::CreatePartition(const std::string& table_id, const std::string& p
table_schema.flag_ = 0;
table_schema.created_on_ = utils::GetMicroSecTimeStamp();
table_schema.owner_table_ = table_id;
table_schema.partition_tag_ = tag;
table_schema.partition_tag_ = valid_tag;
return CreateTable(table_schema);
status = CreateTable(table_schema);
if (status.code() == DB_ALREADY_EXIST) {
return Status(DB_ALREADY_EXIST, "Partition already exists");
}
return status;
}
Status
......@@ -1231,6 +1243,12 @@ MySQLMetaImpl::GetPartitionName(const std::string& table_id, const std::string&
try {
server::MetricCollector metric;
mysqlpp::StoreQueryResult res;
// trim leading and trailing blanks from the tag; only the remaining characters are compared
// for example: " ab cd " is treated as "ab cd"
std::string valid_tag = tag;
server::StringHelpFunctions::TrimStringBlank(valid_tag);
{
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
......@@ -1240,7 +1258,7 @@ MySQLMetaImpl::GetPartitionName(const std::string& table_id, const std::string&
mysqlpp::Query allPartitionsQuery = connectionPtr->query();
allPartitionsQuery << "SELECT table_id FROM " << META_TABLES << " WHERE owner_table = " << mysqlpp::quote
<< table_id << " AND partition_tag = " << mysqlpp::quote << tag << " AND state <> "
<< table_id << " AND partition_tag = " << mysqlpp::quote << valid_tag << " AND state <> "
<< std::to_string(TableSchema::TO_DELETE) << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allPartitionsQuery.str();
......@@ -1252,7 +1270,7 @@ MySQLMetaImpl::GetPartitionName(const std::string& table_id, const std::string&
const mysqlpp::Row& resRow = res[0];
resRow["table_id"].to_string(partition_name);
} else {
return Status(DB_NOT_FOUND, "Partition " + tag + " of table " + table_id + " not found");
return Status(DB_NOT_FOUND, "Partition " + valid_tag + " of table " + table_id + " not found");
}
} catch (std::exception& e) {
return HandleException("GENERAL ERROR WHEN GET PARTITION NAME", e.what());
......
......@@ -22,6 +22,7 @@
#include "metrics/Metrics.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
#include <sqlite_orm.h>
#include <unistd.h>
......@@ -757,17 +758,23 @@ SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string&
// not allow create partition under partition
if(!table_schema.owner_table_.empty()) {
return Status(DB_ERROR, "Nested partition is not allow");
return Status(DB_ERROR, "Nested partition is not allowed");
}
if (partition_name == "") {
// not allow duplicated partition
std::string exist_partition;
GetPartitionName(table_id, tag, exist_partition);
if(!exist_partition.empty()) {
return Status(DB_ERROR, "Duplicated partition is not allow");
}
// trim leading and trailing blanks from the tag; only the remaining characters are compared
// for example: " ab cd " is treated as "ab cd"
std::string valid_tag = tag;
server::StringHelpFunctions::TrimStringBlank(valid_tag);
// not allow duplicated partition
std::string exist_partition;
GetPartitionName(table_id, valid_tag, exist_partition);
if(!exist_partition.empty()) {
return Status(DB_ERROR, "Duplicate partition is not allowed");
}
if (partition_name == "") {
// generate unique partition name
NextTableId(table_schema.table_id_);
} else {
table_schema.table_id_ = partition_name;
......@@ -777,9 +784,14 @@ SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string&
table_schema.flag_ = 0;
table_schema.created_on_ = utils::GetMicroSecTimeStamp();
table_schema.owner_table_ = table_id;
table_schema.partition_tag_ = tag;
table_schema.partition_tag_ = valid_tag;
status = CreateTable(table_schema);
if (status.code() == DB_ALREADY_EXIST) {
return Status(DB_ALREADY_EXIST, "Partition already exists");
}
return CreateTable(table_schema);
return status;
}
Status
......@@ -814,13 +826,18 @@ SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string&
try {
server::MetricCollector metric;
// trim leading and trailing blanks from the tag; only the remaining characters are compared
// for example: " ab cd " is treated as "ab cd"
std::string valid_tag = tag;
server::StringHelpFunctions::TrimStringBlank(valid_tag);
auto name = ConnectorPtr->select(columns(&TableSchema::table_id_),
where(c(&TableSchema::owner_table_) == table_id
and c(&TableSchema::partition_tag_) == tag));
and c(&TableSchema::partition_tag_) == valid_tag));
if (name.size() > 0) {
partition_name = std::get<0>(name[0]);
} else {
return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + tag + " not found");
return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
}
} catch (std::exception &e) {
return HandleException("Encounter exception when get partition name", e.what());
......
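Both meta implementations now run the duplicate-tag check on the trimmed tag regardless of whether a partition name was supplied. The sketch below models that rule with an in-memory map. `PartitionRegistry` and `Code` are hypothetical and only illustrate the control flow; they are not the Milvus meta API, and only spaces are treated as blanks.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

enum class Code { kOk, kDuplicate };

// Hypothetical in-memory model of "one partition per (table, trimmed tag)".
class PartitionRegistry {
 public:
    Code Create(const std::string& table_id, const std::string& tag, const std::string& partition_name) {
        // Normalize first so " tag " and "tag" refer to the same partition.
        const std::string valid_tag = Trim(tag);
        const auto key = std::make_pair(table_id, valid_tag);
        // The duplicate check runs whether or not a partition name was given.
        if (partitions_.count(key) > 0) {
            return Code::kDuplicate;
        }
        partitions_[key] = partition_name;
        return Code::kOk;
    }

 private:
    // Removes leading and trailing spaces and returns the result.
    static std::string Trim(const std::string& s) {
        const auto first = s.find_first_not_of(' ');
        if (first == std::string::npos) {
            return "";
        }
        const auto last = s.find_last_not_of(' ');
        return s.substr(first, last - first + 1);
    }

    std::map<std::pair<std::string, std::string>, std::string> partitions_;
};
```

Storing the trimmed tag as the key is what makes a later lookup with a padded tag find the same entry.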
......@@ -88,14 +88,14 @@ endif ()
include(ThirdPartyPackagesCore)
if (CMAKE_BUILD_TYPE STREQUAL "Release")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -fPIC -DELPP_THREAD_SAFE -fopenmp")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -fPIC -DELPP_THREAD_SAFE -fopenmp -mavx -mf16c -msse4 -mpopcnt")
if (KNOWHERE_GPU_VERSION)
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O3")
endif ()
else ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0 -g -fPIC -DELPP_THREAD_SAFE -fopenmp")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -g -fPIC -DELPP_THREAD_SAFE -fopenmp -mavx -mf16c -msse4 -mpopcnt")
if (KNOWHERE_GPU_VERSION)
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O0 -g")
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O3 -g")
endif ()
endif ()
......
......@@ -18,6 +18,8 @@
#pragma once
#include <memory>
#include <sstream>
#include "Log.h"
namespace knowhere {
......@@ -50,6 +52,18 @@ struct Cfg {
CheckValid() {
return true;
}
void
Dump() {
KNOWHERE_LOG_DEBUG << DumpImpl().str();
}
virtual std::stringstream
DumpImpl() {
std::stringstream ss;
ss << "dim: " << d << ", metric: " << int(metric_type) << ", gpuid: " << gpu_id << ", k: " << k;
return ss;
}
};
using Config = std::shared_ptr<Cfg>;
......
......@@ -28,66 +28,66 @@
namespace knowhere {
class CPUSPTAGRNG : public VectorIndex {
public:
explicit CPUSPTAGRNG(const std::string& IndexType);
class CPUSPTAGRNG : public VectorIndex {
public:
explicit CPUSPTAGRNG(const std::string& IndexType);
public:
BinarySet
Serialize() override;
public:
BinarySet
Serialize() override;
VectorIndexPtr
Clone() override;
VectorIndexPtr
Clone() override;
void
Load(const BinarySet& index_array) override;
void
Load(const BinarySet& index_array) override;
public:
// PreprocessorPtr
// BuildPreprocessor(const DatasetPtr &dataset, const Config &config) override;
public:
// PreprocessorPtr
// BuildPreprocessor(const DatasetPtr &dataset, const Config &config) override;
int64_t
Count() override;
int64_t
Count() override;
int64_t
Dimension() override;
int64_t
Dimension() override;
IndexModelPtr
Train(const DatasetPtr& dataset, const Config& config) override;
IndexModelPtr
Train(const DatasetPtr& dataset, const Config& config) override;
void
Add(const DatasetPtr& dataset, const Config& config) override;
void
Add(const DatasetPtr& dataset, const Config& config) override;
DatasetPtr
Search(const DatasetPtr& dataset, const Config& config) override;
DatasetPtr
Search(const DatasetPtr& dataset, const Config& config) override;
void
Seal() override;
void
Seal() override;
private:
void
SetParameters(const Config& config);
private:
void
SetParameters(const Config& config);
private:
PreprocessorPtr preprocessor_;
std::shared_ptr<SPTAG::VectorIndex> index_ptr_;
SPTAG::IndexAlgoType index_type_;
};
private:
PreprocessorPtr preprocessor_;
std::shared_ptr<SPTAG::VectorIndex> index_ptr_;
SPTAG::IndexAlgoType index_type_;
};
using CPUSPTAGRNGPtr = std::shared_ptr<CPUSPTAGRNG>;
using CPUSPTAGRNGPtr = std::shared_ptr<CPUSPTAGRNG>;
class CPUSPTAGRNGIndexModel : public IndexModel {
public:
BinarySet
Serialize() override;
class CPUSPTAGRNGIndexModel : public IndexModel {
public:
BinarySet
Serialize() override;
void
Load(const BinarySet& binary) override;
void
Load(const BinarySet& binary) override;
private:
std::shared_ptr<SPTAG::VectorIndex> index_;
};
private:
std::shared_ptr<SPTAG::VectorIndex> index_;
};
using CPUSPTAGRNGIndexModelPtr = std::shared_ptr<CPUSPTAGRNGIndexModel>;
using CPUSPTAGRNGIndexModelPtr = std::shared_ptr<CPUSPTAGRNGIndexModel>;
} // namespace knowhere
......@@ -34,4 +34,26 @@ GetMetricType(METRICTYPE& type) {
KNOWHERE_THROW_MSG("Metric type is invalid");
}
std::stringstream
IVFCfg::DumpImpl() {
auto ss = Cfg::DumpImpl();
ss << ", nlist: " << nlist << ", nprobe: " << nprobe;
return ss;
}
std::stringstream
IVFSQCfg::DumpImpl() {
auto ss = IVFCfg::DumpImpl();
ss << ", nbits: " << nbits;
return ss;
}
std::stringstream
NSGCfg::DumpImpl() {
auto ss = IVFCfg::DumpImpl();
ss << ", knng: " << knng << ", search_length: " << search_length << ", out_degree: " << out_degree
<< ", candidate: " << candidate_pool_size;
return ss;
}
} // namespace knowhere
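The `DumpImpl()` overrides above chain to their base class and append their own fields to the returned stream. The pattern can be sketched independently of knowhere as follows. `BaseCfg` and `IvfLikeCfg` are hypothetical types, and `Dump()` returns a string here instead of writing to a logger. Returning `std::stringstream` by value relies on its move constructor, so this assumes a C++11 standard library that provides movable streams.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical base config: Dump() is the public entry point, DumpImpl() is the hook.
struct BaseCfg {
    int dim = 0;
    int k = 0;

    virtual ~BaseCfg() = default;

    std::string Dump() const {
        return DumpImpl().str();
    }

    virtual std::stringstream DumpImpl() const {
        std::stringstream ss;
        ss << "dim: " << dim << ", k: " << k;
        return ss;
    }
};

// Hypothetical derived config: reuses the base output and appends its own fields.
struct IvfLikeCfg : BaseCfg {
    int nlist = 0;
    int nprobe = 0;

    std::stringstream DumpImpl() const override {
        auto ss = BaseCfg::DumpImpl();
        ss << ", nlist: " << nlist << ", nprobe: " << nprobe;
        return ss;
    }
};
```

Because `Dump()` is non-virtual and delegates to the virtual hook, calling it through a base reference still produces the derived type's full output.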
......@@ -79,6 +79,9 @@ struct IVFCfg : public Cfg {
IVFCfg() = default;
std::stringstream
DumpImpl() override;
bool
CheckValid() override {
return true;
......@@ -95,6 +98,9 @@ struct IVFSQCfg : public IVFCfg {
: IVFCfg(dim, k, gpu_id, nlist, nprobe, type), nbits(nbits) {
}
std::stringstream
DumpImpl() override;
IVFSQCfg() = default;
bool
......@@ -145,6 +151,9 @@ struct NSGCfg : public IVFCfg {
NSGCfg() = default;
std::stringstream
DumpImpl() override;
bool
CheckValid() override {
return true;
......
......@@ -21,55 +21,55 @@
namespace knowhere {
const KDTConfig&
SPTAGParameterMgr::GetKDTParameters() {
return kdt_config_;
}
const KDTConfig&
SPTAGParameterMgr::GetKDTParameters() {
return kdt_config_;
}
const BKTConfig&
SPTAGParameterMgr::GetBKTParameters() {
return bkt_config_;
}
const BKTConfig&
SPTAGParameterMgr::GetBKTParameters() {
return bkt_config_;
}
SPTAGParameterMgr::SPTAGParameterMgr() {
kdt_config_ = std::make_shared<KDTCfg>();
kdt_config_->kdtnumber = 1;
kdt_config_->numtopdimensionkdtsplit = 5;
kdt_config_->samples = 100;
kdt_config_->tptnumber = 1;
kdt_config_->tptleafsize = 2000;
kdt_config_->numtopdimensiontptsplit = 5;
kdt_config_->neighborhoodsize = 32;
kdt_config_->graphneighborhoodscale = 2;
kdt_config_->graphcefscale = 2;
kdt_config_->refineiterations = 0;
kdt_config_->cef = 1000;
kdt_config_->maxcheckforrefinegraph = 10000;
kdt_config_->numofthreads = 1;
kdt_config_->maxcheck = 8192;
kdt_config_->thresholdofnumberofcontinuousnobetterpropagation = 3;
kdt_config_->numberofinitialdynamicpivots = 50;
kdt_config_->numberofotherdynamicpivots = 4;
SPTAGParameterMgr::SPTAGParameterMgr() {
kdt_config_ = std::make_shared<KDTCfg>();
kdt_config_->kdtnumber = 1;
kdt_config_->numtopdimensionkdtsplit = 5;
kdt_config_->samples = 100;
kdt_config_->tptnumber = 1;
kdt_config_->tptleafsize = 2000;
kdt_config_->numtopdimensiontptsplit = 5;
kdt_config_->neighborhoodsize = 32;
kdt_config_->graphneighborhoodscale = 2;
kdt_config_->graphcefscale = 2;
kdt_config_->refineiterations = 0;
kdt_config_->cef = 1000;
kdt_config_->maxcheckforrefinegraph = 10000;
kdt_config_->numofthreads = 1;
kdt_config_->maxcheck = 8192;
kdt_config_->thresholdofnumberofcontinuousnobetterpropagation = 3;
kdt_config_->numberofinitialdynamicpivots = 50;
kdt_config_->numberofotherdynamicpivots = 4;
bkt_config_ = std::make_shared<BKTCfg>();
bkt_config_->bktnumber = 1;
bkt_config_->bktkmeansk = 32;
bkt_config_->bktleafsize = 8;
bkt_config_->samples = 100;
bkt_config_->tptnumber = 1;
bkt_config_->tptleafsize = 2000;
bkt_config_->numtopdimensiontptsplit = 5;
bkt_config_->neighborhoodsize = 32;
bkt_config_->graphneighborhoodscale = 2;
bkt_config_->graphcefscale = 2;
bkt_config_->refineiterations = 0;
bkt_config_->cef = 1000;
bkt_config_->maxcheckforrefinegraph = 10000;
bkt_config_->numofthreads = 1;
bkt_config_->maxcheck = 8192;
bkt_config_->thresholdofnumberofcontinuousnobetterpropagation = 3;
bkt_config_->numberofinitialdynamicpivots = 50;
bkt_config_->numberofotherdynamicpivots = 4;
}
bkt_config_ = std::make_shared<BKTCfg>();
bkt_config_->bktnumber = 1;
bkt_config_->bktkmeansk = 32;
bkt_config_->bktleafsize = 8;
bkt_config_->samples = 100;
bkt_config_->tptnumber = 1;
bkt_config_->tptleafsize = 2000;
bkt_config_->numtopdimensiontptsplit = 5;
bkt_config_->neighborhoodsize = 32;
bkt_config_->graphneighborhoodscale = 2;
bkt_config_->graphcefscale = 2;
bkt_config_->refineiterations = 0;
bkt_config_->cef = 1000;
bkt_config_->maxcheckforrefinegraph = 10000;
bkt_config_->numofthreads = 1;
bkt_config_->maxcheck = 8192;
bkt_config_->thresholdofnumberofcontinuousnobetterpropagation = 3;
bkt_config_->numberofinitialdynamicpivots = 50;
bkt_config_->numberofotherdynamicpivots = 4;
}
} // namespace knowhere
\ No newline at end of file
} // namespace knowhere
......@@ -27,35 +27,35 @@
namespace knowhere {
using KDTConfig = std::shared_ptr<KDTCfg>;
using BKTConfig = std::shared_ptr<BKTCfg>;
using KDTConfig = std::shared_ptr<KDTCfg>;
using BKTConfig = std::shared_ptr<BKTCfg>;
class SPTAGParameterMgr {
public:
const KDTConfig&
GetKDTParameters();
class SPTAGParameterMgr {
public:
const KDTConfig&
GetKDTParameters();
const BKTConfig&
GetBKTParameters();
const BKTConfig&
GetBKTParameters();
public:
static SPTAGParameterMgr&
GetInstance() {
static SPTAGParameterMgr instance;
return instance;
}
public:
static SPTAGParameterMgr&
GetInstance() {
static SPTAGParameterMgr instance;
return instance;
}
SPTAGParameterMgr(const SPTAGParameterMgr&) = delete;
SPTAGParameterMgr(const SPTAGParameterMgr&) = delete;
SPTAGParameterMgr&
operator=(const SPTAGParameterMgr&) = delete;
SPTAGParameterMgr&
operator=(const SPTAGParameterMgr&) = delete;
private:
SPTAGParameterMgr();
private:
SPTAGParameterMgr();
private:
KDTConfig kdt_config_;
BKTConfig bkt_config_;
};
private:
KDTConfig kdt_config_;
BKTConfig bkt_config_;
};
} // namespace knowhere
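`SPTAGParameterMgr` above is a function-local-static singleton with a private constructor and deleted copy operations. The pattern can be sketched in isolation as follows. `DemoCfg`, `DemoConfig`, and `DemoParameterMgr` are hypothetical names, and only two of the default values are reproduced.

```cpp
#include <memory>

// Hypothetical, trimmed-down parameter struct for illustration.
struct DemoCfg {
    int samples = 0;
    int maxcheck = 0;
};
using DemoConfig = std::shared_ptr<DemoCfg>;

class DemoParameterMgr {
 public:
    static DemoParameterMgr&
    GetInstance() {
        // Function-local static: initialized once, thread-safe since C++11.
        static DemoParameterMgr instance;
        return instance;
    }

    const DemoConfig&
    GetParameters() const {
        return config_;
    }

    // Copying would create a second instance, so both operations are deleted.
    DemoParameterMgr(const DemoParameterMgr&) = delete;
    DemoParameterMgr&
    operator=(const DemoParameterMgr&) = delete;

 private:
    // Defaults are filled in once, when the single instance is constructed.
    DemoParameterMgr() : config_(std::make_shared<DemoCfg>()) {
        config_->samples = 100;
        config_->maxcheck = 8192;
    }

    DemoConfig config_;
};
```

Callers reach the defaults through `DemoParameterMgr::GetInstance().GetParameters()`. Because the getter returns a `shared_ptr`, every caller sees the same configuration object.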
......@@ -76,8 +76,8 @@ TEST_P(SPTAGTest, sptag_basic) {
AssertAnns(result, nq, k);
{
//auto ids = result->array()[0];
//auto dists = result->array()[1];
// auto ids = result->array()[0];
// auto dists = result->array()[1];
auto ids = result->ids();
auto dists = result->dist();
......@@ -85,8 +85,8 @@ TEST_P(SPTAGTest, sptag_basic) {
std::stringstream ss_dist;
for (auto i = 0; i < nq; i++) {
for (auto j = 0; j < k; ++j) {
//ss_id << *ids->data()->GetValues<int64_t>(1, i * k + j) << " ";
//ss_dist << *dists->data()->GetValues<float>(1, i * k + j) << " ";
// ss_id << *ids->data()->GetValues<int64_t>(1, i * k + j) << " ";
// ss_dist << *dists->data()->GetValues<float>(1, i * k + j) << " ";
ss_id << *((int64_t*)(ids) + i * k + j) << " ";
ss_dist << *((float*)(dists) + i * k + j) << " ";
}
......
......@@ -153,7 +153,7 @@ void
AssertAnns(const knowhere::DatasetPtr& result, const int& nq, const int& k) {
auto ids = result->ids();
for (auto i = 0; i < nq; i++) {
EXPECT_EQ(i, *((int64_t*)(ids) + i * k));
EXPECT_EQ(i, *((int64_t*)(ids) + i * k));
// EXPECT_EQ(i, *(ids->data()->GetValues<int64_t>(1, i * k)));
}
}
......@@ -167,8 +167,8 @@ PrintResult(const knowhere::DatasetPtr& result, const int& nq, const int& k) {
std::stringstream ss_dist;
for (auto i = 0; i < nq; i++) {
for (auto j = 0; j < k; ++j) {
//ss_id << *(ids->data()->GetValues<int64_t>(1, i * k + j)) << " ";
//ss_dist << *(dists->data()->GetValues<float>(1, i * k + j)) << " ";
// ss_id << *(ids->data()->GetValues<int64_t>(1, i * k + j)) << " ";
// ss_dist << *(dists->data()->GetValues<float>(1, i * k + j)) << " ";
ss_id << *((int64_t*)(ids) + i * k + j) << " ";
ss_dist << *((float*)(dists) + i * k + j) << " ";
}
......
......@@ -25,7 +25,7 @@
#include "easyloggingpp/easylogging++.h"
#include "metrics/Metrics.h"
#include "server/Server.h"
#include "src/config.h"
#include "src/version.h"
#include "utils/CommonUtil.h"
#include "utils/SignalUtil.h"
......
......@@ -85,7 +85,7 @@ JobMgr::worker_function() {
}
for (auto& task : tasks) {
calculate_path(task);
calculate_path(res_mgr_, task);
}
// disk resources NEVER be empty.
......@@ -103,8 +103,8 @@ JobMgr::build_task(const JobPtr& job) {
}
void
JobMgr::calculate_path(const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask) {
JobMgr::calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask && task->type_ != TaskType::BuildIndexTask) {
return;
}
......@@ -114,9 +114,9 @@ JobMgr::calculate_path(const TaskPtr& task) {
std::vector<std::string> path;
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
auto src = res_mgr_->GetDiskResources()[0];
auto src = res_mgr->GetDiskResources()[0];
auto dest = spec_label->resource();
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
ShortestPath(src.lock(), dest.lock(), res_mgr, path);
task->path() = Path(path, path.size() - 1);
}
......
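`calculate_path` above relies on `ShortestPath` to find a route from the disk resource to the labelled resource. The body of `ShortestPath` is not part of this diff, so what follows is only a sketch of one way such a lookup could work: Dijkstra's algorithm over a small named graph. The `Graph` alias, the function name `ShortestPathSketch`, the explicit edge costs, and the source-to-destination ordering of `path` are all assumptions of this sketch.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <limits>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical adjacency list: node name -> (neighbour name, transport cost).
using Graph = std::map<std::string, std::vector<std::pair<std::string, uint64_t>>>;

// Returns the total cost from src to dest and fills `path` in src-to-dest order.
// Returns the maximum uint64_t value and leaves `path` empty if dest is unreachable.
uint64_t
ShortestPathSketch(const Graph& graph, const std::string& src, const std::string& dest,
                   std::vector<std::string>& path) {
    const uint64_t kUnreachable = std::numeric_limits<uint64_t>::max();
    std::map<std::string, uint64_t> dist;     // best known cost per visited node
    std::map<std::string, std::string> prev;  // predecessor on the best known route
    using Item = std::pair<uint64_t, std::string>;
    std::priority_queue<Item, std::vector<Item>, std::greater<Item>> frontier;

    dist[src] = 0;
    frontier.emplace(0, src);
    while (!frontier.empty()) {
        Item top = frontier.top();
        frontier.pop();
        const uint64_t cost = top.first;
        const std::string node = top.second;
        if (cost > dist[node]) {
            continue;  // stale queue entry, a cheaper route was already found
        }
        auto it = graph.find(node);
        if (it == graph.end()) {
            continue;  // node has no outgoing edges
        }
        for (const auto& edge : it->second) {
            const uint64_t candidate = cost + edge.second;
            auto found = dist.find(edge.first);
            if (found == dist.end() || candidate < found->second) {
                dist[edge.first] = candidate;
                prev[edge.first] = node;
                frontier.emplace(candidate, edge.first);
            }
        }
    }

    path.clear();
    if (dist.find(dest) == dist.end()) {
        return kUnreachable;
    }
    // Walk the predecessor chain back from dest, then reverse it.
    for (std::string cur = dest;; cur = prev[cur]) {
        path.push_back(cur);
        if (cur == src) {
            break;
        }
    }
    std::reverse(path.begin(), path.end());
    return dist[dest];
}
```

With a disk-cpu-gpu topology like the one built in `load_simple_config`, the route from `"disk"` to a GPU node passes through `"cpu"`.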
......@@ -59,8 +59,9 @@ class JobMgr : public interface::dumpable {
static std::vector<TaskPtr>
build_task(const JobPtr& job);
void
calculate_path(const TaskPtr& task);
public:
static void
calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task);
private:
bool running_ = false;
......
......@@ -45,18 +45,6 @@ std::mutex BuildMgrInst::mutex_;
void
load_simple_config() {
server::Config& config = server::Config::GetInstance();
std::string mode;
config.GetResourceConfigMode(mode);
std::vector<std::string> pool;
config.GetResourceConfigSearchResources(pool);
// get resources
auto gpu_ids = get_gpu_pool();
int32_t build_gpu_id;
config.GetResourceConfigIndexBuildDevice(build_gpu_id);
// create and connect
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
......@@ -64,26 +52,46 @@ load_simple_config() {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true));
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
// get resources
#ifdef MILVUS_GPU_VERSION
server::Config& config = server::Config::GetInstance();
std::vector<int64_t> gpu_ids;
config.GetGpuResourceConfigSearchResources(gpu_ids);
std::vector<int64_t> build_gpu_ids;
config.GetGpuResourceConfigBuildIndexResources(build_gpu_ids);
auto pcie = Connection("pcie", 12000);
bool find_build_gpu_id = false;
std::vector<int64_t> not_find_build_ids;
for (auto& build_id : build_gpu_ids) {
bool find_gpu_id = false;
for (auto& gpu_id : gpu_ids) {
if (gpu_id == build_id) {
find_gpu_id = true;
break;
}
}
if (not find_gpu_id) {
not_find_build_ids.emplace_back(build_id);
}
}
for (auto& gpu_id : gpu_ids) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
if (build_gpu_id == gpu_id) {
find_build_gpu_id = true;
}
}
if (not find_build_gpu_id) {
for (auto& not_find_id : not_find_build_ids) {
ResMgrInst::GetInstance()->Add(
ResourceFactory::Create(std::to_string(build_gpu_id), "GPU", build_gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(build_gpu_id), pcie);
ResourceFactory::Create(std::to_string(not_find_id), "GPU", not_find_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(not_find_id), pcie);
}
#endif
}
void
StartSchedulerService() {
load_simple_config();
OptimizerInst::GetInstance()->Init();
ResMgrInst::GetInstance()->Start();
SchedInst::GetInstance()->Start();
JobMgrInst::GetInstance()->Start();
......
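The new `load_simple_config` registers every search GPU first and then registers only those build-index GPUs that are not already in the search list. The nested loop that collects `not_find_build_ids` can be sketched as a small helper. `MissingBuildIds` is a hypothetical name. Like the loop in the diff, it preserves order and does not remove duplicates.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Returns the ids from build_ids that do not appear in search_ids, in their original order.
std::vector<int64_t>
MissingBuildIds(const std::vector<int64_t>& search_ids, const std::vector<int64_t>& build_ids) {
    std::vector<int64_t> missing;
    for (int64_t id : build_ids) {
        if (std::find(search_ids.begin(), search_ids.end(), id) == search_ids.end()) {
            missing.push_back(id);
        }
    }
    return missing;
}
```

For the short id lists typical of a GPU configuration, the linear `std::find` is simple and cheap. A `std::set` would be the usual alternative for longer lists.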
......@@ -21,10 +21,13 @@
#include "JobMgr.h"
#include "ResourceMgr.h"
#include "Scheduler.h"
#include "optimizer/HybridPass.h"
#include "optimizer/LargeSQ8HPass.h"
#include "optimizer/OnlyCPUPass.h"
#include "optimizer/OnlyGPUPass.h"
#include "Utils.h"
#include "optimizer/BuildIndexPass.h"
#include "optimizer/FaissFlatPass.h"
#include "optimizer/FaissIVFFlatPass.h"
#include "optimizer/FaissIVFSQ8HPass.h"
#include "optimizer/FaissIVFSQ8Pass.h"
#include "optimizer/FallbackPass.h"
#include "optimizer/Optimizer.h"
#include "server/Config.h"
......@@ -97,21 +100,15 @@ class OptimizerInst {
if (instance == nullptr) {
std::lock_guard<std::mutex> lock(mutex_);
if (instance == nullptr) {
server::Config& config = server::Config::GetInstance();
std::vector<std::string> search_resources;
bool has_cpu = false;
config.GetResourceConfigSearchResources(search_resources);
for (auto& resource : search_resources) {
if (resource == "cpu") {
has_cpu = true;
}
}
std::vector<PassPtr> pass_list;
pass_list.push_back(std::make_shared<LargeSQ8HPass>());
pass_list.push_back(std::make_shared<HybridPass>());
pass_list.push_back(std::make_shared<OnlyCPUPass>());
pass_list.push_back(std::make_shared<OnlyGPUPass>(has_cpu));
#ifdef MILVUS_GPU_VERSION
pass_list.push_back(std::make_shared<BuildIndexPass>());
pass_list.push_back(std::make_shared<FaissFlatPass>());
pass_list.push_back(std::make_shared<FaissIVFFlatPass>());
pass_list.push_back(std::make_shared<FaissIVFSQ8Pass>());
pass_list.push_back(std::make_shared<FaissIVFSQ8HPass>());
#endif
pass_list.push_back(std::make_shared<FallbackPass>());
instance = std::make_shared<Optimizer>(pass_list);
}
}
......
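The pass list above ends with `FallbackPass`, which suggests that passes are tried in order until one claims the task. `Optimizer::Run` itself is not shown in this diff, so the first-match behaviour in the sketch below is an assumption. `DemoTask`, `DemoPass`, `DemoBuildPass`, `DemoFallbackPass`, and `DemoOptimizer` are hypothetical names.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical task: `kind` describes the work, `label` is set by the pass that claims it.
struct DemoTask {
    std::string kind;
    std::string label;
};

class DemoPass {
 public:
    virtual ~DemoPass() = default;

    // Called once before any task is processed, e.g. to read configuration.
    virtual void
    Init() {
    }

    // Returns true if this pass claimed the task and set its label.
    virtual bool
    Run(DemoTask& task) = 0;
};

class DemoBuildPass : public DemoPass {
 public:
    bool
    Run(DemoTask& task) override {
        if (task.kind != "build") {
            return false;  // not ours, let a later pass decide
        }
        task.label = "gpu";
        return true;
    }
};

// Always claims the task, so it must be the last pass in the list.
class DemoFallbackPass : public DemoPass {
 public:
    bool
    Run(DemoTask& task) override {
        task.label = "cpu";
        return true;
    }
};

class DemoOptimizer {
 public:
    explicit DemoOptimizer(std::vector<std::shared_ptr<DemoPass>> passes) : passes_(std::move(passes)) {
    }

    void
    Init() {
        for (auto& pass : passes_) {
            pass->Init();
        }
    }

    // First pass that returns true wins.
    bool
    Run(DemoTask& task) {
        for (auto& pass : passes_) {
            if (pass->Run(task)) {
                return true;
            }
        }
        return false;
    }

 private:
    std::vector<std::shared_ptr<DemoPass>> passes_;
};
```

Under this reading, tasks created with a `nullptr` label always receive a label as long as the fallback pass sits at the end of the list.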
......@@ -108,10 +108,6 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
switch (task_table_type) {
case TaskLabelType::DEFAULT: {
Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
}
case TaskLabelType::SPECIFIED_RESOURCE: {
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
......
......@@ -18,7 +18,6 @@
#include "scheduler/TaskCreator.h"
#include "SchedInst.h"
#include "tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
#include "tasklabel/SpecResLabel.h"
namespace milvus {
......@@ -47,8 +46,7 @@ std::vector<TaskPtr>
TaskCreator::Create(const SearchJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& index_file : job->index_files()) {
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<XSearchTask>(index_file.second, label);
auto task = std::make_shared<XSearchTask>(index_file.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}
......@@ -70,12 +68,8 @@ TaskCreator::Create(const DeleteJobPtr& job) {
std::vector<TaskPtr>
TaskCreator::Create(const BuildIndexJobPtr& job) {
std::vector<TaskPtr> tasks;
// TODO(yukun): remove "disk" hardcode here
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
for (auto& to_index_file : job->to_index_files()) {
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}
......
......@@ -16,8 +16,6 @@
// under the License.
#include "scheduler/Utils.h"
#include "server/Config.h"
#include "utils/Log.h"
#ifdef MILVUS_GPU_VERSION
#include <cuda_runtime.h>
......@@ -46,42 +44,5 @@ get_num_gpu() {
return n_devices;
}
std::vector<uint64_t>
get_gpu_pool() {
std::vector<uint64_t> gpu_pool;
server::Config& config = server::Config::GetInstance();
std::vector<std::string> pool;
Status s = config.GetResourceConfigSearchResources(pool);
if (!s.ok()) {
SERVER_LOG_ERROR << s.message();
}
std::set<uint64_t> gpu_ids;
for (auto& resource : pool) {
if (resource == "cpu") {
continue;
} else {
if (resource.length() < 4 || resource.substr(0, 3) != "gpu") {
// error
exit(-1);
}
auto gpu_id = std::stoi(resource.substr(3));
if (gpu_id >= scheduler::get_num_gpu()) {
// error
exit(-1);
}
gpu_ids.insert(gpu_id);
}
}
for (auto& gpu_id : gpu_ids) {
gpu_pool.push_back(gpu_id);
}
return gpu_pool;
}
} // namespace scheduler
} // namespace milvus
......@@ -27,8 +27,5 @@ get_current_timestamp();
uint64_t
get_num_gpu();
std::vector<uint64_t>
get_gpu_pool();
} // namespace scheduler
} // namespace milvus
......@@ -36,10 +36,6 @@ class Action {
static void
PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest);
static void
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
static void
SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
......
......@@ -101,110 +101,46 @@ Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest)
dest->task_table().Put(task_item->task, task_item);
}
void
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
bool moved = false;
// to support test task, REFACTOR
if (resource->type() == ResourceType::CPU) {
if (auto index_engine = search_task->index_engine_) {
auto location = index_engine->GetLocation();
for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) {
auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_, dest_resource);
break;
}
}
}
}
if (not moved) {
PushTaskToNeighbourRandomly(task_item, resource);
}
}
}
void
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto& res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
// if (task->job_.lock()->type() == JobType::SEARCH) {
// auto label = task->label();
// auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
// if (spec_label->resource().lock()->type() == ResourceType::CPU) {
// std::vector<std::string> spec_path;
// spec_path.push_back(spec_label->resource().lock()->name());
// spec_path.push_back(resource->name());
// task->path() = Path(spec_path, spec_path.size() - 1);
// } else {
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
// uint64_t min_cost_idx = 0;
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->TotalTasks() == 0) {
// min_cost_idx = i;
// break;
// }
// uint64_t cost = compute_resources[i]->TaskAvgCost() *
// compute_resources[i]->NumOfTaskToExec() +
// transport_costs[i];
// if (min_cost > cost) {
// min_cost = cost;
// min_cost_idx = i;
// }
// }
//
// // step 3: set path in task
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
// task->path() = task_path;
// }
//
// } else
if (task->job_.lock()->type() == JobType::BUILD) {
// step2: Read device id in config
// get build index gpu resource
server::Config& config = server::Config::GetInstance();
int32_t build_index_gpu;
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
bool find_gpu_res = false;
if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->name() ==
res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
}
}
}
if (not find_gpu_res) {
task->path() = Path(paths[0], paths[0].size() - 1);
}
}
}
// if (resource->type() == ResourceType::DISK) {
// // step 1: calculate shortest path per resource, from disk to compute resource
// auto compute_resources = res_mgr->GetComputeResources();
// std::vector<std::vector<std::string>> paths;
// std::vector<uint64_t> transport_costs;
// for (auto& res : compute_resources) {
// std::vector<std::string> path;
// uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
// transport_costs.push_back(transport_cost);
// paths.emplace_back(path);
// }
// if (task->job_.lock()->type() == JobType::BUILD) {
// // step2: Read device id in config
// // get build index gpu resource
// server::Config& config = server::Config::GetInstance();
// int32_t build_index_gpu;
// Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
//
// bool find_gpu_res = false;
// if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->name() ==
// res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
// find_gpu_res = true;
// Path task_path(paths[i], paths[i].size() - 1);
// task->path() = task_path;
// break;
// }
// }
// }
// if (not find_gpu_res) {
// task->path() = Path(paths[0], paths[0].size() - 1);
// }
// }
// }
if (resource->name() == task->path().Last()) {
resource->WakeupExecutor();
......
......@@ -15,32 +15,38 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/OnlyCPUPass.h"
#include "scheduler/optimizer/BuildIndexPass.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
void
BuildIndexPass::Init() {
server::Config& config = server::Config::GetInstance();
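std::vector<int64_t> build_resources;
Status s = config.GetGpuResourceConfigBuildIndexResources(build_resources);
if (!s.ok()) {
// NOTE: a bare `throw;` with no active exception calls std::terminate().
throw;
}
// Keep the configured ids; Run() reads them from build_gpu_ids_.
build_gpu_ids_.assign(build_resources.begin(), build_resources.end());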
}
bool
OnlyCPUPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask)
BuildIndexPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::BuildIndexTask)
return false;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8 &&
search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFFLAT) {
return false;
}
auto gpu_id = get_gpu_pool();
if (not gpu_id.empty())
if (build_gpu_ids_.empty())
return false;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
ResourcePtr res_ptr;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_gpu_ids_[specified_gpu_id_]);
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
specified_gpu_id_ = (specified_gpu_id_ + 1) % build_gpu_ids_.size();
return true;
}
......
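`BuildIndexPass::Run` above spreads build tasks over the configured GPUs by advancing `specified_gpu_id_` modulo the size of `build_gpu_ids_`. The round-robin selection can be sketched on its own as follows. `RoundRobinPicker` is a hypothetical helper and not part of the scheduler.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hands out the configured ids in order and wraps around at the end of the list.
class RoundRobinPicker {
 public:
    explicit RoundRobinPicker(std::vector<int64_t> ids) : ids_(std::move(ids)) {
    }

    bool
    Empty() const {
        return ids_.empty();
    }

    // Precondition: !Empty(). The modulo below would divide by zero on an empty list.
    int64_t
    Next() {
        assert(!ids_.empty());
        const int64_t id = ids_[next_];
        next_ = (next_ + 1) % ids_.size();
        return id;
    }

 private:
    std::vector<int64_t> ids_;
    std::size_t next_ = 0;
};
```

The value returned is the configured device id stored at the current position, not the position itself. The two differ as soon as the configured ids do not start at zero.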
......@@ -32,20 +32,23 @@
namespace milvus {
namespace scheduler {
class OnlyGPUPass : public Pass {
class BuildIndexPass : public Pass {
public:
explicit OnlyGPUPass(bool has_cpu);
BuildIndexPass() = default;
public:
void
Init() override;
bool
Run(const TaskPtr& task) override;
private:
uint64_t specified_gpu_id_ = 0;
bool has_cpu_ = false;
std::vector<int64_t> build_gpu_ids_;
};
using OnlyGPUPassPtr = std::shared_ptr<OnlyGPUPass>;
using BuildIndexPassPtr = std::shared_ptr<BuildIndexPass>;
} // namespace scheduler
} // namespace milvus
......@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/LargeSQ8HPass.h"
#include "scheduler/optimizer/FaissFlatPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
......@@ -27,57 +27,41 @@
namespace milvus {
namespace scheduler {
LargeSQ8HPass::LargeSQ8HPass() {
void
FaissFlatPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int64_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
LargeSQ8HPass::Run(const TaskPtr& task) {
FaissFlatPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8H) {
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IDMAP) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
return false;
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpus[best_device_id]);
}
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
// std::vector<int64_t> all_free_mem;
// for (auto& gpu : gpus) {
// auto cache = cache::GpuCacheMgr::GetInstance(gpu);
// auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
// all_free_mem.push_back(free_mem);
// }
//
// auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
// auto best_index = std::distance(all_free_mem.begin(), max_e);
// auto best_device_id = gpus[best_index];
auto best_device_id = count_ % gpus.size();
count_++;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
if (not res_ptr) {
SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid.";
// TODO: throw critical error and exit
return false;
}
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}
......
......@@ -33,20 +33,24 @@
namespace milvus {
namespace scheduler {
class LargeSQ8HPass : public Pass {
class FaissFlatPass : public Pass {
public:
LargeSQ8HPass();
FaissFlatPass() = default;
public:
void
Init() override;
bool
Run(const TaskPtr& task) override;
private:
int32_t threshold_ = std::numeric_limits<int32_t>::max();
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>;
using FaissFlatPassPtr = std::shared_ptr<FaissFlatPass>;
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/FaissIVFFlatPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
void
FaissIVFFlatPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int64_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
FaissIVFFlatPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFFLAT) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpus[best_device_id]);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}
} // namespace scheduler
} // namespace milvus
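The Faiss search passes above share one routing rule: a search with `nq` below the configured threshold is labelled for the CPU, and anything larger is spread over the search GPUs in turn. The sketch below isolates that rule. `Placement` and `ThresholdRouter` are hypothetical names. Looking the device id up in the configured list and falling back to the CPU when the list is empty are assumptions of this sketch.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Result of a routing decision; device_id is only meaningful when on_gpu is true.
struct Placement {
    bool on_gpu;
    int64_t device_id;
};

class ThresholdRouter {
 public:
    ThresholdRouter(int64_t threshold, std::vector<int64_t> gpus) : threshold_(threshold), gpus_(std::move(gpus)) {
    }

    // Small batches stay on the CPU; large batches rotate over the configured GPUs.
    Placement
    Route(int64_t nq) {
        if (nq < threshold_ || gpus_.empty()) {
            return {false, -1};
        }
        const int64_t id = gpus_[count_ % gpus_.size()];
        ++count_;
        return {true, id};
    }

 private:
    int64_t threshold_;
    std::vector<int64_t> gpus_;
    std::size_t count_ = 0;
};
```

The counter only advances for requests that actually go to a GPU, so CPU-bound requests do not skew the rotation.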
......@@ -18,6 +18,7 @@
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
......@@ -32,16 +33,24 @@
namespace milvus {
namespace scheduler {
class HybridPass : public Pass {
class FaissIVFFlatPass : public Pass {
public:
HybridPass() = default;
FaissIVFFlatPass() = default;
public:
void
Init() override;
bool
Run(const TaskPtr& task) override;
private:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using HybridPassPtr = std::shared_ptr<HybridPass>;
using FaissIVFFlatPassPtr = std::shared_ptr<FaissIVFFlatPass>;
} // namespace scheduler
} // namespace milvus
......@@ -15,39 +15,50 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/OnlyGPUPass.h"
#include "scheduler/optimizer/FaissIVFSQ8HPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
OnlyGPUPass::OnlyGPUPass(bool has_cpu) : has_cpu_(has_cpu) {
void
FaissIVFSQ8HPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int64_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
}
bool
OnlyGPUPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask || has_cpu_)
FaissIVFSQ8HPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8 &&
search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFFLAT &&
search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IDMAP) {
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8H) {
return false;
}
auto gpu_id = get_gpu_pool();
if (gpu_id.empty())
return false;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpu_id[specified_gpu_id_]);
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpus[best_device_id]);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
specified_gpu_id_ = specified_gpu_id_++ % gpu_id.size();
return true;
}
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFSQ8HPass : public Pass {
public:
FaissIVFSQ8HPass() = default;
public:
void
Init() override;
bool
Run(const TaskPtr& task) override;
private:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using FaissIVFSQ8HPassPtr = std::shared_ptr<FaissIVFSQ8HPass>;
} // namespace scheduler
} // namespace milvus
......@@ -15,28 +15,54 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/HybridPass.h"
#include "scheduler/optimizer/FaissIVFSQ8Pass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
void
FaissIVFSQ8Pass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int64_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
HybridPass::Run(const TaskPtr& task) {
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
if (task->Type() != TaskType::SearchTask)
FaissIVFSQ8Pass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ == (int)engine::EngineType::FAISS_IVFSQ8H) {
// TODO: remove "cpu" hardcode
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
return true;
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
return false;
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}
} // namespace scheduler
......
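The routing rule in FaissIVFSQ8Pass::Run above (small batches stay on the CPU, larger ones rotate across the configured GPUs) can be sketched in isolation. This is a minimal sketch, not the scheduler's code: DevicePicker and Pick are hypothetical names, and resources are represented as plain strings. Unlike the diff, which passes count_ % gpus.size() straight to GetResource, the sketch indexes into the configured list and falls back to the CPU when the list is empty; both choices are assumptions.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the pass: returns "cpu" for small batches and
// rotates through the configured GPU ids for large ones.
class DevicePicker {
 public:
    DevicePicker(int64_t threshold, std::vector<int64_t> gpus)
        : threshold_(threshold), gpus_(std::move(gpus)) {
    }

    std::string
    Pick(int64_t nq) {
        assert(nq >= 0);  // nq is a count of query vectors
        if (nq < threshold_ || gpus_.empty()) {
            return "cpu";
        }
        // Index into the configured list so that ids such as {2, 3} are honored.
        int64_t device_id = gpus_[count_ % gpus_.size()];
        ++count_;
        return "gpu" + std::to_string(device_id);
    }

 private:
    int64_t threshold_;
    std::vector<int64_t> gpus_;
    uint64_t count_ = 0;  // number of GPU assignments made so far
};
```

With a threshold of 1000 and GPUs {2, 3}, a batch of 10 queries goes to "cpu", and successive large batches alternate between "gpu2" and "gpu3".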
......@@ -18,6 +18,7 @@
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
......@@ -32,16 +33,24 @@
namespace milvus {
namespace scheduler {
class OnlyCPUPass : public Pass {
class FaissIVFSQ8Pass : public Pass {
public:
OnlyCPUPass() = default;
FaissIVFSQ8Pass() = default;
public:
void
Init() override;
bool
Run(const TaskPtr& task) override;
private:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using OnlyCPUPassPtr = std::shared_ptr<OnlyCPUPass>;
using FaissIVFSQ8PassPtr = std::shared_ptr<FaissIVFSQ8Pass>;
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/FallbackPass.h"
#include "scheduler/SchedInst.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
void
FallbackPass::Init() {
}
bool
FallbackPass::Run(const TaskPtr& task) {
auto task_type = task->Type();
if (task_type != TaskType::SearchTask && task_type != TaskType::BuildIndexTask) {
return false;
}
// the CPU resource list is expected to never be empty
auto cpu = ResMgrInst::GetInstance()->GetCpuResources()[0];
auto label = std::make_shared<SpecResLabel>(cpu);
task->label() = label;
return true;
}
} // namespace scheduler
} // namespace milvus
......@@ -14,23 +14,27 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "TaskLabel.h"
#include <bits/stdc++.h>
#include <memory>
#include "Pass.h"
namespace milvus {
namespace scheduler {
class DefaultLabel : public TaskLabel {
class FallbackPass : public Pass {
public:
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {
}
};
FallbackPass() = default;
using DefaultLabelPtr = std::shared_ptr<DefaultLabel>;
public:
void
Init() override;
bool
Run(const TaskPtr& task) override;
};
} // namespace scheduler
} // namespace milvus
......@@ -20,12 +20,12 @@
namespace milvus {
namespace scheduler {
// void
// Optimizer::Init() {
// for (auto& pass : pass_list_) {
// pass->Init();
// }
// }
void
Optimizer::Init() {
for (auto& pass : pass_list_) {
pass->Init();
}
}
bool
Optimizer::Run(const TaskPtr& task) {
......
......@@ -38,8 +38,8 @@ class Optimizer {
explicit Optimizer(std::vector<PassPtr> pass_list) : pass_list_(std::move(pass_list)) {
}
// void
// Init();
void
Init();
bool
Run(const TaskPtr& task);
......
......@@ -34,9 +34,8 @@ namespace scheduler {
class Pass {
public:
// virtual void
// Init() {
// }
virtual void
Init() = 0;
virtual bool
Run(const TaskPtr& task) = 0;
......
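Taken together, the Pass, Optimizer and FallbackPass hunks above suggest a chain of passes that is initialized once and then tried in order, with the fallback placed last. The body of Optimizer::Run is not shown in this diff, so the "first pass that returns true wins" loop below is an assumption. Task, GpuSearchPass and the string labels are hypothetical simplifications, not the real scheduler types.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Task {
    std::string type;   // e.g. "search" or "build_index"
    std::string label;  // resource chosen by a pass; empty until labeled
};

class Pass {
 public:
    virtual ~Pass() = default;
    virtual void Init() = 0;  // pure virtual, as in the diff
    virtual bool Run(Task& task) = 0;
};
using PassPtr = std::shared_ptr<Pass>;

// Hypothetical pass that only handles search tasks.
class GpuSearchPass : public Pass {
 public:
    void Init() override { initialized_ = true; }
    bool Run(Task& task) override {
        assert(initialized_);  // Optimizer::Init() must run before Run()
        if (task.type != "search") return false;
        task.label = "gpu0";
        return true;
    }

 private:
    bool initialized_ = false;
};

// Labels any search or build-index task that earlier passes left alone.
class FallbackPass : public Pass {
 public:
    void Init() override {}
    bool Run(Task& task) override {
        if (task.type != "search" && task.type != "build_index") return false;
        task.label = "cpu";
        return true;
    }
};

class Optimizer {
 public:
    explicit Optimizer(std::vector<PassPtr> pass_list) : pass_list_(std::move(pass_list)) {}

    void Init() {
        for (auto& pass : pass_list_) pass->Init();
    }

    // Assumed behavior: stop at the first pass that accepts the task.
    bool Run(Task& task) {
        for (auto& pass : pass_list_) {
            if (pass->Run(task)) return true;
        }
        return false;
    }

 private:
    std::vector<PassPtr> pass_list_;
};
```

Placing the fallback last means a task that no specialized pass claims still receives a label, while making Init() pure virtual forces every pass to state explicitly whether it has configuration to load.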
......@@ -146,8 +146,7 @@ XBuildIndexTask::Execute() {
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
<< std::endl;
ENGINE_LOG_ERROR << "Failed to build index, index file is too large or gpu memory is not enough";
build_index_job->BuildIndexDone(to_index_id_);
build_index_job->GetStatus() = Status(DB_ERROR, msg);
......@@ -179,8 +178,8 @@ XBuildIndexTask::Execute() {
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to persist index file: " << table_file.location_
<< ", possible out of disk space" << std::endl;
ENGINE_LOG_ERROR << "Failed to persist index file: " << table_file.location_
<< ", possible out of disk space";
build_index_job->BuildIndexDone(to_index_id_);
build_index_job->GetStatus() = Status(DB_ERROR, msg);
......
......@@ -23,7 +23,6 @@ namespace milvus {
namespace scheduler {
enum class TaskLabelType {
DEFAULT, // means can be executed in any resource
SPECIFIED_RESOURCE, // the task must be executed on the specified resource
BROADCAST, // every executor-enabled resource must execute the task
};
......
......@@ -167,7 +167,7 @@ Utils::PrintSearchResult(const std::vector<std::pair<int64_t, milvus::RowRecord>
index++;
std::cout << "No." << index << " vector " << search_id << " top " << topk << " search result:" << std::endl;
for (size_t j = 0; j < topk; j++) {
size_t idx = i * nq + j;
size_t idx = i * topk + j;
std::cout << "\t" << topk_query_result.ids[idx] << "\t" << topk_query_result.distances[idx] << std::endl;
}
}
......
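The index fix in PrintSearchResult above implies that results are stored as a flat, row-major buffer of nq rows with topk entries each, so the row stride is topk rather than nq. A minimal sketch of that addressing follows; RowOf is a hypothetical helper and the layout is inferred from the fix, not confirmed by the SDK.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns the ids that belong to query `i` in a row-major nq x topk buffer.
std::vector<int64_t>
RowOf(const std::vector<int64_t>& flat_ids, std::size_t nq, std::size_t topk, std::size_t i) {
    assert(flat_ids.size() == nq * topk && i < nq);
    std::vector<int64_t> row;
    row.reserve(topk);
    for (std::size_t j = 0; j < topk; j++) {
        row.push_back(flat_ids[i * topk + j]);  // stride is topk, not nq
    }
    return row;
}
```

With nq = 2 and topk = 3, the old expression i * nq + j would read offsets 2, 3 and 4 for the second query and overlap the first query's results; i * topk + j reads offsets 3, 4 and 5.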
......@@ -17,7 +17,7 @@
#include "sdk/grpc/ClientProxy.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "src/config.h"
#include "src/version.h"
#include <memory>
#include <string>
......@@ -204,9 +204,8 @@ ClientProxy::Insert(const std::string& table_name, const std::string& partition_
if (!id_array.empty()) {
/* set user's ids */
auto row_ids = insert_param.mutable_row_id_array();
row_ids->Reserve(static_cast<int>(id_array.size()));
row_ids->Resize(static_cast<int>(id_array.size()), -1);
memcpy(row_ids->mutable_data(), id_array.data(), id_array.size() * sizeof(int64_t));
client_ptr_->Insert(vector_ids, insert_param, status);
} else {
client_ptr_->Insert(vector_ids, insert_param, status);
......
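The change from Reserve to Resize in ClientProxy::Insert above matters because reserving only allocates capacity, so elements copied in with memcpy are never counted as part of the container. The sketch below shows the same pattern with std::vector as a stand-in for the protobuf repeated field; CopyIds is a hypothetical helper, and the analogy between the two container types is an assumption of this example.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

std::vector<int64_t>
CopyIds(const std::vector<int64_t>& id_array) {
    std::vector<int64_t> row_ids;
    // reserve() would leave size() at 0, so the copied bytes would not be
    // visible as elements; resize() creates the elements first.
    row_ids.resize(id_array.size(), -1);
    assert(row_ids.size() == id_array.size());
    if (!id_array.empty()) {
        std::memcpy(row_ids.data(), id_array.data(), id_array.size() * sizeof(int64_t));
    }
    return row_ids;
}
```

The placeholder value -1 mirrors the diff; every element is overwritten by the copy, so it only acts as a marker if the copy is ever skipped.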
This diff is collapsed.
......@@ -59,12 +59,8 @@ static const char* CONFIG_DB_PRELOAD_TABLE = "preload_table";
static const char* CONFIG_CACHE = "cache_config";
static const char* CONFIG_CACHE_CPU_CACHE_CAPACITY = "cpu_cache_capacity";
static const char* CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT = "16";
static const char* CONFIG_CACHE_GPU_CACHE_CAPACITY = "gpu_cache_capacity";
static const char* CONFIG_CACHE_GPU_CACHE_CAPACITY_DEFAULT = "4";
static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD = "cpu_mem_threshold";
static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD = "cpu_cache_threshold";
static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT = "0.85";
static const char* CONFIG_CACHE_GPU_CACHE_THRESHOLD = "gpu_mem_threshold";
static const char* CONFIG_CACHE_GPU_CACHE_THRESHOLD_DEFAULT = "0.85";
static const char* CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data";
static const char* CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT = "false";
......@@ -87,26 +83,23 @@ static const char* CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0";
static const char* CONFIG_ENGINE_GPU_SEARCH_THRESHOLD = "gpu_search_threshold";
static const char* CONFIG_ENGINE_GPU_SEARCH_THRESHOLD_DEFAULT = "1000";
/* resource config */
static const char* CONFIG_RESOURCE = "resource_config";
static const char* CONFIG_RESOURCE_MODE = "mode";
static const char* CONFIG_RESOURCE_MODE_DEFAULT = "simple";
static const char* CONFIG_RESOURCE_SEARCH_RESOURCES = "search_resources";
static const char* CONFIG_RESOURCE_SEARCH_RESOURCES_DELIMITER = ",";
#ifdef MILVUS_CPU_VERSION
static const char* CONFIG_RESOURCE_SEARCH_RESOURCES_DEFAULT = "cpu";
#else
static const char* CONFIG_RESOURCE_SEARCH_RESOURCES_DEFAULT = "cpu,gpu0";
#endif
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE = "index_build_device";
#ifdef MILVUS_CPU_VERSION
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "cpu";
/* gpu resource config */
static const char* CONFIG_GPU_RESOURCE = "gpu_resource_config";
static const char* CONFIG_GPU_RESOURCE_ENABLE = "enable";
#ifdef MILVUS_GPU_VERSION
static const char* CONFIG_GPU_RESOURCE_ENABLE_DEFAULT = "true";
#else
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "gpu0";
static const char* CONFIG_GPU_RESOURCE_ENABLE_DEFAULT = "false";
#endif
const int32_t CPU_DEVICE_ID = -1;
static const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY = "cache_capacity";
static const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY_DEFAULT = "4";
static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD = "cache_threshold";
static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD_DEFAULT = "0.85";
static const char* CONFIG_GPU_RESOURCE_DELIMITER = ",";
static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES = "search_resources";
static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES_DEFAULT = "gpu0";
static const char* CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES = "build_index_resources";
static const char* CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES_DEFAULT = "gpu0";
class Config {
public:
......@@ -170,10 +163,6 @@ class Config {
Status
CheckCacheConfigCpuCacheThreshold(const std::string& value);
Status
CheckCacheConfigGpuCacheCapacity(const std::string& value);
Status
CheckCacheConfigGpuCacheThreshold(const std::string& value);
Status
CheckCacheConfigCacheInsertData(const std::string& value);
/* engine config */
......@@ -184,13 +173,17 @@ class Config {
Status
CheckEngineConfigGpuSearchThreshold(const std::string& value);
/* resource config */
/* gpu resource config */
Status
CheckGpuResourceConfigEnable(const std::string& value);
Status
CheckGpuResourceConfigCacheCapacity(const std::string& value);
Status
CheckResourceConfigMode(const std::string& value);
CheckGpuResourceConfigCacheThreshold(const std::string& value);
Status
CheckResourceConfigSearchResources(const std::vector<std::string>& value);
CheckGpuResourceConfigSearchResources(const std::vector<std::string>& value);
Status
CheckResourceConfigIndexBuildDevice(const std::string& value);
CheckGpuResourceConfigBuildIndexResources(const std::vector<std::string>& value);
std::string
GetConfigStr(const std::string& parent_key, const std::string& child_key, const std::string& default_value = "");
......@@ -217,11 +210,11 @@ class Config {
Status
GetDBConfigBackendUrl(std::string& value);
Status
GetDBConfigArchiveDiskThreshold(int32_t& value);
GetDBConfigArchiveDiskThreshold(int64_t& value);
Status
GetDBConfigArchiveDaysThreshold(int32_t& value);
GetDBConfigArchiveDaysThreshold(int64_t& value);
Status
GetDBConfigInsertBufferSize(int32_t& value);
GetDBConfigInsertBufferSize(int64_t& value);
Status
GetDBConfigPreloadTable(std::string& value);
......@@ -239,27 +232,27 @@ class Config {
Status
GetCacheConfigCpuCacheThreshold(float& value);
Status
GetCacheConfigGpuCacheCapacity(int64_t& value);
Status
GetCacheConfigGpuCacheThreshold(float& value);
Status
GetCacheConfigCacheInsertData(bool& value);
/* engine config */
Status
GetEngineConfigUseBlasThreshold(int32_t& value);
GetEngineConfigUseBlasThreshold(int64_t& value);
Status
GetEngineConfigOmpThreadNum(int32_t& value);
GetEngineConfigOmpThreadNum(int64_t& value);
Status
GetEngineConfigGpuSearchThreshold(int32_t& value);
GetEngineConfigGpuSearchThreshold(int64_t& value);
/* resource config */
/* gpu resource config */
Status
GetGpuResourceConfigEnable(bool& value);
Status
GetGpuResourceConfigCacheCapacity(int64_t& value);
Status
GetResourceConfigMode(std::string& value);
GetGpuResourceConfigCacheThreshold(float& value);
Status
GetResourceConfigSearchResources(std::vector<std::string>& value);
GetGpuResourceConfigSearchResources(std::vector<int64_t>& value);
Status
GetResourceConfigIndexBuildDevice(int32_t& value);
GetGpuResourceConfigBuildIndexResources(std::vector<int64_t>& value);
public:
/* server config */
......@@ -300,10 +293,6 @@ class Config {
Status
SetCacheConfigCpuCacheThreshold(const std::string& value);
Status
SetCacheConfigGpuCacheCapacity(const std::string& value);
Status
SetCacheConfigGpuCacheThreshold(const std::string& value);
Status
SetCacheConfigCacheInsertData(const std::string& value);
/* engine config */
......@@ -314,13 +303,17 @@ class Config {
Status
SetEngineConfigGpuSearchThreshold(const std::string& value);
/* resource config */
/* gpu resource config */
Status
SetGpuResourceConfigEnable(const std::string& value);
Status
SetGpuResourceConfigCacheCapacity(const std::string& value);
Status
SetResourceConfigMode(const std::string& value);
SetGpuResourceConfigCacheThreshold(const std::string& value);
Status
SetResourceConfigSearchResources(const std::string& value);
SetGpuResourceConfigSearchResources(const std::string& value);
Status
SetResourceConfigIndexBuildDevice(const std::string& value);
SetGpuResourceConfigBuildIndexResources(const std::string& value);
private:
std::unordered_map<std::string, std::unordered_map<std::string, std::string>> config_map_;
......
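The new gpu_resource_config getters above return device ids as std::vector<int64_t>, while the setters and defaults still use comma-delimited strings such as "gpu0". The parsing code itself is not part of the visible diff, so the following is only a sketch of how such a string could be converted. ParseGpuResources is a hypothetical helper, and the strict "gpuN" format with no surrounding whitespace is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Parses "gpu0,gpu1" into {0, 1}. Returns false and clears `ids` when the
// value is empty or any entry is malformed.
bool
ParseGpuResources(const std::string& value, std::vector<int64_t>& ids) {
    ids.clear();
    std::stringstream ss(value);
    std::string item;
    while (std::getline(ss, item, ',')) {
        const bool has_prefix = item.size() > 3 && item.compare(0, 3, "gpu") == 0;
        const std::string digits = has_prefix ? item.substr(3) : std::string();
        // Require 1 to 9 decimal digits so std::stoll cannot overflow or throw.
        if (digits.empty() || digits.size() > 9 ||
            digits.find_first_not_of("0123456789") != std::string::npos) {
            ids.clear();
            return false;
        }
        ids.push_back(std::stoll(digits));
    }
    return !ids.empty();
}
```

Returning numeric ids lets a caller such as a scheduler pass work with device numbers directly instead of re-parsing resource names on every task.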
......@@ -89,7 +89,7 @@ DBWrapper::StartService() {
}
// engine config
int32_t omp_thread;
int64_t omp_thread;
s = config.GetEngineConfigOmpThreadNum(omp_thread);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
......@@ -100,7 +100,7 @@ DBWrapper::StartService() {
omp_set_num_threads(omp_thread);
SERVER_LOG_DEBUG << "Specify openmp thread number: " << omp_thread;
} else {
uint32_t sys_thread_cnt = 8;
int64_t sys_thread_cnt = 8;
if (CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) {
omp_thread = static_cast<int32_t>(ceil(sys_thread_cnt * 0.5));
omp_set_num_threads(omp_thread);
......@@ -108,7 +108,7 @@ DBWrapper::StartService() {
}
// init faiss global variable
int32_t use_blas_threshold;
int64_t use_blas_threshold;
s = config.GetEngineConfigUseBlasThreshold(use_blas_threshold);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
......@@ -119,7 +119,7 @@ DBWrapper::StartService() {
// set archive config
engine::ArchiveConf::CriteriaT criterial;
int32_t disk, days;
int64_t disk, days;
s = config.GetDBConfigArchiveDiskThreshold(disk);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
......
......@@ -25,7 +25,7 @@
#include "server/DBWrapper.h"
#include "server/Server.h"
#include "server/grpc_impl/GrpcServer.h"
#include "src/config.h"
#include "src/version.h"
#include "utils/Log.h"
#include "utils/LogUtil.h"
#include "utils/SignalUtil.h"
......
......@@ -16,7 +16,24 @@
// under the License.
#include "server/grpc_impl/GrpcRequestHandler.h"
#include "server/grpc_impl/GrpcRequestTask.h"
#include "server/grpc_impl/GrpcRequestScheduler.h"
#include "server/grpc_impl/request/CmdRequest.h"
#include "server/grpc_impl/request/CountTableRequest.h"
#include "server/grpc_impl/request/CreateIndexRequest.h"
#include "server/grpc_impl/request/CreatePartitionRequest.h"
#include "server/grpc_impl/request/CreateTableRequest.h"
#include "server/grpc_impl/request/DeleteByDateRequest.h"
#include "server/grpc_impl/request/DescribeIndexRequest.h"
#include "server/grpc_impl/request/DescribeTableRequest.h"
#include "server/grpc_impl/request/DropIndexRequest.h"
#include "server/grpc_impl/request/DropPartitionRequest.h"
#include "server/grpc_impl/request/DropTableRequest.h"
#include "server/grpc_impl/request/HasTableRequest.h"
#include "server/grpc_impl/request/InsertRequest.h"
#include "server/grpc_impl/request/PreloadTableRequest.h"
#include "server/grpc_impl/request/SearchRequest.h"
#include "server/grpc_impl/request/ShowPartitionsRequest.h"
#include "server/grpc_impl/request/ShowTablesRequest.h"
#include "utils/TimeRecorder.h"
#include <vector>
......@@ -28,8 +45,8 @@ namespace grpc {
::grpc::Status
GrpcRequestHandler::CreateTable(::grpc::ServerContext* context, const ::milvus::grpc::TableSchema* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = CreateTableTask::Create(request);
GrpcRequestScheduler::ExecTask(task_ptr, response);
BaseRequestPtr request_ptr = CreateTableRequest::Create(request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
return ::grpc::Status::OK;
}
......@@ -37,9 +54,9 @@ GrpcRequestHandler::CreateTable(::grpc::ServerContext* context, const ::milvus::
GrpcRequestHandler::HasTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::BoolReply* response) {
bool has_table = false;
BaseTaskPtr task_ptr = HasTableTask::Create(request->table_name(), has_table);
BaseRequestPtr request_ptr = HasTableRequest::Create(request->table_name(), has_table);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_bool_reply(has_table);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
......@@ -49,25 +66,25 @@ GrpcRequestHandler::HasTable(::grpc::ServerContext* context, const ::milvus::grp
::grpc::Status
GrpcRequestHandler::DropTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name());
GrpcRequestScheduler::ExecTask(task_ptr, response);
BaseRequestPtr request_ptr = DropTableRequest::Create(request->table_name());
GrpcRequestScheduler::ExecRequest(request_ptr, response);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::CreateIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = CreateIndexTask::Create(request);
GrpcRequestScheduler::ExecTask(task_ptr, response);
BaseRequestPtr request_ptr = CreateIndexRequest::Create(request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc::InsertParam* request,
::milvus::grpc::VectorIds* response) {
BaseTaskPtr task_ptr = InsertTask::Create(request, response);
BaseRequestPtr request_ptr = InsertRequest::Create(request, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
......@@ -77,9 +94,9 @@ GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc:
GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request,
::milvus::grpc::TopKQueryResult* response) {
std::vector<std::string> file_id_array;
BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response);
BaseRequestPtr request_ptr = SearchRequest::Create(request, file_id_array, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
......@@ -93,9 +110,10 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext* context, const ::milvus
file_id_array.push_back(request->file_id_array(i));
}
::milvus::grpc::SearchInFilesParam* request_mutable = const_cast<::milvus::grpc::SearchInFilesParam*>(request);
BaseTaskPtr task_ptr = SearchTask::Create(request_mutable->mutable_search_param(), file_id_array, response);
BaseRequestPtr request_ptr =
SearchRequest::Create(request_mutable->mutable_search_param(), file_id_array, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
......@@ -104,9 +122,9 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext* context, const ::milvus
::grpc::Status
GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::TableSchema* response) {
BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), response);
BaseRequestPtr request_ptr = DescribeTableRequest::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
......@@ -116,9 +134,9 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, const ::milvus
GrpcRequestHandler::CountTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::TableRowCount* response) {
int64_t row_count = 0;
BaseTaskPtr task_ptr = CountTableTask::Create(request->table_name(), row_count);
BaseRequestPtr request_ptr = CountTableRequest::Create(request->table_name(), row_count);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_table_row_count(row_count);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
......@@ -128,9 +146,9 @@ GrpcRequestHandler::CountTable(::grpc::ServerContext* context, const ::milvus::g
::grpc::Status
GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, const ::milvus::grpc::Command* request,
::milvus::grpc::TableNameList* response) {
BaseTaskPtr task_ptr = ShowTablesTask::Create(response);
BaseRequestPtr request_ptr = ShowTablesRequest::Create(response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
......@@ -140,9 +158,9 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, const ::milvus::g
GrpcRequestHandler::Cmd(::grpc::ServerContext* context, const ::milvus::grpc::Command* request,
::milvus::grpc::StringReply* response) {
std::string result;
BaseTaskPtr task_ptr = CmdTask::Create(request->cmd(), result);
BaseRequestPtr request_ptr = CmdRequest::Create(request->cmd(), result);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_string_reply(result);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
......@@ -152,9 +170,9 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext* context, const ::milvus::grpc::Co
::grpc::Status
GrpcRequestHandler::DeleteByDate(::grpc::ServerContext* context, const ::milvus::grpc::DeleteByDateParam* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = DeleteByDateTask::Create(request);
BaseRequestPtr request_ptr = DeleteByDateRequest::Create(request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_error_code(grpc_status.error_code());
response->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
......@@ -163,9 +181,9 @@ GrpcRequestHandler::DeleteByDate(::grpc::ServerContext* context, const ::milvus:
::grpc::Status
GrpcRequestHandler::PreloadTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = PreloadTableTask::Create(request->table_name());
BaseRequestPtr request_ptr = PreloadTableRequest::Create(request->table_name());
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
......@@ -174,9 +192,9 @@ GrpcRequestHandler::PreloadTable(::grpc::ServerContext* context, const ::milvus:
::grpc::Status
GrpcRequestHandler::DescribeIndex(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::IndexParam* response) {
BaseTaskPtr task_ptr = DescribeIndexTask::Create(request->table_name(), response);
BaseRequestPtr request_ptr = DescribeIndexRequest::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
......@@ -185,9 +203,9 @@ GrpcRequestHandler::DescribeIndex(::grpc::ServerContext* context, const ::milvus
::grpc::Status
GrpcRequestHandler::DropIndex(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = DropIndexTask::Create(request->table_name());
BaseRequestPtr request_ptr = DropIndexRequest::Create(request->table_name());
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
......@@ -196,17 +214,17 @@ GrpcRequestHandler::DropIndex(::grpc::ServerContext* context, const ::milvus::gr
::grpc::Status
GrpcRequestHandler::CreatePartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = CreatePartitionTask::Create(request);
GrpcRequestScheduler::ExecTask(task_ptr, response);
BaseRequestPtr request_ptr = CreatePartitionRequest::Create(request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::ShowPartitions(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::PartitionList* response) {
BaseTaskPtr task_ptr = ShowPartitionsTask::Create(request->table_name(), response);
BaseRequestPtr request_ptr = ShowPartitionsRequest::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
......@@ -215,9 +233,9 @@ GrpcRequestHandler::ShowPartitions(::grpc::ServerContext* context, const ::milvu
::grpc::Status
GrpcRequestHandler::DropPartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = DropPartitionTask::Create(request);
BaseRequestPtr request_ptr = DropPartitionRequest::Create(request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
......
......@@ -70,43 +70,6 @@ ErrorMap(ErrorCode code) {
}
} // namespace
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GrpcBaseTask::GrpcBaseTask(const std::string& task_group, bool async)
: task_group_(task_group), async_(async), done_(false) {
}
GrpcBaseTask::~GrpcBaseTask() {
WaitToFinish();
}
Status
GrpcBaseTask::Execute() {
status_ = OnExecute();
Done();
return status_;
}
void
GrpcBaseTask::Done() {
done_ = true;
finish_cond_.notify_all();
}
Status
GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string& error_msg) {
status_ = Status(error_code, error_msg);
SERVER_LOG_ERROR << error_msg;
return status_;
}
Status
GrpcBaseTask::WaitToFinish() {
std::unique_lock<std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; });
return status_;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GrpcRequestScheduler::GrpcRequestScheduler() : stopped_(false) {
Start();
......@@ -117,17 +80,17 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
}
void
GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status) {
if (task_ptr == nullptr) {
GrpcRequestScheduler::ExecRequest(BaseRequestPtr& request_ptr, ::milvus::grpc::Status* grpc_status) {
if (request_ptr == nullptr) {
return;
}
GrpcRequestScheduler& scheduler = GrpcRequestScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
scheduler.ExecuteRequest(request_ptr);
if (!task_ptr->IsAsync()) {
task_ptr->WaitToFinish();
const Status& status = task_ptr->status();
if (!request_ptr->IsAsync()) {
request_ptr->WaitToFinish();
const Status& status = request_ptr->status();
if (!status.ok()) {
grpc_status->set_reason(status.message());
grpc_status->set_error_code(ErrorMap(status.code()));
......@@ -153,7 +116,7 @@ GrpcRequestScheduler::Stop() {
SERVER_LOG_INFO << "Scheduler gonna stop...";
{
std::lock_guard<std::mutex> lock(queue_mtx_);
for (auto iter : task_groups_) {
for (auto iter : request_groups_) {
if (iter.second != nullptr) {
iter.second->Put(nullptr);
}
......@@ -171,64 +134,64 @@ GrpcRequestScheduler::Stop() {
}
Status
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) {
if (task_ptr == nullptr) {
GrpcRequestScheduler::ExecuteRequest(const BaseRequestPtr& request_ptr) {
if (request_ptr == nullptr) {
return Status::OK();
}
auto status = PutTaskToQueue(task_ptr);
auto status = PutToQueue(request_ptr);
if (!status.ok()) {
SERVER_LOG_ERROR << "Put task to queue failed with code: " << status.ToString();
SERVER_LOG_ERROR << "Put request to queue failed with code: " << status.ToString();
return status;
}
if (task_ptr->IsAsync()) {
if (request_ptr->IsAsync()) {
return Status::OK(); // async execution; the caller must call WaitToFinish later
}
return task_ptr->WaitToFinish(); // sync execution
return request_ptr->WaitToFinish(); // sync execution
}
void
GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
if (task_queue == nullptr) {
GrpcRequestScheduler::TakeToExecute(RequestQueuePtr request_queue) {
if (request_queue == nullptr) {
return;
}
while (true) {
BaseTaskPtr task = task_queue->Take();
if (task == nullptr) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread";
BaseRequestPtr request = request_queue->Take();
if (request == nullptr) {
SERVER_LOG_ERROR << "Take null from request queue, stop thread";
break; // stop the thread
}
try {
auto status = task->Execute();
auto status = request->Execute();
if (!status.ok()) {
SERVER_LOG_ERROR << "Task failed with code: " << status.ToString();
SERVER_LOG_ERROR << "Request failed with code: " << status.ToString();
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
SERVER_LOG_ERROR << "Request failed to execute: " << ex.what();
}
}
}
Status
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) {
GrpcRequestScheduler::PutToQueue(const BaseRequestPtr& request_ptr) {
std::lock_guard<std::mutex> lock(queue_mtx_);
std::string group_name = task_ptr->TaskGroup();
if (task_groups_.count(group_name) > 0) {
task_groups_[group_name]->Put(task_ptr);
std::string group_name = request_ptr->RequestGroup();
if (request_groups_.count(group_name) > 0) {
request_groups_[group_name]->Put(request_ptr);
} else {
TaskQueuePtr queue = std::make_shared<TaskQueue>();
queue->Put(task_ptr);
task_groups_.insert(std::make_pair(group_name, queue));
RequestQueuePtr queue = std::make_shared<RequestQueue>();
queue->Put(request_ptr);
request_groups_.insert(std::make_pair(group_name, queue));
// start a thread
ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeToExecute, this, queue);
execute_threads_.push_back(thread);
SERVER_LOG_INFO << "Create new thread for task group: " << group_name;
SERVER_LOG_INFO << "Create new thread for request group: " << group_name;
}
return Status::OK();
......
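Note on the hunk above: the scheduler keeps one blocking queue and one worker thread per request group, and `Stop()` shuts a worker down by putting a `nullptr` sentinel on its queue. The following is a minimal, self-contained sketch of that pattern, not the Milvus implementation. `SimpleBlockingQueue`, `MiniScheduler`, `Job` and `RunDemo` are hypothetical names introduced for illustration, and the queue is unbounded for brevity, which may differ from the real `utils/BlockingQueue.h`.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for utils/BlockingQueue.h (unbounded, for brevity).
template <typename T>
class SimpleBlockingQueue {
 public:
    void Put(const T& item) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            items_.push(item);
        }
        cond_.notify_one();
    }

    // Blocks until an item is available, then removes and returns it.
    T Take() {
        std::unique_lock<std::mutex> lock(mtx_);
        cond_.wait(lock, [this] { return !items_.empty(); });
        T item = items_.front();
        items_.pop();
        return item;
    }

 private:
    std::mutex mtx_;
    std::condition_variable cond_;
    std::queue<T> items_;
};

using Job = std::function<void()>;
using JobPtr = std::shared_ptr<Job>;
using JobQueue = SimpleBlockingQueue<JobPtr>;

// Hypothetical scheduler: one queue and one worker thread per group name.
class MiniScheduler {
 public:
    void Submit(const std::string& group, JobPtr job) {
        std::lock_guard<std::mutex> lock(mtx_);
        auto it = groups_.find(group);
        if (it == groups_.end()) {
            // First job for this group: create its queue and start its worker.
            auto queue = std::make_shared<JobQueue>();
            it = groups_.emplace(group, queue).first;
            workers_.emplace_back([queue] {
                while (JobPtr next = queue->Take()) {  // nullptr ends the loop
                    (*next)();
                }
            });
        }
        it->second->Put(job);
    }

    void Stop() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            for (auto& entry : groups_) {
                entry.second->Put(nullptr);  // sentinel: tells the worker to exit
            }
        }
        for (auto& worker : workers_) {
            worker.join();
        }
        workers_.clear();
        groups_.clear();
    }

 private:
    std::mutex mtx_;
    std::map<std::string, std::shared_ptr<JobQueue>> groups_;
    std::vector<std::thread> workers_;
};

// Submits three jobs across two groups and returns how many were executed.
int RunDemo() {
    std::atomic<int> executed{0};
    MiniScheduler scheduler;
    auto job = std::make_shared<Job>([&executed] { ++executed; });
    scheduler.Submit("ddl", job);
    scheduler.Submit("info", job);
    scheduler.Submit("info", job);
    scheduler.Stop();  // queued jobs run before each worker sees the sentinel
    return executed.load();
}
```

Because each queue is FIFO, jobs submitted before `Stop()` are drained before the sentinel is taken, so requests within one group run serially while different groups run in parallel.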
......@@ -19,6 +19,7 @@
#include "grpc/gen-status/status.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include "utils/BlockingQueue.h"
#include "utils/Status.h"
......@@ -32,57 +33,8 @@ namespace milvus {
namespace server {
namespace grpc {
class GrpcBaseTask {
protected:
explicit GrpcBaseTask(const std::string& task_group, bool async = false);
virtual ~GrpcBaseTask();
public:
Status
Execute();
void
Done();
Status
WaitToFinish();
std::string
TaskGroup() const {
return task_group_;
}
const Status&
status() const {
return status_;
}
bool
IsAsync() const {
return async_;
}
protected:
virtual Status
OnExecute() = 0;
Status
SetStatus(ErrorCode error_code, const std::string& error_msg);
protected:
mutable std::mutex finish_mtx_;
std::condition_variable finish_cond_;
std::string task_group_;
bool async_;
bool done_;
Status status_;
};
using BaseTaskPtr = std::shared_ptr<GrpcBaseTask>;
using TaskQueue = BlockingQueue<BaseTaskPtr>;
using TaskQueuePtr = std::shared_ptr<TaskQueue>;
using RequestQueue = BlockingQueue<BaseRequestPtr>;
using RequestQueuePtr = std::shared_ptr<RequestQueue>;
using ThreadPtr = std::shared_ptr<std::thread>;
class GrpcRequestScheduler {
......@@ -100,10 +52,10 @@ class GrpcRequestScheduler {
Stop();
Status
ExecuteTask(const BaseTaskPtr& task_ptr);
ExecuteRequest(const BaseRequestPtr& request_ptr);
static void
ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status);
ExecRequest(BaseRequestPtr& request_ptr, ::milvus::grpc::Status* grpc_status);
protected:
GrpcRequestScheduler();
......@@ -111,15 +63,15 @@ class GrpcRequestScheduler {
virtual ~GrpcRequestScheduler();
void
TakeTaskToExecute(TaskQueuePtr task_queue);
TakeToExecute(RequestQueuePtr request_queue);
Status
PutTaskToQueue(const BaseTaskPtr& task_ptr);
PutToQueue(const BaseRequestPtr& request_ptr);
private:
mutable std::mutex queue_mtx_;
std::map<std::string, TaskQueuePtr> task_groups_;
std::map<std::string, RequestQueuePtr> request_groups_;
std::vector<ThreadPtr> execute_threads_;
......
This diff is collapsed.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "db/Types.h"
#include "server/grpc_impl/GrpcRequestScheduler.h"
#include "utils/Status.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include <condition_variable>
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreateTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::TableSchema* schema);
protected:
explicit CreateTableTask(const ::milvus::grpc::TableSchema* schema);
Status
OnExecute() override;
private:
const ::milvus::grpc::TableSchema* schema_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class HasTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, bool& has_table);
protected:
HasTableTask(const std::string& table_name, bool& has_table);
Status
OnExecute() override;
private:
std::string table_name_;
bool& has_table_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DescribeTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, ::milvus::grpc::TableSchema* schema);
protected:
DescribeTableTask(const std::string& table_name, ::milvus::grpc::TableSchema* schema);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::TableSchema* schema_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name);
protected:
explicit DropTableTask(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreateIndexTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::IndexParam* index_param);
protected:
explicit CreateIndexTask(const ::milvus::grpc::IndexParam* index_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::IndexParam* index_param_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class ShowTablesTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(::milvus::grpc::TableNameList* table_name_list);
protected:
explicit ShowTablesTask(::milvus::grpc::TableNameList* table_name_list);
Status
OnExecute() override;
private:
::milvus::grpc::TableNameList* table_name_list_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class InsertTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids);
protected:
InsertTask(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids);
Status
OnExecute() override;
private:
const ::milvus::grpc::InsertParam* insert_param_;
::milvus::grpc::VectorIds* record_ids_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class SearchTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::SearchParam* search_param, const std::vector<std::string>& file_id_array,
::milvus::grpc::TopKQueryResult* response);
protected:
SearchTask(const ::milvus::grpc::SearchParam* search_param, const std::vector<std::string>& file_id_array,
::milvus::grpc::TopKQueryResult* response);
Status
OnExecute() override;
private:
const ::milvus::grpc::SearchParam* search_param_;
std::vector<std::string> file_id_array_;
::milvus::grpc::TopKQueryResult* topk_result_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CountTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, int64_t& row_count);
protected:
CountTableTask(const std::string& table_name, int64_t& row_count);
Status
OnExecute() override;
private:
std::string table_name_;
int64_t& row_count_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CmdTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& cmd, std::string& result);
protected:
CmdTask(const std::string& cmd, std::string& result);
Status
OnExecute() override;
private:
std::string cmd_;
std::string& result_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteByDateTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
protected:
explicit DeleteByDateTask(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::DeleteByDateParam* delete_by_range_param_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class PreloadTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name);
protected:
explicit PreloadTableTask(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DescribeIndexTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, ::milvus::grpc::IndexParam* index_param);
protected:
DescribeIndexTask(const std::string& table_name, ::milvus::grpc::IndexParam* index_param);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::IndexParam* index_param_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropIndexTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name);
protected:
explicit DropIndexTask(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreatePartitionTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::PartitionParam* partition_param);
protected:
explicit CreatePartitionTask(const ::milvus::grpc::PartitionParam* partition_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::PartitionParam* partition_param_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class ShowPartitionsTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list);
protected:
ShowPartitionsTask(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::PartitionList* partition_list_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropPartitionTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::PartitionParam* partition_param);
protected:
explicit DropPartitionTask(const ::milvus::grpc::PartitionParam* partition_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::PartitionParam* partition_param_;
};
} // namespace grpc
} // namespace server
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/CmdRequest.h"
#include "scheduler/SchedInst.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
CmdRequest::CmdRequest(const std::string& cmd, std::string& result)
: GrpcBaseRequest(INFO_REQUEST_GROUP), cmd_(cmd), result_(result) {
}
BaseRequestPtr
CmdRequest::Create(const std::string& cmd, std::string& result) {
return std::shared_ptr<GrpcBaseRequest>(new CmdRequest(cmd, result));
}
Status
CmdRequest::OnExecute() {
if (cmd_ == "version") {
result_ = MILVUS_VERSION;
} else if (cmd_ == "tasktable") {
result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables();
} else {
result_ = "OK";
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class CmdRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& cmd, std::string& result);
protected:
CmdRequest(const std::string& cmd, std::string& result);
Status
OnExecute() override;
private:
std::string cmd_;
std::string& result_;
};
} // namespace grpc
} // namespace server
} // namespace milvus
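Note on the request classes above: each one hides its constructor, exposes a static `Create()` factory that returns a shared base pointer, and overrides `OnExecute()`. The base class declaration in this diff (`finish_mtx_`, `finish_cond_`, `done_`) suggests that `Execute()` runs the hook and then signals `WaitToFinish()`. The sketch below illustrates that shape under stated assumptions. `BaseRequest`, `EchoRequest` and `RunCmd` are hypothetical names, `bool` stands in for `Status`, and the literal `"0.6.0"` stands in for `MILVUS_VERSION`. The real `GrpcBaseRequest` body is not shown in this diff and may differ.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical base class modeled on the declarations shown in the diff.
class BaseRequest {
 public:
    virtual ~BaseRequest() = default;

    // Template method: run the subclass hook, then signal completion.
    bool Execute() {
        ok_ = OnExecute();
        Done();
        return ok_;
    }

    // Blocks until Execute() has finished on some thread.
    bool WaitToFinish() {
        std::unique_lock<std::mutex> lock(finish_mtx_);
        finish_cond_.wait(lock, [this] { return done_; });
        return ok_;
    }

 protected:
    BaseRequest() = default;
    virtual bool OnExecute() = 0;

 private:
    void Done() {
        {
            std::lock_guard<std::mutex> lock(finish_mtx_);
            done_ = true;
        }
        finish_cond_.notify_all();
    }

    std::mutex finish_mtx_;
    std::condition_variable finish_cond_;
    bool done_ = false;
    bool ok_ = false;
};

// Hypothetical request shaped like CmdRequest: non-public constructor + factory.
class EchoRequest : public BaseRequest {
 public:
    static std::shared_ptr<BaseRequest> Create(const std::string& cmd, std::string& result) {
        return std::shared_ptr<BaseRequest>(new EchoRequest(cmd, result));
    }

 protected:
    EchoRequest(const std::string& cmd, std::string& result) : cmd_(cmd), result_(result) {}

    bool OnExecute() override {
        // "0.6.0" is a placeholder for MILVUS_VERSION.
        result_ = (cmd_ == "version") ? "0.6.0" : "OK";
        return true;
    }

 private:
    std::string cmd_;
    std::string& result_;  // caller-owned output, must outlive the request
};

// Executes the request on a worker thread and waits for it synchronously.
std::string RunCmd(const std::string& cmd) {
    std::string result;
    auto request = EchoRequest::Create(cmd, result);
    std::thread worker([request] { request->Execute(); });
    bool ok = request->WaitToFinish();
    worker.join();
    return ok ? result : "";
}
```

Holding the output by reference, as `CmdRequest` does with `result_`, means the caller's variable has to stay alive until the request finishes. For asynchronous requests the caller would still need to call `WaitToFinish()` before reading it.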
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/CountTableRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
CountTableRequest::CountTableRequest(const std::string& table_name, int64_t& row_count)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), row_count_(row_count) {
}
BaseRequestPtr
CountTableRequest::Create(const std::string& table_name, int64_t& row_count) {
return std::shared_ptr<GrpcBaseRequest>(new CountTableRequest(table_name, row_count));
}
Status
CountTableRequest::OnExecute() {
try {
TimeRecorder rc("CountTableRequest");
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
// step 2: get row count
uint64_t row_count = 0;
status = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
} else {
return status;
}
}
row_count_ = static_cast<int64_t>(row_count);
rc.ElapseFromBegin("total cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class CountTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, int64_t& row_count);
protected:
CountTableRequest(const std::string& table_name, int64_t& row_count);
Status
OnExecute() override;
private:
std::string table_name_;
int64_t& row_count_;
};
} // namespace grpc
} // namespace server
} // namespace milvus
This diff is collapsed.