From 383da48ba1787a883269fbbe794c9fb436e15eee Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Mon, 24 Aug 2020 14:24:06 +0800 Subject: [PATCH] Fix proxy logic Signed-off-by: zhenshan.cao --- proxy/src/CMakeLists.txt | 7 + proxy/src/db/CMakeLists.txt | 21 + proxy/src/db/Constants.h | 33 + proxy/src/db/Types.cpp | 36 + proxy/src/db/Types.h | 200 ++++ proxy/src/query/GeneralQuery.h | 2 +- proxy/src/server/CMakeLists.txt | 2 + proxy/src/server/Server.cpp | 8 +- proxy/src/server/ValidationUtil.cpp | 10 +- proxy/src/server/ValidationUtil.h | 6 +- proxy/src/server/delivery/ReqHandler.cpp | 8 +- proxy/src/server/delivery/ReqHandler.h | 4 +- .../delivery/request/DescribeIndexReq.cpp | 7 - .../delivery/request/DropCollectionReq.cpp | 6 - .../server/delivery/request/DropIndexReq.cpp | 3 - .../delivery/request/GetCollectionInfoReq.cpp | 5 - .../request/GetCollectionStatsReq.cpp | 6 - .../delivery/request/GetEntityByIDReq.cpp | 27 +- .../delivery/request/GetEntityByIDReq.h | 7 +- .../delivery/request/HasCollectionReq.cpp | 5 - .../delivery/request/HasPartitionReq.cpp | 6 - .../src/server/delivery/request/InsertReq.cpp | 9 +- .../delivery/request/ListIDInSegmentReq.cpp | 5 - .../delivery/request/LoadCollectionReq.cpp | 4 - .../src/server/delivery/request/SearchReq.cpp | 12 +- proxy/src/server/delivery/request/SearchReq.h | 5 +- proxy/src/server/delivery/request/Types.h | 2 +- .../server/grpc_impl/GrpcRequestHandler.cpp | 958 +----------------- proxy/src/version.h | 4 +- writer/writer.go | 54 - 30 files changed, 337 insertions(+), 1125 deletions(-) create mode 100644 proxy/src/db/CMakeLists.txt create mode 100644 proxy/src/db/Constants.h create mode 100644 proxy/src/db/Types.cpp create mode 100644 proxy/src/db/Types.h diff --git a/proxy/src/CMakeLists.txt b/proxy/src/CMakeLists.txt index c86e7994b..85e7a2a6d 100644 --- a/proxy/src/CMakeLists.txt +++ b/proxy/src/CMakeLists.txt @@ -22,10 +22,12 @@ add_subdirectory( tracing ) add_subdirectory( utils ) add_subdirectory( config ) add_subdirectory( query ) +add_subdirectory( db ) # target milvus_engine add_subdirectory( log ) add_subdirectory( server ) set(link_lib + milvus_engine config query utils @@ -44,6 +46,10 @@ set( GRPC_LIB libprotobuf grpc++ ) +set( BOOST_LIB libboost_system.a + libboost_filesystem.a + libboost_serialization.a + ) set( THIRD_PARTY_LIBS yaml-cpp @@ -55,6 +61,7 @@ target_link_libraries( server PUBLIC ${link_lib} tracing ${THIRD_PARTY_LIBS} + ${BOOST_LIB} ) # **************************** Get&Print Include Directories **************************** diff --git a/proxy/src/db/CMakeLists.txt b/proxy/src/db/CMakeLists.txt new file mode 100644 index 000000000..432cad577 --- /dev/null +++ b/proxy/src/db/CMakeLists.txt @@ -0,0 +1,21 @@ +#------------------------------------------------------------------------------- +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License. +#------------------------------------------------------------------------------- + +# **************************** Engine Source Files **************************** +aux_source_directory( ${MILVUS_ENGINE_SRC}/db DB_MAIN_FILES ) + +set( ENGINE_FILES ${DB_MAIN_FILES}) + +# **************************** Add Target milvus engine **************************** +add_library( milvus_engine STATIC) +target_sources( milvus_engine PRIVATE ${ENGINE_FILES}) diff --git a/proxy/src/db/Constants.h b/proxy/src/db/Constants.h new file mode 100644 index 000000000..30d72f359 --- /dev/null +++ b/proxy/src/db/Constants.h @@ -0,0 +1,33 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include + +namespace milvus { +namespace engine { + +constexpr int64_t KB = 1LL << 10; +constexpr int64_t MB = 1LL << 20; +constexpr int64_t GB = 1LL << 30; +constexpr int64_t TB = 1LL << 40; + +constexpr int64_t MAX_TABLE_FILE_MEM = 128 * MB; + +constexpr int64_t MAX_NAME_LENGTH = 255; +constexpr int64_t MAX_DIMENSION = 32768; +constexpr int32_t MAX_SEGMENT_ROW_COUNT = 4 * 1024 * 1024; +constexpr int64_t DEFAULT_SEGMENT_ROW_COUNT = 100000; // default row count per segment when creating collection +constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * MB; + +} // namespace engine +} // namespace milvus diff --git a/proxy/src/db/Types.cpp b/proxy/src/db/Types.cpp new file mode 100644 index 000000000..b4ec1d7e9 --- /dev/null +++ b/proxy/src/db/Types.cpp @@ -0,0 +1,36 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/Types.h" +// #include "knowhere/index/vector_index/helpers/IndexParameter.h" + +namespace milvus { +namespace engine { + +const char* FIELD_UID = "_id"; + +const char* ELEMENT_RAW_DATA = "_raw"; +const char* ELEMENT_BLOOM_FILTER = "_blf"; +const char* ELEMENT_DELETED_DOCS = "_del"; +const char* ELEMENT_INDEX_COMPRESS = "_compress"; + +const char* PARAM_UID_AUTOGEN = "auto_id"; +// const char* PARAM_DIMENSION = knowhere::meta::DIM; +const char* PARAM_INDEX_TYPE = "index_type"; +// const char* PARAM_INDEX_METRIC_TYPE = knowhere::Metric::TYPE; +const char* PARAM_INDEX_EXTRA_PARAMS = "params"; +const char* PARAM_SEGMENT_ROW_COUNT = "segment_row_count"; + +const char* DEFAULT_STRUCTURED_INDEX = "SORTED"; // this string should be defined in knowhere::IndexEnum +const char* DEFAULT_PARTITON_TAG = "_default"; + +} // namespace engine +} // namespace milvus diff --git a/proxy/src/db/Types.h b/proxy/src/db/Types.h new file mode 100644 index 000000000..65691f0d8 --- /dev/null +++ b/proxy/src/db/Types.h @@ -0,0 +1,200 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +// #include + +#include +#include +#include +#include +#include +#include +#include +#include + +// #include "cache/DataObj.h" +#include "db/Constants.h" +// #include "knowhere/index/vector_index/VecIndex.h" +#include "utils/Json.h" + +namespace milvus { +namespace engine { + +extern const char* FIELD_UID; + +extern const char* ELEMENT_RAW_DATA; +extern const char* ELEMENT_BLOOM_FILTER; +extern const char* ELEMENT_DELETED_DOCS; +extern const char* ELEMENT_INDEX_COMPRESS; + +extern const char* PARAM_UID_AUTOGEN; +extern const char* PARAM_DIMENSION; +extern const char* PARAM_INDEX_TYPE; +extern const char* PARAM_INDEX_METRIC_TYPE; +extern const char* PARAM_INDEX_EXTRA_PARAMS; +extern const char* PARAM_SEGMENT_ROW_COUNT; + +extern const char* DEFAULT_STRUCTURED_INDEX; +extern const char* DEFAULT_PARTITON_TAG; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +using id_t = int64_t; +using offset_t = int32_t; +using date_t = int32_t; + +using IDNumbers = std::vector; + +// using VectorDistance = faiss::Index::distance_t; +using VectorDistance = float; +using VectorDistances = std::vector; + +// using ResultIds = std::vector; +using ResultIds = std::vector; +using ResultDistances = std::vector; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +enum class DataType { + NONE = 0, + BOOL = 1, + INT8 = 2, + INT16 = 3, + INT32 = 4, + INT64 = 5, + + FLOAT = 10, + DOUBLE = 11, + + STRING = 20, + + VECTOR_BINARY = 100, + VECTOR_FLOAT = 101, +}; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +enum class FieldElementType { + FET_NONE = 0, + FET_RAW = 1, + FET_BLOOM_FILTER = 2, + FET_DELETED_DOCS = 3, + FET_INDEX = 4, + FET_COMPRESS_SQ8 = 5, +}; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +// class BinaryData : public cache::DataObj { +// public: +// int64_t +// Size() { +// return data_.size(); +// } + +// public: +// std::vector data_; +// }; +// using BinaryDataPtr = std::shared_ptr; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +// class VaribleData : public cache::DataObj { +// public: +// int64_t +// Size() { +// return data_.size() + offset_.size() * sizeof(int64_t); +// } + +// public: +// std::vector data_; +// std::vector offset_; +// }; +// using VaribleDataPtr = std::shared_ptr; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +// using FIELD_TYPE_MAP = std::unordered_map; +// using FIELD_WIDTH_MAP = std::unordered_map; +// using FIXEDX_FIELD_MAP = std::unordered_map; +// using VARIABLE_FIELD_MAP = std::unordered_map; +// using VECTOR_INDEX_MAP = std::unordered_map; +// using STRUCTURED_INDEX_MAP = std::unordered_map; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +struct DataChunk { + int64_t count_ = 0; + // FIXEDX_FIELD_MAP fixed_fields_; + // VARIABLE_FIELD_MAP variable_fields_; +}; +using DataChunkPtr = std::shared_ptr; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +struct CollectionIndex { + std::string index_name_; + std::string index_type_; + std::string metric_name_; + milvus::json extra_params_ = {{"nlist", 2048}}; +}; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +struct VectorsData { + uint64_t vector_count_ = 0; + std::vector float_data_; + std::vector binary_data_; + IDNumbers id_array_; +}; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +struct AttrsData { + uint64_t attr_count_ = 0; + std::unordered_map attr_type_; + std::unordered_map> attr_data_; + IDNumbers id_array_; +}; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +struct QueryResult { + uint64_t row_num_; + engine::ResultIds result_ids_; + engine::ResultDistances result_distances_; + engine::DataChunkPtr data_chunk_; +}; +using QueryResultPtr = std::shared_ptr; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +struct DBMetaOptions { + std::string path_; + std::string backend_uri_; +}; // DBMetaOptions + +/////////////////////////////////////////////////////////////////////////////////////////////////// +struct DBOptions { + typedef enum { SINGLE = 0, CLUSTER_READONLY, CLUSTER_WRITABLE } MODE; + + DBMetaOptions meta_; + int mode_ = MODE::SINGLE; + + size_t insert_buffer_size_ = 4 * GB; + bool insert_cache_immediately_ = false; + + int64_t auto_flush_interval_ = 1; + + bool metric_enable_ = false; + + // wal relative configurations + bool wal_enable_ = false; + int64_t buffer_size_ = 256; + std::string mxlog_path_ = "/tmp/milvus/wal/"; + + // transcript configurations + bool transcript_enable_ = false; + std::string replay_script_path_; // for replay +}; // Options + +} // namespace engine +} // namespace milvus diff --git a/proxy/src/query/GeneralQuery.h b/proxy/src/query/GeneralQuery.h index 49d0b671f..eae08a2e1 100644 --- a/proxy/src/query/GeneralQuery.h +++ b/proxy/src/query/GeneralQuery.h @@ -18,7 +18,7 @@ #include #include -// #include "db/Types.h" +#include "db/Types.h" #include "utils/Json.h" namespace milvus { diff --git a/proxy/src/server/CMakeLists.txt b/proxy/src/server/CMakeLists.txt index 72a223671..de9ef688f 100644 --- a/proxy/src/server/CMakeLists.txt +++ b/proxy/src/server/CMakeLists.txt @@ -20,12 +20,14 @@ set( GRPC_SERVICE_FILES ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.grpc.pb.cc aux_source_directory( ${MILVUS_ENGINE_SRC}/server SERVER_SERVICE_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/init SERVER_INIT_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery/request DELIVERY_REQUEST_FILES ) +aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery/strategy DELIVERY_STRATEGY_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery DELIVERY_FILES ) set( SERVER_FILES ${SERVER_INIT_FILES} ${SERVER_SERVICE_FILES} ${SERVER_INIT_FILES} ${DELIVERY_REQUEST_FILES} + ${DELIVERY_STRATEGY_FILES} ${DELIVERY_FILES} ) diff --git a/proxy/src/server/Server.cpp b/proxy/src/server/Server.cpp index 48f7ece37..2dc38926b 100644 --- a/proxy/src/server/Server.cpp +++ b/proxy/src/server/Server.cpp @@ -19,7 +19,7 @@ #include #include "config/ServerConfig.h" - +#include "tracing/TracerUtil.h" #include "log/LogMgr.h" // #include "scheduler/SchedInst.h" #include "server/grpc_impl/GrpcServer.h" @@ -237,8 +237,8 @@ Server::Start() { << std::string(15, '*') << "Config in memory" << std::string(15, '*') << "\n\n" << ConfigMgr::GetInstance().Dump(); - server::Metrics::GetInstance().Init(); - server::SystemInfo::GetInstance().Init(); + // server::Metrics::GetInstance().Init(); + // server::SystemInfo::GetInstance().Init(); return StartService(); } catch (std::exception& ex) { @@ -282,7 +282,7 @@ Server::Stop() { Status Server::StartService() { Status stat; - stat = engine::KnowhereResource::Initialize(); + // stat = engine::KnowhereResource::Initialize(); if (!stat.ok()) { LOG_SERVER_ERROR_ << "KnowhereResource initialize fail: " << stat.message(); goto FAIL; diff --git a/proxy/src/server/ValidationUtil.cpp b/proxy/src/server/ValidationUtil.cpp index 8fc1b7bc3..cf8ab3a70 100644 --- a/proxy/src/server/ValidationUtil.cpp +++ b/proxy/src/server/ValidationUtil.cpp @@ -10,7 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "server/ValidationUtil.h" -// #include "db/Constants.h" +#include "db/Constants.h" // #include "db/Utils.h" // #include "knowhere/index/vector_index/ConfAdapter.h" // #include "knowhere/index/vector_index/helpers/IndexParameter.h" @@ -103,11 +103,11 @@ ValidatePartitionTags(const std::vector& partition_tags) { return Status::OK(); } -Status -ValidateInsertDataSize(const engine::DataChunkPtr& data) { +// Status +// ValidateInsertDataSize(const engine::DataChunkPtr& data) { - return Status::OK(); -} +// return Status::OK(); +// } } // namespace server } // namespace milvus diff --git a/proxy/src/server/ValidationUtil.h b/proxy/src/server/ValidationUtil.h index 1ade47ac6..71284adaf 100644 --- a/proxy/src/server/ValidationUtil.h +++ b/proxy/src/server/ValidationUtil.h @@ -11,7 +11,7 @@ #pragma once -// #include "db/Types.h" +#include "db/Types.h" #include "utils/Json.h" #include "utils/Status.h" @@ -50,7 +50,7 @@ ValidateSearchTopk(int64_t top_k); extern Status ValidatePartitionTags(const std::vector& partition_tags); -extern Status -ValidateInsertDataSize(const engine::DataChunkPtr& data); +// extern Status +// ValidateInsertDataSize(const engine::DataChunkPtr& data); } // namespace server } // namespace milvus diff --git a/proxy/src/server/delivery/ReqHandler.cpp b/proxy/src/server/delivery/ReqHandler.cpp index 5de4333de..31eb8b3fa 100644 --- a/proxy/src/server/delivery/ReqHandler.cpp +++ b/proxy/src/server/delivery/ReqHandler.cpp @@ -159,9 +159,9 @@ ReqHandler::Insert(const ContextPtr& context, const std::string& collection_name Status ReqHandler::GetEntityByID(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& ids, std::vector& field_names, std::vector& valid_row, - engine::snapshot::FieldElementMappings& field_mappings, engine::DataChunkPtr& data_chunk) { + engine::DataChunkPtr& data_chunk) { BaseReqPtr req_ptr = - GetEntityByIDReq::Create(context, collection_name, ids, field_names, valid_row, field_mappings, data_chunk); + GetEntityByIDReq::Create(context, collection_name, ids, field_names, valid_row, data_chunk); ReqScheduler::ExecReq(req_ptr); return req_ptr->status(); } @@ -176,8 +176,8 @@ ReqHandler::DeleteEntityByID(const ContextPtr& context, const std::string& colle Status ReqHandler::Search(const ContextPtr& context, const query::QueryPtr& query_ptr, const milvus::json& json_params, - engine::snapshot::FieldElementMappings& collection_mappings, engine::QueryResultPtr& result) { - BaseReqPtr req_ptr = SearchReq::Create(context, query_ptr, json_params, collection_mappings, result); + engine::QueryResultPtr& result) { + BaseReqPtr req_ptr = SearchReq::Create(context, query_ptr, json_params, result); ReqScheduler::ExecReq(req_ptr); return req_ptr->status(); } diff --git a/proxy/src/server/delivery/ReqHandler.h b/proxy/src/server/delivery/ReqHandler.h index b96dc49d2..e51993fa3 100644 --- a/proxy/src/server/delivery/ReqHandler.h +++ b/proxy/src/server/delivery/ReqHandler.h @@ -85,14 +85,14 @@ class ReqHandler { Status GetEntityByID(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& ids, std::vector& field_names, std::vector& valid_row, - engine::snapshot::FieldElementMappings& field_mappings, engine::DataChunkPtr& data_chunk); + engine::DataChunkPtr& data_chunk); Status DeleteEntityByID(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& ids); Status Search(const ContextPtr& context, const query::QueryPtr& query_ptr, const milvus::json& json_params, - engine::snapshot::FieldElementMappings& collection_mappings, engine::QueryResultPtr& result); + engine::QueryResultPtr& result); Status ListIDInSegment(const ContextPtr& context, const std::string& collection_name, int64_t segment_id, diff --git a/proxy/src/server/delivery/request/DescribeIndexReq.cpp b/proxy/src/server/delivery/request/DescribeIndexReq.cpp index 31a74379f..9ed961bbf 100644 --- a/proxy/src/server/delivery/request/DescribeIndexReq.cpp +++ b/proxy/src/server/delivery/request/DescribeIndexReq.cpp @@ -41,13 +41,6 @@ DescribeIndexReq::Create(const std::shared_ptr& context Status DescribeIndexReq::OnExecute() { - try { - std::string hdr = "DescribeIndexReq(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); - - // step 1: check arguments - STATUS_CHECK(ValidateCollectionName(collection_name_)); - STATUS_CHECK(ValidateFieldName(field_name_)); return Status::OK(); } diff --git a/proxy/src/server/delivery/request/DropCollectionReq.cpp b/proxy/src/server/delivery/request/DropCollectionReq.cpp index 8835c1b2c..60bc92c4c 100644 --- a/proxy/src/server/delivery/request/DropCollectionReq.cpp +++ b/proxy/src/server/delivery/request/DropCollectionReq.cpp @@ -28,12 +28,6 @@ DropCollectionReq::Create(const ContextPtr& context, const std::string& collecti Status DropCollectionReq::OnExecute() { - try { - std::string hdr = "DropCollectionReq(collection=" + collection_name_ + ")"; - TimeRecorder rc(hdr); - - STATUS_CHECK(ValidateCollectionName(collection_name_)); - return Status::OK(); } diff --git a/proxy/src/server/delivery/request/DropIndexReq.cpp b/proxy/src/server/delivery/request/DropIndexReq.cpp index f0ec52474..30652f59c 100644 --- a/proxy/src/server/delivery/request/DropIndexReq.cpp +++ b/proxy/src/server/delivery/request/DropIndexReq.cpp @@ -37,9 +37,6 @@ DropIndexReq::Create(const ContextPtr& context, const std::string& collection_na Status DropIndexReq::OnExecute() { - try { - std::string hdr = "DropIndexReq(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); return Status::OK(); } diff --git a/proxy/src/server/delivery/request/GetCollectionInfoReq.cpp b/proxy/src/server/delivery/request/GetCollectionInfoReq.cpp index 2e2f0c9d5..e2d2fd715 100644 --- a/proxy/src/server/delivery/request/GetCollectionInfoReq.cpp +++ b/proxy/src/server/delivery/request/GetCollectionInfoReq.cpp @@ -37,11 +37,6 @@ GetCollectionInfoReq::Create(const ContextPtr& context, const std::string& colle Status GetCollectionInfoReq::OnExecute() { - try { - std::string hdr = "GetCollectionInfoReq(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); - - STATUS_CHECK(ValidateCollectionName(collection_name_)); return Status::OK(); diff --git a/proxy/src/server/delivery/request/GetCollectionStatsReq.cpp b/proxy/src/server/delivery/request/GetCollectionStatsReq.cpp index 74a8745fa..1ce857003 100644 --- a/proxy/src/server/delivery/request/GetCollectionStatsReq.cpp +++ b/proxy/src/server/delivery/request/GetCollectionStatsReq.cpp @@ -38,12 +38,6 @@ GetCollectionStatsReq::Create(const ContextPtr& context, const std::string& coll Status GetCollectionStatsReq::OnExecute() { - try { - std::string hdr = "GetCollectionStatsReq(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); - - STATUS_CHECK(ValidateCollectionName(collection_name_)); - return Status::OK(); } diff --git a/proxy/src/server/delivery/request/GetEntityByIDReq.cpp b/proxy/src/server/delivery/request/GetEntityByIDReq.cpp index 8e1a00827..601289946 100644 --- a/proxy/src/server/delivery/request/GetEntityByIDReq.cpp +++ b/proxy/src/server/delivery/request/GetEntityByIDReq.cpp @@ -17,7 +17,7 @@ #include "server/delivery/request/GetEntityByIDReq.h" -// #include "db/Types.h" +#include "db/Types.h" #include "server/ValidationUtil.h" #include "utils/TimeRecorder.h" @@ -31,46 +31,27 @@ constexpr uint64_t MAX_COUNT_RETURNED = 1000; GetEntityByIDReq::GetEntityByIDReq(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& id_array, std::vector& field_names, - std::vector& valid_row, engine::snapshot::FieldElementMappings& field_mappings, + std::vector& valid_row, engine::DataChunkPtr& data_chunk) : BaseReq(context, ReqType::kGetEntityByID), collection_name_(collection_name), id_array_(id_array), field_names_(field_names), valid_row_(valid_row), - field_mappings_(field_mappings), data_chunk_(data_chunk) { } BaseReqPtr GetEntityByIDReq::Create(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& id_array, std::vector& field_names_, - std::vector& valid_row, engine::snapshot::FieldElementMappings& field_mappings, + std::vector& valid_row, engine::DataChunkPtr& data_chunk) { return std::shared_ptr( - new GetEntityByIDReq(context, collection_name, id_array, field_names_, valid_row, field_mappings, data_chunk)); + new GetEntityByIDReq(context, collection_name, id_array, field_names_, valid_row, data_chunk)); } Status GetEntityByIDReq::OnExecute() { - try { - std::string hdr = "GetEntityByIDReq(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); - - // step 1: check arguments - if (id_array_.empty()) { - return Status(SERVER_INVALID_ARGUMENT, "No entity id specified"); - } - - if (id_array_.size() > MAX_COUNT_RETURNED) { - std::string msg = "Input id array size cannot exceed: " + std::to_string(MAX_COUNT_RETURNED); - return Status(SERVER_INVALID_ARGUMENT, msg); - } - - STATUS_CHECK(ValidateCollectionName(collection_name_)); - - // TODO(yukun) ValidateFieldNames - return Status::OK(); } diff --git a/proxy/src/server/delivery/request/GetEntityByIDReq.h b/proxy/src/server/delivery/request/GetEntityByIDReq.h index dd6fd8dd3..fe892041c 100644 --- a/proxy/src/server/delivery/request/GetEntityByIDReq.h +++ b/proxy/src/server/delivery/request/GetEntityByIDReq.h @@ -24,7 +24,7 @@ #include // #include "db/snapshot/Context.h" // #include "db/snapshot/Resources.h" -#include "segment/Segment.h" +// #include "segment/Segment.h" namespace milvus { namespace server { @@ -34,12 +34,12 @@ class GetEntityByIDReq : public BaseReq { static BaseReqPtr Create(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& id_array, std::vector& field_names, std::vector& valid_row, - engine::snapshot::FieldElementMappings& field_mappings, engine::DataChunkPtr& data_chunk); + engine::DataChunkPtr& data_chunk); protected: GetEntityByIDReq(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& id_array, std::vector& field_names, std::vector& valid_row, - engine::snapshot::FieldElementMappings& field_mappings, engine::DataChunkPtr& data_chunk); + engine::DataChunkPtr& data_chunk); Status OnExecute() override; @@ -48,7 +48,6 @@ class GetEntityByIDReq : public BaseReq { std::string collection_name_; engine::IDNumbers id_array_; std::vector& field_names_; - engine::snapshot::FieldElementMappings& field_mappings_; engine::DataChunkPtr& data_chunk_; std::vector& valid_row_; }; diff --git a/proxy/src/server/delivery/request/HasCollectionReq.cpp b/proxy/src/server/delivery/request/HasCollectionReq.cpp index 20ff4845a..5fd99e242 100644 --- a/proxy/src/server/delivery/request/HasCollectionReq.cpp +++ b/proxy/src/server/delivery/request/HasCollectionReq.cpp @@ -29,11 +29,6 @@ HasCollectionReq::Create(const ContextPtr& context, const std::string& collectio Status HasCollectionReq::OnExecute() { - try { - std::string hdr = "HasCollectionReq(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); - - STATUS_CHECK(ValidateCollectionName(collection_name_)); return Status::OK(); } diff --git a/proxy/src/server/delivery/request/HasPartitionReq.cpp b/proxy/src/server/delivery/request/HasPartitionReq.cpp index a24d1407f..f7842f0a8 100644 --- a/proxy/src/server/delivery/request/HasPartitionReq.cpp +++ b/proxy/src/server/delivery/request/HasPartitionReq.cpp @@ -36,12 +36,6 @@ HasPartitionReq::Create(const ContextPtr& context, const std::string& collection Status HasPartitionReq::OnExecute() { - try { - std::string hdr = "HasPartitionReq(collection=" + collection_name_ + " tag=" + partition_tag_ + ")"; - TimeRecorderAuto rc(hdr); - - has_partition_ = false; - return Status::OK(); } diff --git a/proxy/src/server/delivery/request/InsertReq.cpp b/proxy/src/server/delivery/request/InsertReq.cpp index cc6b594d1..3dac4da7d 100644 --- a/proxy/src/server/delivery/request/InsertReq.cpp +++ b/proxy/src/server/delivery/request/InsertReq.cpp @@ -46,14 +46,7 @@ InsertReq::Create(const ContextPtr& context, const std::string& collection_name, Status InsertReq::OnExecute() { LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq."; - try { - std::string hdr = "InsertReq(table=" + collection_name_ + ", partition_name=" + partition_name_ + ")"; - TimeRecorder rc(hdr); - - if (chunk_data_.empty()) { - return Status{SERVER_INVALID_ARGUMENT, - "The vector field is empty, Make sure you have entered vector records"}; - } + return Status::OK(); } diff --git a/proxy/src/server/delivery/request/ListIDInSegmentReq.cpp b/proxy/src/server/delivery/request/ListIDInSegmentReq.cpp index 59be58f3f..08c628280 100644 --- a/proxy/src/server/delivery/request/ListIDInSegmentReq.cpp +++ b/proxy/src/server/delivery/request/ListIDInSegmentReq.cpp @@ -42,12 +42,7 @@ ListIDInSegmentReq::Create(const ContextPtr& context, const std::string& collect Status ListIDInSegmentReq::OnExecute() { - try { - std::string hdr = - "ListIDInSegmentReq(collection=" + collection_name_ + " segment=" + std::to_string(segment_id_) + ")"; - TimeRecorderAuto rc(hdr); - bool exist = false; return Status::OK(); } diff --git a/proxy/src/server/delivery/request/LoadCollectionReq.cpp b/proxy/src/server/delivery/request/LoadCollectionReq.cpp index 843902f00..b6a247c85 100644 --- a/proxy/src/server/delivery/request/LoadCollectionReq.cpp +++ b/proxy/src/server/delivery/request/LoadCollectionReq.cpp @@ -31,10 +31,6 @@ LoadCollectionReq::Create(const ContextPtr& context, const std::string& collecti Status LoadCollectionReq::OnExecute() { - try { - std::string hdr = "LoadCollectionReq(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); - return Status::OK(); } diff --git a/proxy/src/server/delivery/request/SearchReq.cpp b/proxy/src/server/delivery/request/SearchReq.cpp index 92142418c..58dad3df7 100644 --- a/proxy/src/server/delivery/request/SearchReq.cpp +++ b/proxy/src/server/delivery/request/SearchReq.cpp @@ -29,28 +29,22 @@ namespace milvus { namespace server { SearchReq::SearchReq(const ContextPtr& context, const query::QueryPtr& query_ptr, const milvus::json& json_params, - engine::snapshot::FieldElementMappings& field_mappings, engine::QueryResultPtr& result) + engine::QueryResultPtr& result) : BaseReq(context, ReqType::kSearch), query_ptr_(query_ptr), json_params_(json_params), - field_mappings_(field_mappings), result_(result) { } BaseReqPtr SearchReq::Create(const ContextPtr& context, const query::QueryPtr& query_ptr, const milvus::json& json_params, - engine::snapshot::FieldElementMappings& field_mappings, engine::QueryResultPtr& result) { - return std::shared_ptr(new SearchReq(context, query_ptr, json_params, field_mappings, result)); + engine::QueryResultPtr& result) { + return std::shared_ptr(new SearchReq(context, query_ptr, json_params, result)); } Status SearchReq::OnExecute() { - try { - std::string hdr = "SearchReq(table=" + query_ptr_->collection_id; - TimeRecorder rc(hdr); - STATUS_CHECK(ValidateCollectionName(query_ptr_->collection_id)); - STATUS_CHECK(ValidatePartitionTags(query_ptr_->partitions)); return Status::OK(); } diff --git a/proxy/src/server/delivery/request/SearchReq.h b/proxy/src/server/delivery/request/SearchReq.h index ac674a8bd..e31eb2f24 100644 --- a/proxy/src/server/delivery/request/SearchReq.h +++ b/proxy/src/server/delivery/request/SearchReq.h @@ -26,11 +26,11 @@ class SearchReq : public BaseReq { public: static BaseReqPtr Create(const ContextPtr& context, const query::QueryPtr& query_ptr, const milvus::json& json_params, - engine::snapshot::FieldElementMappings& collection_mappings, engine::QueryResultPtr& result); + engine::QueryResultPtr& result); protected: SearchReq(const ContextPtr& context, const query::QueryPtr& query_ptr, const milvus::json& json_params, - engine::snapshot::FieldElementMappings& collection_mappings, engine::QueryResultPtr& result); + engine::QueryResultPtr& result); Status OnExecute() override; @@ -38,7 +38,6 @@ class SearchReq : public BaseReq { private: milvus::query::QueryPtr query_ptr_; milvus::json json_params_; - engine::snapshot::FieldElementMappings& field_mappings_; engine::QueryResultPtr& result_; }; diff --git a/proxy/src/server/delivery/request/Types.h b/proxy/src/server/delivery/request/Types.h index 82f4a8753..bf6e0cd33 100644 --- a/proxy/src/server/delivery/request/Types.h +++ b/proxy/src/server/delivery/request/Types.h @@ -11,7 +11,7 @@ #pragma once -// #include "db/Types.h" +#include "db/Types.h" #include "grpc/gen-milvus/milvus.grpc.pb.h" #include "grpc/gen-status/status.grpc.pb.h" #include "grpc/gen-status/status.pb.h" diff --git a/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp b/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp index 8614b9628..aac83006e 100644 --- a/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -123,125 +123,9 @@ CopyVectorData(const google::protobuf::RepeatedPtrField<::milvus::grpc::VectorRo } } -void -CopyRowRecords(const google::protobuf::RepeatedPtrField<::milvus::grpc::VectorRowRecord>& grpc_records, - const google::protobuf::RepeatedField& grpc_id_array, - engine::VectorsData& vectors) { - // step 1: copy vector data - int64_t float_data_size = 0, binary_data_size = 0; - for (auto& record : grpc_records) { - float_data_size += record.float_data_size(); - binary_data_size += record.binary_data().size(); - } - - std::vector float_array(float_data_size, 0.0f); - std::vector binary_array(binary_data_size, 0); - int64_t offset = 0; - if (float_data_size > 0) { - for (auto& record : grpc_records) { - memcpy(&float_array[offset], record.float_data().data(), record.float_data_size() * sizeof(float)); - offset += record.float_data_size(); - } - } else if (binary_data_size > 0) { - for (auto& record : grpc_records) { - memcpy(&binary_array[offset], record.binary_data().data(), record.binary_data().size()); - offset += record.binary_data().size(); - } - } - - // step 2: copy id array - std::vector id_array; - if (grpc_id_array.size() > 0) { - id_array.resize(grpc_id_array.size()); - memcpy(id_array.data(), grpc_id_array.data(), grpc_id_array.size() * sizeof(int64_t)); - } - - // step 3: contruct vectors - vectors.vector_count_ = grpc_records.size(); - vectors.float_data_.swap(float_array); - vectors.binary_data_.swap(binary_array); - vectors.id_array_.swap(id_array); -} - void DeSerialization(const ::milvus::grpc::GeneralQuery& general_query, query::BooleanQueryPtr& boolean_clause, query::QueryPtr& query_ptr) { - if (general_query.has_boolean_query()) { - boolean_clause->SetOccur((query::Occur)general_query.boolean_query().occur()); - for (uint64_t i = 0; i < general_query.boolean_query().general_query_size(); ++i) { - if (general_query.boolean_query().general_query(i).has_boolean_query()) { - query::BooleanQueryPtr query = std::make_shared(); - DeSerialization(general_query.boolean_query().general_query(i), query, query_ptr); - boolean_clause->AddBooleanQuery(query); - } else { - auto leaf_query = std::make_shared(); - auto query = general_query.boolean_query().general_query(i); - // if (query.has_term_query()) { - // query::TermQueryPtr term_query = std::make_shared(); - // term_query->field_name = query.term_query().field_name(); - // term_query->boost = query.term_query().boost(); - // size_t int_size = query.term_query().int_value_size(); - // size_t double_size = query.term_query().double_value_size(); - // if (int_size > 0) { - // term_query->field_value.resize(int_size * sizeof(int64_t)); - // memcpy(term_query->field_value.data(), query.term_query().int_value().data(), - // int_size * sizeof(int64_t)); - // } else if (double_size > 0) { - // term_query->field_value.resize(double_size * sizeof(double)); - // memcpy(term_query->field_value.data(), - // query.term_query().double_value().data(), - // double_size * sizeof(double)); - // } - // leaf_query->term_query = term_query; - // boolean_clause->AddLeafQuery(leaf_query); - // } - // if (query.has_range_query()) { - // query::RangeQueryPtr range_query = std::make_shared(); - // range_query->field_name = query.range_query().field_name(); - // range_query->boost = query.range_query().boost(); - // range_query->compare_expr.resize(query.range_query().operand_size()); - // for (uint64_t j = 0; j < query.range_query().operand_size(); ++j) { - // range_query->compare_expr[j].compare_operator = - // query::CompareOperator(query.range_query().operand(j).operator_()); - // range_query->compare_expr[j].operand = - // query.range_query().operand(j).operand(); - // } - // leaf_query->range_query = range_query; - // boolean_clause->AddLeafQuery(leaf_query); - // } - if (query.has_vector_query()) { - query::VectorQueryPtr vector_query = std::make_shared(); - - engine::VectorsData vectors; - CopyRowRecords(query.vector_query().records(), - google::protobuf::RepeatedField(), vectors); - - vector_query->query_vector.float_data = vectors.float_data_; - vector_query->query_vector.binary_data = vectors.binary_data_; - - vector_query->boost = query.vector_query().query_boost(); - vector_query->field_name = query.vector_query().field_name(); - vector_query->topk = query.vector_query().topk(); - - milvus::json json_params; - for (int j = 0; j < query.vector_query().extra_params_size(); j++) { - const ::milvus::grpc::KeyValuePair& extra = query.vector_query().extra_params(j); - if (extra.key() == EXTRA_PARAM_KEY) { - json_params = json::parse(extra.value()); - } - } - vector_query->extra_params = json_params; - - // TODO(yukun): remove hardcode here - std::string vector_placeholder = "placeholder_1"; - query_ptr->vectors.insert(std::make_pair(vector_placeholder, vector_query)); - - leaf_query->vector_placeholder = vector_placeholder; - boolean_clause->AddLeafQuery(leaf_query); - } - } - } - } } void @@ -261,264 +145,6 @@ ConstructResults(const TopKQueryResult& result, ::milvus::grpc::QueryResult* res result.distance_list_.size() * sizeof(float)); } -void -CopyDataChunkToEntity(const engine::DataChunkPtr& data_chunk, - const engine::snapshot::FieldElementMappings& field_mappings, int64_t id_size, - ::milvus::grpc::Entities* response) { - for (const auto& it : field_mappings) { - auto type = it.first->GetFtype(); - std::string name = it.first->GetName(); - - // judge whether data exists - engine::BinaryDataPtr data = data_chunk->fixed_fields_[name]; - if (data == nullptr || data->data_.empty()) - continue; - - auto single_size = (id_size != 0) ? (data->data_.size() / id_size) : 0; - - auto field_value = response->add_fields(); - auto vector_record = field_value->mutable_vector_record(); - - field_value->set_field_name(name); - field_value->set_type(static_cast(type)); - // general data - if (type == engine::DataType::VECTOR_BINARY) { - // add binary vector data - std::vector binary_vector; - auto vector_size = single_size * sizeof(int8_t) / sizeof(int8_t); - binary_vector.resize(vector_size); - for (int i = 0; i < id_size; i++) { - auto vector_row_record = vector_record->add_records(); - auto offset = i * single_size; - memcpy(binary_vector.data(), data->data_.data() + offset, single_size); - vector_row_record->mutable_binary_data()->resize(binary_vector.size()); - memcpy(vector_row_record->mutable_binary_data()->data(), binary_vector.data(), binary_vector.size()); - } - - } else if (type == engine::DataType::VECTOR_FLOAT) { - // add float vector data - std::vector float_vector; - auto vector_size = single_size * sizeof(int8_t) / sizeof(float); - float_vector.resize(vector_size); - for (int i = 0; i < id_size; i++) { - auto vector_row_record = vector_record->add_records(); - auto offset = i * single_size; - memcpy(float_vector.data(), data->data_.data() + offset, single_size); - vector_row_record->mutable_float_data()->Resize(vector_size, 0.0); - memcpy(vector_row_record->mutable_float_data()->mutable_data(), float_vector.data(), - float_vector.size() * sizeof(float)); - } - } else { - // add attribute data - auto attr_record = field_value->mutable_attr_record(); - if (type == engine::DataType::INT32) { - // add int32 data - int32_t int32_value; - for (int i = 0; i < id_size; i++) { - auto offset = i * single_size; - memcpy(&int32_value, data->data_.data() + offset, single_size); - attr_record->add_int32_value(int32_value); - } - } else if (type == engine::DataType::INT64) { - // add int64 data - int64_t int64_value; - for (int i = 0; i < id_size; i++) { - auto offset = i * single_size; - memcpy(&int64_value, data->data_.data() + offset, single_size); - attr_record->add_int64_value(int64_value); - } - } else if (type == engine::DataType::DOUBLE) { - // add double data - double double_value; - for (int i = 0; i < id_size; i++) { - auto offset = i * single_size; - memcpy(&double_value, data->data_.data() + offset, single_size); - attr_record->add_double_value(double_value); - } - } else if (type == engine::DataType::FLOAT) { - // add float data - float float_value; - for (int i = 0; i < id_size; i++) { - auto offset = i * single_size; - memcpy(&float_value, data->data_.data() + offset, single_size); - attr_record->add_float_value(float_value); - } - } - } - } -} - -void -ConstructEntityResults(const std::vector& attrs, const std::vector& vectors, - std::vector& field_names, ::milvus::grpc::Entities* response) { - if (!response) { - return; - } - - auto id_size = vectors.size(); - std::vector id_array(id_size); - for (int64_t i = 0; i < id_size; i++) { - id_array[i] = vectors[i].id_array_[0]; - } - response->mutable_ids()->Resize(static_cast(id_size), 0); - memcpy(response->mutable_ids()->mutable_data(), id_array.data(), id_size * sizeof(int64_t)); - - std::string vector_field_name; - bool set_valid_row = false; - for (auto field_name : field_names) { - if (!attrs.empty()) { - if (attrs[0].attr_type_.find(field_name) != attrs[0].attr_type_.end()) { - auto grpc_field = response->add_fields(); - grpc_field->set_field_name(field_name); - grpc_field->set_type((::milvus::grpc::DataType)attrs[0].attr_type_.at(field_name)); - auto grpc_attr_data = grpc_field->mutable_attr_record(); - - std::vector int32_data; - std::vector int64_data; - std::vector float_data; - std::vector double_data; - for (auto& attr : attrs) { - if (not set_valid_row) { - if (!attr.id_array_.empty()) { - response->add_valid_row(true); - } else { - response->add_valid_row(false); - continue; - } - } - - if (attr.attr_data_.find(field_name) == attr.attr_data_.end()) { - continue; - } - auto attr_data = attr.attr_data_.at(field_name); - int32_t grpc_int32_data; - int64_t grpc_int64_data; - float grpc_float_data; - double grpc_double_data; - switch (attr.attr_type_.at(field_name)) { - case engine::DataType::INT8: { - if (attr_data.size() == sizeof(int8_t)) { - grpc_int32_data = attr_data[0]; - int32_data.emplace_back(grpc_int32_data); - } else { - response->mutable_status()->set_error_code(::milvus::grpc::ErrorCode::UNEXPECTED_ERROR); - return; - } - break; - } - case engine::DataType::INT16: { - if (attr_data.size() == sizeof(int16_t)) { - int16_t value; - memcpy(&value, attr_data.data(), sizeof(int16_t)); - grpc_int32_data = value; - int32_data.emplace_back(grpc_int32_data); - } else { - response->mutable_status()->set_error_code(::milvus::grpc::ErrorCode::UNEXPECTED_ERROR); - return; - } - break; - } - case engine::DataType::INT32: { - if (attr_data.size() == sizeof(int32_t)) { - memcpy(&grpc_int32_data, attr_data.data(), sizeof(int32_t)); - int32_data.emplace_back(grpc_int32_data); - } else { - response->mutable_status()->set_error_code(::milvus::grpc::ErrorCode::UNEXPECTED_ERROR); - return; - } - break; - } - case engine::DataType::INT64: { - if (attr_data.size() == sizeof(int64_t)) { - memcpy(&grpc_int64_data, attr_data.data(), sizeof(int64_t)); - int64_data.emplace_back(grpc_int64_data); - } else { - response->mutable_status()->set_error_code(::milvus::grpc::ErrorCode::UNEXPECTED_ERROR); - return; - } - break; - } - case engine::DataType::FLOAT: { - if (attr_data.size() == sizeof(float)) { - float value; - memcpy(&value, attr_data.data(), sizeof(float)); - grpc_float_data = value; - float_data.emplace_back(grpc_float_data); - } else { - response->mutable_status()->set_error_code(::milvus::grpc::ErrorCode::UNEXPECTED_ERROR); - return; - } - break; - } - case engine::DataType::DOUBLE: { - if (attr_data.size() == sizeof(double)) { - memcpy(&grpc_double_data, attr_data.data(), sizeof(double)); - double_data.emplace_back(grpc_double_data); - } else { - response->mutable_status()->set_error_code(::milvus::grpc::ErrorCode::UNEXPECTED_ERROR); - return; - } - break; - } - default: { break; } - } - } - if (!int32_data.empty()) { - grpc_attr_data->mutable_int32_value()->Resize(static_cast(int32_data.size()), 0); - memcpy(grpc_attr_data->mutable_int32_value()->mutable_data(), int32_data.data(), - int32_data.size() * sizeof(int32_t)); - } else if (!int64_data.empty()) { - grpc_attr_data->mutable_int64_value()->Resize(static_cast(int64_data.size()), 0); - memcpy(grpc_attr_data->mutable_int64_value()->mutable_data(), int64_data.data(), - int64_data.size() * sizeof(int64_t)); - } else if (!float_data.empty()) { - grpc_attr_data->mutable_float_value()->Resize(static_cast(float_data.size()), 0.0); - memcpy(grpc_attr_data->mutable_float_value()->mutable_data(), float_data.data(), - float_data.size() * sizeof(float)); - } else if (!double_data.empty()) { - grpc_attr_data->mutable_double_value()->Resize(static_cast(double_data.size()), 0.0); - memcpy(grpc_attr_data->mutable_double_value()->mutable_data(), double_data.data(), - double_data.size() * sizeof(double)); - } - set_valid_row = true; - } else { - vector_field_name = field_name; - } - } - } - - // TODO(yukun): valid_row not used in vector records serialize - if (!vector_field_name.empty()) { - auto grpc_field = response->add_fields(); - grpc_field->set_field_name(vector_field_name); - ::milvus::grpc::VectorRecord* grpc_vector_data = grpc_field->mutable_vector_record(); - for (auto& vector : vectors) { - auto grpc_data = grpc_vector_data->add_records(); - if (!vector.float_data_.empty()) { - if (not set_valid_row) { - response->add_valid_row(true); - } - grpc_field->set_type(::milvus::grpc::DataType::VECTOR_FLOAT); - grpc_data->mutable_float_data()->Resize(vector.float_data_.size(), 0); - memcpy(grpc_data->mutable_float_data()->mutable_data(), vector.float_data_.data(), - vector.float_data_.size() * sizeof(float)); - } else if (!vector.binary_data_.empty()) { - if (not set_valid_row) { - response->add_valid_row(true); - } - grpc_field->set_type(::milvus::grpc::DataType::VECTOR_BINARY); - grpc_data->mutable_binary_data()->resize(vector.binary_data_.size()); - memcpy(grpc_data->mutable_binary_data()->data(), vector.binary_data_.data(), - vector.binary_data_.size() * sizeof(uint8_t)); - } else { - if (not set_valid_row) { - response->add_valid_row(false); - } - } - } - } -} - class GrpcConnectionContext : public milvus::server::ConnectionContext { public: explicit GrpcConnectionContext(::grpc::ServerContext* context) : context_(context) { @@ -712,48 +338,6 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext* context, const ::mil CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - std::unordered_map fields; - - if (request->fields_size() > MAXIMUM_FIELD_NUM) { - Status status = Status{SERVER_INVALID_FIELD_NUM, "Maximum field's number should be limited to 64"}; - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); - return ::grpc::Status::OK; - } - - for (int i = 0; i < request->fields_size(); ++i) { - const auto& field = request->fields(i); - - FieldSchema field_schema; - field_schema.field_type_ = (engine::DataType)field.type(); - - // Currently only one extra_param - if (field.extra_params_size() != 0) { - if (!field.extra_params(0).value().empty()) { - field_schema.field_params_ = json::parse(field.extra_params(0).value()); - } - } - - for (int j = 0; j < field.index_params_size(); j++) { - field_schema.index_params_[field.index_params(j).key()] = field.index_params(j).value(); - } - - fields[field.name()] = field_schema; - } - - milvus::json json_params; - for (int i = 0; i < request->extra_params_size(); i++) { - const ::milvus::grpc::KeyValuePair& extra = request->extra_params(i); - if (extra.key() == EXTRA_PARAM_KEY) { - json_params = json::parse(extra.value()); - } - } - - Status status = req_handler_.CreateCollection(GetContext(context), request->collection_name(), fields, json_params); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context) - return ::grpc::Status::OK; } @@ -853,43 +437,6 @@ GrpcRequestHandler::GetEntityByID(::grpc::ServerContext* context, const ::milvus ::milvus::grpc::Entities* response) { CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - engine::IDNumbers vector_ids; - vector_ids.reserve(request->id_array_size()); - for (int i = 0; i < request->id_array_size(); i++) { - vector_ids.push_back(request->id_array(i)); - } - - std::vector field_names(request->field_names_size()); - for (int i = 0; i < request->field_names_size(); i++) { - field_names[i] = request->field_names(i); - } - - engine::DataChunkPtr data_chunk; - engine::snapshot::FieldElementMappings field_mappings; - - std::vector valid_row; - - Status status = req_handler_.GetEntityByID(GetContext(context), request->collection_name(), vector_ids, field_names, - valid_row, field_mappings, data_chunk); - - for (auto it : vector_ids) { - response->add_ids(it); - } - - int valid_size = 0; - for (auto it : valid_row) { - response->add_valid_row(it); - if (it) { - valid_size++; - } - } - - CopyDataChunkToEntity(data_chunk, field_mappings, valid_size, response); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; } @@ -899,106 +446,12 @@ GrpcRequestHandler::GetEntityIDs(::grpc::ServerContext* context, const ::milvus: CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - std::vector vector_ids; - Status status = req_handler_.ListIDInSegment(GetContext(context), request->collection_name(), request->segment_id(), - vector_ids); - - if (!vector_ids.empty()) { - response->mutable_entity_id_array()->Resize(vector_ids.size(), -1); - memcpy(response->mutable_entity_id_array()->mutable_data(), vector_ids.data(), - vector_ids.size() * sizeof(int64_t)); - } - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; } -//::grpc::Status -// GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request, -// ::milvus::grpc::QueryResult* response) { -// CHECK_NULLPTR_RETURN(request); -// LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); -// -// // step 1: copy vector data -// engine::VectorsData vectors; -// CopyRowRecords(request->query_record_array(), google::protobuf::RepeatedField(), -// vectors); -// -// // step 2: partition tags -// std::vector partitions; -// std::copy(request->partition_tag_array().begin(), request->partition_tag_array().end(), -// std::back_inserter(partitions)); -// -// // step 3: parse extra parameters -// milvus::json json_params; -// for (int i = 0; i < request->extra_params_size(); i++) { -// const ::milvus::grpc::KeyValuePair& extra = request->extra_params(i); -// if (extra.key() == EXTRA_PARAM_KEY) { -// json_params = json::parse(extra.value()); -// } -// } -// -// // step 4: search vectors -// std::vector file_ids; -// TopKQueryResult result; -// fiu_do_on("GrpcRequestHandler.Search.not_empty_file_ids", file_ids.emplace_back("test_file_id")); -// -// Status status = req_handler_.Search(GetContext(context), request->collection_name(), vectors, request->topk(), -// json_params, partitions, file_ids, result); -// -// // step 5: construct and return result -// ConstructResults(result, response); -// -// LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); -// SET_RESPONSE(response->mutable_status(), status, context); -// -// return ::grpc::Status::OK; -//} - ::grpc::Status GrpcRequestHandler::SearchInSegment(::grpc::ServerContext* context, const ::milvus::grpc::SearchInSegmentParam* request, ::milvus::grpc::QueryResult* response) { - // CHECK_NULLPTR_RETURN(request); - // LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - // - // auto* search_request = &request->search_param(); - // - // // step 1: copy vector data - // engine::VectorsData vectors; - // CopyRowRecords(search_request->query_record_array(), - // google::protobuf::RepeatedField(), - // vectors); - // - // // step 2: copy file id array - // std::vector file_ids; - // std::copy(request->file_id_array().begin(), request->file_id_array().end(), std::back_inserter(file_ids)); - // - // // step 3: partition tags - // std::vector partitions; - // std::copy(search_request->partition_tag_array().begin(), search_request->partition_tag_array().end(), - // std::back_inserter(partitions)); - // - // // step 4: parse extra parameters - // milvus::json json_params; - // for (int i = 0; i < search_request->extra_params_size(); i++) { - // const ::milvus::grpc::KeyValuePair& extra = search_request->extra_params(i); - // if (extra.key() == EXTRA_PARAM_KEY) { - // json_params = json::parse(extra.value()); - // } - // } - // - // // step 5: search vectors - // TopKQueryResult result; - // Status status = req_handler_.Search(GetContext(context), search_request->collection_name(), vectors, - // search_request->topk(), json_params, partitions, file_ids, result); - // - // // step 6: construct and return result - // ConstructResults(result, response); - // - // LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - // SET_RESPONSE(response->mutable_status(), status, context); return ::grpc::Status::OK; } @@ -1147,18 +600,6 @@ GrpcRequestHandler::DeleteByID(::grpc::ServerContext* context, const ::milvus::g CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - // step 1: prepare id array - engine::IDNumbers ids; - for (int i = 0; i < request->id_array_size(); i++) { - ids.push_back(request->id_array(i)); - } - - // step 2: delete vector - Status status = req_handler_.DeleteEntityByID(GetContext(context), request->collection_name(), ids); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); - return ::grpc::Status::OK; } @@ -1277,279 +718,22 @@ GrpcRequestHandler::Compact(::grpc::ServerContext* context, const ::milvus::grpc ::grpc::Status GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc::InsertParam* request, ::milvus::grpc::EntityIds* response) { - // engine::VectorsData vectors; - // CopyRowRecords(request->entity().vector_data(0).value(), request->entity_id_array(), vectors); + CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - auto field_size = request->fields_size(); - - std::unordered_map> chunk_data; - - auto valid_row_count = [&](int32_t& base, int32_t test) -> bool { - if (base < 0) { - base = test; - if (request->entity_id_array_size() > 0 && base != request->entity_id_array_size()) { - auto status = Status{SERVER_INVALID_ROWRECORD_ARRAY, "ID size not matches entity size"}; - SET_RESPONSE(response->mutable_status(), status, context); - return false; - } - } else if (base != test) { - auto status = Status{SERVER_INVALID_ROWRECORD_ARRAY, "Field row count inconsist"}; - SET_RESPONSE(response->mutable_status(), status, context); - return false; - } - return true; - }; - - // copy field data - int32_t row_num = -1; - for (int i = 0; i < field_size; i++) { - auto grpc_int32_size = request->fields(i).attr_record().int32_value_size(); - auto grpc_int64_size = request->fields(i).attr_record().int64_value_size(); - auto grpc_float_size = request->fields(i).attr_record().float_value_size(); - auto grpc_double_size = request->fields(i).attr_record().double_value_size(); - const auto& field = request->fields(i); - auto field_name = field.field_name(); - - std::vector temp_data; - if (grpc_int32_size > 0) { - if (!valid_row_count(row_num, grpc_int32_size)) { - return ::grpc::Status::OK; - } - temp_data.resize(grpc_int32_size * sizeof(int32_t)); - memcpy(temp_data.data(), field.attr_record().int32_value().data(), grpc_int32_size * sizeof(int32_t)); - } else if (grpc_int64_size > 0) { - if (!valid_row_count(row_num, grpc_int64_size)) { - return ::grpc::Status::OK; - } - temp_data.resize(grpc_int64_size * sizeof(int64_t)); - memcpy(temp_data.data(), field.attr_record().int64_value().data(), grpc_int64_size * sizeof(int64_t)); - } else if (grpc_float_size > 0) { - if (!valid_row_count(row_num, grpc_float_size)) { - return ::grpc::Status::OK; - } - temp_data.resize(grpc_float_size * sizeof(float)); - memcpy(temp_data.data(), field.attr_record().float_value().data(), grpc_float_size * sizeof(float)); - } else if (grpc_double_size > 0) { - if (!valid_row_count(row_num, grpc_double_size)) { - return ::grpc::Status::OK; - } - temp_data.resize(grpc_double_size * sizeof(double)); - memcpy(temp_data.data(), field.attr_record().double_value().data(), grpc_double_size * sizeof(double)); - } else { - if (!valid_row_count(row_num, field.vector_record().records_size())) { - return ::grpc::Status::OK; - } - CopyVectorData(field.vector_record().records(), temp_data); - } - - chunk_data.insert(std::make_pair(field_name, temp_data)); - } - - // copy id array - if (request->entity_id_array_size() > 0) { - int64_t size = request->entity_id_array_size() * sizeof(int64_t); - std::vector temp_data(size, 0); - memcpy(temp_data.data(), request->entity_id_array().data(), size); - chunk_data.insert(std::make_pair(engine::FIELD_UID, temp_data)); - } - - std::string collection_name = request->collection_name(); - std::string partition_name = request->partition_tag(); - Status status = req_handler_.Insert(GetContext(context), collection_name, partition_name, row_num, chunk_data); - if (!status.ok()) { - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; - } - - // return generated ids - auto pair = chunk_data.find(engine::FIELD_UID); - if (pair != chunk_data.end()) { - response->mutable_entity_id_array()->Resize(static_cast(pair->second.size() / sizeof(int64_t)), 0); - memcpy(response->mutable_entity_id_array()->mutable_data(), pair->second.data(), pair->second.size()); - } - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; + } ::grpc::Status GrpcRequestHandler::SearchPB(::grpc::ServerContext* context, const ::milvus::grpc::SearchParamPB* request, ::milvus::grpc::QueryResult* response) { CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - auto boolean_query = std::make_shared(); - auto query_ptr = std::make_shared(); - DeSerialization(request->general_query(), boolean_query, query_ptr); - - auto general_query = std::make_shared(); - query::GenBinaryQuery(boolean_query, general_query->bin); - - Status status; - - if (!query::ValidateBinaryQuery(general_query->bin)) { - status = Status{SERVER_INVALID_BINARY_QUERY, "Generate wrong binary query tree"}; - SET_RESPONSE(response->mutable_status(), status, context) - return ::grpc::Status::OK; - } - - std::vector partition_list; - partition_list.resize(request->partition_tag_array_size()); - for (uint64_t i = 0; i < request->partition_tag_array_size(); ++i) { - partition_list[i] = request->partition_tag_array(i); - } - - milvus::json json_params; - for (int i = 0; i < request->extra_params_size(); i++) { - const ::milvus::grpc::KeyValuePair& extra = request->extra_params(i); - if (extra.key() == EXTRA_PARAM_KEY) { - json_params = json::parse(extra.value()); - } - } - - engine::QueryResultPtr result = std::make_shared(); - std::vector field_names; - engine::snapshot::FieldElementMappings field_mappings; - status = req_handler_.Search(GetContext(context), query_ptr, json_params, field_mappings, result); - - // step 6: construct and return result - response->set_row_num(result->row_num_); - auto grpc_entity = response->mutable_entities(); - // ConstructEntityResults(result->attrs_, result->vectors_, field_names, grpc_entity); - grpc_entity->mutable_ids()->Resize(static_cast(result->result_ids_.size()), 0); - memcpy(grpc_entity->mutable_ids()->mutable_data(), result->result_ids_.data(), - result->result_ids_.size() * sizeof(int64_t)); - - response->mutable_distances()->Resize(static_cast(result->result_distances_.size()), 0.0); - memcpy(response->mutable_distances()->mutable_data(), result->result_distances_.data(), - result->result_distances_.size() * sizeof(float)); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); return ::grpc::Status::OK; } -#if 0 -Status -ParseTermQuery(const nlohmann::json& term_json, - std::unordered_map field_type, - query::TermQueryPtr& term_query) { - std::string field_name = term_json["field"].get(); - auto term_value_json = term_json["values"]; - if (!term_value_json.is_array()) { - std::string msg = "Term json string is not an array"; - return Status{SERVER_INVALID_DSL_PARAMETER, msg}; - } - - auto term_size = term_value_json.size(); - term_query->field_name = field_name; - term_query->field_value.resize(term_size * sizeof(int64_t)); - - switch (field_type.at(field_name)) { - case engine::DataType::INT8: { - std::vector term_value(term_size, 0); - for (uint64_t i = 0; i < term_size; i++) { - term_value[i] = term_value_json[i].get(); - } - memcpy(term_query->field_value.data(), term_value.data(), term_size * sizeof(int64_t)); - break; - } - case engine::DataType::INT16: { - std::vector term_value(term_size, 0); - for (uint64_t i = 0; i < term_size; i++) { - term_value[i] = term_value_json[i].get(); - } - memcpy(term_query->field_value.data(), term_value.data(), term_size * sizeof(int64_t)); - break; - } - case engine::DataType::INT32: { - std::vector term_value(term_size, 0); - for (uint64_t i = 0; i < term_size; i++) { - term_value[i] = term_value_json[i].get(); - } - memcpy(term_query->field_value.data(), term_value.data(), term_size * sizeof(int64_t)); - break; - } - case engine::DataType::INT64: { - std::vector term_value(term_size, 0); - for (uint64_t i = 0; i < term_size; ++i) { - term_value[i] = term_value_json[i].get(); - } - memcpy(term_query->field_value.data(), term_value.data(), term_size * sizeof(int64_t)); - break; - } - case engine::DataType::FLOAT: { - std::vector term_value(term_size, 0); - for (uint64_t i = 0; i < term_size; ++i) { - term_value[i] = term_value_json[i].get(); - } - memcpy(term_query->field_value.data(), term_value.data(), term_size * sizeof(double)); - break; - } - case engine::DataType::DOUBLE: { - std::vector term_value(term_size, 0); - for (uint64_t i = 0; i < term_size; ++i) { - term_value[i] = term_value_json[i].get(); - } - memcpy(term_query->field_value.data(), term_value.data(), term_size * sizeof(double)); - break; - } - } - return Status::OK(); -} - -Status -ParseRangeQuery(const nlohmann::json& range_json, query::RangeQueryPtr& range_query) { - std::string field_name = range_json["field"]; - range_query->field_name = field_name; - - auto range_value_json = range_json["values"]; - if (range_value_json.contains("lt")) { - query::CompareExpr compare_expr; - compare_expr.compare_operator = query::CompareOperator::LT; - compare_expr.operand = range_value_json["lt"].get(); - range_query->compare_expr.emplace_back(compare_expr); - } - if (range_value_json.contains("lte")) { - query::CompareExpr compare_expr; - compare_expr.compare_operator = query::CompareOperator::LTE; - compare_expr.operand = range_value_json["lte"].get(); - range_query->compare_expr.emplace_back(compare_expr); - } - if (range_value_json.contains("eq")) { - query::CompareExpr compare_expr; - compare_expr.compare_operator = query::CompareOperator::EQ; - compare_expr.operand = range_value_json["eq"].get(); - range_query->compare_expr.emplace_back(compare_expr); - } - if (range_value_json.contains("ne")) { - query::CompareExpr compare_expr; - compare_expr.compare_operator = query::CompareOperator::NE; - compare_expr.operand = range_value_json["ne"].get(); - range_query->compare_expr.emplace_back(compare_expr); - } - if (range_value_json.contains("gt")) { - query::CompareExpr compare_expr; - compare_expr.compare_operator = query::CompareOperator::GT; - compare_expr.operand = range_value_json["gt"].get(); - range_query->compare_expr.emplace_back(compare_expr); - } - if (range_value_json.contains("gte")) { - query::CompareExpr compare_expr; - compare_expr.compare_operator = query::CompareOperator::GTE; - compare_expr.operand = range_value_json["gte"].get(); - range_query->compare_expr.emplace_back(compare_expr); - } - return Status::OK(); -} -#endif - Status GrpcRequestHandler::ProcessLeafQueryJson(const nlohmann::json& json, query::BooleanQueryPtr& query, std::string& field_name) { @@ -1675,62 +859,7 @@ Status GrpcRequestHandler::DeserializeJsonToBoolQuery( const google::protobuf::RepeatedPtrField<::milvus::grpc::VectorParam>& vector_params, const std::string& dsl_string, query::BooleanQueryPtr& boolean_query, query::QueryPtr& query_ptr) { - try { - nlohmann::json dsl_json = json::parse(dsl_string); - - if (dsl_json.empty()) { - return Status{SERVER_INVALID_ARGUMENT, "Query dsl is null"}; - } - auto status = Status::OK(); - for (const auto& vector_param : vector_params) { - const std::string& vector_string = vector_param.json(); - nlohmann::json vector_json = json::parse(vector_string); - json::iterator it = vector_json.begin(); - std::string placeholder = it.key(); - - auto vector_query = std::make_shared(); - json::iterator vector_param_it = it.value().begin(); - if (vector_param_it != it.value().end()) { - const std::string& field_name = vector_param_it.key(); - vector_query->field_name = field_name; - nlohmann::json param_json = vector_param_it.value(); - int64_t topk = param_json["topk"]; - status = server::ValidateSearchTopk(topk); - if (!status.ok()) { - return status; - } - vector_query->topk = topk; - if (param_json.contains("metric_type")) { - std::string metric_type = param_json["metric_type"]; - vector_query->metric_type = metric_type; - query_ptr->metric_types.insert({field_name, param_json["metric_type"]}); - } - if (!vector_param_it.value()["params"].empty()) { - vector_query->extra_params = vector_param_it.value()["params"]; - } - query_ptr->index_fields.insert(field_name); - } - - engine::VectorsData vector_data; - CopyRowRecords(vector_param.row_record().records(), - google::protobuf::RepeatedField(), vector_data); - vector_query->query_vector.binary_data = vector_data.binary_data_; - vector_query->query_vector.float_data = vector_data.float_data_; - - query_ptr->vectors.insert(std::make_pair(placeholder, vector_query)); - } - if (dsl_json.contains("bool")) { - auto boolean_query_json = dsl_json["bool"]; - JSON_NULL_CHECK(boolean_query_json); - status = ProcessBooleanQueryJson(boolean_query_json, boolean_query, query_ptr); - if (!status.ok()) { - return status; - } - } - return status; - } catch (std::exception& e) { - return Status{SERVER_INVALID_DSL_PARAMETER, e.what()}; - } + return Status::OK(); } ::grpc::Status @@ -1739,87 +868,6 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc: CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - Status status; - - CollectionSchema collection_schema; - status = req_handler_.GetCollectionInfo(GetContext(context), request->collection_name(), collection_schema); - - auto grpc_entity = response->mutable_entities(); - if (!status.ok()) { - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; - } - - query::BooleanQueryPtr boolean_query = std::make_shared(); - query::QueryPtr query_ptr = std::make_shared(); - query_ptr->collection_id = request->collection_name(); - - status = DeserializeJsonToBoolQuery(request->vector_param(), request->dsl(), boolean_query, query_ptr); - if (!status.ok()) { - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; - } - - status = query::ValidateBooleanQuery(boolean_query); - if (!status.ok()) { - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; - } - - query::GeneralQueryPtr general_query = std::make_shared(); - query::GenBinaryQuery(boolean_query, general_query->bin); - query_ptr->root = general_query; - - if (!query::ValidateBinaryQuery(general_query->bin)) { - status = Status{SERVER_INVALID_BINARY_QUERY, "Generate wrong binary query tree"}; - SET_RESPONSE(grpc_entity->mutable_status(), status, context); - return ::grpc::Status::OK; - } - - std::vector partition_list; - partition_list.resize(request->partition_tag_array_size()); - for (int i = 0; i < request->partition_tag_array_size(); ++i) { - partition_list[i] = request->partition_tag_array(i); - } - - query_ptr->partitions = partition_list; - - milvus::json json_params; - for (int i = 0; i < request->extra_params_size(); i++) { - const ::milvus::grpc::KeyValuePair& extra = request->extra_params(i); - if (extra.key() == EXTRA_PARAM_KEY) { - json_params = json::parse(extra.value()); - } - } - - engine::QueryResultPtr result = std::make_shared(); - engine::snapshot::FieldElementMappings field_mappings; - - status = req_handler_.Search(GetContext(context), query_ptr, json_params, field_mappings, result); - - if (!status.ok()) { - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; - } - - // step 6: construct and return result - response->set_row_num(result->row_num_); - int64_t id_size = result->result_ids_.size(); - grpc_entity->mutable_valid_row()->Resize(id_size, true); - - CopyDataChunkToEntity(result->data_chunk_, field_mappings, id_size, grpc_entity); - - grpc_entity->mutable_ids()->Resize(static_cast(result->result_ids_.size()), 0); - memcpy(grpc_entity->mutable_ids()->mutable_data(), result->result_ids_.data(), - result->result_ids_.size() * sizeof(int64_t)); - - response->mutable_distances()->Resize(static_cast(result->result_distances_.size()), 0.0); - memcpy(response->mutable_distances()->mutable_data(), result->result_distances_.data(), - result->result_distances_.size() * sizeof(float)); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; } diff --git a/proxy/src/version.h b/proxy/src/version.h index e315afe1f..ae9a0b734 100644 --- a/proxy/src/version.h +++ b/proxy/src/version.h @@ -11,5 +11,5 @@ #define MILVUS_VERSION "0.10.0" #define BUILD_TYPE "Release" -#define BUILD_TIME "2020-08-21 17:48.25" -#define LAST_COMMIT_ID "8c4a905ce247333b5950bd2f03cf103e34533db8" +#define BUILD_TIME "2020-08-24 14:23.26" +#define LAST_COMMIT_ID "83d51fc3196e51a3057e1fb716ee4f4be10ec0c1" diff --git a/writer/writer.go b/writer/writer.go index 3c532eefe..e69de29bb 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -1,54 +0,0 @@ -package src - -import ( - "context" - "github.com/czs007/suvlim/pulsar/schema" - "github.com/czs007/suvlim/storage/pkg" - "github.com/czs007/suvlim/storage/pkg/types" -) - -type write_node_time_sync struct { - delete_time_sync uint64 - insert_time_sync uint64 -} - -type write_node struct { - open_segment_id int64 - next_segment_id int64 - next_segment_start_time uint64 - stroe *types.Store - time_sync_table *write_node_time_sync -} - -func NewWriteNode(ctx context.Context, open_segment_id int64, time_sync uint64) (*write_node, error) { - ctx = context.Background() - tikv_store, err := storage.NewStore(ctx, "tikv") - write_table_time_sync := &write_node_time_sync{delete_time_sync: time_sync, insert_time_sync: time_sync} - if err != nil { - return nil, err - } - return &write_node{ - stroe: tikv_store, - time_sync_table: write_table_time_sync, - }, nil -} - -func (s *write_node) InsertBatchData(ctx context.Context, data []schema.InsertMsg, time_sync uint64) error { - return nil -} - -func (s *write_node) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg, time_sync uint64) error { - return nil -} - -func (s *write_node) AddNewSegment(segment_id int64, open_time uint64) error { - return nil -} - -func (s *write_node) UpdateInsertTimeSync(time_sync uint64) { - s.time_sync_table.insert_time_sync = time_sync -} - -func (s *write_node) UpdateDeleteTimeSync(time_sync uint64) { - s.time_sync_table.delete_time_sync = time_sync -} -- GitLab