MemSegment.cpp 9.7 KB
Newer Older
G
groot 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

G
groot 已提交
12
#include "db/insert/MemSegment.h"
G
groot 已提交
13 14 15 16 17 18 19

#include <algorithm>
#include <cmath>
#include <iterator>
#include <string>
#include <vector>

W
Wang XiangYu 已提交
20
#include "config/ServerConfig.h"
G
groot 已提交
21 22 23
#include "db/Constants.h"
#include "db/Utils.h"
#include "db/engine/EngineFactory.h"
G
groot 已提交
24
#include "db/meta/MetaTypes.h"
G
groot 已提交
25 26
#include "db/snapshot/Operations.h"
#include "db/snapshot/Snapshots.h"
C
Cai Yudong 已提交
27
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
G
groot 已提交
28 29 30 31 32 33
#include "metrics/Metrics.h"
#include "utils/Log.h"

namespace milvus {
namespace engine {

G
groot 已提交
34
// Binds the in-memory segment to a collection/partition pair, resets the
// memory counter and immediately calls CreateSegment().
// The Status returned by CreateSegment() is not inspected here; CreateSegment()
// logs its own failures.
MemSegment::MemSegment(int64_t collection_id, int64_t partition_id, const DBOptions& options)
    : collection_id_(collection_id),
      partition_id_(partition_id),
      options_(options) {
    // Nothing has been buffered yet.
    current_mem_ = 0;

    CreateSegment();
}

// Creates a new segment for this partition through a snapshot
// NewSegmentOperation, commits placeholder segment files (raw data for every
// field, plus deleted-docs and bloom-filter files on the UID field) and finally
// constructs the SegmentWriter stored in segment_writer_ptr_.
// Returns the first non-OK Status encountered (after logging it), otherwise OK.
Status
MemSegment::CreateSegment() {
    const std::string err_prefix = "MemSegment::CreateSegment failed: ";

    snapshot::ScopedSnapshotT ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << err_prefix + status.ToString();
        return status;
    }

    // create segment
    snapshot::OperationContext op_context;
    op_context.prev_partition = ss->GetResource<snapshot::Partition>(partition_id_);
    operation_ = std::make_shared<snapshot::NewSegmentOperation>(op_context, ss);
    status = operation_->CommitNewSegment(segment_);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << err_prefix + status.ToString();
        return status;
    }

    // Fills in a segment-file context for the given field / element of the new segment.
    auto make_file_context = [this](const std::string& field, const std::string& element) {
        snapshot::SegmentFileContext file_context;
        file_context.collection_id = collection_id_;
        file_context.partition_id = partition_id_;
        file_context.segment_id = segment_->GetID();
        file_context.field_name = field;
        file_context.field_element_name = element;
        return file_context;
    };

    // create segment raw files (placeholder)
    const auto field_names = ss->GetFieldNames();
    for (const auto& field_name : field_names) {
        auto raw_context = make_file_context(field_name, engine::DEFAULT_RAW_DATA_NAME);

        snapshot::SegmentFilePtr raw_file;
        status = operation_->CommitNewSegmentFile(raw_context, raw_file);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << err_prefix + status.ToString();
            return status;
        }
    }

    // create deleted_doc and bloom_filter files (placeholder)
    {
        auto uid_context = make_file_context(engine::DEFAULT_UID_NAME, engine::DEFAULT_DELETED_DOCS_NAME);

        snapshot::SegmentFilePtr delete_doc_file;
        status = operation_->CommitNewSegmentFile(uid_context, delete_doc_file);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << err_prefix + status.ToString();
            return status;
        }

        // The same context is reused; only the element name changes.
        uid_context.field_element_name = engine::DEFAULT_BLOOM_FILTER_NAME;

        snapshot::SegmentFilePtr bloom_filter_file;
        status = operation_->CommitNewSegmentFile(uid_context, bloom_filter_file);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << err_prefix + status.ToString();
            return status;
        }
    }

    auto ctx = operation_->GetContext();
    auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files);

    // create segment writer
    segment_writer_ptr_ = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, visitor);

    return Status::OK();
}

G
groot 已提交
115
// Computes how many bytes a single entity occupies, summed over every field of
// the collection's current snapshot, and stores the result in `single_size`.
//   - scalar fields contribute the size of the matching fixed-width C++ type;
//   - binary vector fields contribute dimension / 8 bytes;
//   - float vector fields contribute dimension * sizeof(float) bytes;
//   - field types not listed in the switch contribute nothing.
// Returns the snapshot Status if the snapshot cannot be obtained (in which case
// `single_size` is left untouched), DB_ERROR if a vector field has no dimension
// parameter, otherwise OK.
Status
MemSegment::GetSingleEntitySize(int64_t& single_size) {
    snapshot::ScopedSnapshotT ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "MemSegment::SingleEntitySize failed: " + status.ToString();
        return status;
    }

    single_size = 0;
    const std::vector<std::string> field_names = ss->GetFieldNames();
    for (const auto& field_name : field_names) {
        snapshot::FieldPtr field = ss->GetField(field_name);
        const auto ftype = static_cast<meta::DataType>(field->GetFtype());

        // Bytes contributed by this field to one entity.
        int64_t field_bytes = 0;
        switch (ftype) {
            case meta::DataType::BOOL:
                field_bytes = sizeof(bool);
                break;
            case meta::DataType::DOUBLE:
                field_bytes = sizeof(double);
                break;
            case meta::DataType::FLOAT:
                field_bytes = sizeof(float);
                break;
            case meta::DataType::INT8:
                field_bytes = sizeof(uint8_t);
                break;
            case meta::DataType::INT16:
                field_bytes = sizeof(uint16_t);
                break;
            case meta::DataType::INT32:
                field_bytes = sizeof(uint32_t);
                break;
            case meta::DataType::UID:
            case meta::DataType::INT64:
                field_bytes = sizeof(uint64_t);
                break;
            case meta::DataType::VECTOR_FLOAT:
            case meta::DataType::VECTOR_BINARY: {
                json params = field->GetParams();
                auto dim_it = params.find(knowhere::meta::DIM);
                if (dim_it == params.end()) {
                    std::string msg = "Vector field params must contain: dimension";
                    LOG_SERVER_ERROR_ << msg;
                    return Status(DB_ERROR, msg);
                }

                int64_t dimension = *dim_it;
                if (ftype == meta::DataType::VECTOR_BINARY) {
                    // Binary vectors are sized as one bit per dimension.
                    field_bytes = dimension / 8;
                } else {
                    field_bytes = dimension * sizeof(float);
                }
                break;
            }
            default:
                // Any other field type adds nothing to the entity size.
                break;
        }

        single_size += field_bytes;
    }

    return Status::OK();
}

// Adds as many entities from `source` as still fit into this segment.
// The capacity is GetMemLeft() divided by the per-entity size reported by
// GetSingleEntitySize(); `source->Add` reports how many entities it actually
// added, and current_mem_ grows by that count times the per-entity size.
// Returns:
//   - the Status of GetSingleEntitySize() if that fails;
//   - DB_ERROR if the computed per-entity size is not positive;
//   - OK without touching `source` if not even one entity fits;
//   - otherwise the Status of `source->Add`.
Status
MemSegment::Add(const VectorSourcePtr& source) {
    int64_t single_entity_mem_size = 0;
    auto status = GetSingleEntitySize(single_entity_mem_size);
    if (!status.ok()) {
        return status;
    }

    // A non-positive entity size would make the capacity computation below
    // divide by zero or produce a meaningless count.
    if (single_entity_mem_size <= 0) {
        std::string msg = "MemSegment::Add failed: invalid single entity size";
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    // Keep the remaining capacity signed: GetMemLeft() returns int64_t, and a
    // negative value stored in an unsigned type would look like a huge capacity.
    const int64_t mem_left = GetMemLeft();
    if (mem_left < single_entity_mem_size) {
        return Status::OK();
    }

    // Integer division already rounds down to the number of whole entities that fit.
    int64_t num_entities_to_add = mem_left / single_entity_mem_size;
    int64_t num_entities_added = 0;

    status = source->Add(segment_writer_ptr_, num_entities_to_add, num_entities_added);
    if (status.ok()) {
        current_mem_ += (num_entities_added * single_entity_mem_size);
    }
    return status;
}

// Removes every buffered row whose UID equals `doc_id` from the segment held
// by the segment writer.
// Returns DB_ERROR if the writer yields no segment; returns OK otherwise,
// including when the UID column cannot be read (nothing to match against).
Status
MemSegment::Delete(segment::doc_id_t doc_id) {
    engine::SegmentPtr segment_ptr;
    segment_writer_ptr_->GetSegment(segment_ptr);
    if (!segment_ptr) {
        std::string msg = "MemSegment::Delete failed: segment is not available";
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    // Check whether the doc_id is present; if yes, delete its corresponding buffer
    engine::FIXED_FIELD_DATA raw_data;
    auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
    if (!status.ok()) {
        return Status::OK();
    }

    const int64_t* uids = reinterpret_cast<const int64_t*>(raw_data.data());
    const int64_t row_count = segment_ptr->GetRowCount();

    // `uids` and `row_count` were captured before any deletion. The vector
    // overload of Delete() compensates for previously removed rows with
    // `i - deleted`; do the same here so a UID that occurs more than once is
    // removed at the right offsets.
    // NOTE(review): assumes DeleteEntity() shifts later rows down by one - confirm.
    int64_t deleted = 0;
    for (int64_t i = 0; i < row_count; ++i) {
        if (doc_id == uids[i]) {
            segment_ptr->DeleteEntity(i - deleted);
            ++deleted;
        }
    }

    return Status::OK();
}

// Removes every buffered row whose UID appears in `doc_ids` from the segment
// held by the segment writer.
// Returns OK immediately for an empty id list, DB_ERROR if the writer yields no
// segment, and OK otherwise, including when the UID column cannot be read.
Status
MemSegment::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
    if (doc_ids.empty()) {
        return Status::OK();
    }

    engine::SegmentPtr segment_ptr;
    segment_writer_ptr_->GetSegment(segment_ptr);
    if (!segment_ptr) {
        std::string msg = "MemSegment::Delete failed: segment is not available";
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    // Check whether each doc_id is present; if yes, delete its corresponding buffer.
    // A sorted private copy of the ids allows a binary search per row.
    std::vector<segment::doc_id_t> sorted_ids(doc_ids);
    std::sort(sorted_ids.begin(), sorted_ids.end());

    engine::FIXED_FIELD_DATA raw_data;
    auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
    if (!status.ok()) {
        return Status::OK();
    }

    const int64_t* uids = reinterpret_cast<const int64_t*>(raw_data.data());
    const int64_t row_count = segment_ptr->GetRowCount();

    // `uids` and `row_count` were captured before any deletion, so the offset
    // passed to DeleteEntity() is reduced by the number of rows already removed.
    int64_t deleted = 0;
    for (int64_t i = 0; i < row_count; ++i) {
        if (std::binary_search(sorted_ids.begin(), sorted_ids.end(), uids[i])) {
            segment_ptr->DeleteEntity(i - deleted);
            ++deleted;
        }
    }

    return Status::OK();
}

G
groot 已提交
254
int64_t
G
groot 已提交
255
MemSegment::GetCurrentMem() {
G
groot 已提交
256 257 258
    return current_mem_;
}

G
groot 已提交
259
int64_t
G
groot 已提交
260
MemSegment::GetMemLeft() {
G
groot 已提交
261 262 263 264
    return (MAX_TABLE_FILE_MEM - current_mem_);
}

bool
G
groot 已提交
265
MemSegment::IsFull() {
G
groot 已提交
266
    int64_t single_entity_mem_size = 0;
G
groot 已提交
267 268 269 270 271 272
    auto status = GetSingleEntitySize(single_entity_mem_size);
    if (!status.ok()) {
        return true;
    }

    return (GetMemLeft() < single_entity_mem_size);
G
groot 已提交
273 274 275
}

// Serializes the buffered data through the segment writer, then commits the
// row count to the pending snapshot operation and pushes that operation.
// `wal_lsn` is only used in the final debug log line.
// Returns the first non-OK Status of the three steps (after logging it);
// later steps are skipped once one fails. Returns OK when all succeed.
Status
MemSegment::Serialize(uint64_t wal_lsn) {
    int64_t size = GetCurrentMem();
    server::CollectSerializeMetrics metrics(size);

    auto status = segment_writer_ptr_->Serialize();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to serialize segment: " << segment_->GetID();
        return status;
    }

    // Do not let a failed row-count commit be masked by the result of Push().
    status = operation_->CommitRowCount(segment_writer_ptr_->RowCount());
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to commit row count of segment: " << segment_->GetID() << ", "
                          << status.ToString();
        return status;
    }

    status = operation_->Push();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to push segment: " << segment_->GetID() << ", " << status.ToString();
        return status;
    }

    // Only report success once every step has actually succeeded.
    LOG_ENGINE_DEBUG_ << "New segment " << segment_->GetID() << " serialized, lsn = " << wal_lsn;
    return status;
}

int64_t
G
groot 已提交
293
MemSegment::GetSegmentId() const {
G
groot 已提交
294 295 296 297 298
    return segment_->GetID();
}

}  // namespace engine
}  // namespace milvus