MemManagerImpl.cpp 4.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/insert/MemManagerImpl.h"
Z
zhiru 已提交
19
#include "VectorSource.h"
S
starlord 已提交
20
#include "db/Constants.h"
S
starlord 已提交
21
#include "utils/Log.h"
Z
zhiru 已提交
22 23

#include <thread>
Z
zhiru 已提交
24 25 26 27

namespace milvus {
namespace engine {

S
starlord 已提交
28
MemTablePtr
S
starlord 已提交
29
MemManagerImpl::GetMemByTable(const std::string& table_id) {
Z
zhiru 已提交
30 31 32 33 34 35 36 37 38
    auto memIt = mem_id_map_.find(table_id);
    if (memIt != mem_id_map_.end()) {
        return memIt->second;
    }

    mem_id_map_[table_id] = std::make_shared<MemTable>(table_id, meta_, options_);
    return mem_id_map_[table_id];
}

S
starlord 已提交
39
Status
S
starlord 已提交
40
MemManagerImpl::InsertVectors(const std::string& table_id_, size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
S
starlord 已提交
41
    while (GetCurrentMem() > options_.insert_buffer_size_) {
Z
zhiru 已提交
42 43
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
Z
zhiru 已提交
44 45 46 47 48 49

    std::unique_lock<std::mutex> lock(mutex_);

    return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}

S
starlord 已提交
50
Status
S
starlord 已提交
51 52
MemManagerImpl::InsertVectorsNoLock(const std::string& table_id, size_t n, const float* vectors,
                                    IDNumbers& vector_ids) {
Z
zhiru 已提交
53
    MemTablePtr mem = GetMemByTable(table_id);
S
starlord 已提交
54
    VectorSourcePtr source = std::make_shared<VectorSource>(n, vectors);
Z
zhiru 已提交
55

Y
Yu Kun 已提交
56
    auto status = mem->Add(source, vector_ids);
Z
zhiru 已提交
57
    if (status.ok()) {
Y
Yu Kun 已提交
58 59 60
        if (vector_ids.empty()) {
            vector_ids = source->GetVectorIds();
        }
Z
zhiru 已提交
61 62 63 64
    }
    return status;
}

S
starlord 已提交
65 66
Status
MemManagerImpl::ToImmutable() {
Z
zhiru 已提交
67 68
    std::unique_lock<std::mutex> lock(mutex_);
    MemIdMap temp_map;
S
starlord 已提交
69
    for (auto& kv : mem_id_map_) {
Z
update  
zhiru 已提交
70
        if (kv.second->Empty()) {
S
starlord 已提交
71
            // empty table, no need to serialize
Z
zhiru 已提交
72
            temp_map.insert(kv);
Z
update  
zhiru 已提交
73 74
        } else {
            immu_mem_list_.push_back(kv.second);
Z
zhiru 已提交
75 76 77 78 79 80 81
        }
    }

    mem_id_map_.swap(temp_map);
    return Status::OK();
}

S
starlord 已提交
82
Status
S
starlord 已提交
83
MemManagerImpl::Serialize(std::set<std::string>& table_ids) {
Z
zhiru 已提交
84 85 86
    ToImmutable();
    std::unique_lock<std::mutex> lock(serialization_mtx_);
    table_ids.clear();
S
starlord 已提交
87
    for (auto& mem : immu_mem_list_) {
Z
zhiru 已提交
88 89 90 91 92 93 94
        mem->Serialize();
        table_ids.insert(mem->GetTableId());
    }
    immu_mem_list_.clear();
    return Status::OK();
}

S
starlord 已提交
95
Status
S
starlord 已提交
96 97
MemManagerImpl::EraseMemVector(const std::string& table_id) {
    {  // erase MemVector from rapid-insert cache
Z
zhiru 已提交
98 99 100 101
        std::unique_lock<std::mutex> lock(mutex_);
        mem_id_map_.erase(table_id);
    }

S
starlord 已提交
102
    {  // erase MemVector from serialize cache
Z
zhiru 已提交
103 104
        std::unique_lock<std::mutex> lock(serialization_mtx_);
        MemList temp_list;
S
starlord 已提交
105
        for (auto& mem : immu_mem_list_) {
Z
update  
zhiru 已提交
106
            if (mem->GetTableId() != table_id) {
Z
zhiru 已提交
107 108 109 110 111 112 113 114 115
                temp_list.push_back(mem);
            }
        }
        immu_mem_list_.swap(temp_list);
    }

    return Status::OK();
}

S
starlord 已提交
116 117
size_t
MemManagerImpl::GetCurrentMutableMem() {
Z
update  
zhiru 已提交
118
    size_t total_mem = 0;
S
starlord 已提交
119
    for (auto& kv : mem_id_map_) {
Z
zhiru 已提交
120
        auto memTable = kv.second;
Z
update  
zhiru 已提交
121
        total_mem += memTable->GetCurrentMem();
Z
zhiru 已提交
122
    }
Z
update  
zhiru 已提交
123
    return total_mem;
Z
zhiru 已提交
124 125
}

S
starlord 已提交
126 127
size_t
MemManagerImpl::GetCurrentImmutableMem() {
Z
update  
zhiru 已提交
128
    size_t total_mem = 0;
S
starlord 已提交
129
    for (auto& mem_table : immu_mem_list_) {
Z
update  
zhiru 已提交
130
        total_mem += mem_table->GetCurrentMem();
Z
zhiru 已提交
131
    }
Z
update  
zhiru 已提交
132
    return total_mem;
Z
zhiru 已提交
133 134
}

S
starlord 已提交
135 136
size_t
MemManagerImpl::GetCurrentMem() {
Z
zhiru 已提交
137 138 139
    return GetCurrentMutableMem() + GetCurrentImmutableMem();
}

S
starlord 已提交
140 141
}  // namespace engine
}  // namespace milvus