未验证 提交 5c37a9fa 编写于 作者: G groot 提交者: GitHub

refine insert code (#3090)

* refine code
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* refine insert code
Signed-off-by: groot <yihua.mo@zilliz.com>
Co-authored-by: Wang XiangYu <xy.wang@zilliz.com>
上级 6f441fee
......@@ -251,7 +251,7 @@ DBImpl::DropCollection(const std::string& name) {
/* wal_mgr_->DropCollection(ss->GetCollectionId()); */
}
mem_mgr_->EraseMemVector(ss->GetCollectionId()); // not allow insert
mem_mgr_->EraseMem(ss->GetCollectionId()); // not allow insert
return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
}
......@@ -342,7 +342,7 @@ DBImpl::DropPartition(const std::string& collection_name, const std::string& par
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
// SS TODO: Is below step needed? Or How to implement it?
/* mem_mgr_->EraseMemVector(partition_name); */
/* mem_mgr_->EraseMem(partition_name); */
snapshot::PartitionContext context;
context.name = partition_name;
......
......@@ -26,24 +26,29 @@
namespace milvus {
namespace engine {
// Constructs the in-memory buffer object for one collection, storing the collection id and
// a copy of the DB options.
// NOTE(review): this span looks like diff residue. The first signature/initializer pair
// (which also stores partition_id_) appears to be the pre-change form, and the second pair
// the post-change form; only one of them can be live in the real file.
MemCollection::MemCollection(int64_t collection_id, int64_t partition_id, const DBOptions& options)
: collection_id_(collection_id), partition_id_(partition_id), options_(options) {
// Form without partition_id: the partition is supplied per call to Add() instead.
MemCollection::MemCollection(int64_t collection_id, const DBOptions& options)
: collection_id_(collection_id), options_(options) {
}
Status
MemCollection::Add(const milvus::engine::VectorSourcePtr& source) {
MemCollection::Add(int64_t partition_id, const milvus::engine::VectorSourcePtr& source) {
while (!source->AllAdded()) {
std::lock_guard<std::mutex> lock(mutex_);
MemSegmentPtr current_mem_segment;
if (!mem_segment_list_.empty()) {
current_mem_segment = mem_segment_list_.back();
auto pair = mem_segments_.find(partition_id);
if (pair != mem_segments_.end()) {
MemSegmentList& segments = pair->second;
if (!segments.empty()) {
current_mem_segment = segments.back();
}
}
Status status;
if (mem_segment_list_.empty() || current_mem_segment->IsFull()) {
MemSegmentPtr new_mem_segment = std::make_shared<MemSegment>(collection_id_, partition_id_, options_);
if (current_mem_segment == nullptr || current_mem_segment->IsFull()) {
MemSegmentPtr new_mem_segment = std::make_shared<MemSegment>(collection_id_, partition_id, options_);
status = new_mem_segment->Add(source);
if (status.ok()) {
mem_segment_list_.emplace_back(new_mem_segment);
mem_segments_[partition_id].emplace_back(new_mem_segment);
} else {
return status;
}
......@@ -60,23 +65,17 @@ MemCollection::Add(const milvus::engine::VectorSourcePtr& source) {
return Status::OK();
}
Status
MemCollection::Delete(segment::doc_id_t doc_id) {
    // Forward the delete to every buffered segment; each segment decides for itself
    // whether the id belongs to it.
    for (auto it = mem_segment_list_.begin(); it != mem_segment_list_.end(); ++it) {
        (*it)->Delete(doc_id);
    }

    // Remember the id so the delete can also be applied to on-disk segments at the next flush.
    doc_ids_to_delete_.insert(doc_id);

    return Status::OK();
}
Status
MemCollection::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
// Locate which collection file the doc id lands in
for (auto& mem_segment : mem_segment_list_) {
mem_segment->Delete(doc_ids);
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto partition_segments : mem_segments_) {
MemSegmentList& segments = partition_segments.second;
for (auto& segment : segments) {
segment->Delete(doc_ids);
}
}
}
// Add the id to delete list so it can be applied to other segments on disk during the next flush
for (auto& id : doc_ids) {
......@@ -86,14 +85,15 @@ MemCollection::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
return Status::OK();
}
// Hands back the most recently appended in-memory segment through `mem_segment`.
//
// @param mem_segment  out-parameter; receives the last element of mem_segment_list_, or
//                     nullptr when no segment has been buffered yet.
//
// Calling back() on an empty vector is undefined behaviour, so the empty case is handled
// explicitly instead of being left to the caller.
void
MemCollection::GetCurrentMemSegment(MemSegmentPtr& mem_segment) {
    if (mem_segment_list_.empty()) {
        mem_segment = nullptr;
        return;
    }
    mem_segment = mem_segment_list_.back();
}
// Drops the buffered segment list of one partition from mem_segments_, under mutex_.
// A partition id that has no entry in the map is ignored; the function always returns OK.
Status
MemCollection::EraseMem(int64_t partition_id) {
std::lock_guard<std::mutex> lock(mutex_);
auto pair = mem_segments_.find(partition_id);
if (pair != mem_segments_.end()) {
mem_segments_.erase(pair);
}
// NOTE(review): the next three lines (GetTableFileCount reading mem_segment_list_) look like
// the removed pre-change function left interleaved by the diff rendering; they are not part
// of EraseMem.
size_t
MemCollection::GetTableFileCount() {
return mem_segment_list_.size();
return Status::OK();
}
Status
......@@ -107,46 +107,39 @@ MemCollection::Serialize(uint64_t wal_lsn) {
}
}
for (auto mem_segment = mem_segment_list_.begin(); mem_segment != mem_segment_list_.end();) {
auto status = (*mem_segment)->Serialize(wal_lsn);
if (!status.ok()) {
return status;
}
LOG_ENGINE_DEBUG_ << "Flushed segment " << (*mem_segment)->GetSegmentId();
{
std::lock_guard<std::mutex> lock(mutex_);
mem_segment = mem_segment_list_.erase(mem_segment);
std::lock_guard<std::mutex> lock(mutex_);
for (auto partition_segments : mem_segments_) {
MemSegmentList& segments = partition_segments.second;
for (auto& segment : segments) {
auto status = segment->Serialize(wal_lsn);
if (!status.ok()) {
return status;
}
LOG_ENGINE_DEBUG_ << "Flushed segment " << segment->GetSegmentId() << " of collection " << collection_id_;
}
}
mem_segments_.clear();
recorder.RecordSection("Finished flushing");
return Status::OK();
}
bool
MemCollection::Empty() {
    // Any buffered segment makes the collection non-empty.
    if (!mem_segment_list_.empty()) {
        return false;
    }
    // No segments: emptiness now depends only on whether deletes are still pending.
    return doc_ids_to_delete_.empty();
}
int64_t
MemCollection::GetCollectionId() const {
return collection_id_;
}
int64_t
MemCollection::GetPartitionId() const {
return partition_id_;
}
// Returns the sum of GetCurrentMem() over the buffered segments, computed while holding mutex_.
size_t
MemCollection::GetCurrentMem() {
std::lock_guard<std::mutex> lock(mutex_);
size_t total_mem = 0;
// NOTE(review): diff residue — the next two lines appear to be the pre-change loop over the
// flat mem_segment_list_; its closing brace is shared with the post-change loop below.
for (auto& mem_table_file : mem_segment_list_) {
total_mem += mem_table_file->GetCurrentMem();
// Post-change form: visit every partition's segment list in mem_segments_.
for (auto& partition_segments : mem_segments_) {
MemSegmentList& segments = partition_segments.second;
for (auto& segment : segments) {
total_mem += segment->GetCurrentMem();
}
}
return total_mem;
}
......
......@@ -16,6 +16,7 @@
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "config/ConfigMgr.h"
......@@ -28,39 +29,28 @@ namespace engine {
class MemCollection {
public:
using MemCollectionFileList = std::vector<MemSegmentPtr>;
using MemSegmentList = std::vector<MemSegmentPtr>;
using MemSegmentMap = std::unordered_map<int64_t, MemSegmentList>; // partition id mapping to segments
MemCollection(int64_t collection_id, int64_t partition_id, const DBOptions& options);
MemCollection(int64_t collection_id, const DBOptions& options);
~MemCollection() = default;
Status
Add(const VectorSourcePtr& source);
Status
Delete(segment::doc_id_t doc_id);
Add(int64_t partition_id, const VectorSourcePtr& source);
Status
Delete(const std::vector<segment::doc_id_t>& doc_ids);
void
GetCurrentMemSegment(MemSegmentPtr& mem_segment);
size_t
GetTableFileCount();
Status
EraseMem(int64_t partition_id);
Status
Serialize(uint64_t wal_lsn);
bool
Empty();
int64_t
GetCollectionId() const;
int64_t
GetPartitionId() const;
size_t
GetCurrentMem();
......@@ -76,9 +66,8 @@ class MemCollection {
private:
int64_t collection_id_;
int64_t partition_id_;
MemCollectionFileList mem_segment_list_;
MemSegmentMap mem_segments_;
DBOptions options_;
......
......@@ -24,8 +24,6 @@
namespace milvus {
namespace engine {
extern const char* VECTOR_FIELD;
class MemManager {
public:
virtual Status
......@@ -43,14 +41,11 @@ class MemManager {
virtual Status
Flush(std::set<int64_t>& collection_ids) = 0;
// virtual Status
// Serialize(std::set<std::string>& table_ids) = 0;
virtual Status
EraseMemVector(int64_t collection_id) = 0;
EraseMem(int64_t collection_id) = 0;
virtual Status
EraseMemVector(int64_t collection_id, int64_t partition_id) = 0;
EraseMem(int64_t collection_id, int64_t partition_id) = 0;
virtual size_t
GetCurrentMutableMem() = 0;
......
......@@ -23,38 +23,21 @@
namespace milvus {
namespace engine {
const char* VECTOR_FIELD = "vector"; // hard code
// Looks up the MemCollection registered in mem_map_ for collection_id; when none exists, a new
// one is created, stored in mem_map_ and returned. The function never returns without a value.
// It takes no lock itself; the visible callers DeleteEntity/DeleteEntities acquire mutex_ first.
// NOTE(review): this span is interleaved diff output. The GetMemByTable(collection_id,
// partition_id) signature, the nested partition lookup and the first `auto mem` /
// `mem_map_[collection_id][partition_id]` pair appear to be the pre-change form; the
// GetMemByCollection lines are the post-change form.
MemCollectionPtr
MemManagerImpl::GetMemByTable(int64_t collection_id, int64_t partition_id) {
MemManagerImpl::GetMemByCollection(int64_t collection_id) {
auto mem_collection = mem_map_.find(collection_id);
if (mem_collection != mem_map_.end()) {
auto mem_partition = mem_collection->second.find(partition_id);
if (mem_partition != mem_collection->second.end()) {
return mem_partition->second;
}
return mem_collection->second;
}
// Not registered yet: create the buffer and remember it for later calls.
auto mem = std::make_shared<MemCollection>(collection_id, partition_id, options_);
mem_map_[collection_id][partition_id] = mem;
auto mem = std::make_shared<MemCollection>(collection_id, options_);
mem_map_[collection_id] = mem;
return mem;
}
std::vector<MemCollectionPtr>
MemManagerImpl::GetMemByTable(int64_t collection_id) {
std::vector<MemCollectionPtr> result;
auto mem_collection = mem_map_.find(collection_id);
if (mem_collection != mem_map_.end()) {
for (auto& pair : mem_collection->second) {
result.push_back(pair.second);
}
}
return result;
}
Status
MemManagerImpl::InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) {
auto status = ValidateChunk(collection_id, partition_id, chunk);
auto status = ValidateChunk(collection_id, chunk);
if (!status.ok()) {
return status;
}
......@@ -65,7 +48,7 @@ MemManagerImpl::InsertEntities(int64_t collection_id, int64_t partition_id, cons
}
Status
MemManagerImpl::ValidateChunk(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk) {
MemManagerImpl::ValidateChunk(int64_t collection_id, const DataChunkPtr& chunk) {
if (chunk == nullptr) {
return Status(DB_ERROR, "Null chunk pointer");
}
......@@ -155,45 +138,42 @@ MemManagerImpl::ValidateChunk(int64_t collection_id, int64_t partition_id, const
// Buffers `source` into the in-memory collection for collection_id, records `lsn` on that
// collection via SetLSN, and returns the status reported by MemCollection::Add.
// No lock is taken here; presumably the caller already holds mutex_ (suggested by the name —
// confirm against InsertEntities).
// NOTE(review): interleaved diff output — the GetMemByTable(...) lookup and the single-argument
// Add(source) call appear to be the pre-change lines; GetMemByCollection(...) and
// Add(partition_id, source) are the post-change lines.
Status
MemManagerImpl::InsertEntitiesNoLock(int64_t collection_id, int64_t partition_id,
const milvus::engine::VectorSourcePtr& source, uint64_t lsn) {
MemCollectionPtr mem = GetMemByTable(collection_id, partition_id);
MemCollectionPtr mem = GetMemByCollection(collection_id);
mem->SetLSN(lsn);
auto status = mem->Add(source);
auto status = mem->Add(partition_id, source);
return status;
}
// Marks a single entity as deleted in the in-memory buffer of a collection.
//
// @param collection_id  collection whose buffer receives the delete
// @param engity_id      id of the entity to delete
// @param lsn            log sequence number recorded on the collection buffer via SetLSN
// @return               the status reported by MemCollection::Delete
//
// Holds mutex_ for the whole call. GetMemByCollection creates and registers a buffer when the
// collection has none yet, so `mem` is never null here.
//
// The previous body returned early only when Delete succeeded and fell through to
// Status::OK() otherwise, which turned every failure into a success. The result of Delete is
// now returned unchanged, matching the error handling in DeleteEntities.
Status
MemManagerImpl::DeleteEntity(int64_t collection_id, IDNumber engity_id, uint64_t lsn) {
    std::unique_lock<std::mutex> lock(mutex_);

    MemCollectionPtr mem = GetMemByCollection(collection_id);
    mem->SetLSN(lsn);

    // MemCollection only exposes the batch Delete, so wrap the single id in a one-element list.
    IDNumbers ids = {engity_id};
    return mem->Delete(ids);
}
Status
MemManagerImpl::DeleteEntities(int64_t collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) {
MemManagerImpl::DeleteEntities(int64_t collection_id, int64_t length, const IDNumber* engity_ids, uint64_t lsn) {
std::unique_lock<std::mutex> lock(mutex_);
std::vector<MemCollectionPtr> mems = GetMemByTable(collection_id);
MemCollectionPtr mem = GetMemByCollection(collection_id);
for (auto& mem : mems) {
mem->SetLSN(lsn);
mem->SetLSN(lsn);
IDNumbers ids;
ids.resize(length);
memcpy(ids.data(), vector_ids, length * sizeof(IDNumber));
IDNumbers ids;
ids.resize(length);
memcpy(ids.data(), engity_ids, length * sizeof(IDNumber));
auto status = mem->Delete(ids);
if (!status.ok()) {
return status;
}
auto status = mem->Delete(ids);
if (!status.ok()) {
return status;
}
return Status::OK();
......@@ -202,7 +182,7 @@ MemManagerImpl::DeleteEntities(int64_t collection_id, int64_t length, const IDNu
Status
MemManagerImpl::Flush(int64_t collection_id) {
ToImmutable(collection_id);
// TODO: There is actually only one memTable in the immutable list
MemList temp_immutable_list;
{
std::unique_lock<std::mutex> lock(mutex_);
......@@ -260,19 +240,8 @@ MemManagerImpl::ToImmutable(int64_t collection_id) {
auto mem_collection = mem_map_.find(collection_id);
if (mem_collection != mem_map_.end()) {
MemPartitionMap temp_map;
for (auto& mem : mem_collection->second) {
if (mem.second->Empty()) {
temp_map.insert(mem);
} else {
immu_mem_list_.push_back(mem.second);
}
}
mem_collection->second.swap(temp_map);
if (temp_map.empty()) {
mem_map_.erase(mem_collection);
}
immu_mem_list_.push_back(mem_collection->second);
mem_map_.erase(mem_collection);
}
return Status::OK();
......@@ -283,23 +252,15 @@ MemManagerImpl::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
for (auto& mem_collection : mem_map_) {
MemPartitionMap temp_map;
for (auto& mem : mem_collection.second) {
if (mem.second->Empty()) {
temp_map.insert(mem);
} else {
immu_mem_list_.push_back(mem.second);
}
}
mem_collection.second.swap(temp_map);
immu_mem_list_.push_back(mem_collection.second);
}
mem_map_.clear();
return Status::OK();
}
Status
MemManagerImpl::EraseMemVector(int64_t collection_id) {
MemManagerImpl::EraseMem(int64_t collection_id) {
{ // erase MemVector from rapid-insert cache
std::unique_lock<std::mutex> lock(mutex_);
mem_map_.erase(collection_id);
......@@ -320,15 +281,12 @@ MemManagerImpl::EraseMemVector(int64_t collection_id) {
}
Status
MemManagerImpl::EraseMemVector(int64_t collection_id, int64_t partition_id) {
MemManagerImpl::EraseMem(int64_t collection_id, int64_t partition_id) {
{ // erase MemVector from rapid-insert cache
std::unique_lock<std::mutex> lock(mutex_);
auto mem_collection = mem_map_.find(collection_id);
if (mem_collection != mem_map_.end()) {
mem_collection->second.erase(partition_id);
if (mem_collection->second.empty()) {
mem_map_.erase(collection_id);
}
mem_collection->second->EraseMem(partition_id);
}
}
......@@ -336,11 +294,8 @@ MemManagerImpl::EraseMemVector(int64_t collection_id, int64_t partition_id) {
std::unique_lock<std::mutex> lock(serialization_mtx_);
MemList temp_list;
for (auto& mem : immu_mem_list_) {
if (mem->GetCollectionId() != collection_id && mem->GetPartitionId() != partition_id) {
temp_list.push_back(mem);
}
mem->EraseMem(partition_id);
}
immu_mem_list_.swap(temp_list);
}
return Status::OK();
......@@ -351,9 +306,7 @@ MemManagerImpl::GetCurrentMutableMem() {
size_t total_mem = 0;
std::unique_lock<std::mutex> lock(mutex_);
for (auto& mem_collection : mem_map_) {
for (auto& mem : mem_collection.second) {
total_mem += mem.second->GetCurrentMem();
}
total_mem += mem_collection.second->GetCurrentMem();
}
return total_mem;
}
......@@ -362,8 +315,8 @@ size_t
MemManagerImpl::GetCurrentImmutableMem() {
// Sums GetCurrentMem() over every entry of immu_mem_list_ while holding serialization_mtx_.
size_t total_mem = 0;
std::unique_lock<std::mutex> lock(serialization_mtx_);
// NOTE(review): diff residue — the `mem_table` loop header and body line appear to be the
// pre-change spelling of the `mem_collection` loop that follows; only one loop is live.
for (auto& mem_table : immu_mem_list_) {
total_mem += mem_table->GetCurrentMem();
for (auto& mem_collection : immu_mem_list_) {
total_mem += mem_collection->GetCurrentMem();
}
return total_mem;
}
......@@ -374,9 +327,9 @@ MemManagerImpl::GetCurrentMem() {
}
uint64_t
MemManagerImpl::GetMaxLSN(const MemList& tables) {
MemManagerImpl::GetMaxLSN(const MemList& collections) {
uint64_t max_lsn = 0;
for (auto& collection : tables) {
for (auto& collection : collections) {
auto cur_lsn = collection->GetLSN();
if (collection->GetLSN() > max_lsn) {
max_lsn = cur_lsn;
......
......@@ -30,8 +30,7 @@ namespace engine {
class MemManagerImpl : public MemManager {
public:
using Ptr = std::shared_ptr<MemManagerImpl>;
using MemPartitionMap = std::map<int64_t, MemCollectionPtr>;
using MemCollectionMap = std::map<int64_t, MemPartitionMap>;
using MemCollectionMap = std::unordered_map<int64_t, MemCollectionPtr>;
using MemList = std::vector<MemCollectionPtr>;
explicit MemManagerImpl(const DBOptions& options) : options_(options) {
......@@ -43,10 +42,10 @@ class MemManagerImpl : public MemManager {
InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) override;
Status
DeleteEntity(int64_t collection_id, IDNumber vector_id, uint64_t lsn) override;
DeleteEntity(int64_t collection_id, IDNumber engity_ids, uint64_t lsn) override;
Status
DeleteEntities(int64_t collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) override;
DeleteEntities(int64_t collection_id, int64_t length, const IDNumber* engity_idss, uint64_t lsn) override;
Status
Flush(int64_t collection_id) override;
......@@ -55,10 +54,10 @@ class MemManagerImpl : public MemManager {
Flush(std::set<int64_t>& collection_ids) override;
Status
EraseMemVector(int64_t collection_id) override;
EraseMem(int64_t collection_id) override;
Status
EraseMemVector(int64_t collection_id, int64_t partition_id) override;
EraseMem(int64_t collection_id, int64_t partition_id) override;
size_t
GetCurrentMutableMem() override;
......@@ -71,13 +70,10 @@ class MemManagerImpl : public MemManager {
private:
MemCollectionPtr
GetMemByTable(int64_t collection_id, int64_t partition_id);
std::vector<MemCollectionPtr>
GetMemByTable(int64_t collection_id);
GetMemByCollection(int64_t collection_id);
Status
ValidateChunk(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk);
ValidateChunk(int64_t collection_id, const DataChunkPtr& chunk);
Status
InsertEntitiesNoLock(int64_t collection_id, int64_t partition_id, const VectorSourcePtr& source, uint64_t lsn);
......@@ -89,7 +85,7 @@ class MemManagerImpl : public MemManager {
ToImmutable(int64_t collection_id);
uint64_t
GetMaxLSN(const MemList& tables);
GetMaxLSN(const MemList& collections);
MemCollectionMap mem_map_;
MemList immu_mem_list_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册