未验证 提交 f6144560 编写于 作者: G groot 提交者: GitHub

prepare for wal (#3259)

* prepare for wal
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* refine
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 7ec3ab25
......@@ -18,6 +18,7 @@ aux_source_directory( ${MILVUS_ENGINE_SRC}/db/insert DB_INSERT_FILES )
aux_source_directory( ${MILVUS_ENGINE_SRC}/db/merge DB_MERGE_FILES )
aux_source_directory( ${MILVUS_ENGINE_SRC}/db/wal DB_WAL_FILES )
aux_source_directory( ${MILVUS_ENGINE_SRC}/db/snapshot DB_SNAPSHOT_FILES )
aux_source_directory( ${MILVUS_ENGINE_SRC}/db/transcript DB_TRANSCRIPT_FILES )
aux_source_directory( ${MILVUS_ENGINE_SRC}/db/meta DB_META_MAIN_FILES )
aux_source_directory( ${MILVUS_ENGINE_SRC}/db/meta/backend DB_META_BACKEND_FILES )
......@@ -42,6 +43,7 @@ set( ENGINE_FILES ${DB_MAIN_FILES}
${DB_MERGE_FILES}
${DB_WAL_FILES}
${DB_SNAPSHOT_FILES}
${DB_TRANSCRIPT_FILES}
${THIRDPARTY_FILES}
${WRAPPER_FILES}
)
......
......@@ -11,22 +11,27 @@
#include "db/DBFactory.h"
#include "db/DBImpl.h"
#include "meta/MetaFactory.h"
#include "db/transcript/Transcript.h"
#include "db/wal/WriteAheadLog.h"
namespace milvus {
namespace engine {
DBOptions
DBFactory::BuildOption() {
auto meta = MetaFactory::BuildOption();
DBOptions options;
options.meta_ = meta;
return options;
}
DBPtr
DBFactory::BuildDB(const DBOptions& options) {
return std::make_shared<DBImpl>(options);
DBPtr db = std::make_shared<DBImpl>(options);
// need wal? wal must be after db
if (options.wal_enable_) {
db = std::make_shared<WriteAheadLog>(db, options);
}
// need transcript? transcript must be after wal
if (options.transcript_enable_) {
db = std::make_shared<Transcript>(db, options);
}
return db;
}
} // namespace engine
......
......@@ -21,9 +21,6 @@ namespace engine {
class DBFactory {
public:
static DBOptions
BuildOption();
static DBPtr
BuildDB(const DBOptions& options);
};
......
......@@ -80,7 +80,7 @@ class DBImpl : public DB, public ConfigObserver {
const std::string& field_name, const CollectionIndex& index) override;
Status
DropIndex(const std::string& collection_name, const std::string& field_name = "") override;
DropIndex(const std::string& collection_name, const std::string& field_name) override;
Status
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override;
......@@ -105,7 +105,7 @@ class DBImpl : public DB, public ConfigObserver {
// if the input field_names is empty, will load all fields of this collection
Status
LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force = false) override;
const std::vector<std::string>& field_names, bool force) override;
Status
Flush(const std::string& collection_name) override;
......@@ -114,7 +114,7 @@ class DBImpl : public DB, public ConfigObserver {
Flush() override;
Status
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold = 0.0) override;
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) override;
void
ConfigUpdate(const std::string& name) override;
......
......@@ -19,7 +19,13 @@ namespace engine {
return Status::OK(); \
}
DBProxy::DBProxy(const DBPtr& db) : db_(db) {
DBProxy::DBProxy(const DBPtr& db, const DBOptions& options) : db_(db), options_(options) {
}
Status
DBProxy::Start() {
DB_CHECK
return db_->Start();
}
Status
......
......@@ -22,7 +22,7 @@ namespace engine {
class DBProxy : public DB {
public:
explicit DBProxy(const DBPtr& db);
DBProxy(const DBPtr& db, const DBOptions& options);
Status
Start() override;
......@@ -69,7 +69,7 @@ class DBProxy : public DB {
const CollectionIndex& index) override;
Status
DropIndex(const std::string& collection_name, const std::string& field_name = "") override;
DropIndex(const std::string& collection_name, const std::string& field_name) override;
Status
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override;
......@@ -93,7 +93,7 @@ class DBProxy : public DB {
Status
LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force = false) override;
const std::vector<std::string>& field_names, bool force) override;
Status
Flush(const std::string& collection_name) override;
......@@ -102,10 +102,11 @@ class DBProxy : public DB {
Flush() override;
Status
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold = 0.0) override;
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) override;
private:
protected:
DBPtr db_;
DBOptions options_;
};
} // namespace engine
......
......@@ -185,10 +185,14 @@ struct DBOptions {
bool metric_enable_ = false;
// wal relative configurations
bool wal_enable_ = true;
bool wal_enable_ = false;
int64_t buffer_size_ = 256;
std::string mxlog_path_ = "/tmp/milvus/wal/";
}; // Options
// transcript configurations
bool transcript_enable_ = false;
std::string replay_script_path_; // for replay
}; // Options
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/transcript/ScriptCodec.h"
namespace milvus {
namespace engine {
ScriptCodec&
ScriptCodec::GetInstance() {
static ScriptCodec s_codec;
return s_codec;
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
namespace milvus {
namespace engine {
class ScriptCodec {
public:
ScriptCodec() = default;
static ScriptCodec&
GetInstance();
void
SetScriptPath(const std::string& path) {
script_path_ = path;
}
private:
std::string script_path_;
};
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/transcript/ScriptReplay.h"
#include "db/transcript/ScriptCodec.h"
namespace milvus {
namespace engine {
Status
ScriptReplay::Replay(const DBPtr& db, const std::string& replay_script_path) {
return Status::OK();
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/DB.h"
#include <string>
namespace milvus {
namespace engine {
class ScriptReplay {
public:
ScriptReplay() = default;
Status
Replay(const DBPtr& db, const std::string& replay_script_path);
};
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/transcript/Transcript.h"
#include "db/transcript/ScriptCodec.h"
#include "db/transcript/ScriptReplay.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include <boost/filesystem.hpp>
namespace milvus {
namespace engine {
Transcript::Transcript(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) {
// db must implemented
if (db == nullptr) {
throw Exception(DB_ERROR, "null pointer");
}
}
Status
Transcript::Start() {
// let service start
auto status = db_->Start();
if (!status.ok()) {
return status;
}
// replay script in necessary
if (!options_.replay_script_path_.empty()) {
ScriptReplay replay;
return replay.Replay(db_, options_.replay_script_path_);
} else {
ScriptCodec& codec = ScriptCodec::GetInstance();
boost::filesystem::path db_path(options_.meta_.path_);
auto transcript_path = db_path.parent_path();
transcript_path /= "transcript";
std::string path = transcript_path.c_str();
status = CommonUtil::CreateDirectory(path);
if (!status.ok()) {
std::cerr << "Error: Failed to create transcript path: " << path << std::endl;
kill(0, SIGUSR1);
}
codec.SetScriptPath(path);
}
return Status::OK();
}
Status
Transcript::Stop() {
return db_->Stop();
}
Status
Transcript::CreateCollection(const snapshot::CreateCollectionContext& context) {
return db_->CreateCollection(context);
}
Status
Transcript::DropCollection(const std::string& name) {
return db_->DropCollection(name);
}
Status
Transcript::HasCollection(const std::string& collection_name, bool& has_or_not) {
return db_->HasCollection(collection_name, has_or_not);
}
Status
Transcript::ListCollections(std::vector<std::string>& names) {
return db_->ListCollections(names);
}
Status
Transcript::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) {
return db_->GetCollectionInfo(collection_name, collection, fields_schema);
}
Status
Transcript::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
return db_->GetCollectionStats(collection_name, collection_stats);
}
Status
Transcript::CountEntities(const std::string& collection_name, int64_t& row_count) {
return db_->CountEntities(collection_name, row_count);
}
Status
Transcript::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
return db_->CreatePartition(collection_name, partition_name);
}
Status
Transcript::DropPartition(const std::string& collection_name, const std::string& partition_name) {
return db_->DropPartition(collection_name, partition_name);
}
Status
Transcript::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
return db_->HasPartition(collection_name, partition_tag, exist);
}
Status
Transcript::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
return db_->ListPartitions(collection_name, partition_names);
}
Status
Transcript::CreateIndex(const server::ContextPtr& context, const std::string& collection_name,
const std::string& field_name, const CollectionIndex& index) {
return db_->CreateIndex(context, collection_name, field_name, index);
}
Status
Transcript::DropIndex(const std::string& collection_name, const std::string& field_name) {
return db_->DropIndex(collection_name, field_name);
}
Status
Transcript::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
return db_->DescribeIndex(collection_name, field_name, index);
}
Status
Transcript::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
return db_->Insert(collection_name, partition_name, data_chunk);
}
Status
Transcript::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) {
return db_->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk);
}
Status
Transcript::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
return db_->DeleteEntityByID(collection_name, entity_ids);
}
Status
Transcript::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
return db_->ListIDInSegment(collection_name, segment_id, entity_ids);
}
Status
Transcript::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
return db_->Query(context, query_ptr, result);
}
Status
Transcript::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force) {
return db_->LoadCollection(context, collection_name, field_names, force);
}
Status
Transcript::Flush(const std::string& collection_name) {
return db_->Flush(collection_name);
}
Status
Transcript::Flush() {
return db_->Flush();
}
Status
Transcript::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) {
return db_->Compact(context, collection_name, threshold);
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/DBProxy.h"
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace engine {
class Transcript : public DBProxy {
public:
Transcript(const DBPtr& db, const DBOptions& options);
Status
Start() override;
Status
Stop() override;
Status
CreateCollection(const snapshot::CreateCollectionContext& context) override;
Status
DropCollection(const std::string& name) override;
Status
HasCollection(const std::string& collection_name, bool& has_or_not) override;
Status
ListCollections(std::vector<std::string>& names) override;
Status
GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) override;
Status
GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) override;
Status
CountEntities(const std::string& collection_name, int64_t& row_count) override;
Status
CreatePartition(const std::string& collection_name, const std::string& partition_name) override;
Status
DropPartition(const std::string& collection_name, const std::string& partition_name) override;
Status
HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) override;
Status
ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) override;
Status
CreateIndex(const server::ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const CollectionIndex& index) override;
Status
DropIndex(const std::string& collection_name, const std::string& field_name) override;
Status
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override;
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override;
Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) override;
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override;
Status
ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override;
Status
Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) override;
Status
LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force) override;
Status
Flush(const std::string& collection_name) override;
Status
Flush() override;
Status
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) override;
private:
};
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WriteAheadLog.h"
#include "utils/Exception.h"
namespace milvus {
namespace engine {
WriteAheadLog::WriteAheadLog(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) {
// db must implemented
if (db == nullptr) {
throw Exception(DB_ERROR, "null pointer");
}
}
Status
WriteAheadLog::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
return db_->Insert(collection_name, partition_name, data_chunk);
}
Status
WriteAheadLog::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
return db_->DeleteEntityByID(collection_name, entity_ids);
}
Status
WriteAheadLog::Flush(const std::string& collection_name) {
return db_->Flush(collection_name);
}
Status
WriteAheadLog::Flush() {
return db_->Flush();
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/DBProxy.h"
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace engine {
class WriteAheadLog : public DBProxy {
public:
WriteAheadLog(const DBPtr& db, const DBOptions& options);
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override;
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override;
Status
Flush(const std::string& collection_name) override;
Status
Flush() override;
private:
};
} // namespace engine
} // namespace milvus
......@@ -33,7 +33,7 @@ namespace {
const char* VECTOR_FIELD_NAME = "vector";
milvus::Status
CreateCollection(const std::shared_ptr<DBImpl>& db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollection(const std::shared_ptr<DB>& db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
......@@ -51,7 +51,7 @@ CreateCollection(const std::shared_ptr<DBImpl>& db, const std::string& collectio
static constexpr int64_t COLLECTION_DIM = 128;
milvus::Status
CreateCollection2(std::shared_ptr<DBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollection2(std::shared_ptr<DB> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
......@@ -79,7 +79,7 @@ CreateCollection2(std::shared_ptr<DBImpl> db, const std::string& collection_name
}
milvus::Status
CreateCollection3(std::shared_ptr<DBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollection3(std::shared_ptr<DB> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
......
......@@ -29,7 +29,7 @@ using SegmentVisitor = milvus::engine::SegmentVisitor;
namespace {
milvus::Status
CreateCollection(std::shared_ptr<DBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollection(std::shared_ptr<DB> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
......
......@@ -27,7 +27,7 @@
#include "cache/GpuCacheMgr.h"
#include "config/ServerConfig.h"
#include "codecs/Codec.h"
#include "db/DBFactory.h"
#include "db/DBImpl.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
......
......@@ -19,7 +19,7 @@
#include <set>
#include <string>
#include "db/DBImpl.h"
#include "db/DB.h"
#include "db/meta/MetaAdapter.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Context.h"
......@@ -82,7 +82,7 @@ using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<Segme
using PartitionIterator = milvus::engine::snapshot::PartitionIterator;
using SegmentIterator = milvus::engine::snapshot::SegmentIterator;
using SegmentFileIterator = milvus::engine::snapshot::SegmentFileIterator;
using DBImpl = milvus::engine::DBImpl;
using DB = milvus::engine::DB;
using Status = milvus::Status;
using Store = milvus::engine::snapshot::Store;
......@@ -343,7 +343,7 @@ class SnapshotTest : public BaseTest {
///////////////////////////////////////////////////////////////////////////////
class DBTest : public BaseTest {
protected:
std::shared_ptr<DBImpl> db_;
std::shared_ptr<DB> db_;
milvus::engine::DBOptions
GetOptions();
......@@ -360,7 +360,7 @@ class DBTest : public BaseTest {
///////////////////////////////////////////////////////////////////////////////
class SegmentTest : public BaseTest {
protected:
std::shared_ptr<DBImpl> db_;
std::shared_ptr<DB> db_;
void
SetUp() override;
......@@ -383,7 +383,7 @@ class MetaTest : public BaseTest {
///////////////////////////////////////////////////////////////////////////////
class SchedulerTest : public BaseTest {
protected:
std::shared_ptr<DBImpl> db_;
std::shared_ptr<DB> db_;
void
SetUp() override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册