提交 a6ef8bec 编写于 作者: Y Yu Kun

Merge remote-tracking branch 'upstream/branch-0.4.0' into branch-0.4.0


Former-commit-id: 064eb796b95e9d1679e39708b64ce8ee5f0a2bc5
......@@ -38,9 +38,13 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-390 - Update resource construct function
- MS-391 - Add PushTaskToNeighbourHasExecutor action
- MS-394 - Update scheduler unittest
- MS-400 - Add timestamp record in task state change function
## New Feature
- MS-343 - Implement ResourceMgr
- MS-338 - NewAPI: refine code to support CreateIndex
- MS-339 - NewAPI: refine code to support DropIndex
- MS-340 - NewAPI: implement DescribeIndex
## Task
- MS-297 - disable mysql unit test
......
......@@ -48,5 +48,3 @@ engine_config:
use_blas_threshold: 20
metric_type: L2 # compare vectors by euclidean distance(L2) or inner product(IP), optional: L2 or IP
omp_thread_num: 0 # how many compute threads be used by engine, 0 means use all cpu core to compute
use_hybrid_index: false # use GPU/CPU hybrid index
hybrid_index_gpu: 0 # hybrid index gpu device id
......@@ -46,7 +46,7 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) {
return cache_->get(key);
}
engine::Index_ptr CacheMgr::GetIndex(const std::string& key) {
engine::VecIndexPtr CacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
if(obj != nullptr) {
return obj->data();
......@@ -65,7 +65,7 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index) {
void CacheMgr::InsertItem(const std::string& key, const engine::VecIndexPtr& index) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......
......@@ -19,10 +19,10 @@ public:
virtual bool ItemExists(const std::string& key);
virtual DataObjPtr GetItem(const std::string& key);
virtual engine::Index_ptr GetIndex(const std::string& key);
virtual engine::VecIndexPtr GetIndex(const std::string& key);
virtual void InsertItem(const std::string& key, const DataObjPtr& data);
virtual void InsertItem(const std::string& key, const engine::Index_ptr& index);
virtual void InsertItem(const std::string& key, const engine::VecIndexPtr& index);
virtual void EraseItem(const std::string& key);
......
......@@ -6,7 +6,7 @@
#pragma once
#include "wrapper/Index.h"
#include "wrapper/knowhere/vec_index.h"
#include <memory>
......@@ -16,17 +16,17 @@ namespace cache {
class DataObj {
public:
DataObj(const engine::Index_ptr& index)
DataObj(const engine::VecIndexPtr& index)
: index_(index)
{}
DataObj(const engine::Index_ptr& index, int64_t size)
DataObj(const engine::VecIndexPtr& index, int64_t size)
: index_(index),
size_(size)
{}
engine::Index_ptr data() { return index_; }
const engine::Index_ptr& data() const { return index_; }
engine::VecIndexPtr data() { return index_; }
const engine::VecIndexPtr& data() const { return index_; }
int64_t size() const {
if(index_ == nullptr) {
......@@ -41,7 +41,7 @@ public:
}
private:
engine::Index_ptr index_ = nullptr;
engine::VecIndexPtr index_ = nullptr;
int64_t size_ = 0;
};
......
......@@ -23,9 +23,9 @@ typedef std::vector<QueryResult> QueryResults;
struct TableIndex {
int32_t engine_type_ = (int)EngineType::FAISS_IDMAP;
int32_t nlist = 16384;
int32_t index_file_size = 1024; //MB
int32_t metric_type = (int)MetricType::L2;
int32_t nlist_ = 16384;
int32_t index_file_size_ = 1024; //MB
int32_t metric_type_ = (int)MetricType::L2;
};
} // namespace engine
......
......@@ -148,9 +148,9 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
return index1.engine_type_ == index2.engine_type_
&& index1.nlist == index2.nlist
&& index1.index_file_size == index2.index_file_size
&& index1.metric_type == index2.metric_type;
&& index1.nlist_ == index2.nlist_
&& index1.index_file_size_ == index2.index_file_size_
&& index1.metric_type_ == index2.metric_type_;
}
} // namespace utils
......
此差异已折叠。
......@@ -367,9 +367,9 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
table_schema.dimension_ = std::get<2>(tables[0]);
table_schema.created_on_ = std::get<3>(tables[0]);
table_schema.engine_type_ = index.engine_type_;
table_schema.nlist_ = index.nlist;
table_schema.index_file_size_ = index.index_file_size;
table_schema.metric_type_ = index.metric_type;
table_schema.nlist_ = index.nlist_;
table_schema.index_file_size_ = index.index_file_size_;
table_schema.metric_type_ = index.metric_type_;
ConnectorPtr->update(table_schema);
} else {
......@@ -407,9 +407,9 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
if (groups.size() == 1) {
index.engine_type_ = std::get<0>(groups[0]);
index.nlist = std::get<1>(groups[0]);
index.index_file_size = std::get<2>(groups[0]);
index.metric_type = std::get<3>(groups[0]);
index.nlist_ = std::get<1>(groups[0]);
index.index_file_size_ = std::get<2>(groups[0]);
index.metric_type_ = std::get<3>(groups[0]);
} else {
return Status::NotFound("Table " + table_id + " not found");
}
......
......@@ -545,7 +545,7 @@ const char descriptor_table_protodef_milvus_2eproto[] PROTOBUF_SECTION_VARIABLE(
"_reply\030\002 \001(\010\"M\n\rTableRowCount\022#\n\006status\030"
"\001 \001(\0132\023.milvus.grpc.Status\022\027\n\017table_row_"
"count\030\002 \001(\003\"\026\n\007Command\022\013\n\003cmd\030\001 \001(\t\"X\n\005I"
"ndex\022\022\n\nindex_type\030\001 \001(\005\022\r\n\005nlist\030\002 \001(\003\022"
"ndex\022\022\n\nindex_type\030\001 \001(\005\022\r\n\005nlist\030\002 \001(\005\022"
"\027\n\017index_file_size\030\003 \001(\005\022\023\n\013metric_type\030"
"\004 \001(\005\"[\n\nIndexParam\022*\n\ntable_name\030\001 \001(\0132"
"\026.milvus.grpc.TableName\022!\n\005index\030\002 \001(\0132\022"
......@@ -5268,16 +5268,16 @@ Index::Index(const Index& from)
: ::PROTOBUF_NAMESPACE_ID::Message(),
_internal_metadata_(nullptr) {
_internal_metadata_.MergeFrom(from._internal_metadata_);
::memcpy(&nlist_, &from.nlist_,
::memcpy(&index_type_, &from.index_type_,
static_cast<size_t>(reinterpret_cast<char*>(&metric_type_) -
reinterpret_cast<char*>(&nlist_)) + sizeof(metric_type_));
reinterpret_cast<char*>(&index_type_)) + sizeof(metric_type_));
// @@protoc_insertion_point(copy_constructor:milvus.grpc.Index)
}
void Index::SharedCtor() {
::memset(&nlist_, 0, static_cast<size_t>(
::memset(&index_type_, 0, static_cast<size_t>(
reinterpret_cast<char*>(&metric_type_) -
reinterpret_cast<char*>(&nlist_)) + sizeof(metric_type_));
reinterpret_cast<char*>(&index_type_)) + sizeof(metric_type_));
}
Index::~Index() {
......@@ -5303,9 +5303,9 @@ void Index::Clear() {
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
::memset(&nlist_, 0, static_cast<size_t>(
::memset(&index_type_, 0, static_cast<size_t>(
reinterpret_cast<char*>(&metric_type_) -
reinterpret_cast<char*>(&nlist_)) + sizeof(metric_type_));
reinterpret_cast<char*>(&index_type_)) + sizeof(metric_type_));
_internal_metadata_.Clear();
}
......@@ -5324,7 +5324,7 @@ const char* Index::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::inte
CHK_(ptr);
} else goto handle_unusual;
continue;
// int64 nlist = 2;
// int32 nlist = 2;
case 2:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 16)) {
nlist_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr);
......@@ -5388,12 +5388,12 @@ bool Index::MergePartialFromCodedStream(
break;
}
// int64 nlist = 2;
// int32 nlist = 2;
case 2: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (16 & 0xFF)) {
DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive<
::PROTOBUF_NAMESPACE_ID::int64, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_INT64>(
::PROTOBUF_NAMESPACE_ID::int32, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_INT32>(
input, &nlist_)));
} else {
goto handle_unusual;
......@@ -5459,9 +5459,9 @@ void Index::SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt32(1, this->index_type(), output);
}
// int64 nlist = 2;
// int32 nlist = 2;
if (this->nlist() != 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64(2, this->nlist(), output);
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt32(2, this->nlist(), output);
}
// int32 index_file_size = 3;
......@@ -5492,9 +5492,9 @@ void Index::SerializeWithCachedSizes(
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt32ToArray(1, this->index_type(), target);
}
// int64 nlist = 2;
// int32 nlist = 2;
if (this->nlist() != 0) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(2, this->nlist(), target);
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt32ToArray(2, this->nlist(), target);
}
// int32 index_file_size = 3;
......@@ -5528,13 +5528,6 @@ size_t Index::ByteSizeLong() const {
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
// int64 nlist = 2;
if (this->nlist() != 0) {
total_size += 1 +
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int64Size(
this->nlist());
}
// int32 index_type = 1;
if (this->index_type() != 0) {
total_size += 1 +
......@@ -5542,6 +5535,13 @@ size_t Index::ByteSizeLong() const {
this->index_type());
}
// int32 nlist = 2;
if (this->nlist() != 0) {
total_size += 1 +
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int32Size(
this->nlist());
}
// int32 index_file_size = 3;
if (this->index_file_size() != 0) {
total_size += 1 +
......@@ -5583,12 +5583,12 @@ void Index::MergeFrom(const Index& from) {
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
if (from.nlist() != 0) {
set_nlist(from.nlist());
}
if (from.index_type() != 0) {
set_index_type(from.index_type());
}
if (from.nlist() != 0) {
set_nlist(from.nlist());
}
if (from.index_file_size() != 0) {
set_index_file_size(from.index_file_size());
}
......@@ -5618,8 +5618,8 @@ bool Index::IsInitialized() const {
void Index::InternalSwap(Index* other) {
using std::swap;
_internal_metadata_.Swap(&other->_internal_metadata_);
swap(nlist_, other->nlist_);
swap(index_type_, other->index_type_);
swap(nlist_, other->nlist_);
swap(index_file_size_, other->index_file_size_);
swap(metric_type_, other->metric_type_);
}
......
......@@ -2316,21 +2316,21 @@ class Index :
// accessors -------------------------------------------------------
enum : int {
kNlistFieldNumber = 2,
kIndexTypeFieldNumber = 1,
kNlistFieldNumber = 2,
kIndexFileSizeFieldNumber = 3,
kMetricTypeFieldNumber = 4,
};
// int64 nlist = 2;
void clear_nlist();
::PROTOBUF_NAMESPACE_ID::int64 nlist() const;
void set_nlist(::PROTOBUF_NAMESPACE_ID::int64 value);
// int32 index_type = 1;
void clear_index_type();
::PROTOBUF_NAMESPACE_ID::int32 index_type() const;
void set_index_type(::PROTOBUF_NAMESPACE_ID::int32 value);
// int32 nlist = 2;
void clear_nlist();
::PROTOBUF_NAMESPACE_ID::int32 nlist() const;
void set_nlist(::PROTOBUF_NAMESPACE_ID::int32 value);
// int32 index_file_size = 3;
void clear_index_file_size();
::PROTOBUF_NAMESPACE_ID::int32 index_file_size() const;
......@@ -2346,8 +2346,8 @@ class Index :
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::int64 nlist_;
::PROTOBUF_NAMESPACE_ID::int32 index_type_;
::PROTOBUF_NAMESPACE_ID::int32 nlist_;
::PROTOBUF_NAMESPACE_ID::int32 index_file_size_;
::PROTOBUF_NAMESPACE_ID::int32 metric_type_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
......@@ -3827,15 +3827,15 @@ inline void Index::set_index_type(::PROTOBUF_NAMESPACE_ID::int32 value) {
// @@protoc_insertion_point(field_set:milvus.grpc.Index.index_type)
}
// int64 nlist = 2;
// int32 nlist = 2;
inline void Index::clear_nlist() {
nlist_ = PROTOBUF_LONGLONG(0);
nlist_ = 0;
}
inline ::PROTOBUF_NAMESPACE_ID::int64 Index::nlist() const {
inline ::PROTOBUF_NAMESPACE_ID::int32 Index::nlist() const {
// @@protoc_insertion_point(field_get:milvus.grpc.Index.nlist)
return nlist_;
}
inline void Index::set_nlist(::PROTOBUF_NAMESPACE_ID::int64 value) {
inline void Index::set_nlist(::PROTOBUF_NAMESPACE_ID::int32 value) {
nlist_ = value;
// @@protoc_insertion_point(field_set:milvus.grpc.Index.nlist)
......
......@@ -61,7 +61,7 @@ static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] =
const char descriptor_table_protodef_status_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) =
"\n\014status.proto\022\013milvus.grpc\"D\n\006Status\022*\n"
"\nerror_code\030\001 \001(\0162\026.milvus.grpc.ErrorCod"
"e\022\016\n\006reason\030\002 \001(\t*\354\003\n\tErrorCode\022\013\n\007SUCCE"
"e\022\016\n\006reason\030\002 \001(\t*\230\004\n\tErrorCode\022\013\n\007SUCCE"
"SS\020\000\022\024\n\020UNEXPECTED_ERROR\020\001\022\022\n\016CONNECT_FA"
"ILED\020\002\022\025\n\021PERMISSION_DENIED\020\003\022\024\n\020TABLE_N"
"OT_EXISTS\020\004\022\024\n\020ILLEGAL_ARGUMENT\020\005\022\021\n\rILL"
......@@ -73,7 +73,9 @@ const char descriptor_table_protodef_status_2eproto[] PROTOBUF_SECTION_VARIABLE(
"TA_FAILED\020\017\022\020\n\014CACHE_FAILED\020\020\022\030\n\024CANNOT_"
"CREATE_FOLDER\020\021\022\026\n\022CANNOT_CREATE_FILE\020\022\022"
"\030\n\024CANNOT_DELETE_FOLDER\020\023\022\026\n\022CANNOT_DELE"
"TE_FILE\020\024\022\025\n\021BUILD_INDEX_ERROR\020\025b\006proto3"
"TE_FILE\020\024\022\025\n\021BUILD_INDEX_ERROR\020\025\022\021\n\rILLE"
"GAL_NLIST\020\026\022\027\n\023ILLEGAL_METRIC_TYPE\020\027b\006pr"
"oto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_status_2eproto_deps[1] = {
};
......@@ -83,7 +85,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_sta
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_status_2eproto_once;
static bool descriptor_table_status_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_status_2eproto = {
&descriptor_table_status_2eproto_initialized, descriptor_table_protodef_status_2eproto, "status.proto", 600,
&descriptor_table_status_2eproto_initialized, descriptor_table_protodef_status_2eproto, "status.proto", 644,
&descriptor_table_status_2eproto_once, descriptor_table_status_2eproto_sccs, descriptor_table_status_2eproto_deps, 1, 0,
schemas, file_default_instances, TableStruct_status_2eproto::offsets,
file_level_metadata_status_2eproto, 1, file_level_enum_descriptors_status_2eproto, file_level_service_descriptors_status_2eproto,
......@@ -121,6 +123,8 @@ bool ErrorCode_IsValid(int value) {
case 19:
case 20:
case 21:
case 22:
case 23:
return true;
default:
return false;
......
......@@ -91,12 +91,14 @@ enum ErrorCode : int {
CANNOT_DELETE_FOLDER = 19,
CANNOT_DELETE_FILE = 20,
BUILD_INDEX_ERROR = 21,
ILLEGAL_NLIST = 22,
ILLEGAL_METRIC_TYPE = 23,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
ErrorCode_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool ErrorCode_IsValid(int value);
constexpr ErrorCode ErrorCode_MIN = SUCCESS;
constexpr ErrorCode ErrorCode_MAX = BUILD_INDEX_ERROR;
constexpr ErrorCode ErrorCode_MAX = ILLEGAL_METRIC_TYPE;
constexpr int ErrorCode_ARRAYSIZE = ErrorCode_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ErrorCode_descriptor();
......
......@@ -125,7 +125,7 @@ message Command {
*/
message Index {
int32 index_type = 1;
int64 nlist = 2;
int32 nlist = 2;
int32 index_file_size = 3;
int32 metric_type = 4;
}
......
......@@ -93,6 +93,17 @@ ResourceMgr::Dump() {
return str;
}
std::string
ResourceMgr::DumpTaskTables() {
std::stringstream ss;
ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
for (auto &resource : resources_) {
ss << resource->Dump() << std::endl;
ss << resource->task_table().Dump() << std::endl;
}
return ss.str();
}
void
ResourceMgr::event_process() {
while (running_) {
......
......@@ -70,6 +70,9 @@ public:
std::string
Dump();
std::string
DumpTaskTables();
private:
void
event_process();
......
......@@ -8,12 +8,88 @@
#include "event/TaskTableUpdatedEvent.h"
#include <vector>
#include <sstream>
#include <ctime>
namespace zilliz {
namespace milvus {
namespace engine {
uint64_t
get_now_timestamp() {
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
return millis;
}
bool
TaskTableItem::Load() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::START) {
state = TaskTableItemState::LOADING;
lock.unlock();
timestamp.load = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Loaded() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::LOADING) {
state = TaskTableItemState::LOADED;
lock.unlock();
timestamp.loaded = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Execute() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::LOADED) {
state = TaskTableItemState::EXECUTING;
lock.unlock();
timestamp.execute = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Executed() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::EXECUTING) {
state = TaskTableItemState::EXECUTED;
lock.unlock();
timestamp.executed = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Move() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::LOADED) {
state = TaskTableItemState::MOVING;
lock.unlock();
timestamp.move = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Moved() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::MOVING) {
state = TaskTableItemState::MOVED;
lock.unlock();
timestamp.moved = get_now_timestamp();
return true;
}
return false;
}
void
TaskTable::Put(TaskPtr task) {
......@@ -59,78 +135,6 @@ TaskTable::Clear() {
// table_.erase(table_.begin(), iterator);
}
bool
TaskTable::Move(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADED) {
task->state = TaskTableItemState::MOVING;
return true;
}
return false;
}
bool
TaskTable::Moved(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::MOVING) {
task->state = TaskTableItemState::MOVED;
return true;
}
return false;
}
bool
TaskTable::Load(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::START) {
task->state = TaskTableItemState::LOADING;
return true;
}
return false;
}
bool
TaskTable::Loaded(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADING) {
task->state = TaskTableItemState::LOADED;
return true;
}
return false;
}
bool
TaskTable::Execute(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADED) {
task->state = TaskTableItemState::EXECUTING;
return true;
}
return false;
}
bool
TaskTable::Executed(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::EXECUTING) {
task->state = TaskTableItemState::EXECUTED;
return true;
}
return false;
}
std::string
ToString(TaskTableItemState state) {
switch (state) {
......@@ -146,12 +150,27 @@ ToString(TaskTableItemState state) {
}
}
std::string
ToString(const TaskTimestamp &timestamp) {
std::stringstream ss;
ss << "<start=" << timestamp.start;
ss << ", load=" << timestamp.load;
ss << ", loaded=" << timestamp.loaded;
ss << ", execute=" << timestamp.execute;
ss << ", executed=" << timestamp.executed;
ss << ", move=" << timestamp.move;
ss << ", moved=" << timestamp.moved;
ss << ">";
return ss.str();
}
std::string
TaskTable::Dump() {
std::stringstream ss;
for (auto &item : table_) {
ss << "<" << item->id;
ss << ", " << ToString(item->state);
ss << "<id=" << item->id;
ss << ", state=" << ToString(item->state);
ss << ", timestamp=" << ToString(item->timestamp);
ss << ">" << std::endl;
}
return ss.str();
......
......@@ -28,6 +28,16 @@ enum class TaskTableItemState {
MOVED, // moved, termination state
};
struct TaskTimestamp {
uint64_t start = 0;
uint64_t move = 0;
uint64_t moved = 0;
uint64_t load = 0;
uint64_t loaded = 0;
uint64_t execute = 0;
uint64_t executed = 0;
};
struct TaskTableItem {
TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {}
......@@ -39,8 +49,27 @@ struct TaskTableItem {
TaskPtr task; // the task;
TaskTableItemState state; // the state;
std::mutex mutex;
TaskTimestamp timestamp;
uint8_t priority; // just a number, meaningless;
bool
Load();
bool
Loaded();
bool
Execute();
bool
Executed();
bool
Move();
bool
Moved();
};
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
......@@ -111,55 +140,68 @@ public:
public:
/******** Action ********/
/*
* Move a task;
* Set state moving;
* Called by scheduler;
*/
// TODO: bool to Status
bool
Move(uint64_t index);
/*
* Move task finished;
* Set state moved;
* Called by scheduler;
*/
bool
Moved(uint64_t index);
/*
* Load a task;
* Set state loading;
* Called by loader;
*/
bool
Load(uint64_t index);
inline bool
Load(uint64_t index) {
return table_[index]->Load();
}
/*
* Load task finished;
* Set state loaded;
* Called by loader;
*/
bool
Loaded(uint64_t index);
inline bool
Loaded(uint64_t index) {
return table_[index]->Loaded();
}
/*
* Execute a task;
* Set state executing;
* Called by executor;
*/
bool
Execute(uint64_t index);
inline bool
Execute(uint64_t index) {
return table_[index]->Execute();
}
/*
* Execute task finished;
* Set state executed;
* Called by executor;
*/
bool
Executed(uint64_t index);
inline bool
Executed(uint64_t index){
return table_[index]->Executed();
}
/*
* Move a task;
* Set state moving;
* Called by scheduler;
*/
inline bool
Move(uint64_t index){
return table_[index]->Move();
}
/*
* Move task finished;
* Set state moved;
* Called by scheduler;
*/
inline bool
Moved(uint64_t index){
return table_[index]->Moved();
}
public:
/*
......
......@@ -112,7 +112,7 @@ void Resource::loader_function() {
}
LoadFile(task_item->task);
// TODO: wrapper loaded
task_item->state = TaskTableItemState::LOADED;
task_item->Loaded();
if (subscriber_) {
auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
......@@ -138,7 +138,7 @@ void Resource::executor_function() {
break;
}
Process(task_item->task);
task_item->state = TaskTableItemState::EXECUTED;
task_item->Executed();
if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
......
......@@ -113,7 +113,6 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
if (type == LoadType::DISK2CPU) {
index_engine_->Load();
} else if (type == LoadType::CPU2GPU) {
index_engine_->Load();
index_engine_->CopyToGpu(device_id);
} else if (type == LoadType::GPU2CPU) {
index_engine_->CopyToCpu();
......
......@@ -42,7 +42,6 @@ GrpcRequestHandler::HasTable(::grpc::ServerContext *context,
GrpcRequestHandler::DropTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name());
GrpcRequestScheduler::ExecTask(task_ptr, response);
return ::grpc::Status::OK;
......
......@@ -227,12 +227,32 @@ CreateIndexTask::OnExecute() {
return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
}
res = ValidationUtil::ValidateTableIndexType(index_param_.mutable_index()->index_type());
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid index type: " + std::to_string(index_param_.mutable_index()->index_type()));
}
res = ValidationUtil::ValidateTableIndexNlist(index_param_.mutable_index()->nlist());
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid index nlist: " + std::to_string(index_param_.mutable_index()->nlist()));
}
res = ValidationUtil::ValidateTableIndexMetricType(index_param_.mutable_index()->metric_type());
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid index metric type: " + std::to_string(index_param_.mutable_index()->metric_type()));
}
res = ValidationUtil::ValidateTableIndexFileSize(index_param_.mutable_index()->index_file_size());
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid index file size: " + std::to_string(index_param_.mutable_index()->index_file_size()));
}
//step 2: check table existence
engine::TableIndex index;
index.engine_type_ = index_param_.mutable_index()->index_type();
index.nlist = index_param_.mutable_index()->nlist();
index.index_file_size = index_param_.mutable_index()->index_file_size();
index.metric_type = index_param_.mutable_index()->metric_type();
index.nlist_ = index_param_.mutable_index()->nlist();
index.index_file_size_ = index_param_.mutable_index()->index_file_size();
index.metric_type_ = index_param_.mutable_index()->metric_type();
stat = DBWrapper::DB()->CreateIndex(table_name_, index);
if (!stat.ok()) {
return SetError(SERVER_BUILD_INDEX_ERROR, "Engine failed: " + stat.ToString());
......@@ -855,9 +875,9 @@ DescribeIndexTask::OnExecute() {
index_param_.mutable_table_name()->set_table_name(table_name_);
index_param_.mutable_index()->set_index_type(index.engine_type_);
index_param_.mutable_index()->set_nlist(index.nlist);
index_param_.mutable_index()->set_index_file_size(index.index_file_size);
index_param_.mutable_index()->set_metric_type(index.metric_type);
index_param_.mutable_index()->set_nlist(index.nlist_);
index_param_.mutable_index()->set_index_file_size(index.index_file_size_);
index_param_.mutable_index()->set_metric_type(index.metric_type_);
rc.ElapseFromBegin("totally cost");
} catch (std::exception &ex) {
......
......@@ -51,6 +51,9 @@ constexpr ServerError SERVER_ILLEGAL_SEARCH_RESULT = ToGlobalServerErrorCode(110
constexpr ServerError SERVER_CACHE_ERROR = ToGlobalServerErrorCode(111);
constexpr ServerError SERVER_WRITE_ERROR = ToGlobalServerErrorCode(112);
constexpr ServerError SERVER_INVALID_NPROBE = ToGlobalServerErrorCode(113);
constexpr ServerError SERVER_INVALID_INDEX_NLIST = ToGlobalServerErrorCode(114);
constexpr ServerError SERVER_INVALID_INDEX_METRIC_TYPE = ToGlobalServerErrorCode(115);
constexpr ServerError SERVER_INVALID_INDEX_FILE_SIZE = ToGlobalServerErrorCode(116);
constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(500);
......
......@@ -10,6 +10,7 @@ namespace server {
constexpr size_t table_name_size_limit = 255;
constexpr int64_t table_dimension_limit = 16384;
constexpr int32_t index_file_size_limit = 4096; //index trigger size max = 4096 MB
ServerError
ValidationUtil::ValidateTableName(const std::string &table_name) {
......@@ -65,6 +66,32 @@ ValidationUtil::ValidateTableIndexType(int32_t index_type) {
return SERVER_SUCCESS;
}
ServerError
ValidationUtil::ValidateTableIndexNlist(int32_t nlist) {
if(nlist <= 0) {
return SERVER_INVALID_INDEX_NLIST;
}
return SERVER_SUCCESS;
}
ServerError
ValidationUtil::ValidateTableIndexFileSize(int32_t index_file_size) {
if(index_file_size <= 0 || index_file_size > index_file_size_limit) {
return SERVER_INVALID_INDEX_FILE_SIZE;
}
return SERVER_SUCCESS;
}
ServerError
ValidationUtil::ValidateTableIndexMetricType(int32_t metric_type) {
if(metric_type != (int32_t)engine::MetricType::L2 && metric_type != (int32_t)engine::MetricType::IP) {
return SERVER_INVALID_INDEX_METRIC_TYPE;
}
return SERVER_SUCCESS;
}
ServerError
ValidationUtil::ValidateGpuIndex(uint32_t gpu_index) {
int num_devices = 0;
......
......@@ -17,6 +17,15 @@ public:
static ServerError
ValidateTableIndexType(int32_t index_type);
static ServerError
ValidateTableIndexNlist(int32_t nlist);
static ServerError
ValidateTableIndexFileSize(int32_t index_file_size);
static ServerError
ValidateTableIndexMetricType(int32_t metric_type);
static ServerError
ValidateGpuIndex(uint32_t gpu_index);
......
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "FaissGpuResources.h"
#include "map"
namespace zilliz {
namespace milvus {
namespace engine {
FaissGpuResources::Ptr& FaissGpuResources::GetGpuResources(int device_id) {
static std::map<int, FaissGpuResources::Ptr> gpu_resources_map;
auto search = gpu_resources_map.find(device_id);
if (search != gpu_resources_map.end()) {
return gpu_resources_map[device_id];
} else {
gpu_resources_map[device_id] = std::make_shared<faiss::gpu::StandardGpuResources>();
return gpu_resources_map[device_id];
}
}
void FaissGpuResources::SelectGpu() {
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
gpu_num_ = server_config.GetInt32Value(server::CONFIG_GPU_INDEX, 0);
}
int32_t FaissGpuResources::GetGpu() {
return gpu_num_;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "faiss/gpu/GpuResources.h"
#include "faiss/gpu/StandardGpuResources.h"
#include "server/ServerConfig.h"
namespace zilliz {
namespace milvus {
namespace engine {
class FaissGpuResources {
public:
using Ptr = std::shared_ptr<faiss::gpu::GpuResources>;
static FaissGpuResources::Ptr& GetGpuResources(int device_id);
void SelectGpu();
int32_t GetGpu();
FaissGpuResources() : gpu_num_(0) { SelectGpu(); }
private:
int32_t gpu_num_;
};
}
}
}
\ No newline at end of file
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#if 0
// TODO: maybe support static search
#ifdef GPU_VERSION
#include "faiss/gpu/GpuAutoTune.h"
#include "faiss/gpu/StandardGpuResources.h"
#include "faiss/gpu/utils/DeviceUtils.h"
#endif
#include "Index.h"
#include "faiss/index_io.h"
#include "faiss/IndexIVF.h"
#include "faiss/IVFlib.h"
#include "faiss/IndexScalarQuantizer.h"
#include "server/ServerConfig.h"
#include "src/wrapper/FaissGpuResources.h"
namespace zilliz {
namespace milvus {
namespace engine {
using std::string;
using std::unordered_map;
using std::vector;
Index::Index(const std::shared_ptr<faiss::Index> &raw_index) {
index_ = raw_index;
dim = index_->d;
ntotal = index_->ntotal;
store_on_gpu = false;
}
bool Index::reset() {
try {
index_->reset();
ntotal = index_->ntotal;
}
catch (std::exception &e) {
// LOG(ERROR) << e.what();
return false;
}
return true;
}
bool Index::add_with_ids(idx_t n, const float *xdata, const long *xids) {
try {
index_->add_with_ids(n, xdata, xids);
ntotal += n;
}
catch (std::exception &e) {
// LOG(ERROR) << e.what();
return false;
}
return true;
}
bool Index::search(idx_t n, const float *data, idx_t k, float *distances, long *labels) const {
try {
index_->search(n, data, k, distances, labels);
}
catch (std::exception &e) {
// LOG(ERROR) << e.what();
return false;
}
return true;
}
void write_index(const Index_ptr &index, const std::string &file_name) {
write_index(index->index_.get(), file_name.c_str());
}
Index_ptr read_index(const std::string &file_name) {
std::shared_ptr<faiss::Index> raw_index = nullptr;
faiss::Index *cpu_index = faiss::read_index(file_name.c_str());
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode engine_config = config.GetConfig(server::CONFIG_ENGINE);
bool use_hybrid_index_ = engine_config.GetBoolValue(server::CONFIG_USE_HYBRID_INDEX, false);
if (dynamic_cast<faiss::IndexIVFScalarQuantizer *>(cpu_index) != nullptr && use_hybrid_index_) {
int device_id = engine_config.GetInt32Value(server::CONFIG_HYBRID_INDEX_GPU, 0);
auto gpu_resources = engine::FaissGpuResources::GetGpuResources(device_id);
faiss::gpu::GpuClonerOptions clone_option;
clone_option.storeInCpu = true;
faiss::Index *gpu_index = faiss::gpu::index_cpu_to_gpu(gpu_resources.get(), device_id, cpu_index, &clone_option);
delete cpu_index;
raw_index.reset(gpu_index);
return std::make_shared<Index>(raw_index);
} else {
raw_index.reset(cpu_index);
return std::make_shared<Index>(raw_index);
}
}
}
}
}
#endif
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
//#include <vector>
//#include <string>
//#include <unordered_map>
//#include <memory>
//#include <fstream>
//
//#include "faiss/AutoTune.h"
//#include "faiss/index_io.h"
//
//#include "Operand.h"
#include "knowhere/vec_index.h"
namespace zilliz {
namespace milvus {
namespace engine {
using Index_ptr = VecIndexPtr;
#if 0
//class Index;
//using Index_ptr = std::shared_ptr<Index>;
class Index {
typedef long idx_t;
public:
int dim; ///< std::vector dimension
idx_t ntotal; ///< total nb of indexed std::vectors
bool store_on_gpu;
explicit Index(const std::shared_ptr<faiss::Index> &raw_index);
virtual bool reset();
/**
* @brief Same as add, but stores xids instead of sequential ids.
*
* @param data input matrix, size n * d
* @param if ids is not empty ids for the std::vectors
*/
virtual bool add_with_ids(idx_t n, const float *xdata, const long *xids);
/**
* @brief for each query std::vector, find its k nearest neighbors in the database
*
* @param n queries size
* @param data query std::vectors
* @param k top k nearest neighbors
* @param distances top k nearest distances
* @param labels neighbors of the queries
*/
virtual bool search(idx_t n, const float *data, idx_t k, float *distances, long *labels) const;
//virtual bool search(idx_t n, const std::vector<float> &data, idx_t k,
// std::vector<float> &distances, std::vector<float> &labels) const;
//virtual bool remove_ids(const faiss::IDSelector &sel, long &nremove, long &location);
//virtual bool remove_ids_range(const faiss::IDSelector &sel, long &nremove);
//virtual bool index_display();
virtual std::shared_ptr<faiss::Index> data() { return index_; }
virtual const std::shared_ptr<faiss::Index>& data() const { return index_; }
private:
friend void write_index(const Index_ptr &index, const std::string &file_name);
std::shared_ptr<faiss::Index> index_ = nullptr;
};
void write_index(const Index_ptr &index, const std::string &file_name);
extern Index_ptr read_index(const std::string &file_name);
#endif
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#if 0
#include "mutex"
#ifdef GPU_VERSION
#include <faiss/gpu/StandardGpuResources.h>
#include <faiss/gpu/GpuIndexIVFFlat.h>
#include <faiss/gpu/GpuAutoTune.h>
#endif
#include <faiss/IndexFlat.h>
#include <easylogging++.h>
#include "faiss/IndexScalarQuantizer.h"
#include "server/ServerConfig.h"
#include "IndexBuilder.h"
#include "FaissGpuResources.h"
namespace zilliz {
namespace milvus {
namespace engine {
using std::vector;
static std::mutex gpu_resource;
static std::mutex cpu_resource;
IndexBuilder::IndexBuilder(const Operand_ptr &opd) {
opd_ = opd;
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
use_hybrid_index_ = engine_config.GetBoolValue(CONFIG_USE_HYBRID_INDEX, false);
hybrid_index_device_id_ = engine_config.GetInt32Value(server::CONFIG_HYBRID_INDEX_GPU, 0);
}
// Default: build use gpu
Index_ptr IndexBuilder::build_all(const long &nb,
const float *xb,
const long *ids,
const long &nt,
const float *xt) {
std::shared_ptr<faiss::Index> host_index = nullptr;
#ifdef GPU_VERSION
{
LOG(DEBUG) << "Build index by GPU";
// TODO: list support index-type.
faiss::MetricType metric_type = opd_->metric_type == "L2" ? faiss::METRIC_L2 : faiss::METRIC_INNER_PRODUCT;
faiss::Index *ori_index = faiss::index_factory(opd_->d, opd_->get_index_type(nb).c_str(), metric_type);
std::lock_guard<std::mutex> lk(gpu_resource);
#ifdef UNITTEST_ONLY
faiss::gpu::StandardGpuResources res;
int device_id = 0;
faiss::gpu::GpuClonerOptions clone_option;
clone_option.storeInCpu = use_hybrid_index_;
auto device_index = faiss::gpu::index_cpu_to_gpu(&res, device_id, ori_index, &clone_option);
#else
engine::FaissGpuResources res;
int device_id = res.GetGpu();
auto gpu_resources = engine::FaissGpuResources::GetGpuResources(device_id);
faiss::gpu::GpuClonerOptions clone_option;
clone_option.storeInCpu = use_hybrid_index_;
auto device_index = faiss::gpu::index_cpu_to_gpu(gpu_resources.get(), device_id, ori_index, &clone_option);
#endif
if (!device_index->is_trained) {
nt == 0 || xt == nullptr ? device_index->train(nb, xb)
: device_index->train(nt, xt);
}
device_index->add_with_ids(nb, xb, ids); // TODO: support with add_with_IDMAP
if (dynamic_cast<faiss::IndexIVFScalarQuantizer*>(ori_index) != nullptr
&& use_hybrid_index_) {
std::shared_ptr<faiss::Index> device_hybrid_index = nullptr;
if (hybrid_index_device_id_ != device_id) {
auto host_hybrid_index = faiss::gpu::index_gpu_to_cpu(device_index);
auto hybrid_gpu_resources = engine::FaissGpuResources::GetGpuResources(hybrid_index_device_id_);
auto another_device_index = faiss::gpu::index_cpu_to_gpu(hybrid_gpu_resources.get(),
hybrid_index_device_id_,
host_hybrid_index,
&clone_option);
device_hybrid_index.reset(another_device_index);
delete device_index;
delete host_hybrid_index;
} else {
device_hybrid_index.reset(device_index);
}
delete ori_index;
return std::make_shared<Index>(device_hybrid_index);
}
host_index.reset(faiss::gpu::index_gpu_to_cpu(device_index));
delete device_index;
delete ori_index;
}
#else
{
LOG(DEBUG) << "Build index by CPU";
faiss::MetricType metric_type = opd_->metric_type == "L2" ? faiss::METRIC_L2 : faiss::METRIC_INNER_PRODUCT;
faiss::Index *index = faiss::index_factory(opd_->d, opd_->get_index_type(nb).c_str(), metric_type);
if (!index->is_trained) {
nt == 0 || xt == nullptr ? index->train(nb, xb)
: index->train(nt, xt);
}
index->add_with_ids(nb, xb, ids);
host_index.reset(index);
}
#endif
return std::make_shared<Index>(host_index);
}
Index_ptr IndexBuilder::build_all(const long &nb, const vector<float> &xb,
const vector<long> &ids,
const long &nt, const vector<float> &xt) {
return build_all(nb, xb.data(), ids.data(), nt, xt.data());
}
BgCpuBuilder::BgCpuBuilder(const zilliz::milvus::engine::Operand_ptr &opd) : IndexBuilder(opd) {};
Index_ptr BgCpuBuilder::build_all(const long &nb, const float *xb, const long *ids, const long &nt, const float *xt) {
std::shared_ptr<faiss::Index> index = nullptr;
faiss::MetricType metric_type = opd_->metric_type == "L2" ? faiss::METRIC_L2 : faiss::METRIC_INNER_PRODUCT;
index.reset(faiss::index_factory(opd_->d, opd_->get_index_type(nb).c_str(), metric_type));
LOG(DEBUG) << "Build index by CPU";
{
std::lock_guard<std::mutex> lk(cpu_resource);
if (!index->is_trained) {
nt == 0 || xt == nullptr ? index->train(nb, xb)
: index->train(nt, xt);
}
index->add_with_ids(nb, xb, ids);
}
return std::make_shared<Index>(index);
}
IndexBuilderPtr GetIndexBuilder(const Operand_ptr &opd) {
if (opd->index_type == "IDMap") {
IndexBuilderPtr index = nullptr;
return std::make_shared<BgCpuBuilder>(opd);
}
return std::make_shared<IndexBuilder>(opd);
}
}
}
}
#endif
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#if 0
#pragma once
#include "faiss/Index.h"
#include "Operand.h"
#include "Index.h"
namespace zilliz {
namespace milvus {
namespace engine {
class IndexBuilder {
public:
explicit IndexBuilder(const Operand_ptr &opd);
virtual Index_ptr build_all(const long &nb,
const float *xb,
const long *ids,
const long &nt = 0,
const float *xt = nullptr);
virtual Index_ptr build_all(const long &nb,
const std::vector<float> &xb,
const std::vector<long> &ids,
const long &nt = 0,
const std::vector<float> &xt = std::vector<float>());
//void train(const long &nt,
// const std::vector<float> &xt);
//
//Index_ptr add(const long &nb,
// const std::vector<float> &xb,
// const std::vector<long> &ids);
//
//void set_build_option(const Operand_ptr &opd);
protected:
Operand_ptr opd_ = nullptr;
bool use_hybrid_index_;
int hybrid_index_device_id_;
};
class BgCpuBuilder : public IndexBuilder {
public:
BgCpuBuilder(const Operand_ptr &opd);
virtual Index_ptr build_all(const long &nb,
const float *xb,
const long *ids,
const long &nt = 0,
const float *xt = nullptr) override;
};
using IndexBuilderPtr = std::shared_ptr<IndexBuilder>;
extern IndexBuilderPtr GetIndexBuilder(const Operand_ptr &opd);
}
}
}
#endif
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#if 0
#include "Operand.h"
namespace zilliz {
namespace milvus {
namespace engine {
using std::string;
enum IndexType {
Invalid_Option = 0,
IVF = 1,
IDMAP = 2,
IVFSQ8 = 3,
};
IndexType resolveIndexType(const string &index_type) {
if (index_type == "IVF") { return IndexType::IVF; }
if (index_type == "IDMap") { return IndexType::IDMAP; }
if (index_type == "IVFSQ8") { return IndexType::IVFSQ8; }
return IndexType::Invalid_Option;
}
int CalcBacketCount(int nb, size_t nlist) {
int backet_count = int(nb / 1000000.0 * nlist);
if(backet_count == 0) {
backet_count = 1; //avoid faiss rash
}
return backet_count;
}
// nb at least 100
string Operand::get_index_type(const int &nb) {
if (!index_str.empty()) { return index_str; }
switch (resolveIndexType(index_type)) {
case Invalid_Option: {
// TODO: add exception
break;
}
case IVF: {
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
size_t nlist = engine_config.GetInt32Value(CONFIG_NLIST, 16384);
index_str += (ncent != 0 ? index_type + std::to_string(ncent) :
index_type + std::to_string(CalcBacketCount(nb, nlist)));
// std::cout<<"nlist = "<<nlist<<std::endl;
if (!postproc.empty()) { index_str += ("," + postproc); }
break;
}
case IVFSQ8: {
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
size_t nlist = engine_config.GetInt32Value(CONFIG_NLIST, 16384);
index_str += (ncent != 0 ? "IVF" + std::to_string(ncent) :
"IVF" + std::to_string(CalcBacketCount(nb, nlist)));
index_str += ",SQ8";
// std::cout<<"nlist = "<<nlist<<std::endl;
break;
}
case IDMAP: {
index_str += index_type;
if (!postproc.empty()) { index_str += ("," + postproc); }
break;
}
}
return index_str;
}
std::ostream &operator<<(std::ostream &os, const Operand &obj) {
os << obj.d << " "
<< obj.index_type << " "
<< obj.metric_type << " "
<< obj.preproc << " "
<< obj.postproc << " "
<< obj.ncent;
return os;
}
std::istream &operator>>(std::istream &is, Operand &obj) {
is >> obj.d
>> obj.index_type
>> obj.metric_type
>> obj.preproc
>> obj.postproc
>> obj.ncent;
return is;
}
std::string operand_to_str(const Operand_ptr &opd) {
std::ostringstream ss;
ss << *opd;
return ss.str();
}
Operand_ptr str_to_operand(const std::string &input) {
std::istringstream is(input);
auto opd = std::make_shared<Operand>();
is >> *(opd.get());
return opd;
}
}
}
}
#endif
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#if 0
#pragma once
#include <string>
#include <memory>
#include <iostream>
#include <sstream>
namespace zilliz {
namespace milvus {
namespace engine {
struct Operand {
friend std::ostream &operator<<(std::ostream &os, const Operand &obj);
friend std::istream &operator>>(std::istream &is, Operand &obj);
int d;
std::string index_type = "IVF";
std::string metric_type = "L2"; //> L2 / IP(Inner Product)
std::string preproc;
std::string postproc = "Flat";
std::string index_str;
int ncent = 0;
std::string get_index_type(const int &nb);
};
using Operand_ptr = std::shared_ptr<Operand>;
extern std::string operand_to_str(const Operand_ptr &opd);
extern Operand_ptr str_to_operand(const std::string &input);
}
}
}
#endif
......@@ -449,3 +449,39 @@ TEST_F(DBTest, VECTOR_IDS_TEST)
}
}
TEST_F(NewMemManagerTest, MEMMANAGER_TEST) {
int setenv_res = setenv("MILVUS_USE_OLD_MEM_MANAGER", "ON", 1);
ASSERT_TRUE(setenv_res == 0);
auto options = engine::OptionsFactory::Build();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = "sqlite://:@:/";
auto db_ = engine::DBFactory::Build(options);
engine::meta::TableSchema table_info = BuildTableSchema();
engine::Status stat = db_->CreateTable(table_info);
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
auto start_time = METRICS_NOW_TIME;
int insert_loop = 20;
for (int i = 0; i < insert_loop; ++i) {
int64_t nb = 40960;
std::vector<float> xb;
BuildVectors(nb, xb);
engine::IDNumbers vector_ids;
engine::Status status = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
ASSERT_TRUE(status.ok());
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
LOG(DEBUG) << "total_time spent in INSERT_TEST (ms) : " << total_time;
delete db_;
boost::filesystem::remove_all(options.meta.path);
}
......@@ -57,7 +57,6 @@ TEST(DBSchedulerTest, TASK_QUEUE_TEST) {
ptr = queue.Back();
ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad);
load_task->Execute();
}
TEST(DBSchedulerTest, SEARCH_SCHEDULER_TEST) {
......
......@@ -8,7 +8,6 @@
#include "cache/GpuCacheMgr.h"
#include "utils/Error.h"
#include "wrapper/Index.h"
#include "wrapper/knowhere/vec_index.h"
using namespace zilliz::milvus;
......@@ -112,7 +111,7 @@ TEST(CacheTest, CPU_CACHE_TEST) {
for (int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000000;//less 1G per index
engine::Index_ptr index(mock_index);
engine::VecIndexPtr index(mock_index);
cpu_mgr->InsertItem("index_" + std::to_string(i), index);
}
......@@ -137,7 +136,7 @@ TEST(CacheTest, CPU_CACHE_TEST) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 6000000;//6G less
engine::Index_ptr index(mock_index);
engine::VecIndexPtr index(mock_index);
cpu_mgr->InsertItem("index_6g", index);
ASSERT_EQ(cpu_mgr->ItemCount(), 0);//data greater than capacity can not be inserted sucessfully
......@@ -154,7 +153,7 @@ TEST(CacheTest, GPU_CACHE_TEST) {
for(int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000;
engine::Index_ptr index(mock_index);
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
......@@ -175,7 +174,7 @@ TEST(CacheTest, INVALID_TEST) {
ASSERT_EQ(mgr.GetItem("test"), nullptr);
mgr.InsertItem("test", cache::DataObjPtr());
mgr.InsertItem("test", engine::Index_ptr(nullptr));
mgr.InsertItem("test", engine::VecIndexPtr(nullptr));
mgr.EraseItem("test");
mgr.PrintInfo();
mgr.ClearCache();
......@@ -189,7 +188,7 @@ TEST(CacheTest, INVALID_TEST) {
for(int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 2;
engine::Index_ptr index(mock_index);
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
mgr.InsertItem("index_" + std::to_string(i), obj);
......
......@@ -178,12 +178,15 @@ TEST(UtilTest, VALIDATE_TABLENAME_TEST) {
res = server::ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, server::SERVER_INVALID_TABLE_NAME);
table_name = "中文";
table_name = "_!@#!@";
res = server::ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, server::SERVER_INVALID_TABLE_NAME);
table_name = "中文";
res = server::ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, server::SERVER_INVALID_TABLE_NAME);
table_name = std::string('a', 32768);
table_name = std::string(10000, 'a');
res = server::ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, server::SERVER_INVALID_TABLE_NAME);
}
......@@ -204,6 +207,15 @@ TEST(UtilTest, VALIDATE_INDEXTYPE_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType((int)engine::EngineType::MAX_VALUE + 1), server::SERVER_INVALID_INDEX_TYPE);
}
TEST(ValidationUtilTest, ValidateGpuTest) {
ASSERT_EQ(server::ValidationUtil::ValidateGpuIndex(0), server::SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateGpuIndex(100), server::SERVER_SUCCESS);
size_t memory = 0;
ASSERT_EQ(server::ValidationUtil::GetGpuMemory(0, memory), server::SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::GetGpuMemory(100, memory), server::SERVER_SUCCESS);
}
TEST(UtilTest, TIMERECORDER_TEST) {
for(int64_t log_level = 0; log_level <= 6; log_level++) {
if(log_level == 5) {
......
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include "utils/ValidationUtil.h"
#include "utils/Error.h"
#include "db/ExecutionEngine.h"
#include <string>
using namespace zilliz::milvus;
using namespace zilliz::milvus::server;
TEST(ValidationUtilTest, TableNameTest) {
std::string table_name = "Normal123_";
ServerError res = ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_SUCCESS);
table_name = "12sds";
res = ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_INVALID_TABLE_NAME);
table_name = "";
res = ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_INVALID_TABLE_NAME);
table_name = "_asdasd";
res = ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_SUCCESS);
table_name = "!@#!@";
res = ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_INVALID_TABLE_NAME);
table_name = "_!@#!@";
res = ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_INVALID_TABLE_NAME);
table_name = "中文";
res = ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_INVALID_TABLE_NAME);
table_name = std::string(10000, 'a');
res = ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_INVALID_TABLE_NAME);
}
TEST(ValidationUtilTest, TableDimensionTest) {
ASSERT_EQ(ValidationUtil::ValidateTableDimension(-1), SERVER_INVALID_VECTOR_DIMENSION);
ASSERT_EQ(ValidationUtil::ValidateTableDimension(0), SERVER_INVALID_VECTOR_DIMENSION);
ASSERT_EQ(ValidationUtil::ValidateTableDimension(16385), SERVER_INVALID_VECTOR_DIMENSION);
ASSERT_EQ(ValidationUtil::ValidateTableDimension(16384), SERVER_SUCCESS);
ASSERT_EQ(ValidationUtil::ValidateTableDimension(1), SERVER_SUCCESS);
}
TEST(ValidationUtilTest, TableIndexTypeTest) {
ASSERT_EQ(ValidationUtil::ValidateTableIndexType((int)engine::EngineType::INVALID), SERVER_INVALID_INDEX_TYPE);
for(int i = 1; i <= (int)engine::EngineType::MAX_VALUE; i++) {
ASSERT_EQ(ValidationUtil::ValidateTableIndexType(i), SERVER_SUCCESS);
}
ASSERT_EQ(ValidationUtil::ValidateTableIndexType((int)engine::EngineType::MAX_VALUE + 1), SERVER_INVALID_INDEX_TYPE);
}
TEST(ValidationUtilTest, ValidateGpuTest) {
ASSERT_EQ(ValidationUtil::ValidateGpuIndex(0), SERVER_SUCCESS);
ASSERT_NE(ValidationUtil::ValidateGpuIndex(100), SERVER_SUCCESS);
size_t memory = 0;
ASSERT_EQ(ValidationUtil::GetGpuMemory(0, memory), SERVER_SUCCESS);
ASSERT_NE(ValidationUtil::GetGpuMemory(100, memory), SERVER_SUCCESS);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册