未验证 提交 a4b2c2b8 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshots): Integration and Enhancement (#2673)

* (db/snapshot): integrate stage 1
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): integrate stage 2
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): integrate stage 3
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): integrate stage 4
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): integrate stage 5
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): integrate stage 6
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): integrate stage 7
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add params for some resources
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add field type
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): implement iterate framework for Snapshot
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor code
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint errors
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add SnapshotHandlers
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add SnapshotVistor
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add SizeField for all resource commit
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add SegmentsToSearch collector
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Iterate handler update
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Update IterateHandler
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add some hooks for IterateHandler
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Small code refactor
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add const for some snapshot APIs
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add const for some snapshots APIs
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Update create collection
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): merge to new ssdb
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint errors
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add test_db in ssdb
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update test_ssdb
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 c454acde
......@@ -11,6 +11,7 @@
#pragma once
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
......@@ -62,7 +63,7 @@ class DB {
HasNativeCollection(const std::string& collection_id, bool& has_or_not) = 0;
virtual Status
AllCollections(std::vector<meta::CollectionSchema>& table_schema_array) = 0;
AllCollections(std::vector<std::string>& names) = 0;
virtual Status
GetCollectionInfo(const std::string& collection_id, std::string& collection_info) = 0;
......
......@@ -334,19 +334,20 @@ DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not)
}
Status
DBImpl::AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) {
DBImpl::AllCollections(std::vector<std::string>& names) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
names.clear();
std::vector<meta::CollectionSchema> all_collections;
auto status = meta_ptr_->AllCollections(all_collections);
// only return real collections, dont return partition collections
collection_schema_array.clear();
for (auto& schema : all_collections) {
if (schema.owner_collection_.empty()) {
collection_schema_array.push_back(schema);
names.push_back(schema.collection_id_);
}
}
......
......@@ -72,7 +72,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
HasNativeCollection(const std::string& collection_id, bool& has_or_not_) override;
Status
AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) override;
AllCollections(std::vector<std::string>& names) override;
Status
GetCollectionInfo(const std::string& collection_id, std::string& collection_info) override;
......
......@@ -10,11 +10,34 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/SSDBImpl.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Snapshots.h"
#include "wal/WalDefinations.h"
#include <limits>
#include <utility>
namespace milvus {
namespace engine {
namespace {
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
} // namespace
#define CHECK_INITIALIZED \
if (!initialized_.load(std::memory_order_acquire)) { \
return SHUTDOWN_ERROR; \
}
SSDBImpl::SSDBImpl(const DBOptions& options) : options_(options), initialized_(false) {
if (options_.wal_enable_) {
wal::MXLogConfiguration mxlog_config;
mxlog_config.recovery_error_ignore = options_.recovery_error_ignore_;
// 2 buffers in the WAL
mxlog_config.buffer_size = options_.buffer_size_ / 2;
mxlog_config.mxlog_path = options_.mxlog_path_;
wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);
}
Start();
}
......@@ -49,5 +72,169 @@ SSDBImpl::Stop() {
return Status::OK();
}
Status
SSDBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
CHECK_INITIALIZED;
auto ctx = context;
if (options_.wal_enable_) {
ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName());
}
auto op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
auto status = op->Push();
return status;
}
Status
SSDBImpl::DescribeCollection(const std::string& collection_name, snapshot::CollectionPtr& collection,
std::map<snapshot::FieldPtr, std::vector<snapshot::FieldElementPtr>>& fields_schema) {
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
collection = ss->GetCollection();
auto& fields = ss->GetResources<snapshot::Field>();
for (auto& kv : fields) {
fields_schema[kv.second.Get()] = ss->GetFieldElementsByField(kv.second->GetName());
}
return status;
}
Status
SSDBImpl::DropCollection(const std::string& name) {
CHECK_INITIALIZED;
// dates partly delete files of the collection but currently we don't support
LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << name;
snapshot::ScopedSnapshotT ss;
auto& snapshots = snapshot::Snapshots::GetInstance();
auto status = snapshots.GetSnapshot(ss, name);
if (!status.ok()) {
return status;
}
if (options_.wal_enable_) {
// SS TODO
/* wal_mgr_->DropCollection(ss->GetCollectionId()); */
}
status = snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
return status;
}
Status
SSDBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
has_or_not = status.ok();
return status;
}
Status
SSDBImpl::AllCollections(std::vector<std::string>& names) {
CHECK_INITIALIZED;
names.clear();
return snapshot::Snapshots::GetInstance().GetCollectionNames(names);
}
Status
SSDBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
CHECK_INITIALIZED;
uint64_t lsn = 0;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
if (options_.wal_enable_) {
// SS TODO
/* lsn = wal_mgr_->CreatePartition(collection_id, partition_tag); */
} else {
lsn = ss->GetCollection()->GetLsn();
}
snapshot::OperationContext context;
context.lsn = lsn;
auto op = std::make_shared<snapshot::CreatePartitionOperation>(context, ss);
snapshot::PartitionContext p_ctx;
p_ctx.name = partition_name;
snapshot::PartitionPtr partition;
status = op->CommitNewPartition(p_ctx, partition);
if (!status.ok()) {
return status;
}
status = op->Push();
return status;
}
Status
SSDBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
// SS TODO: Is below step needed? Or How to implement it?
/* mem_mgr_->EraseMemVector(partition_name); */
snapshot::PartitionContext context;
context.name = partition_name;
auto op = std::make_shared<snapshot::DropPartitionOperation>(context, ss);
status = op->Push();
return status;
}
Status
SSDBImpl::ShowPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
partition_names = std::move(ss->GetPartitionNames());
return status;
}
Status
SSDBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
bool force) {
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
auto handler = std::make_shared<LoadVectorFieldHandler>(context, ss);
handler->Iterate();
return handler->GetStatus();
}
} // namespace engine
} // namespace milvus
......@@ -12,9 +12,18 @@
#pragma once
#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "db/Options.h"
#include "db/SnapshotHandlers.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Resources.h"
#include "utils/Status.h"
#include "wal/WalManager.h"
namespace milvus {
namespace engine {
......@@ -23,6 +32,35 @@ class SSDBImpl {
public:
explicit SSDBImpl(const DBOptions& options);
Status
CreateCollection(const snapshot::CreateCollectionContext& context);
Status
DropCollection(const std::string& name);
Status
DescribeCollection(const std::string& collection_name, snapshot::CollectionPtr& collection,
std::map<snapshot::FieldPtr, std::vector<snapshot::FieldElementPtr>>& fields_schema);
Status
HasCollection(const std::string& collection_name, bool& has_or_not);
Status
AllCollections(std::vector<std::string>& names);
Status
PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
bool force = false);
Status
CreatePartition(const std::string& collection_name, const std::string& partition_name);
Status
DropPartition(const std::string& collection_name, const std::string& partition_name);
Status
ShowPartitions(const std::string& collection_name, std::vector<std::string>& partition_names);
~SSDBImpl();
Status
......@@ -34,6 +72,7 @@ class SSDBImpl {
private:
DBOptions options_;
std::atomic<bool> initialized_;
std::shared_ptr<wal::WalManager> wal_mgr_;
}; // SSDBImpl
} // namespace engine
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/SnapshotHandlers.h"
#include "db/meta/MetaTypes.h"
namespace milvus {
namespace engine {
LoadVectorFieldElementHandler::LoadVectorFieldElementHandler(const std::shared_ptr<server::Context>& context,
snapshot::ScopedSnapshotT ss,
const snapshot::FieldPtr& field)
: BaseT(ss), context_(context), field_(field) {
}
Status
LoadVectorFieldElementHandler::Handle(const snapshot::FieldElementPtr& field_element) {
if (field_->GetFtype() != snapshot::FieldType::VECTOR) {
return Status(DB_ERROR, "Should be VECTOR field");
}
if (field_->GetID() != field_element->GetFieldId()) {
return Status::OK();
}
// SS TODO
return Status::OK();
}
LoadVectorFieldHandler::LoadVectorFieldHandler(const std::shared_ptr<server::Context>& context,
snapshot::ScopedSnapshotT ss)
: BaseT(ss), context_(context) {
}
Status
LoadVectorFieldHandler::Handle(const snapshot::FieldPtr& field) {
if (field->GetFtype() != snapshot::FieldType::VECTOR) {
return Status::OK();
}
if (context_ && context_->IsConnectionBroken()) {
LOG_ENGINE_DEBUG_ << "Client connection broken, stop load collection";
return Status(DB_ERROR, "Connection broken");
}
// SS TODO
auto element_handler = std::make_shared<LoadVectorFieldElementHandler>(context_, ss_, field);
element_handler->Iterate();
auto status = element_handler->GetStatus();
if (!status.ok()) {
return status;
}
// SS TODO: Do Load
return status;
}
SegmentsToSearchCollector::SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, meta::FilesHolder& holder)
: BaseT(ss), holder_(holder) {
}
Status
SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_commit) {
// SS TODO
meta::SegmentSchema schema;
/* schema.id_ = segment_commit->GetSegmentId(); */
/* schema.file_type_ = resRow["file_type"]; */
/* schema.file_size_ = resRow["file_size"]; */
/* schema.row_count_ = resRow["row_count"]; */
/* schema.date_ = resRow["date"]; */
/* schema.engine_type_ = resRow["engine_type"]; */
/* schema.created_on_ = resRow["created_on"]; */
/* schema.updated_time_ = resRow["updated_time"]; */
/* schema.dimension_ = collection_schema.dimension_; */
/* schema.index_file_size_ = collection_schema.index_file_size_; */
/* schema.index_params_ = collection_schema.index_params_; */
/* schema.metric_type_ = collection_schema.metric_type_; */
/* auto status = utils::GetCollectionFilePath(options_, schema); */
/* if (!status.ok()) { */
/* ret = status; */
/* continue; */
/* } */
holder_.MarkFile(schema);
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/meta/FilesHolder.h"
#include "db/snapshot/Snapshot.h"
#include "server/context/Context.h"
#include "utils/Log.h"
#include <memory>
namespace milvus {
namespace engine {
struct LoadVectorFieldElementHandler : public snapshot::IterateHandler<snapshot::FieldElement> {
using ResourceT = snapshot::FieldElement;
using BaseT = snapshot::IterateHandler<ResourceT>;
LoadVectorFieldElementHandler(const std::shared_ptr<server::Context>& context, snapshot::ScopedSnapshotT ss,
const snapshot::FieldPtr& field);
Status
Handle(const typename ResourceT::Ptr&) override;
const std::shared_ptr<server::Context>& context_;
const snapshot::FieldPtr& field_;
};
struct LoadVectorFieldHandler : public snapshot::IterateHandler<snapshot::Field> {
using ResourceT = snapshot::Field;
using BaseT = snapshot::IterateHandler<ResourceT>;
LoadVectorFieldHandler(const std::shared_ptr<server::Context>& context, snapshot::ScopedSnapshotT ss);
Status
Handle(const typename ResourceT::Ptr&) override;
const std::shared_ptr<server::Context>& context_;
};
struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::SegmentCommit> {
using ResourceT = snapshot::SegmentCommit;
using BaseT = snapshot::IterateHandler<ResourceT>;
SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, meta::FilesHolder& holder);
Status
Handle(const typename ResourceT::Ptr&) override;
meta::FilesHolder& holder_;
};
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/SnapshotVisitor.h"
#include "db/SnapshotHandlers.h"
#include "db/meta/MetaTypes.h"
#include "db/snapshot/Snapshots.h"
namespace milvus {
namespace engine {
SnapshotVisitor::SnapshotVisitor(snapshot::ScopedSnapshotT ss) : ss_(ss) {
}
SnapshotVisitor::SnapshotVisitor(const std::string& collection_name) {
status_ = snapshot::Snapshots::GetInstance().GetSnapshot(ss_, collection_name);
}
SnapshotVisitor::SnapshotVisitor(snapshot::ID_TYPE collection_id) {
status_ = snapshot::Snapshots::GetInstance().GetSnapshot(ss_, collection_id);
}
Status
SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) {
STATUS_CHECK(status_);
auto handler = std::make_shared<SegmentsToSearchCollector>(ss_, files_holder);
handler->Iterate();
return handler->GetStatus();
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/meta/FilesHolder.h"
#include "db/snapshot/Snapshot.h"
#include <memory>
#include <string>
namespace milvus {
namespace engine {
class SnapshotVisitor {
public:
explicit SnapshotVisitor(snapshot::ScopedSnapshotT ss);
explicit SnapshotVisitor(const std::string& collection_name);
explicit SnapshotVisitor(snapshot::ID_TYPE collection_id);
Status
SegmentsToSearch(meta::FilesHolder& files_holder);
protected:
snapshot::ScopedSnapshotT ss_;
Status status_;
};
} // namespace engine
} // namespace milvus
......@@ -402,7 +402,8 @@ CreateCollectionOperation::DoExecute(Store& store) {
auto& field_schema = field_kv.first;
auto& field_elements = field_kv.second;
FieldPtr field;
status = store.CreateResource<Field>(Field(field_schema->GetName(), field_idx), field);
status =
store.CreateResource<Field>(Field(field_schema->GetName(), field_idx, field_schema->GetFtype()), field);
AddStepWithLsn(*field, c_context_.lsn);
MappingT element_ids = {};
FieldElementPtr raw_element;
......
......@@ -24,8 +24,10 @@ using NUM_TYPE = int64_t;
using FTYPE_TYPE = int64_t;
using TS_TYPE = int64_t;
using LSN_TYPE = uint64_t;
using SIZE_TYPE = uint64_t;
using MappingT = std::set<ID_TYPE>;
enum FieldType { VECTOR, INT32 };
enum FieldElementType { RAW, IVFSQ8 };
using IDS_TYPE = std::vector<ID_TYPE>;
......
......@@ -16,9 +16,10 @@
namespace milvus::engine::snapshot {
Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on,
TS_TYPE updated_on)
Collection::Collection(const std::string& name, const std::string& params, ID_TYPE id, LSN_TYPE lsn, State state,
TS_TYPE created_on, TS_TYPE updated_on)
: NameField(name),
ParamsField(params),
IdField(id),
LsnField(lsn),
StateField(state),
......@@ -26,11 +27,12 @@ Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State
UpdatedOnField(updated_on) {
}
CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings, ID_TYPE id,
LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings, SIZE_TYPE size,
ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
SchemaIdField(schema_id),
MappingsField(mappings),
SizeField(size),
IdField(id),
LsnField(lsn),
StateField(state),
......@@ -49,11 +51,12 @@ Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id,
UpdatedOnField(updated_on) {
}
PartitionCommit::PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings, ID_TYPE id,
LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
PartitionCommit::PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings, SIZE_TYPE size,
ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
MappingsField(mappings),
SizeField(size),
IdField(id),
LsnField(lsn),
StateField(state),
......@@ -99,11 +102,13 @@ Segment::ToString() const {
}
SegmentCommit::SegmentCommit(ID_TYPE schema_id, ID_TYPE partition_id, ID_TYPE segment_id, const MappingT& mappings,
ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
SIZE_TYPE size, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on,
TS_TYPE updated_on)
: SchemaIdField(schema_id),
PartitionIdField(partition_id),
SegmentIdField(segment_id),
MappingsField(mappings),
SizeField(size),
IdField(id),
LsnField(lsn),
StateField(state),
......@@ -123,11 +128,12 @@ SegmentCommit::ToString() const {
}
SegmentFile::SegmentFile(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id,
ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
SIZE_TYPE size, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
SegmentIdField(segment_id),
FieldElementIdField(field_element_id),
SizeField(size),
IdField(id),
LsnField(lsn),
StateField(state),
......@@ -146,10 +152,12 @@ SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_T
UpdatedOnField(updated_on) {
}
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on,
TS_TYPE updated_on)
Field::Field(const std::string& name, NUM_TYPE num, FTYPE_TYPE ftype, const std::string& params, ID_TYPE id,
LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: NameField(name),
NumField(num),
FtypeField(ftype),
ParamsField(params),
IdField(id),
LsnField(lsn),
StateField(state),
......@@ -170,11 +178,13 @@ FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT
}
FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype,
ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
const std::string& params, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on,
TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
NameField(name),
FtypeField(ftype),
ParamsField(params),
IdField(id),
LsnField(lsn),
StateField(state),
......
......@@ -23,11 +23,14 @@
#include "db/snapshot/BaseResource.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/ScopedResource.h"
#include "utils/Json.h"
using milvus::engine::utils::GetMicroSecTimeStamp;
namespace milvus::engine::snapshot {
static constexpr const char* JEmpty = "{}";
class MappingsField {
public:
explicit MappingsField(MappingT mappings = {}) : mappings_(std::move(mappings)) {
......@@ -291,10 +294,45 @@ class NameField {
std::string name_;
};
class ParamsField {
public:
explicit ParamsField(std::string params) : params_(std::move(params)), json_params_(json::parse(params_)) {
}
const std::string&
GetParams() const {
return params_;
}
const json&
GetParamsJson() const {
return json_params_;
}
protected:
std::string params_;
json json_params_;
};
class SizeField {
public:
explicit SizeField(SIZE_TYPE size) : size_(size) {
}
SIZE_TYPE
GetSize() const {
return size_;
}
protected:
SIZE_TYPE size_;
};
///////////////////////////////////////////////////////////////////////////////
class Collection : public BaseResource,
public NameField,
public ParamsField,
public IdField,
public LsnField,
public StateField,
......@@ -307,8 +345,9 @@ class Collection : public BaseResource,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Collection";
explicit Collection(const std::string& name, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
explicit Collection(const std::string& name, const std::string& params = JEmpty, ID_TYPE id = 0, LSN_TYPE lsn = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using CollectionPtr = Collection::Ptr;
......@@ -317,6 +356,7 @@ class CollectionCommit : public BaseResource,
public CollectionIdField,
public SchemaIdField,
public MappingsField,
public SizeField,
public IdField,
public LsnField,
public StateField,
......@@ -328,9 +368,9 @@ class CollectionCommit : public BaseResource,
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<CollectionCommit>>;
using VecT = std::vector<Ptr>;
CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings = {}, ID_TYPE id = 0,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings = {}, SIZE_TYPE size = 0,
ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using CollectionCommitPtr = CollectionCommit::Ptr;
......@@ -362,6 +402,7 @@ class PartitionCommit : public BaseResource,
public CollectionIdField,
public PartitionIdField,
public MappingsField,
public SizeField,
public IdField,
public LsnField,
public StateField,
......@@ -374,9 +415,9 @@ class PartitionCommit : public BaseResource,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "PartitionCommit";
PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings = {}, ID_TYPE id = 0,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings = {}, SIZE_TYPE size = 0,
ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
std::string
ToString() const override;
......@@ -417,6 +458,7 @@ class SegmentCommit : public BaseResource,
public PartitionIdField,
public SegmentIdField,
public MappingsField,
public SizeField,
public IdField,
public LsnField,
public StateField,
......@@ -430,8 +472,8 @@ class SegmentCommit : public BaseResource,
static constexpr const char* Name = "SegmentCommit";
SegmentCommit(ID_TYPE schema_id, ID_TYPE partition_id, ID_TYPE segment_id, const MappingT& mappings = {},
ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
SIZE_TYPE size = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
std::string
ToString() const override;
......@@ -446,6 +488,7 @@ class SegmentFile : public BaseResource,
public PartitionIdField,
public SegmentIdField,
public FieldElementIdField,
public SizeField,
public IdField,
public LsnField,
public StateField,
......@@ -459,8 +502,8 @@ class SegmentFile : public BaseResource,
static constexpr const char* Name = "SegmentFile";
SegmentFile(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id,
ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
SIZE_TYPE size = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using SegmentFilePtr = SegmentFile::Ptr;
......@@ -494,6 +537,8 @@ using SchemaCommitPtr = SchemaCommit::Ptr;
class Field : public BaseResource,
public NameField,
public NumField,
public FtypeField,
public ParamsField,
public IdField,
public LsnField,
public StateField,
......@@ -506,8 +551,9 @@ class Field : public BaseResource,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Field";
Field(const std::string& name, NUM_TYPE num, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
Field(const std::string& name, NUM_TYPE num, FTYPE_TYPE ftype, const std::string& params = JEmpty, ID_TYPE id = 0,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldPtr = Field::Ptr;
......@@ -542,6 +588,7 @@ class FieldElement : public BaseResource,
public FieldIdField,
public NameField,
public FtypeField,
public ParamsField,
public IdField,
public LsnField,
public StateField,
......@@ -553,9 +600,9 @@ class FieldElement : public BaseResource,
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldElement>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldElement";
FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, ID_TYPE id = 0,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype,
const std::string& params = JEmpty, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldElementPtr = FieldElement::Ptr;
......
......@@ -35,6 +35,10 @@ class ScopedResource {
Get() {
return res_;
}
const ResourcePtr&
Get() const {
return res_;
}
ResourceT operator*() const {
return *res_;
......
......@@ -122,7 +122,7 @@ Snapshot::Snapshot(ID_TYPE id) {
}
const std::string
Snapshot::ToString() {
Snapshot::ToString() const {
auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string {
std::stringstream ss;
std::string l1_spaces;
......
......@@ -46,30 +46,30 @@ class Snapshot : public ReferenceProxy {
explicit Snapshot(ID_TYPE id);
ID_TYPE
GetID() {
GetID() const {
return GetCollectionCommit()->GetID();
}
[[nodiscard]] ID_TYPE
GetCollectionId() const {
auto it = GetResources<Collection>().begin();
auto it = GetResources<Collection>().cbegin();
return it->first;
}
CollectionPtr
GetCollection() {
return GetResources<Collection>().begin()->second.Get();
GetCollection() const {
return GetResources<Collection>().cbegin()->second.Get();
}
SchemaCommitPtr
GetSchemaCommit() {
GetSchemaCommit() const {
auto id = GetLatestSchemaCommitId();
return GetResource<SchemaCommit>(id);
}
[[nodiscard]] const std::string&
GetName() const {
return GetResources<Collection>().begin()->second->GetName();
return GetResources<Collection>().cbegin()->second->GetName();
}
[[nodiscard]] size_t
......@@ -83,7 +83,7 @@ class Snapshot : public ReferenceProxy {
}
PartitionPtr
GetPartition(const std::string& name) {
GetPartition(const std::string& name) const {
ID_TYPE id;
auto status = GetPartitionId(name, id);
if (!status.ok()) {
......@@ -103,8 +103,8 @@ class Snapshot : public ReferenceProxy {
}
CollectionCommitPtr
GetCollectionCommit() {
return GetResources<CollectionCommit>().begin()->second.Get();
GetCollectionCommit() const {
return GetResources<CollectionCommit>().cbegin()->second.Get();
}
[[nodiscard]] ID_TYPE
......@@ -114,21 +114,55 @@ class Snapshot : public ReferenceProxy {
// PXU TODO: add const. Need to change Scopedxxxx::Get
SegmentCommitPtr
GetSegmentCommitBySegmentId(ID_TYPE segment_id) {
GetSegmentCommitBySegmentId(ID_TYPE segment_id) const {
auto it = seg_segc_map_.find(segment_id);
if (it == seg_segc_map_.end())
return nullptr;
return GetResource<SegmentCommit>(it->second);
}
std::vector<std::string>
GetPartitionNames() const {
std::vector<std::string> names;
for (auto& kv : partition_names_map_) {
names.emplace_back(kv.first);
}
return std::move(names);
}
PartitionCommitPtr
GetPartitionCommitByPartitionId(ID_TYPE partition_id) {
GetPartitionCommitByPartitionId(ID_TYPE partition_id) const {
auto it = p_pc_map_.find(partition_id);
if (it == p_pc_map_.end())
return nullptr;
return GetResource<PartitionCommit>(it->second);
}
template <typename HandlerT>
void
IterateResources(const typename HandlerT::Ptr& handler) {
auto& resources = GetResources<typename HandlerT::ResourceT>();
auto status = handler->PreIterate();
if (!status.ok()) {
handler->SetStatus(status);
return;
}
for (auto& kv : resources) {
status = handler->Handle(kv.second.Get());
if (!status.ok()) {
break;
}
}
if (!status.ok()) {
handler->SetStatus(status);
return;
}
status = handler->PostIterate();
handler->SetStatus(status);
}
[[nodiscard]] std::vector<std::string>
GetFieldNames() const {
std::vector<std::string> names;
......@@ -183,8 +217,22 @@ class Snapshot : public ReferenceProxy {
return itfe->second;
}
std::vector<FieldElementPtr>
GetFieldElementsByField(const std::string& field_name) const {
auto it = field_element_names_map_.find(field_name);
if (it == field_element_names_map_.end()) {
return {};
}
std::vector<FieldElementPtr> elements;
for (auto& kv : it->second) {
elements.push_back(GetResource<FieldElement>(kv.second));
}
return std::move(elements);
}
NUM_TYPE
GetMaxSegmentNumByPartition(ID_TYPE partition_id) {
GetMaxSegmentNumByPartition(ID_TYPE partition_id) const {
auto it = p_max_seg_num_.find(partition_id);
if (it == p_max_seg_num_.end())
return 0;
......@@ -198,7 +246,7 @@ class Snapshot : public ReferenceProxy {
template <typename ResourceT>
void
DumpResource(const std::string& tag = "") {
DumpResource(const std::string& tag = "") const {
auto& resources = GetResources<ResourceT>();
std::cout << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << " Start [" << tag
<< "]:" << resources.size() << std::endl;
......@@ -239,7 +287,7 @@ class Snapshot : public ReferenceProxy {
template <typename ResourceT>
typename ResourceT::Ptr
GetResource(ID_TYPE id) {
GetResource(ID_TYPE id) const {
auto& resources = GetResources<ResourceT>();
auto it = resources.find(id);
if (it == resources.end()) {
......@@ -257,7 +305,7 @@ class Snapshot : public ReferenceProxy {
}
const std::string
ToString();
ToString() const;
private:
Snapshot(const Snapshot&) = delete;
......@@ -278,8 +326,49 @@ class Snapshot : public ReferenceProxy {
LSN_TYPE max_lsn_;
};
using ScopedSnapshotT = ScopedResource<Snapshot>;
using GCHandler = std::function<void(Snapshot::Ptr)>;
using ScopedSnapshotT = ScopedResource<Snapshot>;
template <typename T>
struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
using ResourceT = T;
using ThisT = IterateHandler<ResourceT>;
using Ptr = std::shared_ptr<ThisT>;
explicit IterateHandler(ScopedSnapshotT ss) : ss_(ss) {
}
virtual Status
PreIterate() {
return Status::OK();
}
virtual Status
Handle(const typename ResourceT::Ptr& resource) = 0;
virtual Status
PostIterate() {
return Status::OK();
}
void
SetStatus(Status status) {
std::unique_lock<std::mutex> lock(mtx_);
status_ = status;
}
Status
GetStatus() const {
std::unique_lock<std::mutex> lock(mtx_);
return status_;
}
virtual void
Iterate() {
ss_->IterateResources<ThisT>(this->shared_from_this());
}
ScopedSnapshotT ss_;
Status status_;
mutable std::mutex mtx_;
};
} // namespace snapshot
} // namespace engine
......
......@@ -50,7 +50,7 @@ SnapshotHolder::Load(Store& store, ScopedSnapshotT& ss, ID_TYPE id, bool scoped)
std::unique_lock<std::mutex> lock(mutex_);
if (id == 0 || id == max_id_) {
auto raw = active_[max_id_];
auto raw = active_.at(max_id_);
ss = ScopedSnapshotT(raw, scoped);
return status;
}
......@@ -75,7 +75,7 @@ SnapshotHolder::Load(Store& store, ScopedSnapshotT& ss, ID_TYPE id, bool scoped)
}
Status
SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) {
SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) const {
Status status;
if (id > max_id_) {
std::stringstream emsg;
......@@ -87,7 +87,7 @@ SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) {
std::unique_lock<std::mutex> lock(mutex_);
if (id == 0 || id == max_id_) {
auto raw = active_[max_id_];
auto raw = active_.at(max_id_);
ss = ScopedSnapshotT(raw, scoped);
return status;
}
......
......@@ -35,7 +35,7 @@ class SnapshotHolder {
Add(ID_TYPE id);
Status
Get(ScopedSnapshotT& ss, ID_TYPE id = 0, bool scoped = true);
Get(ScopedSnapshotT& ss, ID_TYPE id = 0, bool scoped = true) const;
Status
Load(Store& store, ScopedSnapshotT& ss, ID_TYPE id = 0, bool scoped = true);
......@@ -63,7 +63,7 @@ class SnapshotHolder {
}
}
std::mutex mutex_;
mutable std::mutex mutex_;
ID_TYPE collection_id_;
ID_TYPE min_id_ = std::numeric_limits<ID_TYPE>::max();
ID_TYPE max_id_ = std::numeric_limits<ID_TYPE>::min();
......
......@@ -86,7 +86,7 @@ Snapshots::LoadSnapshot(Store& store, ScopedSnapshotT& ss, ID_TYPE collection_id
}
Status
Snapshots::GetSnapshot(ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, bool scoped) {
Snapshots::GetSnapshot(ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, bool scoped) const {
SnapshotHolderPtr holder;
auto status = GetHolder(collection_id, holder);
if (!status.ok())
......@@ -96,7 +96,7 @@ Snapshots::GetSnapshot(ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, b
}
Status
Snapshots::GetSnapshot(ScopedSnapshotT& ss, const std::string& name, ID_TYPE id, bool scoped) {
Snapshots::GetSnapshot(ScopedSnapshotT& ss, const std::string& name, ID_TYPE id, bool scoped) const {
SnapshotHolderPtr holder;
auto status = GetHolder(name, holder);
if (!status.ok())
......@@ -114,6 +114,15 @@ Snapshots::GetCollectionIds(IDS_TYPE& ids) const {
return Status::OK();
}
Status
Snapshots::GetCollectionNames(std::vector<std::string>& names) const {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
for (auto& kv : name_id_map_) {
names.push_back(kv.first);
}
return Status::OK();
}
Status
Snapshots::LoadNoLock(Store& store, ID_TYPE collection_id, SnapshotHolderPtr& holder) {
auto op = std::make_shared<GetSnapshotIDsOperation>(collection_id, false);
......@@ -148,7 +157,7 @@ Snapshots::Init() {
}
Status
Snapshots::GetHolder(const std::string& name, SnapshotHolderPtr& holder) {
Snapshots::GetHolder(const std::string& name, SnapshotHolderPtr& holder) const {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto kv = name_id_map_.find(name);
if (kv != name_id_map_.end()) {
......@@ -163,7 +172,7 @@ Snapshots::GetHolder(const std::string& name, SnapshotHolderPtr& holder) {
}
Status
Snapshots::GetHolder(const ID_TYPE& collection_id, SnapshotHolderPtr& holder) {
Snapshots::GetHolder(const ID_TYPE& collection_id, SnapshotHolderPtr& holder) const {
Status status;
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
status = GetHolderNoLock(collection_id, holder);
......@@ -195,7 +204,7 @@ Snapshots::LoadHolder(Store& store, const ID_TYPE& collection_id, SnapshotHolder
}
Status
Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) {
Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) const {
auto it = holders_.find(collection_id);
if (it == holders_.end()) {
std::stringstream emsg;
......
......@@ -35,21 +35,23 @@ class Snapshots {
return sss;
}
Status
GetHolder(const ID_TYPE& collection_id, SnapshotHolderPtr& holder);
GetHolder(const ID_TYPE& collection_id, SnapshotHolderPtr& holder) const;
Status
GetHolder(const std::string& name, SnapshotHolderPtr& holder);
GetHolder(const std::string& name, SnapshotHolderPtr& holder) const;
Status
LoadHolder(Store& store, const ID_TYPE& collection_id, SnapshotHolderPtr& holder);
Status
GetSnapshot(ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id = 0, bool scoped = true);
GetSnapshot(ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id = 0, bool scoped = true) const;
Status
GetSnapshot(ScopedSnapshotT& ss, const std::string& name, ID_TYPE id = 0, bool scoped = true);
GetSnapshot(ScopedSnapshotT& ss, const std::string& name, ID_TYPE id = 0, bool scoped = true) const;
Status
LoadSnapshot(Store& store, ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, bool scoped = true);
Status
GetCollectionIds(IDS_TYPE& ids) const;
Status
GetCollectionNames(std::vector<std::string>& names) const;
Status
DropCollection(const std::string& name, const LSN_TYPE& lsn);
......@@ -77,7 +79,7 @@ class Snapshots {
Status
LoadNoLock(Store& store, ID_TYPE collection_id, SnapshotHolderPtr& holder);
Status
GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder);
GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) const;
mutable std::shared_timed_mutex mutex_;
std::map<ID_TYPE, SnapshotHolderPtr> holders_;
......
......@@ -293,7 +293,7 @@ class Store {
template <class T, class F>
inline void
register_any_visitor(F const& f) {
std::cout << "Register visitor for type " << std::quoted(typeid(T).name()) << '\n';
/* std::cout << "Register visitor for type " << std::quoted(typeid(T).name()) << '\n'; */
any_flush_vistors_.insert(to_any_visitor<T>(f));
}
......@@ -397,7 +397,7 @@ class Store {
std::stringstream fname;
fname << "f_" << fi << "_" << std::get<Index<Field::MapT, MockResourcesT>::value>(ids_) + 1;
FieldPtr field;
CreateResource<Field>(Field(fname.str(), fi), field);
CreateResource<Field>(Field(fname.str(), fi, FieldType::VECTOR), field);
all_records.push_back(field);
MappingT f_c_m = {};
......
......@@ -21,6 +21,7 @@
#include "config/Config.h"
#include "config/Utils.h"
#include "db/DBFactory.h"
#include "db/snapshot/OperationExecutor.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
......@@ -193,6 +194,9 @@ DBWrapper::StartService() {
kill(0, SIGUSR1);
}
// SS TODO
/* engine::snapshot::OperationExecutor::GetInstance().Start(); */
// create db instance
try {
db_ = engine::DBFactory::Build(opt);
......@@ -229,6 +233,8 @@ DBWrapper::StopService() {
db_->Stop();
}
// SS TODO
/* engine::snapshot::OperationExecutor::GetInstance().Stop(); */
return Status::OK();
}
......@@ -238,11 +244,15 @@ DBWrapper::PreloadCollections(const std::string& preload_collections) {
// do nothing
} else if (preload_collections == "*") {
// load all tables
std::vector<engine::meta::CollectionSchema> table_schema_array;
db_->AllCollections(table_schema_array);
// SS TODO: Replace name with id
std::vector<std::string> names;
auto status = db_->AllCollections(names);
if (!status.ok()) {
return status;
}
for (auto& schema : table_schema_array) {
auto status = db_->PreloadCollection(nullptr, schema.collection_id_);
for (auto& name : names) {
auto status = db_->PreloadCollection(nullptr, name);
if (!status.ok()) {
return status;
}
......
......@@ -37,16 +37,16 @@ Status
ShowCollectionsRequest::OnExecute() {
TimeRecorderAuto rc("ShowCollectionsRequest");
std::vector<engine::meta::CollectionSchema> schema_array;
auto status = DBWrapper::DB()->AllCollections(schema_array);
std::vector<std::string> names;
auto status = DBWrapper::DB()->AllCollections(names);
fiu_do_on("ShowCollectionsRequest.OnExecute.show_collections_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
for (auto& schema : schema_array) {
collection_name_list_.push_back(schema.collection_id_);
for (auto& name : names) {
collection_name_list_.push_back(name);
}
return Status::OK();
}
......
......@@ -42,6 +42,7 @@ add_executable(test_db
${db_meta_files}
${db_merge_files}
${db_wal_files}
${db_snapshot_files}
${grpc_server_files}
${grpc_service_files}
${metrics_files}
......
......@@ -514,7 +514,8 @@ TEST_F(DBTest, SHUTDOWN_TEST) {
ASSERT_FALSE(stat.ok());
std::vector<milvus::engine::meta::CollectionSchema> collection_infos;
stat = db_->AllCollections(collection_infos);
std::vector<std::string> names;
stat = db_->AllCollections(names);
ASSERT_EQ(stat.code(), milvus::DB_ERROR);
bool has_collection = false;
......@@ -907,12 +908,12 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
milvus::engine::meta::CollectionSchema collection_info = BuildCollectionSchema();
auto stat = db_->CreateCollection(collection_info);
std::vector<milvus::engine::meta::CollectionSchema> collection_schema_array;
stat = db_->AllCollections(collection_schema_array);
std::vector<std::string> names;
stat = db_->AllCollections(names);
ASSERT_TRUE(stat.ok());
bool bfound = false;
for (auto& schema : collection_schema_array) {
if (schema.collection_id_ == COLLECTION_NAME) {
for (auto& name : names) {
if (name == COLLECTION_NAME) {
bfound = true;
break;
}
......
......@@ -198,12 +198,12 @@ TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) {
milvus::engine::meta::CollectionSchema collection_info = BuildCollectionSchema();
auto stat = db_->CreateCollection(collection_info);
std::vector<milvus::engine::meta::CollectionSchema> collection_schema_array;
stat = db_->AllCollections(collection_schema_array);
std::vector<std::string> names;
stat = db_->AllCollections(names);
ASSERT_TRUE(stat.ok());
bool bfound = false;
for (auto& schema : collection_schema_array) {
if (schema.collection_id_ == COLLECTION_NAME) {
for (auto& name : names) {
if (name == COLLECTION_NAME) {
bfound = true;
break;
}
......@@ -212,11 +212,11 @@ TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) {
fiu_init(0);
FIU_ENABLE_FIU("MySQLMetaImpl.AllCollection.null_connection");
stat = db_->AllCollections(collection_schema_array);
stat = db_->AllCollections(names);
ASSERT_FALSE(stat.ok());
FIU_ENABLE_FIU("MySQLMetaImpl.AllCollection.throw_exception");
stat = db_->AllCollections(collection_schema_array);
stat = db_->AllCollections(names);
ASSERT_FALSE(stat.ok());
fiu_disable("MySQLMetaImpl.AllCollection.null_connection");
fiu_disable("MySQLMetaImpl.AllCollection.throw_exception");
......@@ -476,5 +476,3 @@ TEST_F(MySqlDBTest, PARTITION_TEST) {
ASSERT_TRUE(stat.ok());
}
}
......@@ -31,6 +31,7 @@
#include "server/grpc_impl/GrpcServer.h"
#include "utils/CommonUtil.h"
#include <fiu-control.h>
#include <fiu-local.h>
......@@ -123,6 +124,7 @@ class RpcHandlerTest : public testing::Test {
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Stop();
milvus::scheduler::SchedInst::GetInstance()->Stop();
boost::filesystem::remove_all("/tmp/milvus_test");
}
......
......@@ -37,6 +37,7 @@
#include "utils/CommonUtil.h"
#include "utils/StringHelpFunctions.h"
static const char* COLLECTION_NAME = "test_milvus_web_collection";
using OStatus = oatpp::web::protocol::http::Status;
......@@ -361,6 +362,7 @@ class WebControllerTest : public ::testing::Test {
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Stop();
milvus::scheduler::SchedInst::GetInstance()->Stop();
boost::filesystem::remove_all(CONTROLLER_TEST_CONFIG_DIR);
}
......
......@@ -25,10 +25,6 @@
#include "cache/GpuCacheMgr.h"
#include "db/DBFactory.h"
#include "db/Options.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
#include "db/snapshot/ResourceHolders.h"
#ifdef MILVUS_GPU_VERSION
#include "knowhere/index/vector_index/helpers/FaissGpuResourceMgr.h"
......
......@@ -38,6 +38,7 @@ add_executable(test_scheduler
${db_meta_files}
${db_merge_files}
${db_wal_files}
${db_snapshot_files}
${scheduler_files}
${segment_files}
${server_init_files}
......@@ -55,4 +56,3 @@ target_link_libraries(test_scheduler
${unittest_libs})
install(TARGETS test_scheduler DESTINATION unittest)
......@@ -12,30 +12,48 @@
#-------------------------------------------------------------------------------
set(test_files
${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_snapshot.cpp
${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp)
set(util_files
${MILVUS_ENGINE_SRC}/db/Utils.cpp)
${CMAKE_CURRENT_SOURCE_DIR}/test_db.cpp)
add_executable(test_ssdb
${common_files}
${log_files}
${cache_files}
${codecs_files}
${codecs_default_files}
${config_files}
${config_handler_files}
${db_main_files}
${db_engine_files}
${db_insert_files}
${db_meta_files}
${db_merge_files}
${db_wal_files}
${db_snapshot_files}
${grpc_server_files}
${grpc_service_files}
${metrics_files}
${metrics_prometheus_files}
# ${thirdparty_files}
${util_files}
${query_files}
${segment_files}
${scheduler_files}
${server_files}
${server_init_files}
${server_context_files}
${server_delivery_files}
${storage_files}
${tracing_files}
${web_server_files}
${wrapper_files}
${thirdparty_files}
${test_files}
)
target_link_libraries(test_ssdb
knowhere
metrics
stdc++
${unittest_libs}
)
oatpp)
install(TARGETS test_db DESTINATION unittest)
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <fiu-control.h>
#include <fiu-local.h>
#include <gtest/gtest.h>
#include <string>
#include <set>
#include <algorithm>
#include "ssdb/utils.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ReferenceProxy.h"
#include "db/snapshot/ResourceHolders.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/Snapshots.h"
#include "db/snapshot/Store.h"
#include "db/snapshot/WrappedTypes.h"
using ID_TYPE = milvus::engine::snapshot::ID_TYPE;
using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE;
using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE;
using MappingT = milvus::engine::snapshot::MappingT;
using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext;
using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext;
using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext;
using OperationContext = milvus::engine::snapshot::OperationContext;
using PartitionContext = milvus::engine::snapshot::PartitionContext;
using BuildOperation = milvus::engine::snapshot::BuildOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation;
using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation;
using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation;
using CreatePartitionOperation = milvus::engine::snapshot::CreatePartitionOperation;
using DropCollectionOperation = milvus::engine::snapshot::DropCollectionOperation;
using CollectionCommitsHolder = milvus::engine::snapshot::CollectionCommitsHolder;
using CollectionsHolder = milvus::engine::snapshot::CollectionsHolder;
using CollectionScopedT = milvus::engine::snapshot::CollectionScopedT;
using Collection = milvus::engine::snapshot::Collection;
using CollectionPtr = milvus::engine::snapshot::CollectionPtr;
using Partition = milvus::engine::snapshot::Partition;
using PartitionPtr = milvus::engine::snapshot::PartitionPtr;
using Segment = milvus::engine::snapshot::Segment;
using SegmentPtr = milvus::engine::snapshot::SegmentPtr;
using SegmentFile = milvus::engine::snapshot::SegmentFile;
using SegmentFilePtr = milvus::engine::snapshot::SegmentFilePtr;
using Field = milvus::engine::snapshot::Field;
using FieldElement = milvus::engine::snapshot::FieldElement;
using Snapshots = milvus::engine::snapshot::Snapshots;
using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT;
using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy;
using Queue = milvus::BlockingQueue<ID_TYPE>;
using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using SSDBImpl = milvus::engine::SSDBImpl;
milvus::Status
CreateCollection(std::shared_ptr<SSDBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
context.collection = collection_schema;
auto vector_field = std::make_shared<Field>("vector", 0,
milvus::engine::snapshot::FieldType::VECTOR);
auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
milvus::engine::snapshot::FieldElementType::IVFSQ8);
auto int_field = std::make_shared<Field>("int", 0,
milvus::engine::snapshot::FieldType::INT32);
context.fields_schema[vector_field] = {vector_field_element};
context.fields_schema[int_field] = {};
return db->CreateCollection(context);
}
TEST_F(SSDBTest, CollectionTest) {
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok());
ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(ss);
ASSERT_EQ(ss->GetName(), c1);
bool has;
status = db_->HasCollection(c1, has);
ASSERT_TRUE(has);
ASSERT_TRUE(status.ok());
std::vector<std::string> names;
status = db_->AllCollections(names);
ASSERT_TRUE(status.ok());
ASSERT_EQ(names.size(), 1);
ASSERT_EQ(names[0], c1);
std::string c1_1 = "c1";
status = CreateCollection(db_, c1_1, next_lsn());
ASSERT_FALSE(status.ok());
std::string c2 = "c2";
status = CreateCollection(db_, c2, next_lsn());
ASSERT_TRUE(status.ok());
status = db_->AllCollections(names);
ASSERT_TRUE(status.ok());
ASSERT_EQ(names.size(), 2);
status = db_->DropCollection(c1);
ASSERT_TRUE(status.ok());
status = db_->AllCollections(names);
ASSERT_TRUE(status.ok());
ASSERT_EQ(names.size(), 1);
ASSERT_EQ(names[0], c2);
status = db_->DropCollection(c1);
ASSERT_FALSE(status.ok());
}
......@@ -13,7 +13,6 @@
#include <fiu-local.h>
#include <gtest/gtest.h>
#include <random>
#include <string>
#include <set>
#include <algorithm>
......@@ -65,12 +64,118 @@ using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy;
using Queue = milvus::BlockingQueue<ID_TYPE>;
using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
struct PartitionCollector : public IteratePartitionHandler {
using ResourceT = Partition;
using BaseT = IteratePartitionHandler;
explicit PartitionCollector(ScopedSnapshotT ss) : BaseT(ss) {}
milvus::Status
PreIterate() override {
partition_names_.clear();
return milvus::Status::OK();
}
milvus::Status
Handle(const typename ResourceT::Ptr& partition) override {
partition_names_.push_back(partition->GetName());
return milvus::Status::OK();
}
int RandomInt(int start, int end) {
std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(start, end);
return dist(rng);
std::vector<std::string> partition_names_;
};
struct WaitableObj {
bool notified_ = false;
std::mutex mutex_;
std::condition_variable cv_;
void
Wait() {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait(lck);
}
notified_ = false;
}
void
Notify() {
std::unique_lock<std::mutex> lck(mutex_);
notified_ = true;
lck.unlock();
cv_.notify_one();
}
};
ScopedSnapshotT
CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
context.collection = collection_schema;
auto vector_field = std::make_shared<Field>("vector", 0,
milvus::engine::snapshot::FieldType::VECTOR);
auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
milvus::engine::snapshot::FieldElementType::IVFSQ8);
auto int_field = std::make_shared<Field>("int", 0,
milvus::engine::snapshot::FieldType::INT32);
context.fields_schema[vector_field] = {vector_field_element};
context.fields_schema[int_field] = {};
auto op = std::make_shared<CreateCollectionOperation>(context);
op->Push();
ScopedSnapshotT ss;
auto status = op->GetSnapshot(ss);
return ss;
}
ScopedSnapshotT
CreatePartition(const std::string& collection_name, const PartitionContext& p_context, const LSN_TYPE& lsn) {
ScopedSnapshotT curr_ss;
ScopedSnapshotT ss;
auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
OperationContext context;
context.lsn = lsn;
auto op = std::make_shared<CreatePartitionOperation>(context, ss);
PartitionPtr partition;
status = op->CommitNewPartition(p_context, partition);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->Push();
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->GetSnapshot(curr_ss);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
return curr_ss;
}
TEST_F(SnapshotTest, ResourcesTest) {
int nprobe = 16;
milvus::json params = {{"nprobe", nprobe}};
ParamsField p_field(params.dump());
ASSERT_EQ(params.dump(), p_field.GetParams());
ASSERT_EQ(params, p_field.GetParamsJson());
auto nprobe_real = p_field.GetParamsJson().at("nprobe").get<int>();
ASSERT_EQ(nprobe, nprobe_real);
}
TEST_F(SnapshotTest, ReferenceProxyTest) {
......@@ -179,26 +284,6 @@ TEST_F(SnapshotTest, ResourceHoldersTest) {
}
}
ScopedSnapshotT
CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
context.collection = collection_schema;
auto vector_field = std::make_shared<Field>("vector", 0);
auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
milvus::engine::snapshot::FieldElementType::IVFSQ8);
auto int_field = std::make_shared<Field>("int", 0);
context.fields_schema[vector_field] = {vector_field_element};
context.fields_schema[int_field] = {};
auto op = std::make_shared<CreateCollectionOperation>(context);
op->Push();
ScopedSnapshotT ss;
auto status = op->GetSnapshot(ss);
return ss;
}
TEST_F(SnapshotTest, DeleteOperationTest) {
std::string collection_name = "test_c1";
LSN_TYPE lsn = 1;
......@@ -381,41 +466,6 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
ASSERT_FALSE(c_c);
}
ScopedSnapshotT
CreatePartition(const std::string& collection_name, const PartitionContext& p_context, const LSN_TYPE& lsn) {
ScopedSnapshotT curr_ss;
ScopedSnapshotT ss;
auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
OperationContext context;
context.lsn = lsn;
auto op = std::make_shared<CreatePartitionOperation>(context, ss);
PartitionPtr partition;
status = op->CommitNewPartition(p_context, partition);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->Push();
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->GetSnapshot(curr_ss);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
return curr_ss;
}
TEST_F(SnapshotTest, PartitionTest) {
std::string collection_name("c1");
LSN_TYPE lsn = 1;
......@@ -424,6 +474,11 @@ TEST_F(SnapshotTest, PartitionTest) {
ASSERT_EQ(ss->GetName(), collection_name);
ASSERT_EQ(ss->NumberOfPartitions(), 1);
auto partition_iterator = std::make_shared<PartitionCollector>(ss);
partition_iterator->Iterate();
ASSERT_TRUE(partition_iterator->GetStatus().ok());
ASSERT_EQ(partition_iterator->partition_names_.size(), 1);
OperationContext context;
context.lsn = ++lsn;
auto op = std::make_shared<CreatePartitionOperation>(context, ss);
......@@ -449,6 +504,11 @@ TEST_F(SnapshotTest, PartitionTest) {
ASSERT_GT(curr_ss->GetID(), ss->GetID());
ASSERT_EQ(curr_ss->NumberOfPartitions(), 2);
partition_iterator = std::make_shared<PartitionCollector>(curr_ss);
partition_iterator->Iterate();
ASSERT_TRUE(partition_iterator->GetStatus().ok());
ASSERT_EQ(partition_iterator->partition_names_.size(), 2);
p_ctx.lsn = ++lsn;
auto drop_op = std::make_shared<DropPartitionOperation>(p_ctx, curr_ss);
status = drop_op->Push();
......@@ -710,30 +770,6 @@ TEST_F(SnapshotTest, OperationTest) {
Snapshots::GetInstance().Reset();
}
struct WaitableObj {
bool notified_ = false;
std::mutex mutex_;
std::condition_variable cv_;
void
Wait() {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait(lck);
}
notified_ = false;
}
void
Notify() {
std::unique_lock<std::mutex> lck(mutex_);
notified_ = true;
lck.unlock();
cv_.notify_one();
}
};
TEST_F(SnapshotTest, CompoundTest1) {
milvus::Status status;
std::atomic<LSN_TYPE> lsn = 0;
......
......@@ -20,6 +20,7 @@
#include <thread>
#include <utility>
#include <fiu-local.h>
#include <random>
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
......@@ -177,6 +178,44 @@ SnapshotTest::TearDown() {
BaseTest::TearDown();
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void
SSDBTest::SetUp() {
BaseTest::SetUp();
milvus::engine::snapshot::OperationExecutor::GetInstance().Start();
milvus::engine::snapshot::EventExecutor::GetInstance().Start();
milvus::engine::snapshot::CollectionCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::CollectionsHolder::GetInstance().Reset();
milvus::engine::snapshot::SchemaCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldElementsHolder::GetInstance().Reset();
milvus::engine::snapshot::PartitionsHolder::GetInstance().Reset();
milvus::engine::snapshot::PartitionCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentFilesHolder::GetInstance().Reset();
milvus::engine::snapshot::Store::GetInstance().DoReset();
milvus::engine::snapshot::Snapshots::GetInstance().Reset();
milvus::engine::snapshot::Snapshots::GetInstance().Init();
auto options = milvus::engine::DBOptions();
options.wal_enable_ = false;
db_ = std::make_shared<milvus::engine::SSDBImpl>(options);
}
void
SSDBTest::TearDown() {
db_ = nullptr;
// TODO: Temp to delay some time. OperationExecutor should wait all resources be destructed before stop
std::this_thread::sleep_for(std::chrono::milliseconds(20));
milvus::engine::snapshot::EventExecutor::GetInstance().Stop();
milvus::engine::snapshot::OperationExecutor::GetInstance().Stop();
BaseTest::TearDown();
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int
main(int argc, char **argv) {
......
......@@ -12,6 +12,18 @@
#pragma once
#include <gtest/gtest.h>
#include <random>
#include <memory>
#include "db/SSDBImpl.h"
inline int
RandomInt(int start, int end) {
std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(start, end);
return dist(rng);
}
class BaseTest : public ::testing::Test {
protected:
......@@ -31,3 +43,13 @@ class SnapshotTest : public BaseTest {
void
TearDown() override;
};
class SSDBTest : public BaseTest {
protected:
std::shared_ptr<milvus::engine::SSDBImpl> db_;
void
SetUp() override;
void
TearDown() override;
};
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册