未验证 提交 36a1dea3 编写于 作者: G groot 提交者: GitHub

describe index request (#3085)

* describe index
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* build error
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>
上级 e426b18d
......@@ -86,10 +86,10 @@ class DB {
const CollectionIndex& index) = 0;
virtual Status
DropIndex(const std::string& collection_name, const std::string& field_name) = 0;
DropIndex(const std::string& collection_name, const std::string& field_name = "") = 0;
virtual Status
DropIndex(const std::string& collection_id) = 0;
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) = 0;
virtual Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) = 0;
......
......@@ -385,6 +385,8 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
const std::string& field_name, const CollectionIndex& index) {
CHECK_INITIALIZED;
LOG_ENGINE_DEBUG_ << "Create index for collection: " << collection_name << " field: " << field_name;
// step 1: wait merge file thread finished to avoid duplicate data bug
auto status = Flush();
WaitMergeFileFinish(); // let merge file thread finish
......@@ -399,7 +401,7 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
}
// step 3: drop old index
DropIndex(collection_name);
DropIndex(collection_name, field_name);
WaitMergeFileFinish(); // let merge file thread finish since DropIndex start a merge task
// step 4: create field element for index
......@@ -438,35 +440,24 @@ Status
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
CHECK_INITIALIZED;
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name;
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name << " field: " << field_name;
STATUS_CHECK(DeleteSnapshotIndex(collection_name, field_name));
std::set<std::string> merge_collection_names = {collection_name};
StartMergeTask(merge_collection_names, true);
return Status::OK();
}
Status
DBImpl::DropIndex(const std::string& collection_name) {
DBImpl::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
CHECK_INITIALIZED;
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name;
std::vector<std::string> field_names;
{
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
field_names = ss->GetFieldNames();
}
LOG_ENGINE_DEBUG_ << "Describe index for collection: " << collection_name << " field: " << field_name;
snapshot::OperationContext context;
for (auto& field_name : field_names) {
STATUS_CHECK(DeleteSnapshotIndex(collection_name, field_name));
}
STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, index));
std::set<std::string> merge_collection_names = {collection_name};
StartMergeTask(merge_collection_names, true);
return Status::OK();
}
......
......@@ -81,10 +81,10 @@ class DBImpl : public DB, public ConfigObserver {
const std::string& field_name, const CollectionIndex& index) override;
Status
DropIndex(const std::string& collection_name, const std::string& field_name) override;
DropIndex(const std::string& collection_name, const std::string& field_name = "") override;
Status
DropIndex(const std::string& collection_name) override;
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override;
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override;
......
......@@ -120,21 +120,31 @@ GetSnapshotIndex(const std::string& collection_name, const std::string& field_na
Status
DeleteSnapshotIndex(const std::string& collection_name, const std::string& field_name) {
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
// drop for all fields or drop for one field?
std::vector<std::string> field_names;
if (field_name.empty()) {
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
field_names = ss->GetFieldNames();
} else {
field_names.push_back(field_name);
}
snapshot::OperationContext context;
std::vector<snapshot::FieldElementPtr> elements = ss->GetFieldElementsByField(field_name);
for (auto& element : elements) {
if (element->GetFtype() == engine::FieldElementType::FET_INDEX ||
element->GetFtype() == engine::FieldElementType::FET_COMPRESS_SQ8) {
context.stale_field_elements.push_back(element);
for (auto& name : field_names) {
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
std::vector<snapshot::FieldElementPtr> elements = ss->GetFieldElementsByField(name);
for (auto& element : elements) {
if (element->GetFtype() == engine::FieldElementType::FET_INDEX ||
element->GetFtype() == engine::FieldElementType::FET_COMPRESS_SQ8) {
snapshot::OperationContext context;
context.stale_field_element = element;
auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
STATUS_CHECK(op->Push());
}
}
}
auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
STATUS_CHECK(op->Push());
return Status::OK();
}
......
......@@ -25,8 +25,9 @@ const char* DEFAULT_INDEX_COMPRESS_NAME = "_compress";
const char* DEFAULT_STRUCTURED_INDEX_NAME = "SORTED"; // this string should be defined in knowhere::IndexEnum
const char* PARAM_DIMENSION = knowhere::meta::DIM;
const char* PARAM_INDEX_TYPE = "index_type";
const char* PARAM_INDEX_METRIC_TYPE = knowhere::Metric::TYPE;
const char* PARAM_INDEX_EXTRA_PARAMS = "extra_params";
const char* PARAM_INDEX_EXTRA_PARAMS = "params";
const char* PARAM_SEGMENT_ROW_COUNT = "segment_row_count";
} // namespace engine
......
......@@ -124,6 +124,7 @@ extern const char* DEFAULT_INDEX_COMPRESS_NAME;
extern const char* DEFAULT_STRUCTURED_INDEX_NAME;
extern const char* PARAM_DIMENSION;
extern const char* PARAM_INDEX_TYPE;
extern const char* PARAM_INDEX_METRIC_TYPE;
extern const char* PARAM_INDEX_EXTRA_PARAMS;
extern const char* PARAM_SEGMENT_ROW_COUNT;
......
......@@ -22,6 +22,7 @@
#include "server/delivery/request/CreateIndexReq.h"
#include "server/delivery/request/CreatePartitionReq.h"
#include "server/delivery/request/DeleteEntityByIDReq.h"
#include "server/delivery/request/DescribeIndexReq.h"
#include "server/delivery/request/DropCollectionReq.h"
#include "server/delivery/request/DropIndexReq.h"
#include "server/delivery/request/DropPartitionReq.h"
......@@ -134,6 +135,14 @@ ReqHandler::CreateIndex(const std::shared_ptr<Context>& context, const std::stri
return req_ptr->status();
}
Status
ReqHandler::DescribeIndex(const std::shared_ptr<Context>& context, const std::string& collection_name,
const std::string& field_name, std::string& index_name, milvus::json& json_params) {
BaseReqPtr req_ptr = DescribeIndexReq::Create(context, collection_name, field_name, index_name, json_params);
ReqScheduler::ExecReq(req_ptr);
return req_ptr->status();
}
Status
ReqHandler::DropIndex(const std::shared_ptr<Context>& context, const std::string& collection_name,
const std::string& field_name, const std::string& index_name) {
......
......@@ -73,6 +73,10 @@ class ReqHandler {
CreateIndex(const std::shared_ptr<Context>& context, const std::string& collection_name,
const std::string& field_name, const std::string& index_name, const milvus::json& json_params);
Status
DescribeIndex(const std::shared_ptr<Context>& context, const std::string& collection_name,
const std::string& field_name, std::string& index_name, milvus::json& json_params);
Status
DropIndex(const std::shared_ptr<Context>& context, const std::string& collection_name,
const std::string& field_name, const std::string& index_name);
......
......@@ -87,8 +87,8 @@ CreateIndexReq::OnExecute() {
// validate index type
std::string index_type = 0;
if (json_params_.contains("index_type")) {
index_type = json_params_["index_type"].get<std::string>();
if (json_params_.contains(engine::PARAM_INDEX_TYPE)) {
index_type = json_params_[engine::PARAM_INDEX_TYPE].get<std::string>();
}
status = ValidateIndexType(index_type);
if (!status.ok()) {
......@@ -97,8 +97,8 @@ CreateIndexReq::OnExecute() {
// validate metric type
std::string metric_type = 0;
if (json_params_.contains("metric_type")) {
metric_type = json_params_["metric_type"].get<std::string>();
if (json_params_.contains(engine::PARAM_INDEX_METRIC_TYPE)) {
metric_type = json_params_[engine::PARAM_INDEX_METRIC_TYPE].get<std::string>();
}
status = ValidateIndexMetricType(metric_type);
if (!status.ok()) {
......@@ -113,9 +113,11 @@ CreateIndexReq::OnExecute() {
rc.RecordSection("check validation");
index.index_name_ = index_name_;
index.index_name_ = index_type;
index.metric_name_ = metric_type;
index.extra_params_ = json_params_;
if (json_params_.contains(engine::PARAM_INDEX_EXTRA_PARAMS)) {
index.extra_params_ = json_params_[engine::PARAM_INDEX_EXTRA_PARAMS];
}
} else {
index.index_name_ = index_name_;
}
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "server/delivery/request/DescribeIndexReq.h"
#include "db/SnapshotUtils.h"
#include "db/Utils.h"
#include "server/DBWrapper.h"
#include "server/ValidationUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <fiu-local.h>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
namespace milvus {
namespace server {
DescribeIndexReq::DescribeIndexReq(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, const std::string& field_name,
std::string& index_name, milvus::json& json_params)
: BaseReq(context, ReqType::kDescribeIndex),
collection_name_(collection_name),
field_name_(field_name),
index_name_(index_name),
json_params_(json_params) {
}
BaseReqPtr
DescribeIndexReq::Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
const std::string& field_name, std::string& index_name, milvus::json& json_params) {
return std::shared_ptr<BaseReq>(
new DescribeIndexReq(context, collection_name, field_name, index_name, json_params));
}
Status
DescribeIndexReq::OnExecute() {
try {
std::string hdr = "DescribeIndexReq(collection=" + collection_name_ + ")";
TimeRecorderAuto rc(hdr);
// step 1: check arguments
auto status = ValidateCollectionName(collection_name_);
if (!status.ok()) {
return status;
}
status = ValidateFieldName(field_name_);
if (!status.ok()) {
return status;
}
// only process root collection, ignore partition collection
engine::CollectionIndex index;
status = DBWrapper::DB()->DescribeIndex(collection_name_, field_name_, index);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, "Collection not exist: " + collection_name_);
} else {
return status;
}
}
json_params_[engine::PARAM_INDEX_TYPE] = index.index_name_;
json_params_[engine::PARAM_INDEX_METRIC_TYPE] = index.metric_name_;
json_params_[engine::PARAM_INDEX_EXTRA_PARAMS] = index.extra_params_;
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace server
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "server/delivery/request/BaseReq.h"
namespace milvus {
namespace server {
class DescribeIndexReq : public BaseReq {
public:
static BaseReqPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
const std::string& field_name, std::string& index_name, milvus::json& json_params);
protected:
DescribeIndexReq(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
const std::string& field_name, std::string& index_name, milvus::json& json_params);
Status
OnExecute() override;
private:
const std::string collection_name_;
const std::string field_name_;
std::string& index_name_;
milvus::json& json_params_;
};
} // namespace server
} // namespace milvus
......@@ -107,6 +107,7 @@ enum class ReqType {
/* index operations */
kCreateIndex = 300,
kDropIndex,
kDescribeIndex,
/* data operations */
kInsert = 400,
......
......@@ -821,6 +821,43 @@ GrpcRequestHandler::CreateIndex(::grpc::ServerContext* context, const ::milvus::
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DescribeIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request,
::milvus::grpc::IndexParam* response) {
CHECK_NULLPTR_RETURN(request)
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
std::string index_name;
milvus::json index_params;
Status status = req_handler_.DescribeIndex(GetContext(context), request->collection_name(), request->field_name(),
index_name, index_params);
response->set_collection_name(request->collection_name());
response->set_field_name(request->field_name());
::milvus::grpc::KeyValuePair* kv = response->add_extra_params();
kv->set_key(EXTRA_PARAM_KEY);
kv->set_value(index_params.dump());
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response->mutable_status(), status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DropIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request,
::milvus::grpc::Status* response) {
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
Status status = req_handler_.DropIndex(GetContext(context), request->collection_name(), request->field_name(),
request->index_name());
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response, status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::GetEntityByID(::grpc::ServerContext* context, const ::milvus::grpc::EntityIdentity* request,
::milvus::grpc::Entities* response) {
......@@ -1137,27 +1174,6 @@ GrpcRequestHandler::PreloadCollection(::grpc::ServerContext* context, const ::mi
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DescribeIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request,
::milvus::grpc::IndexParam* response) {
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DropIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request,
::milvus::grpc::Status* response) {
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
Status status = req_handler_.DropIndex(GetContext(context), request->collection_name(), request->field_name(),
request->index_name());
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response, status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::CreatePartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request,
::milvus::grpc::Status* response) {
......
......@@ -28,13 +28,15 @@
using SegmentVisitor = milvus::engine::SegmentVisitor;
namespace {
const char* VECTOR_FIELD_NAME = "vector";
milvus::Status
CreateCollection(std::shared_ptr<DBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
context.collection = collection_schema;
auto vector_field = std::make_shared<Field>("vector", 0,
auto vector_field = std::make_shared<Field>(VECTOR_FIELD_NAME, 0,
milvus::engine::DataType::VECTOR_FLOAT);
auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
milvus::engine::FieldElementType::FET_INDEX);
......@@ -57,7 +59,7 @@ CreateCollection2(std::shared_ptr<DBImpl> db, const std::string& collection_name
milvus::json params;
params[milvus::knowhere::meta::DIM] = COLLECTION_DIM;
auto vector_field = std::make_shared<Field>("vector", 0, milvus::engine::DataType::VECTOR_FLOAT, params);
auto vector_field = std::make_shared<Field>(VECTOR_FIELD_NAME, 0, milvus::engine::DataType::VECTOR_FLOAT, params);
context.fields_schema[vector_field] = {};
std::unordered_map<std::string, milvus::engine::DataType> attr_type = {
......@@ -93,7 +95,7 @@ BuildEntities(uint64_t n, uint64_t batch_index, milvus::engine::DataChunkPtr& da
vectors.id_array_.push_back(n * batch_index + i);
}
milvus::engine::FIXED_FIELD_DATA& raw = data_chunk->fixed_fields_["vector"];
milvus::engine::FIXED_FIELD_DATA& raw = data_chunk->fixed_fields_[VECTOR_FIELD_NAME];
raw.resize(vectors.float_data_.size() * sizeof(float));
memcpy(raw.data(), vectors.float_data_.data(), vectors.float_data_.size() * sizeof(float));
......@@ -500,8 +502,15 @@ TEST_F(DBTest, IndexTest) {
index.index_name_ = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
index.metric_name_ = milvus::knowhere::Metric::L2;
index.extra_params_["nlist"] = 2048;
status = db_->CreateIndex(dummy_context_, collection_name, "vector", index);
status = db_->CreateIndex(dummy_context_, collection_name, VECTOR_FIELD_NAME, index);
ASSERT_TRUE(status.ok());
milvus::engine::CollectionIndex index_get;
status = db_->DescribeIndex(collection_name, VECTOR_FIELD_NAME, index_get);
ASSERT_TRUE(status.ok());
ASSERT_EQ(index.index_name_, index_get.index_name_);
ASSERT_EQ(index.metric_name_, index_get.metric_name_);
ASSERT_EQ(index.extra_params_, index_get.extra_params_);
}
{
......@@ -513,6 +522,29 @@ TEST_F(DBTest, IndexTest) {
ASSERT_TRUE(status.ok());
status = db_->CreateIndex(dummy_context_, collection_name, "field_2", index);
ASSERT_TRUE(status.ok());
milvus::engine::CollectionIndex index_get;
status = db_->DescribeIndex(collection_name, "field_0", index_get);
ASSERT_TRUE(status.ok());
ASSERT_EQ(index.index_name_, index_get.index_name_);
}
{
status = db_->DropIndex(collection_name, VECTOR_FIELD_NAME);
ASSERT_TRUE(status.ok());
milvus::engine::CollectionIndex index_get;
status = db_->DescribeIndex(collection_name, VECTOR_FIELD_NAME, index_get);
ASSERT_TRUE(index_get.index_name_.empty());
}
{
status = db_->DropIndex(collection_name, "field_0");
ASSERT_TRUE(status.ok());
milvus::engine::CollectionIndex index_get;
status = db_->DescribeIndex(collection_name, "field_0", index_get);
ASSERT_TRUE(index_get.index_name_.empty());
}
}
......@@ -543,7 +575,7 @@ TEST_F(DBTest, StatsTest) {
index.index_name_ = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
index.metric_name_ = milvus::knowhere::Metric::L2;
index.extra_params_["nlist"] = 2048;
status = db_->CreateIndex(dummy_context_, collection_name, "vector", index);
status = db_->CreateIndex(dummy_context_, collection_name, VECTOR_FIELD_NAME, index);
ASSERT_TRUE(status.ok());
}
......@@ -563,6 +595,6 @@ TEST_F(DBTest, StatsTest) {
int64_t row_count = json_stats[milvus::engine::JSON_ROW_COUNT];
ASSERT_EQ(row_count, entity_count * 2);
std::string ss = json_stats.dump();
std::cout << ss << std::endl;
// std::string ss = json_stats.dump();
// std::cout << ss << std::endl;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册