MemManager.cpp 5.2 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7
#include "MemManager.h"
#include "Meta.h"
8
#include "MetaConsts.h"
G
groot 已提交
9
#include "EngineFactory.h"
Y
yu yunfeng 已提交
10
#include "metrics/Metrics.h"
11

12 13 14 15
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
16

X
Xu Peng 已提交
17
namespace zilliz {
J
jinhai 已提交
18
namespace milvus {
X
Xu Peng 已提交
19
namespace engine {
20

G
groot 已提交
21
MemVectors::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
22
        const meta::TableFileSchema& schema, const Options& options)
G
groot 已提交
23
  : meta_(meta_ptr),
X
Xu Peng 已提交
24 25
    options_(options),
    schema_(schema),
G
groot 已提交
26 27
    id_generator_(new SimpleIDGenerator()),
    active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) {
28 29
}

Y
yu yunfeng 已提交
30

G
groot 已提交
31 32 33 34 35
Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
    if(active_engine_ == nullptr) {
        return Status::Error("index engine is null");
    }

Y
yu yunfeng 已提交
36
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
37 38
    id_generator_->GetNextIDNumbers(n_, vector_ids_);
    Status status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data());
Y
yu yunfeng 已提交
39 40
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
Y
yu yunfeng 已提交
41
    server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_), static_cast<int>(schema_.dimension_), total_time);
G
groot 已提交
42 43

    return status;
44 45
}

G
groot 已提交
46 47 48 49 50 51
size_t MemVectors::RowCount() const {
    if(active_engine_ == nullptr) {
        return 0;
    }

    return active_engine_->Count();
52 53
}

G
groot 已提交
54 55 56 57 58 59
size_t MemVectors::Size() const {
    if(active_engine_ == nullptr) {
        return 0;
    }

    return active_engine_->Size();
60 61
}

G
groot 已提交
62
Status MemVectors::Serialize(std::string& table_id) {
G
groot 已提交
63 64 65 66
    if(active_engine_ == nullptr) {
        return Status::Error("index engine is null");
    }

G
groot 已提交
67
    table_id = schema_.table_id_;
G
groot 已提交
68
    auto size = Size();
Y
yu yunfeng 已提交
69
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
70
    active_engine_->Serialize();
Y
yu yunfeng 已提交
71 72
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
73
    schema_.size_ = size;
Y
yu yunfeng 已提交
74 75 76

    server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size/total_time);

G
groot 已提交
77
    schema_.file_type_ = (size >= options_.index_trigger_size) ?
78
        meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
X
Xu Peng 已提交
79

G
groot 已提交
80
    auto status = meta_->UpdateTableFile(schema_);
X
Xu Peng 已提交
81

G
groot 已提交
82
    LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
G
groot 已提交
83
        << " file " << schema_.file_id_ << " of size " << (double)(active_engine_->Size()) / (double)meta::M << " M";
X
Xu Peng 已提交
84

G
groot 已提交
85
    active_engine_->Cache();
X
Xu Peng 已提交
86

X
Xu Peng 已提交
87
    return status;
88 89
}

G
groot 已提交
90
MemVectors::~MemVectors() {
G
groot 已提交
91 92 93
    if (id_generator_ != nullptr) {
        delete id_generator_;
        id_generator_ = nullptr;
94 95 96 97 98 99
    }
}

/*
 * MemManager
 */
G
groot 已提交
100
MemManager::MemVectorsPtr MemManager::GetMemByTable(
101
        const std::string& table_id) {
G
groot 已提交
102 103
    auto memIt = mem_id_map_.find(table_id);
    if (memIt != mem_id_map_.end()) {
X
Xu Peng 已提交
104
        return memIt->second;
105
    }
106

X
Xu Peng 已提交
107
    meta::TableFileSchema table_file;
G
groot 已提交
108
    table_file.table_id_ = table_id;
G
groot 已提交
109
    auto status = meta_->CreateTableFile(table_file);
110 111 112
    if (!status.ok()) {
        return nullptr;
    }
X
Xu Peng 已提交
113

G
groot 已提交
114 115
    mem_id_map_[table_id] = MemVectorsPtr(new MemVectors(meta_, table_file, options_));
    return mem_id_map_[table_id];
116 117
}

G
groot 已提交
118
Status MemManager::InsertVectors(const std::string& table_id_,
119 120 121
        size_t n_,
        const float* vectors_,
        IDNumbers& vector_ids_) {
X
Xu Peng 已提交
122
    std::unique_lock<std::mutex> lock(mutex_);
Y
yu yunfeng 已提交
123

X
Xu Peng 已提交
124
    return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
125 126
}

G
groot 已提交
127
Status MemManager::InsertVectorsNoLock(const std::string& table_id,
128
        size_t n,
129
        const float* vectors,
X
Xu Peng 已提交
130
        IDNumbers& vector_ids) {
X
Xu Peng 已提交
131
    MemVectorsPtr mem = GetMemByTable(table_id);
132
    if (mem == nullptr) {
133
        return Status::NotFound("Group " + table_id + " not found!");
134
    }
X
Xu Peng 已提交
135

G
groot 已提交
136 137 138 139 140 141 142 143 144
    //makesure each file size less than index_trigger_size
    if(mem->Size() > options_.index_trigger_size) {
        std::unique_lock<std::mutex> lock(serialization_mtx_);
        immu_mem_list_.push_back(mem);
        mem_id_map_.erase(table_id);
        return InsertVectorsNoLock(table_id, n, vectors, vector_ids);
    } else {
        return mem->Add(n, vectors, vector_ids);
    }
145 146
}

G
groot 已提交
147
Status MemManager::ToImmutable() {
X
Xu Peng 已提交
148
    std::unique_lock<std::mutex> lock(mutex_);
G
groot 已提交
149 150
    for (auto& kv: mem_id_map_) {
        immu_mem_list_.push_back(kv.second);
X
Xu Peng 已提交
151
    }
X
Xu Peng 已提交
152

G
groot 已提交
153
    mem_id_map_.clear();
154
    return Status::OK();
X
Xu Peng 已提交
155 156
}

G
groot 已提交
157
Status MemManager::Serialize(std::set<std::string>& table_ids) {
X
Xu Peng 已提交
158
    ToImmutable();
X
Xu Peng 已提交
159
    std::unique_lock<std::mutex> lock(serialization_mtx_);
160 161
    std::string table_id;
    table_ids.clear();
G
groot 已提交
162
    for (auto& mem : immu_mem_list_) {
X
Xu Peng 已提交
163
        mem->Serialize(table_id);
G
groot 已提交
164
        table_ids.insert(table_id);
X
Xu Peng 已提交
165
    }
G
groot 已提交
166
    immu_mem_list_.clear();
167
    return Status::OK();
X
Xu Peng 已提交
168 169
}

G
groot 已提交
170 171
Status MemManager::EraseMemVector(const std::string& table_id) {
    std::unique_lock<std::mutex> lock(mutex_);
G
groot 已提交
172
    mem_id_map_.erase(table_id);
G
groot 已提交
173 174 175 176

    return Status::OK();
}

177

X
Xu Peng 已提交
178
} // namespace engine
J
jinhai 已提交
179
} // namespace milvus
X
Xu Peng 已提交
180
} // namespace zilliz