Commit c079804b authored by yukun, committed by JinHai-CN

Improve ut coverage (#2516) (#2522)

* Improve ut coverage
Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Delete unused code
Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Add fiu in HybridSearchRequest
Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Update helm config
Signed-off-by: JinHai-CN <hai.jin@zilliz.com>

* Change BinaryQuery validation check
Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* code format
Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* code format
Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* code format
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
Co-authored-by: JinHai-CN <hai.jin@zilliz.com>
Parent 9fda27ff
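Most of the added coverage in this commit comes from libfiu fault-injection points: `fiu_do_on` marks a failure site in production code, and a test flips it on with `fiu_enable`/`fiu_disable` (or the `FIU_ENABLE_FIU` wrapper used in the tests below). A minimal, self-contained sketch of the pattern; the point name `example.open_file_fail` is hypothetical, not one added by this commit:

```cpp
// Build with -DFIU_ENABLE and link against libfiu; otherwise fiu_do_on compiles to a no-op.
#include <cassert>
#include <fiu-local.h>    // fiu_do_on, used at the injection site
#include <fiu-control.h>  // fiu_init/fiu_enable/fiu_disable, used by tests

// Production side: capture the result in a local so fiu can overwrite it.
bool OpenFile(const char* path) {
    bool ok = (path != nullptr);                      // stand-in for a real open()
    fiu_do_on("example.open_file_fail", ok = false);  // hypothetical point name
    return ok;
}

int main() {
    fiu_init(0);                                       // once per process
    assert(OpenFile("/tmp/data"));                     // normal path
    fiu_enable("example.open_file_fail", 1, NULL, 0);  // force the error branch
    assert(!OpenFile("/tmp/data"));                    // error path now taken
    fiu_disable("example.open_file_fail");             // restore normal behavior
    return 0;
}
```

Capturing the result of the real call in a local before `fiu_do_on` is what lets the injection overwrite it; that is exactly the shape of the changes in the hunks below.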
......@@ -18,6 +18,7 @@
#include "codecs/default/DefaultAttrsFormat.h"
#include <fcntl.h>
#include <fiu-local.h>
#include <unistd.h>
#include <algorithm>
#include <memory>
......@@ -34,7 +35,9 @@ namespace codec {
void
DefaultAttrsFormat::read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, off_t offset,
size_t num, std::vector<uint8_t>& raw_attrs, size_t& nbytes) {
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
fiu_do_on("read_attrs_internal_open_file_fail", open_res = false);
if (!open_res) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
......@@ -56,7 +59,9 @@ DefaultAttrsFormat::read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, con
void
DefaultAttrsFormat::read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
std::vector<int64_t>& uids) {
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
fiu_do_on("read_uids_internal_open_file_fail", open_res = false);
if (!open_res) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
......@@ -76,7 +81,9 @@ DefaultAttrsFormat::read(const milvus::storage::FSHandlerPtr& fs_ptr, milvus::se
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) {
auto is_directory = boost::filesystem::is_directory(dir_path);
fiu_do_on("read_id_directory_false", is_directory = false);
if (!is_directory) {
std::string err_msg = "Directory: " + dir_path + "does not exist";
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
......@@ -218,7 +225,9 @@ DefaultAttrsFormat::read_uids(const milvus::storage::FSHandlerPtr& fs_ptr, std::
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) {
auto is_directory = boost::filesystem::is_directory(dir_path);
fiu_do_on("is_directory_false", is_directory = false);
if (!is_directory) {
std::string err_msg = "Directory: " + dir_path + "does not exist";
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
......
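The four points added above all follow the same capture-then-override recipe, which makes the throwing branches reachable from tests. A sketch of exercising the uid-reader point; the `AttrsFormatTest` fixture, its `fs_ptr_` member, and a previously written segment are assumptions, and only the point name comes from this diff:

```cpp
TEST_F(AttrsFormatTest, READ_UIDS_OPEN_FAIL) {
    fiu_init(0);
    milvus::codec::DefaultAttrsFormat attrs_format;
    std::vector<int64_t> uids;
    fiu_enable("read_uids_internal_open_file_fail", 1, NULL, 0);
    // open() is forced to report failure, so read_uids_internal throws.
    ASSERT_ANY_THROW(attrs_format.read_uids(fs_ptr_, uids));
    fiu_disable("read_uids_internal_open_file_fail");
}
```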
......@@ -17,6 +17,7 @@
#include "codecs/default/DefaultIdBloomFilterFormat.h"
#include <fiu-local.h>
#include <memory>
#include <string>
......@@ -37,6 +38,7 @@ DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::I
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
scaling_bloom_t* bloom_filter =
new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
if (bloom_filter == nullptr) {
std::string err_msg =
"Failed to read bloom filter from file: " + bloom_filter_file_path + ". " + std::strerror(errno);
......
......@@ -168,6 +168,7 @@ DBImpl::Start() {
}
// background metric thread
fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
if (options_.metric_enable_) {
bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
}
......@@ -1042,6 +1043,7 @@ DBImpl::Flush() {
LOG_ENGINE_DEBUG_ << "Begin flush all collections";
Status status;
fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);
if (options_.wal_enable_) {
LOG_ENGINE_DEBUG_ << "WAL flush";
auto lsn = wal_mgr_->Flush();
......@@ -1472,7 +1474,10 @@ DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::Vect
engine::utils::GetParentPath(file.location_, segment_dir);
segment::SegmentReader segment_reader(segment_dir);
segment::IdBloomFilterPtr id_bloom_filter_ptr;
segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
if (!status.ok()) {
return status;
}
for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) {
int64_t vector_id = *it;
......@@ -2343,100 +2348,101 @@ DBImpl::StartMergeTask(const std::set<std::string>& merge_collection_ids, bool f
// LOG_ENGINE_DEBUG_ << "End StartMergeTask";
}
Status
DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
// step 1: create table file
meta::SegmentSchema table_file;
table_file.collection_id_ = collection_id;
table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
Status status = meta_ptr_->CreateHybridCollectionFile(table_file);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
return status;
}
// step 2: merge files
/*
ExecutionEnginePtr index =
EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
(MetricType)table_file.metric_type_, table_file.nlist_);
*/
meta::SegmentsSchema updated;
std::string new_segment_dir;
utils::GetParentPath(table_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
// attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
files_holder.UnmarkFile(file);
auto file_schema = file;
file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
updated.push_back(file_schema);
int64_t size = segment_writer_ptr->Size();
if (size >= file_schema.index_file_size_) {
break;
}
}
// step 3: serialize to disk
try {
status = segment_writer_ptr->Serialize();
fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
} catch (std::exception& ex) {
std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << msg;
status = Status(DB_ERROR, msg);
}
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();
// if failed to serialize merge file to disk
// typical error: out of disk space, out of memory or permission denied
table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
status = meta_ptr_->UpdateCollectionFile(table_file);
LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
return status;
}
// step 4: update table files state
// if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
// else set file type to RAW, no need to build index
if (!utils::IsRawIndexType(table_file.engine_type_)) {
table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
? meta::SegmentSchema::TO_INDEX
: meta::SegmentSchema::RAW;
} else {
table_file.file_type_ = meta::SegmentSchema::RAW;
}
table_file.file_size_ = segment_writer_ptr->Size();
table_file.row_count_ = segment_writer_ptr->VectorCount();
updated.push_back(table_file);
status = meta_ptr_->UpdateCollectionFiles(updated);
LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
<< " bytes";
if (options_.insert_cache_immediately_) {
segment_writer_ptr->Cache();
}
return status;
}
// Status
// DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
// // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
//
// LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
//
// // step 1: create table file
// meta::SegmentSchema table_file;
// table_file.collection_id_ = collection_id;
// table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
// Status status = meta_ptr_->CreateHybridCollectionFile(table_file);
//
// if (!status.ok()) {
// LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
// return status;
// }
//
// // step 2: merge files
// /*
// ExecutionEnginePtr index =
// EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
// (MetricType)table_file.metric_type_, table_file.nlist_);
//*/
// meta::SegmentsSchema updated;
//
// std::string new_segment_dir;
// utils::GetParentPath(table_file.location_, new_segment_dir);
// auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
//
// // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
// milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
// for (auto& file : files) {
// server::CollectMergeFilesMetrics metrics;
// std::string segment_dir_to_merge;
// utils::GetParentPath(file.location_, segment_dir_to_merge);
// segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
//
// files_holder.UnmarkFile(file);
//
// auto file_schema = file;
// file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
// updated.push_back(file_schema);
// int64_t size = segment_writer_ptr->Size();
// if (size >= file_schema.index_file_size_) {
// break;
// }
// }
//
// // step 3: serialize to disk
// try {
// status = segment_writer_ptr->Serialize();
// fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
// fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
// } catch (std::exception& ex) {
// std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
// LOG_ENGINE_ERROR_ << msg;
// status = Status(DB_ERROR, msg);
// }
//
// if (!status.ok()) {
// LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " <<
// status.message();
//
// // if failed to serialize merge file to disk
// // typical error: out of disk space, out of memory or permission denied
// table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
// status = meta_ptr_->UpdateCollectionFile(table_file);
// LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
//
// return status;
// }
//
// // step 4: update table files state
// // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
// // else set file type to RAW, no need to build index
// if (!utils::IsRawIndexType(table_file.engine_type_)) {
// table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
// ? meta::SegmentSchema::TO_INDEX
// : meta::SegmentSchema::RAW;
// } else {
// table_file.file_type_ = meta::SegmentSchema::RAW;
// }
// table_file.file_size_ = segment_writer_ptr->Size();
// table_file.row_count_ = segment_writer_ptr->VectorCount();
// updated.push_back(table_file);
// status = meta_ptr_->UpdateCollectionFiles(updated);
// LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
// << " bytes";
//
// if (options_.insert_cache_immediately_) {
// segment_writer_ptr->Cache();
// }
//
// return status;
//}
void
DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
......
......@@ -248,8 +248,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
void
BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);
Status
MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
// Status
// MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
void
StartBuildIndexTask();
......
......@@ -11,6 +11,7 @@
#include "db/insert/MemManagerImpl.h"
#include <fiu-local.h>
#include <thread>
#include "VectorSource.h"
......@@ -36,9 +37,9 @@ MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length,
const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << "Insert buffer size exceeds limit. Performing force flush";
// TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge
auto status = Flush(flushed_tables, false);
fiu_do_on("MemManagerImpl::InsertVectors_flush_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
......
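With this point, the error branch after the forced flush becomes reachable without genuinely exhausting the insert buffer. A sketch of driving it; the manager instance and insert arguments are hypothetical fixture members, and since the branch only runs once `GetCurrentMem()` exceeds `options_.insert_buffer_size_`, the test should configure a tiny buffer:

```cpp
fiu_init(0);
FIU_ENABLE_FIU("MemManagerImpl::InsertVectors_flush_fail");
std::set<std::string> flushed_tables;
// mem_mgr_ and most arguments are assumed; the call is abbreviated to the
// parameters visible in this diff (collection_id, length, vectors, lsn, flushed_tables).
auto status = mem_mgr_->InsertVectors(collection_id, length, vectors, lsn, flushed_tables);
ASSERT_FALSE(status.ok());
fiu_disable("MemManagerImpl::InsertVectors_flush_fail");
```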
......@@ -174,9 +174,6 @@ class Meta {
virtual Status
DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) = 0;
virtual Status
CreateHybridCollectionFile(SegmentSchema& file_schema) = 0;
}; // MetaData
using MetaPtr = std::shared_ptr<Meta>;
......
......@@ -2186,8 +2186,8 @@ MySQLMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collecti
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
fiu_do_on("MySQLMetaImpl.FilesByType.null_connection", is_null_connection = true);
fiu_do_on("MySQLMetaImpl.FilesByType.throw_exception", throw std::exception(););
fiu_do_on("MySQLMetaImpl.FilesByTypeEx.null_connection", is_null_connection = true);
fiu_do_on("MySQLMetaImpl.FilesByTypeEx.throw_exception", throw std::exception(););
if (is_null_connection) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
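fiu points are matched by exact string, so while FilesByTypeEx reused the FilesByType names, a test could not fail one function without also failing the other; the rename makes each site addressable on its own. Usage mirrors the SqliteMetaImpl test added later in this diff (the MySQL-side variant stays commented out in the MySQL test below), with `impl_`, `collection_array`, `file_types`, and `files_holder` coming from that test fixture:

```cpp
FIU_ENABLE_FIU("MySQLMetaImpl.FilesByTypeEx.throw_exception");
auto status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
fiu_disable("MySQLMetaImpl.FilesByTypeEx.throw_exception");
```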
......@@ -3204,11 +3204,6 @@ MySQLMetaImpl::DescribeHybridCollection(CollectionSchema& collection_schema, hyb
return Status::OK();
}
Status
MySQLMetaImpl::CreateHybridCollectionFile(milvus::engine::meta::SegmentSchema& file_schema) {
return Status::OK();
}
} // namespace meta
} // namespace engine
} // namespace milvus
......@@ -161,9 +161,6 @@ class MySQLMetaImpl : public Meta {
Status
DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
Status
CreateHybridCollectionFile(SegmentSchema& file_schema) override;
private:
Status
NextFileId(std::string& file_id);
......
This diff is collapsed.
......@@ -163,9 +163,6 @@ class SqliteMetaImpl : public Meta {
Status
DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
Status
CreateHybridCollectionFile(SegmentSchema& file_schema) override;
private:
Status
NextFileId(std::string& file_id);
......
......@@ -34,23 +34,23 @@ Attr::Attr(const std::vector<uint8_t>& data, size_t nbytes, const std::vector<in
: data_(std::move(data)), nbytes_(nbytes), uids_(std::move(uids)), name_(name) {
}
void
Attr::AddAttr(const std::vector<uint8_t>& data, size_t nbytes) {
data_.reserve(data_.size() + data.size());
data_.insert(data_.end(), std::make_move_iterator(data.begin()), std::make_move_iterator(data.end()));
nbytes_ += nbytes;
}
void
Attr::AddUids(const std::vector<int64_t>& uids) {
uids_.reserve(uids_.size() + uids.size());
uids_.insert(uids_.end(), std::make_move_iterator(uids.begin()), std::make_move_iterator(uids.end()));
}
// void
// Attr::AddAttr(const std::vector<uint8_t>& data, size_t nbytes) {
// data_.reserve(data_.size() + data.size());
// data_.insert(data_.end(), std::make_move_iterator(data.begin()), std::make_move_iterator(data.end()));
// nbytes_ += nbytes;
//}
//
// void
// Attr::AddUids(const std::vector<int64_t>& uids) {
// uids_.reserve(uids_.size() + uids.size());
// uids_.insert(uids_.end(), std::make_move_iterator(uids.begin()), std::make_move_iterator(uids.end()));
//}
void
Attr::SetName(const std::string& name) {
name_ = name;
}
// void
// Attr::SetName(const std::string& name) {
// name_ = name;
//}
const std::vector<uint8_t>&
Attr::GetData() const {
......@@ -87,15 +87,15 @@ Attr::GetCodeLength() const {
return uids_.size() == 0 ? 0 : nbytes_ / uids_.size();
}
void
Attr::Erase(int32_t offset) {
auto code_length = GetCodeLength();
if (code_length != 0) {
auto step = offset * code_length;
data_.erase(data_.begin() + step, data_.begin() + step + code_length);
uids_.erase(uids_.begin() + offset, uids_.begin() + offset + 1);
}
}
// void
// Attr::Erase(int32_t offset) {
// auto code_length = GetCodeLength();
// if (code_length != 0) {
// auto step = offset * code_length;
// data_.erase(data_.begin() + step, data_.begin() + step + code_length);
// uids_.erase(uids_.begin() + offset, uids_.begin() + offset + 1);
// }
//}
void
Attr::Erase(std::vector<int32_t>& offsets) {
......
......@@ -30,14 +30,14 @@ class Attr {
Attr();
void
AddAttr(const std::vector<uint8_t>& data, size_t nbytes);
void
AddUids(const std::vector<int64_t>& uids);
void
SetName(const std::string& name);
// void
// AddAttr(const std::vector<uint8_t>& data, size_t nbytes);
//
// void
// AddUids(const std::vector<int64_t>& uids);
//
// void
// SetName(const std::string& name);
const std::vector<uint8_t>&
GetData() const;
......@@ -60,8 +60,8 @@ class Attr {
size_t
GetCodeLength() const;
void
Erase(int32_t offset);
// void
// Erase(int32_t offset);
void
Erase(std::vector<int32_t>& offsets);
......
......@@ -69,15 +69,20 @@ SegmentWriter::AddAttrs(const std::string& name, const std::unordered_map<std::s
auto attr_data_it = attr_data.begin();
auto attrs = segment_ptr_->attrs_ptr_->attrs;
for (; attr_data_it != attr_data.end(); ++attr_data_it) {
if (attrs.find(attr_data_it->first) != attrs.end()) {
segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)
->AddAttr(attr_data_it->second, attr_nbytes.at(attr_data_it->first));
segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)->AddUids(uids);
} else {
AttrPtr attr = std::make_shared<Attr>(attr_data_it->second, attr_nbytes.at(attr_data_it->first), uids,
attr_data_it->first);
segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr));
}
AttrPtr attr = std::make_shared<Attr>(attr_data_it->second, attr_nbytes.at(attr_data_it->first), uids,
attr_data_it->first);
segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr));
// if (attrs.find(attr_data_it->first) != attrs.end()) {
// segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)
// ->AddAttr(attr_data_it->second, attr_nbytes.at(attr_data_it->first));
// segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)->AddUids(uids);
// } else {
// AttrPtr attr = std::make_shared<Attr>(attr_data_it->second, attr_nbytes.at(attr_data_it->first),
// uids,
// attr_data_it->first);
// segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr));
// }
}
return Status::OK();
}
......
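The simplified AddAttrs always constructs a fresh Attr and calls std::unordered_map::insert, and insert is a no-op when the key already exists, so repeated calls for the same field keep the first Attr instead of merging into it (merging is what the commented-out branch did via AddAttr/AddUids). A minimal illustration of that insert semantics:

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>

int main() {
    std::unordered_map<std::string, std::shared_ptr<int>> attrs;
    attrs.insert(std::make_pair("field_0", std::make_shared<int>(1)));
    // A second insert with the same key is ignored; the first value survives.
    attrs.insert(std::make_pair("field_0", std::make_shared<int>(2)));
    assert(*attrs.at("field_0") == 1);
    return 0;
}
```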
......@@ -57,6 +57,8 @@ CreateHybridCollectionRequest::OnExecute() {
try {
// step 1: check arguments
auto status = ValidationUtil::ValidateCollectionName(collection_name_);
fiu_do_on("CreateHybridCollectionRequest.OnExecute.invalid_collection_name",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
......@@ -98,6 +100,8 @@ CreateHybridCollectionRequest::OnExecute() {
// step 3: create collection
status = DBWrapper::DB()->CreateHybridCollection(collection_info, fields_schema);
fiu_do_on("CreateHybridCollectionRequest.OnExecute.invalid_db_execute",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
// collection could exist
if (status.code() == DB_ALREADY_EXIST) {
......
......@@ -53,6 +53,8 @@ DescribeHybridCollectionRequest::OnExecute() {
engine::meta::hybrid::FieldsSchema fields_schema;
collection_schema.collection_id_ = collection_name_;
auto status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema);
fiu_do_on("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
......
......@@ -116,8 +116,8 @@ HybridSearchRequest::OnExecute() {
if (!status.ok()) {
return status;
}
fiu_do_on("HybridSearchRequest.OnExecute.empty_result_ids", result_ids.clear());
if (result_ids.empty()) {
fiu_do_on("HybridSearchRequest.OnExecute.empty_result_ids", result_.result_ids_.clear());
if (result_.result_ids_.empty()) {
return Status::OK(); // empty table
}
......
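The injection now clears the ids inside `result_`, the member the surrounding check actually reads, keeping the fault point aligned with the refactored result handling. A sketch of exercising it, mirroring the RpcHandlerTest pattern used later in this diff; the `handler`, `context`, and `search_param` setup is assumed:

```cpp
fiu_enable("HybridSearchRequest.OnExecute.empty_result_ids", 1, NULL, 0);
milvus::grpc::TopKQueryResult result;
handler->HybridSearch(&context, &search_param, &result);
// The request now takes the empty-result early return: Status::OK, no ids.
fiu_disable("HybridSearchRequest.OnExecute.empty_result_ids");
```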
......@@ -1159,6 +1159,20 @@ GrpcRequestHandler::DescribeHybridCollection(::grpc::ServerContext* context,
const ::milvus::grpc::CollectionName* request,
::milvus::grpc::Mapping* response) {
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__);
std::unordered_map<std::string, engine::meta::hybrid::DataType> field_types;
Status status =
request_handler_.DescribeHybridCollection(GetContext(context), request->collection_name(), field_types);
response->mutable_status()->set_error_code((milvus::grpc::ErrorCode)status.code());
response->mutable_status()->set_reason(status.message());
response->set_collection_name(request->collection_name());
auto field_it = field_types.begin();
for (; field_it != field_types.end(); field_it++) {
auto field = response->add_fields();
field->set_name(field_it->first);
field->mutable_type()->set_data_type((milvus::grpc::DataType)field_it->second);
}
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__);
return ::grpc::Status::OK;
......
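The filled-in handler copies each field name and type from the engine schema into the gRPC Mapping (the null check on `request` is moved above the first dereference). A caller-side sketch of reading what it populates; the accessor names are inferred from the setters used above, and `handler`/`context` come from a test fixture:

```cpp
#include <iostream>

milvus::grpc::CollectionName name;
name.set_collection_name("test_hybrid");
milvus::grpc::Mapping mapping;
handler->DescribeHybridCollection(&context, &name, &mapping);
for (const auto& field : mapping.fields()) {
    std::cout << field.name() << " -> " << static_cast<int>(field.type().data_type()) << "\n";
}
```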
......@@ -11,11 +11,11 @@
#include "utils/LogUtil.h"
#include <fiu-local.h>
#include <libgen.h>
#include <cctype>
#include <string>
#include <fiu-local.h>
#include <yaml-cpp/yaml.h>
#include <boost/filesystem.hpp>
......
......@@ -621,6 +621,11 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) {
db_->Start();
db_->Stop();
fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
FIU_ENABLE_FIU("options_metric_enable");
db_->Start();
db_->Stop();
fiu_disable("options_metric_enable");
}
TEST_F(DBTest, BACK_TIMER_THREAD_2) {
......@@ -1219,6 +1224,39 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
}
}
TEST_F(DBTest2, GET_VECTOR_BY_ID_INVALID_TEST) {
fiu_init(0);
milvus::engine::meta::CollectionSchema collection_info = BuildCollectionSchema();
auto stat = db_->CreateCollection(collection_info);
ASSERT_TRUE(stat.ok());
uint64_t qb = 1000;
milvus::engine::VectorsData qxb;
BuildVectors(qb, 0, qxb);
std::string partition_name = "part_name";
std::string partition_tag = "part_tag";
stat = db_->CreatePartition(collection_info.collection_id_, partition_name, partition_tag);
ASSERT_TRUE(stat.ok());
std::vector<milvus::engine::VectorsData> vectors;
std::vector<int64_t> empty_array;
stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
ASSERT_FALSE(stat.ok());
stat = db_->InsertVectors(collection_info.collection_id_, partition_tag, qxb);
ASSERT_TRUE(stat.ok());
db_->Flush(collection_info.collection_id_);
fiu_enable("bloom_filter_nullptr", 1, NULL, 0);
stat = db_->GetVectorsByID(collection_info, qxb.id_array_, vectors);
ASSERT_FALSE(stat.ok());
fiu_disable("bloom_filter_nullptr");
}
TEST_F(DBTest2, GET_VECTOR_IDS_TEST) {
milvus::engine::meta::CollectionSchema collection_schema = BuildCollectionSchema();
auto stat = db_->CreateCollection(collection_schema);
......
......@@ -49,15 +49,13 @@ BuildCollectionSchema(milvus::engine::meta::CollectionSchema& collection_schema,
fields[i].collection_id_ = COLLECTION_NAME;
fields[i].field_name_ = "field_" + std::to_string(i);
}
fields[0].field_type_ = (int)milvus::engine::meta::hybrid::DataType::INT32;
fields[1].field_type_ = (int)milvus::engine::meta::hybrid::DataType::INT64;
fields[2].field_type_ = (int)milvus::engine::meta::hybrid::DataType::FLOAT;
fields[3].field_type_ = (int)milvus::engine::meta::hybrid::DataType::VECTOR;
fields_schema.fields_schema_ = fields;
milvus::engine::meta::hybrid::FieldSchema schema;
schema.field_name_ = "field_vector";
schema.collection_id_ = TABLE_NAME;
schema.field_type_ = (int)(milvus::engine::meta::hybrid::DataType::VECTOR);
fields.emplace_back(schema);
attr_type.insert(std::make_pair("field_0", milvus::engine::meta::hybrid::DataType::INT32));
attr_type.insert(std::make_pair("field_1", milvus::engine::meta::hybrid::DataType::INT64));
attr_type.insert(std::make_pair("field_2", milvus::engine::meta::hybrid::DataType::FLOAT));
fields_schema.fields_schema_ = fields;
}
void
......@@ -87,7 +85,7 @@ BuildEntity(uint64_t n, uint64_t batch_index, milvus::engine::Entity& entity) {
vectors.id_array_.push_back(n * batch_index + i);
}
entity.vector_data_.insert(std::make_pair("field_3", vectors));
entity.vector_data_.insert(std::make_pair("field_vector", vectors));
std::vector<int64_t> value_0;
std::vector<int64_t> value_1;
std::vector<double> value_2;
......@@ -174,6 +172,7 @@ ConstructGeneralQuery(milvus::query::GeneralQueryPtr& general_query, milvus::que
query_ptr->root = general_query->bin;
query_ptr->vectors.insert(std::make_pair(vector_placeholder, vector_query));
}
} // namespace
TEST_F(DBTest, HYBRID_DB_TEST) {
......@@ -228,9 +227,9 @@ TEST_F(DBTest, HYBRID_SEARCH_TEST) {
uint64_t qb = 1000;
milvus::engine::Entity entity;
BuildEntity(qb, 0, entity);
BuildComplexEntity(qb, 0, entity);
std::vector<std::string> field_names = {"field_0", "field_1", "field_2"};
std::vector<std::string> field_names = {"field_0", "field_1", "field_2", "field_3", "field_4", "field_5"};
stat = db_->InsertEntities(COLLECTION_NAME, "", field_names, entity, attr_type);
ASSERT_TRUE(stat.ok());
......@@ -347,5 +346,3 @@ TEST_F(DBTest2, GET_ENTITY_BY_ID_TEST) {
ASSERT_EQ(vector.vector_count_, 0);
ASSERT_TRUE(vector.float_data_.empty());
ASSERT_TRUE(vector.binary_data_.empty());
}
}
......@@ -295,6 +295,20 @@ TEST_F(MetaTest, FAILED_TEST) {
ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
fiu_disable("SqliteMetaImpl.FilesByType.throw_exception");
}
{
milvus::engine::meta::FilesHolder files_holder;
std::vector<milvus::engine::meta::CollectionSchema> collection_array;
milvus::engine::meta::CollectionSchema schema;
schema.collection_id_ = collection_id;
collection_array.emplace_back(schema);
std::vector<int> file_types;
file_types.push_back(milvus::engine::meta::SegmentSchema::INDEX);
FIU_ENABLE_FIU("SqliteMetaImpl.FilesByTypeEx.throw_exception");
status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
fiu_disable("SqliteMetaImpl.FilesByTypeEx.throw_exception");
status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
}
{
uint64_t size = 0;
FIU_ENABLE_FIU("SqliteMetaImpl.Size.throw_exception");
......@@ -567,6 +581,9 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
++i;
}
status = impl.GetCollectionFilesBySegmentId(table_file.segment_id_, files_holder);
ASSERT_TRUE(status.ok());
impl.DropAll();
}
......
......@@ -499,6 +499,9 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DISK) {
++i;
}
status = impl.GetCollectionFilesBySegmentId(table_file.segment_id_, files_holder);
ASSERT_TRUE(status.ok());
status = impl.DropAll();
ASSERT_TRUE(status.ok());
}
......@@ -709,6 +712,17 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
to_index_files_cnt + index_files_cnt;
ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);
std::vector<milvus::engine::meta::CollectionSchema> collection_array;
milvus::engine::meta::CollectionSchema schema;
schema.collection_id_ = collection_id;
status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
ASSERT_TRUE(status.ok());
// FIU_ENABLE_FIU("MySQLMetaImpl.FilesByTypeEx.throw_exception");
// status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
// ASSERT_FALSE(status.ok());
// fiu_disable("MySQLMetaImpl.FilesByTypeEx.throw_exception");
FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_FALSE(status.ok());
......
......@@ -1220,6 +1220,11 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
field_1->mutable_type()->mutable_vector_param()->set_dimension(128);
field_1->set_name("field_1");
milvus::json json_param = {{"metric_type", 1}, {"engine_type", 1}};
auto extra_param = field_1->add_extra_params();
extra_param->set_key("params");
extra_param->set_value(json_param.dump());
handler->CreateHybridCollection(&context, &mapping, &response);
// Insert Entities
......@@ -1281,6 +1286,15 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
term_query->mutable_int_value()->Resize(static_cast<int>(nq), 0);
memcpy(term_query->mutable_int_value()->mutable_data(), term_value.data(), nq * sizeof(int64_t));
auto range_query = boolean_query_2->add_general_query()->mutable_range_query();
range_query->set_field_name("field_0");
auto comp1 = range_query->add_operand();
comp1->set_operator_(::milvus::grpc::CompareOperator::GTE);
comp1->set_operand("0");
auto comp2 = range_query->add_operand();
comp2->set_operator_(::milvus::grpc::CompareOperator::LTE);
comp2->set_operand("100000");
auto vector_query = boolean_query_2->add_general_query()->mutable_vector_query();
vector_query->set_field_name("field_1");
vector_query->set_topk(topk);
......@@ -1297,10 +1311,146 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
auto row_record = vector_query->add_records();
CopyRowRecord(row_record, record);
}
auto extra_param = vector_query->add_extra_params();
auto extra_param_1 = vector_query->add_extra_params();
extra_param_1->set_key("params");
milvus::json param = {{"nprobe", 16}};
extra_param_1->set_value(param.dump());
search_param.set_collection_name("test_hybrid");
auto search_extra_param = search_param.add_extra_params();
search_extra_param->set_key("params");
search_extra_param->set_value("");
milvus::grpc::TopKQueryResult topk_query_result;
handler->HybridSearch(&context, &search_param, &topk_query_result);
}
TEST_F(RpcHandlerTest, HYBRID_INVALID_TEST) {
fiu_init(0);
::grpc::ServerContext context;
milvus::grpc::Mapping mapping;
milvus::grpc::Status response;
uint64_t row_num = 1000;
uint64_t dimension = 128;
// Create Hybrid Collection
mapping.set_collection_name("test_hybrid");
auto field_0 = mapping.add_fields();
field_0->set_name("field_0");
field_0->mutable_type()->set_data_type(::milvus::grpc::DataType::INT64);
auto field_1 = mapping.add_fields();
field_1->mutable_type()->mutable_vector_param()->set_dimension(128);
field_1->set_name("field_1");
milvus::json json_param = {{"metric_type", 1}, {"engine_type", 1}};
auto extra_param = field_1->add_extra_params();
extra_param->set_key("params");
extra_param->set_value(json_param.dump());
fiu_enable("CreateHybridCollectionRequest.OnExecute.invalid_collection_name", 1, NULL, 0);
handler->CreateHybridCollection(&context, &mapping, &response);
fiu_disable("CreateHybridCollectionRequest.OnExecute.invalid_collection_name");
fiu_enable("CreateHybridCollectionRequest.OnExecute.invalid_db_execute", 1, NULL, 0);
handler->CreateHybridCollection(&context, &mapping, &response);
fiu_disable("CreateHybridCollectionRequest.OnExecute.invalid_db_execute");
handler->CreateHybridCollection(&context, &mapping, &response);
milvus::grpc::CollectionName grpc_collection_name;
grpc_collection_name.set_collection_name("test_hybrid");
fiu_enable("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute", 1, NULL, 0);
handler->DescribeHybridCollection(&context, &grpc_collection_name, &mapping);
fiu_disable("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute");
handler->DescribeHybridCollection(&context, &grpc_collection_name, &mapping);
// Insert Entities
milvus::grpc::HInsertParam insert_param;
milvus::grpc::HEntityIDs entity_ids;
insert_param.set_collection_name("test_hybrid");
auto entity = insert_param.mutable_entities();
auto field_name_0 = entity->add_field_names();
*field_name_0 = "field_0";
auto field_name_1 = entity->add_field_names();
*field_name_1 = "field_1";
entity->set_row_num(row_num);
std::vector<int64_t> field_value(row_num, 0);
for (uint64_t i = 0; i < row_num; i++) {
field_value[i] = i;
}
entity->set_attr_records(field_value.data(), row_num * sizeof(int64_t));
std::vector<std::vector<float>> vector_field;
vector_field.resize(row_num);
for (uint64_t i = 0; i < row_num; ++i) {
vector_field[i].resize(dimension);
for (uint64_t j = 0; j < dimension; ++j) {
vector_field[i][j] = (float)((i + 10) / (j + 20));
}
}
auto vector_record = entity->add_result_values();
for (uint64_t i = 0; i < row_num; ++i) {
auto record = vector_record->mutable_vector_value()->add_value();
auto vector_data = record->mutable_float_data();
vector_data->Resize(static_cast<int>(vector_field[i].size()), 0.0);
memcpy(vector_data->mutable_data(), vector_field[i].data(), vector_field[i].size() * sizeof(float));
}
fiu_enable("InsertEntityRequest.OnExecute.throw_std_exception", 1, NULL, 0);
handler->InsertEntity(&context, &insert_param, &entity_ids);
fiu_disable("InsertEntityRequest.OnExecute.throw_std_exception");
handler->InsertEntity(&context, &insert_param, &entity_ids);
uint64_t nq = 10;
uint64_t topk = 10;
milvus::grpc::HSearchParam search_param;
auto general_query = search_param.mutable_general_query();
auto boolean_query_1 = general_query->mutable_boolean_query();
boolean_query_1->set_occur(milvus::grpc::Occur::MUST);
auto general_query_1 = boolean_query_1->add_general_query();
auto boolean_query_2 = general_query_1->mutable_boolean_query();
auto term_query = boolean_query_2->add_general_query()->mutable_term_query();
term_query->set_field_name("field_0");
std::vector<int64_t> term_value(nq, 0);
for (uint64_t i = 0; i < nq; ++i) {
term_value[i] = i + nq;
}
term_query->set_value_num(nq);
term_query->set_values(term_value.data(), nq * sizeof(int64_t));
auto range_query = boolean_query_2->add_general_query()->mutable_range_query();
range_query->set_field_name("field_0");
auto comp1 = range_query->add_operand();
comp1->set_operator_(::milvus::grpc::CompareOperator::GTE);
comp1->set_operand("0");
auto comp2 = range_query->add_operand();
comp2->set_operator_(::milvus::grpc::CompareOperator::LTE);
comp2->set_operand("100000");
auto vector_query = boolean_query_2->add_general_query()->mutable_vector_query();
vector_query->set_field_name("field_1");
vector_query->set_topk(topk);
vector_query->set_query_boost(2);
std::vector<std::vector<float>> query_vector;
query_vector.resize(nq);
for (uint64_t i = 0; i < nq; ++i) {
query_vector[i].resize(dimension);
for (uint64_t j = 0; j < dimension; ++j) {
query_vector[i][j] = (float)((j + 1) / (i + dimension));
}
}
for (auto record : query_vector) {
auto row_record = vector_query->add_records();
CopyRowRecord(row_record, record);
}
auto extra_param_1 = vector_query->add_extra_params();
extra_param_1->set_key("params");
milvus::json param = {{"nprobe", 16}};
extra_param->set_value(param.dump());
extra_param_1->set_value(param.dump());
search_param.set_collection_name("test_hybrid");
auto search_extra_param = search_param.add_extra_params();
......