NewMemManager.cpp 3.3 KB
Newer Older
Z
zhiru 已提交
1 2
#include "NewMemManager.h"
#include "VectorSource.h"
Z
zhiru 已提交
3 4 5 6
#include "Log.h"
#include "Constants.h"

#include <thread>
Z
zhiru 已提交
7

Z
update  
zhiru 已提交
8

Z
zhiru 已提交
9 10 11 12
namespace zilliz {
namespace milvus {
namespace engine {

Z
update  
zhiru 已提交
13
// Return the mutable MemTable registered under table_id, creating and registering
// a new one (built from table_id, meta_ and options_) when none exists yet.
//
// This function takes no lock itself. The caller visible in this file,
// InsertVectorsNoLock, is reached from InsertVectors with mutex_ already held.
NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string &table_id) {
    auto found = mem_id_map_.find(table_id);
    if (found != mem_id_map_.end()) {
        return found->second;
    }

    // The key is known to be absent at this point, so emplace always inserts.
    // Returning through the iterator it yields avoids the two extra operator[]
    // lookups the assignment-then-read form would perform.
    auto inserted = mem_id_map_.emplace(table_id,
                                        std::make_shared<MemTable>(table_id, meta_, options_));
    return inserted.first->second;
}

Z
update  
zhiru 已提交
23
// Insert n_ vectors for table table_id_ and report the assigned ids through vector_ids_.
//
// The call first polls, sleeping one millisecond per round, for as long as
// GetCurrentMem() exceeds options_.insert_buffer_size. It then takes mutex_ and
// forwards all arguments to InsertVectorsNoLock, returning that call's Status.
//
// NOTE(review): the memory check runs before mutex_ is acquired, so it reads the
// containers without this lock - confirm that this is intended.
Status NewMemManager::InsertVectors(const std::string &table_id_,
                                    size_t n_,
                                    const float *vectors_,
                                    IDNumbers &vector_ids_) {
    const auto poll_interval = std::chrono::milliseconds(1);

    for (;;) {
        const bool over_budget = GetCurrentMem() > options_.insert_buffer_size;
        if (!over_budget) {
            break;
        }
        std::this_thread::sleep_for(poll_interval);
    }

    std::unique_lock<std::mutex> guard(mutex_);
    return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}

Z
update  
zhiru 已提交
37
// Add n vectors to the MemTable of table_id without taking any lock here.
//
// The vectors are wrapped in a VectorSource and handed to MemTable::Add. When Add
// reports ok(), vector_ids is overwritten with the ids held by the source;
// otherwise vector_ids is left untouched. The Status from Add is returned either way.
Status NewMemManager::InsertVectorsNoLock(const std::string &table_id,
                                          size_t n,
                                          const float *vectors,
                                          IDNumbers &vector_ids) {
    MemTablePtr target = GetMemByTable(table_id);
    VectorSource::Ptr batch = std::make_shared<VectorSource>(n, vectors);

    auto add_status = target->Add(batch);
    if (!add_status.ok()) {
        return add_status;
    }

    vector_ids = batch->GetVectorIds();
    return add_status;
}

// Move every non-empty MemTable out of mem_id_map_ and append it to immu_mem_list_.
//
// Runs under mutex_. Tables whose Empty() is true stay in the map, since there is
// nothing to serialize for them. Always returns Status::OK().
//
// NOTE(review): immu_mem_list_ is appended to here while holding only mutex_, whereas
// Serialize and EraseMemVector touch it under serialization_mtx_ - confirm this is safe.
Status NewMemManager::ToImmutable() {
    std::unique_lock<std::mutex> guard(mutex_);

    MemIdMap still_mutable;
    for (auto &entry : mem_id_map_) {
        if (!entry.second->Empty()) {
            immu_mem_list_.push_back(entry.second);
            continue;
        }
        // empty table, no need to serialize
        still_mutable.insert(entry);
    }

    mem_id_map_.swap(still_mutable);
    return Status::OK();
}

Z
update  
zhiru 已提交
68
// Serialize every buffered non-empty table and report which tables were written.
//
// ToImmutable() runs first. Then, under serialization_mtx_, table_ids is cleared,
// each entry of immu_mem_list_ has Serialize() called on it and its table id added
// to table_ids, and immu_mem_list_ is emptied. Always returns Status::OK().
//
// NOTE(review): whatever MemTable::Serialize() returns is not inspected here -
// confirm that failures are meant to be ignored.
Status NewMemManager::Serialize(std::set<std::string> &table_ids) {
    ToImmutable();

    std::lock_guard<std::mutex> guard(serialization_mtx_);
    table_ids.clear();

    for (auto &frozen : immu_mem_list_) {
        frozen->Serialize();
        table_ids.insert(frozen->GetTableId());
    }

    immu_mem_list_.clear();
    return Status::OK();
}

Z
update  
zhiru 已提交
80
// Drop all buffered data for table_id from both caches.
//
// The two caches are cleaned one after the other, each under its own mutex; the
// locks are never held at the same time. Always returns Status::OK().
Status NewMemManager::EraseMemVector(const std::string &table_id) {
    {
        // erase MemVector from rapid-insert cache
        std::lock_guard<std::mutex> guard(mutex_);
        mem_id_map_.erase(table_id);
    }

    {
        // erase MemVector from serialize cache: keep only the entries of other tables
        std::lock_guard<std::mutex> guard(serialization_mtx_);
        MemList survivors;
        for (auto &frozen : immu_mem_list_) {
            if (frozen->GetTableId() == table_id) {
                continue;
            }
            survivors.push_back(frozen);
        }
        immu_mem_list_.swap(survivors);
    }

    return Status::OK();
}

Z
zhiru 已提交
100
// Sum GetCurrentMem() over every MemTable currently held in mem_id_map_.
// No lock is taken in this function.
size_t NewMemManager::GetCurrentMutableMem() {
    size_t mutable_bytes = 0;
    for (auto &entry : mem_id_map_) {
        mutable_bytes += entry.second->GetCurrentMem();
    }
    return mutable_bytes;
}

// Sum GetCurrentMem() over every MemTable waiting in immu_mem_list_.
// No lock is taken in this function.
size_t NewMemManager::GetCurrentImmutableMem() {
    size_t immutable_bytes = 0;
    for (auto &frozen : immu_mem_list_) {
        immutable_bytes += frozen->GetCurrentMem();
    }
    return immutable_bytes;
}

// Total buffered memory: the mutable tables plus the tables queued for serialization.
size_t NewMemManager::GetCurrentMem() {
    const size_t mutable_bytes = GetCurrentMutableMem();
    const size_t immutable_bytes = GetCurrentImmutableMem();
    return mutable_bytes + immutable_bytes;
}

Z
zhiru 已提交
121 122 123
} // namespace engine
} // namespace milvus
} // namespace zilliz