未验证 提交 7cb09006 编写于 作者: Y yukun 提交者: GitHub

Collection and Search call SSDBImpl (#2988)

* Fix test_rpc
Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>

* Fix test_search.py and test_index.py
Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>

* CreateCollection to SSDBImpl
Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>

* Collection to SSDBImpl
Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>

* Search to SSDBImpl
Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>

* code format
Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>

* Fix DescribeCollection
Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>

* Index to SSDB
Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>
上级 12be12d9
......@@ -23,6 +23,8 @@
namespace milvus {
namespace engine {
static const char* DIMENSION = "dim";
// TODO(linxj): replace with VecIndex::IndexType
enum class EngineType {
INVALID = 0,
......
......@@ -255,8 +255,7 @@ ValidateCollectionIndexType(int32_t index_type) {
}
Status
ValidateIndexParams(const milvus::json& index_params, const engine::meta::CollectionSchema& collection_schema,
int32_t index_type) {
ValidateIndexParams(const milvus::json& index_params, int64_t dimension, int32_t index_type) {
switch (index_type) {
case (int32_t)engine::EngineType::FAISS_IDMAP:
case (int32_t)engine::EngineType::FAISS_BIN_IDMAP: {
......@@ -286,7 +285,7 @@ ValidateIndexParams(const milvus::json& index_params, const engine::meta::Collec
// special check for 'm' parameter
std::vector<int64_t> resset;
milvus::knowhere::IVFPQConfAdapter::GetValidMList(collection_schema.dimension_, resset);
milvus::knowhere::IVFPQConfAdapter::GetValidMList(dimension, resset);
int64_t m_value = index_params[knowhere::IndexParams::m];
if (resset.empty()) {
std::string msg = "Invalid collection dimension, unable to get reasonable values for 'm'";
......
......@@ -40,8 +40,7 @@ extern Status
ValidateCollectionIndexType(int32_t index_type);
extern Status
ValidateIndexParams(const milvus::json& index_params, const engine::meta::CollectionSchema& collection_schema,
int32_t index_type);
ValidateIndexParams(const milvus::json& index_params, int64_t dimension, int32_t index_type);
extern Status
ValidateSearchParams(const milvus::json& search_params, const engine::meta::CollectionSchema& collection_schema,
......
......@@ -18,11 +18,9 @@
#include "server/delivery/request/CmdRequest.h"
#include "server/delivery/request/CompactRequest.h"
#include "server/delivery/request/CountCollectionRequest.h"
#include "server/delivery/request/CreateCollectionRequest.h"
#include "server/delivery/request/CreateIndexRequest.h"
#include "server/delivery/request/CreatePartitionRequest.h"
#include "server/delivery/request/DeleteByIDRequest.h"
#include "server/delivery/request/DescribeCollectionRequest.h"
#include "server/delivery/request/DescribeIndexRequest.h"
#include "server/delivery/request/DropCollectionRequest.h"
#include "server/delivery/request/DropIndexRequest.h"
......@@ -42,7 +40,6 @@
#include "server/delivery/request/ShowPartitionsRequest.h"
#include "server/delivery/hybrid_request/CreateHybridCollectionRequest.h"
#include "server/delivery/hybrid_request/CreateHybridIndexRequest.h"
#include "server/delivery/hybrid_request/DescribeHybridCollectionRequest.h"
#include "server/delivery/hybrid_request/GetEntityByIDRequest.h"
#include "server/delivery/hybrid_request/HybridSearchRequest.h"
......@@ -51,16 +48,6 @@
namespace milvus {
namespace server {
Status
RequestHandler::CreateCollection(const std::shared_ptr<Context>& context, const std::string& collection_name,
int64_t dimension, int64_t index_file_size, int64_t metric_type) {
BaseRequestPtr request_ptr =
CreateCollectionRequest::Create(context, collection_name, dimension, index_file_size, metric_type);
RequestScheduler::ExecRequest(request_ptr);
return request_ptr->status();
}
Status
RequestHandler::HasCollection(const std::shared_ptr<Context>& context, const std::string& collection_name,
bool& has_collection) {
......@@ -156,15 +143,6 @@ RequestHandler::SearchByID(const std::shared_ptr<Context>& context, const std::s
return request_ptr->status();
}
Status
RequestHandler::DescribeCollection(const std::shared_ptr<Context>& context, const std::string& collection_name,
CollectionSchema& collection_schema) {
BaseRequestPtr request_ptr = DescribeCollectionRequest::Create(context, collection_name, collection_schema);
RequestScheduler::ExecRequest(request_ptr);
return request_ptr->status();
}
Status
RequestHandler::CountCollection(const std::shared_ptr<Context>& context, const std::string& collection_name,
int64_t& count) {
......@@ -333,26 +311,14 @@ RequestHandler::GetEntityByID(const std::shared_ptr<Context>& context, const std
}
Status
RequestHandler::HybridSearch(const std::shared_ptr<Context>& context, const std::string& collection_name,
std::vector<std::string>& partition_list, query::GeneralQueryPtr& general_query,
query::QueryPtr& query_ptr, milvus::json& json_params,
std::vector<std::string>& field_names, engine::QueryResult& result) {
BaseRequestPtr request_ptr = HybridSearchRequest::Create(context, collection_name, partition_list, general_query,
query_ptr, json_params, field_names, result);
RequestHandler::HybridSearch(const std::shared_ptr<milvus::server::Context>& context, const query::QueryPtr& query_ptr,
const milvus::json& json_params, engine::QueryResultPtr& result) {
BaseRequestPtr request_ptr = HybridSearchRequest::Create(context, query_ptr, json_params, result);
RequestScheduler::ExecRequest(request_ptr);
return request_ptr->status();
}
Status
RequestHandler::CreateHybridIndex(const std::shared_ptr<Context>& context, const std::string& collection_name,
const std::vector<std::string>& field_names, const milvus::json& json_params) {
BaseRequestPtr request_ptr = CreateHybridIndexRequest::Create(context, collection_name, field_names, json_params);
RequestScheduler::ExecRequest(request_ptr);
return request_ptr->status();
}
} // namespace server
} // namespace milvus
......@@ -29,10 +29,6 @@ class RequestHandler {
public:
RequestHandler() = default;
Status
CreateCollection(const std::shared_ptr<Context>& context, const std::string& collection_name, int64_t dimension,
int64_t index_file_size, int64_t metric_type);
Status
HasCollection(const std::shared_ptr<Context>& context, const std::string& collection_name, bool& has_collection);
......@@ -72,10 +68,6 @@ class RequestHandler {
const std::vector<int64_t>& id_array, int64_t topk, const milvus::json& extra_params,
const std::vector<std::string>& partition_list, TopKQueryResult& result);
Status
DescribeCollection(const std::shared_ptr<Context>& context, const std::string& collection_name,
CollectionSchema& collection_schema);
Status
CountCollection(const std::shared_ptr<Context>& context, const std::string& collection_name, int64_t& count);
......@@ -150,14 +142,8 @@ class RequestHandler {
std::vector<engine::AttrsData>& attrs, std::vector<engine::VectorsData>& vectors);
Status
HybridSearch(const std::shared_ptr<Context>& context, const std::string& collection_name,
std::vector<std::string>& partition_list, query::GeneralQueryPtr& general_query,
query::QueryPtr& query_ptr, milvus::json& json_params, std::vector<std::string>& field_names,
engine::QueryResult& result);
Status
CreateHybridIndex(const std::shared_ptr<Context>& context, const std::string& collection_name,
const std::vector<std::string>& field_names, const milvus::json& json_params);
HybridSearch(const std::shared_ptr<milvus::server::Context>& context, const query::QueryPtr& query_ptr,
const milvus::json& json_params, engine::QueryResultPtr& result);
};
} // namespace server
......
......@@ -19,6 +19,7 @@
#include "utils/TimeRecorder.h"
#include <fiu-local.h>
#include <src/db/snapshot/Context.h>
#include <memory>
#include <string>
#include <unordered_map>
......@@ -123,8 +124,20 @@ CreateHybridCollectionRequest::OnExecute() {
collection_info.metric_type_ = metric_type;
}
// step 3: create collection
status = DBWrapper::DB()->CreateHybridCollection(collection_info, fields_schema);
// step 3: create snapshot collection
engine::snapshot::CreateCollectionContext create_collection_context;
auto ss_collection_schema = std::make_shared<engine::snapshot::Collection>(collection_name_);
create_collection_context.collection = ss_collection_schema;
for (const auto& schema : fields_schema.fields_schema_) {
auto field = std::make_shared<engine::snapshot::Field>(
schema.field_name_, 0, (engine::FieldType)schema.field_type_, json::parse(schema.field_params_));
auto field_element = std::make_shared<engine::snapshot::FieldElement>(
0, 0, schema.index_name_, engine::FieldElementType::FET_INDEX, json::parse(schema.index_param_));
create_collection_context.fields_schema[field] = {field_element};
}
status = DBWrapper::SSDB()->CreateCollection(create_collection_context);
fiu_do_on("CreateHybridCollectionRequest.OnExecute.invalid_db_execute",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "server/delivery/hybrid_request/CreateHybridIndexRequest.h"
#include "db/Utils.h"
#include "server/DBWrapper.h"
#include "server/ValidationUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <fiu-local.h>
#include <memory>
#include <string>
#include <unordered_map>
namespace milvus {
namespace server {
CreateHybridIndexRequest::CreateHybridIndexRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name,
const std::vector<std::string>& field_names,
const milvus::json& extra_params)
: BaseRequest(context, BaseRequest::kCreateHybridIndex),
collection_name_(collection_name),
field_names_(field_names),
extra_params_(extra_params) {
}
BaseRequestPtr
CreateHybridIndexRequest::Create(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, const std::vector<std::string>& field_names,
const milvus::json& extra_params) {
return std::shared_ptr<BaseRequest>(
new CreateHybridIndexRequest(context, collection_name, field_names, extra_params));
}
Status
CreateHybridIndexRequest::OnExecute() {
try {
std::string hdr = "CreateIndexRequest(collection=" + collection_name_ + ")";
TimeRecorderAuto rc(hdr);
// step 1: check arguments
auto status = ValidateCollectionName(collection_name_);
if (!status.ok()) {
return status;
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
engine::meta::hybrid::FieldsSchema fields_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema);
std::unordered_map<std::string, engine::meta::hybrid::DataType> attr_types;
for (const auto& schema : fields_schema.fields_schema_) {
attr_types.insert(std::make_pair(schema.field_name_, (engine::meta::hybrid::DataType)schema.field_type_));
}
fiu_do_on("CreateIndexRequest.OnExecute.not_has_collection",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
fiu_do_on("CreateIndexRequest.OnExecute.throw_std.exception", throw std::exception());
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
int32_t index_type = 0;
if (extra_params_.contains("index_type")) {
std::string index_type_str = extra_params_["index_type"];
index_type = (int32_t)engine::s_map_engine_type.at(index_type_str);
}
status = ValidateCollectionIndexType(index_type);
if (!status.ok()) {
return status;
}
status = ValidateIndexParams(extra_params_, collection_schema, index_type);
if (!status.ok()) {
return status;
}
// step 2: binary and float vector support different index/metric type, need to adapt here
int32_t adapter_index_type = index_type;
if (engine::utils::IsBinaryMetricType(collection_schema.metric_type_)) { // binary vector not allow
if (adapter_index_type == static_cast<int32_t>(engine::EngineType::FAISS_IDMAP)) {
adapter_index_type = static_cast<int32_t>(engine::EngineType::FAISS_BIN_IDMAP);
} else if (adapter_index_type == static_cast<int32_t>(engine::EngineType::FAISS_IVFFLAT)) {
adapter_index_type = static_cast<int32_t>(engine::EngineType::FAISS_BIN_IVFFLAT);
} else {
return Status(SERVER_INVALID_INDEX_TYPE, "Invalid index type for collection metric type");
}
}
rc.RecordSection("check validation");
// // step 3: create index
// status = DBWrapper::DB()->CreateStructuredIndex(context_, collection_name_, field_names_);
// fiu_do_on("CreateIndexRequest.OnExecute.create_index_fail",
// status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
// if (!status.ok()) {
// return status;
// }
//
// engine::CollectionIndex index;
// index.engine_type_ = adapter_index_type;
// index.extra_params_ = extra_params_;
// status = DBWrapper::DB()->CreateIndex(context_, collection_name_, index);
// if (!status.ok()) {
// return status;
// }
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace server
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "server/delivery/request/BaseRequest.h"
namespace milvus {
namespace server {
class CreateHybridIndexRequest : public BaseRequest {
public:
static BaseRequestPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
const std::vector<std::string>& field_names, const milvus::json& extra_params);
protected:
CreateHybridIndexRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, const std::vector<std::string>& field_names,
const milvus::json& extra_params);
Status
OnExecute() override;
private:
const std::string collection_name_;
std::vector<std::string> field_names_;
const milvus::json& extra_params_;
};
} // namespace server
} // namespace milvus
......@@ -48,17 +48,32 @@ DescribeHybridCollectionRequest::OnExecute() {
TimeRecorderAuto rc(hdr);
try {
engine::meta::CollectionSchema collection_schema;
engine::meta::hybrid::FieldsSchema fields_schema;
collection_schema.collection_id_ = collection_name_;
auto status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema);
fiu_do_on("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
auto status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
if (!status.ok()) {
return status;
}
collection_schema_.collection_name_ = collection_name_;
collection_schema_.extra_params_ = collection->GetParams();
engine::meta::hybrid::FieldsSchema fields_info;
for (auto field_it = fields_schema.begin(); field_it != fields_schema.end(); field_it++) {
engine::meta::hybrid::FieldSchema schema;
auto field = field_it->first;
schema.field_name_ = field->GetName();
schema.field_type_ = (int)field->GetFtype();
schema.field_params_ = field->GetParams().dump();
auto field_elements = field_it->second;
for (const auto& element : field_elements) {
if (element->GetFtype() == (int)engine::FieldElementType::FET_INDEX) {
schema.index_name_ = element->GetName();
schema.index_param_ = element->GetParams().dump();
break;
}
}
}
for (const auto& schema : fields_schema.fields_schema_) {
for (const auto& schema : fields_info.fields_schema_) {
auto field_name = schema.field_name_;
collection_schema_.field_types_.insert(
std::make_pair(field_name, (engine::meta::hybrid::DataType)schema.field_type_));
......@@ -67,7 +82,6 @@ DescribeHybridCollectionRequest::OnExecute() {
milvus::json json_extra_param = milvus::json::parse(schema.field_params_);
collection_schema_.field_params_.insert(std::make_pair(field_name, json_extra_param));
}
collection_schema_.extra_params_["segment_size"] = collection_schema.index_file_size_ / engine::MB;
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
......
......@@ -31,65 +31,53 @@ namespace milvus {
namespace server {
HybridSearchRequest::HybridSearchRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, std::vector<std::string>& partition_list,
query::GeneralQueryPtr& general_query, query::QueryPtr& query_ptr,
milvus::json& json_params, std::vector<std::string>& field_names,
engine::QueryResult& result)
const query::QueryPtr& query_ptr, const milvus::json& json_params,
engine::QueryResultPtr& result)
: BaseRequest(context, BaseRequest::kHybridSearch),
collection_name_(collection_name),
partition_list_(partition_list),
general_query_(general_query),
query_ptr_(query_ptr),
json_params_(json_params),
field_names_(field_names),
result_(result) {
}
BaseRequestPtr
HybridSearchRequest::Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
std::vector<std::string>& partition_list, query::GeneralQueryPtr& general_query,
query::QueryPtr& query_ptr, milvus::json& json_params,
std::vector<std::string>& field_names, engine::QueryResult& result) {
return std::shared_ptr<BaseRequest>(new HybridSearchRequest(context, collection_name, partition_list, general_query,
query_ptr, json_params, field_names, result));
HybridSearchRequest::Create(const std::shared_ptr<milvus::server::Context>& context, const query::QueryPtr& query_ptr,
const milvus::json& json_params, engine::QueryResultPtr& result) {
return std::shared_ptr<BaseRequest>(new HybridSearchRequest(context, query_ptr, json_params, result));
}
Status
HybridSearchRequest::OnExecute() {
try {
fiu_do_on("HybridSearchRequest.OnExecute.throw_std_exception", throw std::exception());
std::string hdr = "SearchRequest(table=" + collection_name_;
std::string hdr = "SearchRequest(table=" + query_ptr_->collection_id;
TimeRecorder rc(hdr);
// step 1: check table name
auto status = ValidateCollectionName(collection_name_);
auto status = ValidateCollectionName(query_ptr_->collection_id);
if (!status.ok()) {
return status;
}
// step 2: check table existence
// only process root table, ignore partition table
engine::meta::CollectionSchema collection_schema;
engine::meta::hybrid::FieldsSchema fields_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(query_ptr_->collection_id, collection, fields_schema);
fiu_do_on("HybridSearchRequest.OnExecute.describe_table_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(query_ptr_->collection_id));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
int64_t dimension = collection->GetParams()[engine::DIMENSION].get<int64_t>();
// step 3: check partition tags
status = ValidatePartitionTags(partition_list_);
status = ValidatePartitionTags(query_ptr_->partitions);
fiu_do_on("HybridSearchRequest.OnExecute.invalid_partition_tags",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
......@@ -97,7 +85,14 @@ HybridSearchRequest::OnExecute() {
return status;
}
// step 3: check field names
// step 4: Get field info
std::unordered_map<std::string, engine::meta::hybrid::DataType> field_types;
for (auto& schema : fields_schema) {
field_types.insert(
std::make_pair(schema.first->GetName(), (engine::meta::hybrid::DataType)schema.first->GetFtype()));
}
// step 5: check field names
if (json_params_.contains("fields")) {
if (json_params_["fields"].is_array()) {
for (auto& name : json_params_["fields"]) {
......@@ -106,8 +101,8 @@ HybridSearchRequest::OnExecute() {
return status;
}
bool find_field_name = false;
for (const auto& schema : fields_schema.fields_schema_) {
if (name.get<std::string>() == schema.field_name_) {
for (const auto& schema : field_types) {
if (name.get<std::string>() == schema.first) {
find_field_name = true;
break;
}
......@@ -115,20 +110,13 @@ HybridSearchRequest::OnExecute() {
if (not find_field_name) {
return Status{SERVER_INVALID_FIELD_NAME, "Field: " + name.get<std::string>() + " not exist"};
}
field_names_.emplace_back(name.get<std::string>());
query_ptr_->field_names.emplace_back(name.get<std::string>());
}
}
}
std::unordered_map<std::string, engine::meta::hybrid::DataType> attr_type;
for (auto& field_schema : fields_schema.fields_schema_) {
attr_type.insert(
std::make_pair(field_schema.field_name_, (engine::meta::hybrid::DataType)field_schema.field_type_));
}
result_.row_num_ = 0;
status = DBWrapper::DB()->HybridQuery(context_, collection_name_, partition_list_, general_query_, query_ptr_,
field_names_, attr_type, result_);
result_->row_num_ = 0;
status = DBWrapper::SSDB()->Query(context_, query_ptr_, result_);
#ifdef ENABLE_CPU_PROFILING
ProfilerStop();
......@@ -138,13 +126,13 @@ HybridSearchRequest::OnExecute() {
if (!status.ok()) {
return status;
}
fiu_do_on("HybridSearchRequest.OnExecute.empty_result_ids", result_.result_ids_.clear());
if (result_.result_ids_.empty()) {
fiu_do_on("HybridSearchRequest.OnExecute.empty_result_ids", result_->result_ids_.clear());
if (result_->result_ids_.empty()) {
auto vector_query = query_ptr_->vectors.begin()->second;
if (!vector_query->query_vector.binary_data.empty()) {
result_.row_num_ = vector_query->query_vector.binary_data.size() * 8 / collection_schema.dimension_;
result_->row_num_ = vector_query->query_vector.binary_data.size() * 8 / dimension;
} else if (!vector_query->query_vector.float_data.empty()) {
result_.row_num_ = vector_query->query_vector.float_data.size() / collection_schema.dimension_;
result_->row_num_ = vector_query->query_vector.float_data.size() / dimension;
}
return Status::OK(); // empty table
}
......
......@@ -13,7 +13,7 @@
#include "server/delivery/request/BaseRequest.h"
#include <src/context/HybridSearchContext.h>
#include <map>
#include <memory>
#include <string>
#include <vector>
......@@ -24,27 +24,20 @@ namespace server {
class HybridSearchRequest : public BaseRequest {
public:
static BaseRequestPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
std::vector<std::string>& partition_list, query::GeneralQueryPtr& general_query, query::QueryPtr& query_ptr,
milvus::json& json_params, std::vector<std::string>& field_names, engine::QueryResult& result);
Create(const std::shared_ptr<milvus::server::Context>& context, const query::QueryPtr& query_ptr,
const milvus::json& json_params, engine::QueryResultPtr& result);
protected:
HybridSearchRequest(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
std::vector<std::string>& partition_list, query::GeneralQueryPtr& general_query,
query::QueryPtr& query_ptr, milvus::json& json_params, std::vector<std::string>& field_names,
engine::QueryResult& result);
HybridSearchRequest(const std::shared_ptr<milvus::server::Context>& context, const query::QueryPtr& query_ptr,
const milvus::json& json_params, engine::QueryResultPtr& result);
Status
OnExecute() override;
private:
const std::string collection_name_;
std::vector<std::string> partition_list_;
milvus::query::GeneralQueryPtr general_query_;
milvus::query::QueryPtr query_ptr_;
milvus::json json_params_;
std::vector<std::string>& field_names_;
engine::QueryResult& result_;
engine::QueryResultPtr& result_;
};
} // namespace server
......
......@@ -22,6 +22,8 @@
#include "utils/TimeRecorder.h"
#include <memory>
#include <unordered_map>
#include <vector>
namespace milvus {
namespace server {
......@@ -52,25 +54,21 @@ CompactRequest::OnExecute() {
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
rc.RecordSection("check validation");
// step 2: check collection existence
status = DBWrapper::DB()->Compact(context_, collection_name_, compact_threshold_);
status = DBWrapper::SSDB()->Compact(context_, collection_name_, compact_threshold_);
if (!status.ok()) {
return status;
}
......
......@@ -18,6 +18,8 @@
#include <fiu-local.h>
#include <memory>
#include <unordered_map>
#include <vector>
namespace milvus {
namespace server {
......@@ -46,26 +48,22 @@ CountCollectionRequest::OnExecute() {
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
rc.RecordSection("check validation");
// step 2: get row count
uint64_t row_count = 0;
status = DBWrapper::DB()->GetCollectionRowCount(collection_name_, row_count);
status = DBWrapper::SSDB()->GetCollectionRowCount(collection_name_, row_count);
fiu_do_on("CountCollectionRequest.OnExecute.db_not_found", status = Status(DB_NOT_FOUND, ""));
fiu_do_on("CountCollectionRequest.OnExecute.status_error", status = Status(SERVER_UNEXPECTED_ERROR, ""));
fiu_do_on("CountCollectionRequest.OnExecute.throw_std_exception", throw std::exception());
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "server/delivery/request/CreateCollectionRequest.h"
#include "db/Utils.h"
#include "server/DBWrapper.h"
#include "server/ValidationUtil.h"
#include "server/delivery/request/BaseRequest.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <fiu-local.h>
#include <memory>
#include <string>
namespace milvus {
namespace server {
CreateCollectionRequest::CreateCollectionRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, int64_t dimension,
int64_t index_file_size, int64_t metric_type)
: BaseRequest(context, BaseRequest::kCreateCollection),
collection_name_(collection_name),
dimension_(dimension),
index_file_size_(index_file_size),
metric_type_(metric_type) {
}
BaseRequestPtr
CreateCollectionRequest::Create(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, int64_t dimension, int64_t index_file_size,
int64_t metric_type) {
return std::shared_ptr<BaseRequest>(
new CreateCollectionRequest(context, collection_name, dimension, index_file_size, metric_type));
}
Status
CreateCollectionRequest::OnExecute() {
std::string hdr =
"CreateCollectionRequest(collection=" + collection_name_ + ", dimension=" + std::to_string(dimension_) + ")";
TimeRecorderAuto rc(hdr);
try {
// step 1: check arguments
auto status = ValidateCollectionName(collection_name_);
if (!status.ok()) {
return status;
}
status = ValidateTableDimension(dimension_, metric_type_);
if (!status.ok()) {
return status;
}
status = ValidateCollectionIndexFileSize(index_file_size_);
fiu_do_on("CreateCollectionRequest.OnExecute.invalid_index_file_size",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
status = ValidateCollectionIndexMetricType(metric_type_);
if (!status.ok()) {
return status;
}
rc.RecordSection("check validation");
// step 2: construct collection schema
engine::meta::CollectionSchema collection_info;
collection_info.collection_id_ = collection_name_;
collection_info.dimension_ = static_cast<uint16_t>(dimension_);
collection_info.index_file_size_ = index_file_size_;
collection_info.metric_type_ = metric_type_;
// some metric type only support binary vector, adapt the index type
if (engine::utils::IsBinaryMetricType(metric_type_)) {
if (collection_info.engine_type_ == static_cast<int32_t>(engine::EngineType::FAISS_IDMAP)) {
collection_info.engine_type_ = static_cast<int32_t>(engine::EngineType::FAISS_BIN_IDMAP);
} else if (collection_info.engine_type_ == static_cast<int32_t>(engine::EngineType::FAISS_IVFFLAT)) {
collection_info.engine_type_ = static_cast<int32_t>(engine::EngineType::FAISS_BIN_IVFFLAT);
}
}
// step 3: create collection
status = DBWrapper::DB()->CreateCollection(collection_info);
fiu_do_on("CreateCollectionRequest.OnExecute.db_already_exist", status = Status(milvus::DB_ALREADY_EXIST, ""));
fiu_do_on("CreateCollectionRequest.OnExecute.create_collection_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
fiu_do_on("CreateCollectionRequest.OnExecute.throw_std_exception", throw std::exception());
if (!status.ok()) {
// collection could exist
if (status.code() == DB_ALREADY_EXIST) {
return Status(SERVER_INVALID_COLLECTION_NAME, status.message());
}
return status;
}
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace server
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "server/delivery/request/BaseRequest.h"
namespace milvus {
namespace server {
class CreateCollectionRequest : public BaseRequest {
public:
static BaseRequestPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
int64_t dimension, int64_t index_file_size, int64_t metric_type);
protected:
CreateCollectionRequest(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
int64_t dimension, int64_t index_file_size, int64_t metric_type);
Status
OnExecute() override;
private:
const std::string collection_name_;
int64_t dimension_;
int64_t index_file_size_;
int64_t metric_type_;
};
} // namespace server
} // namespace milvus
......@@ -19,6 +19,8 @@
#include <fiu-local.h>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
namespace milvus {
namespace server {
......@@ -64,10 +66,9 @@ CreateIndexRequest::OnExecute() {
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
engine::meta::hybrid::FieldsSchema fields_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
fiu_do_on("CreateIndexRequest.OnExecute.not_has_collection",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
fiu_do_on("CreateIndexRequest.OnExecute.throw_std.exception", throw std::exception());
......@@ -77,9 +78,16 @@ CreateIndexRequest::OnExecute() {
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
int64_t dimension;
for (auto field_it = fields_schema.begin(); field_it != fields_schema.end(); field_it++) {
auto type = field_it->first->GetFtype();
if (type == (int64_t)engine::meta::hybrid::DataType::VECTOR_FLOAT ||
type == (int64_t)engine::meta::hybrid::DataType::VECTOR_BINARY) {
auto params = field_it->first->GetParams();
dimension = params[engine::DIMENSION].get<int64_t>();
break;
}
}
......@@ -97,7 +105,7 @@ CreateIndexRequest::OnExecute() {
return status;
}
status = ValidateIndexParams(json_params_, collection_schema, index_type);
status = ValidateIndexParams(json_params_, dimension, index_type);
if (!status.ok()) {
return status;
}
......@@ -114,9 +122,6 @@ CreateIndexRequest::OnExecute() {
} else if (adapter_index_type == static_cast<int32_t>(engine::EngineType::FAISS_IVFFLAT)) {
adapter_index_type = static_cast<int32_t>(engine::EngineType::FAISS_BIN_IVFFLAT);
}
// else {
// return Status(SERVER_INVALID_INDEX_TYPE, "Invalid index type for collection metric type");
// }
}
rc.RecordSection("check validation");
......@@ -128,8 +133,9 @@ CreateIndexRequest::OnExecute() {
}
index.engine_type_ = adapter_index_type;
index.index_name_ = index_name_;
index.extra_params_ = json_params_;
status = DBWrapper::DB()->CreateIndex(context_, collection_name_, index);
status = DBWrapper::SSDB()->CreateIndex(context_, collection_name_, field_name_, index);
fiu_do_on("CreateIndexRequest.OnExecute.create_index_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
......
......@@ -19,6 +19,7 @@
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "server/DBWrapper.h"
......@@ -30,14 +31,14 @@ namespace milvus {
namespace server {
DeleteByIDRequest::DeleteByIDRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, const std::vector<int64_t>& vector_ids)
: BaseRequest(context, BaseRequest::kDeleteByID), collection_name_(collection_name), vector_ids_(vector_ids) {
const std::string& collection_name, const std::vector<int64_t>& entity_ids)
: BaseRequest(context, BaseRequest::kDeleteByID), collection_name_(collection_name), entity_ids_(entity_ids) {
}
BaseRequestPtr
DeleteByIDRequest::Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
const std::vector<int64_t>& vector_ids) {
return std::shared_ptr<BaseRequest>(new DeleteByIDRequest(context, collection_name, vector_ids));
const std::vector<int64_t>& entity_ids) {
return std::shared_ptr<BaseRequest>(new DeleteByIDRequest(context, collection_name, entity_ids));
}
Status
......@@ -52,21 +53,16 @@ DeleteByIDRequest::OnExecute() {
}
// step 2: check collection existence
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
// Check collection's index type supports delete
#ifdef MILVUS_SUPPORT_SPTAG
if (collection_schema.engine_type_ == (int32_t)engine::EngineType::SPTAG_BKT ||
......@@ -80,7 +76,7 @@ DeleteByIDRequest::OnExecute() {
rc.RecordSection("check validation");
status = DBWrapper::DB()->DeleteEntities(collection_name_, vector_ids_);
status = DBWrapper::SSDB()->DeleteEntities(collection_name_, entity_ids_);
if (!status.ok()) {
return status;
}
......
......@@ -30,18 +30,18 @@ class DeleteByIDRequest : public BaseRequest {
public:
static BaseRequestPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
const std::vector<int64_t>& vector_ids);
const std::vector<int64_t>& entity_ids);
protected:
DeleteByIDRequest(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
const std::vector<int64_t>& vector_ids);
const std::vector<int64_t>& entity_ids);
Status
OnExecute() override;
private:
const std::string collection_name_;
const std::vector<int64_t>& vector_ids_;
const std::vector<int64_t>& entity_ids_;
};
} // namespace server
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "server/delivery/request/DescribeCollectionRequest.h"
#include "server/DBWrapper.h"
#include "server/ValidationUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <fiu-local.h>
#include <memory>
namespace milvus {
namespace server {
DescribeCollectionRequest::DescribeCollectionRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, CollectionSchema& schema)
: BaseRequest(context, BaseRequest::kDescribeCollection), collection_name_(collection_name), schema_(schema) {
}
BaseRequestPtr
DescribeCollectionRequest::Create(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, CollectionSchema& schema) {
return std::shared_ptr<BaseRequest>(new DescribeCollectionRequest(context, collection_name, schema));
}
Status
DescribeCollectionRequest::OnExecute() {
std::string hdr = "DescribeCollectionRequest(collection=" + collection_name_ + ")";
TimeRecorderAuto rc(hdr);
try {
// step 1: check arguments
auto status = ValidateCollectionName(collection_name_);
if (!status.ok()) {
return status;
}
// step 2: get collection info
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
fiu_do_on("DescribeCollectionRequest.OnExecute.describe_collection_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
fiu_do_on("DescribeCollectionRequest.OnExecute.throw_std_exception", throw std::exception());
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
schema_.collection_name_ = collection_schema.collection_id_;
schema_.dimension_ = static_cast<int64_t>(collection_schema.dimension_);
schema_.index_file_size_ = collection_schema.index_file_size_;
schema_.metric_type_ = collection_schema.metric_type_;
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace server
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "server/delivery/request/BaseRequest.h"
namespace milvus {
namespace server {
class DescribeCollectionRequest : public BaseRequest {
public:
static BaseRequestPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
CollectionSchema& schema);
protected:
DescribeCollectionRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, CollectionSchema& schema);
Status
OnExecute() override;
private:
const std::string collection_name_;
CollectionSchema& schema_;
};
} // namespace server
} // namespace milvus
......@@ -17,6 +17,8 @@
#include <fiu-local.h>
#include <memory>
#include <unordered_map>
#include <vector>
namespace milvus {
namespace server {
......@@ -46,24 +48,33 @@ DescribeIndexRequest::OnExecute() {
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
fiu_do_on("DropIndexRequest.OnExecute.collection_not_exist",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
// TODO(yukun): Currently field name is vector field name
std::string field_name;
for (auto field_it = fields_schema.begin(); field_it != fields_schema.end(); field_it++) {
auto type = field_it->first->GetFtype();
if (type == (int64_t)engine::meta::hybrid::DataType::VECTOR_FLOAT ||
type == (int64_t)engine::meta::hybrid::DataType::VECTOR_BINARY) {
field_name = field_it->first->GetName();
break;
}
}
// step 2: check collection existence
engine::CollectionIndex index;
status = DBWrapper::DB()->DescribeIndex(collection_name_, index);
status = DBWrapper::SSDB()->DescribeIndex(collection_name_, field_name, index);
if (!status.ok()) {
return status;
}
......
......@@ -17,6 +17,7 @@
#include <fiu-local.h>
#include <memory>
#include <unordered_map>
#include <vector>
namespace milvus {
......@@ -47,9 +48,10 @@ DropCollectionRequest::OnExecute() {
// step 2: check collection existence
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
fiu_do_on("DropCollectionRequest.OnExecute.db_not_found", status = Status(milvus::DB_NOT_FOUND, ""));
fiu_do_on("DropCollectionRequest.OnExecute.describe_collection_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
......@@ -60,16 +62,12 @@ DropCollectionRequest::OnExecute() {
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
rc.RecordSection("check validation");
// step 3: Drop collection
status = DBWrapper::DB()->DropCollection(collection_name_);
status = DBWrapper::SSDB()->DropCollection(collection_name_);
fiu_do_on("DropCollectionRequest.OnExecute.drop_collection_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
......@@ -77,7 +75,7 @@ DropCollectionRequest::OnExecute() {
}
// step 4: flush to trigger CleanUpFilesWithTTL
status = DBWrapper::DB()->Flush();
status = DBWrapper::SSDB()->Flush();
rc.ElapseFromBegin("total cost");
} catch (std::exception& ex) {
......
......@@ -17,6 +17,8 @@
#include <fiu-local.h>
#include <memory>
#include <unordered_map>
#include <vector>
namespace milvus {
namespace server {
......@@ -50,12 +52,9 @@ DropIndexRequest::OnExecute() {
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
engine::meta::hybrid::FieldsSchema fields_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema);
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
fiu_do_on("DropIndexRequest.OnExecute.collection_not_exist",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
......@@ -64,16 +63,12 @@ DropIndexRequest::OnExecute() {
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
rc.RecordSection("check validation");
// step 2: drop index
status = DBWrapper::DB()->DropIndex(collection_name_);
status = DBWrapper::SSDB()->DropIndex(collection_name_, field_name_);
fiu_do_on("DropIndexRequest.OnExecute.drop_index_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
......
......@@ -18,6 +18,7 @@
#include "server/delivery/request/FlushRequest.h"
#include <memory>
#include <unordered_map>
#include "server/DBWrapper.h"
#include "utils/Log.h"
......@@ -58,22 +59,18 @@ FlushRequest::OnExecute() {
// flush specified collections
for (auto& name : collection_names_) {
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = name;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(name, collection, fields_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(name));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(name));
}
}
status = DBWrapper::DB()->Flush(name);
status = DBWrapper::SSDB()->Flush(name);
if (!status.ok()) {
return status;
}
......
......@@ -47,7 +47,7 @@ HasCollectionRequest::OnExecute() {
}
// step 2: check collection existence
status = DBWrapper::DB()->HasNativeCollection(collection_name_, has_collection_);
status = DBWrapper::SSDB()->HasCollection(collection_name_, has_collection_);
fiu_do_on("HasCollectionRequest.OnExecute.throw_std_exception", throw std::exception());
return status;
......
......@@ -17,6 +17,8 @@
#include <fiu-local.h>
#include <memory>
#include <unordered_map>
#include <vector>
namespace milvus {
namespace server {
......@@ -45,24 +47,26 @@ PreloadCollectionRequest::OnExecute() {
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
// TODO(yukun): if PreloadCollection interface needs to add field names as params
std::vector<std::string> field_names;
for (auto field_it : fields_schema) {
field_names.emplace_back(field_it.first->GetName());
}
// step 2: force load collection data into cache
// load each segment and insert into cache even cache capacity is not enough
status = DBWrapper::DB()->PreloadCollection(context_, collection_name_, true);
status = DBWrapper::SSDB()->LoadCollection(context_, collection_name_, field_names, true);
fiu_do_on("PreloadCollectionRequest.OnExecute.preload_collection_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
fiu_do_on("PreloadCollectionRequest.OnExecute.throw_std_exception", throw std::exception());
......
......@@ -22,6 +22,7 @@
#include "utils/TimeRecorder.h"
#include <memory>
#include <unordered_map>
#include <vector>
namespace milvus {
......@@ -53,22 +54,19 @@ ShowCollectionInfoRequest::OnExecute() {
// step 2: check collection existence
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
engine::snapshot::CollectionPtr collection;
std::unordered_map<engine::snapshot::FieldPtr, std::vector<engine::snapshot::FieldElementPtr>> fields_schema;
status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
// step 3: get partitions
// TODO(yukun): SSDBImpl::GetCollectionInfo has not implemented yet
status = DBWrapper::DB()->GetCollectionInfo(collection_name_, collection_info_);
if (!status.ok()) {
return status;
......
......@@ -38,7 +38,7 @@ ShowCollectionsRequest::OnExecute() {
TimeRecorderAuto rc("ShowCollectionsRequest");
std::vector<std::string> names;
auto status = DBWrapper::DB()->AllCollections(names);
auto status = DBWrapper::SSDB()->AllCollections(names);
fiu_do_on("ShowCollectionsRequest.OnExecute.show_collections_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
......
......@@ -1314,22 +1314,21 @@ GrpcRequestHandler::SearchPB(::grpc::ServerContext* context, const ::milvus::grp
}
}
engine::QueryResult result;
engine::QueryResultPtr result = std::make_shared<engine::QueryResult>();
std::vector<std::string> field_names;
status = request_handler_.HybridSearch(GetContext(context), request->collection_name(), partition_list,
general_query, query_ptr, json_params, field_names, result);
status = request_handler_.HybridSearch(GetContext(context), query_ptr, json_params, result);
// step 6: construct and return result
response->set_row_num(result.row_num_);
response->set_row_num(result->row_num_);
auto grpc_entity = response->mutable_entities();
ConstructEntityResults(result.attrs_, result.vectors_, field_names, grpc_entity);
grpc_entity->mutable_ids()->Resize(static_cast<int>(result.result_ids_.size()), 0);
memcpy(grpc_entity->mutable_ids()->mutable_data(), result.result_ids_.data(),
result.result_ids_.size() * sizeof(int64_t));
ConstructEntityResults(result->attrs_, result->vectors_, field_names, grpc_entity);
grpc_entity->mutable_ids()->Resize(static_cast<int>(result->result_ids_.size()), 0);
memcpy(grpc_entity->mutable_ids()->mutable_data(), result->result_ids_.data(),
result->result_ids_.size() * sizeof(int64_t));
response->mutable_distances()->Resize(static_cast<int>(result.result_distances_.size()), 0.0);
memcpy(response->mutable_distances()->mutable_data(), result.result_distances_.data(),
result.result_distances_.size() * sizeof(float));
response->mutable_distances()->Resize(static_cast<int>(result->result_distances_.size()), 0.0);
memcpy(response->mutable_distances()->mutable_data(), result->result_distances_.data(),
result->result_distances_.size() * sizeof(float));
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__);
SET_RESPONSE(response->mutable_status(), status, context);
......@@ -1631,6 +1630,8 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc:
query::BooleanQueryPtr boolean_query = std::make_shared<query::BooleanQuery>();
query::QueryPtr query_ptr = std::make_shared<query::Query>();
query_ptr->collection_id = request->collection_name();
std::unordered_map<std::string, query::VectorQueryPtr> vectors;
status = DeserializeJsonToBoolQuery(request->vector_param(), request->dsl(), boolean_query, vectors);
......@@ -1663,6 +1664,8 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc:
partition_list[i] = request->partition_tag_array(i);
}
query_ptr->partitions = partition_list;
milvus::json json_params;
for (int i = 0; i < request->extra_params_size(); i++) {
const ::milvus::grpc::KeyValuePair& extra = request->extra_params(i);
......@@ -1671,22 +1674,20 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc:
}
}
engine::QueryResult result;
std::vector<std::string> field_names;
status = request_handler_.HybridSearch(GetContext(context), request->collection_name(), partition_list,
general_query, query_ptr, json_params, field_names, result);
engine::QueryResultPtr result = std::make_shared<engine::QueryResult>();
status = request_handler_.HybridSearch(GetContext(context), query_ptr, json_params, result);
// step 6: construct and return result
response->set_row_num(result.row_num_);
ConstructEntityResults(result.attrs_, result.vectors_, field_names, grpc_entity);
response->set_row_num(result->row_num_);
ConstructEntityResults(result->attrs_, result->vectors_, query_ptr->field_names, grpc_entity);
grpc_entity->mutable_ids()->Resize(static_cast<int>(result.result_ids_.size()), 0);
memcpy(grpc_entity->mutable_ids()->mutable_data(), result.result_ids_.data(),
result.result_ids_.size() * sizeof(int64_t));
grpc_entity->mutable_ids()->Resize(static_cast<int>(result->result_ids_.size()), 0);
memcpy(grpc_entity->mutable_ids()->mutable_data(), result->result_ids_.data(),
result->result_ids_.size() * sizeof(int64_t));
response->mutable_distances()->Resize(static_cast<int>(result.result_distances_.size()), 0.0);
memcpy(response->mutable_distances()->mutable_data(), result.result_distances_.data(),
result.result_distances_.size() * sizeof(float));
response->mutable_distances()->Resize(static_cast<int>(result->result_distances_.size()), 0.0);
memcpy(response->mutable_distances()->mutable_data(), result->result_distances_.data(),
result->result_distances_.size() * sizeof(float));
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__);
SET_RESPONSE(response->mutable_status(), status, context);
......
......@@ -90,7 +90,8 @@ WebRequestHandler::AddStatusToJson(nlohmann::json& json, int64_t code, const std
Status
WebRequestHandler::IsBinaryCollection(const std::string& collection_name, bool& bin) {
CollectionSchema schema;
auto status = request_handler_.DescribeCollection(context_ptr_, collection_name, schema);
auto status = Status::OK();
// status = request_handler_.DescribeCollection(context_ptr_, collection_name, schema);
if (status.ok()) {
auto metric = engine::MetricType(schema.metric_type_);
bin = engine::MetricType::HAMMING == metric || engine::MetricType::JACCARD == metric ||
......@@ -136,7 +137,8 @@ WebRequestHandler::CopyRecordsFromJson(const nlohmann::json& json, engine::Vecto
Status
WebRequestHandler::GetCollectionMetaInfo(const std::string& collection_name, nlohmann::json& json_out) {
CollectionSchema schema;
auto status = request_handler_.DescribeCollection(context_ptr_, collection_name, schema);
auto status = Status::OK();
// status = request_handler_.DescribeCollection(context_ptr_, collection_name, schema);
if (!status.ok()) {
return status;
}
......@@ -835,37 +837,35 @@ WebRequestHandler::HybridSearch(const std::string& collection_name, const nlohma
query_ptr_->root = general_query->bin;
engine::QueryResult result;
std::vector<std::string> field_names;
status = request_handler_.HybridSearch(context_ptr_, collection_name, partition_tags, general_query, query_ptr_,
extra_params, field_names, result);
engine::QueryResultPtr result = std::make_shared<engine::QueryResult>();
status = request_handler_.HybridSearch(context_ptr_, query_ptr_, extra_params, result);
if (!status.ok()) {
return status;
}
nlohmann::json result_json;
result_json["num"] = result.row_num_;
if (result.row_num_ == 0) {
result_json["num"] = result->row_num_;
if (result->row_num_ == 0) {
result_json["result"] = std::vector<int64_t>();
result_str = result_json.dump();
return Status::OK();
}
auto step = result.result_ids_.size() / result.row_num_;
auto step = result->result_ids_.size() / result->row_num_;
nlohmann::json search_result_json;
for (int64_t i = 0; i < result.row_num_; i++) {
for (int64_t i = 0; i < result->row_num_; i++) {
nlohmann::json raw_result_json;
for (size_t j = 0; j < step; j++) {
nlohmann::json one_result_json;
one_result_json["id"] = std::to_string(result.result_ids_.at(i * step + j));
one_result_json["distance"] = std::to_string(result.result_distances_.at(i * step + j));
one_result_json["id"] = std::to_string(result->result_ids_.at(i * step + j));
one_result_json["distance"] = std::to_string(result->result_distances_.at(i * step + j));
raw_result_json.emplace_back(one_result_json);
}
search_result_json.emplace_back(raw_result_json);
}
nlohmann::json attr_json;
ConvertRowToColumnJson(result.attrs_, field_names, result.row_num_, attr_json);
ConvertRowToColumnJson(result->attrs_, query_ptr_->field_names, result->row_num_, attr_json);
result_json["Entity"] = attr_json;
result_json["result"] = search_result_json;
result_str = result_json.dump();
......@@ -1261,10 +1261,11 @@ WebRequestHandler::CreateCollection(const CollectionRequestDto::ObjectWrapper& c
RETURN_STATUS_DTO(ILLEGAL_METRIC_TYPE, "metric_type is illegal")
}
auto status = request_handler_.CreateCollection(
context_ptr_, collection_schema->collection_name->std_str(), collection_schema->dimension,
collection_schema->index_file_size,
static_cast<int64_t>(MetricNameMap.at(collection_schema->metric_type->std_str())));
auto status = Status::OK();
// auto status = request_handler_.CreateCollection(
// context_ptr_, collection_schema->collection_name->std_str(), collection_schema->dimension,
// collection_schema->index_file_size,
// static_cast<int64_t>(MetricNameMap.at(collection_schema->metric_type->std_str())));
ASSIGN_RETURN_STATUS_DTO(status)
}
......
......@@ -31,6 +31,7 @@ add_executable(test_ssdb
${codecs_snapshot_files}
${config_files}
${config_handler_files}
${db_attr_files}
${db_main_files}
${db_attr_files}
${db_engine_files}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册