未验证 提交 5857f640 编写于 作者: G groot 提交者: GitHub

rewrite merge manager 1 (#2797)

* rewrite merge manager 1
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* rewrite merge manager 2
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>
上级 7bf3e6e1
......@@ -12,6 +12,7 @@
#include "db/SSDBImpl.h"
#include "cache/CpuCacheMgr.h"
#include "db/IDGenerator.h"
#include "db/merge/MergeManagerFactory.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
......@@ -51,6 +52,7 @@ static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown
SSDBImpl::SSDBImpl(const DBOptions& options)
: options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
mem_mgr_ = MemManagerFactory::SSBuild(options_);
merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);
if (options_.wal_enable_) {
wal::MXLogConfiguration mxlog_config;
......
......@@ -11,6 +11,7 @@
#include "db/merge/MergeManagerFactory.h"
#include "db/merge/MergeManagerImpl.h"
#include "db/merge/SSMergeManagerImpl.h"
#include "utils/Exception.h"
#include "utils/Log.h"
......@@ -22,5 +23,10 @@ MergeManagerFactory::Build(const meta::MetaPtr& meta_ptr, const DBOptions& optio
return std::make_shared<MergeManagerImpl>(meta_ptr, options, MergeStrategyType::LAYERED);
}
MergeManagerPtr
MergeManagerFactory::SSBuild(const DBOptions& options) {
return std::make_shared<SSMergeManagerImpl>(options, MergeStrategyType::SIMPLE);
}
} // namespace engine
} // namespace milvus
......@@ -23,6 +23,9 @@ class MergeManagerFactory {
public:
static MergeManagerPtr
Build(const meta::MetaPtr& meta_ptr, const DBOptions& options);
static MergeManagerPtr
SSBuild(const DBOptions& options);
};
} // namespace engine
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/SSMergeManagerImpl.h"
#include "db/merge/SSMergeSimpleStrategy.h"
#include "db/merge/SSMergeTask.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include <map>
namespace milvus {
namespace engine {
SSMergeManagerImpl::SSMergeManagerImpl(const DBOptions& options, MergeStrategyType type)
: options_(options), strategy_type_(type) {
UseStrategy(type);
}
Status
SSMergeManagerImpl::UseStrategy(MergeStrategyType type) {
switch (type) {
case MergeStrategyType::SIMPLE: {
strategy_ = std::make_shared<SSMergeSimpleStrategy>();
break;
}
case MergeStrategyType::LAYERED:
case MergeStrategyType::ADAPTIVE:
default: {
std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type);
LOG_ENGINE_ERROR_ << msg;
throw Exception(DB_ERROR, msg);
}
}
strategy_type_ = type;
return Status::OK();
}
Status
SSMergeManagerImpl::MergeFiles(const std::string& collection_name) {
if (strategy_ == nullptr) {
std::string msg = "No merge strategy specified";
LOG_ENGINE_ERROR_ << msg;
return Status(DB_ERROR, msg);
}
int64_t row_count_per_segment = DEFAULT_ROW_COUNT_PER_SEGMENT;
while (true) {
snapshot::ScopedSnapshotT latest_ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name));
Partition2SegmentsMap part2seg;
auto& segments = latest_ss->GetResources<snapshot::Segment>();
for (auto& kv : segments) {
auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(kv.second->GetID());
part2seg[kv.second->GetPartitionId()].push_back(kv.second->GetID());
}
Partition2SegmentsMap::iterator it;
for (it = part2seg.begin(); it != part2seg.end();) {
if (it->second.size() <= 1) {
part2seg.erase(it++);
}
}
if (part2seg.empty()) {
break;
}
SegmentGroups segment_groups;
auto status = strategy_->RegroupSegments(latest_ss, part2seg, segment_groups);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to regroup segments for: " << collection_name
<< ", continue to merge all files into one";
return status;
}
for (auto& segments : segment_groups) {
SSMergeTask task(options_, latest_ss, segments);
task.Execute();
}
}
return Status::OK();
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/merge/MergeManager.h"
#include "db/merge/SSMergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class SSMergeManagerImpl : public MergeManager {
public:
SSMergeManagerImpl(const DBOptions& options, MergeStrategyType type);
MergeStrategyType
Strategy() const override {
return strategy_type_;
}
Status
UseStrategy(MergeStrategyType type) override;
Status
MergeFiles(const std::string& collection_name) override;
private:
DBOptions options_;
MergeStrategyType strategy_type_ = MergeStrategyType::SIMPLE;
SSMergeStrategyPtr strategy_;
}; // MergeManagerImpl
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/SSMergeSimpleStrategy.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
const char* ROW_COUNT_PER_SEGMENT = "row_count_per_segment";
Status
SSMergeSimpleStrategy::RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment,
SegmentGroups& groups) {
auto collection = ss->GetCollection();
int64_t row_count_per_segment = DEFAULT_ROW_COUNT_PER_SEGMENT;
const json params = collection->GetParams();
if (params.find(ROW_COUNT_PER_SEGMENT) != params.end()) {
row_count_per_segment = params[ROW_COUNT_PER_SEGMENT];
}
for (auto& kv : part2segment) {
snapshot::IDS_TYPE ids;
int64_t row_count_sum = 0;
for (auto& id : kv.second) {
auto segment_commit = ss->GetSegmentCommitBySegmentId(id);
if (segment_commit == nullptr) {
continue; // maybe stale
}
ids.push_back(id);
row_count_sum += segment_commit->GetRowCount();
if (row_count_sum >= row_count_per_segment) {
if (ids.size() >= 2) {
groups.push_back(ids);
}
ids.clear();
row_count_sum = 0;
continue;
}
}
}
return Status::OK();
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "db/merge/SSMergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class SSMergeSimpleStrategy : public SSMergeStrategy {
public:
Status
RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment,
SegmentGroups& groups) override;
}; // MergeSimpleStrategy
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "db/Types.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshot.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
const int64_t DEFAULT_ROW_COUNT_PER_SEGMENT = 500000;
using Partition2SegmentsMap = std::map<snapshot::ID_TYPE, snapshot::IDS_TYPE>;
using SegmentGroups = std::vector<snapshot::IDS_TYPE>;
class SSMergeStrategy {
public:
virtual Status
RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment,
SegmentGroups& groups) = 0;
}; // MergeStrategy
using SSMergeStrategyPtr = std::shared_ptr<SSMergeStrategy>;
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/SSMergeTask.h"
#include "db/Utils.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/Snapshots.h"
#include "metrics/Metrics.h"
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Log.h"
#include <memory>
#include <string>
namespace milvus {
namespace engine {
SSMergeTask::SSMergeTask(const DBOptions& options, const snapshot::ScopedSnapshotT& ss,
const snapshot::IDS_TYPE& segments)
: options_(options), snapshot_(ss), segments_(segments) {
}
Status
SSMergeTask::Execute() {
if (segments_.size() <= 1) {
return Status::OK();
}
snapshot::OperationContext context;
for (auto& id : segments_) {
auto seg = snapshot_->GetResource<snapshot::Segment>(id);
if (!seg) {
return Status(DB_ERROR, "snapshot segment is null");
}
context.stale_segments.push_back(seg);
if (!context.prev_partition) {
snapshot::PartitionPtr partition = snapshot_->GetResource<snapshot::Partition>(seg->GetPartitionId());
context.prev_partition = partition;
}
}
auto op = std::make_shared<snapshot::MergeOperation>(context, snapshot_);
snapshot::SegmentPtr new_seg;
auto status = op->CommitNewSegment(new_seg);
if (!status.ok()) {
return status;
}
// TODO: merge each field, each field create a new SegmentFile
snapshot::SegmentFileContext sf_context;
sf_context.field_name = "vector";
sf_context.field_element_name = "ivfsq8";
sf_context.segment_id = 1;
sf_context.partition_id = 1;
sf_context.segment_id = new_seg->GetID();
snapshot::SegmentFilePtr seg_file;
status = op->CommitNewSegmentFile(sf_context, seg_file);
status = op->Push();
return status;
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include "db/merge/MergeManager.h"
#include "db/meta/MetaTypes.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshot.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class SSMergeTask {
public:
SSMergeTask(const DBOptions& options, const snapshot::ScopedSnapshotT& ss, const snapshot::IDS_TYPE& segments);
Status
Execute();
private:
DBOptions options_;
snapshot::ScopedSnapshotT snapshot_;
snapshot::IDS_TYPE segments_;
}; // SSMergeTask
} // namespace engine
} // namespace milvus
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册