DBImpl.cpp 13.0 KB
Newer Older
X
Xu Peng 已提交
1
#include <assert.h>
X
Xu Peng 已提交
2
#include <chrono>
X
Xu Peng 已提交
3
#include <thread>
4
#include <iostream>
5 6 7
#include <faiss/IndexFlat.h>
#include <faiss/MetaIndexes.h>
#include <faiss/index_io.h>
X
Xu Peng 已提交
8
#include <faiss/AutoTune.h>
X
xj.lin 已提交
9 10
#include <cstring>
#include <wrapper/Topk.h>
11
#include <easylogging++.h>
X
Xu Peng 已提交
12 13
#include <wrapper/IndexBuilder.h>
#include <cache/CpuCacheMgr.h>
X
Xu Peng 已提交
14 15 16
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
X
Xu Peng 已提交
17

X
Xu Peng 已提交
18 19 20
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23 24
/// Constructs the engine from `options`: creates the meta store
/// (meta::DBMetaImpl) and the in-memory vector manager (MemManager), then
/// starts the periodic background timer via start_timer_task().
///
/// NOTE(review): members are initialized in their header declaration order,
/// not the order listed here. `_pMeta` reads `_options` and `_pMemMgr` reads
/// `_pMeta`, so they must be declared in that order — confirm against DBImpl.h.
DBImpl::DBImpl(const Options& options)
    : _env(options.env),
      _options(options),
      _bg_compaction_scheduled(false),   // no compaction queued yet
      _shutting_down(false),             // set by ~DBImpl()
      bg_build_index_started_(false),    // no index-building thread running
      _pMeta(new meta::DBMetaImpl(_options.meta)),
      _pMemMgr(new MemManager(_pMeta, _options)) {
    // Spawns a detached thread that periodically triggers compaction.
    start_timer_task(_options.memory_sync_interval);
}

33 34
/// Registers a group; pure delegation to the meta store's add_group().
Status DBImpl::add_group(meta::GroupSchema& group_info) {
    Status add_status = _pMeta->add_group(group_info);
    return add_status;
}

37 38
/// Looks up a group through the meta store; `group_info` is passed by
/// non-const reference so the meta layer can fill it in.
Status DBImpl::get_group(meta::GroupSchema& group_info) {
    auto& meta_store = _pMeta;
    return meta_store->get_group(group_info);
}

/// Asks the meta store whether `group_id_` exists; the answer is written to
/// `has_or_not_`.
Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) {
    Status lookup_status = _pMeta->has_group(group_id_, has_or_not_);
    return lookup_status;
}

X
Xu Peng 已提交
45 46
/// Fetches the files of `group_id` from the meta store into
/// `group_files_info`; `date_delta` is forwarded unchanged to the meta layer.
Status DBImpl::get_group_files(const std::string& group_id,
                               const int date_delta,
                               meta::GroupFilesSchema& group_files_info) {
    Status files_status =
        _pMeta->get_group_files(group_id, date_delta, group_files_info);
    return files_status;
}

52 53
/// Hands `n` vectors (the `vectors` buffer) for `group_id_` to the in-memory
/// manager; ids chosen by the manager are written to `vector_ids_`.
/// @return the status reported by MemManager::add_vectors.
Status DBImpl::add_vectors(const std::string& group_id_,
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
    // The previous version only returned on failure and fell off the end of
    // this non-void function on success (undefined behavior). Always return
    // the manager's status.
    return _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
}

X
xj.lin 已提交
60
// TODO(XUPENG): add search range based on time
X
xj.lin 已提交
61 62 63 64 65 66 67 68 69 70 71
/// Searches every searchable file of `group_id` for the `k` nearest
/// neighbours of each of the `nq` query vectors in `vectors`, appending one
/// QueryResult per query to `results`.
///
/// Raw files and INDEX files are both searched. Each file's per-query top-k
/// (ids, distances) lists are concatenated per query and then reduced to a
/// final top-k with TopK(), mapping TopK's positions back to vector ids.
///
/// @return the meta-store error if files_to_search fails; Status::OK() if the
///         group has no files or results were produced; Status::NotFound if
///         `results` is still empty afterwards (e.g. nq == 0).
Status DBImpl::search(const std::string &group_id, size_t k, size_t nq,
                      const float *vectors, QueryResults &results) {
    meta::DatePartionedGroupFilesSchema files;
    std::vector<meta::DateT> partition;
    auto status = _pMeta->files_to_search(group_id, partition, files);
    if (!status.ok()) { return status; }

    meta::GroupFilesSchema index_files;
    meta::GroupFilesSchema raw_files;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            if (file.file_type == meta::GroupFileSchema::INDEX) {
                index_files.push_back(file);
            } else {
                raw_files.push_back(file);
            }
        }
    }

    // Nothing to search in this group.
    if (index_files.empty() && raw_files.empty()) {
        return Status::OK();
    }

    {
        // Per-query accumulator: [{ids, distances}, ...], one cell per query.
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq);

        // Scratch buffers for one index search (nq * k results). std::vector
        // replaces the unchecked malloc/free pair so the memory is released
        // on every exit path; value-initialization zero-fills it.
        const size_t buffer_len = k * nq;
        std::vector<float> output_distence(buffer_len, 0.0f);
        std::vector<long> output_ids(buffer_len, 0);

        // Zeroes the first `count` entries of both scratch buffers.
        auto reset_buffers = [&](size_t count) -> void {
            for (size_t j = 0; j < count; ++j) {
                output_distence[j] = 0.0f;
                output_ids[j] = 0;
            }
        };

        // Appends each query's k (id, distance) pairs to its accumulator.
        // Appending (rather than inserting at the front) avoids shifting the
        // already-collected results for every additional file.
        auto cluster = [&](const long *nns, const float *dis) -> void {
            for (size_t i = 0; i < nq; ++i) {
                auto &ids = batchresult[i].first;
                auto &dists = batchresult[i].second;
                ids.insert(ids.end(), nns + i * k, nns + i * k + k);
                dists.insert(dists.end(), dis + i * k, dis + i * k + k);
            }
        };

        // Searches each file (cache first, then disk) and accumulates results.
        auto search_in_index = [&](meta::GroupFilesSchema& file_vec) -> void {
            for (auto &file : file_vec) {
                auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(file.location);
                if (!index) {
                    LOG(DEBUG) << "Disk io from: " << file.location;
                    index = read_index(file.location.c_str());
                    zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->InsertItem(file.location, index);
                }
                LOG(DEBUG) << "Search Index Of Size: " << index->dim * index->ntotal * 4 /(1024*1024) << " M";
                index->search(nq, vectors, k, output_distence.data(), output_ids.data());
                cluster(output_ids.data(), output_distence.data()); // cluster to each query
                reset_buffers(buffer_len);
            }
        };

        // Reduces each query's accumulated candidates to the final top-k.
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
                TopK(dis.data(), dis.size(), k, output_distence.data(), output_ids.data());
                for (size_t i = 0; i < k; ++i) {
                    res.emplace_back(nns[output_ids[i]]); // mapping
                }
                results.push_back(res); // append to result list
                res.clear();
                // Only entries [0, k) are read per query, so resetting those
                // is sufficient; clearing all k*nq entries for every query
                // made this loop O(nq^2 * k).
                reset_buffers(k);
            }
        };

        search_in_index(raw_files);
        search_in_index(index_files);
        cluster_topk();
    }

    if (results.empty()) {
        return Status::NotFound("Group " + group_id + ", search result not found!");
    }

    return Status::OK();
}

X
Xu Peng 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
void DBImpl::start_timer_task(int interval_) {
    std::thread bg_task(&DBImpl::background_timer_task, this, interval_);
    bg_task.detach();
}

/// Timer loop run on the thread started by start_timer_task(). Every
/// `interval_` seconds it calls try_schedule_compaction(). It stops once a
/// background error is recorded or shutdown has been requested.
///
/// NOTE(review): this thread is detached and ~DBImpl() does not wait for it.
/// If the object is destroyed while the thread sleeps, the loop reads members
/// of a destroyed object after waking. Closing that window needs a joinable
/// member thread plus an interruptible wait, which requires header changes.
void DBImpl::background_timer_task(int interval_) {
    while (_bg_error.ok() && !_shutting_down.load(std::memory_order_acquire)) {
        std::this_thread::sleep_for(std::chrono::seconds(interval_));

        // Shutdown may have been requested while sleeping; do not schedule new
        // background work once the destructor has started.
        if (_shutting_down.load(std::memory_order_acquire)) break;

        try_schedule_compaction();
    }
}

X
Xu Peng 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
/// Queues one background compaction (BGWork) on the Env, unless one is
/// already pending or a background error has been recorded.
void DBImpl::try_schedule_compaction() {
    const bool can_schedule = !_bg_compaction_scheduled && _bg_error.ok();
    if (!can_schedule) return;

    _bg_compaction_scheduled = true;
    _env->schedule(&DBImpl::BGWork, this);
}

/// Env callback trampoline. `db_` is the DBImpl* that try_schedule_compaction()
/// passed to Env::schedule; the call is forwarded to background_call().
void DBImpl::BGWork(void* db_) {
    // Converting void* back to the original object pointer is a static_cast;
    // reinterpret_cast is stronger than needed here.
    static_cast<DBImpl*>(db_)->background_call();
}

/// Runs one scheduled compaction pass while holding `_mutex`.
///
/// Always clears `_bg_compaction_scheduled` and notifies
/// `_bg_work_finish_signal` before returning. ~DBImpl() waits for that flag to
/// drop, so it cannot block forever.
void DBImpl::background_call() {
    std::lock_guard<std::mutex> lock(_mutex);
    assert(_bg_compaction_scheduled);

    // Skip the work if an error was recorded after this pass was scheduled
    // (e.g. by the index-building thread), but still fall through to the
    // bookkeeping below. The old early `return` left the flag set, which
    // deadlocked the destructor.
    if (_bg_error.ok()) {
        background_compaction();
    }

    _bg_compaction_scheduled = false;
    _bg_work_finish_signal.notify_all();
}

195 196 197 198

/// Merges raw `files` of `group_id` for `date` into one newly registered
/// "IDMap,Flat" file.
///
/// Source files are consumed in order until the merged size
/// (dimension * ntotal) reaches `_options.index_trigger_size`. Each consumed
/// file is marked TO_DELETE. The merged index is written to the new file's
/// location, marked TO_INDEX if it reached the trigger size (RAW otherwise),
/// published via update_files, and inserted into the CPU cache.
///
/// @return the add_group_file error if registering the new file fails,
///         otherwise the status of the update_files call.
Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
        const meta::GroupFilesSchema& files) {
    meta::GroupFileSchema merged_file;
    merged_file.group_id = group_id;
    merged_file.date = date;

    Status status = _pMeta->add_group_file(merged_file);
    if (!status.ok()) {
        LOG(INFO) << status.ToString() << std::endl;
        return status;
    }

    // merged_file.dimension is not set in this function; presumably
    // add_group_file populates it (and location/file_id) — confirm in meta.
    std::shared_ptr<faiss::Index> merged_index(
            faiss::index_factory(merged_file.dimension, "IDMap,Flat"));

    auto cache = zilliz::vecwise::cache::CpuCacheMgr::GetInstance();
    meta::GroupFilesSchema changed_files;
    long merged_size = 0;

    for (const auto& source : files) {
        auto source_index = cache->GetIndex(source.location);
        if (!source_index) {
            source_index = read_index(source.location.c_str());
        }
        auto id_map = dynamic_cast<faiss::IndexIDMap*>(source_index->data().get());
        auto flat = dynamic_cast<faiss::IndexFlat*>(id_map->index);
        merged_index->add_with_ids(id_map->ntotal, flat->xb.data(),
                id_map->id_map.data());

        auto retired = source;
        retired.file_type = meta::GroupFileSchema::TO_DELETE;
        changed_files.push_back(retired);
        LOG(DEBUG) << "About to merge file " << retired.file_id <<
            " of size=" << retired.rows;

        merged_size = merged_file.dimension * merged_index->ntotal;
        if (merged_size >= _options.index_trigger_size) break;
    }

    faiss::write_index(merged_index.get(), merged_file.location.c_str());

    if (merged_size >= _options.index_trigger_size) {
        merged_file.file_type = meta::GroupFileSchema::TO_INDEX;
    } else {
        merged_file.file_type = meta::GroupFileSchema::RAW;
    }
    merged_file.rows = merged_size;

    changed_files.push_back(merged_file);
    status = _pMeta->update_files(changed_files);
    LOG(DEBUG) << "New merged file " << merged_file.file_id <<
        " of size=" << merged_file.rows;

    cache->InsertItem(merged_file.location, std::make_shared<Index>(merged_index));

    return status;
}

/// For each date of `group_id` that has more than
/// `_options.merge_trigger_number` mergeable files, calls merge_files().
/// Then it starts background index building and asks the meta store to clean
/// up TTL-expired files.
///
/// @return the error from files_to_merge, otherwise Status::OK(). Individual
///         merge failures are logged and do not abort the pass.
Status DBImpl::background_merge_files(const std::string& group_id) {
    meta::DatePartionedGroupFilesSchema raw_files;
    auto status = _pMeta->files_to_merge(group_id, raw_files);
    if (!status.ok()) {
        return status;
    }

    for (auto& kv : raw_files) {
        // Bind by reference; the previous `auto files = kv.second` copied
        // every date's file list.
        const auto& files = kv.second;
        if (files.size() <= _options.merge_trigger_number) {
            continue;
        }
        auto merge_status = merge_files(group_id, kv.first, files);
        if (!merge_status.ok()) {
            // Still best effort (other dates are merged), but no longer silent.
            LOG(ERROR) << "merge_files failed for group " << group_id
                       << ": " << merge_status.ToString();
        }
    }

    try_build_index();

    _pMeta->cleanup_ttl_files(1);

    return Status::OK();
}

/// Builds a new index from `file`'s vectors and publishes it. The result is
/// registered as a new INDEX file, and `file` is marked TO_DELETE in the same
/// update_files call. The new index is also inserted into the CPU cache.
///
/// @return the add_group_file error if the new file cannot be registered,
///         otherwise the status of the update_files call that publishes the
///         swap (consistent with merge_files).
Status DBImpl::build_index(const meta::GroupFileSchema& file) {
    meta::GroupFileSchema group_file;
    group_file.group_id = file.group_id;
    group_file.date = file.date;
    Status status = _pMeta->add_group_file(group_file);
    if (!status.ok()) {
        return status;
    }

    auto opd = std::make_shared<Operand>();
    opd->d = file.dimension;
    opd->index_type = "IDMap,Flat";
    IndexBuilderPtr pBuilder = GetIndexBuilder(opd);

    // Prefer the cached copy of the source file; fall back to disk.
    auto to_index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(file.location);
    if (!to_index) {
        to_index = read_index(file.location.c_str());
    }
    auto from_index = dynamic_cast<faiss::IndexIDMap*>(to_index->data().get());

    LOG(DEBUG) << "Preparing build_index for file_id=" << file.file_id
        << " with new index_file_id=" << group_file.file_id << std::endl;

    auto index = pBuilder->build_all(from_index->ntotal,
            dynamic_cast<faiss::IndexFlat*>(from_index->index)->xb.data(),
            from_index->id_map.data());

    LOG(DEBUG) << "Ending build_index for file_id=" << file.file_id
        << " with new index_file_id=" << group_file.file_id << std::endl;

    write_index(index, group_file.location.c_str());
    group_file.file_type = meta::GroupFileSchema::INDEX;
    group_file.rows = file.dimension * index->ntotal;

    auto to_remove = file;
    to_remove.file_type = meta::GroupFileSchema::TO_DELETE;

    meta::GroupFilesSchema update_files = {to_remove, group_file};
    // Previously this status was discarded and OK was always returned, hiding
    // meta-store failures from background_build_index.
    status = _pMeta->update_files(update_files);

    zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->InsertItem(group_file.location, index);

    return status;
}

X
Xu Peng 已提交
323
void DBImpl::background_build_index() {
X
Xu Peng 已提交
324
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
325 326
    assert(bg_build_index_started_);
    meta::GroupFilesSchema to_index_files;
X
Xu Peng 已提交
327
    _pMeta->files_to_index(to_index_files);
X
Xu Peng 已提交
328 329
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
330
        LOG(DEBUG) << "Buiding index for " << file.location;
X
Xu Peng 已提交
331 332 333
        status = build_index(file);
        if (!status.ok()) {
            _bg_error = status;
X
Xu Peng 已提交
334
            return;
X
Xu Peng 已提交
335 336
        }
    }
X
Xu Peng 已提交
337
    LOG(DEBUG) << "All Buiding index Done";
X
Xu Peng 已提交
338 339

    bg_build_index_started_ = false;
X
Xu Peng 已提交
340
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
341 342 343 344 345 346 347
}

/// Starts background_build_index() on a detached thread unless a build pass
/// is already marked as running. Always returns Status::OK().
Status DBImpl::try_build_index() {
    if (!bg_build_index_started_) {
        bg_build_index_started_ = true;
        std::thread(&DBImpl::background_build_index, this).detach();
    }
    return Status::OK();
}

X
Xu Peng 已提交
351
void DBImpl::background_compaction() {
352 353
    std::vector<std::string> group_ids;
    _pMemMgr->serialize(group_ids);
354

X
Xu Peng 已提交
355 356 357 358 359 360 361
    Status status;
    for (auto group_id : group_ids) {
        status = background_merge_files(group_id);
        if (!status.ok()) {
            _bg_error = status;
            return;
        }
362
    }
X
Xu Peng 已提交
363 364
}

X
Xu Peng 已提交
365 366 367 368
/// Delegates to the meta store's drop_all() and returns its status.
Status DBImpl::drop_all() {
    Status drop_status = _pMeta->drop_all();
    return drop_status;
}

X
Xu Peng 已提交
369 370 371 372
/// Delegates to the meta store's count() for `group_id`; the count is written
/// to `result`.
Status DBImpl::count(const std::string& group_id, long& result) {
    Status count_status = _pMeta->count(group_id, result);
    return count_status;
}

X
Xu Peng 已提交
373
/// Shuts the engine down. It sets `_shutting_down` and waits until no
/// compaction is scheduled, then waits until no index-building pass is
/// running. Finally it calls _pMemMgr->serialize() one last time.
///
/// NOTE(review): the detached timer thread from start_timer_task() is not
/// waited for here and may still reference `this` afterwards.
DBImpl::~DBImpl() {
    LOG(DEBUG) << "Start wait background merge thread";
    {
        std::unique_lock<std::mutex> merge_lock(_mutex);
        _shutting_down.store(true, std::memory_order_release);
        _bg_work_finish_signal.wait(merge_lock,
                [this] { return !_bg_compaction_scheduled; });
        LOG(DEBUG) << "Stop wait background merge thread";
    }

    LOG(DEBUG) << "Start wait background build index thread";
    {
        std::unique_lock<std::mutex> index_lock(build_index_mutex_);
        bg_build_index_finish_signal_.wait(index_lock,
                [this] { return !bg_build_index_started_; });
        LOG(DEBUG) << "Stop wait background build index thread";
    }

    std::vector<std::string> final_group_ids;
    _pMemMgr->serialize(final_group_ids);
}

X
Xu Peng 已提交
395 396 397 398 399 400
/*
 *  DB
 */

// Out-of-line definition of the DB interface destructor; nothing to release.
DB::~DB() = default;

X
Xu Peng 已提交
401 402 403 404
void DB::Open(const Options& options, DB** dbptr) {
    *dbptr = nullptr;
    *dbptr = new DBImpl(options);
    return;
X
Xu Peng 已提交
405 406
}

X
Xu Peng 已提交
407 408 409
} // namespace engine
} // namespace vecwise
} // namespace zilliz