MemManagerImpl.cpp 3.7 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/insert/MemManagerImpl.h"
Z
zhiru 已提交
13
#include "VectorSource.h"
S
starlord 已提交
14
#include "db/Constants.h"
S
starlord 已提交
15
#include "utils/Log.h"
Z
zhiru 已提交
16 17

#include <thread>
Z
zhiru 已提交
18 19 20 21

namespace milvus {
namespace engine {

S
starlord 已提交
22
MemTablePtr
S
starlord 已提交
23
MemManagerImpl::GetMemByTable(const std::string& table_id) {
Z
zhiru 已提交
24 25 26 27 28 29 30 31 32
    auto memIt = mem_id_map_.find(table_id);
    if (memIt != mem_id_map_.end()) {
        return memIt->second;
    }

    mem_id_map_[table_id] = std::make_shared<MemTable>(table_id, meta_, options_);
    return mem_id_map_[table_id];
}

S
starlord 已提交
33
Status
G
groot 已提交
34
MemManagerImpl::InsertVectors(const std::string& table_id, VectorsData& vectors) {
S
starlord 已提交
35
    while (GetCurrentMem() > options_.insert_buffer_size_) {
Z
zhiru 已提交
36 37
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
Z
zhiru 已提交
38 39 40

    std::unique_lock<std::mutex> lock(mutex_);

G
groot 已提交
41
    return InsertVectorsNoLock(table_id, vectors);
Z
zhiru 已提交
42 43
}

S
starlord 已提交
44
Status
G
groot 已提交
45
MemManagerImpl::InsertVectorsNoLock(const std::string& table_id, VectorsData& vectors) {
Z
zhiru 已提交
46
    MemTablePtr mem = GetMemByTable(table_id);
G
groot 已提交
47
    VectorSourcePtr source = std::make_shared<VectorSource>(vectors);
Z
zhiru 已提交
48

G
groot 已提交
49
    auto status = mem->Add(source);
Z
zhiru 已提交
50
    if (status.ok()) {
G
groot 已提交
51 52
        if (vectors.id_array_.empty()) {
            vectors.id_array_ = source->GetVectorIds();
Y
Yu Kun 已提交
53
        }
Z
zhiru 已提交
54 55 56 57
    }
    return status;
}

S
starlord 已提交
58 59
Status
MemManagerImpl::ToImmutable() {
Z
zhiru 已提交
60 61
    std::unique_lock<std::mutex> lock(mutex_);
    MemIdMap temp_map;
S
starlord 已提交
62
    for (auto& kv : mem_id_map_) {
Z
update  
zhiru 已提交
63
        if (kv.second->Empty()) {
S
starlord 已提交
64
            // empty table, no need to serialize
Z
zhiru 已提交
65
            temp_map.insert(kv);
Z
update  
zhiru 已提交
66 67
        } else {
            immu_mem_list_.push_back(kv.second);
Z
zhiru 已提交
68 69 70 71 72 73 74
        }
    }

    mem_id_map_.swap(temp_map);
    return Status::OK();
}

S
starlord 已提交
75
Status
S
starlord 已提交
76
MemManagerImpl::Serialize(std::set<std::string>& table_ids) {
Z
zhiru 已提交
77 78 79
    ToImmutable();
    std::unique_lock<std::mutex> lock(serialization_mtx_);
    table_ids.clear();
S
starlord 已提交
80
    for (auto& mem : immu_mem_list_) {
Z
zhiru 已提交
81 82 83 84 85 86 87
        mem->Serialize();
        table_ids.insert(mem->GetTableId());
    }
    immu_mem_list_.clear();
    return Status::OK();
}

S
starlord 已提交
88
Status
S
starlord 已提交
89 90
MemManagerImpl::EraseMemVector(const std::string& table_id) {
    {  // erase MemVector from rapid-insert cache
Z
zhiru 已提交
91 92 93 94
        std::unique_lock<std::mutex> lock(mutex_);
        mem_id_map_.erase(table_id);
    }

S
starlord 已提交
95
    {  // erase MemVector from serialize cache
Z
zhiru 已提交
96 97
        std::unique_lock<std::mutex> lock(serialization_mtx_);
        MemList temp_list;
S
starlord 已提交
98
        for (auto& mem : immu_mem_list_) {
Z
update  
zhiru 已提交
99
            if (mem->GetTableId() != table_id) {
Z
zhiru 已提交
100 101 102 103 104 105 106 107 108
                temp_list.push_back(mem);
            }
        }
        immu_mem_list_.swap(temp_list);
    }

    return Status::OK();
}

S
starlord 已提交
109 110
size_t
MemManagerImpl::GetCurrentMutableMem() {
Z
update  
zhiru 已提交
111
    size_t total_mem = 0;
112
    std::unique_lock<std::mutex> lock(mutex_);
S
starlord 已提交
113
    for (auto& kv : mem_id_map_) {
Z
zhiru 已提交
114
        auto memTable = kv.second;
Z
update  
zhiru 已提交
115
        total_mem += memTable->GetCurrentMem();
Z
zhiru 已提交
116
    }
Z
update  
zhiru 已提交
117
    return total_mem;
Z
zhiru 已提交
118 119
}

S
starlord 已提交
120 121
size_t
MemManagerImpl::GetCurrentImmutableMem() {
Z
update  
zhiru 已提交
122
    size_t total_mem = 0;
123
    std::unique_lock<std::mutex> lock(serialization_mtx_);
S
starlord 已提交
124
    for (auto& mem_table : immu_mem_list_) {
Z
update  
zhiru 已提交
125
        total_mem += mem_table->GetCurrentMem();
Z
zhiru 已提交
126
    }
Z
update  
zhiru 已提交
127
    return total_mem;
Z
zhiru 已提交
128 129
}

S
starlord 已提交
130 131
size_t
MemManagerImpl::GetCurrentMem() {
Z
zhiru 已提交
132 133 134
    return GetCurrentMutableMem() + GetCurrentImmutableMem();
}

S
starlord 已提交
135 136
}  // namespace engine
}  // namespace milvus