未验证 提交 26502bcb 编写于 作者: C Cai Yudong 提交者: GitHub

grpc partition related APIs use snapshot (#2982)

* opt interface
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update partition grpc APIs
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 67809c6e
......@@ -71,7 +71,6 @@ endif ()
set_milvus_definition(MILVUS_WITH_PROMETHEUS "MILVUS_WITH_PROMETHEUS")
set_milvus_definition(ENABLE_CPU_PROFILING "ENABLE_CPU_PROFILING")
set_milvus_definition(MILVUS_WITH_FIU "FIU_ENABLE")
set_milvus_definition(MILVUS_USE_SNAPSHOT "MILVUS_USE_SNAPSHOT")
config_summary()
......
......@@ -10,12 +10,8 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/DBFactory.h"
#ifdef MILVUS_USE_SNAPSHOT
#include "SSDBImpl.h"
#else
#include "DBImpl.h"
#include "SSDBImpl.h"
#endif
#include "meta/MetaFactory.h"
#include "meta/MySQLMetaImpl.h"
#include "meta/SqliteMetaImpl.h"
......
......@@ -22,6 +22,7 @@
#include <functional>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <queue>
#include <set>
......
......@@ -13,7 +13,6 @@
#include <atomic>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
......
......@@ -11,7 +11,6 @@
#pragma once
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
......@@ -54,8 +53,9 @@ class SSDB {
DropCollection(const std::string& name) = 0;
virtual Status
DescribeCollection(const std::string& collection_name, snapshot::CollectionPtr& collection,
std::map<snapshot::FieldPtr, std::vector<snapshot::FieldElementPtr>>& fields_schema) = 0;
DescribeCollection(
const std::string& collection_name, snapshot::CollectionPtr& collection,
std::unordered_map<snapshot::FieldPtr, std::vector<snapshot::FieldElementPtr>>& fields_schema) = 0;
virtual Status
HasCollection(const std::string& collection_name, bool& has_or_not) = 0;
......
......@@ -234,8 +234,9 @@ SSDBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
}
Status
SSDBImpl::DescribeCollection(const std::string& collection_name, snapshot::CollectionPtr& collection,
std::map<snapshot::FieldPtr, std::vector<snapshot::FieldElementPtr>>& fields_schema) {
SSDBImpl::DescribeCollection(
const std::string& collection_name, snapshot::CollectionPtr& collection,
std::unordered_map<snapshot::FieldPtr, std::vector<snapshot::FieldElementPtr>>& fields_schema) {
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
......
......@@ -13,7 +13,6 @@
#include <atomic>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
......@@ -50,7 +49,7 @@ class SSDBImpl : public SSDB {
Status
DescribeCollection(const std::string& collection_name, snapshot::CollectionPtr& collection,
std::map<snapshot::FieldPtr, std::vector<snapshot::FieldElementPtr>>& fields_schema) override;
std::unordered_map<snapshot::FieldPtr, std::vector<snapshot::FieldElementPtr>>& fields_schema);
Status
HasCollection(const std::string& collection_name, bool& has_or_not) override;
......
......@@ -246,7 +246,7 @@ RequestHandler::HasPartition(const std::shared_ptr<Context>& context, const std:
Status
RequestHandler::ShowPartitions(const std::shared_ptr<Context>& context, const std::string& collection_name,
std::vector<PartitionParam>& partitions) {
std::vector<std::string>& partitions) {
BaseRequestPtr request_ptr = ShowPartitionsRequest::Create(context, collection_name, partitions);
RequestScheduler::ExecRequest(request_ptr);
......
......@@ -110,7 +110,7 @@ class RequestHandler {
Status
ShowPartitions(const std::shared_ptr<Context>& context, const std::string& collection_name,
std::vector<PartitionParam>& partitions);
std::vector<std::string>& partitions);
Status
DropPartition(const std::shared_ptr<Context>& context, const std::string& collection_name, const std::string& tag);
......
......@@ -102,18 +102,6 @@ struct IndexParam {
}
};
struct PartitionParam {
std::string collection_name_;
std::string tag_;
PartitionParam() = default;
PartitionParam(const std::string& collection_name, const std::string& tag) {
collection_name_ = collection_name;
tag_ = tag;
}
};
class Context;
class BaseRequest {
......
......@@ -43,69 +43,44 @@ CreatePartitionRequest::OnExecute() {
try {
// step 1: check arguments
auto status = ValidateCollectionName(collection_name_);
fiu_do_on("CreatePartitionRequest.OnExecute.invalid_collection_name",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
if (tag_ == milvus::engine::DEFAULT_PARTITON_TAG) {
return Status(SERVER_INVALID_PARTITION_TAG, "'_default' is built-in partition tag");
}
status = ValidatePartitionTags({tag_});
fiu_do_on("CreatePartitionRequest.OnExecute.invalid_partition_name",
auto status = ValidatePartitionTags({tag_});
fiu_do_on("CreatePartitionRequest.OnExecute.invalid_partition_tags",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
fiu_do_on("CreatePartitionRequest.OnExecute.invalid_partition_tags",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
bool exist = false;
status = DBWrapper::DB()->HasCollection(collection_name_, exist);
if (!exist) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
}
// check partition total count
std::vector<engine::meta::CollectionSchema> schema_array;
status = DBWrapper::DB()->ShowPartitions(collection_name_, schema_array);
if (schema_array.size() >= MAX_PARTITION_LIMIT) {
return Status(SERVER_UNSUPPORTED_ERROR, "The number of partitions exceeds the upper limit(4096)");
std::vector<std::string> partition_names;
status = DBWrapper::SSDB()->ShowPartitions(collection_name_, partition_names);
if (partition_names.size() >= MAX_PARTITION_LIMIT) {
std::stringstream err_ss;
err_ss << "The number of partitions exceeds the upper limit (" << MAX_PARTITION_LIMIT << ")";
return Status(SERVER_UNSUPPORTED_ERROR, err_ss.str());
}
rc.RecordSection("check validation");
// step 2: create partition
status = DBWrapper::DB()->CreatePartition(collection_name_, "", tag_);
fiu_do_on("CreatePartitionRequest.OnExecute.db_already_exist", status = Status(milvus::DB_ALREADY_EXIST, ""));
status = DBWrapper::SSDB()->CreatePartition(collection_name_, tag_);
fiu_do_on("CreatePartitionRequest.OnExecute.create_partition_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
fiu_do_on("CreatePartitionRequest.OnExecute.throw_std_exception", throw std::exception());
if (!status.ok()) {
// partition could exist
if (status.code() == DB_ALREADY_EXIST) {
return Status(SERVER_INVALID_COLLECTION_NAME, status.message());
}
return status;
}
return status;
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace server
......
......@@ -21,7 +21,7 @@ namespace server {
class CreatePartitionRequest : public BaseRequest {
public:
static BaseRequestPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& partition_name,
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
const std::string& tag);
protected:
......
......@@ -38,50 +38,24 @@ DropPartitionRequest::OnExecute() {
std::string hdr = "DropPartitionRequest(collection=" + collection_name_ + ", partition_tag=" + tag_ + ")";
TimeRecorderAuto rc(hdr);
std::string collection_name = collection_name_;
std::string partition_tag = tag_;
// step 1: check collection name
auto status = ValidateCollectionName(collection_name);
fiu_do_on("DropPartitionRequest.OnExecute.invalid_collection_name",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
// step 2: check partition tag
if (partition_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
/* check partition tag */
if (tag_ == milvus::engine::DEFAULT_PARTITON_TAG) {
std::string msg = "Default partition cannot be dropped.";
LOG_SERVER_ERROR_ << msg;
return Status(SERVER_INVALID_COLLECTION_NAME, msg);
}
status = ValidatePartitionTags({partition_tag});
if (!status.ok()) {
return status;
}
// step 3: check collection
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
/* check collection */
bool exist = false;
auto status = DBWrapper::DB()->HasCollection(collection_name_, exist);
if (!exist) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
}
rc.RecordSection("check validation");
// step 4: drop partition
return DBWrapper::DB()->DropPartitionByTag(collection_name, partition_tag);
/* drop partition */
return DBWrapper::SSDB()->DropPartition(collection_name_, tag_);
}
} // namespace server
......
......@@ -24,7 +24,7 @@ namespace server {
ShowPartitionsRequest::ShowPartitionsRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name,
std::vector<PartitionParam>& partition_list)
std::vector<std::string>& partition_list)
: BaseRequest(context, BaseRequest::kShowPartitions),
collection_name_(collection_name),
partition_list_(partition_list) {
......@@ -32,7 +32,7 @@ ShowPartitionsRequest::ShowPartitionsRequest(const std::shared_ptr<milvus::serve
BaseRequestPtr
ShowPartitionsRequest::Create(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, std::vector<PartitionParam>& partition_list) {
const std::string& collection_name, std::vector<std::string>& partition_list) {
return std::shared_ptr<BaseRequest>(new ShowPartitionsRequest(context, collection_name, partition_list));
}
......@@ -41,47 +41,18 @@ ShowPartitionsRequest::OnExecute() {
std::string hdr = "ShowPartitionsRequest(collection=" + collection_name_ + ")";
TimeRecorderAuto rc(hdr);
// step 1: check collection name
auto status = ValidateCollectionName(collection_name_);
fiu_do_on("ShowPartitionsRequest.OnExecute.invalid_collection_name",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
// step 2: check collection existence
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
/* check collection existence */
bool exist = false;
auto status = DBWrapper::DB()->HasCollection(collection_name_, exist);
if (!exist) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
}
// step 3: get partitions
std::vector<engine::meta::CollectionSchema> schema_array;
status = DBWrapper::DB()->ShowPartitions(collection_name_, schema_array);
/* get partitions */
status = DBWrapper::SSDB()->ShowPartitions(collection_name_, partition_list_);
fiu_do_on("ShowPartitionsRequest.OnExecute.show_partition_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
partition_list_.clear();
partition_list_.emplace_back(collection_name_, milvus::engine::DEFAULT_PARTITON_TAG);
for (auto& schema : schema_array) {
partition_list_.emplace_back(schema.owner_collection_, schema.partition_tag_);
}
return Status::OK();
return status;
}
} // namespace server
......
......@@ -24,18 +24,18 @@ class ShowPartitionsRequest : public BaseRequest {
public:
static BaseRequestPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
std::vector<PartitionParam>& partition_list);
std::vector<std::string>& partition_list);
protected:
ShowPartitionsRequest(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
std::vector<PartitionParam>& partition_list);
std::vector<std::string>& partition_list);
Status
OnExecute() override;
private:
const std::string collection_name_;
std::vector<PartitionParam>& partition_list_;
std::vector<std::string>& partition_list_;
};
} // namespace server
......
......@@ -1137,10 +1137,10 @@ GrpcRequestHandler::ShowPartitions(::grpc::ServerContext* context, const ::milvu
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__);
std::vector<PartitionParam> partitions;
Status status = request_handler_.ShowPartitions(GetContext(context), request->collection_name(), partitions);
for (auto& partition : partitions) {
response->add_partition_tag_array(partition.tag_);
std::vector<std::string> partition_names;
Status status = request_handler_.ShowPartitions(GetContext(context), request->collection_name(), partition_names);
for (auto& pn : partition_names) {
response->add_partition_tag_array(pn);
}
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__);
......
......@@ -1507,27 +1507,27 @@ WebRequestHandler::ShowPartitions(const OString& collection_name, const OQueryPa
all_required = required_str == "True" || required_str == "true";
}
std::vector<PartitionParam> partitions;
status = request_handler_.ShowPartitions(context_ptr_, collection_name->std_str(), partitions);
std::vector<std::string> partition_names;
status = request_handler_.ShowPartitions(context_ptr_, collection_name->std_str(), partition_names);
if (!status.ok()) {
ASSIGN_RETURN_STATUS_DTO(status)
}
if (all_required) {
offset = 0;
page_size = partitions.size();
page_size = partition_names.size();
} else {
offset = std::min((size_t)offset, partitions.size());
page_size = std::min(partitions.size() - offset, (size_t)page_size);
offset = std::min((size_t)offset, partition_names.size());
page_size = std::min(partition_names.size() - offset, (size_t)page_size);
}
partition_list_dto->count = partitions.size();
partition_list_dto->count = partition_names.size();
partition_list_dto->partitions = partition_list_dto->partitions->createShared();
if (offset < (int64_t)(partitions.size())) {
if (offset < (int64_t)(partition_names.size())) {
for (int64_t i = offset; i < page_size + offset; i++) {
auto partition_dto = PartitionFieldsDto::createShared();
partition_dto->partition_tag = partitions.at(i).tag_.c_str();
partition_dto->partition_tag = partition_names.at(i).c_str();
partition_list_dto->partitions->pushBack(partition_dto);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册