MemManager.inl 4.3 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#pragma once
7

X
Xu Peng 已提交
8 9
#include "MemManager.h"
#include "Meta.h"
10
#include "MetaConsts.h"
Y
yu yunfeng 已提交
11
#include "metrics/Metrics.h"
12

13 14 15 16
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
17

X
Xu Peng 已提交
18 19 20
namespace zilliz {
namespace vecwise {
namespace engine {
21

22 23
/*
 * Builds an in-memory vector buffer bound to a single table file.
 *
 * meta_ptr - meta store handle; kept so Serialize() can update the file record.
 * schema   - table file record; copied into schema_.
 * options  - options; copied into options_.
 *
 * A SimpleIDGenerator and an EngineT are allocated here. The engine is built
 * from the dimension and location of the copied schema.
 * NOTE(review): pEE_ is initialized from the member schema_ (not the
 * parameter), so schema_ must be declared before pEE_ in the class - confirm
 * against MemManager.h.
 */
template<typename EngineT>
MemVectors<EngineT>::MemVectors(
        const std::shared_ptr<meta::Meta>& meta_ptr,
        const meta::TableFileSchema& schema,
        const Options& options)
    : pMeta_(meta_ptr)
    , options_(options)
    , schema_(schema)
    , pIdGenerator_(new SimpleIDGenerator())
    , pEE_(new EngineT(schema_.dimension, schema_.location)) {
}

32
template<typename EngineT>
X
Xu Peng 已提交
33 34
void MemVectors<EngineT>::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
    pIdGenerator_->GetNextIDNumbers(n_, vector_ids_);
X
Xu Peng 已提交
35
    pEE_->AddWithIds(n_, vectors_, vector_ids_.data());
36 37
}

38
template<typename EngineT>
X
Xu Peng 已提交
39
size_t MemVectors<EngineT>::Total() const {
X
Xu Peng 已提交
40
    return pEE_->Count();
41 42
}

43
template<typename EngineT>
X
Xu Peng 已提交
44
size_t MemVectors<EngineT>::ApproximateSize() const {
X
Xu Peng 已提交
45
    return pEE_->Size();
46 47
}

48
template<typename EngineT>
X
Xu Peng 已提交
49
Status MemVectors<EngineT>::Serialize(std::string& table_id) {
50
    table_id = schema_.table_id;
X
Xu Peng 已提交
51
    auto size = ApproximateSize();
Y
yu yunfeng 已提交
52
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
53
    pEE_->Serialize();
Y
yu yunfeng 已提交
54 55
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
56
    schema_.size = size;
Y
yu yunfeng 已提交
57 58 59

    server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size/total_time);

60
    schema_.file_type = (size >= options_.index_trigger_size) ?
61
        meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
X
Xu Peng 已提交
62

X
Xu Peng 已提交
63
    auto status = pMeta_->UpdateTableFile(schema_);
X
Xu Peng 已提交
64

65
    LOG(DEBUG) << "New " << ((schema_.file_type == meta::TableFileSchema::RAW) ? "raw" : "to_index")
66
        << " file " << schema_.file_id << " of size " << pEE_->Size() / meta::M << " M";
X
Xu Peng 已提交
67

X
Xu Peng 已提交
68
    pEE_->Cache();
X
Xu Peng 已提交
69

X
Xu Peng 已提交
70
    return status;
71 72
}

73 74
/*
 * Releases the id generator allocated in the constructor.
 */
template<typename EngineT>
MemVectors<EngineT>::~MemVectors() {
    // delete on a null pointer is a no-op, so no explicit null check is needed.
    delete pIdGenerator_;
    pIdGenerator_ = nullptr;
}

/*
 * MemManager: keeps one MemVectors buffer per table id and flushes the
 * buffers to table files on Serialize().
 */

85
template<typename EngineT>
X
Xu Peng 已提交
86
typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::GetMemByTable(
87
        const std::string& table_id) {
X
Xu Peng 已提交
88 89
    auto memIt = memMap_.find(table_id);
    if (memIt != memMap_.end()) {
X
Xu Peng 已提交
90
        return memIt->second;
91
    }
92

X
Xu Peng 已提交
93 94
    meta::TableFileSchema table_file;
    table_file.table_id = table_id;
X
Xu Peng 已提交
95
    auto status = pMeta_->CreateTableFile(table_file);
96 97 98
    if (!status.ok()) {
        return nullptr;
    }
X
Xu Peng 已提交
99

X
Xu Peng 已提交
100 101
    memMap_[table_id] = MemVectorsPtr(new MemVectors<EngineT>(pMeta_, table_file, options_));
    return memMap_[table_id];
102 103
}

104
template<typename EngineT>
X
Xu Peng 已提交
105
Status MemManager<EngineT>::InsertVectors(const std::string& table_id_,
106 107 108
        size_t n_,
        const float* vectors_,
        IDNumbers& vector_ids_) {
X
Xu Peng 已提交
109 110
    std::unique_lock<std::mutex> lock(mutex_);
    return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
111 112
}

113
template<typename EngineT>
X
Xu Peng 已提交
114
Status MemManager<EngineT>::InsertVectorsNoLock(const std::string& table_id,
115
        size_t n,
116
        const float* vectors,
X
Xu Peng 已提交
117
        IDNumbers& vector_ids) {
X
Xu Peng 已提交
118
    MemVectorsPtr mem = GetMemByTable(table_id);
119
    if (mem == nullptr) {
120
        return Status::NotFound("Group " + table_id + " not found!");
121
    }
X
Xu Peng 已提交
122
    mem->Add(n, vectors, vector_ids);
X
Xu Peng 已提交
123 124

    return Status::OK();
125 126
}

127
template<typename EngineT>
X
Xu Peng 已提交
128 129 130 131
Status MemManager<EngineT>::ToImmutable() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (auto& kv: memMap_) {
        immMems_.push_back(kv.second);
X
Xu Peng 已提交
132
    }
X
Xu Peng 已提交
133

X
Xu Peng 已提交
134
    memMap_.clear();
135
    return Status::OK();
X
Xu Peng 已提交
136 137
}

138
template<typename EngineT>
X
Xu Peng 已提交
139 140
Status MemManager<EngineT>::Serialize(std::vector<std::string>& table_ids) {
    ToImmutable();
X
Xu Peng 已提交
141
    std::unique_lock<std::mutex> lock(serialization_mtx_);
142 143
    std::string table_id;
    table_ids.clear();
X
Xu Peng 已提交
144 145
    for (auto& mem : immMems_) {
        mem->Serialize(table_id);
146
        table_ids.push_back(table_id);
X
Xu Peng 已提交
147
    }
X
Xu Peng 已提交
148
    immMems_.clear();
149
    return Status::OK();
X
Xu Peng 已提交
150 151
}

152

X
Xu Peng 已提交
153 154 155
} // namespace engine
} // namespace vecwise
} // namespace zilliz