MemManagerImpl.cpp 8.4 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/insert/MemManagerImpl.h"
13 14 15

#include <thread>

Z
zhiru 已提交
16
#include "VectorSource.h"
S
starlord 已提交
17
#include "db/Constants.h"
S
starlord 已提交
18
#include "utils/Log.h"
Z
zhiru 已提交
19

Z
zhiru 已提交
20 21 22
namespace milvus {
namespace engine {

S
starlord 已提交
23
// Returns the mutable MemTable registered under `table_id`. When the map has no
// entry for that table yet, a new MemTable is created from (table_id, meta_,
// options_), stored in mem_id_map_ and returned.
//
// No lock is taken here; the callers visible in this file hold mutex_ while
// calling it.
MemTablePtr
MemManagerImpl::GetMemByTable(const std::string& table_id) {
    const auto found = mem_id_map_.find(table_id);
    if (found == mem_id_map_.end()) {
        auto created = std::make_shared<MemTable>(table_id, meta_, options_);
        mem_id_map_[table_id] = created;
        return created;
    }

    return found->second;
}

S
starlord 已提交
34
// Buffers `length` float vectors (and their ids) into the mutable MemTable of
// `table_id`.
//
// When GetCurrentMem() exceeds options_.insert_buffer_size_, Flush(flushed_tables)
// is run first and reports the tables it serialized through `flushed_tables`;
// otherwise `flushed_tables` is left empty.
//
// @param table_id        table receiving the vectors
// @param length          number of vectors (and ids) supplied
// @param vector_ids      array of `length` ids
// @param dim             number of floats per vector
// @param vectors         array of `length * dim` floats
// @param lsn             value stamped on the MemTable via SetLSN
// @param flushed_tables  [out] cleared on entry, filled by the forced flush if one runs
// @return the forced flush's status if it fails, otherwise the status of the insert
//
// NOTE(review): `length` and `dim` are assumed to be non-negative; nothing in
// this function validates them — confirm that callers do.
Status
MemManagerImpl::InsertVectors(const std::string& table_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
                              const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
    flushed_tables.clear();
    if (GetCurrentMem() > options_.insert_buffer_size_) {
        ENGINE_LOG_DEBUG << "Insert buffer size exceeds limit. Performing force flush";
        auto status = Flush(flushed_tables);
        if (!status.ok()) {
            return status;
        }
    }

    // The caller's buffers are copied before mutex_ is taken, so the lock only
    // covers the actual insert.
    VectorsData vectors_data;
    vectors_data.vector_count_ = length;
    // assign() copies straight from the input range: unlike resize() followed by
    // memcpy() it does not zero-fill the buffer first, and it stays well-defined
    // for an empty batch with null input pointers.
    vectors_data.float_data_.assign(vectors, vectors + length * dim);
    vectors_data.id_array_.assign(vector_ids, vector_ids + length);
    VectorSourcePtr source = std::make_shared<VectorSource>(vectors_data);

    std::unique_lock<std::mutex> lock(mutex_);

    return InsertVectorsNoLock(table_id, source, lsn);
}

S
starlord 已提交
59
// Buffers `length` binary vectors (and their ids) into the mutable MemTable of
// `table_id`.
//
// When GetCurrentMem() exceeds options_.insert_buffer_size_, Flush(flushed_tables)
// is run first and reports the tables it serialized through `flushed_tables`;
// otherwise `flushed_tables` is left empty.
//
// @param table_id        table receiving the vectors
// @param length          number of vectors (and ids) supplied
// @param vector_ids      array of `length` ids
// @param dim             number of uint8_t elements copied per vector
// @param vectors         array of `length * dim` uint8_t elements
// @param lsn             value stamped on the MemTable via SetLSN
// @param flushed_tables  [out] cleared on entry, filled by the forced flush if one runs
// @return the forced flush's status if it fails, otherwise the status of the insert
//
// NOTE(review): `length` and `dim` are assumed to be non-negative; nothing in
// this function validates them — confirm that callers do.
Status
MemManagerImpl::InsertVectors(const std::string& table_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
                              const uint8_t* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
    flushed_tables.clear();
    if (GetCurrentMem() > options_.insert_buffer_size_) {
        ENGINE_LOG_DEBUG << "Insert buffer size exceeds limit. Performing force flush";
        auto status = Flush(flushed_tables);
        if (!status.ok()) {
            return status;
        }
    }

    // The caller's buffers are copied before mutex_ is taken, so the lock only
    // covers the actual insert.
    VectorsData vectors_data;
    vectors_data.vector_count_ = length;
    // assign() copies straight from the input range: unlike resize() followed by
    // memcpy() it does not zero-fill the buffer first, and it stays well-defined
    // for an empty batch with null input pointers.
    vectors_data.binary_data_.assign(vectors, vectors + length * dim);
    vectors_data.id_array_.assign(vector_ids, vector_ids + length);
    VectorSourcePtr source = std::make_shared<VectorSource>(vectors_data);

    std::unique_lock<std::mutex> lock(mutex_);

    return InsertVectorsNoLock(table_id, source, lsn);
}

// Stamps the MemTable of `table_id` with `lsn` and adds `source` to it,
// returning the status of the add.
//
// Takes no lock itself: the InsertVectors overloads in this file call it with
// mutex_ already held.
Status
MemManagerImpl::InsertVectorsNoLock(const std::string& table_id, const VectorSourcePtr& source, uint64_t lsn) {
    MemTablePtr target = GetMemByTable(table_id);
    target->SetLSN(lsn);

    return target->Add(source);
}

S
starlord 已提交
93
// Records the deletion of a single vector id in the MemTable of `table_id`.
// Runs under mutex_; the MemTable is stamped with `lsn` before the delete and
// the status of the delete is returned.
Status
MemManagerImpl::DeleteVector(const std::string& table_id, IDNumber vector_id, uint64_t lsn) {
    std::unique_lock<std::mutex> lock(mutex_);

    MemTablePtr target = GetMemByTable(table_id);
    target->SetLSN(lsn);

    return target->Delete(vector_id);
}

// Records the deletion of `length` vector ids in the MemTable of `table_id`.
//
// Runs under mutex_. The MemTable is stamped with `lsn`, the ids are copied
// into a local IDNumbers container and handed to the batch Delete in one call
// (an earlier per-id loop was left commented out in favour of it).
//
// @return the error status of the delete if it fails, Status::OK() otherwise
Status
MemManagerImpl::DeleteVectors(const std::string& table_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) {
    std::unique_lock<std::mutex> lock(mutex_);

    MemTablePtr target = GetMemByTable(table_id);
    target->SetLSN(lsn);

    // Copy the caller's id array so the MemTable receives an owned container.
    IDNumbers ids;
    ids.resize(length);
    memcpy(ids.data(), vector_ids, length * sizeof(IDNumber));

    auto status = target->Delete(ids);
    return status.ok() ? Status::OK() : status;
}

// Moves the MemTable of `table_id` to the immutable list (if it is non-empty)
// and serializes everything currently on that list.
//
// The immutable list is swapped into a local list under mutex_, then each
// entry is serialized under serialization_mtx_ with the highest LSN found
// among the entries.
//
// NOTE(review): the first Serialize failure returns immediately; the entries
// not yet serialized have already been removed from immu_mem_list_ and are
// released together with the local list.
//
// @return the first failing Serialize status, Status::OK() otherwise
Status
MemManagerImpl::Flush(const std::string& table_id) {
    ToImmutable(table_id);

    // TODO: There is actually only one memTable in the immutable list
    MemList pending;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pending.swap(immu_mem_list_);
    }

    std::unique_lock<std::mutex> lock(serialization_mtx_);
    auto max_lsn = GetMaxLSN(pending);
    for (auto& table_mem : pending) {
        ENGINE_LOG_DEBUG << "Flushing table: " << table_mem->GetTableId();

        auto status = table_mem->Serialize(max_lsn);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Flush table " << table_mem->GetTableId() << " failed";
            return status;
        }

        ENGINE_LOG_DEBUG << "Flushed table: " << table_mem->GetTableId();
    }

    return Status::OK();
}

S
starlord 已提交
153
// Moves every non-empty MemTable to the immutable list and serializes
// everything on that list.
//
// The immutable list is swapped into a local list under mutex_, then each
// entry is serialized under serialization_mtx_ with the highest LSN found
// among the entries. `table_ids` is cleared and then receives the id of every
// table that was serialized successfully. After all entries succeed, that LSN
// is passed to meta_->SetGlobalLastLSN.
//
// NOTE(review): the first Serialize failure returns immediately, before
// SetGlobalLastLSN; the entries not yet serialized have already been removed
// from immu_mem_list_ and are released together with the local list.
//
// @param table_ids  [out] ids of the tables serialized by this call
// @return the first failing Serialize status, Status::OK() otherwise
Status
MemManagerImpl::Flush(std::set<std::string>& table_ids) {
    ToImmutable();

    MemList pending;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pending.swap(immu_mem_list_);
    }

    std::unique_lock<std::mutex> lock(serialization_mtx_);
    table_ids.clear();

    auto max_lsn = GetMaxLSN(pending);
    for (auto& table_mem : pending) {
        ENGINE_LOG_DEBUG << "Flushing table: " << table_mem->GetTableId();

        auto status = table_mem->Serialize(max_lsn);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Flush table " << table_mem->GetTableId() << " failed";
            return status;
        }

        table_ids.insert(table_mem->GetTableId());
        ENGINE_LOG_DEBUG << "Flushed table: " << table_mem->GetTableId();
    }

    meta_->SetGlobalLastLSN(max_lsn);

    return Status::OK();
}

// Moves the MemTable of `table_id` from the mutable map to the immutable list.
//
// Runs under mutex_. Nothing happens when the table has no MemTable or its
// MemTable is empty; a missing table is not reported as an error.
//
// @return always Status::OK()
Status
MemManagerImpl::ToImmutable(const std::string& table_id) {
    std::unique_lock<std::mutex> lock(mutex_);

    const auto entry = mem_id_map_.find(table_id);
    if (entry == mem_id_map_.end() || entry->second->Empty()) {
        return Status::OK();
    }

    immu_mem_list_.push_back(entry->second);
    mem_id_map_.erase(entry);

    return Status::OK();
}

// Moves every non-empty MemTable from the mutable map to the immutable list.
//
// Runs under mutex_. Empty MemTables stay in the mutable map; the map is
// rebuilt from those entries and swapped in at the end.
//
// @return always Status::OK()
Status
MemManagerImpl::ToImmutable() {
    std::unique_lock<std::mutex> lock(mutex_);

    MemIdMap still_mutable;
    for (auto& entry : mem_id_map_) {
        if (!entry.second->Empty()) {
            immu_mem_list_.push_back(entry.second);
            continue;
        }

        // empty table without any deletes, no need to serialize
        still_mutable.insert(entry);
    }
    mem_id_map_.swap(still_mutable);

    return Status::OK();
}

S
starlord 已提交
216
// Drops all buffered data of `table_id`: its entry in the mutable map and
// every MemTable on the immutable list that reports this table id.
//
// NOTE(review): the immutable list is rebuilt under serialization_mtx_ here,
// while ToImmutable and Flush modify it under mutex_ — confirm the intended
// locking for immu_mem_list_.
//
// @return always Status::OK()
Status
MemManagerImpl::EraseMemVector(const std::string& table_id) {
    {
        // erase MemVector from rapid-insert cache
        std::unique_lock<std::mutex> lock(mutex_);
        mem_id_map_.erase(table_id);
    }

    {
        // erase MemVector from serialize cache
        std::unique_lock<std::mutex> lock(serialization_mtx_);

        MemList kept;
        for (auto& mem : immu_mem_list_) {
            if (mem->GetTableId() == table_id) {
                continue;
            }
            kept.push_back(mem);
        }
        immu_mem_list_.swap(kept);
    }

    return Status::OK();
}

S
starlord 已提交
237 238
size_t
MemManagerImpl::GetCurrentMutableMem() {
Z
update  
zhiru 已提交
239
    size_t total_mem = 0;
240
    std::unique_lock<std::mutex> lock(mutex_);
S
starlord 已提交
241
    for (auto& kv : mem_id_map_) {
Z
zhiru 已提交
242
        auto memTable = kv.second;
Z
update  
zhiru 已提交
243
        total_mem += memTable->GetCurrentMem();
Z
zhiru 已提交
244
    }
Z
update  
zhiru 已提交
245
    return total_mem;
Z
zhiru 已提交
246 247
}

S
starlord 已提交
248 249
size_t
MemManagerImpl::GetCurrentImmutableMem() {
Z
update  
zhiru 已提交
250
    size_t total_mem = 0;
251
    std::unique_lock<std::mutex> lock(serialization_mtx_);
S
starlord 已提交
252
    for (auto& mem_table : immu_mem_list_) {
Z
update  
zhiru 已提交
253
        total_mem += mem_table->GetCurrentMem();
Z
zhiru 已提交
254
    }
Z
update  
zhiru 已提交
255
    return total_mem;
Z
zhiru 已提交
256 257
}

S
starlord 已提交
258 259
size_t
MemManagerImpl::GetCurrentMem() {
Z
zhiru 已提交
260 261 262
    return GetCurrentMutableMem() + GetCurrentImmutableMem();
}

263 264 265 266 267 268 269 270 271 272 273 274
// Returns the highest LSN reported by the MemTables in `tables`.
//
// @param tables  MemTables to inspect
// @return the largest GetLSN() value, or 0 when `tables` is empty
uint64_t
MemManagerImpl::GetMaxLSN(const MemList& tables) {
    uint64_t max_lsn = 0;
    for (const auto& table : tables) {
        // Read the LSN once so the comparison and the assignment use the same value.
        const auto cur_lsn = table->GetLSN();
        if (cur_lsn > max_lsn) {
            max_lsn = cur_lsn;
        }
    }
    return max_lsn;
}

S
starlord 已提交
275 276
}  // namespace engine
}  // namespace milvus