MemManager.cpp 6.8 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7
#include "MemManager.h"
#include "Meta.h"
8
#include "MetaConsts.h"
G
groot 已提交
9
#include "EngineFactory.h"
Y
yu yunfeng 已提交
10
#include "metrics/Metrics.h"
Z
zhiru 已提交
11
#include "Log.h"
12

13 14 15 16
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
17

Z
update  
zhiru 已提交
18

X
Xu Peng 已提交
19
namespace zilliz {
J
jinhai 已提交
20
namespace milvus {
X
Xu Peng 已提交
21
namespace engine {
22

Z
update  
zhiru 已提交
23 24 25 26 27 28 29
// Creates an in-memory vector buffer for one table file.
//
// meta_ptr : metadata backend, stored in meta_ (Serialize() uses it to update the table file record)
// schema   : description of the table file backing this buffer, stored in schema_
// options  : engine options, stored in options_
//
// The index engine is built from the dimension, location and engine type
// found in schema_. A fresh SimpleIDGenerator is allocated and released
// again by the destructor.
MemVectors::MemVectors(const std::shared_ptr<meta::Meta> &meta_ptr,
                       const meta::TableFileSchema &schema,
                       const Options &options)
    : meta_(meta_ptr),
      options_(options),
      schema_(schema),
      id_generator_(new SimpleIDGenerator()),
      active_engine_(EngineFactory::Build(schema_.dimension_,
                                          schema_.location_,
                                          static_cast<EngineType>(schema_.engine_type_))) {
    // NOTE(review): active_engine_ is initialised from schema_, so schema_ must be
    // declared before active_engine_ in the class definition — confirm in the header.
}

Y
yu yunfeng 已提交
32

Z
update  
zhiru 已提交
33 34
// Appends n_ vectors to the active engine.
//
// n_          : number of vectors passed in vectors_
// vectors_    : raw vector data handed to the engine unchanged
// vector_ids_ : filled by the id generator, then passed to the engine
//
// Returns Status::Error when no engine is available, otherwise the status
// reported by the engine's AddWithIds(). The elapsed time of id generation
// plus insertion is reported to the metrics singleton in either case.
Status MemVectors::Add(size_t n_, const float *vectors_, IDNumbers &vector_ids_) {
    if (active_engine_ == nullptr) {
        return Status::Error("index engine is null");
    }

    auto began = METRICS_NOW_TIME;

    id_generator_->GetNextIDNumbers(n_, vector_ids_);
    Status add_status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data());

    auto finished = METRICS_NOW_TIME;
    auto elapsed = METRICS_MICROSECONDS(began, finished);

    auto &metrics = server::Metrics::GetInstance();
    metrics.AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
                                        static_cast<int>(schema_.dimension_),
                                        elapsed);

    return add_status;
}

S
starlord 已提交
50
// Returns the engine's Count(), or 0 when there is no engine.
size_t MemVectors::RowCount() const {
    if (active_engine_ != nullptr) {
        return active_engine_->Count();
    }
    return 0;
}

S
starlord 已提交
58
// Returns the engine's Size(), or 0 when there is no engine.
size_t MemVectors::Size() const {
    if (active_engine_ != nullptr) {
        return active_engine_->Size();
    }
    return 0;
}

Z
update  
zhiru 已提交
66 67
// Serializes the buffered vectors through the engine and records the result
// in the metadata backend.
//
// table_id : output, set to schema_.table_id_ (left untouched when the engine is null)
//
// Returns Status::Error when no engine is available, otherwise the status of
// meta_->UpdateTableFile(). The file is marked TO_INDEX when its size reaches
// options_.index_trigger_size and RAW otherwise.
Status MemVectors::Serialize(std::string &table_id) {
    if (active_engine_ == nullptr) {
        return Status::Error("index engine is null");
    }

    table_id = schema_.table_id_;
    auto size = Size();

    auto start_time = METRICS_NOW_TIME;
    // NOTE(review): the result of the engine's Serialize() is not inspected here;
    // confirm whether it can report a failure that should abort the metadata update.
    active_engine_->Serialize();
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    schema_.size_ = size;

    // A very fast serialization can measure as zero elapsed time; dividing by
    // it would be a division by zero, so the gauge is only updated when the
    // measurement is usable.
    if (total_time > 0) {
        server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size / total_time);
    }

    schema_.file_type_ = (size >= options_.index_trigger_size) ?
                         meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;

    auto status = meta_->UpdateTableFile(schema_);

    LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
               << " file " << schema_.file_id_ << " of size "
               << static_cast<double>(active_engine_->Size()) / static_cast<double>(meta::M)
               << " M";

    active_engine_->Cache();

    return status;
}

G
groot 已提交
95
// Releases the id generator allocated in the constructor.
MemVectors::~MemVectors() {
    // delete on a null pointer is a no-op, so no explicit check is required.
    delete id_generator_;
    id_generator_ = nullptr;
}

/*
 * MemManager
 */
G
groot 已提交
105
// Returns the mutable buffer registered for table_id, creating one on demand.
//
// When no buffer exists yet, a new table file is requested from the metadata
// backend and a MemVectors built on top of it is stored in mem_id_map_.
// Returns nullptr when the metadata backend fails to create the table file.
// This function takes no lock itself.
MemManager::MemVectorsPtr MemManager::GetMemByTable(const std::string &table_id) {
    const auto found = mem_id_map_.find(table_id);
    if (found != mem_id_map_.end()) {
        return found->second;
    }

    meta::TableFileSchema new_file;
    new_file.table_id_ = table_id;
    auto create_status = meta_->CreateTableFile(new_file);
    if (!create_status.ok()) {
        return nullptr;
    }

    MemVectorsPtr fresh_mem(new MemVectors(meta_, new_file, options_));
    mem_id_map_[table_id] = fresh_mem;
    return fresh_mem;
}

Z
update  
zhiru 已提交
123 124 125 126 127 128 129 130
// Thread-safe entry point for inserting vectors into a table's buffer.
//
// table_id_   : target table
// n_          : number of vectors in vectors_
// vectors_    : raw vector data
// vector_ids_ : receives the ids assigned to the inserted vectors
//
// Takes mutex_ and forwards to InsertVectorsNoLock(), whose status is returned.
Status MemManager::InsertVectors(const std::string &table_id_,
                                 size_t n_,
                                 const float *vectors_,
                                 IDNumbers &vector_ids_) {
    // Acquire the lock before logging: the memory statistics walk mem_id_map_,
    // which other inserters modify while holding mutex_.
    std::unique_lock<std::mutex> lock(mutex_);

    // NOTE(review): GetCurrentImmutableMem() reads immu_mem_list_, which other
    // code in this file guards with serialization_mtx_; that lock is not taken
    // here, so the immutable figure is read unsynchronised.
    LOG(DEBUG) << "MemManager::InsertVectors: mutable mem = " << GetCurrentMutableMem() <<
               ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem();

    return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}

Z
update  
zhiru 已提交
136 137 138 139
// Inserts vectors into the table's mutable buffer without taking mutex_;
// InsertVectors() acquires that lock before calling this function.
//
// table_id   : target table
// n          : number of vectors in `vectors`
// vectors    : raw vector data
// vector_ids : receives the ids assigned to the inserted vectors
//
// Returns Status::NotFound when no buffer can be obtained for the table,
// otherwise the status of MemVectors::Add().
Status MemManager::InsertVectorsNoLock(const std::string &table_id,
                                       size_t n,
                                       const float *vectors,
                                       IDNumbers &vector_ids) {

    MemVectorsPtr mem = GetMemByTable(table_id);
    if (mem == nullptr) {
        return Status::NotFound("Group " + table_id + " not found!");
    }

    // Make sure each file stays below index_trigger_size: once the current
    // buffer has grown past it, retire the buffer and continue with a new one.
    if (mem->Size() > options_.index_trigger_size) {
        {
            // Hold serialization_mtx_ only while touching immu_mem_list_.
            // It is a non-recursive std::mutex, so it must not stay locked
            // across a nested rollover or during the insertion itself.
            std::unique_lock<std::mutex> lock(serialization_mtx_);
            immu_mem_list_.push_back(mem);
        }
        mem_id_map_.erase(table_id);

        mem = GetMemByTable(table_id);
        if (mem == nullptr) {
            return Status::NotFound("Group " + table_id + " not found!");
        }
    }

    return mem->Add(n, vectors, vector_ids);
}

G
groot 已提交
157
// Moves every non-empty mutable buffer to the immutable list so that it can
// be serialized; empty buffers stay in mem_id_map_. Always returns Status::OK().
Status MemManager::ToImmutable() {
    std::unique_lock<std::mutex> lock(mutex_);
    // immu_mem_list_ is modified under serialization_mtx_ everywhere else in
    // this file, so take it here as well. The acquisition order (mutex_ first,
    // serialization_mtx_ second) matches InsertVectorsNoLock().
    std::unique_lock<std::mutex> serialization_lock(serialization_mtx_);

    MemIdMap temp_map;
    for (auto &kv: mem_id_map_) {
        if (kv.second->RowCount() == 0) {
            temp_map.insert(kv);
            continue;//empty vector, no need to serialize
        }
        immu_mem_list_.push_back(kv.second);
    }

    mem_id_map_.swap(temp_map);
    return Status::OK();
}

Z
update  
zhiru 已提交
172
// Serializes all pending buffers.
//
// table_ids : output, cleared first and then filled with the id of every
//             table for which a buffer reported its table id
//
// Mutable buffers are first moved to the immutable list via ToImmutable();
// the immutable list is then serialized and emptied under serialization_mtx_.
// Always returns Status::OK(); individual failures are logged.
Status MemManager::Serialize(std::set<std::string> &table_ids) {
    ToImmutable();
    std::unique_lock<std::mutex> lock(serialization_mtx_);
    table_ids.clear();
    for (auto &mem : immu_mem_list_) {
        // Fresh string per buffer: MemVectors::Serialize() leaves the output
        // untouched when it fails early, so a reused variable would carry the
        // previous table's id (or an empty id) into the result set.
        std::string table_id;
        auto status = mem->Serialize(table_id);
        if (!status.ok()) {
            LOG(ERROR) << "MemManager::Serialize: failed to serialize mem vectors of table "
                       << mem->TableId();
        }
        if (!table_id.empty()) {
            table_ids.insert(table_id);
        }
    }
    immu_mem_list_.clear();
    return Status::OK();
}

Z
update  
zhiru 已提交
185
// Drops every buffer that belongs to table_id, both the mutable one and any
// that are queued for serialization. Always returns Status::OK().
Status MemManager::EraseMemVector(const std::string &table_id) {
    {
        // Mutable buffers are kept in mem_id_map_, guarded by mutex_.
        std::unique_lock<std::mutex> map_guard(mutex_);
        mem_id_map_.erase(table_id);
    }

    {
        // Buffers queued for serialization are kept in immu_mem_list_,
        // guarded by serialization_mtx_. Keep only those of other tables.
        std::unique_lock<std::mutex> list_guard(serialization_mtx_);
        MemList survivors;
        for (auto &candidate : immu_mem_list_) {
            if (candidate->TableId() == table_id) {
                continue;
            }
            survivors.push_back(candidate);
        }
        immu_mem_list_.swap(survivors);
    }

    return Status::OK();
}

Z
zhiru 已提交
205 206
size_t MemManager::GetCurrentMutableMem() {
    size_t totalMem = 0;
Z
update  
zhiru 已提交
207
    for (auto &kv : mem_id_map_) {
Z
zhiru 已提交
208 209 210 211 212 213 214 215
        auto memVector = kv.second;
        totalMem += memVector->Size();
    }
    return totalMem;
}

size_t MemManager::GetCurrentImmutableMem() {
    size_t totalMem = 0;
Z
update  
zhiru 已提交
216
    for (auto &memVector : immu_mem_list_) {
Z
zhiru 已提交
217 218 219 220 221 222 223 224
        totalMem += memVector->Size();
    }
    return totalMem;
}

// Returns the combined size of the mutable and immutable buffers.
size_t MemManager::GetCurrentMem() {
    const size_t mutable_part = GetCurrentMutableMem();
    const size_t immutable_part = GetCurrentImmutableMem();
    return mutable_part + immutable_part;
}
225

X
Xu Peng 已提交
226
} // namespace engine
J
jinhai 已提交
227
} // namespace milvus
X
Xu Peng 已提交
228
} // namespace zilliz