SegmentNaive.cpp 17.4 KB
Newer Older
F
FluorineDog 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

G
GuoRentong 已提交
12
#include <segcore/SegmentNaive.h>

#include <algorithm>
#include <cstring>
#include <limits>
#include <numeric>
#include <queue>
#include <random>
#include <thread>
#include <tuple>

#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <faiss/utils/distances.h>
#include <faiss/utils/BitsetView.h>

#include "segcore/Reduce.h"
F
FluorineDog 已提交
24

G
GuoRentong 已提交
25
namespace milvus::segcore {
26

27 28 29 30 31
/// Reserve `size` consecutive slots in the insert record.
///
/// @param size number of rows the caller is about to insert
/// @return the value of `record_.reserved` before this call (via fetch_add), i.e. the
///         first offset of the range [return, return + size) reserved by this call
int64_t
SegmentNaive::PreInsert(int64_t size) {
    return record_.reserved.fetch_add(size);
}
S
shengjh 已提交
32

33 34
int64_t
SegmentNaive::PreDelete(int64_t size) {
Z
zhenshan.cao 已提交
35 36 37 38
    auto reserved_begin = deleted_record_.reserved.fetch_add(size);
    return reserved_begin;
}

39 40 41
/// Build (or reuse) the bitmap of rows hidden by the first `del_barrier` delete-log entries.
///
/// The most recent LRU entry is returned unchanged when it already matches `del_barrier`
/// (and, if `force` is set, already has exactly `insert_barrier` bits). Otherwise it is
/// cloned to `insert_barrier` rows and patched incrementally:
///   - backward (del_barrier < cached barrier): clear the bits of the entries being un-applied;
///   - forward: set the bits of the newly applied entries, then cache the result in the LRU.
///
/// @param del_barrier     number of delete-log entries that should be applied
/// @param query_timestamp only inserted rows whose timestamp is strictly below this are candidates
/// @param insert_barrier  number of inserted rows visible to the caller; offsets at or past it
///                        are ignored in both directions
/// @param force           when true, the cached entry is reused only if its bitmap already holds
///                        exactly `insert_barrier` bits
/// @return the bitmap holder; only forward-patched results are inserted into the LRU cache
auto
SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force)
    -> std::shared_ptr<DeletedRecord::TmpBitmap> {
    auto old = deleted_record_.get_lru_entry();

    if (!force || old->bitmap_ptr->count() == insert_barrier) {
        if (old->del_barrier == del_barrier) {
            return old;
        }
    }

    auto current = old->clone(insert_barrier);
    current->del_barrier = del_barrier;
    auto bitmap = current->bitmap_ptr;

    // Resolve the insert offset targeted by delete-log entry `del_index`: among all offsets that
    // carry the entry's uid, take the largest one that is below insert_barrier and was inserted
    // before query_timestamp (the one closest to the query). Returns -1 when there is none.
    // The barrier check is shared by both directions; previously only the forward pass had it,
    // so the backward pass tripped an Assert on rows not yet visible to this query.
    auto find_target_offset = [&](int64_t del_index) -> int64_t {
        auto uid = deleted_record_.uids_[del_index];
        int64_t the_offset = -1;
        auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
        for (auto iter = iter_b; iter != iter_e; ++iter) {
            auto offset = iter->second;
            if (offset >= insert_barrier) {
                continue;
            }
            if (record_.timestamps_[offset] < query_timestamp) {
                the_offset = std::max(the_offset, offset);
            }
        }
        return the_offset;
    };

    if (del_barrier < old->del_barrier) {
        // moving backward: un-hide rows targeted by entries in [del_barrier, old->del_barrier)
        for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) {
            auto the_offset = find_target_offset(del_index);
            if (the_offset != -1) {
                bitmap->clear(the_offset);
            }
        }
        // NOTE: backward results are intentionally not inserted into the LRU cache
        return current;
    }

    // moving forward: hide rows targeted by entries in [old->del_barrier, del_barrier)
    for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) {
        auto the_offset = find_target_offset(del_index);
        if (the_offset != -1) {
            bitmap->set(the_offset);
        }
    }
    this->deleted_record_.insert_lru_entry(current);
    return current;
}
108

109
/// Store a batch of rows into the slots previously reserved by PreInsert.
///
/// Rows are ordered by (timestamp, uid, original position), then the row-based payload is split
/// into one contiguous column buffer per schema field before being written to `record_`.
/// `uid2offset_` is updated after the data is stored, and the range is acknowledged last.
///
/// @param reserved_begin first offset returned by PreInsert for this batch
/// @param size           number of rows; asserted to equal `entities_raw.count`
/// @param uids_raw       `size` row ids
/// @param timestamps_raw `size` timestamps
/// @param entities_raw   row-based payload whose row width must equal schema_->get_total_sizeof()
/// @return Status::OK() on success
/// @throws std::runtime_error if the row width does not match the schema
Status
SegmentNaive::Insert(int64_t reserved_begin,
                     int64_t size,
                     const int64_t* uids_raw,
                     const Timestamp* timestamps_raw,
                     const RowBasedRawData& entities_raw) {
    Assert(entities_raw.count == size);
    if (entities_raw.sizeof_per_row != schema_->get_total_sizeof()) {
        std::string msg = "entity length = " + std::to_string(entities_raw.sizeof_per_row) +
                          ", schema length = " + std::to_string(schema_->get_total_sizeof());
        throw std::runtime_error(msg);
    }

    auto raw_data = reinterpret_cast<const char*>(entities_raw.raw_data);
    auto len_per_row = entities_raw.sizeof_per_row;

    // order rows by timestamp; ties are broken by uid, then by original position
    std::vector<std::tuple<Timestamp, idx_t, int64_t>> ordering(size);
    for (int64_t i = 0; i < size; ++i) {
        ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i], i);
    }
    std::sort(ordering.begin(), ordering.end());

    // byte width of each field, and the byte offset at which each field starts inside a row
    auto sizeof_infos = schema_->get_sizeof_infos();
    std::vector<int> offset_infos(schema_->size() + 1, 0);
    std::partial_sum(sizeof_infos.begin(), sizeof_infos.end(), offset_infos.begin() + 1);

    // one contiguous column buffer per field
    std::vector<std::vector<char>> entities(schema_->size());
    for (int fid = 0; fid < schema_->size(); ++fid) {
        auto len = sizeof_infos[fid];
        entities[fid].resize(len * size);
    }

    std::vector<idx_t> uids(size);
    std::vector<Timestamp> timestamps(size);
    for (int64_t index = 0; index < size; ++index) {
        auto [t, uid, order_index] = ordering[index];
        timestamps[index] = t;
        uids[index] = uid;
        for (int fid = 0; fid < schema_->size(); ++fid) {
            // 64-bit arithmetic: `index * len` in int overflowed for batches above ~2 GiB per column
            int64_t len = sizeof_infos[fid];
            int64_t offset = offset_infos[fid];
            auto src = raw_data + offset + order_index * len_per_row;
            auto dst = entities[fid].data() + index * len;
            memcpy(dst, src, len);
        }
    }

    record_.timestamps_.set_data(reserved_begin, timestamps.data(), size);
    record_.uids_.set_data(reserved_begin, uids.data(), size);
    for (int fid = 0; fid < schema_->size(); ++fid) {
        record_.entity_vec_[fid]->set_data_raw(reserved_begin, entities[fid].data(), size);
    }

    // NOTE: must run only after all data above has been stored; do not move it earlier
    for (int64_t i = 0; i < size; ++i) {
        uid2offset_.insert(std::make_pair(uids[i], reserved_begin + i));
    }

    record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
    return Status::OK();
}

/// Store a batch of delete-log entries into the slots previously reserved by PreDelete.
///
/// Entries are ordered by (timestamp, uid) before being written to `deleted_record_`;
/// the range is acknowledged last.
///
/// @param reserved_begin first offset returned by PreDelete for this batch
/// @param size           number of entries
/// @param uids_raw       `size` ids to delete
/// @param timestamps_raw `size` delete timestamps
/// @return Status::OK()
Status
SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t* uids_raw, const Timestamp* timestamps_raw) {
    std::vector<std::tuple<Timestamp, idx_t>> ordering(size);
    for (int64_t i = 0; i < size; ++i) {
        ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i]);
    }
    std::sort(ordering.begin(), ordering.end());

    // split the sorted pairs back into parallel column arrays
    std::vector<idx_t> uids(size);
    std::vector<Timestamp> timestamps(size);
    for (int64_t index = 0; index < size; ++index) {
        std::tie(timestamps[index], uids[index]) = ordering[index];
    }

    deleted_record_.timestamps_.set_data(reserved_begin, timestamps.data(), size);
    deleted_record_.uids_.set_data(reserved_begin, uids.data(), size);
    deleted_record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
    return Status::OK();
}
233

234
/// Vector search through the knowhere index registered for the query field.
///
/// @param query_info field name, topK, num_queries and raw query vectors
/// @param timestamp  query timestamp used to compute the insert/delete barriers
/// @param result     receives num_queries * topK ids (translated to uids) and distances;
///                   slots the index left empty (id -1) stay -1
/// @return Status::OK()
Status
SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
    auto ins_barrier = get_barrier(record_, timestamp);
    auto del_barrier = get_barrier(deleted_record_, timestamp);
    auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier, true);
    Assert(bitmap_holder);
    Assert(bitmap_holder->bitmap_ptr->count() == ins_barrier);

    auto& field = schema_->operator[](query_info->field_name);
    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
    auto dim = field.get_dim();
    auto bitmap = bitmap_holder->bitmap_ptr;
    auto topK = query_info->topK;
    auto num_queries = query_info->num_queries;

    auto the_offset_opt = schema_->get_offset(query_info->field_name);
    Assert(the_offset_opt.has_value());
    Assert(the_offset_opt.value() < record_.entity_vec_.size());

    auto index_entry = index_meta_->lookup_by_field(query_info->field_name);
    auto conf = index_entry.config;
    conf[milvus::knowhere::meta::TOPK] = query_info->topK;

    // .at(): a missing index raises instead of default-inserting a null pointer and crashing in Query
    auto indexing = std::static_pointer_cast<knowhere::VecIndex>(indexings_.at(index_entry.index_name));
    auto ds = knowhere::GenDataset(query_info->num_queries, dim, query_info->query_raw_data.data());
    auto query_output = indexing->Query(ds, conf, bitmap);

    auto ids = query_output->Get<idx_t*>(knowhere::meta::IDS);
    auto distances = query_output->Get<float*>(knowhere::meta::DISTANCE);

    auto total_num = num_queries * topK;
    result.result_ids_.resize(total_num);
    result.result_distances_.resize(total_num);
    result.num_queries_ = num_queries;
    result.topK_ = topK;
    std::copy_n(ids, total_num, result.result_ids_.data());
    std::copy_n(distances, total_num, result.result_distances_.data());

    // translate internal offsets to user ids; faiss-backed indexes pad unfilled slots with -1,
    // which must not be used as an offset
    for (auto& id : result.result_ids_) {
        if (id != -1) {
            id = record_.uids_[id];
        }
    }
    return Status::OK();
}

B
bigsheeper 已提交
293
/// Former brute-force search entry point.
///
/// Deprecated: every call ends in PanicInfo("deprecated"); the parameters are unused.
Status
SegmentNaive::QueryBruteForceImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) {
    // intentionally unimplemented
    PanicInfo("deprecated");
}
Z
zhenshan.cao 已提交
297

B
bigsheeper 已提交
298
/// Exact (brute-force) L2 search over the raw float vectors, skipping rows set in the deleted bitmap.
///
/// @param query_info field name, topK, num_queries and raw query vectors
///                   (query `q` starts at query_raw_data.data() + q * dim)
/// @param timestamp  query timestamp used to compute the insert/delete barriers
/// @param result     per query, topK (uid, squared-L2 distance) pairs in ascending distance order;
///                   when fewer than topK rows are visible, the tail is padded with id -1 and
///                   distance std::numeric_limits<float>::max()
/// @return Status::OK()
Status
SegmentNaive::QuerySlowImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
    auto ins_barrier = get_barrier(record_, timestamp);
    auto del_barrier = get_barrier(deleted_record_, timestamp);
    auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
    Assert(bitmap_holder);

    auto& field = schema_->operator[](query_info->field_name);
    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
    auto dim = field.get_dim();
    auto bitmap = bitmap_holder->bitmap_ptr;
    auto topK = query_info->topK;
    auto num_queries = query_info->num_queries;
    // TODO: optimize
    auto the_offset_opt = schema_->get_offset(query_info->field_name);
    Assert(the_offset_opt.has_value());
    Assert(the_offset_opt.value() < record_.entity_vec_.size());
    auto vec_ptr =
        std::static_pointer_cast<ConcurrentVector<FloatVector>>(record_.entity_vec_.at(the_offset_opt.value()));

    // per-query max-heap of (distance, offset): top() is the worst of the best-so-far candidates.
    // Offsets are int64_t to match ins_barrier (they were previously narrowed to int).
    std::vector<std::priority_queue<std::pair<float, int64_t>>> records(num_queries);

    auto get_L2_distance = [dim](const float* a, const float* b) {
        float L2_distance = 0;
        for (auto i = 0; i < dim; ++i) {
            auto d = a[i] - b[i];
            L2_distance += d * d;
        }
        return L2_distance;
    };

    for (int64_t i = 0; i < ins_barrier; ++i) {
        // rows whose bit is set in the deleted bitmap are excluded
        if (i < bitmap->count() && bitmap->test(i)) {
            continue;
        }
        auto element = vec_ptr->get_element(i);
        for (auto query_id = 0; query_id < num_queries; ++query_id) {
            auto query_blob = query_info->query_raw_data.data() + query_id * dim;
            auto dis = get_L2_distance(query_blob, element);
            auto& record = records[query_id];
            if (record.size() < topK) {
                record.emplace(dis, i);
            } else if (!record.empty() && record.top().first > dis) {
                // empty() guard: with topK == 0 the heap stays empty and top() would be UB
                record.emplace(dis, i);
                record.pop();
            }
        }
    }

    result.num_queries_ = num_queries;
    result.topK_ = topK;
    auto row_num = topK * num_queries;
    result.result_ids_.resize(row_num);
    result.result_distances_.resize(row_num);

    for (int64_t q_id = 0; q_id < num_queries; ++q_id) {
        auto& heap = records[q_id];
        const int64_t base = q_id * topK;
        const auto found = static_cast<int64_t>(heap.size());
        // the max-heap pops worst-first, so the found hits are written back to front
        for (int64_t k = found - 1; k >= 0; --k) {
            auto [dis, offset] = heap.top();
            heap.pop();
            result.result_ids_[base + k] = record_.uids_[offset];
            result.result_distances_[base + k] = dis;
        }
        // fewer than topK visible rows: pad the tail instead of popping an empty heap
        for (int64_t k = found; k < topK; ++k) {
            result.result_ids_[base + k] = -1;
            result.result_distances_[base + k] = std::numeric_limits<float>::max();
        }
    }

    return Status::OK();
}

/// Deprecated query entry point.
///
/// When `query_info` is null, a mock query is synthesized: field "fakevec", topK 10, and one
/// query vector drawn from std::uniform_real_distribution(0.0, 1.0) seeded with 42.
/// Dispatches to QueryImpl when `index_ready_` is set, otherwise to QueryBruteForceImpl.
Status
SegmentNaive::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
    // TODO: enable delete
    // TODO: enable index
    // TODO: remove mock
    if (!query_info) {
        auto mock = std::make_shared<query::QueryDeprecated>();
        mock->field_name = "fakevec";
        mock->topK = 10;
        mock->num_queries = 1;

        auto dim = schema_->operator[]("fakevec").get_dim();
        std::default_random_engine engine(42);
        std::uniform_real_distribution<> uniform(0.0, 1.0);
        mock->query_raw_data.resize(mock->num_queries * dim);
        std::generate(mock->query_raw_data.begin(), mock->query_raw_data.end(), [&] { return uniform(engine); });
        query_info = mock;
    }

    return index_ready_ ? QueryImpl(query_info, timestamp, result)
                        : QueryBruteForceImpl(query_info, timestamp, result);
}

394 395
/// Transition the segment to SegmentState::Closed.
///
/// Panics ("insert not ready" / "delete not ready") when a record's `reserved` counter differs
/// from its ack responder's GetAck(), i.e. some reserved slots were not acknowledged yet.
Status
SegmentNaive::Close() {
    const bool inserts_pending = record_.reserved != record_.ack_responder_.GetAck();
    if (inserts_pending) {
        PanicInfo("insert not ready");
    }

    const bool deletes_pending = deleted_record_.reserved != deleted_record_.ack_responder_.GetAck();
    if (deletes_pending) {
        PanicInfo("delete not ready");
    }

    state_ = SegmentState::Closed;
    return Status::OK();
}

406 407 408
/// Former per-entry vector index builder.
///
/// Deprecated: every instantiation ends in PanicInfo("deprecated"); `entry` is unused.
template <typename Type>
knowhere::IndexPtr
SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry& entry) {
    // intentionally unimplemented
    PanicInfo("deprecated");
}
411

412
/// Build every index listed in `remote_index_meta` and mark the segment as index-ready.
///
/// If `remote_index_meta` is null, a default INDEX_FAISS_IVFPQ entry named "fakeindex"
/// (L2 metric) is created for the last VECTOR_FLOAT field in schema order.
///
/// @return Status(SERVER_BUILD_INDEX_ERROR, "too few elements") when fewer than 4096 rows are
///         acknowledged; Status::OK() once every entry has been processed
/// @throws std::runtime_error("unimplemented") for an entry on a non-vector field
Status
SegmentNaive::BuildIndex(IndexMetaPtr remote_index_meta) {
    if (!remote_index_meta) {
        std::cout << "WARN: Null index ptr is detected, use default index" << std::endl;

        // the last float-vector field in schema order wins
        int dim = 0;
        std::string index_field_name;
        for (auto& field : schema_->get_fields()) {
            if (field.get_data_type() != DataType::VECTOR_FLOAT) {
                continue;
            }
            dim = field.get_dim();
            index_field_name = field.get_name();
        }
        Assert(dim != 0);
        Assert(!index_field_name.empty());

        auto default_meta = std::make_shared<IndexMeta>(schema_);
        // TODO: this is merge of query conf and insert conf
        // TODO: should be splitted into multiple configs
        auto conf = milvus::knowhere::Config{
            {milvus::knowhere::meta::DIM, dim},
            {milvus::knowhere::IndexParams::nlist, 100},
            {milvus::knowhere::IndexParams::nprobe, 4},
            {milvus::knowhere::IndexParams::m, 4},
            {milvus::knowhere::IndexParams::nbits, 8},
            {milvus::knowhere::Metric::TYPE, milvus::knowhere::Metric::L2},
            {milvus::knowhere::meta::DEVICEID, 0},
        };
        default_meta->AddEntry("fakeindex", index_field_name, knowhere::IndexEnum::INDEX_FAISS_IVFPQ,
                               knowhere::IndexMode::MODE_CPU, conf);
        remote_index_meta = default_meta;
    }

    const auto min_rows_for_index = 1024 * 4;
    if (record_.ack_responder_.GetAck() < min_rows_for_index) {
        return Status(SERVER_BUILD_INDEX_ERROR, "too few elements");
    }

    index_meta_ = remote_index_meta;
    for (auto& [index_name, entry] : index_meta_->get_entries()) {
        Assert(entry.index_name == index_name);
        const auto& field = (*schema_)[entry.field_name];
        if (!field.is_vector()) {
            throw std::runtime_error("unimplemented");
        }
        Assert(field.get_data_type() == engine::DataType::VECTOR_FLOAT);
        auto built = BuildVecIndexImpl<float>(entry);
        indexings_[index_name] = built;
    }

    index_ready_ = true;
    return Status::OK();
}
465

466 467
/// Former memory-usage report.
///
/// Deprecated: every call ends in PanicInfo("Deprecated").
int64_t
SegmentNaive::GetMemoryUsageInBytes() {
    // intentionally unimplemented
    PanicInfo("Deprecated");
}

G
GuoRentong 已提交
471
}  // namespace milvus::segcore