未验证 提交 bf5bdf25 编写于 作者: G groot 提交者: GitHub

Transcript (#3561)

* transcript 1
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* transcript 2
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* transcript 3
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* transcript 4
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* transcript 5
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* transcript 6
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* typo
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* add unittest
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* refine code
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* typo
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* tidy
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* tidy
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 d66ae0d6
......@@ -185,6 +185,12 @@ ConfigMgr::ConfigMgr() {
{"system.lock.enable",
CreateBoolConfig("system.lock.enable", false, &config.system.lock.enable.value, true, nullptr, nullptr)},
/* transcript */
{"transcript.enable",
CreateBoolConfig("transcript.enable", false, &config.transcript.enable.value, false, nullptr, nullptr)},
{"transcript.replay",
CreateStringConfig("transcript.replay", false, &config.transcript.replay.value, "", nullptr, nullptr)},
};
}
......
......@@ -165,6 +165,11 @@ struct ServerConfig {
Bool enable{false};
} lock;
} system;
struct Transcript {
Bool enable{false};
String replay{""};
} transcript;
};
extern ServerConfig config;
......
......@@ -29,6 +29,7 @@ constexpr int32_t MAX_SEGMENT_ROW_COUNT = 4 * 1024 * 1024; // max row count of
constexpr int64_t DEFAULT_SEGMENT_ROW_COUNT = 512 * 1024; // default row count per segment when creating collection
constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * MB; // max data size in one insert action
constexpr int64_t MAX_WAL_FILE_SIZE = 256 * MB; // max file size of wal file
constexpr int64_t MAX_SCRIPT_FILE_SIZE = 256 * MB; // max file size of transcript file
constexpr int64_t BUILD_INEDX_RETRY_TIMES = 3; // retry times if build index failed
......
......@@ -11,25 +11,135 @@
#pragma once
#include "db/DB.h"
#include "utils/Json.h"
#include "utils/Status.h"
#include <string>
#include <vector>
namespace milvus {
namespace engine {
// action names
extern const char* ActionCreateCollection;
extern const char* ActionDropCollection;
extern const char* ActionHasCollection;
extern const char* ActionListCollections;
extern const char* ActionGetCollectionInfo;
extern const char* ActionGetCollectionStats;
extern const char* ActionCountEntities;
extern const char* ActionCreatePartition;
extern const char* ActionDropPartition;
extern const char* ActionHasPartition;
extern const char* ActionListPartitions;
extern const char* ActionCreateIndex;
extern const char* ActionDropIndex;
extern const char* ActionDescribeIndex;
extern const char* ActionInsert;
extern const char* ActionGetEntityByID;
extern const char* ActionDeleteEntityByID;
extern const char* ActionListIDInSegment;
extern const char* ActionQuery;
extern const char* ActionLoadCollection;
extern const char* ActionFlush;
extern const char* ActionCompact;
// json keys
extern const char* J_ACTION_TYPE;
extern const char* J_ACTION_TS; // action timestamp
class ScriptCodec {
public:
ScriptCodec() = default;
// encode methods
static Status
EncodeAction(milvus::json& json_obj, const std::string& action_type);
static Status
Encode(milvus::json& json_obj, const snapshot::CreateCollectionContext& context);
static Status
EncodeCollectionName(milvus::json& json_obj, const std::string& collection_name);
static Status
EncodePartitionName(milvus::json& json_obj, const std::string& partition_name);
static Status
EncodeFieldName(milvus::json& json_obj, const std::string& field_name);
static Status
EncodeFieldNames(milvus::json& json_obj, const std::vector<std::string>& field_names);
static Status
Encode(milvus::json& json_obj, const CollectionIndex& index);
static Status
Encode(milvus::json& json_obj, const DataChunkPtr& data_chunk);
static Status
Encode(milvus::json& json_obj, const IDNumbers& id_array);
static Status
EncodeSegmentID(milvus::json& json_obj, int64_t segment_id);
static Status
Encode(milvus::json& json_obj, const query::QueryPtr& query_ptr);
static Status
EncodeThreshold(milvus::json& json_obj, double threshold);
static Status
EncodeForce(milvus::json& json_obj, bool force);
static ScriptCodec&
GetInstance();
// decode methods
static Status
DecodeAction(milvus::json& json_obj, std::string& action_type, int64_t& action_ts);
static Status
Decode(milvus::json& json_obj, snapshot::CreateCollectionContext& context);
static Status
DecodeCollectionName(milvus::json& json_obj, std::string& collection_name);
static Status
DecodePartitionName(milvus::json& json_obj, std::string& partition_name);
static Status
DecodeFieldName(milvus::json& json_obj, std::string& field_name);
static Status
DecodeFieldNames(milvus::json& json_obj, std::vector<std::string>& field_names);
static Status
Decode(milvus::json& json_obj, CollectionIndex& index);
static Status
Decode(milvus::json& json_obj, DataChunkPtr& data_chunk);
static Status
Decode(milvus::json& json_obj, IDNumbers& id_array);
static Status
DecodeSegmentID(milvus::json& json_obj, int64_t& segment_id);
static Status
Decode(milvus::json& json_obj, query::QueryPtr& query_ptr);
static Status
DecodeThreshold(milvus::json& json_obj, double& threshold);
static Status
DecodeForce(milvus::json& json_obj, bool& force);
private:
static Status
EncodeGeneralQuery(milvus::json& json_obj, query::GeneralQueryPtr& query);
void
SetScriptPath(const std::string& path) {
script_path_ = path;
}
static Status
DecodeGeneralQuery(milvus::json& json_obj, query::GeneralQueryPtr& query);
private:
std::string script_path_;
ScriptCodec() = delete;
};
} // namespace engine
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/transcript/ScriptFile.h"
#include "db/Types.h"
#include <limits>
namespace milvus {
namespace engine {
ScriptFile::~ScriptFile() {
CloseFile();
}
Status
ScriptFile::OpenWrite(const std::string& path) {
try {
if (writer_.is_open()) {
writer_.close();
}
file_size_ = 0;
writer_.open(path.c_str(), std::ios::out | std::ios::app);
} catch (std::exception& ex) {
return Status(DB_ERROR, ex.what());
}
return Status::OK();
}
Status
ScriptFile::OpenRead(const std::string& path) {
try {
if (reader_.is_open()) {
reader_.close();
}
reader_.open(path.c_str(), std::ios::in);
} catch (std::exception& ex) {
return Status(DB_ERROR, ex.what());
}
return Status::OK();
}
Status
ScriptFile::CloseFile() {
try {
if (reader_.is_open()) {
reader_.close();
}
if (writer_.is_open()) {
writer_.close();
}
} catch (std::exception& ex) {
return Status(DB_ERROR, ex.what());
}
file_size_ = 0;
return Status::OK();
}
Status
ScriptFile::WriteLine(const std::string& str) {
try {
if (writer_.is_open()) {
writer_.write(str.c_str(), str.size());
writer_.write("\n", 1);
writer_.flush();
file_size_ += str.size() + 1;
}
} catch (std::exception& ex) {
return Status(DB_ERROR, ex.what());
}
return Status::OK();
}
bool
ScriptFile::ReadLine(std::string& str) {
str = "";
if (reader_.is_open()) {
while (getline(reader_, str)) {
if (!str.empty()) {
return true;
}
}
}
return false;
}
bool
ScriptFile::ExceedMaxSize(int64_t append_size) {
return (file_size_ + append_size) > MAX_SCRIPT_FILE_SIZE;
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/Types.h"
#include "utils/Status.h"
#include <fstream>
#include <memory>
#include <ostream>
#include <string>
namespace milvus {
namespace engine {
class ScriptFile {
public:
ScriptFile() = default;
virtual ~ScriptFile();
Status
OpenWrite(const std::string& path);
Status
OpenRead(const std::string& path);
Status
WriteLine(const std::string& str);
bool
ReadLine(std::string& str);
bool
ExceedMaxSize(int64_t append_size);
private:
Status
CloseFile();
private:
std::ifstream reader_;
std::ofstream writer_;
int64_t file_size_ = 0;
};
using ScriptFilePtr = std::shared_ptr<ScriptFile>;
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/transcript/ScriptRecorder.h"
#include "db/Utils.h"
#include "db/transcript/ScriptCodec.h"
#include "utils/CommonUtil.h"
#include "utils/Json.h"
#include <experimental/filesystem>
#include <memory>
namespace milvus {
namespace engine {
ScriptRecorder::ScriptRecorder(const std::string& path) {
std::experimental::filesystem::path script_path(path);
std::string time_str;
CommonUtil::GetCurrentTimeStr(time_str);
script_path /= time_str;
script_path_ = script_path.c_str();
CommonUtil::CreateDirectory(script_path_);
}
ScriptRecorder::~ScriptRecorder() {
}
std::string
ScriptRecorder::GetScriptPath() const {
return script_path_;
}
ScriptFilePtr
ScriptRecorder::GetFile() {
if (file_ == nullptr || file_->ExceedMaxSize(0)) {
file_ = std::make_shared<ScriptFile>();
int64_t current_time = utils::GetMicroSecTimeStamp();
std::experimental::filesystem::path file_path(script_path_);
std::string file_name = std::to_string(current_time) + ".txt";
file_path /= file_name;
file_->OpenWrite(file_path.c_str());
}
return file_;
}
Status
ScriptRecorder::WriteJson(milvus::json& json_obj) {
auto file = GetFile();
std::string str = json_obj.dump();
return file->WriteLine(str);
}
Status
ScriptRecorder::CreateCollection(const snapshot::CreateCollectionContext& context) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionCreateCollection);
ScriptCodec::Encode(json_obj, context);
return WriteJson(json_obj);
}
Status
ScriptRecorder::DropCollection(const std::string& collection_name) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionDropCollection);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::HasCollection(const std::string& collection_name, bool& has_or_not) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionHasCollection);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::ListCollections(std::vector<std::string>& names) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionListCollections);
return WriteJson(json_obj);
}
Status
ScriptRecorder::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionGetCollectionInfo);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionGetCollectionStats);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::CountEntities(const std::string& collection_name, int64_t& row_count) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionCountEntities);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionCreatePartition);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodePartitionName(json_obj, partition_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::DropPartition(const std::string& collection_name, const std::string& partition_name) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionDropPartition);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodePartitionName(json_obj, partition_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::HasPartition(const std::string& collection_name, const std::string& partition_name, bool& exist) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionHasPartition);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodePartitionName(json_obj, partition_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionListPartitions);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::CreateIndex(const server::ContextPtr& context, const std::string& collection_name,
const std::string& field_name, const CollectionIndex& index) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionCreateIndex);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodeFieldName(json_obj, field_name);
ScriptCodec::Encode(json_obj, index);
return WriteJson(json_obj);
}
Status
ScriptRecorder::DropIndex(const std::string& collection_name, const std::string& field_name) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionDropIndex);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodeFieldName(json_obj, field_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::DescribeIndex(const std::string& collection_name, const std::string& field_name,
CollectionIndex& index) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionDescribeIndex);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodeFieldName(json_obj, field_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
idx_t op_id) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionInsert);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodePartitionName(json_obj, partition_name);
ScriptCodec::Encode(json_obj, data_chunk);
return WriteJson(json_obj);
}
Status
ScriptRecorder::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionGetEntityByID);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::Encode(json_obj, id_array);
ScriptCodec::EncodeFieldNames(json_obj, field_names);
return WriteJson(json_obj);
}
Status
ScriptRecorder::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionDeleteEntityByID);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::Encode(json_obj, entity_ids);
return WriteJson(json_obj);
}
Status
ScriptRecorder::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionListIDInSegment);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodeSegmentID(json_obj, segment_id);
ScriptCodec::Encode(json_obj, entity_ids);
return WriteJson(json_obj);
}
Status
ScriptRecorder::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr,
engine::QueryResultPtr& result) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionQuery);
ScriptCodec::Encode(json_obj, query_ptr);
return WriteJson(json_obj);
}
Status
ScriptRecorder::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionLoadCollection);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodeFieldNames(json_obj, field_names);
ScriptCodec::EncodeForce(json_obj, force);
return WriteJson(json_obj);
}
Status
ScriptRecorder::Flush(const std::string& collection_name) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionFlush);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
return WriteJson(json_obj);
}
Status
ScriptRecorder::Flush() {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionFlush);
return WriteJson(json_obj);
}
Status
ScriptRecorder::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) {
milvus::json json_obj;
ScriptCodec::EncodeAction(json_obj, ActionCompact);
ScriptCodec::EncodeCollectionName(json_obj, collection_name);
ScriptCodec::EncodeThreshold(json_obj, threshold);
return WriteJson(json_obj);
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/DB.h"
#include "db/transcript/ScriptFile.h"
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace engine {
class ScriptRecorder {
public:
explicit ScriptRecorder(const std::string& path);
~ScriptRecorder();
std::string
GetScriptPath() const;
Status
CreateCollection(const snapshot::CreateCollectionContext& context);
Status
DropCollection(const std::string& collection_name);
Status
HasCollection(const std::string& collection_name, bool& has_or_not);
Status
ListCollections(std::vector<std::string>& names);
Status
GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema);
Status
GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats);
Status
CountEntities(const std::string& collection_name, int64_t& row_count);
Status
CreatePartition(const std::string& collection_name, const std::string& partition_name);
Status
DropPartition(const std::string& collection_name, const std::string& partition_name);
Status
HasPartition(const std::string& collection_name, const std::string& partition_name, bool& exist);
Status
ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names);
Status
CreateIndex(const server::ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const CollectionIndex& index);
Status
DropIndex(const std::string& collection_name, const std::string& field_name);
Status
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index);
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
idx_t op_id);
Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row, DataChunkPtr& data_chunk);
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id);
Status
ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids);
Status
Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result);
Status
LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force);
Status
Flush(const std::string& collection_name);
Status
Flush();
Status
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold);
private:
ScriptFilePtr
GetFile();
Status
WriteJson(milvus::json& json_obj);
private:
std::string script_path_;
ScriptFilePtr file_;
};
using ScriptRecorderPtr = std::shared_ptr<ScriptRecorder>;
} // namespace engine
} // namespace milvus
......@@ -11,14 +11,235 @@
#include "db/transcript/ScriptReplay.h"
#include "db/transcript/ScriptCodec.h"
#include "db/transcript/ScriptFile.h"
#include "utils/Json.h"
#include <experimental/filesystem>
#include <map>
#include <memory>
#include <utility>
#include <vector>
namespace milvus {
namespace engine {
Status
ScriptReplay::Replay(const DBPtr& db, const std::string& replay_script_path) {
// get all script files under this folder, arrange them in ascending order
std::map<int64_t, std::experimental::filesystem::path> files_map;
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter(replay_script_path);
DirectoryIterator end;
for (; iter != end; ++iter) {
auto path = (*iter).path();
if (std::experimental::filesystem::is_directory(path)) {
continue;
}
std::string file_name = path.filename().c_str();
int64_t file_ts = atol(file_name.c_str());
if (file_ts > 0) {
files_map.insert(std::make_pair(file_ts, path));
}
}
// replay the script files
for (auto& pair : files_map) {
ScriptFile file;
file.OpenRead(pair.second.c_str());
std::string str_line;
while (file.ReadLine(str_line)) {
STATUS_CHECK(PerformAction(db, str_line));
}
}
return Status::OK();
}
Status
ScriptReplay::PerformAction(const DBPtr& db, const std::string& str_action) {
try {
milvus::json json_obj = milvus::json::parse(str_action);
std::string action_type;
int64_t action_ts = 0;
ScriptCodec::DecodeAction(json_obj, action_type, action_ts);
if (action_type == ActionCreateCollection) {
snapshot::CreateCollectionContext context;
ScriptCodec::Decode(json_obj, context);
db->CreateCollection(context);
} else if (action_type == ActionDropCollection) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
db->DropCollection(collection_name);
} else if (action_type == ActionHasCollection) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
bool has = false;
db->HasCollection(collection_name, has);
} else if (action_type == ActionListCollections) {
std::vector<std::string> names;
db->ListCollections(names);
} else if (action_type == ActionGetCollectionInfo) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
snapshot::CollectionPtr collection;
snapshot::FieldElementMappings fields_schema;
db->GetCollectionInfo(collection_name, collection, fields_schema);
} else if (action_type == ActionGetCollectionStats) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
milvus::json collection_stats;
db->GetCollectionStats(collection_name, collection_stats);
} else if (action_type == ActionCountEntities) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
int64_t count = 0;
db->CountEntities(collection_name, count);
} else if (action_type == ActionCreatePartition) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::string partition_name;
ScriptCodec::DecodePartitionName(json_obj, partition_name);
db->CreatePartition(collection_name, partition_name);
} else if (action_type == ActionDropPartition) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::string partition_name;
ScriptCodec::DecodePartitionName(json_obj, partition_name);
db->DropPartition(collection_name, partition_name);
} else if (action_type == ActionHasPartition) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::string partition_name;
ScriptCodec::DecodePartitionName(json_obj, partition_name);
bool has = false;
db->HasPartition(collection_name, partition_name, has);
} else if (action_type == ActionListPartitions) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::vector<std::string> partition_names;
db->ListPartitions(collection_name, partition_names);
} else if (action_type == ActionCreateIndex) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::string field_name;
ScriptCodec::DecodeFieldName(json_obj, field_name);
CollectionIndex index;
ScriptCodec::Decode(json_obj, index);
std::vector<std::string> partition_names;
db->CreateIndex(nullptr, collection_name, field_name, index);
} else if (action_type == ActionDropIndex) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::string field_name;
ScriptCodec::DecodeFieldName(json_obj, field_name);
db->DropIndex(collection_name, field_name);
} else if (action_type == ActionDescribeIndex) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::string field_name;
ScriptCodec::DecodeFieldName(json_obj, field_name);
CollectionIndex index;
db->DescribeIndex(collection_name, field_name, index);
} else if (action_type == ActionInsert) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::string partition_name;
ScriptCodec::DecodePartitionName(json_obj, partition_name);
DataChunkPtr data_chunk;
ScriptCodec::Decode(json_obj, data_chunk);
db->Insert(collection_name, partition_name, data_chunk, 0);
} else if (action_type == ActionGetEntityByID) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
IDNumbers id_array;
ScriptCodec::Decode(json_obj, id_array);
std::vector<std::string> field_names;
ScriptCodec::DecodeFieldNames(json_obj, field_names);
std::vector<bool> valid_row;
DataChunkPtr data_chunk;
db->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk);
} else if (action_type == ActionDeleteEntityByID) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
IDNumbers id_array;
ScriptCodec::Decode(json_obj, id_array);
db->DeleteEntityByID(collection_name, id_array, 0);
} else if (action_type == ActionListIDInSegment) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
int64_t segment_id = 0;
ScriptCodec::DecodeSegmentID(json_obj, segment_id);
IDNumbers entity_ids;
db->ListIDInSegment(collection_name, segment_id, entity_ids);
} else if (action_type == ActionQuery) {
query::QueryPtr query_ptr;
ScriptCodec::Decode(json_obj, query_ptr);
if (query_ptr != nullptr) {
engine::QueryResultPtr result;
db->Query(nullptr, query_ptr, result);
}
} else if (action_type == ActionLoadCollection) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
std::vector<std::string> field_names;
ScriptCodec::DecodeFieldNames(json_obj, field_names);
bool force = false;
ScriptCodec::DecodeForce(json_obj, force);
std::vector<bool> valid_row;
DataChunkPtr data_chunk;
db->LoadCollection(nullptr, collection_name, field_names, force);
} else if (action_type == ActionFlush) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
if (collection_name.empty()) {
db->Flush();
} else {
db->Flush(collection_name);
}
} else if (action_type == ActionCompact) {
std::string collection_name;
ScriptCodec::DecodeCollectionName(json_obj, collection_name);
double threshold = 0.0;
ScriptCodec::DecodeThreshold(json_obj, threshold);
db->Compact(nullptr, collection_name, threshold);
} else {
std::string msg = "Unsupportted action: " + action_type;
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}
return Status::OK();
} catch (std::exception& ex) {
std::string msg = "Failed to perform script action, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}
}
} // namespace engine
} // namespace milvus
......@@ -24,6 +24,10 @@ class ScriptReplay {
Status
Replay(const DBPtr& db, const std::string& replay_script_path);
private:
Status
PerformAction(const DBPtr& db, const std::string& str_action);
};
} // namespace engine
......
......@@ -10,16 +10,21 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/transcript/TranscriptProxy.h"
#include "db/transcript/ScriptCodec.h"
#include "db/transcript/ScriptRecorder.h"
#include "db/transcript/ScriptReplay.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include <boost/filesystem.hpp>
#include <experimental/filesystem>
namespace milvus {
namespace engine {
#define CHECK_RECORDER \
if (recorder_ == nullptr) { \
return Status(DB_ERROR, "Transcript recoder is null pointer"); \
}
TranscriptProxy::TranscriptProxy(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) {
// db must implemented
if (db == nullptr) {
......@@ -38,106 +43,137 @@ TranscriptProxy::Start() {
// replay script in necessary
if (!options_.replay_script_path_.empty()) {
ScriptReplay replay;
return replay.Replay(db_, options_.replay_script_path_);
} else {
ScriptCodec& codec = ScriptCodec::GetInstance();
boost::filesystem::path db_path(options_.meta_.path_);
auto transcript_path = db_path.parent_path();
transcript_path /= "transcript";
std::string path = transcript_path.c_str();
status = CommonUtil::CreateDirectory(path);
if (!status.ok()) {
std::cerr << "Error: Failed to create transcript path: " << path << std::endl;
kill(0, SIGUSR1);
}
codec.SetScriptPath(path);
STATUS_CHECK(replay.Replay(db_, options_.replay_script_path_));
}
// prepare for transcript
std::experimental::filesystem::path db_path(options_.meta_.path_);
auto transcript_path = db_path.parent_path();
transcript_path /= "transcript";
std::string path = transcript_path.c_str();
status = CommonUtil::CreateDirectory(path);
if (!status.ok()) {
std::cerr << "Error: Failed to create transcript path: " << path << std::endl;
kill(0, SIGUSR1);
}
recorder_ = std::make_shared<ScriptRecorder>(path);
return Status::OK();
}
Status
TranscriptProxy::Stop() {
recorder_ = nullptr;
return db_->Stop();
}
Status
TranscriptProxy::CreateCollection(const snapshot::CreateCollectionContext& context) {
CHECK_RECORDER
recorder_->CreateCollection(context);
return db_->CreateCollection(context);
}
Status
TranscriptProxy::DropCollection(const std::string& collection_name) {
CHECK_RECORDER
recorder_->DropCollection(collection_name);
return db_->DropCollection(collection_name);
}
Status
TranscriptProxy::HasCollection(const std::string& collection_name, bool& has_or_not) {
CHECK_RECORDER
recorder_->HasCollection(collection_name, has_or_not);
return db_->HasCollection(collection_name, has_or_not);
}
Status
TranscriptProxy::ListCollections(std::vector<std::string>& names) {
CHECK_RECORDER
recorder_->ListCollections(names);
return db_->ListCollections(names);
}
Status
TranscriptProxy::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) {
CHECK_RECORDER
recorder_->GetCollectionInfo(collection_name, collection, fields_schema);
return db_->GetCollectionInfo(collection_name, collection, fields_schema);
}
Status
TranscriptProxy::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
CHECK_RECORDER
recorder_->GetCollectionStats(collection_name, collection_stats);
return db_->GetCollectionStats(collection_name, collection_stats);
}
Status
TranscriptProxy::CountEntities(const std::string& collection_name, int64_t& row_count) {
CHECK_RECORDER
recorder_->CountEntities(collection_name, row_count);
return db_->CountEntities(collection_name, row_count);
}
Status
TranscriptProxy::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
CHECK_RECORDER
recorder_->CreatePartition(collection_name, partition_name);
return db_->CreatePartition(collection_name, partition_name);
}
Status
TranscriptProxy::DropPartition(const std::string& collection_name, const std::string& partition_name) {
CHECK_RECORDER
recorder_->DropPartition(collection_name, partition_name);
return db_->DropPartition(collection_name, partition_name);
}
Status
TranscriptProxy::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
CHECK_RECORDER
recorder_->HasPartition(collection_name, partition_tag, exist);
return db_->HasPartition(collection_name, partition_tag, exist);
}
Status
TranscriptProxy::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
CHECK_RECORDER
recorder_->ListPartitions(collection_name, partition_names);
return db_->ListPartitions(collection_name, partition_names);
}
Status
TranscriptProxy::CreateIndex(const server::ContextPtr& context, const std::string& collection_name,
const std::string& field_name, const CollectionIndex& index) {
CHECK_RECORDER
recorder_->CreateIndex(context, collection_name, field_name, index);
return db_->CreateIndex(context, collection_name, field_name, index);
}
Status
TranscriptProxy::DropIndex(const std::string& collection_name, const std::string& field_name) {
CHECK_RECORDER
recorder_->DropIndex(collection_name, field_name);
return db_->DropIndex(collection_name, field_name);
}
Status
TranscriptProxy::DescribeIndex(const std::string& collection_name, const std::string& field_name,
CollectionIndex& index) {
CHECK_RECORDER
recorder_->DescribeIndex(collection_name, field_name, index);
return db_->DescribeIndex(collection_name, field_name, index);
}
Status
TranscriptProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
idx_t op_id) {
CHECK_RECORDER
recorder_->Insert(collection_name, partition_name, data_chunk, op_id);
return db_->Insert(collection_name, partition_name, data_chunk);
}
......@@ -145,44 +181,60 @@ Status
TranscriptProxy::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) {
CHECK_RECORDER
recorder_->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk);
return db_->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk);
}
Status
TranscriptProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids,
idx_t op_id) {
CHECK_RECORDER
recorder_->DeleteEntityByID(collection_name, entity_ids, op_id);
return db_->DeleteEntityByID(collection_name, entity_ids);
}
Status
TranscriptProxy::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
CHECK_RECORDER
recorder_->ListIDInSegment(collection_name, segment_id, entity_ids);
return db_->ListIDInSegment(collection_name, segment_id, entity_ids);
}
Status
TranscriptProxy::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr,
engine::QueryResultPtr& result) {
CHECK_RECORDER
recorder_->Query(context, query_ptr, result);
return db_->Query(context, query_ptr, result);
}
Status
TranscriptProxy::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force) {
CHECK_RECORDER
recorder_->LoadCollection(context, collection_name, field_names, force);
return db_->LoadCollection(context, collection_name, field_names, force);
}
Status
TranscriptProxy::Flush(const std::string& collection_name) {
CHECK_RECORDER
recorder_->Flush(collection_name);
return db_->Flush(collection_name);
}
Status
TranscriptProxy::Flush() {
CHECK_RECORDER
recorder_->Flush();
return db_->Flush();
}
Status
TranscriptProxy::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) {
CHECK_RECORDER
recorder_->Compact(context, collection_name, threshold);
return db_->Compact(context, collection_name, threshold);
}
......
......@@ -12,6 +12,7 @@
#pragma once
#include "db/DBProxy.h"
#include "db/transcript/ScriptRecorder.h"
#include <memory>
#include <string>
......@@ -24,6 +25,11 @@ class TranscriptProxy : public DBProxy {
public:
TranscriptProxy(const DBPtr& db, const DBOptions& options);
ScriptRecorderPtr
GetScriptRecorder() const {
return recorder_;
}
Status
Start() override;
......@@ -106,6 +112,7 @@ class TranscriptProxy : public DBProxy {
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) override;
private:
ScriptRecorderPtr recorder_;
};
} // namespace engine
......
......@@ -26,6 +26,39 @@ namespace engine {
const char* WAL_MAX_OP_FILE_NAME = "max_op";
const char* WAL_DEL_FILE_NAME = "del";
namespace {
bool
StrToID(const std::string& str, idx_t& id) {
try {
id = std::stol(str);
return true;
} catch (std::exception& ex) {
return false;
}
}
void
FindWalFiles(const std::experimental::filesystem::path& folder,
std::map<idx_t, std::experimental::filesystem::path>& files) {
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter(folder);
DirectoryIterator end;
for (; iter != end; ++iter) {
auto path_inner = (*iter).path();
std::string file_name = path_inner.filename().c_str();
if (file_name == WAL_MAX_OP_FILE_NAME || file_name == WAL_DEL_FILE_NAME) {
continue;
}
idx_t op_id = 0;
if (StrToID(file_name, op_id)) {
files.insert(std::make_pair(op_id, path_inner));
}
}
}
} // namespace
WalManager::WalManager() : cleanup_thread_pool_(1, 1) {
}
......@@ -161,17 +194,7 @@ WalManager::Recovery(const DBPtr& db) {
// iterate files
std::map<idx_t, std::experimental::filesystem::path> id_files;
DirectoryIterator iter_inner(path_outer);
DirectoryIterator end_inner;
for (; iter_inner != end_inner; ++iter_inner) {
auto path_inner = (*iter_inner).path();
std::string file_name = path_inner.filename().c_str();
if (file_name == WAL_MAX_OP_FILE_NAME) {
continue;
}
idx_t op_id = std::stol(file_name);
id_files.insert(std::make_pair(op_id, path_inner));
}
FindWalFiles(path_outer, id_files);
// the max operation id
idx_t max_op_id = 0;
......@@ -450,17 +473,7 @@ WalManager::CleanupThread() {
// iterate files
std::map<idx_t, std::experimental::filesystem::path> wal_files;
DirectoryIterator file_iter(collection_path);
DirectoryIterator end_iter;
for (; file_iter != end_iter; ++file_iter) {
auto file_path = (*file_iter).path();
std::string file_name = file_path.filename().c_str();
if (file_name == WAL_MAX_OP_FILE_NAME) {
continue;
}
idx_t op_id = std::stol(file_name);
wal_files.insert(std::make_pair(op_id, file_path));
}
FindWalFiles(collection_path, wal_files);
// no wal file
if (wal_files.empty()) {
......
......@@ -34,8 +34,6 @@ extern const char* WAL_DEL_FILE_NAME;
class WalManager {
public:
WalManager();
static WalManager&
GetInstance();
......@@ -58,6 +56,8 @@ class WalManager {
Recovery(const DBPtr& db);
private:
WalManager();
Status
Init();
......
......@@ -56,11 +56,16 @@ DBWrapper::StartService() {
kill(0, SIGUSR1);
}
// wal
opt.wal_enable_ = config.wal.enable();
if (opt.wal_enable_) {
opt.wal_path_ = config.wal.path();
}
// transcript
opt.transcript_enable_ = config.transcript.enable();
opt.replay_script_path_ = config.transcript.replay();
// engine config
int64_t omp_thread = config.engine.omp_thread_num();
......
......@@ -63,12 +63,10 @@ CreateCollectionReq::OnExecute() {
engine::snapshot::CreateCollectionContext create_collection_context;
auto collection_schema = std::make_shared<engine::snapshot::Collection>(collection_name_, extra_params_);
std::set<std::string> unique_field_names;
create_collection_context.collection = collection_schema;
for (auto& field_kv : fields_) {
auto& field_name = field_kv.first;
auto& field_schema = field_kv.second;
unique_field_names.insert(field_name);
auto& field_type = field_schema.field_type_;
auto& field_params = field_schema.field_params_;
......@@ -106,11 +104,6 @@ CreateCollectionReq::OnExecute() {
create_collection_context.fields_schema[field] = {};
}
// not allow duplicate field name
if (unique_field_names.size() != fields_.size()) {
return Status(DB_ERROR, "Duplicate field name");
}
// step 3: create collection
status = DBWrapper::DB()->CreateCollection(create_collection_context);
fiu_do_on("CreateCollectionReq.OnExecute.invalid_db_execute",
......
......@@ -176,6 +176,26 @@ CommonUtil::TimeStrToTime(const std::string& time_str, time_t& time_integer, tm&
return true;
}
void
CommonUtil::GetCurrentTimeStr(std::string& time_str) {
auto t = std::time(nullptr);
struct tm ltm;
localtime_r(&t, &ltm);
time_str = "";
time_str += std::to_string(ltm.tm_year + 1900);
time_str += "-";
time_str += std::to_string(ltm.tm_mon + 1);
time_str += "-";
time_str += std::to_string(ltm.tm_mday);
time_str += "_";
time_str += std::to_string(ltm.tm_hour);
time_str += ":";
time_str += std::to_string(ltm.tm_min);
time_str += ":";
time_str += std::to_string(ltm.tm_sec);
}
void
CommonUtil::ConvertTime(time_t time_integer, tm& time_struct) {
localtime_r(&time_integer, &time_struct);
......
......@@ -40,6 +40,9 @@ class CommonUtil {
TimeStrToTime(const std::string& time_str, time_t& time_integer, tm& time_struct,
const std::string& format = "%d-%d-%d %d:%d:%d");
static void
GetCurrentTimeStr(std::string& time_str);
static void
ConvertTime(time_t time_integer, tm& time_struct);
static void
......
......@@ -17,6 +17,7 @@ set( TEST_FILES ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_db.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_meta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_event.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_transcript.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_wal.cpp
)
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册