提交 e6d4b638 编写于 作者: S starlord

merge 0.3.1


Former-commit-id: fbb9a3a12fbfe4a596a383eaed833f2d366c17a3
timeout(time: 10, unit: 'MINUTES') {
timeout(time: 25, unit: 'MINUTES') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
......
......@@ -34,6 +34,10 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-380 - Update resource loader and executor, work until all finished
- MS-383 - Modify condition variable usage in scheduler
- MS-384 - Add global instance of ResourceMgr and Scheduler
- MS-389 - Add clone interface in Task
- MS-390 - Update resource construct function
- MS-391 - Add PushTaskToNeighbourHasExecutor action
- MS-394 - Update scheduler unittest
## New Feature
- MS-343 - Implement ResourceMgr
......@@ -62,6 +66,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-288 - Update compile scripts
- MS-330 - Stability test failed caused by server core dumped
- MS-347 - Build index hangs again
- MS-382 - fix MySQLMetaImpl::CleanUpFilesWithTTL unknown column bug
## Improvement
- MS-156 - Add unittest for merge result functions
......@@ -90,6 +95,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-324 - Show error when there is not enough gpu memory to build index
- MS-328 - Check metric type on server start
- MS-332 - Set grpc and thrift server run concurrently
- MS-352 - Add hybrid index
## New Feature
- MS-180 - Add new mem manager
......@@ -149,8 +155,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-130 - Add prometheus_test
- MS-144 - Add nprobe config
- MS-147 - Enable IVF
- MS-130 - Add prometheus_test
## Task
- MS-74 - Change README.md in cpp
- MS-88 - Add support for arm architecture
......
......@@ -86,7 +86,7 @@ if [[ ! -d cmake_build ]]; then
fi
cd cmake_build
git
CUDA_COMPILER=/usr/local/cuda/bin/nvcc
if [[ ${MAKE_CLEAN} == "ON" ]]; then
......
......@@ -157,7 +157,6 @@ if (UNIX)
endif (APPLE)
endif (UNIX)
# ----------------------------------------------------------------------
# thirdparty directory
set(THIRDPARTY_DIR "${MILVUS_SOURCE_DIR}/thirdparty")
......@@ -167,7 +166,7 @@ set(THIRDPARTY_DIR "${MILVUS_SOURCE_DIR}/thirdparty")
if(NOT DEFINED USE_JFROG_CACHE)
set(USE_JFROG_CACHE "OFF")
endif()
if(USE_JFROG_CACHE STREQUAL "ON")
if(USE_JFROG_CACHE STREQUAL "ON")
set(JFROG_ARTFACTORY_CACHE_URL "http://192.168.1.201:80/artifactory/generic-local/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${MILVUS_BUILD_ARCH}/${BUILD_TYPE}")
set(JFROG_USER_NAME "test")
set(JFROG_PASSWORD "Fantast1c")
......@@ -308,9 +307,11 @@ set(EASYLOGGINGPP_MD5 "b78cd319db4be9b639927657b8aa7732")
if(DEFINED ENV{MILVUS_FAISS_URL})
set(FAISS_SOURCE_URL "$ENV{MILVUS_FAISS_URL}")
else()
set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz")
set(FAISS_SOURCE_URL "http://192.168.1.105:6060/jinhai/faiss/-/archive/${FAISS_VERSION}/faiss-${FAISS_VERSION}.tar.gz")
# set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz")
endif()
set(FAISS_MD5 "0bc12737b23def156f6a1eb782050135")
set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
if(DEFINED ENV{MILVUS_KNOWHERE_URL})
set(KNOWHERE_SOURCE_URL "$ENV{MILVUS_KNOWHERE_URL}")
......@@ -462,6 +463,7 @@ else()
endif()
set(GRPC_MD5 "7ec59ad54c85a12dcbbfede09bf413a9")
# ----------------------------------------------------------------------
# ARROW
......@@ -686,7 +688,7 @@ macro(build_bzip2)
set(BZIP2_STATIC_LIB
"${BZIP2_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}bz2${CMAKE_STATIC_LIBRARY_SUFFIX}")
if(USE_JFROG_CACHE STREQUAL "ON")
if(USE_JFROG_CACHE STREQUAL "ON")
set(BZIP2_CACHE_PACKAGE_NAME "bzip2_${BZIP2_MD5}.tar.gz")
set(BZIP2_CACHE_URL "${JFROG_ARTFACTORY_CACHE_URL}/${BZIP2_CACHE_PACKAGE_NAME}")
set(BZIP2_CACHE_PACKAGE_PATH "${THIRDPARTY_PACKAGE_CACHE}/${BZIP2_CACHE_PACKAGE_NAME}")
......@@ -1184,7 +1186,7 @@ macro(build_faiss)
INTERFACE_INCLUDE_DIRECTORIES "${FAISS_INCLUDE_DIR}"
INTERFACE_LINK_LIBRARIES "openblas;lapack" )
endif()
add_dependencies(faiss faiss_ep)
if(${BUILD_FAISS_WITH_MKL} STREQUAL "OFF")
......@@ -1321,7 +1323,7 @@ if (MILVUS_BUILD_TESTS)
if(NOT GTEST_VENDORED)
endif()
get_target_property(GTEST_INCLUDE_DIR gtest INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM "${GTEST_PREFIX}/lib")
include_directories(SYSTEM ${GTEST_INCLUDE_DIR})
......@@ -1828,7 +1830,7 @@ endmacro()
if(MILVUS_WITH_SNAPPY)
resolve_dependency(Snappy)
get_target_property(SNAPPY_INCLUDE_DIRS snappy INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM ${SNAPPY_PREFIX}/lib/)
include_directories(SYSTEM ${SNAPPY_INCLUDE_DIRS})
......@@ -2131,7 +2133,7 @@ endmacro()
if(MILVUS_WITH_YAMLCPP)
resolve_dependency(yaml-cpp)
get_target_property(YAMLCPP_INCLUDE_DIR yaml-cpp INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM ${YAMLCPP_PREFIX}/lib/)
include_directories(SYSTEM ${YAMLCPP_INCLUDE_DIR})
......@@ -2203,7 +2205,7 @@ endmacro()
if(MILVUS_WITH_ZLIB)
resolve_dependency(ZLIB)
get_target_property(ZLIB_INCLUDE_DIR zlib INTERFACE_INCLUDE_DIRECTORIES)
include_directories(SYSTEM ${ZLIB_INCLUDE_DIR})
endif()
......@@ -2301,7 +2303,7 @@ endmacro()
if(MILVUS_WITH_ZSTD)
resolve_dependency(ZSTD)
get_target_property(ZSTD_INCLUDE_DIR zstd INTERFACE_INCLUDE_DIRECTORIES)
link_directories(SYSTEM ${ZSTD_PREFIX}/lib)
include_directories(SYSTEM ${ZSTD_INCLUDE_DIR})
......@@ -2406,7 +2408,7 @@ endmacro()
if(MILVUS_WITH_AWS)
resolve_dependency(AWS)
link_directories(SYSTEM ${AWS_PREFIX}/lib)
get_target_property(AWS_CPP_SDK_S3_INCLUDE_DIR aws-cpp-sdk-s3 INTERFACE_INCLUDE_DIRECTORIES)
......
......@@ -45,3 +45,5 @@ engine_config:
use_blas_threshold: 20
metric_type: L2 # compare vectors by euclidean distance(L2) or inner product(IP), optional: L2 or IP
omp_thread_num: 0 # number of compute threads used by the engine, 0 means use all cpu cores
use_hybrid_index: false # use GPU/CPU hybrid index
hybrid_index_gpu: 0 # hybrid index gpu device id
......@@ -73,19 +73,19 @@ YamlConfigMgr::SetChildConfig(const YAML::Node& node,
return false;
}
bool
YamlConfigMgr::SetSequence(const YAML::Node &node,
const std::string &child_name,
ConfigNode &config) {
if(node[child_name].IsDefined ()) {
size_t cnt = node[child_name].size();
for(size_t i = 0; i < cnt; i++){
config.AddSequenceItem(child_name, node[child_name][i].as<std::string>());
}
return true;
}
return false;
}
//bool
//YamlConfigMgr::SetSequence(const YAML::Node &node,
// const std::string &child_name,
// ConfigNode &config) {
// if(node[child_name].IsDefined ()) {
// size_t cnt = node[child_name].size();
// for(size_t i = 0; i < cnt; i++){
// config.AddSequenceItem(child_name, node[child_name][i].as<std::string>());
// }
// return true;
// }
// return false;
//}
void
YamlConfigMgr::LoadConfigNode(const YAML::Node& node, ConfigNode& config) {
......@@ -98,8 +98,8 @@ YamlConfigMgr::LoadConfigNode(const YAML::Node& node, ConfigNode& config) {
SetConfigValue(node, key, config);
} else if(node[key].IsMap()){
SetChildConfig(node, key, config);
} else if(node[key].IsSequence()){
SetSequence(node, key, config);
// } else if(node[key].IsSequence()){
// SetSequence(node, key, config);
}
}
}
......
......@@ -33,10 +33,10 @@ class YamlConfigMgr : public IConfigMgr {
const std::string &name,
ConfigNode &config);
bool
SetSequence(const YAML::Node &node,
const std::string &child_name,
ConfigNode &config);
// bool
// SetSequence(const YAML::Node &node,
// const std::string &child_name,
// ConfigNode &config);
void LoadConfigNode(const YAML::Node& node, ConfigNode& config);
......
......@@ -60,6 +60,7 @@ void CollectQueryMetrics(double total_time, size_t nq) {
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
#if 0
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
switch(file_type) {
case meta::TableFileSchema::RAW:
......@@ -79,6 +80,7 @@ void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
}
}
}
#endif
}
......@@ -105,13 +107,18 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date
//dates partly delete files of the table but currently we don't support
ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
if (dates.empty()) {
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
scheduler.Schedule(context);
} else {
meta_ptr_->DropPartitionsByDates(table_id, dates);
}
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
scheduler.Schedule(context);
return Status::OK();
}
......@@ -205,7 +212,7 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint6
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
ENGINE_LOG_DEBUG << "Query by vectors";
ENGINE_LOG_DEBUG << "Query by vectors " << table_id;
//get all table files from table
meta::DatePartionedTableFilesSchema files;
......@@ -568,7 +575,7 @@ Status DBImpl::BuildIndex(const std::string& table_id) {
int times = 1;
while (has) {
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
ENGINE_LOG_DEBUG << "Non index files detected in " << table_id << "! Will build index " << times;
meta_ptr_->UpdateTableFilesToIndex(table_id);
/* StartBuildIndexTask(true); */
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
......
......@@ -90,11 +90,11 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
}
}
std::shared_ptr<DB> DBFactory::Build() {
auto options = OptionsFactory::Build();
auto db = DBFactory::Build(options);
return std::shared_ptr<DB>(db);
}
//std::shared_ptr<DB> DBFactory::Build() {
// auto options = OptionsFactory::Build();
// auto db = DBFactory::Build(options);
// return std::shared_ptr<DB>(db);
//}
DB* DBFactory::Build(const Options& options) {
return new DBImpl(options);
......
......@@ -33,7 +33,7 @@ struct DBMetaImplFactory {
};
struct DBFactory {
static std::shared_ptr<DB> Build();
//static std::shared_ptr<DB> Build();
static DB *Build(const Options &);
};
......
......@@ -30,13 +30,13 @@ namespace meta {
}
}
int MySQLConnectionPool::getConnectionsInUse() {
return conns_in_use_;
}
void MySQLConnectionPool::set_max_idle_time(int max_idle) {
max_idle_time_ = max_idle;
}
// int MySQLConnectionPool::getConnectionsInUse() {
// return conns_in_use_;
// }
//
// void MySQLConnectionPool::set_max_idle_time(int max_idle) {
// max_idle_time_ = max_idle;
// }
std::string MySQLConnectionPool::getDB() {
return db_;
......
......@@ -44,9 +44,9 @@ public:
// Other half of in-use conn count limit
void release(const mysqlpp::Connection *pc) override;
int getConnectionsInUse();
void set_max_idle_time(int max_idle);
// int getConnectionsInUse();
//
// void set_max_idle_time(int max_idle);
std::string getDB();
......
......@@ -1810,7 +1810,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT file_id " <<
"FROM TableFiles " <<
"WHERE table_id = " << table_id << ";";
"WHERE table_id = " << quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
......@@ -1821,6 +1821,10 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
}
}
}
} catch (const BadQuery &er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
} catch (const Exception &er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL" << ": " << er.what();
......
......@@ -311,6 +311,7 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
has = true;
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
std::vector<std::string> file_ids;
for (auto &file : selected) {
switch (std::get<1>(file)) {
case (int) TableFileSchema::RAW:
......
......@@ -12,13 +12,16 @@ namespace milvus {
namespace engine {
std::shared_ptr<Resource>
ResourceFactory::Create(const std::string &name, const std::string &alias) {
ResourceFactory::Create(const std::string &name,
const std::string &alias,
bool enable_loader,
bool enable_executor) {
if (name == "disk") {
return std::make_shared<DiskResource>(alias);
return std::make_shared<DiskResource>(alias, enable_loader, enable_executor);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias);
return std::make_shared<CpuResource>(alias, enable_loader, enable_executor);
} else if (name == "gpu") {
return std::make_shared<GpuResource>(alias);
return std::make_shared<GpuResource>(alias, enable_loader, enable_executor);
} else {
return nullptr;
}
......
......@@ -21,7 +21,10 @@ namespace engine {
class ResourceFactory {
public:
static std::shared_ptr<Resource>
Create(const std::string &name, const std::string &alias = "");
Create(const std::string &name,
const std::string &alias = "",
bool enable_loader = true,
bool enable_executor = true);
};
......
......@@ -114,13 +114,14 @@ Scheduler::OnCopyCompleted(const EventPtr &event) {
resource->WakeupExecutor();
if (resource->Type() == ResourceType::DISK) {
Action::PushTaskToNeighbour(event->resource_);
} else {
Action::PushTaskToNeighbourHasExecutor(event->resource_);
}
}
}
void
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
// Action::PushTaskToNeighbour(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
......
......@@ -20,6 +20,11 @@ public:
static void
PushTaskToNeighbour(const ResourceWPtr &self);
/*
* Push task to neighbour that has executor;
*/
static void
PushTaskToNeighbourHasExecutor(const ResourceWPtr &self);
/*
* Pull task From neighbour;
......
......@@ -4,7 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include <list>
#include "Action.h"
......@@ -13,29 +13,65 @@ namespace milvus {
namespace engine {
void
push_task(const ResourcePtr &self, const ResourcePtr &other) {
auto &self_task_table = self->task_table();
auto &other_task_table = other->task_table();
next(std::list<ResourcePtr> &neighbours, std::list<ResourcePtr>::iterator &it) {
it++;
if (neighbours.end() == it) {
it = neighbours.begin();
}
}
void
push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighbours) {
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 10);
auto it = neighbours.begin();
if (it == neighbours.end()) return;
auto indexes = PickToMove(self_task_table, cache, self_task_table.Size());
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
other_task_table.Put(task);
// TODO: mark moved future
task = task->Clone();
(*it)->task_table().Put(task);
next(neighbours, it);
}
}
}
void
Action::PushTaskToNeighbour(const ResourceWPtr &res) {
if (auto self = res.lock()) {
for (auto &neighbour : self->GetNeighbours()) {
if (auto n = neighbour.neighbour_node.lock()) {
push_task(self, std::static_pointer_cast<Resource>(n));
}
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
neighbours.emplace_back(resource);
}
push_task_round_robin(self->task_table(), neighbours);
}
void
Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) {
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
if (resource->HasExecutor()) {
neighbours.emplace_back(resource);
}
}
push_task_round_robin(self->task_table(), neighbours);
}
......
......@@ -16,8 +16,8 @@ std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
return out;
}
CpuResource::CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {}
CpuResource::CpuResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::CPU, enable_loader, enable_executor) {}
void CpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::DISK2CPU, 0);
......@@ -29,4 +29,4 @@ void CpuResource::Process(TaskPtr task) {
}
}
}
\ No newline at end of file
}
......@@ -17,7 +17,7 @@ namespace engine {
class CpuResource : public Resource {
public:
explicit
CpuResource(std::string name);
CpuResource(std::string name, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -15,8 +15,8 @@ std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
return out;
}
DiskResource::DiskResource(std::string name)
: Resource(std::move(name), ResourceType::DISK, true, false) {
DiskResource::DiskResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::DISK, enable_loader, enable_executor) {
}
void DiskResource::LoadFile(TaskPtr task) {
......
......@@ -16,7 +16,7 @@ namespace engine {
class DiskResource : public Resource {
public:
explicit
DiskResource(std::string name);
DiskResource(std::string name, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -16,8 +16,8 @@ std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
return out;
}
GpuResource::GpuResource(std::string name)
: Resource(std::move(name), ResourceType::GPU) {}
GpuResource::GpuResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::GPU, enable_loader, enable_executor) {}
void GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, 0);
......
......@@ -16,7 +16,7 @@ namespace engine {
class GpuResource : public Resource {
public:
explicit
GpuResource(std::string name);
GpuResource(std::string name, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -45,8 +45,30 @@ enum class RegisterType {
class Resource : public Node, public std::enable_shared_from_this<Resource> {
public:
/*
* Event function MUST be a short function, never blocking;
* Start loader and executor if enable;
*/
void
Start();
/*
* Stop loader and executor and join them, blocking until the threads have exited;
*/
void
Stop();
/*
* wake up loader;
*/
void
WakeupLoader();
/*
* wake up executor;
*/
void
WakeupExecutor();
public:
template<typename T>
void Register_T(const RegisterType &type) {
register_table_.emplace(type, [] { return std::make_shared<T>(); });
......@@ -65,11 +87,17 @@ public:
return type_;
}
void
Start();
// TODO: better name?
inline bool
HasLoader() {
return enable_loader_;
}
void
Stop();
// TODO: better name?
inline bool
HasExecutor() {
return enable_executor_;
}
TaskTable &
task_table();
......@@ -81,24 +109,11 @@ public:
friend std::ostream &operator<<(std::ostream &out, const Resource &resource);
public:
/*
* wake up loader;
*/
void
WakeupLoader();
/*
* wake up executor;
*/
void
WakeupExecutor();
protected:
Resource(std::string name,
ResourceType type,
bool enable_loader = true,
bool enable_executor = true);
bool enable_loader,
bool enable_executor);
// TODO: SearchContextPtr to TaskPtr
/*
......
......@@ -20,6 +20,11 @@ XDeleteTask::Execute() {
}
/*
 * Clone is not implemented for delete tasks: this override always returns nullptr.
 * NOTE(review): a caller that stores the result of Clone() in a task table would
 * receive a null task here - confirm delete tasks are never cloned.
 */
TaskPtr
XDeleteTask::Clone() {
return nullptr;
}
}
}
}
......@@ -19,6 +19,9 @@ public:
void
Execute() override;
TaskPtr
Clone() override;
};
}
......
......@@ -99,16 +99,27 @@ CollectDurationMetrics(int index_type, double total_time) {
}
}
/*
 * Bind the task to a table file and build the execution engine that matches
 * the file's dimension, location and engine type.
 */
XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) {
    const auto engine_type = static_cast<EngineType>(file_->engine_type_);
    index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_type);
}
void
XSearchTask::Load(LoadType type, uint8_t device_id) {
server::TimeRecorder rc("");
//step 1: load index
ExecutionEnginePtr index_ptr = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType) file_->engine_type_);
try {
index_ptr->Load();
if (type == LoadType::DISK2CPU) {
index_engine_->Load();
} else if (type == LoadType::CPU2GPU) {
index_engine_->Load();
index_engine_->CopyToGpu(device_id);
} else if (type == LoadType::GPU2CPU) {
index_engine_->CopyToCpu();
} else {
// TODO: exception
}
} catch (std::exception &ex) {
//typical error: out of disk space or permission denied
std::string msg = "Failed to load index file: " + std::string(ex.what());
......@@ -121,7 +132,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
return;
}
size_t file_size = index_ptr->PhysicalSize();
size_t file_size = index_engine_->PhysicalSize();
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + std::to_string(file_->file_type_)
+ " size:" + std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally cost";
......@@ -135,7 +146,6 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
//step 2: return search task for later execution
index_id_ = file_->id_;
index_type_ = file_->file_type_;
index_engine_ = index_ptr;
search_contexts_.swap(search_contexts_);
}
......@@ -157,12 +167,13 @@ XSearchTask::Execute() {
for (auto &context : search_contexts_) {
//step 1: allocate memory
auto inner_k = context->topk();
auto nprobe = context->nprobe();
output_ids.resize(inner_k * context->nq());
output_distence.resize(inner_k * context->nq());
try {
//step 2: search
index_engine_->Search(context->nq(), context->vectors(), inner_k, output_distence.data(),
index_engine_->Search(context->nq(), context->vectors(), inner_k, nprobe, output_distence.data(),
output_ids.data());
double span = rc.RecordSection("do search for context:" + context->Identity());
......@@ -199,6 +210,16 @@ XSearchTask::Execute() {
rc.ElapseFromBegin("totally cost");
}
/*
 * Create a new search task for the same table file that carries over this
 * task's index id, index type, search contexts and metric flag, plus a clone
 * of the execution engine.
 */
TaskPtr
XSearchTask::Clone() {
    auto ret = std::make_shared<XSearchTask>(file_);
    ret->index_id_ = index_id_;
    // Load() sets index_id_ and index_type_ together, so copy both to keep the
    // clone consistent with a task that has already been loaded.
    ret->index_type_ = index_type_;
    // Keep the engine built by the constructor when there is nothing to clone,
    // instead of dereferencing a null engine.
    if (index_engine_) {
        ret->index_engine_ = index_engine_->Clone();
    }
    ret->search_contexts_ = search_contexts_;
    ret->metric_l2 = metric_l2;
    return ret;
}
Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distence,
uint64_t nq,
......@@ -343,6 +364,7 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
return Status::OK();
}
}
}
}
......@@ -14,12 +14,18 @@ namespace engine {
class XSearchTask : public Task {
public:
explicit
XSearchTask(TableFileSchemaPtr file);
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
TaskPtr
Clone() override;
public:
static Status ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distence,
......
......@@ -35,6 +35,9 @@ public:
virtual void
Execute() = 0;
virtual TaskPtr
Clone() = 0;
public:
std::vector<SearchContextPtr> search_contexts_;
ScheduleTaskPtr task_;
......
......@@ -16,8 +16,7 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) {
switch (schedule_task->type()) {
case ScheduleTaskType::kIndexLoad: {
auto load_task = std::static_pointer_cast<IndexLoadTask>(schedule_task);
auto task = std::make_shared<XSearchTask>();
task->file_ = load_task->file_;
auto task = std::make_shared<XSearchTask>(load_task->file_);
task->search_contexts_ = load_task->search_contexts_;
task->task_ = schedule_task;
return task;
......
......@@ -6,6 +6,7 @@
#include "TestTask.h"
namespace zilliz {
namespace milvus {
namespace engine {
......@@ -17,7 +18,23 @@ TestTask::Load(LoadType type, uint8_t device_id) {
void
TestTask::Execute() {
std::lock_guard<std::mutex> lock(mutex_);
exec_count_++;
done_ = true;
}
/*
 * Create a fresh TestTask that carries over the load and execute counters.
 * The done_ flag is not copied, so the clone starts out as not done.
 */
TaskPtr
TestTask::Clone() {
    auto copy = std::make_shared<TestTask>();
    copy->exec_count_ = exec_count_;
    copy->load_count_ = load_count_;
    return copy;
}
/*
 * Block the calling thread until done_ is true.
 */
void
TestTask::Wait() {
    std::unique_lock<std::mutex> guard(mutex_);
    // Re-check the flag after every wakeup so spurious wakeups are ignored.
    while (!done_) {
        cv_.wait(guard);
    }
}
}
......
......@@ -23,9 +23,19 @@ public:
void
Execute() override;
TaskPtr
Clone() override;
void
Wait();
public:
uint64_t load_count_;
uint64_t exec_count_;
uint64_t load_count_ = 0;
uint64_t exec_count_ = 0;
bool done_ = false;
std::mutex mutex_;
std::condition_variable cv_;
};
......
......@@ -302,6 +302,15 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "DropIndex function call status: " << stat.ToString() << std::endl;
}
{//delete by range
Range rg;
rg.start_value = CurrentTmDate(-2);
rg.end_value = CurrentTmDate(-3);
Status stat = conn->DeleteByRange(rg, TABLE_NAME);
std::cout << "DeleteByRange function call status: " << stat.ToString() << std::endl;
}
{//delete table
Status stat = conn->DropTable(TABLE_NAME);
std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
......
......@@ -330,7 +330,15 @@ ClientProxy::ServerStatus() const {
/*
 * Build a gRPC DeleteByRangeParam from the table name and the range bounds
 * and forward it to the gRPC client. Any std::exception raised on the way is
 * converted into an UnknownError status.
 */
Status
ClientProxy::DeleteByRange(milvus::Range &range, const std::string &table_name) {
    try {
        ::milvus::grpc::DeleteByRangeParam param;
        param.set_table_name(table_name);

        auto *grpc_range = param.mutable_range();
        grpc_range->set_start_value(range.start_value);
        grpc_range->set_end_value(range.end_value);

        return client_ptr_->DeleteByRange(param);
    } catch (std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to delete by range: " + std::string(ex.what()));
    }
}
Status
......@@ -341,7 +349,7 @@ ClientProxy::PreloadTable(const std::string &table_name) const {
Status status = client_ptr_->PreloadTable(grpc_table_name);
return status;
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
return Status(StatusCode::UnknownError, "fail to preload tables: " + std::string(ex.what()));
}
}
......
......@@ -265,13 +265,26 @@ GrpcClient::PreloadTable(milvus::grpc::TableName &table_name) {
}
Status
GrpcClient::Disconnect() {
stub_.release();
GrpcClient::DeleteByRange(grpc::DeleteByRangeParam &delete_by_range_param) {
ClientContext context;
::milvus::grpc::Status response;
::grpc::Status grpc_status = stub_->DeleteByRange(&context, delete_by_range_param, &response);
if (!grpc_status.ok()) {
std::cerr << "DeleteByRange gRPC failed!" << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {
std::cerr << response.reason() << std::endl;
return Status(StatusCode::ServerFailed, response.reason());
}
return Status::OK();
}
Status
GrpcClient::DeleteByRange(grpc::DeleteByRangeParam &delete_by_range_param) {
GrpcClient::Disconnect() {
stub_.release();
return Status::OK();
}
......
......@@ -117,7 +117,7 @@ ConnectionImpl::ServerStatus() const {
/*
 * Forward the delete-by-range request to the client proxy and return its status.
 */
Status
ConnectionImpl::DeleteByRange(Range &range,
const std::string &table_name) {
return client_proxy_->DeleteByRange(range, table_name);
}
Status
......
......@@ -48,12 +48,14 @@ static const char* CONFIG_METRIC_COLLECTOR = "collector";
static const char* CONFIG_PROMETHEUS = "prometheus_config";
static const char* CONFIG_METRIC_PROMETHEUS_PORT = "port";
static const char* CONFIG_ENGINE = "engine_config";
static const char* CONFIG_NPROBE = "nprobe";
static const char* CONFIG_NLIST = "nlist";
static const char* CONFIG_DCBT = "use_blas_threshold";
static const char* CONFIG_METRICTYPE = "metric_type";
static const char* CONFIG_OMP_THREAD_NUM = "omp_thread_num";
static const std::string CONFIG_ENGINE = "engine_config";
static const std::string CONFIG_NPROBE = "nprobe";
static const std::string CONFIG_NLIST = "nlist";
static const std::string CONFIG_DCBT = "use_blas_threshold";
static const std::string CONFIG_METRICTYPE = "metric_type";
static const std::string CONFIG_OMP_THREAD_NUM = "omp_thread_num";
static const std::string CONFIG_USE_HYBRID_INDEX = "use_hybrid_index";
static const std::string CONFIG_HYBRID_INDEX_GPU = "hybrid_index_gpu";
class ServerConfig {
public:
......
......@@ -168,7 +168,12 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context,
GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context,
const ::milvus::grpc::DeleteByRangeParam *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DeleteByRangeTask::Create(*request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
response->set_error_code(grpc_status.error_code());
response->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
}
::grpc::Status
......
......@@ -716,6 +716,73 @@ CmdTask::OnExecute() {
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/*
 * Construct a delete-by-range task in the DDL/DML task group and keep a copy
 * of the request parameters for OnExecute().
 */
DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param)
: GrpcBaseTask(DDL_DML_TASK_GROUP),
delete_by_range_param_(delete_by_range_param){
}
/*
 * Factory: wrap a new DeleteByRangeTask for the given request in a shared pointer.
 */
BaseTaskPtr
DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param) {
    std::shared_ptr<GrpcBaseTask> task(new DeleteByRangeTask(delete_by_range_param));
    return task;
}
/*
 * Execute a delete-by-range request:
 *   1. validate the table name;
 *   2. check that the table exists;
 *   3. convert the requested range into db dates;
 *   4. ask the db to delete the table data for those dates.
 * Returns SERVER_SUCCESS, or the error code recorded through SetError().
 * Any std::exception is reported as SERVER_UNEXPECTED_ERROR.
 */
ServerError
DeleteByRangeTask::OnExecute() {
    try {
        TimeRecorder rc("DeleteByRangeTask");

        //step 1: check arguments
        std::string table_name = delete_by_range_param_.table_name();
        ServerError res = ValidationUtil::ValidateTableName(table_name);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name);
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

        rc.ElapseFromBegin("check validation");

        //step 3: check date range, and convert to db dates
        std::vector<DB_DATE> dates;
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;

        std::vector<::milvus::grpc::Range> range_array;
        range_array.emplace_back(delete_by_range_param_.range());
        ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg);
        if (error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
        }

        // NOTE(review): this profiling block reads record_array_ and top_k_, which are
        // not initialized by this task's constructor and look copied from the search
        // task; no matching profiler stop is visible here - confirm before enabling
        // MILVUS_ENABLE_PROFILING.
#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
                            "_top_" + std::to_string(this->top_k_) + "_" +
                            GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif

        //step 4: delete the data of the requested dates
        engine::Status status = DBWrapper::DB()->DeleteTable(table_name, dates);
        if (!status.ok()) {
            // Report the status of the delete itself; `stat` is the (successful)
            // DescribeTable status and would hide the real failure reason.
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + status.ToString());
        }
    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PreloadTableTask::PreloadTableTask(const std::string &table_name)
: GrpcBaseTask(DDL_DML_TASK_GROUP),
......
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "FaissGpuResources.h"
#include "map"
namespace zilliz {
namespace milvus {
namespace engine {
// Returns the cached faiss GPU resources for `device_id`, creating a
// StandardGpuResources entry the first time a device id is requested.
// The returned reference points into a function-local static map.
// NOTE(review): the map is accessed without any lock in this function;
// confirm that callers serialize concurrent calls.
FaissGpuResources::Ptr& FaissGpuResources::GetGpuResources(int device_id) {
    static std::map<int, FaissGpuResources::Ptr> gpu_resources_map;

    auto entry = gpu_resources_map.find(device_id);
    if (entry == gpu_resources_map.end()) {
        // First request for this device: create and register its resources.
        gpu_resources_map[device_id] = std::make_shared<faiss::gpu::StandardGpuResources>();
        entry = gpu_resources_map.find(device_id);
    }
    return entry->second;
}
void FaissGpuResources::SelectGpu() {
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
gpu_num_ = server_config.GetInt32Value(server::CONFIG_GPU_INDEX, 0);
}
int32_t FaissGpuResources::GetGpu() {
return gpu_num_;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "faiss/gpu/GpuResources.h"
#include "faiss/gpu/StandardGpuResources.h"
#include "server/ServerConfig.h"
namespace zilliz {
namespace milvus {
namespace engine {
class FaissGpuResources {
public:
using Ptr = std::shared_ptr<faiss::gpu::GpuResources>;
static FaissGpuResources::Ptr& GetGpuResources(int device_id);
void SelectGpu();
int32_t GetGpu();
FaissGpuResources() : gpu_num_(0) { SelectGpu(); }
private:
int32_t gpu_num_;
};
}
}
}
\ No newline at end of file
......@@ -7,16 +7,22 @@
#if 0
// TODO: maybe support static search
#ifdef GPU_VERSION
#include "faiss/gpu/GpuAutoTune.h"
#include "faiss/gpu/StandardGpuResources.h"
#include "faiss/gpu/utils/DeviceUtils.h"
#endif
#include "Index.h"
#include "faiss/index_io.h"
#include "faiss/IndexIVF.h"
#include "faiss/IVFlib.h"
#include "faiss/IndexScalarQuantizer.h"
#include "server/ServerConfig.h"
#include "src/wrapper/FaissGpuResources.h"
namespace zilliz {
namespace milvus {
......@@ -74,8 +80,27 @@ void write_index(const Index_ptr &index, const std::string &file_name) {
// Loads a faiss index from `file_name` and wraps it in an Index.
//
// When the engine configuration enables the hybrid index and the loaded index
// is an IndexIVFScalarQuantizer, the index is cloned to the configured hybrid
// GPU device (with storeInCpu set) and the GPU clone is returned instead.
// Otherwise the CPU index is returned as loaded.
Index_ptr read_index(const std::string &file_name) {
    // Read the file exactly once and take ownership immediately, so the CPU
    // index is released on every exit path (including a throwing GPU clone).
    std::shared_ptr<faiss::Index> cpu_index(faiss::read_index(file_name.c_str()));

    server::ServerConfig &config = server::ServerConfig::GetInstance();
    server::ConfigNode engine_config = config.GetConfig(server::CONFIG_ENGINE);
    const bool use_hybrid_index = engine_config.GetBoolValue(server::CONFIG_USE_HYBRID_INDEX, false);

    const bool is_ivf_sq =
        dynamic_cast<faiss::IndexIVFScalarQuantizer *>(cpu_index.get()) != nullptr;
    if (!(is_ivf_sq && use_hybrid_index)) {
        return std::make_shared<Index>(cpu_index);
    }

    // Hybrid path: clone onto the configured device; the CPU copy is dropped
    // when cpu_index goes out of scope.
    int device_id = engine_config.GetInt32Value(server::CONFIG_HYBRID_INDEX_GPU, 0);
    auto gpu_resources = engine::FaissGpuResources::GetGpuResources(device_id);
    faiss::gpu::GpuClonerOptions clone_option;
    clone_option.storeInCpu = true;
    std::shared_ptr<faiss::Index> raw_index(
        faiss::gpu::index_cpu_to_gpu(gpu_resources.get(), device_id, cpu_index.get(), &clone_option));
    return std::make_shared<Index>(raw_index);
}
}
......
......@@ -83,7 +83,6 @@ void write_index(const Index_ptr &index, const std::string &file_name);
extern Index_ptr read_index(const std::string &file_name);
#endif
}
}
}
......@@ -17,41 +17,17 @@
#include <faiss/IndexFlat.h>
#include <easylogging++.h>
#include "faiss/IndexScalarQuantizer.h"
#include "server/ServerConfig.h"
#include "IndexBuilder.h"
#include "FaissGpuResources.h"
namespace zilliz {
namespace milvus {
namespace engine {
class GpuResources {
public:
static GpuResources &GetInstance() {
static GpuResources instance;
return instance;
}
void SelectGpu() {
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
gpu_num = server_config.GetInt32Value(server::CONFIG_GPU_INDEX, 0);
}
int32_t GetGpu() {
return gpu_num;
}
private:
GpuResources() : gpu_num(0) { SelectGpu(); }
private:
int32_t gpu_num;
};
using std::vector;
static std::mutex gpu_resource;
......@@ -59,6 +35,12 @@ static std::mutex cpu_resource;
// Stores the operand and caches the hybrid-index settings (enable flag and
// target GPU device id) from the engine section of the server configuration.
IndexBuilder::IndexBuilder(const Operand_ptr &opd) {
    opd_ = opd;

    server::ServerConfig &global_config = server::ServerConfig::GetInstance();
    server::ConfigNode engine_section = global_config.GetConfig(server::CONFIG_ENGINE);
    use_hybrid_index_ = engine_section.GetBoolValue(server::CONFIG_USE_HYBRID_INDEX, false);
    hybrid_index_device_id_ = engine_section.GetInt32Value(server::CONFIG_HYBRID_INDEX_GPU, 0);
}
// Default: build use gpu
......@@ -76,14 +58,48 @@ Index_ptr IndexBuilder::build_all(const long &nb,
faiss::Index *ori_index = faiss::index_factory(opd_->d, opd_->get_index_type(nb).c_str(), metric_type);
std::lock_guard<std::mutex> lk(gpu_resource);
#ifdef UNITTEST_ONLY
faiss::gpu::StandardGpuResources res;
auto device_index = faiss::gpu::index_cpu_to_gpu(&res, GpuResources::GetInstance().GetGpu(), ori_index);
int device_id = 0;
faiss::gpu::GpuClonerOptions clone_option;
clone_option.storeInCpu = use_hybrid_index_;
auto device_index = faiss::gpu::index_cpu_to_gpu(&res, device_id, ori_index, &clone_option);
#else
engine::FaissGpuResources res;
int device_id = res.GetGpu();
auto gpu_resources = engine::FaissGpuResources::GetGpuResources(device_id);
faiss::gpu::GpuClonerOptions clone_option;
clone_option.storeInCpu = use_hybrid_index_;
auto device_index = faiss::gpu::index_cpu_to_gpu(gpu_resources.get(), device_id, ori_index, &clone_option);
#endif
if (!device_index->is_trained) {
nt == 0 || xt == nullptr ? device_index->train(nb, xb)
: device_index->train(nt, xt);
}
device_index->add_with_ids(nb, xb, ids); // TODO: support with add_with_IDMAP
if (dynamic_cast<faiss::IndexIVFScalarQuantizer*>(ori_index) != nullptr
&& use_hybrid_index_) {
std::shared_ptr<faiss::Index> device_hybrid_index = nullptr;
if (hybrid_index_device_id_ != device_id) {
auto host_hybrid_index = faiss::gpu::index_gpu_to_cpu(device_index);
auto hybrid_gpu_resources = engine::FaissGpuResources::GetGpuResources(hybrid_index_device_id_);
auto another_device_index = faiss::gpu::index_cpu_to_gpu(hybrid_gpu_resources.get(),
hybrid_index_device_id_,
host_hybrid_index,
&clone_option);
device_hybrid_index.reset(another_device_index);
delete device_index;
delete host_hybrid_index;
} else {
device_hybrid_index.reset(device_index);
}
delete ori_index;
return std::make_shared<Index>(device_hybrid_index);
}
host_index.reset(faiss::gpu::index_gpu_to_cpu(device_index));
delete device_index;
......
......@@ -45,6 +45,8 @@ class IndexBuilder {
protected:
Operand_ptr opd_ = nullptr;
bool use_hybrid_index_;
int hybrid_index_device_id_;
};
class BgCpuBuilder : public IndexBuilder {
......
......@@ -144,7 +144,9 @@ VecIndexPtr VecIndexImpl::CopyToGpu(const int64_t &device_id, const Config &cfg)
// TODO(linxj): update type
auto gpu_index = zilliz::knowhere::CopyCpuToGpu(index_, device_id, cfg);
return std::make_shared<VecIndexImpl>(gpu_index, type);
auto new_index = std::make_shared<VecIndexImpl>(gpu_index, type);
new_index->dim = dim;
return new_index;
}
// TODO(linxj): rename copytocpu => copygputocpu
......
......@@ -2,7 +2,7 @@ ARROW_VERSION=zilliz
BOOST_VERSION=1.70.0
BZIP2_VERSION=1.0.6
EASYLOGGINGPP_VERSION=v9.96.7
FAISS_VERSION=v1.5.3
FAISS_VERSION=branch-0.1.0
MKL_VERSION=2019.4.243
GTEST_VERSION=1.8.1
JSONCONS_VERSION=0.126.0
......
......@@ -9,6 +9,7 @@
#include "db/meta/MetaConsts.h"
#include "db/Factories.h"
#include "cache/CpuCacheMgr.h"
#include "utils/CommonUtil.h"
#include <gtest/gtest.h>
#include <easylogging++.h>
......@@ -26,6 +27,8 @@ namespace {
static constexpr int64_t TABLE_DIM = 256;
static constexpr int64_t VECTOR_COUNT = 250000;
static constexpr int64_t INSERT_LOOP = 10000;
static constexpr int64_t SECONDS_EACH_HOUR = 3600;
static constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
engine::meta::TableSchema BuildTableSchema() {
engine::meta::TableSchema table_info;
......@@ -45,6 +48,52 @@ namespace {
}
}
// Builds a "year-month-day" string (no zero padding) for the current time
// shifted by +8 hours and by `offset_day` days, broken down with gmtime().
std::string CurrentTmDate(int64_t offset_day = 0) {
    time_t shifted;
    time(&shifted);
    shifted = shifted + 8*SECONDS_EACH_HOUR;              // fixed +8h offset
    shifted = shifted + 24*SECONDS_EACH_HOUR*offset_day;  // whole-day offset

    tm* parts = gmtime(&shifted);

    std::string date_str = std::to_string(parts->tm_year + 1900);
    date_str = date_str + "-" + std::to_string(parts->tm_mon + 1);
    date_str = date_str + "-" + std::to_string(parts->tm_mday);
    return date_str;
}
void
ConvertTimeRangeToDBDates(const std::string &start_value,
const std::string &end_value,
std::vector<engine::meta::DateT > &dates) {
dates.clear();
time_t tt_start, tt_end;
tm tm_start, tm_end;
if (!zilliz::milvus::server::CommonUtil::TimeStrToTime(start_value, tt_start, tm_start)) {
return;
}
if (!zilliz::milvus::server::CommonUtil::TimeStrToTime(end_value, tt_end, tm_end)) {
return;
}
long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) /
DAY_SECONDS;
if (days == 0) {
return;
}
for (long i = 0; i < days; i++) {
time_t tt_day = tt_start + DAY_SECONDS * i;
tm tm_day;
zilliz::milvus::server::CommonUtil::ConvertTime(tt_day, tm_day);
long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 +
tm_day.tm_mday;//according to db logic
dates.push_back(date);
}
}
}
TEST_F(DBTest, CONFIG_TEST) {
......@@ -94,7 +143,6 @@ TEST_F(DBTest, CONFIG_TEST) {
TEST_F(DBTest, DB_TEST) {
db_->Open(GetOptions(), &db_);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
......@@ -314,8 +362,6 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
};
TEST_F(DBTest2, DELETE_TEST) {
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
......@@ -350,4 +396,45 @@ TEST_F(DBTest2, DELETE_TEST) {
db_->HasTable(TABLE_NAME, has_table);
ASSERT_FALSE(has_table);
};
\ No newline at end of file
};
// Creates a table on a sqlite-backed DB, inserts several batches of vectors,
// then calls DeleteTable with the dates covering the day three days ago.
TEST_F(DBTest2, DELETE_BY_RANGE_TEST) {
    // Dedicated DB instance for this test (shadows the fixture's db_).
    auto options = engine::OptionsFactory::Build();
    options.meta.path = "/tmp/milvus_test";
    options.meta.backend_uri = "sqlite://:@:/";
    auto db_ = engine::DBFactory::Build(options);

    // Create the table and verify it is visible.
    engine::meta::TableSchema table_info = BuildTableSchema();
    engine::Status stat = db_->CreateTable(table_info);

    engine::meta::TableSchema described;
    described.table_id_ = TABLE_NAME;
    stat = db_->DescribeTable(described);
    ASSERT_STATS(stat);

    bool table_exists = false;
    db_->HasTable(TABLE_NAME, table_exists);
    ASSERT_TRUE(table_exists);

    engine::IDNumbers vector_ids;

    uint64_t size;
    db_->Size(size);

    // Insert the same batch repeatedly.
    int64_t nb = INSERT_LOOP;
    std::vector<float> xb;
    BuildVectors(nb, xb);

    int loop = 20;
    for (int round = 0; round < loop; ++round) {
        db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    // Delete by the date range [today-3, today-2).
    std::string start_value = CurrentTmDate(-3);
    std::string end_value = CurrentTmDate(-2);
    std::vector<engine::meta::DateT> dates;
    ConvertTimeRangeToDBDates(start_value, end_value, dates);

    db_->DeleteTable(TABLE_NAME, dates);
}
\ No newline at end of file
......@@ -84,6 +84,7 @@ TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) {
vector_ids = source.GetVectorIds();
ASSERT_EQ(vector_ids.size(), 100);
status = impl_->DropAll();
ASSERT_TRUE(status.ok());
}
......@@ -198,6 +199,8 @@ TEST_F(NewMemManagerTest, MEM_TABLE_TEST) {
status = mem_table.Serialize();
ASSERT_TRUE(status.ok());
status = impl_->DropAll();
ASSERT_TRUE(status.ok());
}
......@@ -372,7 +375,6 @@ TEST_F(NewMemManagerTest, CONCURRENT_INSERT_SEARCH_TEST) {
delete db_;
boost::filesystem::remove_all(options.meta.path);
};
TEST_F(DBTest, VECTOR_IDS_TEST)
......
......@@ -135,4 +135,8 @@ TEST(DBMiscTest, UTILS_TEST) {
status = engine::utils::DeleteTablePath(options, TABLE_NAME);
ASSERT_TRUE(status.ok());
status = engine::utils::DeleteTableFilePath(options, file);
ASSERT_TRUE(status.ok());
}
\ No newline at end of file
......@@ -57,7 +57,7 @@ TEST_F(DISABLED_MySQLTest, TABLE_TEST) {
table.table_id_ = "";
status = impl.CreateTable(table);
ASSERT_TRUE(status.ok());
// ASSERT_TRUE(status.ok());
status = impl.DropAll();
ASSERT_TRUE(status.ok());
......@@ -82,16 +82,22 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
table.dimension_ = 256;
auto status = impl.CreateTable(table);
meta::TableFileSchema table_file;
table_file.table_id_ = table.table_id_;
status = impl.CreateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW);
meta::DatesT dates;
dates.push_back(meta::Meta::GetDate());
status = impl.DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_FALSE(status.ok());
uint64_t cnt = 0;
status = impl.Count(table_id, cnt);
ASSERT_TRUE(status.ok());
ASSERT_EQ(cnt, 0UL);
// ASSERT_TRUE(status.ok());
// ASSERT_EQ(cnt, 0UL);
auto file_id = table_file.file_id_;
......@@ -102,11 +108,6 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_file.file_type_, new_file_type);
meta::DatesT dates;
dates.push_back(meta::Meta::GetDate());
status = impl.DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_FALSE(status.ok());
dates.clear();
for (auto i=2; i < 10; ++i) {
dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
......@@ -132,6 +133,8 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
ASSERT_EQ(files.size(), 1UL);
ASSERT_TRUE(files[0].file_type_ == meta::TableFileSchema::TO_DELETE);
// status = impl.NextTableId(table_id);
status = impl.DropAll();
ASSERT_TRUE(status.ok());
}
......@@ -223,6 +226,10 @@ TEST_F(DISABLED_MySQLTest, ARCHIVE_TEST_DISK) {
table.table_id_ = table_id;
auto status = impl.CreateTable(table);
meta::TableSchema table_schema;
table_schema.table_id_ = "";
status = impl.CreateTable(table_schema);
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id_ = table.table_id_;
......
......@@ -4,6 +4,7 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "db/scheduler/task/SearchTask.h"
#include "server/ServerConfig.h"
#include "utils/TimeRecorder.h"
#include <gtest/gtest.h>
......@@ -213,6 +214,10 @@ TEST(DBSearchTest, MERGE_TEST) {
}
TEST(DBSearchTest, PARALLEL_CLUSTER_TEST) {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode& db_config = config.GetConfig(server::CONFIG_DB);
db_config.SetValue(server::CONFIG_DB_PARALLEL_REDUCE, "true");
bool ascending = true;
std::vector<long> target_ids;
std::vector<float> target_distence;
......@@ -245,6 +250,10 @@ TEST(DBSearchTest, PARALLEL_CLUSTER_TEST) {
}
TEST(DBSearchTest, PARALLEL_TOPK_TEST) {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode& db_config = config.GetConfig(server::CONFIG_DB);
db_config.SetValue(server::CONFIG_DB_PARALLEL_REDUCE, "true");
std::vector<long> target_ids;
std::vector<float> target_distence;
engine::SearchContext::ResultSet src_result;
......
......@@ -91,9 +91,10 @@ zilliz::milvus::engine::DBMetaOptions DISABLED_MySQLTest::getDBMetaOptions() {
zilliz::milvus::engine::DBMetaOptions options;
options.path = "/tmp/milvus_test";
options.backend_uri = DBTestEnvironment::getURI();
if(options.backend_uri.empty()) {
throw std::exception();
// throw std::exception();
options.backend_uri = "mysql://root:Fantast1c@192.168.1.194:3306/";
}
return options;
......@@ -123,6 +124,10 @@ int main(int argc, char **argv) {
if (argc > 1) {
uri = argv[1];
}
// if(uri.empty()) {
// uri = "mysql://root:Fantast1c@192.168.1.194:3306/";
// }
// std::cout << uri << std::endl;
::testing::AddGlobalTestEnvironment(new DBTestEnvironment);
return RUN_ALL_TESTS();
......
#-------------------------------------------------------------------------------
# Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
# Unauthorized copying of this file, via any medium is strictly prohibited.
# Proprietary and confidential.
#-------------------------------------------------------------------------------
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
set(util_files
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp)
# Make sure that your call to link_directories takes place before your call to the relevant add_executable.
include_directories(/usr/local/cuda/include)
link_directories("/usr/local/cuda/lib64")
set(wrapper_test_src
${unittest_srcs}
${wrapper_src}
${config_files}
${util_files}
${require_files}
wrapper_test.cpp
)
add_executable(wrapper_test ${wrapper_test_src})
set(wrapper_libs
stdc++
boost_system_static
boost_filesystem_static
faiss
cudart
cublas
sqlite
snappy
bz2
z
zstd
lz4
)
# Pick the BLAS/LAPACK backend faiss was built against.
# The variable is quoted so that an unset or empty BUILD_FAISS_WITH_MKL compares
# as "" instead of producing a malformed if() expression.
if("${BUILD_FAISS_WITH_MKL}" STREQUAL "ON")
    # NOTE(review): MKL_LIBS is listed twice; presumably for static link
    # ordering - confirm before removing.
    set(wrapper_libs ${wrapper_libs} ${MKL_LIBS} ${MKL_LIBS})
else()
    set(wrapper_libs ${wrapper_libs}
        lapack
        openblas)
endif()
target_link_libraries(wrapper_test ${wrapper_libs} ${unittest_libs})
add_definitions("-DUNITTEST_ONLY")
set(topk_test_src
topk_test.cpp
${CMAKE_SOURCE_DIR}/src/wrapper/gpu/Topk.cu)
install(TARGETS wrapper_test DESTINATION bin)
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "wrapper/Operand.h"
#include "wrapper/Index.h"
#include "wrapper/IndexBuilder.h"
#include "wrapper/FaissGpuResources.h"
#include "server/ServerConfig.h"
#include <gtest/gtest.h>
#include <random>
#include <src/wrapper/FaissGpuResources.h>
using namespace zilliz::milvus;
using namespace zilliz::milvus::engine;
// Round-trips Operand objects through operand_to_str / str_to_operand and
// checks that the resulting index type string is unchanged.
// gtest assertions are used instead of assert() so the checks still run when
// the tests are compiled with NDEBUG.
TEST(operand_test, Wrapper_Test) {
    auto opd = std::make_shared<Operand>();
    opd->index_type = "IVF";
    opd->preproc = "OPQ";
    opd->postproc = "PQ";
    opd->metric_type = "L2";
    opd->d = 64;

    auto opd_str = operand_to_str(opd);
    auto new_opd = str_to_operand(opd_str);

    // TODO: fix all place where using opd to build index.
    ASSERT_EQ(new_opd->get_index_type(10000), opd->get_index_type(10000));

    auto opd_sq8 = std::make_shared<Operand>();
    opd_sq8->index_type = "IVFSQ8";
    opd_sq8->preproc = "OPQ";
    opd_sq8->postproc = "PQ";
    opd_sq8->metric_type = "L2";
    opd_sq8->d = 64;

    auto opd_str_sq8 = operand_to_str(opd_sq8);
    auto new_opd_sq8 = str_to_operand(opd_str_sq8);
    ASSERT_EQ(new_opd_sq8->get_index_type(10000), opd_sq8->get_index_type(10000));
}
// Trains an IVF index on random 3-d vectors, adds a database of random
// vectors, then searches around (0.5, 0.5, 0.5) and checks that every hit
// lies in the first quadrant.
TEST(build_test, Wrapper_Test) {
    // dimension of the vectors to index
    int d = 3;

    // make a set of nt training vectors in the unit cube
    size_t nt = 10000;

    // a reasonable number of centroids to index nb vectors
    int ncentroids = 16;

    std::random_device rd;
    std::mt19937 gen(rd());

    std::vector<float> xb;
    std::vector<long> ids;

    //prepare train data
    std::uniform_real_distribution<> dis_xt(-1.0, 1.0);
    std::vector<float> xt(nt * d);
    for (size_t i = 0; i < nt * d; i++) {
        xt[i] = dis_xt(gen);
    }

    //train the index
    auto opd = std::make_shared<Operand>();
    opd->index_type = "IVF";
    opd->d = d;
    opd->ncent = ncentroids;
    IndexBuilderPtr index_builder_1 = GetIndexBuilder(opd);
    auto index_1 = index_builder_1->build_all(0, xb, ids, nt, xt);
    ASSERT_TRUE(index_1 != nullptr);

    // size of the database we plan to index
    size_t nb = 100000;

    //prepare raw data: nb vectors of d floats each, plus one id per vector.
    // (Sizing xb to only nb floats would make add_with_ids and the checks
    // below read past the end of the buffer.)
    xb.resize(nb * d);
    ids.resize(nb);
    for (size_t i = 0; i < nb * d; i++) {
        xb[i] = dis_xt(gen);
    }
    for (size_t i = 0; i < nb; i++) {
        ids[i] = i;
    }
    index_1->add_with_ids(nb, xb.data(), ids.data());

    //search in first quadrant
    int nq = 1, k = 10;
    std::vector<float> xq = {0.5, 0.5, 0.5};
    // Vectors own the result buffers so they are released even when an
    // ASSERT_* below returns early.
    std::vector<float> result_dists(k);
    std::vector<long> result_ids(k);
    index_1->search(nq, xq.data(), k, result_dists.data(), result_ids.data());

    for (int i = 0; i < k; i++) {
        // A negative id means the slot was not filled with a real hit.
        ASSERT_GE(result_ids[i], 0);

        long id = result_ids[i];
        std::cout << "No." << id << " [" << xb[id * d] << ", " << xb[id * d + 1] << ", "
                  << xb[id * d + 2] << "] distance = " << result_dists[i] << std::endl;

        //make sure result vector is in first quadrant
        ASSERT_TRUE(xb[id * d] > 0.0);
        ASSERT_TRUE(xb[id * d + 1] > 0.0);
        ASSERT_TRUE(xb[id * d + 2] > 0.0);
    }
}
// Builds an IVF index over random 256-d vectors through IndexBuilder and
// sanity-checks its size, dimension and a self-search of the first vectors.
// gtest assertions are used instead of assert() so the checks still run when
// the tests are compiled with NDEBUG.
TEST(gpu_build_test, Wrapper_Test) {
    using std::vector;

    int d = 256;
    int nb = 3 * 1000 * 100;
    int nq = 100;
    vector<float> xb(d * nb);
    vector<float> xq(d * nq);
    vector<long> ids(nb);

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<> dis_xt(-1.0, 1.0);
    for (auto &e : xb) { e = float(dis_xt(gen)); }
    for (auto &e : xq) { e = float(dis_xt(gen)); }
    for (int i = 0; i < nb; ++i) { ids[i] = i; }

    auto opd = std::make_shared<Operand>();
    opd->index_type = "IVF";
    opd->d = d;
    opd->ncent = 256;

    IndexBuilderPtr index_builder_1 = GetIndexBuilder(opd);
    auto index_1 = index_builder_1->build_all(nb, xb.data(), ids.data());
    ASSERT_EQ(index_1->ntotal, nb);
    ASSERT_EQ(index_1->dim, d);

    // sanity check: search 5 first vectors of xb; each should find itself
    int k = 1;
    vector<long> I(5 * k);
    vector<float> D(5 * k);
    index_1->search(5, xb.data(), k, D.data(), I.data());
    for (int i = 0; i < 5; ++i) { ASSERT_EQ(i, I[i]); }
}
// Checks that GPU resources for device 0 can be obtained (twice) and that
// SelectGpu() picks up a GPU index of 0 written into the server config.
TEST(gpu_resource_test, Wrapper_Test) {
    FaissGpuResources manager;

    // First lookup creates the entry; the second one must return it again.
    FaissGpuResources::Ptr& resources = manager.GetGpuResources(0);
    ASSERT_NE(resources, nullptr);
    resources = manager.GetGpuResources(0);
    ASSERT_NE(resources, nullptr);

    // Force the configured GPU index to 0 and re-read it.
    server::ServerConfig &global_config = server::ServerConfig::GetInstance();
    server::ConfigNode& server_section = global_config.GetConfig(server::CONFIG_SERVER);
    server_section.SetValue(server::CONFIG_GPU_INDEX, "0");
    manager.SelectGpu();

    int32_t selected_gpu = manager.GetGpu();
    ASSERT_EQ(selected_gpu, 0);
}
// Builds a 2-d "IVF128,SQ8" index, writes it to disk, then reads it back with
// the hybrid index option enabled and resets the loaded index.
TEST(index_test, Wrapper_Test) {
    long vec_count = 10000;

    // Two floats per vector plus one id per vector.
    std::vector<float> data;
    std::vector<long> ids;
    for (long n = 0; n < vec_count; n++) {
        data.push_back(n/3);
        data.push_back(n/9);
        ids.push_back(n);
    }

    // Train the raw faiss index, then wrap it.
    faiss::Index* faiss_index = faiss::index_factory(2, "IVF128,SQ8");
    faiss_index->train(vec_count, data.data());

    std::shared_ptr<faiss::Index> raw_index(faiss_index);
    engine::Index_ptr index = std::make_shared<engine::Index>(raw_index);
    index->add_with_ids(vec_count, data.data(), ids.data());

    ASSERT_EQ(index->ntotal, vec_count);

    // Persist the index.
    std::string file_name = "/tmp/index_test.t";
    write_index(index, file_name);

    // Turn on the hybrid index option before loading it again.
    server::ServerConfig &global_config = server::ServerConfig::GetInstance();
    server::ConfigNode& engine_section = global_config.GetConfig(server::CONFIG_ENGINE);
    engine_section.SetValue(server::CONFIG_USE_HYBRID_INDEX, "true");

    Index_ptr index_out = read_index(file_name);
    ASSERT_NE(index_out, nullptr);

    bool reset_ok = index_out->reset();
    ASSERT_TRUE(reset_ok);
}
#include "scheduler/TaskTable.h"
#include "scheduler/Cost.h"
#include <gtest/gtest.h>
#include "scheduler/task/TestTask.h"
using namespace zilliz::milvus::engine;
......@@ -10,7 +11,7 @@ protected:
void
SetUp() override {
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<XSearchTask>();
auto task = std::make_shared<TestTask>();
table_.Put(task);
}
table_.Get(0)->state = TaskTableItemState::INVALID;
......
......@@ -13,10 +13,10 @@ TEST(normal_test, test1) {
// ResourceMgr only compose resources, provide unified event
// auto res_mgr = std::make_shared<ResourceMgr>();
auto res_mgr = ResMgrInst::GetInstance();
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd"));
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false));
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu"));
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu"));
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu"));
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false));
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false));
auto IO = Connection("IO", 500.0);
auto PCIE = Connection("IO", 11000.0);
......@@ -30,7 +30,7 @@ TEST(normal_test, test1) {
auto scheduler = SchedInst::GetInstance();
scheduler->Start();
const uint64_t NUM_TASK = 100;
const uint64_t NUM_TASK = 1000;
std::vector<std::shared_ptr<TestTask>> tasks;
for (uint64_t i = 0; i < NUM_TASK; ++i) {
if (auto observe = disk.lock()) {
......@@ -45,8 +45,10 @@ TEST(normal_test, test1) {
scheduler->Stop();
res_mgr->Stop();
for (uint64_t i = 0 ; i < NUM_TASK; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
ASSERT_EQ(tasks[i]->exec_count_, 1);
auto pcpu = cpu.lock();
for (uint64_t i = 0; i < NUM_TASK; ++i) {
auto task = std::static_pointer_cast<TestTask>(pcpu->task_table()[i]->task);
ASSERT_EQ(task->load_count_, 1);
ASSERT_EQ(task->exec_count_, 1);
}
}
#include "scheduler/TaskTable.h"
#include "scheduler/task/TestTask.h"
#include <gtest/gtest.h>
......@@ -43,8 +44,8 @@ protected:
void
SetUp() override {
invalid_task_ = nullptr;
task1_ = std::make_shared<XSearchTask>();
task2_ = std::make_shared<XSearchTask>();
task1_ = std::make_shared<TestTask>();
task2_ = std::make_shared<TestTask>();
}
TaskPtr invalid_task_;
......@@ -83,7 +84,7 @@ protected:
void
SetUp() override {
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<XSearchTask>();
auto task = std::make_shared<TestTask>();
table1_.Put(task);
}
......
......@@ -4,9 +4,12 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <gtest/gtest-death-test.h>
#include "config/IConfigMgr.h"
#include "server/ServerConfig.h"
#include "utils/CommonUtil.h"
#include "utils/ValidationUtil.h"
using namespace zilliz::milvus;
......@@ -15,6 +18,10 @@ namespace {
static const char* CONFIG_FILE_PATH = "./milvus/conf/server_config.yaml";
static const char* LOG_FILE_PATH = "./milvus/conf/log_config.conf";
static constexpr uint64_t KB = 1024;
static constexpr uint64_t MB = KB*1024;
static constexpr uint64_t GB = MB*1024;
}
TEST(ConfigTest, CONFIG_TEST) {
......@@ -103,6 +110,43 @@ TEST(ConfigTest, SERVER_CONFIG_TEST) {
config.PrintAll();
const server::ServerConfig const_config = config;
server::ConfigNode node = const_config.GetConfig("aaa");
unsigned long total_mem = 0, free_mem = 0;
server::CommonUtil::GetSystemMemInfo(total_mem, free_mem);
size_t gpu_mem = 0;
server::ValidationUtil::GetGpuMemory(0, gpu_mem);
server::ConfigNode& server_config = config.GetConfig("server_config");
server::ConfigNode& db_config = config.GetConfig("db_config");
server::ConfigNode& cache_config = config.GetConfig(server::CONFIG_CACHE);
cache_config.SetValue(server::CACHE_FREE_PERCENT, "2.0");
err = config.ValidateConfig();
ASSERT_NE(err, server::SERVER_SUCCESS);
size_t cache_cap = 16;
size_t insert_buffer_size = (total_mem - cache_cap*GB + 1*GB)/GB;
db_config.SetValue(server::CONFIG_DB_INSERT_BUFFER_SIZE, std::to_string(insert_buffer_size));
cache_config.SetValue(server::CONFIG_CPU_CACHE_CAPACITY, std::to_string(cache_cap));
err = config.ValidateConfig();
ASSERT_NE(err, server::SERVER_SUCCESS);
cache_cap = total_mem/GB + 2;
cache_config.SetValue(server::CONFIG_CPU_CACHE_CAPACITY, std::to_string(cache_cap));
err = config.ValidateConfig();
ASSERT_NE(err, server::SERVER_SUCCESS);
size_t index_building_threshold = (gpu_mem + 1*MB)/MB;
db_config.SetValue(server::CONFIG_DB_INDEX_TRIGGER_SIZE,
std::to_string(index_building_threshold));
err = config.ValidateConfig();
ASSERT_NE(err, server::SERVER_SUCCESS);
insert_buffer_size = total_mem/GB + 2;
db_config.SetValue(server::CONFIG_DB_INSERT_BUFFER_SIZE, std::to_string(insert_buffer_size));
err = config.ValidateConfig();
ASSERT_NE(err, server::SERVER_SUCCESS);
server_config.SetValue(server::CONFIG_GPU_INDEX, "9999");
err = config.ValidateConfig();
ASSERT_NE(err, server::SERVER_SUCCESS);
}
\ No newline at end of file
......@@ -225,4 +225,3 @@ TEST(UtilTest, TIMERECORDER_TEST) {
rc.RecordSection("end");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册