MemSegment.cpp 9.6 KB
Newer Older
G
groot 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

G
groot 已提交
12
#include "db/insert/MemSegment.h"
G
groot 已提交
13 14 15 16 17 18 19

#include <algorithm>
#include <cmath>
#include <iterator>
#include <string>
#include <vector>

W
Wang XiangYu 已提交
20
#include "config/ServerConfig.h"
G
groot 已提交
21
#include "db/Constants.h"
G
groot 已提交
22
#include "db/Types.h"
G
groot 已提交
23 24 25
#include "db/Utils.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/Snapshots.h"
C
Cai Yudong 已提交
26
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
G
groot 已提交
27 28 29 30 31 32
#include "metrics/Metrics.h"
#include "utils/Log.h"

namespace milvus {
namespace engine {

G
groot 已提交
33
MemSegment::MemSegment(int64_t collection_id, int64_t partition_id, const DBOptions& options)
    : collection_id_(collection_id), partition_id_(partition_id), options_(options) {
    // Nothing is buffered yet: memory accounting starts from zero and grows in Add().
    current_mem_ = 0;

    // Eagerly build the backing segment. A constructor cannot return the Status,
    // so failures are only visible through the logging done inside CreateSegment().
    CreateSegment();
}

Status
MemSegment::CreateSegment() {
    // Logs a failure with the uniform "CreateSegment failed" prefix and hands the status back.
    auto fail = [](const Status& st) -> Status {
        std::string err_msg = "MemSegment::CreateSegment failed: " + st.ToString();
        LOG_ENGINE_ERROR_ << err_msg;
        return st;
    };

    snapshot::ScopedSnapshotT ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_);
    if (!status.ok()) {
        return fail(status);
    }

    // Resolve the target partition up front so an unknown/stale partition id is reported
    // clearly instead of passing a null partition into the new-segment operation.
    snapshot::OperationContext context;
    context.prev_partition = ss->GetResource<snapshot::Partition>(partition_id_);
    if (context.prev_partition == nullptr) {
        return fail(Status(DB_ERROR, "partition " + std::to_string(partition_id_) + " not found"));
    }

    // create segment
    operation_ = std::make_shared<snapshot::NewSegmentOperation>(context, ss);
    status = operation_->CommitNewSegment(segment_);
    if (!status.ok()) {
        return fail(status);
    }

    // Commits one placeholder segment file for (field_name, element_name) on the new segment.
    auto commit_file = [&](const std::string& field_name, const std::string& element_name) -> Status {
        snapshot::SegmentFileContext sf_context;
        sf_context.collection_id = collection_id_;
        sf_context.partition_id = partition_id_;
        sf_context.segment_id = segment_->GetID();
        sf_context.field_name = field_name;
        sf_context.field_element_name = element_name;

        snapshot::SegmentFilePtr seg_file;
        return operation_->CommitNewSegmentFile(sf_context, seg_file);
    };

    // create segment raw files (placeholder), one per field
    auto names = ss->GetFieldNames();
    for (const auto& name : names) {
        status = commit_file(name, engine::DEFAULT_RAW_DATA_NAME);
        if (!status.ok()) {
            return fail(status);
        }
    }

    // create deleted_doc and bloom_filter files (placeholder), both attached to the uid field
    status = commit_file(engine::DEFAULT_UID_NAME, engine::DEFAULT_DELETED_DOCS_NAME);
    if (!status.ok()) {
        return fail(status);
    }

    status = commit_file(engine::DEFAULT_UID_NAME, engine::DEFAULT_BLOOM_FILTER_NAME);
    if (!status.ok()) {
        return fail(status);
    }

    auto ctx = operation_->GetContext();
    auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files);

    // create segment writer
    segment_writer_ptr_ = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, visitor);

    return Status::OK();
}

G
groot 已提交
114
Status
MemSegment::GetSingleEntitySize(int64_t& single_size) {
    // Computes the estimated in-memory byte size of one entity by summing the width of
    // every field in the collection's current snapshot.
    snapshot::ScopedSnapshotT ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_);
    if (!status.ok()) {
        std::string err_msg = "MemSegment::SingleEntitySize failed: " + status.ToString();
        LOG_ENGINE_ERROR_ << err_msg;
        return status;
    }

    single_size = 0;
    std::vector<std::string> field_names = ss->GetFieldNames();
    for (auto& name : field_names) {
        snapshot::FieldPtr field = ss->GetField(name);
        if (field == nullptr) {
            // Guard against a snapshot that lists a field name it cannot resolve.
            std::string msg = "Field not found in snapshot: " + name;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }

        DataType ftype = static_cast<DataType>(field->GetFtype());
        switch (ftype) {
            case DataType::BOOL:
                single_size += sizeof(bool);
                break;
            case DataType::DOUBLE:
                single_size += sizeof(double);
                break;
            case DataType::FLOAT:
                single_size += sizeof(float);
                break;
            case DataType::INT8:
                single_size += sizeof(uint8_t);
                break;
            case DataType::INT16:
                single_size += sizeof(uint16_t);
                break;
            case DataType::INT32:
                single_size += sizeof(uint32_t);
                break;
            case DataType::INT64:
                single_size += sizeof(uint64_t);
                break;
            case DataType::VECTOR_FLOAT:
            case DataType::VECTOR_BINARY: {
                json params = field->GetParams();
                if (params.find(knowhere::meta::DIM) == params.end()) {
                    std::string msg = "Vector field params must contain: dimension";
                    LOG_ENGINE_ERROR_ << msg;
                    return Status(DB_ERROR, msg);
                }

                int64_t dimension = params[knowhere::meta::DIM];
                if (ftype == DataType::VECTOR_BINARY) {
                    // One bit per dimension, rounded up to whole bytes.
                    single_size += (dimension + 7) / 8;
                } else {
                    single_size += (dimension * sizeof(float));
                }
                break;
            }
            default:
                // Field types not listed above do not contribute to the estimate.
                break;
        }
    }

    return Status::OK();
}

Status
MemSegment::Add(const VectorSourcePtr& source) {
    // Moves as many whole entities from `source` into this segment as fit in the remaining
    // memory budget, and charges their size to current_mem_.
    if (segment_writer_ptr_ == nullptr) {
        // CreateSegment() failed in the constructor; there is nothing to write into.
        std::string msg = "MemSegment::Add failed: segment writer is not initialized";
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    int64_t single_entity_mem_size = 0;
    auto status = GetSingleEntitySize(single_entity_mem_size);
    if (!status.ok()) {
        return status;
    }
    if (single_entity_mem_size <= 0) {
        // Would otherwise divide by zero below.
        std::string msg = "MemSegment::Add failed: computed entity size is not positive";
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    // Keep the arithmetic signed: GetMemLeft() returns int64_t and may be negative.
    int64_t mem_left = GetMemLeft();
    if (mem_left < single_entity_mem_size) {
        // Not even one entity fits; nothing is added.
        return Status::OK();
    }

    // Floor division: only whole entities that fit in the remaining budget.
    int64_t num_entities_to_add = mem_left / single_entity_mem_size;
    int64_t num_entities_added = 0;

    status = source->Add(segment_writer_ptr_, num_entities_to_add, num_entities_added);
    if (status.ok()) {
        current_mem_ += (num_entities_added * single_entity_mem_size);
    }
    return status;
}

Status
MemSegment::Delete(segment::doc_id_t doc_id) {
    // Removes every buffered entity whose uid equals doc_id. A segment without uid data
    // has nothing to delete and reports OK.
    if (segment_writer_ptr_ == nullptr) {
        return Status::OK();
    }

    engine::SegmentPtr segment_ptr;
    segment_writer_ptr_->GetSegment(segment_ptr);
    if (segment_ptr == nullptr) {
        return Status::OK();
    }

    engine::BinaryDataPtr raw_data;
    auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
    if (!status.ok() || raw_data == nullptr) {
        return Status::OK();
    }

    const int64_t* uids = reinterpret_cast<const int64_t*>(raw_data->data_.data());
    // Never scan past the uid buffer, even if row count and buffer size disagree.
    const int64_t row_count = std::min<int64_t>(
        segment_ptr->GetRowCount(), static_cast<int64_t>(raw_data->data_.size() / sizeof(int64_t)));

    // Collect matches before mutating anything, so the scan reads a stable buffer.
    std::vector<int64_t> offsets;
    for (int64_t i = 0; i < row_count; ++i) {
        if (uids[i] == doc_id) {
            offsets.push_back(i);
        }
    }

    // DeleteEntity shifts later rows down (the batch overload compensates with `i - deleted`),
    // so delete from the highest offset first to keep pending offsets valid.
    for (auto it = offsets.rbegin(); it != offsets.rend(); ++it) {
        status = segment_ptr->DeleteEntity(*it);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

Status
MemSegment::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
    // Removes every buffered entity whose uid appears in doc_ids. A segment without uid
    // data has nothing to delete and reports OK.
    if (doc_ids.empty() || segment_writer_ptr_ == nullptr) {
        return Status::OK();
    }

    engine::SegmentPtr segment_ptr;
    segment_writer_ptr_->GetSegment(segment_ptr);
    if (segment_ptr == nullptr) {
        return Status::OK();
    }

    // Sorted copy of the ids so membership checks are binary searches.
    std::vector<segment::doc_id_t> sorted_ids(doc_ids.begin(), doc_ids.end());
    std::sort(sorted_ids.begin(), sorted_ids.end());

    engine::BinaryDataPtr raw_data;
    auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
    if (!status.ok() || raw_data == nullptr) {
        return Status::OK();
    }

    const int64_t* uids = reinterpret_cast<const int64_t*>(raw_data->data_.data());
    // Never scan past the uid buffer, even if row count and buffer size disagree.
    const int64_t row_count = std::min<int64_t>(
        segment_ptr->GetRowCount(), static_cast<int64_t>(raw_data->data_.size() / sizeof(int64_t)));

    // Collect matches before mutating anything, so the scan reads a stable buffer.
    std::vector<int64_t> offsets;
    for (int64_t i = 0; i < row_count; ++i) {
        if (std::binary_search(sorted_ids.begin(), sorted_ids.end(), uids[i])) {
            offsets.push_back(i);
        }
    }

    // Deleting a row shifts the rows after it, so remove from the highest offset down;
    // every pending (lower) offset then still refers to the original row.
    for (auto it = offsets.rbegin(); it != offsets.rend(); ++it) {
        status = segment_ptr->DeleteEntity(*it);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

G
groot 已提交
252
int64_t
G
groot 已提交
253
MemSegment::GetCurrentMem() {
G
groot 已提交
254 255 256
    return current_mem_;
}

G
groot 已提交
257
int64_t
G
groot 已提交
258
MemSegment::GetMemLeft() {
G
groot 已提交
259 260 261 262
    return (MAX_TABLE_FILE_MEM - current_mem_);
}

bool
G
groot 已提交
263
MemSegment::IsFull() {
G
groot 已提交
264
    int64_t single_entity_mem_size = 0;
G
groot 已提交
265 266 267 268 269 270
    auto status = GetSingleEntitySize(single_entity_mem_size);
    if (!status.ok()) {
        return true;
    }

    return (GetMemLeft() < single_entity_mem_size);
G
groot 已提交
271 272 273
}

Status
MemSegment::Serialize(uint64_t wal_lsn) {
    // Flushes the buffered data through the segment writer, then commits the row count and
    // pushes the new-segment operation. Stops at (and returns) the first failing step.
    if (segment_writer_ptr_ == nullptr || operation_ == nullptr) {
        // CreateSegment() failed in the constructor; there is no segment to serialize.
        std::string msg = "MemSegment::Serialize failed: segment was not created";
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    int64_t size = GetCurrentMem();
    server::CollectSerializeMetrics metrics(size);

    auto status = segment_writer_ptr_->Serialize();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to serialize segment: " << segment_->GetID();
        return status;
    }

    // Previously this status was overwritten by Push() without being checked.
    status = operation_->CommitRowCount(segment_writer_ptr_->RowCount());
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to commit row count of segment: " << segment_->GetID() << ", "
                          << status.ToString();
        return status;
    }

    status = operation_->Push();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to push new segment: " << segment_->GetID() << ", " << status.ToString();
        return status;
    }

    LOG_ENGINE_DEBUG_ << "New segment " << segment_->GetID() << " serialized, lsn = " << wal_lsn;
    return status;
}

int64_t
G
groot 已提交
291
MemSegment::GetSegmentId() const {
G
groot 已提交
292 293 294 295 296
    return segment_->GetID();
}

}  // namespace engine
}  // namespace milvus