提交 14685f77 编写于 作者: W wxyu

solve conflicts


Former-commit-id: d6f938da342cc6c7643f3cc1fd41f5a6212a5ecd
......@@ -9,6 +9,7 @@ container('milvus-build-env') {
sh "git config --global user.email \"test@zilliz.com\""
sh "git config --global user.name \"test\""
withCredentials([usernamePassword(credentialsId: "${params.JFROG_USER}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
sh "./build.sh -l"
sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' && export JFROG_USER_NAME='${USERNAME}' && export JFROG_PASSWORD='${PASSWORD}' && ./build.sh -t ${params.BUILD_TYPE} -j -u -c"
}
}
......
......@@ -9,6 +9,7 @@ container('milvus-build-env') {
sh "git config --global user.email \"test@zilliz.com\""
sh "git config --global user.name \"test\""
withCredentials([usernamePassword(credentialsId: "${params.JFROG_USER}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
sh "./build.sh -l"
sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' && export JFROG_USER_NAME='${USERNAME}' && export JFROG_PASSWORD='${PASSWORD}' && ./build.sh -t ${params.BUILD_TYPE} -j"
}
}
......
......@@ -44,7 +44,7 @@ metadata:
spec:
containers:
- name: milvus-build-env
image: registry.zilliz.com/milvus/milvus-build-env:v0.12
image: registry.zilliz.com/milvus/milvus-build-env:v0.13
command:
- cat
tty: true
......
......@@ -44,7 +44,7 @@ metadata:
spec:
containers:
- name: milvus-build-env
image: registry.zilliz.com/milvus/milvus-build-env:v0.12
image: registry.zilliz.com/milvus/milvus-build-env:v0.13
command:
- cat
tty: true
......
......@@ -44,7 +44,7 @@ metadata:
spec:
containers:
- name: milvus-build-env
image: registry.zilliz.com/milvus/milvus-build-env:v0.12
image: registry.zilliz.com/milvus/milvus-build-env:v0.13
command:
- cat
tty: true
......
......@@ -10,6 +10,9 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-577 - Unittest Query randomly hung
- MS-587 - Count get wrong result after adding vectors and index built immediately
- MS-599 - search wrong result when table created with metric_type: IP
- MS-601 - Docker logs error caused by get CPUTemperature error
- MS-622 - Delete vectors should be failed if date range is invalid
- MS-620 - Get table row counts display wrong error code
## Improvement
- MS-552 - Add and change the easylogging library
......@@ -28,6 +31,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-609 - Update task construct function
- MS-611 - Add resources validity check in ResourceMgr
- MS-619 - Add optimizer class in scheduler
- MS-614 - Preload table at startup
- MS-626 - Refactor DataObj to support cache any type data
## New Feature
......
......@@ -105,15 +105,31 @@ please reinstall CMake with curl:
```
##### code format and linting
Install clang-format and clang-tidy
```shell
CentOS 7:
$ yum install clang
Ubuntu 16.04 or 18.04:
$ sudo apt-get install clang-format clang-tidy
Ubuntu 16.04:
$ sudo apt-get install clang-tidy
$ sudo su
$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -
$ apt-add-repository "deb http://apt.llvm.org/xenial/ llvm-toolchain-xenial-6.0 main"
$ apt-get update
$ apt-get install clang-format-6.0
Ubuntu 18.04:
$ sudo apt-get install clang-tidy clang-format
$ rm cmake_build/CMakeCache.txt
```
Check code style
```shell
$ ./build.sh -l
```
To format the code
```shell
$ cd cmake_build
$ make clang-format
```
##### Run unit test
......@@ -122,13 +138,14 @@ $ ./build.sh -u
```
##### Run code coverage
Install lcov
```shell
CentOS 7:
$ yum install lcov
Ubuntu 16.04 or 18.04:
$ sudo apt-get install lcov
```
```shell
$ ./build.sh -u -c
```
......
......@@ -5,4 +5,5 @@
*thirdparty*
*easylogging++*
*SqliteMetaImpl.cpp
*src/grpc*
\ No newline at end of file
*src/grpc*
*milvus/include*
\ No newline at end of file
......@@ -112,13 +112,13 @@ if [[ ${RUN_CPPLINT} == "ON" ]]; then
fi
echo "clang-format check passed!"
# clang-tidy check
make check-clang-tidy
if [ $? -ne 0 ]; then
echo "ERROR! clang-tidy check failed"
exit 1
fi
echo "clang-tidy check passed!"
# # clang-tidy check
# make check-clang-tidy
# if [ $? -ne 0 ]; then
# echo "ERROR! clang-tidy check failed"
# exit 1
# fi
# echo "clang-tidy check passed!"
else
# compile and build
make -j 4 || exit 1
......
......@@ -18,6 +18,9 @@ db_config:
# sum of insert_buffer_size and cpu_cache_capacity cannot exceed total memory
build_index_gpu: 0 # gpu id used for building index
preload_table: # preload data at startup, '*' means load all tables, empty value means no preload
# you can specify preload tables like this: table1,table2,table3
metric_config:
enable_monitor: false # enable monitoring or not
collector: prometheus # prometheus
......
......@@ -104,7 +104,7 @@ ${LCOV_CMD} -r "${FILE_INFO_OUTPUT}" -o "${FILE_INFO_OUTPUT_NEW}" \
"src/metrics/MetricBase.h"\
"src/server/Server.cpp"\
"src/server/DBWrapper.cpp"\
"src/server/grpc_impl/GrpcMilvusServer.cpp"\
"src/server/grpc_impl/GrpcServer.cpp"\
"src/utils/easylogging++.h"\
"src/utils/easylogging++.cc"\
......
......@@ -76,7 +76,7 @@ ConvertToDataset(std::vector<SPTAG::QueryResult> query_results) {
auto p_id = (int64_t*)malloc(sizeof(int64_t) * elems);
auto p_dist = (float*)malloc(sizeof(float) * elems);
// TODO: throw if malloc failed.
// TODO: throw if malloc failed.
#pragma omp parallel for
for (auto i = 0; i < query_results.size(); ++i) {
......
......@@ -36,7 +36,7 @@ struct BufferDeleter {
free((void*)buffer->data());
}
};
}
} // namespace internal
inline BufferPtr
MakeBufferSmart(uint8_t* data, const int64_t size) {
......
......@@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.
#include <faiss/IndexIVF.h>
#include <faiss/index_io.h>
#include <utility>
#include "knowhere/common/Exception.h"
#include "knowhere/index/vector_index/FaissBaseIndex.h"
#include "knowhere/index/vector_index/IndexIVF.h"
#include "knowhere/index/vector_index/helpers/FaissIO.h"
namespace knowhere {
......
......@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
#include <faiss/IndexIVFPQ.h>
#include <faiss/gpu/GpuAutoTune.h>
#include <faiss/gpu/GpuIndexFlat.h>
#include <faiss/gpu/GpuIndexIVF.h>
......@@ -26,6 +25,7 @@
#include "knowhere/adapter/VectorAdapter.h"
#include "knowhere/common/Exception.h"
#include "knowhere/index/vector_index/IndexGPUIVF.h"
#include "knowhere/index/vector_index/IndexIVFPQ.h"
#include "knowhere/index/vector_index/helpers/Cloner.h"
#include "knowhere/index/vector_index/helpers/FaissIO.h"
......
......@@ -23,6 +23,7 @@
#include "knowhere/adapter/VectorAdapter.h"
#include "knowhere/common/Exception.h"
#include "knowhere/index/vector_index/IndexGPUIVFPQ.h"
#include "knowhere/index/vector_index/IndexIVFPQ.h"
namespace knowhere {
......
......@@ -43,9 +43,9 @@ namespace kn = knowhere;
} // namespace
using ::testing::Combine;
using ::testing::TestWithParam;
using ::testing::Values;
using ::testing::Combine;
constexpr int device_id = 0;
constexpr int64_t DIM = 128;
......
......@@ -34,9 +34,9 @@ namespace kn = knowhere;
} // namespace
using ::testing::Combine;
using ::testing::TestWithParam;
using ::testing::Values;
using ::testing::Combine;
class KDTTest : public DataGen, public ::testing::Test {
protected:
......
......@@ -32,9 +32,9 @@ namespace kn = knowhere;
} // namespace
using ::testing::Combine;
using ::testing::TestWithParam;
using ::testing::Values;
using ::testing::Combine;
constexpr int64_t DEVICE_ID = 1;
......
......@@ -26,6 +26,7 @@
#include "meta/SqliteMetaImpl.h"
#include "metrics/Metrics.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/BuildIndexJob.h"
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
#include "utils/Log.h"
......@@ -206,7 +207,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
size += engine->PhysicalSize();
if (size > available_size) {
break;
return Status(SERVER_CACHE_FULL, "Cache is full");
} else {
try {
// step 1: load index
......@@ -296,7 +297,8 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
std::vector<int> file_types;
if (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP)) {
file_types = {
static_cast<int32_t>(meta::TableFileSchema::NEW), static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
static_cast<int32_t>(meta::TableFileSchema::NEW),
static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
};
} else {
file_types = {
......@@ -639,8 +641,9 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
index_size = index->Size();
if (index_size >= file_schema.index_file_size_)
if (index_size >= file_schema.index_file_size_) {
break;
}
}
// step 3: serialize to disk
......@@ -896,17 +899,32 @@ DBImpl::BackgroundBuildIndex() {
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
status = BuildIndex(file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
}
if (shutting_down_.load(std::memory_order_acquire)) {
ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
break;
}
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
// step 2: put build index task to scheduler
for (auto& file : to_index_files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddToIndexFiles(file_ptr);
}
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitBuildIndexFinish();
if (!job->GetStatus().ok()) {
Status status = job->GetStatus();
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
}
// for (auto &file : to_index_files) {
// status = BuildIndex(file);
// if (!status.ok()) {
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
// }
//
// if (shutting_down_.load(std::memory_order_acquire)) {
// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
// break;
// }
// }
ENGINE_LOG_TRACE << "Background build index thread exit";
}
......
......@@ -66,6 +66,9 @@ class ExecutionEngine {
virtual Status
CopyToGpu(uint64_t device_id) = 0;
virtual Status
CopyToIndexFileToGpu(uint64_t device_id) = 0;
virtual Status
CopyToCpu() = 0;
......
......@@ -241,6 +241,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
return Status::OK();
}
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (!already_in_cache) {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(nullptr, PhysicalSize());
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj);
}
return Status::OK();
}
Status
ExecutionEngineImpl::CopyToCpu() {
auto index = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
......
......@@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine {
Status
CopyToGpu(uint64_t device_id) override;
Status
CopyToIndexFileToGpu(uint64_t device_id) override;
Status
CopyToCpu() override;
......
......@@ -55,9 +55,9 @@ MemTableFile::CreateTableFile() {
Status
MemTableFile::Add(const VectorSourcePtr& source, IDNumbers& vector_ids) {
if (table_file_schema_.dimension_ <= 0) {
std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " +
std::to_string(table_file_schema_.dimension_) + ", table_id = " +
table_file_schema_.table_id_;
std::string err_msg =
"MemTableFile::Add: table_file_schema dimension = " + std::to_string(table_file_schema_.dimension_) +
", table_id = " + table_file_schema_.table_id_;
ENGINE_LOG_ERROR << err_msg;
return Status(DB_ERROR, "Not able to create table file");
}
......
......@@ -148,15 +148,18 @@ static const MetaSchema TABLES_SCHEMA(META_TABLES, {
});
// TableFiles schema
static const MetaSchema TABLEFILES_SCHEMA(
META_TABLEFILES,
{
MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"), MetaField("table_id", "VARCHAR(255)", "NOT NULL"),
MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"), MetaField("file_id", "VARCHAR(255)", "NOT NULL"),
MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"), MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"),
MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"), MetaField("updated_time", "BIGINT", "NOT NULL"),
MetaField("created_on", "BIGINT", "NOT NULL"), MetaField("date", "INT", "DEFAULT -1 NOT NULL"),
});
static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
MetaField("table_id", "VARCHAR(255)", "NOT NULL"),
MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
MetaField("file_id", "VARCHAR(255)", "NOT NULL"),
MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"),
MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"),
MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"),
MetaField("updated_time", "BIGINT", "NOT NULL"),
MetaField("created_on", "BIGINT", "NOT NULL"),
MetaField("date", "INT", "DEFAULT -1 NOT NULL"),
});
} // namespace
......
......@@ -46,7 +46,7 @@ PrometheusMetrics::Init() {
return s.code();
}
const std::string uri = std::string("/tmp/metrics");
const std::string uri = std::string("/metrics");
const std::size_t num_threads = 2;
// Init Exposer
......
......@@ -16,8 +16,12 @@
// under the License.
#include "metrics/SystemInfo.h"
#include "utils/Log.h"
#include <dirent.h>
#include <nvml.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <fstream>
......@@ -60,12 +64,12 @@ SystemInfo::Init() {
nvmlReturn_t nvmlresult;
nvmlresult = nvmlInit();
if (NVML_SUCCESS != nvmlresult) {
printf("System information initilization failed");
SERVER_LOG_ERROR << "System information initilization failed";
return;
}
nvmlresult = nvmlDeviceGetCount(&num_device_);
if (NVML_SUCCESS != nvmlresult) {
printf("Unable to get devidce number");
SERVER_LOG_ERROR << "Unable to get devidce number";
return;
}
......@@ -151,7 +155,7 @@ SystemInfo::getTotalCpuTime(std::vector<uint64_t>& work_time_array) {
std::vector<uint64_t> total_time_array;
FILE* file = fopen("/proc/stat", "r");
if (file == NULL) {
perror("Could not open stat file");
SERVER_LOG_ERROR << "Could not open stat file";
return total_time_array;
}
......@@ -162,7 +166,7 @@ SystemInfo::getTotalCpuTime(std::vector<uint64_t>& work_time_array) {
char buffer[1024];
char* ret = fgets(buffer, sizeof(buffer) - 1, file);
if (ret == NULL) {
perror("Could not read stat file");
SERVER_LOG_ERROR << "Could not read stat file";
fclose(file);
return total_time_array;
}
......@@ -237,18 +241,39 @@ SystemInfo::GPUTemperature() {
std::vector<float>
SystemInfo::CPUTemperature() {
std::vector<float> result;
for (int i = 0; i <= num_physical_processors_; ++i) {
std::string path = "/sys/class/thermal/thermal_zone" + std::to_string(i) + "/temp";
FILE* file = fopen(path.data(), "r");
if (file == nullptr) {
perror("Could not open thermal file");
return result;
std::string path = "/sys/class/hwmon/";
DIR* dir = NULL;
dir = opendir(path.c_str());
if (!dir) {
SERVER_LOG_ERROR << "Could not open hwmon directory";
return result;
}
struct dirent* ptr = NULL;
while ((ptr = readdir(dir)) != NULL) {
std::string filename(path);
filename.append(ptr->d_name);
char buf[100];
if (readlink(filename.c_str(), buf, 100) != -1) {
std::string m(buf);
if (m.find("coretemp") != std::string::npos) {
std::string object = filename;
object += "/temp1_input";
FILE* file = fopen(object.c_str(), "r");
if (file == nullptr) {
SERVER_LOG_ERROR << "Could not open temperature file";
return result;
}
float temp;
fscanf(file, "%f", &temp);
result.push_back(temp / 1000);
}
}
float temp;
fscanf(file, "%f", &temp);
result.push_back(temp / 1000);
fclose(file);
}
closedir(dir);
return result;
}
std::vector<uint64_t>
......
......@@ -16,6 +16,8 @@
// under the License.
#include "scheduler/TaskCreator.h"
#include <src/scheduler/tasklabel/SpecResLabel.h>
#include "SchedInst.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
......@@ -31,6 +33,9 @@ TaskCreator::Create(const JobPtr& job) {
case JobType::DELETE: {
return Create(std::static_pointer_cast<DeleteJob>(job));
}
case JobType::BUILD: {
return Create(std::static_pointer_cast<BuildIndexJob>(job));
}
default: {
// TODO(wxyu): error
return std::vector<TaskPtr>();
......@@ -62,5 +67,20 @@ TaskCreator::Create(const DeleteJobPtr& job) {
return tasks;
}
std::vector<TaskPtr>
TaskCreator::Create(const BuildIndexJobPtr& job) {
std::vector<TaskPtr> tasks;
// TODO(yukun): remove "disk" hardcode here
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
for (auto& to_index_file : job->to_index_files()) {
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
task->job_ = job;
tasks.emplace_back(task);
}
return tasks;
}
} // namespace scheduler
} // namespace milvus
......@@ -30,6 +30,7 @@
#include "job/DeleteJob.h"
#include "job/Job.h"
#include "job/SearchJob.h"
#include "task/BuildIndexTask.h"
#include "task/DeleteTask.h"
#include "task/SearchTask.h"
#include "task/Task.h"
......@@ -48,6 +49,9 @@ class TaskCreator {
static std::vector<TaskPtr>
Create(const DeleteJobPtr& job);
static std::vector<TaskPtr>
Create(const BuildIndexJobPtr& job);
};
} // namespace scheduler
......
......@@ -20,6 +20,7 @@
#include "../Algorithm.h"
#include "Action.h"
#include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h"
namespace milvus {
namespace scheduler {
......@@ -143,26 +144,49 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
if (task->job_.lock()->type() == JobType::SEARCH) {
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
}
uint64_t cost =
compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
}
}
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
} else if (task->job_.lock()->type() == JobType::BUILD) {
// step2: Read device id in config
// get build index gpu resource
server::Config& config = server::Config::GetInstance();
int32_t build_index_gpu;
Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);
bool find_gpu_res = false;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
if (compute_resources[i]->name() ==
res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
}
}
}
uint64_t cost =
compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
if (not find_gpu_res) {
task->path() = Path(paths[0], paths[0].size() - 1);
}
}
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
}
if (resource->name() == task->path().Last()) {
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h"
#include <utility>
namespace milvus {
namespace scheduler {
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
}
bool
BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr& to_index_file) {
std::unique_lock<std::mutex> lock(mutex_);
if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) {
return false;
}
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_;
to_index_files_[to_index_file->id_] = to_index_file;
}
Status&
BuildIndexJob::WaitBuildIndexFinish() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return to_index_files_.empty(); });
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " all done";
}
void
BuildIndexJob::BuildIndexDone(size_t to_index_id) {
std::unique_lock<std::mutex> lock(mutex_);
to_index_files_.erase(to_index_id);
cv_.notify_all();
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id;
}
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "Job.h"
#include "db/meta/Meta.h"
#include "scheduler/Definition.h"
namespace milvus {
namespace scheduler {
using engine::meta::TableFileSchemaPtr;
using Id2ToIndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
class BuildIndexJob : public Job {
public:
explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
public:
bool
AddToIndexFiles(const TableFileSchemaPtr& to_index_file);
Status&
WaitBuildIndexFinish();
void
BuildIndexDone(size_t to_index_id);
public:
Status&
GetStatus() {
return status_;
}
Id2ToIndexMap&
to_index_files() {
return to_index_files_;
}
engine::meta::MetaPtr
meta() const {
return meta_ptr_;
}
engine::DBOptions
options() const {
return options_;
}
private:
Id2ToIndexMap to_index_files_;
engine::meta::MetaPtr meta_ptr_;
engine::DBOptions options_;
Status status_;
std::mutex mutex_;
std::condition_variable cv_;
};
using BuildIndexJobPtr = std::shared_ptr<BuildIndexJob>;
} // namespace scheduler
} // namespace milvus
......@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "HybridPass.h"
#include "scheduler/optimizer/HybridPass.h"
#include "scheduler/task/SearchTask.h"
namespace milvus {
......@@ -24,12 +24,12 @@ namespace scheduler {
bool
HybridPass::Run(const TaskPtr& task) {
// TODO: Index::IVFSQ8Hybrid, if nq < threshold set cpu, else set gpu
if (task->Type() != TaskType::SearchTask) return false;
if (task->Type() != TaskType::SearchTask)
return false;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
// if (search_task->file_->engine_type_ == engine::EngineType::FAISS_IVFSQ8Hybrid)
return false;
}
}
}
} // namespace scheduler
} // namespace milvus
......@@ -16,16 +16,16 @@
// under the License.
#pragma once
#include <string>
#include <vector>
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <deque>
#include <unordered_map>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <unordered_map>
#include <vector>
#include "Pass.h"
......@@ -43,6 +43,5 @@ class HybridPass : public Pass {
using HybridPassPtr = std::shared_ptr<HybridPass>;
}
}
} // namespace scheduler
} // namespace milvus
......@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "Optimizer.h"
#include "scheduler/optimizer/Optimizer.h"
namespace milvus {
namespace scheduler {
......@@ -38,6 +38,5 @@ Optimizer::Run(const TaskPtr& task) {
return false;
}
}
}
} // namespace scheduler
} // namespace milvus
......@@ -16,16 +16,16 @@
// under the License.
#pragma once
#include <string>
#include <vector>
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <deque>
#include <unordered_map>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <unordered_map>
#include <vector>
#include "Pass.h"
......@@ -40,12 +40,11 @@ class Optimizer {
Init();
bool
Run(const TaskPtr &task);
Run(const TaskPtr& task);
private:
std::vector<PassPtr> pass_list_;
};
}
}
} // namespace scheduler
} // namespace milvus
......@@ -16,16 +16,16 @@
// under the License.
#pragma once
#include <string>
#include <vector>
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <deque>
#include <unordered_map>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <unordered_map>
#include <vector>
#include "scheduler/task/Task.h"
......@@ -35,12 +35,13 @@ namespace scheduler {
class Pass {
public:
virtual void
Init() {}
Init() {
}
virtual bool
Run(const TaskPtr& task) = 0;
};
using PassPtr = std::shared_ptr<Pass>;
}
}
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/task/BuildIndexTask.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <memory>
#include <string>
#include <thread>
#include <utility>
namespace milvus {
namespace scheduler {
XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label)
: Task(TaskType::BuildIndexTask, std::move(label)), file_(file) {
if (file_) {
to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType)file_->engine_type_,
(MetricType)file_->metric_type_, file_->nlist_);
}
}
void
XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
TimeRecorder rc("");
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
auto options = build_index_job->options();
try {
if (type == LoadType::DISK2CPU) {
stat = to_index_engine_->Load(options.insert_cache_immediately_);
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
stat = to_index_engine_->CopyToIndexFileToGpu(device_id);
type_str = "CPU2GPU";
} else if (type == LoadType::GPU2CPU) {
stat = to_index_engine_->CopyToCpu();
type_str = "GPU2CPU";
} else {
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
} catch (std::exception& ex) {
// typical error: out of disk space or permition denied
error_msg = "Failed to load to_index file: " + std::string(ex.what());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (!stat.ok()) {
Status s;
if (stat.ToString().find("out of memory") != std::string::npos) {
error_msg = "out of memory: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
} else {
error_msg = "Failed to load to_index file: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
build_index_job->BuildIndexDone(file_->id_);
}
return;
}
size_t file_size = to_index_engine_->PhysicalSize();
std::string info = "Load file id:" + std::to_string(file_->id_) +
" file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
" bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
to_index_id_ = file_->id_;
to_index_type_ = file_->file_type_;
}
}
void
XBuildIndexTask::Execute() {
if (to_index_engine_ == nullptr) {
return;
}
TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_));
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
std::string location = file_->location_;
EngineType engine_type = (EngineType)file_->engine_type_;
std::shared_ptr<engine::ExecutionEngine> index;
// step 2: create table file
engine::meta::TableFileSchema table_file;
table_file.table_id_ = file_->table_id_;
table_file.date_ = file_->date_;
table_file.file_type_ = engine::meta::TableFileSchema::NEW_INDEX;
engine::meta::MetaPtr meta_ptr = build_index_job->meta();
Status status = build_index_job->meta()->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
build_index_job->BuildIndexDone(to_index_id_);
build_index_job->GetStatus() = status;
return;
}
// step 3: build index
try {
index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
if (index == nullptr) {
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
<< " to to_delete";
return;
}
} catch (std::exception& ex) {
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
<< std::endl;
build_index_job->GetStatus() = Status(DB_ERROR, msg);
return;
}
// step 4: if table has been deleted, dont save index file
bool has_table = false;
meta_ptr->HasTable(file_->table_id_, has_table);
if (!has_table) {
meta_ptr->DeleteTableFiles(file_->table_id_);
return;
}
// step 5: save index file
try {
index->Serialize();
} catch (std::exception& ex) {
// typical error: out of disk space or permition denied
std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to persist index file: " << table_file.location_
<< ", possible out of disk space" << std::endl;
build_index_job->GetStatus() = Status(DB_ERROR, msg);
return;
}
// step 6: update meta
table_file.file_type_ = engine::meta::TableFileSchema::INDEX;
table_file.file_size_ = index->PhysicalSize();
table_file.row_count_ = index->Count();
auto origin_file = *file_;
origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP;
engine::meta::TableFilesSchema update_files = {table_file, origin_file};
status = meta_ptr->UpdateTableFiles(update_files);
if (status.ok()) {
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
<< " bytes"
<< " from file " << origin_file.file_id_;
// index->Cache();
} else {
// failed to update meta, mark the new file as to_delete, don't delete old file
origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX;
status = meta_ptr->UpdateTableFile(origin_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_
<< " to to_delete";
}
build_index_job->BuildIndexDone(to_index_id_);
}
rc.ElapseFromBegin("totally cost");
to_index_engine_ = nullptr;
}
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "Task.h"
#include "scheduler/Definition.h"
#include "scheduler/job/BuildIndexJob.h"
namespace milvus {
namespace scheduler {
class XBuildIndexTask : public Task {
public:
explicit XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
public:
TableFileSchemaPtr file_;
TableFileSchema table_file_;
size_t to_index_id_ = 0;
int to_index_type_ = 0;
ExecutionEnginePtr to_index_engine_ = nullptr;
};
} // namespace scheduler
} // namespace milvus
......@@ -157,8 +157,8 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
size_t file_size = index_engine_->PhysicalSize();
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" +
std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
std::string info = "Load file id:" + std::to_string(file_->id_) +
" file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
" bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
// for (auto &context : search_contexts_) {
......
......@@ -39,6 +39,7 @@ enum class LoadType {
enum class TaskType {
SearchTask,
DeleteTask,
BuildIndexTask,
TestTask,
};
......
......@@ -18,13 +18,14 @@
#pragma once
#include "TaskLabel.h"
#include "scheduler/ResourceMgr.h"
#include <memory>
#include <string>
class Resource;
using ResourceWPtr = std::weak_ptr<Resource>;
// class Resource;
//
// using ResourceWPtr = std::weak_ptr<Resource>;
namespace milvus {
namespace scheduler {
......
......@@ -20,12 +20,12 @@
#include <string>
/** \brief Milvus SDK namespace
*/
*/
namespace milvus {
/**
* @brief Status Code for SDK interface return
*/
* @brief Status Code for SDK interface return
*/
enum class StatusCode {
OK = 0,
......@@ -41,8 +41,8 @@ enum class StatusCode {
};
/**
* @brief Status for SDK interface return
*/
* @brief Status for SDK interface return
*/
class Status {
public:
Status(StatusCode code, const std::string& msg);
......
......@@ -655,294 +655,62 @@ Config::SetConfigValueInMem(const std::string& parent_key, const std::string& ch
}
////////////////////////////////////////////////////////////////////////////////
/* server config */
std::string
Config::GetServerConfigStrAddress() {
std::string value;
if (!GetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, value).ok()) {
value = GetConfigNode(CONFIG_SERVER).GetValue(CONFIG_SERVER_ADDRESS, CONFIG_SERVER_ADDRESS_DEFAULT);
SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, value);
}
return value;
}
std::string
Config::GetServerConfigStrPort() {
std::string value;
if (!GetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_PORT, value).ok()) {
value = GetConfigNode(CONFIG_SERVER).GetValue(CONFIG_SERVER_PORT, CONFIG_SERVER_PORT_DEFAULT);
SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_PORT, value);
}
return value;
}
std::string
Config::GetServerConfigStrDeployMode() {
std::string value;
if (!GetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_DEPLOY_MODE, value).ok()) {
value = GetConfigNode(CONFIG_SERVER).GetValue(CONFIG_SERVER_DEPLOY_MODE, CONFIG_SERVER_DEPLOY_MODE_DEFAULT);
SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_DEPLOY_MODE, value);
}
return value;
}
std::string
Config::GetServerConfigStrTimeZone() {
std::string value;
if (!GetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, value).ok()) {
value = GetConfigNode(CONFIG_SERVER).GetValue(CONFIG_SERVER_TIME_ZONE, CONFIG_SERVER_TIME_ZONE_DEFAULT);
SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, value);
}
return value;
}
////////////////////////////////////////////////////////////////////////////////
/* db config */
std::string
Config::GetDBConfigStrPrimaryPath() {
std::string value;
if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_PRIMARY_PATH, value).ok()) {
value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_PRIMARY_PATH, CONFIG_DB_PRIMARY_PATH_DEFAULT);
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_PRIMARY_PATH, value);
}
return value;
}
std::string
Config::GetDBConfigStrSecondaryPath() {
std::string value;
if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_SECONDARY_PATH, value).ok()) {
value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_SECONDARY_PATH, CONFIG_DB_SECONDARY_PATH_DEFAULT);
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_SECONDARY_PATH, value);
}
return value;
}
std::string
Config::GetDBConfigStrBackendUrl() {
std::string value;
if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_BACKEND_URL, value).ok()) {
value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_BACKEND_URL, CONFIG_DB_BACKEND_URL_DEFAULT);
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BACKEND_URL, value);
}
return value;
}
std::string
Config::GetDBConfigStrArchiveDiskThreshold() {
std::string value;
if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, value).ok()) {
value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_ARCHIVE_DISK_THRESHOLD,
CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT);
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, value);
}
return value;
}
std::string
Config::GetDBConfigStrArchiveDaysThreshold() {
std::string value;
if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value).ok()) {
value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_ARCHIVE_DAYS_THRESHOLD,
CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT);
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value);
}
return value;
}
std::string
Config::GetDBConfigStrInsertBufferSize() {
std::string value;
if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_INSERT_BUFFER_SIZE, value).ok()) {
value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_INSERT_BUFFER_SIZE, CONFIG_DB_INSERT_BUFFER_SIZE_DEFAULT);
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_INSERT_BUFFER_SIZE, value);
}
return value;
}
std::string
Config::GetDBConfigStrBuildIndexGPU() {
std::string value;
if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, value).ok()) {
value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_BUILD_INDEX_GPU, CONFIG_DB_BUILD_INDEX_GPU_DEFAULT);
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, value);
}
return value;
}
////////////////////////////////////////////////////////////////////////////////
/* metric config */
std::string
Config::GetMetricConfigStrEnableMonitor() {
Config::GetConfigStr(const std::string& parent_key, const std::string& child_key, const std::string& default_value) {
std::string value;
if (!GetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_ENABLE_MONITOR, value).ok()) {
value =
GetConfigNode(CONFIG_METRIC).GetValue(CONFIG_METRIC_ENABLE_MONITOR, CONFIG_METRIC_ENABLE_MONITOR_DEFAULT);
SetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_ENABLE_MONITOR, value);
if (!GetConfigValueInMem(parent_key, child_key, value).ok()) {
value = GetConfigNode(parent_key).GetValue(child_key, default_value);
SetConfigValueInMem(parent_key, child_key, value);
}
return value;
}
std::string
Config::GetMetricConfigStrCollector() {
std::string value;
if (!GetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_COLLECTOR, value).ok()) {
value = GetConfigNode(CONFIG_METRIC).GetValue(CONFIG_METRIC_COLLECTOR, CONFIG_METRIC_COLLECTOR_DEFAULT);
SetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_COLLECTOR, value);
}
return value;
}
std::string
Config::GetMetricConfigStrPrometheusPort() {
std::string value;
if (!GetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_PROMETHEUS_PORT, value).ok()) {
value =
GetConfigNode(CONFIG_METRIC).GetValue(CONFIG_METRIC_PROMETHEUS_PORT, CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT);
SetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_PROMETHEUS_PORT, value);
}
return value;
}
////////////////////////////////////////////////////////////////////////////////
/* cache config */
std::string
Config::GetCacheConfigStrCpuCacheCapacity() {
std::string value;
if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, value).ok()) {
value = GetConfigNode(CONFIG_CACHE)
.GetValue(CONFIG_CACHE_CPU_CACHE_CAPACITY, CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT);
SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, value);
}
return value;
}
std::string
Config::GetCacheConfigStrCpuCacheThreshold() {
std::string value;
if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_THRESHOLD, value).ok()) {
value = GetConfigNode(CONFIG_CACHE)
.GetValue(CONFIG_CACHE_CPU_CACHE_THRESHOLD, CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT);
SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_THRESHOLD, value);
}
return value;
}
std::string
Config::GetCacheConfigStrGpuCacheCapacity() {
std::string value;
if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_CAPACITY, value).ok()) {
value = GetConfigNode(CONFIG_CACHE)
.GetValue(CONFIG_CACHE_GPU_CACHE_CAPACITY, CONFIG_CACHE_GPU_CACHE_CAPACITY_DEFAULT);
SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_CAPACITY, value);
}
return value;
}
std::string
Config::GetCacheConfigStrGpuCacheThreshold() {
std::string value;
if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_THRESHOLD, value).ok()) {
value = GetConfigNode(CONFIG_CACHE)
.GetValue(CONFIG_CACHE_GPU_CACHE_THRESHOLD, CONFIG_CACHE_GPU_CACHE_THRESHOLD_DEFAULT);
SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_THRESHOLD, value);
}
return value;
}
std::string
Config::GetCacheConfigStrCacheInsertData() {
std::string value;
if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, value).ok()) {
value = GetConfigNode(CONFIG_CACHE)
.GetValue(CONFIG_CACHE_CACHE_INSERT_DATA, CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT);
SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, value);
}
return value;
}
////////////////////////////////////////////////////////////////////////////////
/* engine config */
std::string
Config::GetEngineConfigStrUseBlasThreshold() {
std::string value;
if (!GetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, value).ok()) {
value = GetConfigNode(CONFIG_ENGINE)
.GetValue(CONFIG_ENGINE_USE_BLAS_THRESHOLD, CONFIG_ENGINE_USE_BLAS_THRESHOLD_DEFAULT);
SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, value);
}
return value;
}
std::string
Config::GetEngineConfigStrOmpThreadNum() {
std::string value;
if (!GetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_OMP_THREAD_NUM, value).ok()) {
value =
GetConfigNode(CONFIG_ENGINE).GetValue(CONFIG_ENGINE_OMP_THREAD_NUM, CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT);
SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_OMP_THREAD_NUM, value);
}
return value;
}
////////////////////////////////////////////////////////////////////////////////
/* resource config */
std::string
Config::GetResourceConfigStrMode() {
std::string value;
if (!GetConfigValueInMem(CONFIG_RESOURCE, CONFIG_RESOURCE_MODE, value).ok()) {
value = GetConfigNode(CONFIG_RESOURCE).GetValue(CONFIG_RESOURCE_MODE, CONFIG_RESOURCE_MODE_DEFAULT);
SetConfigValueInMem(CONFIG_RESOURCE, CONFIG_RESOURCE_MODE, value);
}
return value;
}
////////////////////////////////////////////////////////////////////////////////
Status
Config::GetServerConfigAddress(std::string& value) {
value = GetServerConfigStrAddress();
value = GetConfigStr(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, CONFIG_SERVER_ADDRESS_DEFAULT);
return CheckServerConfigAddress(value);
}
Status
Config::GetServerConfigPort(std::string& value) {
value = GetServerConfigStrPort();
value = GetConfigStr(CONFIG_SERVER, CONFIG_SERVER_PORT, CONFIG_SERVER_PORT_DEFAULT);
return CheckServerConfigPort(value);
}
Status
Config::GetServerConfigDeployMode(std::string& value) {
value = GetServerConfigStrDeployMode();
value = GetConfigStr(CONFIG_SERVER, CONFIG_SERVER_DEPLOY_MODE, CONFIG_SERVER_DEPLOY_MODE_DEFAULT);
return CheckServerConfigDeployMode(value);
}
Status
Config::GetServerConfigTimeZone(std::string& value) {
value = GetServerConfigStrTimeZone();
value = GetConfigStr(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, CONFIG_SERVER_TIME_ZONE_DEFAULT);
return CheckServerConfigTimeZone(value);
}
Status
Config::GetDBConfigPrimaryPath(std::string& value) {
value = GetDBConfigStrPrimaryPath();
value = GetConfigStr(CONFIG_DB, CONFIG_DB_PRIMARY_PATH, CONFIG_DB_PRIMARY_PATH_DEFAULT);
return CheckDBConfigPrimaryPath(value);
}
Status
Config::GetDBConfigSecondaryPath(std::string& value) {
value = GetDBConfigStrSecondaryPath();
value = GetConfigStr(CONFIG_DB, CONFIG_DB_SECONDARY_PATH, CONFIG_DB_SECONDARY_PATH_DEFAULT);
return Status::OK();
}
Status
Config::GetDBConfigBackendUrl(std::string& value) {
value = GetDBConfigStrBackendUrl();
value = GetConfigStr(CONFIG_DB, CONFIG_DB_BACKEND_URL, CONFIG_DB_BACKEND_URL_DEFAULT);
return CheckDBConfigBackendUrl(value);
}
Status
Config::GetDBConfigArchiveDiskThreshold(int32_t& value) {
std::string str = GetDBConfigStrArchiveDiskThreshold();
std::string str =
GetConfigStr(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT);
Status s = CheckDBConfigArchiveDiskThreshold(str);
if (!s.ok()) {
return s;
......@@ -954,7 +722,8 @@ Config::GetDBConfigArchiveDiskThreshold(int32_t& value) {
Status
Config::GetDBConfigArchiveDaysThreshold(int32_t& value) {
std::string str = GetDBConfigStrArchiveDaysThreshold();
std::string str =
GetConfigStr(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT);
Status s = CheckDBConfigArchiveDaysThreshold(str);
if (!s.ok()) {
return s;
......@@ -966,7 +735,7 @@ Config::GetDBConfigArchiveDaysThreshold(int32_t& value) {
Status
Config::GetDBConfigInsertBufferSize(int32_t& value) {
std::string str = GetDBConfigStrInsertBufferSize();
std::string str = GetConfigStr(CONFIG_DB, CONFIG_DB_INSERT_BUFFER_SIZE, CONFIG_DB_INSERT_BUFFER_SIZE_DEFAULT);
Status s = CheckDBConfigInsertBufferSize(str);
if (!s.ok()) {
return s;
......@@ -978,7 +747,7 @@ Config::GetDBConfigInsertBufferSize(int32_t& value) {
Status
Config::GetDBConfigBuildIndexGPU(int32_t& value) {
std::string str = GetDBConfigStrBuildIndexGPU();
std::string str = GetConfigStr(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, CONFIG_DB_BUILD_INDEX_GPU_DEFAULT);
Status s = CheckDBConfigBuildIndexGPU(str);
if (!s.ok()) {
return s;
......@@ -988,9 +757,15 @@ Config::GetDBConfigBuildIndexGPU(int32_t& value) {
return Status::OK();
}
Status
Config::GetDBConfigPreloadTable(std::string& value) {
value = GetConfigStr(CONFIG_DB, CONFIG_DB_PRELOAD_TABLE);
return Status::OK();
}
Status
Config::GetMetricConfigEnableMonitor(bool& value) {
std::string str = GetMetricConfigStrEnableMonitor();
std::string str = GetConfigStr(CONFIG_METRIC, CONFIG_METRIC_ENABLE_MONITOR, CONFIG_METRIC_ENABLE_MONITOR_DEFAULT);
Status s = CheckMetricConfigEnableMonitor(str);
if (!s.ok()) {
return s;
......@@ -1003,19 +778,20 @@ Config::GetMetricConfigEnableMonitor(bool& value) {
Status
Config::GetMetricConfigCollector(std::string& value) {
value = GetMetricConfigStrCollector();
value = GetConfigStr(CONFIG_METRIC, CONFIG_METRIC_COLLECTOR, CONFIG_METRIC_COLLECTOR_DEFAULT);
return Status::OK();
}
Status
Config::GetMetricConfigPrometheusPort(std::string& value) {
value = GetMetricConfigStrPrometheusPort();
value = GetConfigStr(CONFIG_METRIC, CONFIG_METRIC_PROMETHEUS_PORT, CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT);
return CheckMetricConfigPrometheusPort(value);
}
Status
Config::GetCacheConfigCpuCacheCapacity(int32_t& value) {
std::string str = GetCacheConfigStrCpuCacheCapacity();
std::string str =
GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT);
Status s = CheckCacheConfigCpuCacheCapacity(str);
if (!s.ok()) {
return s;
......@@ -1027,7 +803,8 @@ Config::GetCacheConfigCpuCacheCapacity(int32_t& value) {
Status
Config::GetCacheConfigCpuCacheThreshold(float& value) {
std::string str = GetCacheConfigStrCpuCacheThreshold();
std::string str =
GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_THRESHOLD, CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT);
Status s = CheckCacheConfigCpuCacheThreshold(str);
if (!s.ok()) {
return s;
......@@ -1039,7 +816,8 @@ Config::GetCacheConfigCpuCacheThreshold(float& value) {
Status
Config::GetCacheConfigGpuCacheCapacity(int32_t& value) {
std::string str = GetCacheConfigStrGpuCacheCapacity();
std::string str =
GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_CAPACITY, CONFIG_CACHE_GPU_CACHE_CAPACITY_DEFAULT);
Status s = CheckCacheConfigGpuCacheCapacity(str);
if (!s.ok()) {
return s;
......@@ -1051,7 +829,8 @@ Config::GetCacheConfigGpuCacheCapacity(int32_t& value) {
Status
Config::GetCacheConfigGpuCacheThreshold(float& value) {
std::string str = GetCacheConfigStrGpuCacheThreshold();
std::string str =
GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_THRESHOLD, CONFIG_CACHE_GPU_CACHE_THRESHOLD_DEFAULT);
Status s = CheckCacheConfigGpuCacheThreshold(str);
if (!s.ok()) {
return s;
......@@ -1063,7 +842,8 @@ Config::GetCacheConfigGpuCacheThreshold(float& value) {
Status
Config::GetCacheConfigCacheInsertData(bool& value) {
std::string str = GetCacheConfigStrCacheInsertData();
std::string str =
GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT);
Status s = CheckCacheConfigCacheInsertData(str);
if (!s.ok()) {
return s;
......@@ -1076,7 +856,8 @@ Config::GetCacheConfigCacheInsertData(bool& value) {
Status
Config::GetEngineConfigUseBlasThreshold(int32_t& value) {
std::string str = GetEngineConfigStrUseBlasThreshold();
std::string str =
GetConfigStr(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, CONFIG_ENGINE_USE_BLAS_THRESHOLD_DEFAULT);
Status s = CheckEngineConfigUseBlasThreshold(str);
if (!s.ok()) {
return s;
......@@ -1088,7 +869,7 @@ Config::GetEngineConfigUseBlasThreshold(int32_t& value) {
Status
Config::GetEngineConfigOmpThreadNum(int32_t& value) {
std::string str = GetEngineConfigStrOmpThreadNum();
std::string str = GetConfigStr(CONFIG_ENGINE, CONFIG_ENGINE_OMP_THREAD_NUM, CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT);
Status s = CheckEngineConfigOmpThreadNum(str);
if (!s.ok()) {
return s;
......@@ -1100,7 +881,7 @@ Config::GetEngineConfigOmpThreadNum(int32_t& value) {
Status
Config::GetResourceConfigMode(std::string& value) {
value = GetResourceConfigStrMode();
value = GetConfigStr(CONFIG_RESOURCE, CONFIG_RESOURCE_MODE, CONFIG_RESOURCE_MODE_DEFAULT);
return CheckResourceConfigMode(value);
}
......
......@@ -56,6 +56,7 @@ static const char* CONFIG_DB_INSERT_BUFFER_SIZE = "insert_buffer_size";
static const char* CONFIG_DB_INSERT_BUFFER_SIZE_DEFAULT = "4";
static const char* CONFIG_DB_BUILD_INDEX_GPU = "build_index_gpu";
static const char* CONFIG_DB_BUILD_INDEX_GPU_DEFAULT = "0";
static const char* CONFIG_DB_PRELOAD_TABLE = "preload_table";
/* cache config */
static const char* CONFIG_CACHE = "cache_config";
......@@ -178,62 +179,8 @@ class Config {
Status
CheckResourceConfigPool(const std::vector<std::string>& value);
///////////////////////////////////////////////////////////////////////////
/* server config */
std::string
GetServerConfigStrAddress();
std::string
GetServerConfigStrPort();
std::string
GetServerConfigStrDeployMode();
std::string
GetServerConfigStrTimeZone();
/* db config */
std::string
GetDBConfigStrPrimaryPath();
std::string
GetDBConfigStrSecondaryPath();
std::string
GetDBConfigStrBackendUrl();
std::string
GetDBConfigStrArchiveDiskThreshold();
std::string
GetDBConfigStrArchiveDaysThreshold();
std::string
GetDBConfigStrInsertBufferSize();
std::string
GetDBConfigStrBuildIndexGPU();
/* metric config */
std::string
GetMetricConfigStrEnableMonitor();
std::string
GetMetricConfigStrCollector();
std::string
GetMetricConfigStrPrometheusPort();
/* cache config */
std::string
GetCacheConfigStrCpuCacheCapacity();
std::string
GetCacheConfigStrCpuCacheThreshold();
std::string
GetCacheConfigStrGpuCacheCapacity();
std::string
GetCacheConfigStrGpuCacheThreshold();
std::string
GetCacheConfigStrCacheInsertData();
/* engine config */
std::string
GetEngineConfigStrUseBlasThreshold();
std::string
GetEngineConfigStrOmpThreadNum();
/* resource config */
std::string
GetResourceConfigStrMode();
GetConfigStr(const std::string& parent_key, const std::string& child_key, const std::string& default_value = "");
public:
/* server config */
......@@ -261,6 +208,8 @@ class Config {
GetDBConfigInsertBufferSize(int32_t& value);
Status
GetDBConfigBuildIndexGPU(int32_t& value);
Status
GetDBConfigPreloadTable(std::string& value);
/* metric config */
Status
......
......@@ -25,6 +25,7 @@
#include <faiss/utils.h>
#include <omp.h>
#include <string>
#include <vector>
namespace milvus {
namespace server {
......@@ -155,6 +156,20 @@ DBWrapper::StartService() {
db_->Start();
// preload table
std::string preload_tables;
s = config.GetDBConfigPreloadTable(preload_tables);
if (!s.ok()) {
return s;
}
s = PreloadTables(preload_tables);
if (!s.ok()) {
std::cerr << "ERROR! Failed to preload tables: " << preload_tables << std::endl;
std::cerr << s.ToString() << std::endl;
kill(0, SIGUSR1);
}
return Status::OK();
}
......@@ -167,5 +182,34 @@ DBWrapper::StopService() {
return Status::OK();
}
Status
DBWrapper::PreloadTables(const std::string& preload_tables) {
if (preload_tables.empty()) {
// do nothing
} else if (preload_tables == "*") {
// load all tables
std::vector<engine::meta::TableSchema> table_schema_array;
db_->AllTables(table_schema_array);
for (auto& schema : table_schema_array) {
auto status = db_->PreloadTable(schema.table_id_);
if (!status.ok()) {
return status;
}
}
} else {
std::vector<std::string> table_names;
StringHelpFunctions::SplitStringByDelimeter(preload_tables, ",", table_names);
for (auto& name : table_names) {
auto status = db_->PreloadTable(name);
if (!status.ok()) {
return status;
}
}
}
return Status::OK();
}
} // namespace server
} // namespace milvus
......@@ -21,6 +21,7 @@
#include "utils/Status.h"
#include <memory>
#include <string>
namespace milvus {
namespace server {
......@@ -52,6 +53,10 @@ class DBWrapper {
return db_;
}
private:
Status
PreloadTables(const std::string& preload_tables);
private:
engine::DBPtr db_;
};
......
......@@ -148,25 +148,25 @@ class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service {
::milvus::grpc::TopKQueryResultList* response) override;
/**
* @brief Internal use query interface
*
* This method is used to query vector in specified files.
*
* @param context, add context for every RPC
* @param request:
* file_id_array, specified files id array, queried.
* query_record_array, all vector are going to be queried.
* query_range_array, optional ranges for conditional search. If not specified, search whole table
* topk, how many similarity vectors will be searched.
*
* @param writer, write query result array.
*
* @return status
*
* @param context
* @param request
* @param writer
*/
* @brief Internal use query interface
*
* This method is used to query vector in specified files.
*
* @param context, add context for every RPC
* @param request:
* file_id_array, specified files id array, queried.
* query_record_array, all vector are going to be queried.
* query_range_array, optional ranges for conditional search. If not specified, search whole table
* topk, how many similarity vectors will be searched.
*
* @param writer, write query result array.
*
* @return status
*
* @param context
* @param request
* @param writer
*/
::grpc::Status
SearchInFiles(::grpc::ServerContext* context, const ::milvus::grpc::SearchInFilesParam* request,
::milvus::grpc::TopKQueryResultList* response) override;
......
......@@ -36,7 +36,6 @@ ErrorMap(ErrorCode code) {
{SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND},
{SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER},
{SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE},
{SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER},
......@@ -57,7 +56,7 @@ ErrorMap(ErrorCode code) {
{SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID},
{SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT},
{SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED},
{SERVER_CACHE_FULL, ::milvus::grpc::ErrorCode::CACHE_FAILED},
{DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED},
{SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR},
{SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY},
......
......@@ -42,6 +42,8 @@ static const char* DQL_TASK_GROUP = "dql";
static const char* DDL_DML_TASK_GROUP = "ddl_dml";
static const char* PING_TASK_GROUP = "ping";
constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
using DB_META = milvus::engine::meta::Meta;
using DB_DATE = milvus::engine::meta::DateT;
......@@ -78,8 +80,6 @@ IndexType(engine::EngineType type) {
return map_type[type];
}
constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
Status
ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array, std::vector<DB_DATE>& dates) {
dates.clear();
......@@ -94,10 +94,10 @@ ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array,
return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
}
int64_t days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / DAY_SECONDS;
if (days == 0) {
int64_t days = (tt_end - tt_start) / DAY_SECONDS;
if (days <= 0) {
return Status(SERVER_INVALID_TIME_RANGE,
"Invalid time range: " + range.start_value() + " to " + range.end_value());
"Invalid time range: The start-date should be smaller than end-date!");
}
// range: [start_day, end_day)
......@@ -511,8 +511,8 @@ InsertTask::OnExecute() {
}
// step 6: update table flag
user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID : table_info.flag_ |=
engine::meta::FLAG_MASK_NO_USERID;
user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
: table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_);
#ifdef MILVUS_ENABLE_PROFILING
......@@ -706,7 +706,11 @@ CountTableTask::OnExecute() {
uint64_t row_count = 0;
status = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
if (!status.ok()) {
return status;
if (status.code(), DB_NOT_FOUND) {
return Status(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
} else {
return status;
}
}
row_count_ = static_cast<int64_t>(row_count);
......
......@@ -56,7 +56,6 @@ constexpr ErrorCode SERVER_NULL_POINTER = ToServerErrorCode(3);
constexpr ErrorCode SERVER_INVALID_ARGUMENT = ToServerErrorCode(4);
constexpr ErrorCode SERVER_FILE_NOT_FOUND = ToServerErrorCode(5);
constexpr ErrorCode SERVER_NOT_IMPLEMENT = ToServerErrorCode(6);
constexpr ErrorCode SERVER_BLOCKING_QUEUE_EMPTY = ToServerErrorCode(7);
constexpr ErrorCode SERVER_CANNOT_CREATE_FOLDER = ToServerErrorCode(8);
constexpr ErrorCode SERVER_CANNOT_CREATE_FILE = ToServerErrorCode(9);
constexpr ErrorCode SERVER_CANNOT_DELETE_FOLDER = ToServerErrorCode(10);
......@@ -74,7 +73,7 @@ constexpr ErrorCode SERVER_INVALID_ROWRECORD_ARRAY = ToServerErrorCode(107);
constexpr ErrorCode SERVER_INVALID_TOPK = ToServerErrorCode(108);
constexpr ErrorCode SERVER_ILLEGAL_VECTOR_ID = ToServerErrorCode(109);
constexpr ErrorCode SERVER_ILLEGAL_SEARCH_RESULT = ToServerErrorCode(110);
constexpr ErrorCode SERVER_CACHE_ERROR = ToServerErrorCode(111);
constexpr ErrorCode SERVER_CACHE_FULL = ToServerErrorCode(111);
constexpr ErrorCode SERVER_WRITE_ERROR = ToServerErrorCode(112);
constexpr ErrorCode SERVER_INVALID_NPROBE = ToServerErrorCode(113);
constexpr ErrorCode SERVER_INVALID_INDEX_NLIST = ToServerErrorCode(114);
......
......@@ -31,7 +31,7 @@ static int warning_idx = 0;
static int trace_idx = 0;
static int error_idx = 0;
static int fatal_idx = 0;
}
} // namespace
// TODO(yzb) : change the easylogging library to get the log level from parameter rather than filename
void
......
......@@ -71,7 +71,11 @@ ValidationUtil::ValidateTableName(const std::string& table_name) {
Status
ValidationUtil::ValidateTableDimension(int64_t dimension) {
if (dimension <= 0 || dimension > TABLE_DIMENSION_LIMIT) {
if (dimension <= 0) {
std::string msg = "Dimension value should be greater than 0";
SERVER_LOG_ERROR << msg;
return Status(SERVER_INVALID_VECTOR_DIMENSION, msg);
} else if (dimension > TABLE_DIMENSION_LIMIT) {
std::string msg = "Table dimension excceed the limitation: " + std::to_string(TABLE_DIMENSION_LIMIT);
SERVER_LOG_ERROR << msg;
return Status(SERVER_INVALID_VECTOR_DIMENSION, msg);
......@@ -95,7 +99,7 @@ ValidationUtil::ValidateTableIndexType(int32_t index_type) {
Status
ValidationUtil::ValidateTableIndexNlist(int32_t nlist) {
if (nlist <= 0) {
std::string msg = "Invalid nlist value: " + std::to_string(nlist);
std::string msg = "nlist value should be greater than 0";
SERVER_LOG_ERROR << msg;
return Status(SERVER_INVALID_INDEX_NLIST, msg);
}
......@@ -128,7 +132,7 @@ ValidationUtil::ValidateTableIndexMetricType(int32_t metric_type) {
Status
ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchema& table_schema) {
if (top_k <= 0 || top_k > 2048) {
std::string msg = "Invalid top k value: " + std::to_string(top_k);
std::string msg = "Invalid top k value: " + std::to_string(top_k) + ", rational range [1, 2048]";
SERVER_LOG_ERROR << msg;
return Status(SERVER_INVALID_TOPK, msg);
}
......@@ -139,7 +143,8 @@ ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchem
Status
ValidationUtil::ValidateSearchNprobe(int64_t nprobe, const engine::meta::TableSchema& table_schema) {
if (nprobe <= 0 || nprobe > table_schema.nlist_) {
std::string msg = "Invalid nprobe value: " + std::to_string(nprobe);
std::string msg = "Invalid nprobe value: " + std::to_string(nprobe) + ", rational range [1, " +
std::to_string(table_schema.nlist_) + "]";
SERVER_LOG_ERROR << msg;
return Status(SERVER_INVALID_NPROBE, msg);
}
......
......@@ -66,6 +66,7 @@ ms::engine::DBOptions MetricTest::GetOptions() {
}
void MetricTest::SetUp() {
boost::filesystem::remove_all("/tmp/milvus_test");
InitLog();
auto options = GetOptions();
db_ = ms::engine::DBFactory::Build(options);
......
......@@ -351,11 +351,11 @@ TEST_F(RpcHandlerTest, TABLES_TEST) {
handler->Insert(&context, &request, &vector_ids);
//Show table
// ::milvus::grpc::Command cmd;
// ::grpc::ServerWriter<::milvus::grpc::TableName> *writer;
// status = handler->ShowTables(&context, &cmd, writer);
// ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
//show tables
::milvus::grpc::Command cmd;
::milvus::grpc::TableNameList table_name_list;
status = handler->ShowTables(&context, &cmd, &table_name_list);
ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
//Count Table
::milvus::grpc::TableRowCount count;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册