MemManager.cpp 6.6 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7
#include "MemManager.h"
#include "Meta.h"
8
#include "MetaConsts.h"
G
groot 已提交
9
#include "EngineFactory.h"
Y
yu yunfeng 已提交
10
#include "metrics/Metrics.h"
Z
zhiru 已提交
11
#include "Log.h"
12

13 14 15 16
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
17

Z
update  
zhiru 已提交
18

X
Xu Peng 已提交
19
namespace zilliz {
J
jinhai 已提交
20
namespace milvus {
X
Xu Peng 已提交
21
namespace engine {
22

Z
update  
zhiru 已提交
23 24 25 26 27 28 29
MemVectors::MemVectors(const std::shared_ptr<meta::Meta> &meta_ptr,
                       const meta::TableFileSchema &schema, const Options &options)
    : meta_(meta_ptr),
      options_(options),
      schema_(schema),
      id_generator_(new SimpleIDGenerator()),
      active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType) schema_.engine_type_)) {
30 31
}

Y
yu yunfeng 已提交
32

Z
update  
zhiru 已提交
33 34
Status MemVectors::Add(size_t n_, const float *vectors_, IDNumbers &vector_ids_) {
    if (active_engine_ == nullptr) {
G
groot 已提交
35 36 37
        return Status::Error("index engine is null");
    }

Y
yu yunfeng 已提交
38
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
39 40
    id_generator_->GetNextIDNumbers(n_, vector_ids_);
    Status status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data());
Y
yu yunfeng 已提交
41 42
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
Z
update  
zhiru 已提交
43 44 45
    server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
                                                               static_cast<int>(schema_.dimension_),
                                                               total_time);
G
groot 已提交
46 47

    return status;
48 49
}

G
groot 已提交
50
size_t MemVectors::RowCount() const {
Z
update  
zhiru 已提交
51
    if (active_engine_ == nullptr) {
G
groot 已提交
52 53 54 55
        return 0;
    }

    return active_engine_->Count();
56 57
}

G
groot 已提交
58
size_t MemVectors::Size() const {
Z
update  
zhiru 已提交
59
    if (active_engine_ == nullptr) {
G
groot 已提交
60 61 62 63
        return 0;
    }

    return active_engine_->Size();
64 65
}

Z
update  
zhiru 已提交
66 67
Status MemVectors::Serialize(std::string &table_id) {
    if (active_engine_ == nullptr) {
G
groot 已提交
68 69 70
        return Status::Error("index engine is null");
    }

G
groot 已提交
71
    table_id = schema_.table_id_;
G
groot 已提交
72
    auto size = Size();
Y
yu yunfeng 已提交
73
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
74
    active_engine_->Serialize();
Y
yu yunfeng 已提交
75 76
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
77
    schema_.size_ = size;
Y
yu yunfeng 已提交
78

Z
update  
zhiru 已提交
79
    server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size / total_time);
Y
yu yunfeng 已提交
80

G
groot 已提交
81
    schema_.file_type_ = (size >= options_.index_trigger_size) ?
Z
update  
zhiru 已提交
82
                         meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
X
Xu Peng 已提交
83

G
groot 已提交
84
    auto status = meta_->UpdateTableFile(schema_);
X
Xu Peng 已提交
85

G
groot 已提交
86
    LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
Z
update  
zhiru 已提交
87 88
               << " file " << schema_.file_id_ << " of size " << (double) (active_engine_->Size()) / (double) meta::M
               << " M";
X
Xu Peng 已提交
89

G
groot 已提交
90
    active_engine_->Cache();
X
Xu Peng 已提交
91

X
Xu Peng 已提交
92
    return status;
93 94
}

G
groot 已提交
95
MemVectors::~MemVectors() {
G
groot 已提交
96 97 98
    if (id_generator_ != nullptr) {
        delete id_generator_;
        id_generator_ = nullptr;
99 100 101 102 103 104
    }
}

/*
 * MemManager
 */
G
groot 已提交
105
MemManager::MemVectorsPtr MemManager::GetMemByTable(
Z
update  
zhiru 已提交
106
    const std::string &table_id) {
G
groot 已提交
107 108
    auto memIt = mem_id_map_.find(table_id);
    if (memIt != mem_id_map_.end()) {
X
Xu Peng 已提交
109
        return memIt->second;
110
    }
111

X
Xu Peng 已提交
112
    meta::TableFileSchema table_file;
G
groot 已提交
113
    table_file.table_id_ = table_id;
G
groot 已提交
114
    auto status = meta_->CreateTableFile(table_file);
115 116 117
    if (!status.ok()) {
        return nullptr;
    }
X
Xu Peng 已提交
118

G
groot 已提交
119 120
    mem_id_map_[table_id] = MemVectorsPtr(new MemVectors(meta_, table_file, options_));
    return mem_id_map_[table_id];
121 122
}

Z
update  
zhiru 已提交
123 124 125 126 127
Status MemManager::InsertVectors(const std::string &table_id_,
                                 size_t n_,
                                 const float *vectors_,
                                 IDNumbers &vector_ids_) {

X
Xu Peng 已提交
128
    std::unique_lock<std::mutex> lock(mutex_);
Y
yu yunfeng 已提交
129

X
Xu Peng 已提交
130
    return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
131 132
}

Z
update  
zhiru 已提交
133 134 135 136
Status MemManager::InsertVectorsNoLock(const std::string &table_id,
                                       size_t n,
                                       const float *vectors,
                                       IDNumbers &vector_ids) {
Z
zhiru 已提交
137

X
Xu Peng 已提交
138
    MemVectorsPtr mem = GetMemByTable(table_id);
139
    if (mem == nullptr) {
140
        return Status::NotFound("Group " + table_id + " not found!");
141
    }
X
Xu Peng 已提交
142

G
groot 已提交
143
    //makesure each file size less than index_trigger_size
Z
update  
zhiru 已提交
144
    if (mem->Size() > options_.index_trigger_size) {
G
groot 已提交
145 146 147 148 149 150 151
        std::unique_lock<std::mutex> lock(serialization_mtx_);
        immu_mem_list_.push_back(mem);
        mem_id_map_.erase(table_id);
        return InsertVectorsNoLock(table_id, n, vectors, vector_ids);
    } else {
        return mem->Add(n, vectors, vector_ids);
    }
152 153
}

G
groot 已提交
154
Status MemManager::ToImmutable() {
X
Xu Peng 已提交
155
    std::unique_lock<std::mutex> lock(mutex_);
G
groot 已提交
156
    MemIdMap temp_map;
Z
update  
zhiru 已提交
157 158
    for (auto &kv: mem_id_map_) {
        if (kv.second->RowCount() == 0) {
G
groot 已提交
159 160 161
            temp_map.insert(kv);
            continue;//empty vector, no need to serialize
        }
G
groot 已提交
162
        immu_mem_list_.push_back(kv.second);
X
Xu Peng 已提交
163
    }
X
Xu Peng 已提交
164

G
groot 已提交
165
    mem_id_map_.swap(temp_map);
166
    return Status::OK();
X
Xu Peng 已提交
167 168
}

Z
update  
zhiru 已提交
169
Status MemManager::Serialize(std::set<std::string> &table_ids) {
X
Xu Peng 已提交
170
    ToImmutable();
X
Xu Peng 已提交
171
    std::unique_lock<std::mutex> lock(serialization_mtx_);
172 173
    std::string table_id;
    table_ids.clear();
Z
update  
zhiru 已提交
174
    for (auto &mem : immu_mem_list_) {
X
Xu Peng 已提交
175
        mem->Serialize(table_id);
G
groot 已提交
176
        table_ids.insert(table_id);
X
Xu Peng 已提交
177
    }
G
groot 已提交
178
    immu_mem_list_.clear();
179
    return Status::OK();
X
Xu Peng 已提交
180 181
}

Z
update  
zhiru 已提交
182
Status MemManager::EraseMemVector(const std::string &table_id) {
G
groot 已提交
183 184 185 186 187 188 189 190
    {//erase MemVector from rapid-insert cache
        std::unique_lock<std::mutex> lock(mutex_);
        mem_id_map_.erase(table_id);
    }

    {//erase MemVector from serialize cache
        std::unique_lock<std::mutex> lock(serialization_mtx_);
        MemList temp_list;
Z
update  
zhiru 已提交
191 192
        for (auto &mem : immu_mem_list_) {
            if (mem->TableId() != table_id) {
G
groot 已提交
193 194 195 196 197
                temp_list.push_back(mem);
            }
        }
        immu_mem_list_.swap(temp_list);
    }
G
groot 已提交
198 199 200 201

    return Status::OK();
}

Z
zhiru 已提交
202 203
size_t MemManager::GetCurrentMutableMem() {
    size_t totalMem = 0;
Z
update  
zhiru 已提交
204
    for (auto &kv : mem_id_map_) {
Z
zhiru 已提交
205 206 207 208 209 210 211 212
        auto memVector = kv.second;
        totalMem += memVector->Size();
    }
    return totalMem;
}

size_t MemManager::GetCurrentImmutableMem() {
    size_t totalMem = 0;
Z
update  
zhiru 已提交
213
    for (auto &memVector : immu_mem_list_) {
Z
zhiru 已提交
214 215 216 217 218 219 220 221
        totalMem += memVector->Size();
    }
    return totalMem;
}

size_t MemManager::GetCurrentMem() {
    return GetCurrentMutableMem() + GetCurrentImmutableMem();
}
222

X
Xu Peng 已提交
223
} // namespace engine
J
jinhai 已提交
224
} // namespace milvus
X
Xu Peng 已提交
225
} // namespace zilliz