From a7f211f8ad6c8235f309df92fb4cca957498c16f Mon Sep 17 00:00:00 2001 From: chen qingxiang <67679556+godchen0212@users.noreply.github.com> Date: Wed, 29 Jul 2020 12:06:11 +0800 Subject: [PATCH] add valid row implementation (#3053) * fix bug in test case Signed-off-by: godchen0212 * fix bugs in test case Signed-off-by: godchen0212 * fix bug in DB GetEntityById interface Signed-off-by: godchen0212 * add valid row implementation Signed-off-by: godchen0212 * cancel annotation of index thread Signed-off-by: godchen0212 --- core/src/db/DB.h | 3 ++- core/src/db/DBImpl.cpp | 6 ++++-- core/src/db/DBImpl.h | 3 ++- core/src/db/SnapshotHandlers.cpp | 9 +++++++-- core/src/db/SnapshotHandlers.h | 3 ++- core/src/server/delivery/ReqHandler.cpp | 5 +++-- core/src/server/delivery/ReqHandler.h | 2 +- .../delivery/request/GetEntityByIDReq.cpp | 14 ++++++++------ .../delivery/request/GetEntityByIDReq.h | 6 ++++-- .../server/grpc_impl/GrpcRequestHandler.cpp | 19 +++++++++++++++---- .../web_impl/handler/WebRequestHandler.cpp | 6 ++++-- 11 files changed, 52 insertions(+), 24 deletions(-) diff --git a/core/src/db/DB.h b/core/src/db/DB.h index 4c54b0a5..4b3de5b3 100644 --- a/core/src/db/DB.h +++ b/core/src/db/DB.h @@ -95,7 +95,8 @@ class DB { virtual Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, - const std::vector& field_names, DataChunkPtr& data_chunk) = 0; + const std::vector& field_names, std::vector& valid_row, + DataChunkPtr& data_chunk) = 0; virtual Status DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) = 0; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 3c1c051e..248f6f19 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -524,14 +524,16 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_ Status DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, - const std::vector& field_names, DataChunkPtr& data_chunk) { + const std::vector& field_names, std::vector& valid_row, + DataChunkPtr& data_chunk) { CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); std::string dir_root = options_.meta_.path_; - auto handler = std::make_shared(nullptr, ss, dir_root, id_array, field_names); + auto handler = + std::make_shared(nullptr, ss, dir_root, id_array, field_names, valid_row); handler->Iterate(); STATUS_CHECK(handler->GetStatus()); diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 8d8b376b..395a91d7 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -90,7 +90,8 @@ class DBImpl : public DB { Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, - const std::vector& field_names, DataChunkPtr& data_chunk) override; + const std::vector& field_names, std::vector& valid_row, + DataChunkPtr& data_chunk) override; Status DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) override; diff --git a/core/src/db/SnapshotHandlers.cpp b/core/src/db/SnapshotHandlers.cpp index 821dd2a7..5c933908 100644 --- a/core/src/db/SnapshotHandlers.cpp +++ b/core/src/db/SnapshotHandlers.cpp @@ -123,8 +123,9 @@ SegmentsToIndexCollector::Handle(const snapshot::SegmentCommitPtr& segment_commi GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr& context, engine::snapshot::ScopedSnapshotT ss, const std::string& dir_root, const IDNumbers& ids, - const std::vector& field_names) - : BaseT(ss), context_(context), dir_root_(dir_root), ids_(ids), field_names_(field_names) { + const std::vector& field_names, + std::vector& valid_row) + : BaseT(ss), context_(context), dir_root_(dir_root), ids_(ids), field_names_(field_names), valid_row_(valid_row) { } Status @@ -149,6 +150,7 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { for (auto id : ids_) { // fast check using bloom filter if (!id_bloom_filter_ptr->Check(id)) { + valid_row_.push_back(false); continue; } @@ -158,6 +160,7 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { } auto found = std::find(uids.begin(), uids.end(), id); if (found == uids.end()) { + valid_row_.push_back(false); continue; } @@ -170,9 +173,11 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); if (deleted != deleted_docs.end()) { + valid_row_.push_back(false); continue; } } + valid_row_.push_back(true); offsets.push_back(offset); } diff --git a/core/src/db/SnapshotHandlers.h b/core/src/db/SnapshotHandlers.h index 80ae28fe..fc81534c 100644 --- a/core/src/db/SnapshotHandlers.h +++ b/core/src/db/SnapshotHandlers.h @@ -79,7 +79,7 @@ struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler; GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const std::string& dir_root, const IDNumbers& ids, - const std::vector& field_names); + const std::vector& field_names, std::vector& valid_row); Status Handle(const typename ResourceT::Ptr&) override; @@ -89,6 +89,7 @@ struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler field_names_; engine::DataChunkPtr data_chunk_; + std::vector& valid_row_; }; /////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/server/delivery/ReqHandler.cpp b/core/src/server/delivery/ReqHandler.cpp index 13c85da3..41b0e58e 100644 --- a/core/src/server/delivery/ReqHandler.cpp +++ b/core/src/server/delivery/ReqHandler.cpp @@ -157,9 +157,10 @@ ReqHandler::Insert(const std::shared_ptr& context, const std::string& c Status ReqHandler::GetEntityByID(const std::shared_ptr& context, const std::string& collection_name, const engine::IDNumbers& ids, std::vector& field_names, - engine::snapshot::CollectionMappings& field_mappings, engine::DataChunkPtr& data_chunk) { + std::vector& valid_row, engine::snapshot::CollectionMappings& field_mappings, + engine::DataChunkPtr& data_chunk) { BaseReqPtr req_ptr = - GetEntityByIDReq::Create(context, collection_name, ids, field_names, field_mappings, data_chunk); + GetEntityByIDReq::Create(context, collection_name, ids, field_names, valid_row, field_mappings, data_chunk); ReqScheduler::ExecReq(req_ptr); return req_ptr->status(); } diff --git a/core/src/server/delivery/ReqHandler.h b/core/src/server/delivery/ReqHandler.h index d50eb568..c9b3c628 100644 --- a/core/src/server/delivery/ReqHandler.h +++ b/core/src/server/delivery/ReqHandler.h @@ -86,7 +86,7 @@ class ReqHandler { Status GetEntityByID(const std::shared_ptr& context, const std::string& collection_name, - const engine::IDNumbers& ids, std::vector& field_names, + const engine::IDNumbers& ids, std::vector& field_names, std::vector& valid_row, engine::snapshot::CollectionMappings& field_mappings, engine::DataChunkPtr& data_chunk); Status diff --git a/core/src/server/delivery/request/GetEntityByIDReq.cpp b/core/src/server/delivery/request/GetEntityByIDReq.cpp index 605b2eef..cf15fc17 100644 --- a/core/src/server/delivery/request/GetEntityByIDReq.cpp +++ b/core/src/server/delivery/request/GetEntityByIDReq.cpp @@ -31,13 +31,14 @@ constexpr uint64_t MAX_COUNT_RETURNED = 1000; GetEntityByIDReq::GetEntityByIDReq(const std::shared_ptr& context, const std::string& collection_name, const engine::IDNumbers& id_array, - std::vector& field_names, + std::vector& field_names, std::vector& valid_row, engine::snapshot::CollectionMappings& field_mappings, engine::DataChunkPtr& data_chunk) : BaseReq(context, BaseReq::kGetEntityByID), collection_name_(collection_name), id_array_(id_array), field_names_(field_names), + valid_row_(valid_row), field_mappings_(field_mappings), data_chunk_(data_chunk) { } @@ -45,9 +46,10 @@ GetEntityByIDReq::GetEntityByIDReq(const std::shared_ptr& context, const std::string& collection_name, const engine::IDNumbers& id_array, std::vector& field_names_, - engine::snapshot::CollectionMappings& field_mappings, engine::DataChunkPtr& data_chunk) { + std::vector& valid_row, engine::snapshot::CollectionMappings& field_mappings, + engine::DataChunkPtr& data_chunk) { return std::shared_ptr( - new GetEntityByIDReq(context, collection_name, id_array, field_names_, field_mappings, data_chunk)); + new GetEntityByIDReq(context, collection_name, id_array, field_names_, valid_row, field_mappings, data_chunk)); } Status @@ -82,8 +84,8 @@ GetEntityByIDReq::OnExecute() { if (field_names_.empty()) { for (const auto& schema : field_mappings_) { - if (schema.first->GetFtype() != engine::meta::hybrid::DataType::UID) - field_names_.emplace_back(schema.first->GetName()); + // if (schema.first->GetFtype() != engine::meta::hybrid::DataType::UID) + field_names_.emplace_back(schema.first->GetName()); } } else { for (const auto& name : field_names_) { @@ -101,7 +103,7 @@ GetEntityByIDReq::OnExecute() { } // step 2: get vector data, now only support get one id - status = DBWrapper::DB()->GetEntityByID(collection_name_, id_array_, field_names_, data_chunk_); + status = DBWrapper::DB()->GetEntityByID(collection_name_, id_array_, field_names_, valid_row_, data_chunk_); if (!status.ok()) { return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); } diff --git a/core/src/server/delivery/request/GetEntityByIDReq.h b/core/src/server/delivery/request/GetEntityByIDReq.h index 82b724cb..6f010508 100644 --- a/core/src/server/delivery/request/GetEntityByIDReq.h +++ b/core/src/server/delivery/request/GetEntityByIDReq.h @@ -33,13 +33,14 @@ class GetEntityByIDReq : public BaseReq { public: static BaseReqPtr Create(const std::shared_ptr& context, const std::string& collection_name, - const engine::IDNumbers& id_array, std::vector& field_names_, + const engine::IDNumbers& id_array, std::vector& field_names_, std::vector& valid_row, engine::snapshot::CollectionMappings& field_mappings, engine::DataChunkPtr& data_chunk); protected: GetEntityByIDReq(const std::shared_ptr& context, const std::string& collection_name, const engine::IDNumbers& id_array, std::vector& field_names, - engine::snapshot::CollectionMappings& field_mappings, engine::DataChunkPtr& data_chunk); + std::vector& valid_row, engine::snapshot::CollectionMappings& field_mappings, + engine::DataChunkPtr& data_chunk); Status OnExecute() override; @@ -50,6 +51,7 @@ class GetEntityByIDReq : public BaseReq { std::vector& field_names_; engine::snapshot::CollectionMappings& field_mappings_; engine::DataChunkPtr& data_chunk_; + std::vector& valid_row_; }; } // namespace server diff --git a/core/src/server/grpc_impl/GrpcRequestHandler.cpp b/core/src/server/grpc_impl/GrpcRequestHandler.cpp index 6a72f872..dfbec12a 100644 --- a/core/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/core/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -747,9 +747,14 @@ GrpcRequestHandler::GetEntityByID(::grpc::ServerContext* context, const ::milvus std::vector attrs; std::vector vectors; + std::vector valid_row; Status status = req_handler_.GetEntityByID(GetContext(context), request->collection_name(), vector_ids, field_names, - field_mappings, data_chunk); + valid_row, field_mappings, data_chunk); + + for (auto it : valid_row) { + response->add_valid_row(it); + } auto id_size = vector_ids.size(); for (const auto& it : field_mappings) { @@ -757,9 +762,16 @@ GrpcRequestHandler::GetEntityByID(::grpc::ServerContext* context, const ::milvus std::string name = it.first->GetName(); std::vector data = data_chunk->fixed_fields_[name]; + auto single_size = data.size() / id_size; + if (type == engine::meta::hybrid::DataType::UID) { - response->mutable_ids()->Resize(data.size(), 0); - memcpy(response->mutable_ids()->mutable_data(), data.data(), data.size() * sizeof(uint64_t)); + int64_t int64_value; + auto int64_size = single_size * sizeof(int8_t) / sizeof(int64_t); + for (int i = 0; i < id_size; i++) { + auto offset = i * single_size; + memcpy(&int64_value, data.data() + offset, single_size); + response->add_ids(int64_value); + } continue; } @@ -768,7 +780,6 @@ GrpcRequestHandler::GetEntityByID(::grpc::ServerContext* context, const ::milvus field_value->set_field_name(name); field_value->set_type(static_cast(type)); - auto single_size = data.size() / id_size; // general data if (type == engine::meta::hybrid::DataType::VECTOR_BINARY) { // add binary vector data diff --git a/core/src/server/web_impl/handler/WebRequestHandler.cpp b/core/src/server/web_impl/handler/WebRequestHandler.cpp index 265fde53..f7ccda34 100644 --- a/core/src/server/web_impl/handler/WebRequestHandler.cpp +++ b/core/src/server/web_impl/handler/WebRequestHandler.cpp @@ -804,13 +804,14 @@ WebRequestHandler::DeleteByIDs(const std::string& collection_name, const nlohman Status WebRequestHandler::GetEntityByIDs(const std::string& collection_name, const std::vector& ids, std::vector& field_names, nlohmann::json& json_out) { + std::vector valid_row; engine::DataChunkPtr data_chunk; engine::snapshot::CollectionMappings field_mappings; std::vector attr_batch; std::vector vector_batch; - auto status = - req_handler_.GetEntityByID(context_ptr_, collection_name, ids, field_names, field_mappings, data_chunk); + auto status = req_handler_.GetEntityByID(context_ptr_, collection_name, ids, field_names, valid_row, field_mappings, + data_chunk); if (!status.ok()) { return status; } @@ -1679,6 +1680,7 @@ WebRequestHandler::GetEntity(const milvus::server::web::OString& collection_name StringHelpFunctions::SplitStringByDelimeter(query_fields->c_str(), ",", field_names); } + std::vector valid_row; nlohmann::json entity_result_json; status = GetEntityByIDs(collection_name->std_str(), entity_ids, field_names, entity_result_json); if (!status.ok()) { -- GitLab