DBImpl.cpp 10.9 KB
Newer Older
X
Xu Peng 已提交
1
#include <assert.h>
X
Xu Peng 已提交
2
#include <chrono>
X
Xu Peng 已提交
3
#include <thread>
4
#include <iostream>
5 6 7
#include <faiss/IndexFlat.h>
#include <faiss/MetaIndexes.h>
#include <faiss/index_io.h>
X
Xu Peng 已提交
8
#include <faiss/AutoTune.h>
X
Xu Peng 已提交
9
#include <wrapper/IndexBuilder.h>
X
xj.lin 已提交
10 11
#include <cstring>
#include <wrapper/Topk.h>
X
Xu Peng 已提交
12 13 14
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
X
Xu Peng 已提交
15

X
Xu Peng 已提交
16 17 18
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
19

X
Xu Peng 已提交
20 21 22
// Constructs the engine from `options`.
//
// Stores the environment and the options, clears the background-work flags,
// creates the metadata store (from `_options.meta`) and the memory manager
// (which is handed the metadata store and the options), then launches the
// periodic timer task with `_options.memory_sync_interval` as its interval.
DBImpl::DBImpl(const Options& options)
    : _env(options.env),
      _options(options),
      _bg_compaction_scheduled(false),
      _shutting_down(false),
      bg_build_index_started_(false),
      _pMeta(new meta::DBMetaImpl(_options.meta)),
      _pMemMgr(new MemManager(_pMeta, _options)) {
    // NOTE(review): the timer task runs on a detached thread that keeps using
    // `this`; confirm the object always outlives that thread.
    start_timer_task(_options.memory_sync_interval);
}

31 32
// Forwards `group_info` to the metadata store's add_group() and hands its
// status back to the caller unchanged.
Status DBImpl::add_group(meta::GroupSchema& group_info) {
    Status outcome = _pMeta->add_group(group_info);
    return outcome;
}

35 36
// Forwards `group_info` to the metadata store's get_group() and hands its
// status back to the caller unchanged.
Status DBImpl::get_group(meta::GroupSchema& group_info) {
    Status outcome = _pMeta->get_group(group_info);
    return outcome;
}

// Asks the metadata store whether group `group_id_` exists. The answer is
// written to `has_or_not_` by the metadata store; its status is returned.
Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) {
    Status outcome = _pMeta->has_group(group_id_, has_or_not_);
    return outcome;
}

X
Xu Peng 已提交
43 44
// Forwards the query for the files of group `group_id` (with `date_delta`) to
// the metadata store, which fills `group_files_info`; returns its status.
Status DBImpl::get_group_files(const std::string& group_id,
                               const int date_delta,
                               meta::GroupFilesSchema& group_files_info) {
    Status outcome = _pMeta->get_group_files(group_id, date_delta, group_files_info);
    return outcome;
}

50 51
// Adds `n` vectors to group `group_id_` through the memory manager, which
// fills `vector_ids_`.
//
// Returns the memory manager's status in every case. The previous version
// only returned on failure and fell off the end of this non-void function on
// success, which is undefined behaviour.
Status DBImpl::add_vectors(const std::string& group_id_,
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
    return _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
}

X
xj.lin 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
// Searches group `group_id` for the `k` nearest neighbours of the `nq` query
// vectors in `vectors`.
//
// Steps:
//   1. Ask the metadata store for the files to search and split them into RAW
//      files and everything else (treated as index files).
//   2. Merge all RAW files into one in-memory "IDMap,Flat" index, rebuild it
//      through the index builder and search it.
//   3. Search every index file.
//   4. Reduce the collected candidates with TopK().
//
// Returns the status of files_to_search() on failure, otherwise Status::OK().
//
// NOTE(review): `results` is never written; the reduced candidates end up in
// the local `distence` / `result_ids` vectors only. Populating `results` needs
// the QueryResults definition, which is not visible from this file.
// NOTE(review): the TopK() reduction pools the candidates of all queries
// together, so it only looks meaningful for nq == 1 -- confirm.
Status DBImpl::search(const std::string &group_id, size_t k, size_t nq,
                      const float *vectors, QueryResults &results) {
    meta::DatePartionedGroupFilesSchema files;
    std::vector<meta::DateT> partition;
    auto status = _pMeta->files_to_search(group_id, partition, files);
    if (!status.ok()) { return status; }

    // Nothing to look for: avoid zero-sized buffers further down.
    if (k == 0 || nq == 0) { return Status::OK(); }

    // TODO: optimized
    meta::GroupFilesSchema index_files;
    meta::GroupFilesSchema raw_files;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            if (file.file_type == meta::GroupFileSchema::RAW) {
                raw_files.push_back(file);
            } else {
                index_files.push_back(file);
            }
        }
    }

    // A search writes k results for each of the nq queries, so the scratch
    // buffers must hold nq * k entries (k alone overflows as soon as nq > 1).
    const size_t result_size = nq * k;
    std::vector<float> output_distence(result_size);
    std::vector<long> output_ids(result_size);

    std::vector<float> distence;
    std::vector<long> result_ids;

    // Prepends the latest search output to the candidate lists and zeroes the
    // scratch buffers for the next search.
    auto collect = [&]() {
        distence.insert(distence.begin(), output_distence.begin(), output_distence.end());
        result_ids.insert(result_ids.begin(), output_ids.begin(), output_ids.end());
        output_distence.assign(result_size, 0);
        output_ids.assign(result_size, 0);
    };

    // merge raw files (skipped when there are none; reading raw_files[0] on an
    // empty list was undefined behaviour)
    if (!raw_files.empty()) {
        const int dim = raw_files[0].dimension;

        // Owned by unique_ptr so the merged index is released on every path.
        std::unique_ptr<faiss::Index> merged(faiss::index_factory(dim, "IDMap,Flat"));

        for (auto &file : raw_files) {
            // read_index() returns a heap-allocated index that we must free.
            std::unique_ptr<faiss::Index> loaded(faiss::read_index(file.location.c_str()));
            auto file_index = dynamic_cast<faiss::IndexIDMap *>(loaded.get());
            merged->add_with_ids(file_index->ntotal,
                                 dynamic_cast<faiss::IndexFlat *>(file_index->index)->xb.data(),
                                 file_index->id_map.data());
        }

        // The merged index is an ID map wrapping the flat storage, so the raw
        // vectors live in the wrapped index -- same unpacking as for the
        // per-file indexes above. Casting the ID map itself to IndexFlat
        // yields a null pointer.
        auto merged_map = dynamic_cast<faiss::IndexIDMap *>(merged.get());
        float *xb = dynamic_cast<faiss::IndexFlat *>(merged_map->index)->xb.data();
        auto *ids = merged_map->id_map.data();
        long total = merged->ntotal;

        // build and search in raw file
        // TODO: HardCode
        auto opd = std::make_shared<Operand>();
        opd->d = dim;  // same setup as build_index()
        opd->index_type = "IDMap,Flat";
        IndexBuilderPtr builder = GetIndexBuilder(opd);
        auto raw_index = builder->build_all(total, xb, ids);

        raw_index->search(nq, vectors, k, output_distence.data(), output_ids.data());
        collect();
    }

    // search in index file
    for (auto &file : index_files) {
        auto index = read_index(file.location.c_str());
        index->search(nq, vectors, k, output_distence.data(), output_ids.data());
        collect();
    }

    // No file contributed any candidate.
    if (distence.empty()) { return Status::OK(); }

    // TopK
    TopK(distence.data(), distence.size(), k, output_distence.data(), output_ids.data());
    distence.assign(output_distence.data(), output_distence.data() + k);
    result_ids.assign(output_ids.data(), output_ids.data() + k);

    return Status::OK();
}

X
Xu Peng 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
void DBImpl::start_timer_task(int interval_) {
    std::thread bg_task(&DBImpl::background_timer_task, this, interval_);
    bg_task.detach();
}

// Body of the timer thread: as long as no background error has been recorded
// and shutdown has not been requested, sleeps `interval_` seconds and then
// tries to schedule a compaction.
//
// Both conditions are checked before each sleep, in the same order as before
// (error first, then the shutdown flag).
void DBImpl::background_timer_task(int interval_) {
    while (_bg_error.ok() && !_shutting_down.load(std::memory_order_acquire)) {
        std::this_thread::sleep_for(std::chrono::seconds(interval_));

        try_schedule_compaction();
    }
}

X
Xu Peng 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
// Schedules one background compaction through the environment, unless one is
// already scheduled or a background error has been recorded.
void DBImpl::try_schedule_compaction() {
    // Short-circuit keeps the original order: the error state is only
    // consulted when no compaction is pending.
    const bool may_schedule = !_bg_compaction_scheduled && _bg_error.ok();
    if (!may_schedule) {
        return;
    }

    _bg_compaction_scheduled = true;
    _env->schedule(&DBImpl::BGWork, this);
}

void DBImpl::BGWork(void* db_) {
    reinterpret_cast<DBImpl*>(db_)->background_call();
}

// Runs one scheduled background compaction while holding `_mutex`.
//
// The compaction itself is skipped when a background error has already been
// recorded, but the "scheduled" flag is cleared and waiters are notified in
// every case. Returning early without doing so left
// `_bg_compaction_scheduled` stuck at true, which makes the destructor wait
// on `_bg_work_finish_signal` forever.
void DBImpl::background_call() {
    std::lock_guard<std::mutex> lock(_mutex);
    assert(_bg_compaction_scheduled);

    if (_bg_error.ok()) {
        background_compaction();
    }

    _bg_compaction_scheduled = false;
    _bg_work_finish_signal.notify_all();
}

178 179 180 181

// Merges `files` (all belonging to `group_id` / `date`) into one new file.
//
// A new group file is registered with the metadata store, every input file is
// read and its vectors and ids are appended to a fresh "IDMap,Flat" index,
// which is then written to the new file's location. The inputs are marked
// TO_DELETE; the new file is marked TO_INDEX when dimension * ntotal reaches
// `_options.index_trigger_size`, RAW otherwise. All changes are sent to the
// metadata store in a single update_files() call.
//
// Returns the status of add_group_file() on failure (after printing it),
// otherwise the status of update_files().
Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
        const meta::GroupFilesSchema& files) {
    meta::GroupFileSchema group_file;
    group_file.group_id = group_id;
    group_file.date = date;
    Status status = _pMeta->add_group_file(group_file);

    if (!status.ok()) {
        std::cout << status.ToString() << std::endl;
        return status;
    }

    std::shared_ptr<faiss::Index> index(faiss::index_factory(group_file.dimension, "IDMap,Flat"));

    meta::GroupFilesSchema updated;

    for (auto& file : files) {
        // read_index() hands back a heap-allocated index; own it so it is
        // released once its contents have been copied into `index`.
        std::unique_ptr<faiss::Index> loaded(faiss::read_index(file.location.c_str()));
        auto file_index = dynamic_cast<faiss::IndexIDMap*>(loaded.get());
        index->add_with_ids(file_index->ntotal, dynamic_cast<faiss::IndexFlat*>(file_index->index)->xb.data(),
                file_index->id_map.data());
        auto file_schema = file;
        file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
        updated.push_back(file_schema);
    }

    auto index_size = group_file.dimension * index->ntotal;
    faiss::write_index(index.get(), group_file.location.c_str());

    if (index_size >= _options.index_trigger_size) {
        group_file.file_type = meta::GroupFileSchema::TO_INDEX;
    } else {
        group_file.file_type = meta::GroupFileSchema::RAW;
    }
    group_file.rows = index_size;
    updated.push_back(group_file);
    status = _pMeta->update_files(updated);

    return status;
}

// Merges the raw files of `group_id`, one date partition at a time.
//
// A partition is merged only when it holds more than
// `_options.merge_trigger_number` files. If at least one merge ran, an index
// build is requested; afterwards cleanup_ttl_files(1) is invoked on the
// metadata store.
//
// Returns the status of files_to_merge() on failure, otherwise Status::OK().
Status DBImpl::background_merge_files(const std::string& group_id) {
    meta::DatePartionedGroupFilesSchema raw_files;
    auto status = _pMeta->files_to_merge(group_id, raw_files);
    if (!status.ok()) {
        return status;
    }

    if (raw_files.empty()) {
        return Status::OK();
    }

    bool has_merge = false;

    for (auto& kv : raw_files) {
        // Bind by reference: copying the whole file list per partition is
        // unnecessary.
        const auto& files = kv.second;
        if (files.size() <= _options.merge_trigger_number) {
            continue;
        }
        has_merge = true;
        // NOTE(review): the status of merge_files() is discarded here, as it
        // was before; decide whether a failed merge should be reported.
        merge_files(group_id, kv.first, files);
    }

    if (has_merge) {
        try_build_index();
    }

    _pMeta->cleanup_ttl_files(1);

    return Status::OK();
}

// Builds an index file from the raw file `file`.
//
// A new group file is registered for the same group and date, the raw file is
// read, its vectors and ids are passed to the index builder, and the result
// is written to the new file's location. The new file is marked INDEX (its
// `rows` is set to dimension * ntotal) and the source file TO_DELETE; both
// are sent to the metadata store in one update_files() call.
//
// Returns the status of add_group_file() on failure, otherwise the status of
// update_files() (previously that status was dropped and OK was returned
// unconditionally).
Status DBImpl::build_index(const meta::GroupFileSchema& file) {
    meta::GroupFileSchema group_file;
    group_file.group_id = file.group_id;
    group_file.date = file.date;
    Status status = _pMeta->add_group_file(group_file);
    if (!status.ok()) {
        return status;
    }

    auto opd = std::make_shared<Operand>();
    opd->d = file.dimension;
    opd->index_type = "IDMap,Flat";
    IndexBuilderPtr pBuilder = GetIndexBuilder(opd);

    // read_index() returns a heap-allocated index; own it so it is freed when
    // this function returns instead of leaking once per built file.
    std::unique_ptr<faiss::Index> loaded(faiss::read_index(file.location.c_str()));
    auto from_index = dynamic_cast<faiss::IndexIDMap*>(loaded.get());
    std::cout << "Preparing build_index for file_id=" << file.file_id
        << " with new index_file_id=" << group_file.file_id << std::endl;
    auto index = pBuilder->build_all(from_index->ntotal,
            dynamic_cast<faiss::IndexFlat*>(from_index->index)->xb.data(),
            from_index->id_map.data());
    std::cout << "Ending build_index for file_id=" << file.file_id
        << " with new index_file_id=" << group_file.file_id << std::endl;
    /* std::cout << "raw size=" << from_index->ntotal << "   index size=" << index->ntotal << std::endl; */
    write_index(index, group_file.location.c_str());
    group_file.file_type = meta::GroupFileSchema::INDEX;
    group_file.rows = file.dimension * index->ntotal;

    auto to_remove = file;
    to_remove.file_type = meta::GroupFileSchema::TO_DELETE;

    meta::GroupFilesSchema update_files = {to_remove, group_file};
    return _pMeta->update_files(update_files);
}

X
Xu Peng 已提交
286
void DBImpl::background_build_index() {
X
Xu Peng 已提交
287
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
288 289
    assert(bg_build_index_started_);
    meta::GroupFilesSchema to_index_files;
X
Xu Peng 已提交
290
    _pMeta->files_to_index(to_index_files);
X
Xu Peng 已提交
291 292 293 294 295
    Status status;
    for (auto& file : to_index_files) {
        status = build_index(file);
        if (!status.ok()) {
            _bg_error = status;
X
Xu Peng 已提交
296
            return;
X
Xu Peng 已提交
297 298 299 300
        }
    }

    bg_build_index_started_ = false;
X
Xu Peng 已提交
301
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
302 303 304 305 306 307 308
}

// Starts the index-building thread unless one has already been started.
// Always returns Status::OK().
Status DBImpl::try_build_index() {
    if (!bg_build_index_started_) {
        bg_build_index_started_ = true;
        // The worker runs detached; it clears the flag itself when done.
        std::thread(&DBImpl::background_build_index, this).detach();
    }
    return Status::OK();
}

X
Xu Peng 已提交
312
void DBImpl::background_compaction() {
313 314
    std::vector<std::string> group_ids;
    _pMemMgr->serialize(group_ids);
315

X
Xu Peng 已提交
316 317 318 319 320 321 322
    Status status;
    for (auto group_id : group_ids) {
        status = background_merge_files(group_id);
        if (!status.ok()) {
            _bg_error = status;
            return;
        }
323
    }
X
Xu Peng 已提交
324 325
}

X
Xu Peng 已提交
326
// Shuts the engine down: raises the shutdown flag, waits until no compaction
// is scheduled and no index build is running, then has the memory manager
// serialize once more.
DBImpl::~DBImpl() {
    {
        std::unique_lock<std::mutex> compaction_lock(_mutex);
        _shutting_down.store(true, std::memory_order_release);
        // Predicate form of wait(): loops until the flag is cleared.
        _bg_work_finish_signal.wait(compaction_lock,
                                    [this] { return !_bg_compaction_scheduled; });
    }
    {
        std::unique_lock<std::mutex> index_lock(build_index_mutex_);
        bg_build_index_finish_signal_.wait(index_lock,
                                           [this] { return !bg_build_index_started_; });
    }

    std::vector<std::string> serialized_ids;
    _pMemMgr->serialize(serialized_ids);
}

X
Xu Peng 已提交
344 345 346 347 348 349
/*
 *  DB
 */

// Out-of-line destructor of the DB interface; it has nothing to release.
DB::~DB() = default;

X
Xu Peng 已提交
350 351 352 353
void DB::Open(const Options& options, DB** dbptr) {
    *dbptr = nullptr;
    *dbptr = new DBImpl(options);
    return;
X
Xu Peng 已提交
354 355
}

X
Xu Peng 已提交
356 357 358
} // namespace engine
} // namespace vecwise
} // namespace zilliz