MemSegment.cpp 8.8 KB
Newer Older
G
groot 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

G
groot 已提交
12
#include "db/insert/MemSegment.h"
G
groot 已提交
13 14 15 16 17 18 19

#include <algorithm>
#include <cmath>
#include <iterator>
#include <string>
#include <unordered_map>
#include <vector>

W
Wang XiangYu 已提交
20
#include "config/ServerConfig.h"
G
groot 已提交
21
#include "db/Types.h"
G
groot 已提交
22 23 24
#include "db/Utils.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/Snapshots.h"
C
Cai Yudong 已提交
25
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
G
groot 已提交
26 27 28 29 30 31
#include "metrics/Metrics.h"
#include "utils/Log.h"

namespace milvus {
namespace engine {

G
groot 已提交
32
// Builds an in-memory segment bound to the given collection and partition.
// The memory counter starts at zero. CreateSegment() is not invoked here
// (the call below is commented out), so no snapshot operation or segment
// writer exists until it is called separately.
MemSegment::MemSegment(int64_t collection_id, int64_t partition_id, const DBOptions& options)
    : collection_id_(collection_id),
      partition_id_(partition_id),
      options_(options) {
    // No entity has been buffered yet.
    current_mem_ = 0;
    //    CreateSegment();
}

// Registers a new segment for this collection/partition through a snapshot
// NewSegmentOperation, commits one placeholder raw-data file per field plus
// the deleted-docs and bloom-filter files on the uid field, and finally
// creates the segment writer on top of the resulting segment visitor.
//
// Returns the first non-OK status reported by the snapshot layer (after
// logging it), or Status::OK() once the writer has been created.
Status
MemSegment::CreateSegment() {
    snapshot::ScopedSnapshotT latest_ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_id_);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "MemSegment::CreateSegment failed: " << status.ToString();
        return status;
    }

    // create segment
    snapshot::OperationContext op_context;
    op_context.prev_partition = latest_ss->GetResource<snapshot::Partition>(partition_id_);
    operation_ = std::make_shared<snapshot::NewSegmentOperation>(op_context, latest_ss);
    status = operation_->CommitNewSegment(segment_);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "MemSegment::CreateSegment failed: " << status.ToString();
        return status;
    }

    // Every segment file committed below shares the same collection,
    // partition and segment ids; only the field/element names differ.
    const auto new_file_context = [this]() {
        snapshot::SegmentFileContext file_ctx;
        file_ctx.collection_id = collection_id_;
        file_ctx.partition_id = partition_id_;
        file_ctx.segment_id = segment_->GetID();
        return file_ctx;
    };

    // create segment raw files (placeholder)
    auto field_names = latest_ss->GetFieldNames();
    for (auto& field_name : field_names) {
        auto raw_ctx = new_file_context();
        raw_ctx.field_name = field_name;
        raw_ctx.field_element_name = engine::ELEMENT_RAW_DATA;

        snapshot::SegmentFilePtr raw_file;
        status = operation_->CommitNewSegmentFile(raw_ctx, raw_file);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "MemSegment::CreateSegment failed: " << status.ToString();
            return status;
        }
    }

    // create deleted_doc and bloom_filter files (placeholder)
    {
        snapshot::SegmentFilePtr delete_doc_file, bloom_filter_file;

        auto uid_ctx = new_file_context();
        uid_ctx.field_name = engine::FIELD_UID;

        uid_ctx.field_element_name = engine::ELEMENT_DELETED_DOCS;
        status = operation_->CommitNewSegmentFile(uid_ctx, delete_doc_file);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "MemSegment::CreateSegment failed: " << status.ToString();
            return status;
        }

        // Same context object, retargeted at the bloom-filter element.
        uid_ctx.field_element_name = engine::ELEMENT_BLOOM_FILTER;
        status = operation_->CommitNewSegmentFile(uid_ctx, bloom_filter_file);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "MemSegment::CreateSegment failed: " << status.ToString();
            return status;
        }
    }

    // create segment writer
    auto committed = operation_->GetContext();
    auto segment_visitor = SegmentVisitor::Build(latest_ss, committed.new_segment, committed.new_segment_files);
    segment_writer_ptr_ = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, segment_visitor);

    return Status::OK();
}

G
groot 已提交
113
// Computes the number of bytes one entity occupies by summing the size of
// every field in the collection's current snapshot.
//
// single_size: output. It is reset to 0 once the snapshot has been obtained
//              and then accumulated field by field, so it holds a partial
//              sum if a vector field turns out to be malformed.
//
// Returns the snapshot error if the snapshot cannot be fetched, DB_ERROR if
// a vector field has no dimension parameter, Status::OK() otherwise.
Status
MemSegment::GetSingleEntitySize(int64_t& single_size) {
    snapshot::ScopedSnapshotT latest_ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_id_);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "MemSegment::SingleEntitySize failed: " << status.ToString();
        return status;
    }

    single_size = 0;
    std::vector<std::string> names = latest_ss->GetFieldNames();
    for (auto& field_name : names) {
        snapshot::FieldPtr field = latest_ss->GetField(field_name);
        const auto ftype = static_cast<DataType>(field->GetFtype());

        if (ftype == DataType::VECTOR_FLOAT || ftype == DataType::VECTOR_BINARY) {
            json params = field->GetParams();
            if (params.find(knowhere::meta::DIM) == params.end()) {
                std::string msg = "Vector field params must contain: dimension";
                LOG_SERVER_ERROR_ << msg;
                return Status(DB_ERROR, msg);
            }

            int64_t dimension = params[knowhere::meta::DIM];
            if (ftype == DataType::VECTOR_BINARY) {
                // Binary vectors are sized at one bit per dimension.
                single_size += (dimension / 8);
            } else {
                single_size += (dimension * sizeof(float));
            }
            continue;
        }

        // Scalar fields: fixed width per type; any other type adds nothing.
        switch (ftype) {
            case DataType::BOOL:
                single_size += sizeof(bool);
                break;
            case DataType::INT8:
                single_size += sizeof(int8_t);
                break;
            case DataType::INT16:
                single_size += sizeof(int16_t);
                break;
            case DataType::INT32:
                single_size += sizeof(int32_t);
                break;
            case DataType::INT64:
                single_size += sizeof(int64_t);
                break;
            case DataType::FLOAT:
                single_size += sizeof(float);
                break;
            case DataType::DOUBLE:
                single_size += sizeof(double);
                break;
            default:
                break;
        }
    }

    return Status::OK();
}

// Moves as many entities from `source` into this segment as the remaining
// memory budget allows.
//
// source: provider of the pending entities; its Add() is asked to append at
//         most the computed number of entities to the segment writer and
//         reports how many it actually appended.
//
// Returns the error from GetSingleEntitySize() or from source->Add() if
// either fails. Returns Status::OK() without touching the source when the
// entity size is not positive or not even one entity fits.
Status
MemSegment::Add(const VectorSourcePtr& source) {
    int64_t single_entity_mem_size = 0;
    auto status = GetSingleEntitySize(single_entity_mem_size);
    if (!status.ok()) {
        return status;
    }

    // Keep the remaining budget signed: GetMemLeft() returns int64_t, and
    // storing a negative value in an unsigned type would wrap it into a huge
    // "free" amount and let the capacity check below pass.
    const int64_t mem_left = GetMemLeft();
    if (single_entity_mem_size <= 0 || mem_left < single_entity_mem_size) {
        return Status::OK();
    }

    // Whole entities that still fit (integer division rounds down).
    int64_t num_entities_to_add = mem_left / single_entity_mem_size;
    int64_t num_entities_added = 0;

    status = source->Add(segment_writer_ptr_, num_entities_to_add, num_entities_added);
    if (status.ok()) {
        current_mem_ += (num_entities_added * single_entity_mem_size);
    }
    return status;
}

// Removes from the in-memory segment every entity whose uid appears in `ids`.
//
// ids: uids to delete. Ids that are not present in this segment are skipped.
//
// Each requested id is translated to the offset of its first occurrence in
// the segment's uid list, and the offsets (in the order of `ids`) are handed
// to the segment's DeleteEntity().
//
// Always returns Status::OK().
// NOTE(review): the results of GetSegment(), LoadUids() and DeleteEntity()
// are not inspected here - confirm whether they can report failures that
// should be propagated.
Status
MemSegment::Delete(const std::vector<idx_t>& ids) {
    engine::SegmentPtr segment_ptr;
    segment_writer_ptr_->GetSegment(segment_ptr);

    // Check whether each doc_id is present; if so, delete its corresponding buffer
    std::vector<idx_t> uids;
    segment_writer_ptr_->LoadUids(uids);

    // Index the uids once so each lookup is O(1) instead of a linear scan per
    // id. emplace() never overwrites, so a repeated uid keeps the offset of
    // its first occurrence, matching a front-to-back search.
    std::unordered_map<idx_t, offset_t> first_offset_of;
    first_offset_of.reserve(uids.size());
    for (size_t pos = 0; pos < uids.size(); ++pos) {
        first_offset_of.emplace(uids[pos], static_cast<offset_t>(pos));
    }

    std::vector<offset_t> offsets;
    offsets.reserve(ids.size());
    for (auto id : ids) {
        auto found = first_offset_of.find(id);
        if (found != first_offset_of.end()) {
            offsets.push_back(found->second);
        }
    }
    segment_ptr->DeleteEntity(offsets);

    return Status::OK();
}

G
groot 已提交
223
int64_t
G
groot 已提交
224
MemSegment::GetCurrentMem() {
G
groot 已提交
225 226 227
    return current_mem_;
}

G
groot 已提交
228
int64_t
G
groot 已提交
229
MemSegment::GetMemLeft() {
G
groot 已提交
230 231 232 233
    return (MAX_TABLE_FILE_MEM - current_mem_);
}

bool
G
groot 已提交
234
MemSegment::IsFull() {
G
groot 已提交
235
    int64_t single_entity_mem_size = 0;
G
groot 已提交
236 237 238 239 240 241
    auto status = GetSingleEntitySize(single_entity_mem_size);
    if (!status.ok()) {
        return true;
    }

    return (GetMemLeft() < single_entity_mem_size);
G
groot 已提交
242 243 244
}

// Writes the buffered segment out through the segment writer and then
// commits the row count and pushes the pending snapshot operation.
//
// Returns Status::OK() without serializing when the writer holds no rows,
// the writer's error if serialization fails, or the first failing status of
// CommitRowCount()/Push().
Status
MemSegment::Serialize() {
    // The metrics object is constructed before the empty-segment check, so
    // it exists for every call, including those that return early.
    int64_t mem_bytes = GetCurrentMem();
    server::CollectSerializeMetrics metrics(mem_bytes);

    // delete action could delete all entities of the segment
    // no need to serialize empty segment
    const bool nothing_to_write = (segment_writer_ptr_->RowCount() == 0);
    if (nothing_to_write) {
        return Status::OK();
    }

    auto write_status = segment_writer_ptr_->Serialize();
    if (!write_status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to serialize segment: " << segment_->GetID();
        return write_status;
    }

    // Record the final row count, then push the snapshot operation.
    STATUS_CHECK(operation_->CommitRowCount(segment_writer_ptr_->RowCount()));
    STATUS_CHECK(operation_->Push());

    LOG_ENGINE_DEBUG_ << "New segment " << segment_->GetID() << " serialized";
    return Status::OK();
}

int64_t
G
groot 已提交
268
MemSegment::GetSegmentId() const {
G
groot 已提交
269 270 271 272 273
    return segment_->GetID();
}

}  // namespace engine
}  // namespace milvus