diff --git a/core/src/config/ConfigMgr.cpp b/core/src/config/ConfigMgr.cpp index 41a586e162e7a7b88624ca88e94acdb802ef3d35..f5533d14c35e5841f146e8c0bf06587e44293243 100644 --- a/core/src/config/ConfigMgr.cpp +++ b/core/src/config/ConfigMgr.cpp @@ -185,6 +185,12 @@ ConfigMgr::ConfigMgr() { {"system.lock.enable", CreateBoolConfig("system.lock.enable", false, &config.system.lock.enable.value, true, nullptr, nullptr)}, + + /* transcript */ + {"transcript.enable", + CreateBoolConfig("transcript.enable", false, &config.transcript.enable.value, false, nullptr, nullptr)}, + {"transcript.replay", + CreateStringConfig("transcript.replay", false, &config.transcript.replay.value, "", nullptr, nullptr)}, }; } diff --git a/core/src/config/ServerConfig.h b/core/src/config/ServerConfig.h index 78c9bf7732cad4ee4c75ed526372a16e333c8dcd..259ae219288ef01e63ae7f09782a3e26aa84ea02 100644 --- a/core/src/config/ServerConfig.h +++ b/core/src/config/ServerConfig.h @@ -165,6 +165,11 @@ struct ServerConfig { Bool enable{false}; } lock; } system; + + struct Transcript { + Bool enable{false}; + String replay{""}; + } transcript; }; extern ServerConfig config; diff --git a/core/src/db/Constants.h b/core/src/db/Constants.h index 13fe8c6a61b0f814affbdb0851ee4b233e711351..f0a3d861e7d4172ec964cb819197e8ec0cc8c46d 100644 --- a/core/src/db/Constants.h +++ b/core/src/db/Constants.h @@ -29,6 +29,7 @@ constexpr int32_t MAX_SEGMENT_ROW_COUNT = 4 * 1024 * 1024; // max row count of constexpr int64_t DEFAULT_SEGMENT_ROW_COUNT = 512 * 1024; // default row count per segment when creating collection constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * MB; // max data size in one insert action constexpr int64_t MAX_WAL_FILE_SIZE = 256 * MB; // max file size of wal file +constexpr int64_t MAX_SCRIPT_FILE_SIZE = 256 * MB; // max file size of transcript file constexpr int64_t BUILD_INEDX_RETRY_TIMES = 3; // retry times if build index failed diff --git a/core/src/db/transcript/ScriptCodec.cpp b/core/src/db/transcript/ScriptCodec.cpp index ecb977fed469e8ca9531d930ab0013b71becec6d..a69514ad47eff221d292c120c8164bb292e5ce08 100644 --- a/core/src/db/transcript/ScriptCodec.cpp +++ b/core/src/db/transcript/ScriptCodec.cpp @@ -11,13 +11,588 @@ #include "db/transcript/ScriptCodec.h" +#include +#include +#include + namespace milvus { namespace engine { -ScriptCodec& -ScriptCodec::GetInstance() { - static ScriptCodec s_codec; - return s_codec; +// action names +const char* ActionCreateCollection = "CreateCollection"; +const char* ActionDropCollection = "DropCollection"; +const char* ActionHasCollection = "HasCollection"; +const char* ActionListCollections = "ListCollections"; +const char* ActionGetCollectionInfo = "GetCollectionInfo"; +const char* ActionGetCollectionStats = "GetCollectionStats"; +const char* ActionCountEntities = "CountEntities"; +const char* ActionCreatePartition = "CreatePartition"; +const char* ActionDropPartition = "DropPartition"; +const char* ActionHasPartition = "HasPartition"; +const char* ActionListPartitions = "ListPartitions"; +const char* ActionCreateIndex = "CreateIndex"; +const char* ActionDropIndex = "DropIndex"; +const char* ActionDescribeIndex = "DescribeIndex"; +const char* ActionInsert = "Insert"; +const char* ActionGetEntityByID = "GetEntityByID"; +const char* ActionDeleteEntityByID = "DeleteEntityByID"; +const char* ActionListIDInSegment = "ListIDInSegment"; +const char* ActionQuery = "Query"; +const char* ActionLoadCollection = "LoadCollection"; +const char* ActionFlush = "Flush"; +const char* ActionCompact = "Compact"; + +// json keys +const char* J_ACTION_TYPE = "action"; +const char* J_ACTION_TS = "time"; // action timestamp +const char* J_COLLECTION_NAME = "col_name"; +const char* J_PARTITION_NAME = "part_name"; +const char* J_FIELD_NAME = "field_name"; +const char* J_FIELD_NAMES = "field_names"; +const char* J_FIELD_TYPE = "field_type"; +const char* J_MAPPINGS = "mappings"; +const char* J_PARAMS = "params"; +const char* J_ID_ARRAY = "id_array"; +const char* J_SEGMENT_ID = "segment_id"; +const char* J_THRESHOLD = "threshold"; +const char* J_FORCE = "force"; +const char* J_INDEX_NAME = "index_name"; +const char* J_INDEX_TYPE = "index_type"; +const char* J_METRIC_TYPE = "metric_type"; +const char* J_CHUNK_COUNT = "count"; +const char* J_FIXED_FIELDS = "fixed_fields"; +const char* J_VARIABLE_FIELDS = "variable_fields"; +const char* J_CHUNK_DATA = "data"; +const char* J_CHUNK_OFFSETS = "offsets"; + +const char* J_PARTITIONS = "partitions"; +const char* J_FIELDS = "fields"; +const char* J_METRIC_TYPES = "metric_types"; +const char* J_KEY = "key"; +const char* J_VECTOR_QUERIES = "vector_queries"; +const char* J_TOPK = "topk"; +const char* J_NQ = "nq"; +const char* J_BOOST = "boost"; +const char* J_FLOAT_DATA = "float_data"; +const char* J_BIN_DATA = "bin_data"; +const char* J_GENERAL_QUERY = "general_query"; +const char* J_QUERY_LEAF = "leaf"; +const char* J_QUERY_BIN = "bin"; +const char* J_QUERY_RELATION = "relation"; +const char* J_QUERY_LEFT = "left"; +const char* J_QUERY_RIGHT = "right"; +const char* J_QUERY_TERM = "term"; +const char* J_QUERY_RANGE = "range"; +const char* J_QUERY_PLACEHOLDER = "placeholder"; + +// encode methods +Status +ScriptCodec::EncodeAction(milvus::json& json_obj, const std::string& action_type) { + json_obj[J_ACTION_TYPE] = action_type; + json_obj[J_ACTION_TS] = utils::GetMicroSecTimeStamp(); + return Status::OK(); +} + +Status +ScriptCodec::Encode(milvus::json& json_obj, const snapshot::CreateCollectionContext& context) { + if (context.collection == nullptr) { + return Status::OK(); + } + + json_obj[J_COLLECTION_NAME] = context.collection->GetName(); + + // params + json_obj[J_PARAMS] = context.collection->GetParams(); + + // mappings + milvus::json json_fields; + for (const auto& pair : context.fields_schema) { + auto& field = pair.first; + milvus::json json_field; + json_field[J_FIELD_NAME] = field->GetName(); + json_field[J_FIELD_TYPE] = field->GetFtype(); + json_field[J_PARAMS] = field->GetParams(); + json_fields.push_back(json_field); + } + json_obj[J_MAPPINGS] = json_fields; + + return Status::OK(); +} + +Status +ScriptCodec::EncodeCollectionName(milvus::json& json_obj, const std::string& collection_name) { + json_obj[J_COLLECTION_NAME] = collection_name; + return Status::OK(); +} + +Status +ScriptCodec::EncodePartitionName(milvus::json& json_obj, const std::string& partition_name) { + json_obj[J_PARTITION_NAME] = partition_name; + return Status::OK(); +} + +Status +ScriptCodec::EncodeFieldName(milvus::json& json_obj, const std::string& field_name) { + json_obj[J_FIELD_NAME] = field_name; + return Status::OK(); +} + +Status +ScriptCodec::EncodeFieldNames(milvus::json& json_obj, const std::vector& field_names) { + json_obj[J_FIELD_NAMES] = field_names; + return Status::OK(); +} + +Status +ScriptCodec::Encode(milvus::json& json_obj, const CollectionIndex& index) { + json_obj[J_INDEX_NAME] = index.index_name_; + json_obj[J_INDEX_TYPE] = index.index_type_; + json_obj[J_METRIC_TYPE] = index.metric_name_; + json_obj[J_PARAMS] = index.extra_params_; + + return Status::OK(); +} + +Status +ScriptCodec::Encode(milvus::json& json_obj, const DataChunkPtr& data_chunk) { + if (data_chunk == nullptr) { + return Status::OK(); + } + + json_obj[J_CHUNK_COUNT] = data_chunk->count_; + + // fixed fields + { + milvus::json json_fields; + for (const auto& pair : data_chunk->fixed_fields_) { + auto& data = pair.second; + if (data == nullptr) { + continue; + } + + milvus::json json_field; + json_field[J_FIELD_NAME] = pair.first; + json_field[J_CHUNK_DATA] = data->data_; + json_fields.push_back(json_field); + } + json_obj[J_FIXED_FIELDS] = json_fields; + } + + // variable fields + { + milvus::json json_fields; + for (const auto& pair : data_chunk->variable_fields_) { + auto& data = pair.second; + if (data == nullptr) { + continue; + } + + milvus::json json_field; + json_field[J_FIELD_NAME] = pair.first; + json_field[J_CHUNK_DATA] = data->data_; + json_field[J_CHUNK_OFFSETS] = data->offset_; + + json_fields.push_back(json_field); + } + json_obj[J_VARIABLE_FIELDS] = json_fields; + } + + return Status::OK(); +} + +Status +ScriptCodec::Encode(milvus::json& json_obj, const IDNumbers& id_array) { + json_obj[J_ID_ARRAY] = id_array; + return Status::OK(); +} + +Status +ScriptCodec::EncodeSegmentID(milvus::json& json_obj, int64_t segment_id) { + json_obj[J_SEGMENT_ID] = segment_id; + return Status::OK(); +} + +Status +ScriptCodec::Encode(milvus::json& json_obj, const query::QueryPtr& query_ptr) { + if (query_ptr == nullptr) { + return Status::OK(); + } + + EncodeCollectionName(json_obj, query_ptr->collection_id); + json_obj[J_PARTITIONS] = query_ptr->partitions; + json_obj[J_FIELD_NAMES] = query_ptr->field_names; + json_obj[J_FIELDS] = query_ptr->index_fields; + milvus::json metrics; + for (auto& pair : query_ptr->metric_types) { + milvus::json metric; + metric[J_FIELD_NAME] = pair.first; + metric[J_METRIC_TYPE] = pair.second; + metrics.push_back(metric); + } + json_obj[J_METRIC_TYPES] = metrics; + + // vector query + milvus::json vector_queries; + for (auto& pair : query_ptr->vectors) { + milvus::query::VectorQueryPtr& query = pair.second; + if (query == nullptr) { + continue; + } + + milvus::json vector_query; + vector_query[J_KEY] = pair.first; + vector_query[J_FIELD_NAME] = query->field_name; + vector_query[J_PARAMS] = query->extra_params; + vector_query[J_TOPK] = query->topk; + vector_query[J_NQ] = query->nq; + vector_query[J_METRIC_TYPE] = query->metric_type; + vector_query[J_BOOST] = query->boost; + vector_query[J_FLOAT_DATA] = query->query_vector.float_data; + vector_query[J_BIN_DATA] = query->query_vector.binary_data; + + vector_queries.push_back(vector_query); + } + json_obj[J_VECTOR_QUERIES] = vector_queries; + + // general query + if (query_ptr->root) { + milvus::json general_query; + EncodeGeneralQuery(general_query, query_ptr->root); + json_obj[J_GENERAL_QUERY] = general_query; + } + + return Status::OK(); +} + +Status +ScriptCodec::EncodeGeneralQuery(milvus::json& json_obj, query::GeneralQueryPtr& query) { + if (query == nullptr) { + return Status::OK(); + } + + if (query->leaf) { + milvus::json json_leaf; + json_leaf[J_QUERY_PLACEHOLDER] = query->leaf->vector_placeholder; + json_leaf[J_BOOST] = query->leaf->query_boost; + if (query->leaf->term_query) { + milvus::json json_term; + json_term[J_PARAMS] = query->leaf->term_query->json_obj; + json_leaf[J_QUERY_TERM] = json_term; + } + if (query->leaf->range_query) { + milvus::json json_range; + json_range[J_PARAMS] = query->leaf->range_query->json_obj; + json_leaf[J_QUERY_RANGE] = json_range; + } + + json_obj[J_QUERY_LEAF] = json_leaf; + } + if (query->bin) { + milvus::json json_bin; + json_bin[J_QUERY_RELATION] = query->bin->relation; + json_bin[J_BOOST] = query->bin->query_boost; + + if (query->bin->left_query) { + milvus::json json_left; + EncodeGeneralQuery(json_left, query->bin->left_query); + json_bin[J_QUERY_LEFT] = json_left; + } + if (query->bin->right_query) { + milvus::json json_right; + EncodeGeneralQuery(json_right, query->bin->right_query); + json_bin[J_QUERY_RIGHT] = json_right; + } + + json_obj[J_QUERY_BIN] = json_bin; + } + return Status::OK(); +} + +Status +ScriptCodec::EncodeThreshold(milvus::json& json_obj, double threshold) { + json_obj[J_THRESHOLD] = threshold; + return Status::OK(); +} + +Status +ScriptCodec::EncodeForce(milvus::json& json_obj, bool force) { + json_obj[J_FORCE] = force; + return Status::OK(); +} + +// decode methods +Status +ScriptCodec::DecodeAction(milvus::json& json_obj, std::string& action_type, int64_t& action_ts) { + action_type = ""; + action_ts = 0; + if (json_obj.find(J_ACTION_TYPE) != json_obj.end()) { + action_type = json_obj[J_ACTION_TYPE].get(); + } else { + return Status(DB_ERROR, "element doesn't exist"); + } + + if (json_obj.find(J_ACTION_TS) != json_obj.end()) { + action_ts = json_obj[J_ACTION_TS].get(); + } else { + return Status(DB_ERROR, "element doesn't exist"); + } + + return Status::OK(); +} + +Status +ScriptCodec::Decode(milvus::json& json_obj, snapshot::CreateCollectionContext& context) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + + milvus::json params; + if (json_obj.find(J_PARAMS) != json_obj.end()) { + params = json_obj[J_PARAMS]; + } + context.collection = std::make_shared(collection_name, params); + + // mappings + if (json_obj.find(J_MAPPINGS) != json_obj.end()) { + milvus::json& fields = json_obj[J_MAPPINGS]; + for (auto& field : fields) { + auto field_name = field[J_FIELD_NAME].get(); + auto field_type = static_cast(field[J_FIELD_TYPE].get()); + milvus::json& field_params = field[J_PARAMS]; + auto field_ptr = std::make_shared(field_name, 0, field_type, field_params); + context.fields_schema[field_ptr] = {}; + } + } + + return Status::OK(); +} + +Status +ScriptCodec::DecodeCollectionName(milvus::json& json_obj, std::string& collection_name) { + if (json_obj.find(J_COLLECTION_NAME) != json_obj.end()) { + collection_name = json_obj[J_COLLECTION_NAME].get(); + return Status::OK(); + } + + return Status(DB_ERROR, "element doesn't exist"); +} + +Status +ScriptCodec::DecodePartitionName(milvus::json& json_obj, std::string& partition_name) { + if (json_obj.find(J_PARTITION_NAME) != json_obj.end()) { + partition_name = json_obj[J_PARTITION_NAME].get(); + return Status::OK(); + } + + return Status(DB_ERROR, "element doesn't exist"); +} + +Status +ScriptCodec::DecodeFieldName(milvus::json& json_obj, std::string& field_name) { + if (json_obj.find(J_FIELD_NAME) != json_obj.end()) { + field_name = json_obj[J_FIELD_NAME].get(); + return Status::OK(); + } + + return Status(DB_ERROR, "element doesn't exist"); +} + +Status +ScriptCodec::DecodeFieldNames(milvus::json& json_obj, std::vector& field_names) { + if (json_obj.find(J_FIELD_NAMES) != json_obj.end()) { + field_names = json_obj[J_FIELD_NAMES].get>(); + return Status::OK(); + } + + return Status(DB_ERROR, "element doesn't exist"); +} + +Status +ScriptCodec::Decode(milvus::json& json_obj, CollectionIndex& index) { + if (json_obj.find(J_INDEX_NAME) != json_obj.end()) { + index.index_name_ = json_obj[J_INDEX_NAME].get(); + } + if (json_obj.find(J_INDEX_TYPE) != json_obj.end()) { + index.index_type_ = json_obj[J_INDEX_TYPE].get(); + } + if (json_obj.find(J_METRIC_TYPE) != json_obj.end()) { + index.metric_name_ = json_obj[J_METRIC_TYPE].get(); + } + if (json_obj.find(J_PARAMS) != json_obj.end()) { + index.extra_params_ = json_obj[J_PARAMS]; + } + + return Status::OK(); +} + +Status +ScriptCodec::Decode(milvus::json& json_obj, DataChunkPtr& data_chunk) { + data_chunk = std::make_shared(); + if (json_obj.find(J_CHUNK_COUNT) != json_obj.end()) { + data_chunk->count_ = json_obj[J_CHUNK_COUNT].get(); + } + + // fixed fields + if (json_obj.find(J_FIXED_FIELDS) != json_obj.end()) { + auto& fields = json_obj[J_FIXED_FIELDS]; + for (auto& field : fields) { + auto name = field[J_FIELD_NAME].get(); + BinaryDataPtr bin = std::make_shared(); + bin->data_ = field[J_CHUNK_DATA].get>(); + data_chunk->fixed_fields_.insert(std::make_pair(name, bin)); + } + } + + // variable fields + if (json_obj.find(J_VARIABLE_FIELDS) != json_obj.end()) { + auto& fields = json_obj[J_VARIABLE_FIELDS]; + for (auto& field : fields) { + auto name = field[J_FIELD_NAME].get(); + VaribleDataPtr bin = std::make_shared(); + bin->data_ = field[J_CHUNK_DATA].get>(); + bin->offset_ = field[J_CHUNK_OFFSETS].get>(); + + data_chunk->variable_fields_.insert(std::make_pair(name, bin)); + } + } + + return Status::OK(); +} + +Status +ScriptCodec::Decode(milvus::json& json_obj, IDNumbers& id_array) { + if (json_obj.find(J_ID_ARRAY) != json_obj.end()) { + id_array = json_obj[J_ID_ARRAY].get(); + return Status::OK(); + } + + return Status(DB_ERROR, "element doesn't exist"); +} + +Status +ScriptCodec::DecodeSegmentID(milvus::json& json_obj, int64_t& segment_id) { + if (json_obj.find(J_SEGMENT_ID) != json_obj.end()) { + segment_id = json_obj[J_SEGMENT_ID].get(); + return Status::OK(); + } + + return Status(DB_ERROR, "element doesn't exist"); +} + +Status +ScriptCodec::Decode(milvus::json& json_obj, query::QueryPtr& query_ptr) { + query_ptr = std::make_shared(); + DecodeCollectionName(json_obj, query_ptr->collection_id); + DecodeFieldNames(json_obj, query_ptr->field_names); + if (json_obj.find(J_PARTITIONS) != json_obj.end()) { + query_ptr->partitions = json_obj[J_PARTITIONS].get>(); + } + if (json_obj.find(J_FIELDS) != json_obj.end()) { + query_ptr->index_fields = json_obj[J_FIELDS].get>(); + } + + if (json_obj.find(J_METRIC_TYPES) != json_obj.end()) { + milvus::json& metrics = json_obj[J_METRIC_TYPES]; + for (auto& metric : metrics) { + std::string field_name = metric[J_FIELD_NAME]; + std::string metric_type = metric[J_METRIC_TYPE]; + query_ptr->metric_types.insert(std::make_pair(field_name, metric_type)); + } + } + + // vector queries + if (json_obj.find(J_VECTOR_QUERIES) != json_obj.end()) { + milvus::json& vector_queries = json_obj[J_VECTOR_QUERIES]; + for (auto& vector_query : vector_queries) { + std::string key = vector_query[J_KEY]; + + milvus::query::VectorQueryPtr query = std::make_shared(); + query->field_name = vector_query[J_FIELD_NAME]; + query->extra_params = vector_query[J_PARAMS]; + query->topk = vector_query[J_TOPK]; + query->nq = vector_query[J_NQ]; + query->metric_type = vector_query[J_METRIC_TYPE]; + query->boost = vector_query[J_BOOST]; + query->query_vector.float_data = vector_query[J_FLOAT_DATA].get>(); + query->query_vector.binary_data = vector_query[J_BIN_DATA].get>(); + + query_ptr->vectors.insert(std::make_pair(key, query)); + } + } + + // general query + if (json_obj.find(J_GENERAL_QUERY) != json_obj.end()) { + milvus::json& json_query = json_obj[J_GENERAL_QUERY]; + query_ptr->root = std::make_shared(); + DecodeGeneralQuery(json_query, query_ptr->root); + } + + return Status::OK(); +} + +Status +ScriptCodec::DecodeGeneralQuery(milvus::json& json_obj, query::GeneralQueryPtr& query) { + if (query == nullptr) { + return Status::OK(); + } + + if (json_obj.find(J_QUERY_LEAF) != json_obj.end()) { + milvus::json& json_leaf = json_obj[J_QUERY_LEAF]; + query->leaf = std::make_shared(); + query->leaf->vector_placeholder = json_leaf[J_QUERY_PLACEHOLDER]; + query->leaf->query_boost = json_leaf[J_BOOST]; + + if (json_leaf.find(J_QUERY_TERM) != json_leaf.end()) { + milvus::json& json_term = json_leaf[J_QUERY_TERM]; + query->leaf->term_query = std::make_shared(); + query->leaf->term_query->json_obj = json_term[J_PARAMS]; + } + if (json_leaf.find(J_QUERY_RANGE) != json_leaf.end()) { + milvus::json& json_range = json_leaf[J_QUERY_RANGE]; + query->leaf->range_query = std::make_shared(); + query->leaf->range_query->json_obj = json_range[J_PARAMS]; + } + } + + if (json_obj.find(J_QUERY_BIN) != json_obj.end()) { + milvus::json& json_bin = json_obj[J_QUERY_BIN]; + query->bin = std::make_shared(); + query->bin->relation = json_bin[J_QUERY_RELATION]; + query->bin->query_boost = json_bin[J_BOOST]; + + if (json_bin.find(J_QUERY_LEFT) != json_bin.end()) { + milvus::json& json_left = json_bin[J_QUERY_LEFT]; + query->bin->left_query = std::make_shared(); + DecodeGeneralQuery(json_left, query->bin->left_query); + } + + if (json_bin.find(J_QUERY_RIGHT) != json_bin.end()) { + milvus::json& json_right = json_bin[J_QUERY_RIGHT]; + query->bin->right_query = std::make_shared(); + DecodeGeneralQuery(json_right, query->bin->right_query); + } + } + + return Status::OK(); +} + +Status +ScriptCodec::DecodeThreshold(milvus::json& json_obj, double& threshold) { + if (json_obj.find(J_THRESHOLD) != json_obj.end()) { + threshold = json_obj[J_THRESHOLD].get(); + return Status::OK(); + } + + return Status(DB_ERROR, "element doesn't exist"); +} + +Status +ScriptCodec::DecodeForce(milvus::json& json_obj, bool& force) { + if (json_obj.find(J_FORCE) != json_obj.end()) { + force = json_obj[J_FORCE].get(); + return Status::OK(); + } + + return Status(DB_ERROR, "element doesn't exist"); } } // namespace engine diff --git a/core/src/db/transcript/ScriptCodec.h b/core/src/db/transcript/ScriptCodec.h index 5027fea428a7704c32b055e72e23f6106ab37f21..bfa727b9d34f51abffffeef84a7053f68c17e6f2 100644 --- a/core/src/db/transcript/ScriptCodec.h +++ b/core/src/db/transcript/ScriptCodec.h @@ -11,25 +11,135 @@ #pragma once +#include "db/DB.h" +#include "utils/Json.h" +#include "utils/Status.h" + #include +#include namespace milvus { namespace engine { +// action names +extern const char* ActionCreateCollection; +extern const char* ActionDropCollection; +extern const char* ActionHasCollection; +extern const char* ActionListCollections; +extern const char* ActionGetCollectionInfo; +extern const char* ActionGetCollectionStats; +extern const char* ActionCountEntities; +extern const char* ActionCreatePartition; +extern const char* ActionDropPartition; +extern const char* ActionHasPartition; +extern const char* ActionListPartitions; +extern const char* ActionCreateIndex; +extern const char* ActionDropIndex; +extern const char* ActionDescribeIndex; +extern const char* ActionInsert; +extern const char* ActionGetEntityByID; +extern const char* ActionDeleteEntityByID; +extern const char* ActionListIDInSegment; +extern const char* ActionQuery; +extern const char* ActionLoadCollection; +extern const char* ActionFlush; +extern const char* ActionCompact; + +// json keys +extern const char* J_ACTION_TYPE; +extern const char* J_ACTION_TS; // action timestamp + class ScriptCodec { public: - ScriptCodec() = default; + // encode methods + static Status + EncodeAction(milvus::json& json_obj, const std::string& action_type); + + static Status + Encode(milvus::json& json_obj, const snapshot::CreateCollectionContext& context); + + static Status + EncodeCollectionName(milvus::json& json_obj, const std::string& collection_name); + + static Status + EncodePartitionName(milvus::json& json_obj, const std::string& partition_name); + + static Status + EncodeFieldName(milvus::json& json_obj, const std::string& field_name); + + static Status + EncodeFieldNames(milvus::json& json_obj, const std::vector& field_names); + + static Status + Encode(milvus::json& json_obj, const CollectionIndex& index); + + static Status + Encode(milvus::json& json_obj, const DataChunkPtr& data_chunk); + + static Status + Encode(milvus::json& json_obj, const IDNumbers& id_array); + + static Status + EncodeSegmentID(milvus::json& json_obj, int64_t segment_id); + + static Status + Encode(milvus::json& json_obj, const query::QueryPtr& query_ptr); + + static Status + EncodeThreshold(milvus::json& json_obj, double threshold); + + static Status + EncodeForce(milvus::json& json_obj, bool force); - static ScriptCodec& - GetInstance(); + // decode methods + static Status + DecodeAction(milvus::json& json_obj, std::string& action_type, int64_t& action_ts); + + static Status + Decode(milvus::json& json_obj, snapshot::CreateCollectionContext& context); + + static Status + DecodeCollectionName(milvus::json& json_obj, std::string& collection_name); + + static Status + DecodePartitionName(milvus::json& json_obj, std::string& partition_name); + + static Status + DecodeFieldName(milvus::json& json_obj, std::string& field_name); + + static Status + DecodeFieldNames(milvus::json& json_obj, std::vector& field_names); + + static Status + Decode(milvus::json& json_obj, CollectionIndex& index); + + static Status + Decode(milvus::json& json_obj, DataChunkPtr& data_chunk); + + static Status + Decode(milvus::json& json_obj, IDNumbers& id_array); + + static Status + DecodeSegmentID(milvus::json& json_obj, int64_t& segment_id); + + static Status + Decode(milvus::json& json_obj, query::QueryPtr& query_ptr); + + static Status + DecodeThreshold(milvus::json& json_obj, double& threshold); + + static Status + DecodeForce(milvus::json& json_obj, bool& force); + + private: + static Status + EncodeGeneralQuery(milvus::json& json_obj, query::GeneralQueryPtr& query); - void - SetScriptPath(const std::string& path) { - script_path_ = path; - } + static Status + DecodeGeneralQuery(milvus::json& json_obj, query::GeneralQueryPtr& query); private: - std::string script_path_; + ScriptCodec() = delete; }; } // namespace engine diff --git a/core/src/db/transcript/ScriptFile.cpp b/core/src/db/transcript/ScriptFile.cpp new file mode 100644 index 0000000000000000000000000000000000000000..927a0b39b41162cc519f14daac626eb0e0f0bfbc --- /dev/null +++ b/core/src/db/transcript/ScriptFile.cpp @@ -0,0 +1,107 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/transcript/ScriptFile.h" +#include "db/Types.h" + +#include + +namespace milvus { +namespace engine { + +ScriptFile::~ScriptFile() { + CloseFile(); +} + +Status +ScriptFile::OpenWrite(const std::string& path) { + try { + if (writer_.is_open()) { + writer_.close(); + } + + file_size_ = 0; + writer_.open(path.c_str(), std::ios::out | std::ios::app); + } catch (std::exception& ex) { + return Status(DB_ERROR, ex.what()); + } + + return Status::OK(); +} + +Status +ScriptFile::OpenRead(const std::string& path) { + try { + if (reader_.is_open()) { + reader_.close(); + } + reader_.open(path.c_str(), std::ios::in); + } catch (std::exception& ex) { + return Status(DB_ERROR, ex.what()); + } + + return Status::OK(); +} + +Status +ScriptFile::CloseFile() { + try { + if (reader_.is_open()) { + reader_.close(); + } + if (writer_.is_open()) { + writer_.close(); + } + } catch (std::exception& ex) { + return Status(DB_ERROR, ex.what()); + } + file_size_ = 0; + + return Status::OK(); +} + +Status +ScriptFile::WriteLine(const std::string& str) { + try { + if (writer_.is_open()) { + writer_.write(str.c_str(), str.size()); + writer_.write("\n", 1); + writer_.flush(); + file_size_ += str.size() + 1; + } + } catch (std::exception& ex) { + return Status(DB_ERROR, ex.what()); + } + + return Status::OK(); +} + +bool +ScriptFile::ReadLine(std::string& str) { + str = ""; + if (reader_.is_open()) { + while (getline(reader_, str)) { + if (!str.empty()) { + return true; + } + } + } + + return false; +} + +bool +ScriptFile::ExceedMaxSize(int64_t append_size) { + return (file_size_ + append_size) > MAX_SCRIPT_FILE_SIZE; +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/transcript/ScriptFile.h b/core/src/db/transcript/ScriptFile.h new file mode 100644 index 0000000000000000000000000000000000000000..79dd8f790e20d53f84469106c2c486cf8ba9c7eb --- /dev/null +++ b/core/src/db/transcript/ScriptFile.h @@ -0,0 +1,59 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include "db/Types.h" +#include "utils/Status.h" + +#include +#include +#include +#include + +namespace milvus { +namespace engine { + +class ScriptFile { + public: + ScriptFile() = default; + virtual ~ScriptFile(); + + Status + OpenWrite(const std::string& path); + + Status + OpenRead(const std::string& path); + + Status + WriteLine(const std::string& str); + + bool + ReadLine(std::string& str); + + bool + ExceedMaxSize(int64_t append_size); + + private: + Status + CloseFile(); + + private: + std::ifstream reader_; + std::ofstream writer_; + + int64_t file_size_ = 0; +}; + +using ScriptFilePtr = std::shared_ptr; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/transcript/ScriptRecorder.cpp b/core/src/db/transcript/ScriptRecorder.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2177e129493afc781a0b5f2586f3aee5f8956f63 --- /dev/null +++ b/core/src/db/transcript/ScriptRecorder.cpp @@ -0,0 +1,296 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/transcript/ScriptRecorder.h" +#include "db/Utils.h" +#include "db/transcript/ScriptCodec.h" +#include "utils/CommonUtil.h" +#include "utils/Json.h" + +#include +#include + +namespace milvus { +namespace engine { + +ScriptRecorder::ScriptRecorder(const std::string& path) { + std::experimental::filesystem::path script_path(path); + + std::string time_str; + CommonUtil::GetCurrentTimeStr(time_str); + script_path /= time_str; + script_path_ = script_path.c_str(); + CommonUtil::CreateDirectory(script_path_); +} + +ScriptRecorder::~ScriptRecorder() { +} + +std::string +ScriptRecorder::GetScriptPath() const { + return script_path_; +} + +ScriptFilePtr +ScriptRecorder::GetFile() { + if (file_ == nullptr || file_->ExceedMaxSize(0)) { + file_ = std::make_shared(); + int64_t current_time = utils::GetMicroSecTimeStamp(); + + std::experimental::filesystem::path file_path(script_path_); + std::string file_name = std::to_string(current_time) + ".txt"; + file_path /= file_name; + + file_->OpenWrite(file_path.c_str()); + } + + return file_; +} + +Status +ScriptRecorder::WriteJson(milvus::json& json_obj) { + auto file = GetFile(); + std::string str = json_obj.dump(); + return file->WriteLine(str); +} + +Status +ScriptRecorder::CreateCollection(const snapshot::CreateCollectionContext& context) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionCreateCollection); + ScriptCodec::Encode(json_obj, context); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::DropCollection(const std::string& collection_name) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionDropCollection); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::HasCollection(const std::string& collection_name, bool& has_or_not) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionHasCollection); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::ListCollections(std::vector& names) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionListCollections); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection, + snapshot::FieldElementMappings& fields_schema) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionGetCollectionInfo); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionGetCollectionStats); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::CountEntities(const std::string& collection_name, int64_t& row_count) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionCountEntities); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::CreatePartition(const std::string& collection_name, const std::string& partition_name) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionCreatePartition); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodePartitionName(json_obj, partition_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::DropPartition(const std::string& collection_name, const std::string& partition_name) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionDropPartition); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodePartitionName(json_obj, partition_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::HasPartition(const std::string& collection_name, const std::string& partition_name, bool& exist) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionHasPartition); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodePartitionName(json_obj, partition_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::ListPartitions(const std::string& collection_name, std::vector& partition_names) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionListPartitions); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::CreateIndex(const server::ContextPtr& context, const std::string& collection_name, + const std::string& field_name, const CollectionIndex& index) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionCreateIndex); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodeFieldName(json_obj, field_name); + ScriptCodec::Encode(json_obj, index); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::DropIndex(const std::string& collection_name, const std::string& field_name) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionDropIndex); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodeFieldName(json_obj, field_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::DescribeIndex(const std::string& collection_name, const std::string& field_name, + CollectionIndex& index) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionDescribeIndex); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodeFieldName(json_obj, field_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + idx_t op_id) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionInsert); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodePartitionName(json_obj, partition_name); + ScriptCodec::Encode(json_obj, data_chunk); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, + const std::vector& field_names, std::vector& valid_row, + DataChunkPtr& data_chunk) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionGetEntityByID); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::Encode(json_obj, id_array); + ScriptCodec::EncodeFieldNames(json_obj, field_names); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionDeleteEntityByID); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::Encode(json_obj, entity_ids); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionListIDInSegment); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodeSegmentID(json_obj, segment_id); + ScriptCodec::Encode(json_obj, entity_ids); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, + engine::QueryResultPtr& result) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionQuery); + ScriptCodec::Encode(json_obj, query_ptr); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::LoadCollection(const server::ContextPtr& context, const std::string& collection_name, + const std::vector& field_names, bool force) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionLoadCollection); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodeFieldNames(json_obj, field_names); + ScriptCodec::EncodeForce(json_obj, force); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::Flush(const std::string& collection_name) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionFlush); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::Flush() { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionFlush); + + return WriteJson(json_obj); +} + +Status +ScriptRecorder::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) { + milvus::json json_obj; + ScriptCodec::EncodeAction(json_obj, ActionCompact); + ScriptCodec::EncodeCollectionName(json_obj, collection_name); + ScriptCodec::EncodeThreshold(json_obj, threshold); + + return WriteJson(json_obj); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/transcript/ScriptRecorder.h b/core/src/db/transcript/ScriptRecorder.h new file mode 100644 index 0000000000000000000000000000000000000000..016a8f1a3588c4a0be29a008ce70b37362993838 --- /dev/null +++ b/core/src/db/transcript/ScriptRecorder.h @@ -0,0 +1,122 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include "db/DB.h" +#include "db/transcript/ScriptFile.h" + +#include +#include +#include + +namespace milvus { +namespace engine { + +class ScriptRecorder { + public: + explicit ScriptRecorder(const std::string& path); + + ~ScriptRecorder(); + + std::string + GetScriptPath() const; + + Status + CreateCollection(const snapshot::CreateCollectionContext& context); + + Status + DropCollection(const std::string& collection_name); + + Status + HasCollection(const std::string& collection_name, bool& has_or_not); + + Status + ListCollections(std::vector& names); + + Status + GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection, + snapshot::FieldElementMappings& fields_schema); + + Status + GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats); + + Status + CountEntities(const std::string& collection_name, int64_t& row_count); + + Status + CreatePartition(const std::string& collection_name, const std::string& partition_name); + + Status + DropPartition(const std::string& collection_name, const std::string& partition_name); + + Status + HasPartition(const std::string& collection_name, const std::string& partition_name, bool& exist); + + Status + ListPartitions(const std::string& collection_name, std::vector& partition_names); + + Status + CreateIndex(const server::ContextPtr& context, const std::string& collection_name, const std::string& field_name, + const CollectionIndex& index); + + Status + DropIndex(const std::string& collection_name, const std::string& field_name); + + Status + DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index); + + Status + Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + idx_t op_id); + + Status + GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, + const std::vector& field_names, std::vector& valid_row, DataChunkPtr& data_chunk); + + Status + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id); + + Status + ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids); + + Status + Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result); + + Status + LoadCollection(const server::ContextPtr& context, const std::string& collection_name, + const std::vector& field_names, bool force); + + Status + Flush(const std::string& collection_name); + + Status + Flush(); + + Status + Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold); + + private: + ScriptFilePtr + GetFile(); + + Status + WriteJson(milvus::json& json_obj); + + private: + std::string script_path_; + ScriptFilePtr file_; +}; + +using ScriptRecorderPtr = std::shared_ptr; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/transcript/ScriptReplay.cpp b/core/src/db/transcript/ScriptReplay.cpp index c14d52fe6be91e06c59621bd6c5398ac8e51856a..cff78d1e3ed0d4a2813b07797053f39c50f64300 100644 --- a/core/src/db/transcript/ScriptReplay.cpp +++ b/core/src/db/transcript/ScriptReplay.cpp @@ -11,14 +11,235 @@ #include "db/transcript/ScriptReplay.h" #include "db/transcript/ScriptCodec.h" +#include "db/transcript/ScriptFile.h" +#include "utils/Json.h" + +#include +#include +#include +#include +#include namespace milvus { namespace engine { Status ScriptReplay::Replay(const DBPtr& db, const std::string& replay_script_path) { + // get all script files under this folder, arrange them in ascending order + std::map files_map; + + using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; + DirectoryIterator iter(replay_script_path); + DirectoryIterator end; + for (; iter != end; ++iter) { + auto path = (*iter).path(); + if (std::experimental::filesystem::is_directory(path)) { + continue; + } + + std::string file_name = path.filename().c_str(); + int64_t file_ts = atol(file_name.c_str()); + if (file_ts > 0) { + files_map.insert(std::make_pair(file_ts, path)); + } + } + + // replay the script files + for (auto& pair : files_map) { + ScriptFile file; + file.OpenRead(pair.second.c_str()); + + std::string str_line; + while (file.ReadLine(str_line)) { + STATUS_CHECK(PerformAction(db, str_line)); + } + } + return Status::OK(); } +Status +ScriptReplay::PerformAction(const DBPtr& db, const std::string& str_action) { + try { + milvus::json json_obj = milvus::json::parse(str_action); + std::string action_type; + int64_t action_ts = 0; + ScriptCodec::DecodeAction(json_obj, action_type, action_ts); + + if (action_type == ActionCreateCollection) { + snapshot::CreateCollectionContext context; + ScriptCodec::Decode(json_obj, context); + + db->CreateCollection(context); + } else if (action_type == ActionDropCollection) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + + db->DropCollection(collection_name); + } else if (action_type == ActionHasCollection) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + + bool has = false; + db->HasCollection(collection_name, has); + } else if (action_type == ActionListCollections) { + std::vector names; + db->ListCollections(names); + } else if (action_type == ActionGetCollectionInfo) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + + snapshot::CollectionPtr collection; + snapshot::FieldElementMappings fields_schema; + db->GetCollectionInfo(collection_name, collection, fields_schema); + } else if (action_type == ActionGetCollectionStats) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + + milvus::json collection_stats; + db->GetCollectionStats(collection_name, collection_stats); + } else if (action_type == ActionCountEntities) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + + int64_t count = 0; + db->CountEntities(collection_name, count); + } else if (action_type == ActionCreatePartition) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + std::string partition_name; + ScriptCodec::DecodePartitionName(json_obj, partition_name); + + db->CreatePartition(collection_name, partition_name); + } else if (action_type == ActionDropPartition) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + std::string partition_name; + ScriptCodec::DecodePartitionName(json_obj, partition_name); + + db->DropPartition(collection_name, partition_name); + } else if (action_type == ActionHasPartition) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + std::string partition_name; + ScriptCodec::DecodePartitionName(json_obj, partition_name); + + bool has = false; + db->HasPartition(collection_name, partition_name, has); + } else if (action_type == ActionListPartitions) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + + std::vector partition_names; + db->ListPartitions(collection_name, partition_names); + } else if (action_type == ActionCreateIndex) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + std::string field_name; + ScriptCodec::DecodeFieldName(json_obj, field_name); + CollectionIndex index; + ScriptCodec::Decode(json_obj, index); + + std::vector partition_names; + db->CreateIndex(nullptr, collection_name, field_name, index); + } else if (action_type == ActionDropIndex) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + std::string field_name; + ScriptCodec::DecodeFieldName(json_obj, field_name); + + db->DropIndex(collection_name, field_name); + } else if (action_type == ActionDescribeIndex) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + std::string field_name; + ScriptCodec::DecodeFieldName(json_obj, field_name); + + CollectionIndex index; + db->DescribeIndex(collection_name, field_name, index); + } else if (action_type == ActionInsert) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + std::string partition_name; + ScriptCodec::DecodePartitionName(json_obj, partition_name); + DataChunkPtr data_chunk; + ScriptCodec::Decode(json_obj, data_chunk); + + db->Insert(collection_name, partition_name, data_chunk, 0); + } else if (action_type == ActionGetEntityByID) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + IDNumbers id_array; + ScriptCodec::Decode(json_obj, id_array); + std::vector field_names; + ScriptCodec::DecodeFieldNames(json_obj, field_names); + + std::vector valid_row; + DataChunkPtr data_chunk; + db->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk); + } else if (action_type == ActionDeleteEntityByID) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + IDNumbers id_array; + ScriptCodec::Decode(json_obj, id_array); + + db->DeleteEntityByID(collection_name, id_array, 0); + } else if (action_type == ActionListIDInSegment) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + int64_t segment_id = 0; + ScriptCodec::DecodeSegmentID(json_obj, segment_id); + + IDNumbers entity_ids; + db->ListIDInSegment(collection_name, segment_id, entity_ids); + } else if (action_type == ActionQuery) { + query::QueryPtr query_ptr; + ScriptCodec::Decode(json_obj, query_ptr); + + if (query_ptr != nullptr) { + engine::QueryResultPtr result; + db->Query(nullptr, query_ptr, result); + } + } else if (action_type == ActionLoadCollection) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + std::vector field_names; + ScriptCodec::DecodeFieldNames(json_obj, field_names); + bool force = false; + ScriptCodec::DecodeForce(json_obj, force); + + std::vector valid_row; + DataChunkPtr data_chunk; + db->LoadCollection(nullptr, collection_name, field_names, force); + } else if (action_type == ActionFlush) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + + if (collection_name.empty()) { + db->Flush(); + } else { + db->Flush(collection_name); + } + } else if (action_type == ActionCompact) { + std::string collection_name; + ScriptCodec::DecodeCollectionName(json_obj, collection_name); + double threshold = 0.0; + ScriptCodec::DecodeThreshold(json_obj, threshold); + + db->Compact(nullptr, collection_name, threshold); + } else { + std::string msg = "Unsupportted action: " + action_type; + LOG_SERVER_ERROR_ << msg; + return Status(DB_ERROR, msg); + } + + return Status::OK(); + } catch (std::exception& ex) { + std::string msg = "Failed to perform script action, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; + return Status(DB_ERROR, msg); + } +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/transcript/ScriptReplay.h b/core/src/db/transcript/ScriptReplay.h index 68bb7870f68516b8e7ddd5e9012ea83616fbca51..f285f677d89e02b94fcc64476736bb52058282d1 100644 --- a/core/src/db/transcript/ScriptReplay.h +++ b/core/src/db/transcript/ScriptReplay.h @@ -24,6 +24,10 @@ class ScriptReplay { Status Replay(const DBPtr& db, const std::string& replay_script_path); + + private: + Status + PerformAction(const DBPtr& db, const std::string& str_action); }; } // namespace engine diff --git a/core/src/db/transcript/TranscriptProxy.cpp b/core/src/db/transcript/TranscriptProxy.cpp index 17b2bfd3c343fe9e0331fe44acbaf7920fb1b377..2e8db8377de1309d4f7a43211bd9afb9c0ad6e0c 100644 --- a/core/src/db/transcript/TranscriptProxy.cpp +++ b/core/src/db/transcript/TranscriptProxy.cpp @@ -10,16 +10,21 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/transcript/TranscriptProxy.h" -#include "db/transcript/ScriptCodec.h" +#include "db/transcript/ScriptRecorder.h" #include "db/transcript/ScriptReplay.h" #include "utils/CommonUtil.h" #include "utils/Exception.h" -#include +#include namespace milvus { namespace engine { +#define CHECK_RECORDER \ + if (recorder_ == nullptr) { \ + return Status(DB_ERROR, "Transcript recoder is null pointer"); \ + } + TranscriptProxy::TranscriptProxy(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) { // db must implemented if (db == nullptr) { @@ -38,106 +43,137 @@ TranscriptProxy::Start() { // replay script in necessary if (!options_.replay_script_path_.empty()) { ScriptReplay replay; - return replay.Replay(db_, options_.replay_script_path_); - } else { - ScriptCodec& codec = ScriptCodec::GetInstance(); - boost::filesystem::path db_path(options_.meta_.path_); - auto transcript_path = db_path.parent_path(); - transcript_path /= "transcript"; - std::string path = transcript_path.c_str(); - status = CommonUtil::CreateDirectory(path); - if (!status.ok()) { - std::cerr << "Error: Failed to create transcript path: " << path << std::endl; - kill(0, SIGUSR1); - } - - codec.SetScriptPath(path); + STATUS_CHECK(replay.Replay(db_, options_.replay_script_path_)); + } + + // prepare for transcript + std::experimental::filesystem::path db_path(options_.meta_.path_); + auto transcript_path = db_path.parent_path(); + transcript_path /= "transcript"; + std::string path = transcript_path.c_str(); + status = CommonUtil::CreateDirectory(path); + if (!status.ok()) { + std::cerr << "Error: Failed to create transcript path: " << path << std::endl; + kill(0, SIGUSR1); } + recorder_ = std::make_shared(path); + return Status::OK(); } Status TranscriptProxy::Stop() { + recorder_ = nullptr; return db_->Stop(); } Status TranscriptProxy::CreateCollection(const snapshot::CreateCollectionContext& context) { + CHECK_RECORDER + recorder_->CreateCollection(context); return db_->CreateCollection(context); } Status TranscriptProxy::DropCollection(const std::string& collection_name) { + CHECK_RECORDER + recorder_->DropCollection(collection_name); return db_->DropCollection(collection_name); } Status TranscriptProxy::HasCollection(const std::string& collection_name, bool& has_or_not) { + CHECK_RECORDER + recorder_->HasCollection(collection_name, has_or_not); return db_->HasCollection(collection_name, has_or_not); } Status TranscriptProxy::ListCollections(std::vector& names) { + CHECK_RECORDER + recorder_->ListCollections(names); return db_->ListCollections(names); } Status TranscriptProxy::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection, snapshot::FieldElementMappings& fields_schema) { + CHECK_RECORDER + recorder_->GetCollectionInfo(collection_name, collection, fields_schema); return db_->GetCollectionInfo(collection_name, collection, fields_schema); } Status TranscriptProxy::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) { + CHECK_RECORDER + recorder_->GetCollectionStats(collection_name, collection_stats); return db_->GetCollectionStats(collection_name, collection_stats); } Status TranscriptProxy::CountEntities(const std::string& collection_name, int64_t& row_count) { + CHECK_RECORDER + recorder_->CountEntities(collection_name, row_count); return db_->CountEntities(collection_name, row_count); } Status TranscriptProxy::CreatePartition(const std::string& collection_name, const std::string& partition_name) { + CHECK_RECORDER + recorder_->CreatePartition(collection_name, partition_name); return db_->CreatePartition(collection_name, partition_name); } Status TranscriptProxy::DropPartition(const std::string& collection_name, const std::string& partition_name) { + CHECK_RECORDER + recorder_->DropPartition(collection_name, partition_name); return db_->DropPartition(collection_name, partition_name); } Status TranscriptProxy::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) { + CHECK_RECORDER + recorder_->HasPartition(collection_name, partition_tag, exist); return db_->HasPartition(collection_name, partition_tag, exist); } Status TranscriptProxy::ListPartitions(const std::string& collection_name, std::vector& partition_names) { + CHECK_RECORDER + recorder_->ListPartitions(collection_name, partition_names); return db_->ListPartitions(collection_name, partition_names); } Status TranscriptProxy::CreateIndex(const server::ContextPtr& context, const std::string& collection_name, const std::string& field_name, const CollectionIndex& index) { + CHECK_RECORDER + recorder_->CreateIndex(context, collection_name, field_name, index); return db_->CreateIndex(context, collection_name, field_name, index); } Status TranscriptProxy::DropIndex(const std::string& collection_name, const std::string& field_name) { + CHECK_RECORDER + recorder_->DropIndex(collection_name, field_name); return db_->DropIndex(collection_name, field_name); } Status TranscriptProxy::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) { + CHECK_RECORDER + recorder_->DescribeIndex(collection_name, field_name, index); return db_->DescribeIndex(collection_name, field_name, index); } Status TranscriptProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, idx_t op_id) { + CHECK_RECORDER + recorder_->Insert(collection_name, partition_name, data_chunk, op_id); return db_->Insert(collection_name, partition_name, data_chunk); } @@ -145,44 +181,60 @@ Status TranscriptProxy::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, const std::vector& field_names, std::vector& valid_row, DataChunkPtr& data_chunk) { + CHECK_RECORDER + recorder_->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk); return db_->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk); } Status TranscriptProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) { + CHECK_RECORDER + recorder_->DeleteEntityByID(collection_name, entity_ids, op_id); return db_->DeleteEntityByID(collection_name, entity_ids); } Status TranscriptProxy::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) { + CHECK_RECORDER + recorder_->ListIDInSegment(collection_name, segment_id, entity_ids); return db_->ListIDInSegment(collection_name, segment_id, entity_ids); } Status TranscriptProxy::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) { + CHECK_RECORDER + recorder_->Query(context, query_ptr, result); return db_->Query(context, query_ptr, result); } Status TranscriptProxy::LoadCollection(const server::ContextPtr& context, const std::string& collection_name, const std::vector& field_names, bool force) { + CHECK_RECORDER + recorder_->LoadCollection(context, collection_name, field_names, force); return db_->LoadCollection(context, collection_name, field_names, force); } Status TranscriptProxy::Flush(const std::string& collection_name) { + CHECK_RECORDER + recorder_->Flush(collection_name); return db_->Flush(collection_name); } Status TranscriptProxy::Flush() { + CHECK_RECORDER + recorder_->Flush(); return db_->Flush(); } Status TranscriptProxy::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) { + CHECK_RECORDER + recorder_->Compact(context, collection_name, threshold); return db_->Compact(context, collection_name, threshold); } diff --git a/core/src/db/transcript/TranscriptProxy.h b/core/src/db/transcript/TranscriptProxy.h index 78c997a84356174acb0f479f6c3cf2cbf61131dd..74304f2c471f8640e01cf38c03c17110da8e59c4 100644 --- a/core/src/db/transcript/TranscriptProxy.h +++ b/core/src/db/transcript/TranscriptProxy.h @@ -12,6 +12,7 @@ #pragma once #include "db/DBProxy.h" +#include "db/transcript/ScriptRecorder.h" #include #include @@ -24,6 +25,11 @@ class TranscriptProxy : public DBProxy { public: TranscriptProxy(const DBPtr& db, const DBOptions& options); + ScriptRecorderPtr + GetScriptRecorder() const { + return recorder_; + } + Status Start() override; @@ -106,6 +112,7 @@ class TranscriptProxy : public DBProxy { Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) override; private: + ScriptRecorderPtr recorder_; }; } // namespace engine diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index 6720b63377b706685ae37d335420bcb2be85da1d..28fc6eed391631eb83c9e71b6090a7d935ee3e09 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -26,6 +26,39 @@ namespace engine { const char* WAL_MAX_OP_FILE_NAME = "max_op"; const char* WAL_DEL_FILE_NAME = "del"; +namespace { + +bool +StrToID(const std::string& str, idx_t& id) { + try { + id = std::stol(str); + return true; + } catch (std::exception& ex) { + return false; + } +} + +void +FindWalFiles(const std::experimental::filesystem::path& folder, + std::map& files) { + using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; + DirectoryIterator iter(folder); + DirectoryIterator end; + for (; iter != end; ++iter) { + auto path_inner = (*iter).path(); + std::string file_name = path_inner.filename().c_str(); + if (file_name == WAL_MAX_OP_FILE_NAME || file_name == WAL_DEL_FILE_NAME) { + continue; + } + idx_t op_id = 0; + if (StrToID(file_name, op_id)) { + files.insert(std::make_pair(op_id, path_inner)); + } + } +} + +} // namespace + WalManager::WalManager() : cleanup_thread_pool_(1, 1) { } @@ -161,17 +194,7 @@ WalManager::Recovery(const DBPtr& db) { // iterate files std::map id_files; - DirectoryIterator iter_inner(path_outer); - DirectoryIterator end_inner; - for (; iter_inner != end_inner; ++iter_inner) { - auto path_inner = (*iter_inner).path(); - std::string file_name = path_inner.filename().c_str(); - if (file_name == WAL_MAX_OP_FILE_NAME) { - continue; - } - idx_t op_id = std::stol(file_name); - id_files.insert(std::make_pair(op_id, path_inner)); - } + FindWalFiles(path_outer, id_files); // the max operation id idx_t max_op_id = 0; @@ -450,17 +473,7 @@ WalManager::CleanupThread() { // iterate files std::map wal_files; - DirectoryIterator file_iter(collection_path); - DirectoryIterator end_iter; - for (; file_iter != end_iter; ++file_iter) { - auto file_path = (*file_iter).path(); - std::string file_name = file_path.filename().c_str(); - if (file_name == WAL_MAX_OP_FILE_NAME) { - continue; - } - idx_t op_id = std::stol(file_name); - wal_files.insert(std::make_pair(op_id, file_path)); - } + FindWalFiles(collection_path, wal_files); // no wal file if (wal_files.empty()) { diff --git a/core/src/db/wal/WalManager.h b/core/src/db/wal/WalManager.h index 9c8bc2413be97903e469549de6ea1bc196038b1a..462fb35030a735e1ba40ca2b82ec578f3e960002 100644 --- a/core/src/db/wal/WalManager.h +++ b/core/src/db/wal/WalManager.h @@ -34,8 +34,6 @@ extern const char* WAL_DEL_FILE_NAME; class WalManager { public: - WalManager(); - static WalManager& GetInstance(); @@ -58,6 +56,8 @@ class WalManager { Recovery(const DBPtr& db); private: + WalManager(); + Status Init(); diff --git a/core/src/server/DBWrapper.cpp b/core/src/server/DBWrapper.cpp index fa17ae2daadd96330255405fa658f5c19140cd6c..65e0d3ac0d28eab62c0d4168e96e1b390d634e3f 100644 --- a/core/src/server/DBWrapper.cpp +++ b/core/src/server/DBWrapper.cpp @@ -56,11 +56,16 @@ DBWrapper::StartService() { kill(0, SIGUSR1); } + // wal opt.wal_enable_ = config.wal.enable(); if (opt.wal_enable_) { opt.wal_path_ = config.wal.path(); } + // transcript + opt.transcript_enable_ = config.transcript.enable(); + opt.replay_script_path_ = config.transcript.replay(); + // engine config int64_t omp_thread = config.engine.omp_thread_num(); diff --git a/core/src/server/delivery/request/CreateCollectionReq.cpp b/core/src/server/delivery/request/CreateCollectionReq.cpp index f9e16994d0279ed4d2df6d7a49b0de6146dfbc37..7a392ae4cf683c8c2741bbc99c6b50f5f556cff4 100644 --- a/core/src/server/delivery/request/CreateCollectionReq.cpp +++ b/core/src/server/delivery/request/CreateCollectionReq.cpp @@ -63,12 +63,10 @@ CreateCollectionReq::OnExecute() { engine::snapshot::CreateCollectionContext create_collection_context; auto collection_schema = std::make_shared(collection_name_, extra_params_); - std::set unique_field_names; create_collection_context.collection = collection_schema; for (auto& field_kv : fields_) { auto& field_name = field_kv.first; auto& field_schema = field_kv.second; - unique_field_names.insert(field_name); auto& field_type = field_schema.field_type_; auto& field_params = field_schema.field_params_; @@ -106,11 +104,6 @@ CreateCollectionReq::OnExecute() { create_collection_context.fields_schema[field] = {}; } - // not allow duplicate field name - if (unique_field_names.size() != fields_.size()) { - return Status(DB_ERROR, "Duplicate field name"); - } - // step 3: create collection status = DBWrapper::DB()->CreateCollection(create_collection_context); fiu_do_on("CreateCollectionReq.OnExecute.invalid_db_execute", diff --git a/core/src/utils/CommonUtil.cpp b/core/src/utils/CommonUtil.cpp index 0ee80d0f72aac5d3c979ee68a28b61667a8ef1bd..f27c3eba70eee90aa5ebe9301527b87b96f568d4 100644 --- a/core/src/utils/CommonUtil.cpp +++ b/core/src/utils/CommonUtil.cpp @@ -176,6 +176,26 @@ CommonUtil::TimeStrToTime(const std::string& time_str, time_t& time_integer, tm& return true; } +void +CommonUtil::GetCurrentTimeStr(std::string& time_str) { + auto t = std::time(nullptr); + struct tm ltm; + localtime_r(&t, <m); + + time_str = ""; + time_str += std::to_string(ltm.tm_year + 1900); + time_str += "-"; + time_str += std::to_string(ltm.tm_mon + 1); + time_str += "-"; + time_str += std::to_string(ltm.tm_mday); + time_str += "_"; + time_str += std::to_string(ltm.tm_hour); + time_str += ":"; + time_str += std::to_string(ltm.tm_min); + time_str += ":"; + time_str += std::to_string(ltm.tm_sec); +} + void CommonUtil::ConvertTime(time_t time_integer, tm& time_struct) { localtime_r(&time_integer, &time_struct); diff --git a/core/src/utils/CommonUtil.h b/core/src/utils/CommonUtil.h index 1a19294b5a2f1584d5661e8a122cd1182714d441..c24c31ec610557979089d3104a2945b349a84422 100644 --- a/core/src/utils/CommonUtil.h +++ b/core/src/utils/CommonUtil.h @@ -40,6 +40,9 @@ class CommonUtil { TimeStrToTime(const std::string& time_str, time_t& time_integer, tm& time_struct, const std::string& format = "%d-%d-%d %d:%d:%d"); + static void + GetCurrentTimeStr(std::string& time_str); + static void ConvertTime(time_t time_integer, tm& time_struct); static void diff --git a/core/unittest/db/CMakeLists.txt b/core/unittest/db/CMakeLists.txt index bf9ffbe82e71d7e88757c5798a044b6711315202..f61905668ba7a78b667d941be2a25f5878344cf6 100644 --- a/core/unittest/db/CMakeLists.txt +++ b/core/unittest/db/CMakeLists.txt @@ -17,6 +17,7 @@ set( TEST_FILES ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test_db.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test_meta.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test_ss_event.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/test_transcript.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test_wal.cpp ) diff --git a/core/unittest/db/test_transcript.cpp b/core/unittest/db/test_transcript.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d645365ddc15234edbca9a8eadfcde11fc5b1730 --- /dev/null +++ b/core/unittest/db/test_transcript.cpp @@ -0,0 +1,876 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include +#include +#include + +#include +#include +#include +#include + +#include "db/DBProxy.h" +#include "db/utils.h" +#include "db/transcript/ScriptFile.h" +#include "db/transcript/ScriptCodec.h" +#include "db/transcript/ScriptRecorder.h" +#include "db/transcript/ScriptReplay.h" +#include "db/transcript/TranscriptProxy.h" + +namespace { + +using DBProxy = milvus::engine::DBProxy; +using VaribleData = milvus::engine::VaribleData; + +using ScriptFile = milvus::engine::ScriptFile; +using ScriptCodec = milvus::engine::ScriptCodec; +using ScriptRecorder = milvus::engine::ScriptRecorder; +using ScriptReplay = milvus::engine::ScriptReplay; +using TranscriptProxy = milvus::engine::TranscriptProxy; + +const char* COLLECTION_NAME = "wal_tbl"; +const char* VECTOR_FIELD_NAME = "vector"; +const char* INT_FIELD_NAME = "int"; +const char* STRING_FIELD_NAME = "string"; + +void +CreateContext(CreateCollectionContext& context) { + auto collection_schema = std::make_shared(COLLECTION_NAME); + context.collection = collection_schema; + auto vector_field = std::make_shared(VECTOR_FIELD_NAME, 0, milvus::engine::DataType::VECTOR_FLOAT); + auto int_field = std::make_shared(INT_FIELD_NAME, 0, milvus::engine::DataType::INT32); + auto str_field = std::make_shared(STRING_FIELD_NAME, 0, milvus::engine::DataType::STRING); + context.fields_schema[vector_field] = {}; + context.fields_schema[int_field] = {}; + context.fields_schema[str_field] = {}; + + auto params = context.collection->GetParams(); + params[milvus::engine::PARAM_UID_AUTOGEN] = true; + params[milvus::engine::PARAM_SEGMENT_ROW_COUNT] = 1000; + context.collection->SetParams(params); +} + +void +CreateChunk(DataChunkPtr& chunk, int64_t row_count) { + chunk = std::make_shared(); + chunk->count_ = row_count; + { + // int32 type field + std::string field_name = INT_FIELD_NAME; + auto bin = std::make_shared(); + bin->data_.resize(chunk->count_ * sizeof(int32_t)); + int32_t* p = (int32_t*)(bin->data_.data()); + for (int64_t i = 0; i < chunk->count_; ++i) { + p[i] = i; + } + chunk->fixed_fields_.insert(std::make_pair(field_name, bin)); + } + { + // vector type field + int64_t dimension = 128; + std::string field_name = VECTOR_FIELD_NAME; + auto bin = std::make_shared(); + bin->data_.resize(chunk->count_ * sizeof(float) * dimension); + float* p = (float*)(bin->data_.data()); + for (int64_t i = 0; i < chunk->count_; ++i) { + for (int64_t j = 0; j < dimension; ++j) { + p[i * dimension + j] = 100 + i * j / 100.0; + } + } + chunk->fixed_fields_.insert(std::make_pair(field_name, bin)); + } + + { + // string type field + std::string field_name = STRING_FIELD_NAME; + auto bin = std::make_shared(); + bin->data_.resize(chunk->count_); + memset(bin->data_.data(), 1, chunk->count_); + bin->offset_.resize(chunk->count_); + for (int64_t i = 0; i < chunk->count_; ++i) { + bin->offset_[i] = 1; + } + chunk->variable_fields_.insert(std::make_pair(field_name, bin)); + } +} + +void +CreateQuery(milvus::query::QueryPtr& query) { + query = std::make_shared(); + query->collection_id = COLLECTION_NAME; + query->partitions = {"p1", "p2", "p3"}; + query->field_names = {"f1", "f2", "f3"}; + query->index_fields = {"a", "b", "c"}; + query->metric_types = { + {"a", "IP"}, + {"b", "L2"}, + }; + + // vector queries + for (int i = 0; i < 3; ++i) { + milvus::query::VectorQueryPtr vector_query = std::make_shared(); + vector_query->field_name = "f" + std::to_string(i); + vector_query->metric_type = "L2"; + vector_query->extra_params["nlist"] = 16384; + vector_query->topk = i + 10; + vector_query->nq = i + 100; + vector_query->boost = false; + vector_query->query_vector.float_data = {0.1, 0.2, 0.3, 0.4}; + vector_query->query_vector.binary_data = {1, 2, 3, 4, 5, 6}; + query->vectors.insert(std::make_pair(std::to_string(i), vector_query)); + } + + // general query + query->root = std::make_shared(); + query->root->leaf = std::make_shared(); + query->root->leaf->vector_placeholder = "placeholder"; + query->root->leaf->query_boost = 0.6; + query->root->leaf->term_query = std::make_shared(); + query->root->leaf->term_query->json_obj["term"] = 1; + query->root->leaf->range_query = std::make_shared(); + query->root->leaf->range_query->json_obj["range"] = 10.0; + query->root->bin = std::make_shared(); + query->root->bin->query_boost = 99.9; + query->root->bin->relation = milvus::query::QueryRelation::R3; + query->root->bin->left_query = std::make_shared(); + query->root->bin->left_query->leaf = std::make_shared(); + query->root->bin->left_query->leaf->vector_placeholder = "holder"; + query->root->bin->right_query = std::make_shared(); + query->root->bin->right_query->bin = std::make_shared(); + query->root->bin->right_query->bin->query_boost = 33.3; +} + +class DummyDB : public DBProxy { + public: + explicit DummyDB(const DBOptions& options) + : DBProxy(nullptr, options) { + } + + Status + CreateCollection(const milvus::engine::snapshot::CreateCollectionContext& context) override { + actions_record_.emplace_back(milvus::engine::ActionCreateCollection); + return Status::OK(); + } + + Status + DropCollection(const std::string& collection_name) override { + actions_record_.emplace_back(milvus::engine::ActionDropCollection); + return Status::OK(); + } + + Status + HasCollection(const std::string& collection_name, bool& has_or_not) override { + actions_record_.emplace_back(milvus::engine::ActionHasCollection); + return Status::OK(); + } + + Status + ListCollections(std::vector& names) override { + actions_record_.emplace_back(milvus::engine::ActionListCollections); + return Status::OK(); + } + + Status + GetCollectionInfo(const std::string& collection_name, milvus::engine::snapshot::CollectionPtr& collection, + milvus::engine::snapshot::FieldElementMappings& fields_schema) override { + actions_record_.emplace_back(milvus::engine::ActionGetCollectionInfo); + return Status::OK(); + } + + Status + GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) override { + actions_record_.emplace_back(milvus::engine::ActionGetCollectionStats); + return Status::OK(); + } + + Status + CountEntities(const std::string& collection_name, int64_t& row_count) override { + actions_record_.emplace_back(milvus::engine::ActionCountEntities); + return Status::OK(); + } + + Status + CreatePartition(const std::string& collection_name, const std::string& partition_name) override { + actions_record_.emplace_back(milvus::engine::ActionCreatePartition); + return Status::OK(); + } + + Status + DropPartition(const std::string& collection_name, const std::string& partition_name) override { + actions_record_.emplace_back(milvus::engine::ActionDropPartition); + return Status::OK(); + } + + Status + HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) override { + actions_record_.emplace_back(milvus::engine::ActionHasPartition); + return Status::OK(); + } + + Status + ListPartitions(const std::string& collection_name, std::vector& partition_names) override { + actions_record_.emplace_back(milvus::engine::ActionListPartitions); + return Status::OK(); + } + + Status + CreateIndex(const milvus::server::ContextPtr& context, + const std::string& collection_name, + const std::string& field_name, + const milvus::engine::CollectionIndex& index) override { + actions_record_.emplace_back(milvus::engine::ActionCreateIndex); + return Status::OK(); + } + + Status + DropIndex(const std::string& collection_name, const std::string& field_name) override { + actions_record_.emplace_back(milvus::engine::ActionDropIndex); + return Status::OK(); + } + + Status + DescribeIndex(const std::string& collection_name, + const std::string& field_name, + milvus::engine::CollectionIndex& index) override { + actions_record_.emplace_back(milvus::engine::ActionDescribeIndex); + return Status::OK(); + } + + Status + Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + idx_t op_id) override { + actions_record_.emplace_back(milvus::engine::ActionInsert); + return Status::OK(); + } + + Status + GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, + const std::vector& field_names, std::vector& valid_row, + DataChunkPtr& data_chunk) override { + actions_record_.emplace_back(milvus::engine::ActionGetEntityByID); + return Status::OK(); + } + + Status + DeleteEntityByID(const std::string& collection_name, + const milvus::engine::IDNumbers& entity_ids, + idx_t op_id) override { + actions_record_.emplace_back(milvus::engine::ActionDeleteEntityByID); + return Status::OK(); + } + + Status + ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override { + actions_record_.emplace_back(milvus::engine::ActionListIDInSegment); + return Status::OK(); + } + + Status + Query(const milvus::server::ContextPtr& context, + const milvus::query::QueryPtr& query_ptr, + milvus::engine::QueryResultPtr& result) override { + actions_record_.emplace_back(milvus::engine::ActionQuery); + return Status::OK(); + } + + Status + LoadCollection(const milvus::server::ContextPtr& context, const std::string& collection_name, + const std::vector& field_names, bool force) override { + actions_record_.emplace_back(milvus::engine::ActionLoadCollection); + return Status::OK(); + } + + Status + Flush(const std::string& collection_name) override { + actions_record_.emplace_back(milvus::engine::ActionFlush); + return Status::OK(); + } + + Status + Flush() override { + actions_record_.emplace_back(milvus::engine::ActionFlush); + return Status::OK(); + } + + Status + Compact(const milvus::server::ContextPtr& context, const std::string& collection_name, double threshold) override { + actions_record_.emplace_back(milvus::engine::ActionCompact); + return Status::OK(); + } + + const std::vector& Actions() const { + return actions_record_; + } + + private: + std::vector actions_record_; +}; + +using DummyDBPtr = std::shared_ptr; + +} // namespace + +TEST(TranscriptTest, CodecTest) { + + { + milvus::json json_obj; + std::string input = "action"; + ScriptCodec::EncodeAction(json_obj, input); + + std::string output; + int64_t action_ts = 0; + ScriptCodec::DecodeAction(json_obj, output, action_ts); + ASSERT_EQ(input, output); + ASSERT_GT(action_ts, 0); + } + + { + milvus::json json_obj; + CreateCollectionContext input; + CreateContext(input); + ScriptCodec::Encode(json_obj, input); + + CreateCollectionContext output; + ScriptCodec::Decode(json_obj, output); + ASSERT_NE(output.collection, nullptr); + ASSERT_EQ(output.collection->GetName(), input.collection->GetName()); + ASSERT_EQ(output.collection->GetParams(), input.collection->GetParams()); + ASSERT_EQ(output.fields_schema.size(), input.fields_schema.size()); + } + + { + milvus::json json_obj; + std::string input = "abc"; + ScriptCodec::EncodeCollectionName(json_obj, input); + + std::string output; + ScriptCodec::DecodeCollectionName(json_obj, output); + ASSERT_EQ(input, output); + } + + { + milvus::json json_obj; + std::string input = "abc"; + ScriptCodec::EncodePartitionName(json_obj, input); + + std::string output; + ScriptCodec::DecodePartitionName(json_obj, output); + ASSERT_EQ(input, output); + } + + { + milvus::json json_obj; + std::string input = "abc"; + ScriptCodec::EncodeFieldName(json_obj, input); + + std::string output; + ScriptCodec::DecodeFieldName(json_obj, output); + ASSERT_EQ(input, output); + } + + { + milvus::json json_obj; + std::vector input = {"abc", "cdf", "fjk"}; + ScriptCodec::EncodeFieldNames(json_obj, input); + + std::vector output; + ScriptCodec::DecodeFieldNames(json_obj, output); + ASSERT_EQ(input, output); + } + + { + milvus::json json_obj; + milvus::engine::CollectionIndex input; + input.index_name_ = "a"; + input.index_type_ = "IVF"; + input.metric_name_ = "IP"; + input.extra_params_["NLIST"] = 1024; + ScriptCodec::Encode(json_obj, input); + + milvus::engine::CollectionIndex output; + ScriptCodec::Decode(json_obj, output); + ASSERT_EQ(input.index_name_, output.index_name_); + ASSERT_EQ(input.index_type_, output.index_type_); + ASSERT_EQ(input.metric_name_, output.metric_name_); + ASSERT_EQ(input.extra_params_, output.extra_params_); + } + + { + milvus::json json_obj; + milvus::engine::DataChunkPtr input; + CreateChunk(input, 10); + ScriptCodec::Encode(json_obj, input); + + milvus::engine::DataChunkPtr output; + ScriptCodec::Decode(json_obj, output); + ASSERT_NE(output, nullptr); + ASSERT_EQ(input->count_, output->count_); + ASSERT_EQ(input->fixed_fields_.size(), output->fixed_fields_.size()); + ASSERT_EQ(input->variable_fields_.size(), output->variable_fields_.size()); + for (auto& pair : input->fixed_fields_) { + auto& name = pair.first; + auto& bin_2 = output->fixed_fields_[name]; + ASSERT_NE(bin_2, nullptr); + auto& bin_1 = pair.second; + ASSERT_NE(bin_1, nullptr); + ASSERT_EQ(bin_1->data_, bin_2->data_); + } + for (auto& pair : input->variable_fields_) { + auto& name = pair.first; + auto& bin_2 = output->variable_fields_[name]; + ASSERT_NE(bin_2, nullptr); + auto& bin_1 = pair.second; + ASSERT_NE(bin_1, nullptr); + ASSERT_EQ(bin_1->data_, bin_2->data_); + ASSERT_EQ(bin_1->offset_, bin_2->offset_); + } + } + + { + milvus::json json_obj; + milvus::engine::IDNumbers input = {1, 3, 5, 7}; + ScriptCodec::Encode(json_obj, input); + + milvus::engine::IDNumbers output; + ScriptCodec::Decode(json_obj, output); + ASSERT_EQ(input, output); + } + + { + milvus::json json_obj; + int64_t input = 5467; + ScriptCodec::EncodeSegmentID(json_obj, input); + + int64_t output = 0; + ScriptCodec::DecodeSegmentID(json_obj, output); + ASSERT_EQ(input, output); + } + + { + milvus::json json_obj; + milvus::query::QueryPtr input; + CreateQuery(input); + ScriptCodec::Encode(json_obj, input); + + milvus::query::QueryPtr output; + ScriptCodec::Decode(json_obj, output); + ASSERT_NE(output, nullptr); + ASSERT_EQ(input->collection_id, output->collection_id); + ASSERT_EQ(input->partitions, output->partitions); + ASSERT_EQ(input->field_names, output->field_names); + ASSERT_EQ(input->index_fields, output->index_fields); + ASSERT_EQ(input->metric_types, output->metric_types); + ASSERT_EQ(input->vectors.size(), output->vectors.size()); + + // vector queries + for (auto& pair : input->vectors) { + ASSERT_GT(output->vectors.count(pair.first), 0); + auto& query_1 = output->vectors[pair.first]; + auto& query_2 = pair.second; + ASSERT_NE(query_1, nullptr); + ASSERT_NE(query_2, nullptr); + ASSERT_EQ(query_1->field_name, query_2->field_name); + ASSERT_EQ(query_1->extra_params, query_2->extra_params); + ASSERT_EQ(query_1->topk, query_2->topk); + ASSERT_EQ(query_1->nq, query_2->nq); + ASSERT_EQ(query_1->metric_type, query_2->metric_type); + ASSERT_EQ(query_1->boost, query_2->boost); + ASSERT_EQ(query_1->query_vector.float_data, query_2->query_vector.float_data); + ASSERT_EQ(query_1->query_vector.binary_data, query_2->query_vector.binary_data); + } + + // general query + ASSERT_NE(output->root, nullptr); + ASSERT_NE(output->root->leaf, nullptr); + ASSERT_EQ(output->root->leaf->query_boost, input->root->leaf->query_boost); + ASSERT_EQ(output->root->leaf->vector_placeholder, input->root->leaf->vector_placeholder); + ASSERT_NE(output->root->leaf->term_query, nullptr); + ASSERT_EQ(output->root->leaf->term_query->json_obj, input->root->leaf->term_query->json_obj); + ASSERT_NE(output->root->leaf->range_query, nullptr); + ASSERT_EQ(output->root->leaf->range_query->json_obj, input->root->leaf->range_query->json_obj); + ASSERT_NE(output->root->bin, nullptr); + ASSERT_EQ(output->root->bin->relation, input->root->bin->relation); + ASSERT_EQ(output->root->bin->query_boost, input->root->bin->query_boost); + } + + { + milvus::json json_obj; + double input = 2.5; + ScriptCodec::EncodeThreshold(json_obj, input); + + double output = 0; + ScriptCodec::DecodeThreshold(json_obj, output); + ASSERT_EQ(input, output); + } + + { + milvus::json json_obj; + bool input = true; + ScriptCodec::EncodeForce(json_obj, input); + + bool output = false; + ScriptCodec::DecodeForce(json_obj, output); + ASSERT_EQ(input, output); + } +} + +TEST(TranscriptTest, FileTest) { + std::string file_path = "/tmp/milvus_script_test.txt"; + std::experimental::filesystem::remove(file_path); + int32_t repeat = 100; + { + ScriptFile file; + file.OpenWrite(file_path); + for (int32_t i = 0; i < repeat; ++i) { + file.WriteLine(file_path); + } + + ASSERT_TRUE(file.ExceedMaxSize(milvus::engine::MAX_SCRIPT_FILE_SIZE)); + } + + { + ScriptFile file; + file.OpenRead(file_path); + + int32_t count = 0; + std::string line; + while (file.ReadLine(line)) { + ASSERT_EQ(line, file_path); + count++; + } + ASSERT_EQ(count, repeat); + } + +} + +TEST(TranscriptTest, ReplayTest) { + DBOptions options; + DummyDBPtr db = std::make_shared(options); + + std::string transcript_path = "/tmp/milvus_transcript"; + ScriptRecorder recorder(transcript_path); + + // register action functions + std::string collection_name = "collection"; + std::string partition_name = "partition"; + std::string field_name = "field"; + std::vector actions; + std::vector> functions; + functions.emplace_back([&]() { + milvus::engine::snapshot::CreateCollectionContext context; + recorder.CreateCollection(context); + actions.emplace_back(milvus::engine::ActionCreateCollection); + }); + functions.emplace_back([&]() { + recorder.DropCollection(collection_name); + actions.emplace_back(milvus::engine::ActionDropCollection); + }); + functions.emplace_back([&]() { + bool has = false; + recorder.HasCollection(collection_name, has); + actions.emplace_back(milvus::engine::ActionHasCollection); + }); + functions.emplace_back([&]() { + std::vector names; + recorder.ListCollections(names); + actions.emplace_back(milvus::engine::ActionListCollections); + }); + functions.emplace_back([&]() { + milvus::engine::snapshot::CollectionPtr collection; + milvus::engine::snapshot::FieldElementMappings fields_schema; + recorder.GetCollectionInfo(collection_name, collection, fields_schema); + actions.emplace_back(milvus::engine::ActionGetCollectionInfo); + }); + functions.emplace_back([&]() { + milvus::json collection_stats; + recorder.GetCollectionStats(collection_name, collection_stats); + actions.emplace_back(milvus::engine::ActionGetCollectionStats); + }); + functions.emplace_back([&]() { + int64_t count = 0; + recorder.CountEntities(collection_name, count); + actions.emplace_back(milvus::engine::ActionCountEntities); + }); + functions.emplace_back([&]() { + recorder.CreatePartition(collection_name, partition_name); + actions.emplace_back(milvus::engine::ActionCreatePartition); + }); + functions.emplace_back([&]() { + recorder.DropPartition(collection_name, partition_name); + actions.emplace_back(milvus::engine::ActionDropPartition); + }); + functions.emplace_back([&]() { + bool has = false; + recorder.HasPartition(collection_name, partition_name, has); + actions.emplace_back(milvus::engine::ActionHasPartition); + }); + functions.emplace_back([&]() { + std::vector partition_names; + recorder.ListPartitions(collection_name, partition_names); + actions.emplace_back(milvus::engine::ActionListPartitions); + }); + functions.emplace_back([&]() { + milvus::engine::CollectionIndex index; + recorder.CreateIndex(nullptr, collection_name, field_name, index); + actions.emplace_back(milvus::engine::ActionCreateIndex); + }); + functions.emplace_back([&]() { + recorder.DropIndex(collection_name, field_name); + actions.emplace_back(milvus::engine::ActionDropIndex); + }); + functions.emplace_back([&]() { + milvus::engine::CollectionIndex index; + index.index_type_ = "PQ"; + recorder.DescribeIndex(collection_name, field_name, index); + actions.emplace_back(milvus::engine::ActionDescribeIndex); + }); + functions.emplace_back([&]() { + milvus::engine::DataChunkPtr chunk; + recorder.Insert(collection_name, partition_name, chunk, 0); + actions.emplace_back(milvus::engine::ActionInsert); + }); + functions.emplace_back([&]() { + IDNumbers id_array = {1, 2, 3}; + std::vector field_names = {field_name}; + std::vector valid_row; + DataChunkPtr data_chunk; + recorder.GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk); + actions.emplace_back(milvus::engine::ActionGetEntityByID); + }); + functions.emplace_back([&]() { + IDNumbers id_array = {1, 2, 3}; + recorder.DeleteEntityByID(collection_name, id_array, 0); + actions.emplace_back(milvus::engine::ActionDeleteEntityByID); + }); + functions.emplace_back([&]() { + IDNumbers id_array; + recorder.ListIDInSegment(collection_name, 1, id_array); + actions.emplace_back(milvus::engine::ActionListIDInSegment); + }); + functions.emplace_back([&]() { + milvus::query::QueryPtr query_ptr; + milvus::engine::QueryResultPtr result; + recorder.Query(nullptr, query_ptr, result); + actions.emplace_back(milvus::engine::ActionQuery); + }); + functions.emplace_back([&]() { + IDNumbers id_array = {1, 2, 3}; + std::vector field_names = {field_name}; + recorder.LoadCollection(nullptr, collection_name, field_names, true); + actions.emplace_back(milvus::engine::ActionLoadCollection); + }); + functions.emplace_back([&]() { + recorder.Flush(collection_name); + actions.emplace_back(milvus::engine::ActionFlush); + }); + functions.emplace_back([&]() { + recorder.Flush(); + actions.emplace_back(milvus::engine::ActionFlush); + }); + functions.emplace_back([&]() { + recorder.Compact(nullptr, collection_name, 0.5); + actions.emplace_back(milvus::engine::ActionCompact); + }); + + // random actions + for (int32_t i = 0; i < 100; i++) { + auto rand = lrand48(); + auto index = rand % functions.size(); + auto& function = functions.at(index); + function(); + } + + // each action at least do one time + for (size_t i = 0; i < functions.size(); ++i) { + auto& function = functions.at(i); + function(); + } + + // replay + ScriptReplay replay; + std::string script_path = recorder.GetScriptPath(); + auto status = replay.Replay(db, script_path); + ASSERT_TRUE(status.ok()); + + const std::vector& record_actions = db->Actions(); + ASSERT_EQ(actions.size(), record_actions.size()); + ASSERT_EQ(actions, record_actions); + + std::experimental::filesystem::remove_all(transcript_path); +} + +TEST(TranscriptTest, ProxyTest) { + std::string test_path = "/tmp/milvus_test"; + DBOptions options; + options.transcript_enable_ = true; + options.meta_.path_ = test_path + "/db"; + DummyDBPtr db = std::make_shared(options); + TranscriptProxy proxy(db, options); + proxy.Start(); + + // register action functions + std::string collection_name = "collection"; + std::string partition_name = "partition"; + std::string field_name = "field"; + std::vector actions; + std::vector> functions; + functions.emplace_back([&]() { + milvus::engine::snapshot::CreateCollectionContext context; + proxy.CreateCollection(context); + actions.emplace_back(milvus::engine::ActionCreateCollection); + }); + functions.emplace_back([&]() { + proxy.DropCollection(collection_name); + actions.emplace_back(milvus::engine::ActionDropCollection); + }); + functions.emplace_back([&]() { + bool has = false; + proxy.HasCollection(collection_name, has); + actions.emplace_back(milvus::engine::ActionHasCollection); + }); + functions.emplace_back([&]() { + std::vector names; + proxy.ListCollections(names); + actions.emplace_back(milvus::engine::ActionListCollections); + }); + functions.emplace_back([&]() { + milvus::engine::snapshot::CollectionPtr collection; + milvus::engine::snapshot::FieldElementMappings fields_schema; + proxy.GetCollectionInfo(collection_name, collection, fields_schema); + actions.emplace_back(milvus::engine::ActionGetCollectionInfo); + }); + functions.emplace_back([&]() { + milvus::json collection_stats; + proxy.GetCollectionStats(collection_name, collection_stats); + actions.emplace_back(milvus::engine::ActionGetCollectionStats); + }); + functions.emplace_back([&]() { + int64_t count = 0; + proxy.CountEntities(collection_name, count); + actions.emplace_back(milvus::engine::ActionCountEntities); + }); + functions.emplace_back([&]() { + proxy.CreatePartition(collection_name, partition_name); + actions.emplace_back(milvus::engine::ActionCreatePartition); + }); + functions.emplace_back([&]() { + proxy.DropPartition(collection_name, partition_name); + actions.emplace_back(milvus::engine::ActionDropPartition); + }); + functions.emplace_back([&]() { + bool has = false; + proxy.HasPartition(collection_name, partition_name, has); + actions.emplace_back(milvus::engine::ActionHasPartition); + }); + functions.emplace_back([&]() { + std::vector partition_names; + proxy.ListPartitions(collection_name, partition_names); + actions.emplace_back(milvus::engine::ActionListPartitions); + }); + functions.emplace_back([&]() { + milvus::engine::CollectionIndex index; + proxy.CreateIndex(nullptr, collection_name, field_name, index); + actions.emplace_back(milvus::engine::ActionCreateIndex); + }); + functions.emplace_back([&]() { + proxy.DropIndex(collection_name, field_name); + actions.emplace_back(milvus::engine::ActionDropIndex); + }); + functions.emplace_back([&]() { + milvus::engine::CollectionIndex index; + index.index_type_ = "PQ"; + proxy.DescribeIndex(collection_name, field_name, index); + actions.emplace_back(milvus::engine::ActionDescribeIndex); + }); + functions.emplace_back([&]() { + milvus::engine::DataChunkPtr chunk; + proxy.Insert(collection_name, partition_name, chunk, 0); + actions.emplace_back(milvus::engine::ActionInsert); + }); + functions.emplace_back([&]() { + IDNumbers id_array = {1, 2, 3}; + std::vector field_names = {field_name}; + std::vector valid_row; + DataChunkPtr data_chunk; + proxy.GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk); + actions.emplace_back(milvus::engine::ActionGetEntityByID); + }); + functions.emplace_back([&]() { + IDNumbers id_array = {1, 2, 3}; + proxy.DeleteEntityByID(collection_name, id_array, 0); + actions.emplace_back(milvus::engine::ActionDeleteEntityByID); + }); + functions.emplace_back([&]() { + IDNumbers id_array; + proxy.ListIDInSegment(collection_name, 1, id_array); + actions.emplace_back(milvus::engine::ActionListIDInSegment); + }); + functions.emplace_back([&]() { + milvus::query::QueryPtr query_ptr; + milvus::engine::QueryResultPtr result; + proxy.Query(nullptr, query_ptr, result); + actions.emplace_back(milvus::engine::ActionQuery); + }); + functions.emplace_back([&]() { + IDNumbers id_array = {1, 2, 3}; + std::vector field_names = {field_name}; + proxy.LoadCollection(nullptr, collection_name, field_names, true); + actions.emplace_back(milvus::engine::ActionLoadCollection); + }); + functions.emplace_back([&]() { + proxy.Flush(collection_name); + actions.emplace_back(milvus::engine::ActionFlush); + }); + functions.emplace_back([&]() { + proxy.Flush(); + actions.emplace_back(milvus::engine::ActionFlush); + }); + functions.emplace_back([&]() { + proxy.Compact(nullptr, collection_name, 0.5); + actions.emplace_back(milvus::engine::ActionCompact); + }); + + // random actions + for (int32_t i = 0; i < 100; i++) { + auto rand = lrand48(); + auto index = rand % functions.size(); + auto& function = functions.at(index); + function(); + } + + // each action at least do one time + for (size_t i = 0; i < functions.size(); ++i) { + auto& function = functions.at(i); + function(); + } + + { + const std::vector& record_actions = db->Actions(); + ASSERT_EQ(actions.size(), record_actions.size()); + } + + // replay + { + options.replay_script_path_ = proxy.GetScriptRecorder()->GetScriptPath(); + proxy.Stop(); + DummyDBPtr db_replay = std::make_shared(options); + TranscriptProxy proxy_replay(db_replay, options); + proxy_replay.Start(); + + const std::vector& record_actions = db_replay->Actions(); + ASSERT_EQ(actions.size(), record_actions.size()); + ASSERT_EQ(actions, record_actions); + proxy_replay.Stop(); + } + + std::experimental::filesystem::remove_all(test_path); +} \ No newline at end of file