DBImpl.cpp 7.6 KB
Newer Older
X
Xu Peng 已提交
1
#include <assert.h>
X
Xu Peng 已提交
2
#include <chrono>
X
Xu Peng 已提交
3
#include <thread>
4
#include <iostream>
5 6 7
#include <faiss/IndexFlat.h>
#include <faiss/MetaIndexes.h>
#include <faiss/index_io.h>
X
Xu Peng 已提交
8
#include <faiss/AutoTune.h>
X
Xu Peng 已提交
9
#include <wrapper/IndexBuilder.h>
X
Xu Peng 已提交
10 11 12
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14 15 16
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
17

X
Xu Peng 已提交
18 19 20
// Constructs the engine from `options`: copies the options, creates the
// metadata store (meta::DBMetaImpl) and the MemManager, then starts the
// background timer thread via start_timer_task().
// NOTE(review): members are initialized in their DBImpl.h declaration order,
// not the order written below; _pMeta reads _options.meta and _pMemMgr reads
// _pMeta, so those members must be declared earlier — confirm in the header.
DBImpl::DBImpl(const Options& options)
    : _env(options.env)
    , _options(options)
    , _bg_compaction_scheduled(false)
    , _shutting_down(false)
    , bg_build_index_started_(false)
    , _pMeta(new meta::DBMetaImpl(_options.meta))
    , _pMemMgr(new MemManager(_pMeta, _options)) {
    // Started last so that every member is initialized before another thread
    // can observe `this`.
    const auto sync_interval = _options.memory_sync_interval;
    start_timer_task(sync_interval);
}

29 30
// Registers a group by forwarding `group_info` to the metadata store.
// The schema is passed by non-const reference, so the store may update it.
Status DBImpl::add_group(meta::GroupSchema& group_info) {
    Status result = _pMeta->add_group(group_info);
    return result;
}

33 34
// Looks a group up in the metadata store; `group_info` is handed over by
// reference (presumably keyed by its group_id and filled in by the store).
Status DBImpl::get_group(meta::GroupSchema& group_info) {
    Status result = _pMeta->get_group(group_info);
    return result;
}

// Asks the metadata store whether `group_id_` exists; the answer is written
// into `has_or_not_` by the store.
Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) {
    Status result = _pMeta->has_group(group_id_, has_or_not_);
    return result;
}

X
Xu Peng 已提交
41 42
// Fetches the files of `group_id` from the metadata store into
// `group_files_info`. `date_delta` is forwarded unchanged; how it restricts
// the result is decided by the metadata store.
Status DBImpl::get_group_files(const std::string& group_id,
                               const int date_delta,
                               meta::GroupFilesSchema& group_files_info) {
    Status result = _pMeta->get_group_files(group_id, date_delta, group_files_info);
    return result;
}

48 49
// Hands `n` vectors for group `group_id_` to the MemManager and returns its
// status. `vectors` presumably points at n * dimension floats, and
// `vector_ids_` is forwarded to MemManager::add_vectors (presumably it
// receives the ids assigned to the new vectors) — confirm against MemManager.
//
// The status is returned on every path: falling off the end of a non-void
// function (as happened when the insert succeeded) is undefined behavior.
Status DBImpl::add_vectors(const std::string& group_id_,
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
    return _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
}

X
Xu Peng 已提交
56 57 58 59 60 61
// Search entry point — not implemented yet. Every argument is ignored,
// `results` is left untouched, and success is always reported.
// TODO(PXU): implement the search (parameter names suggest `k` results for
// each of `nq` query vectors in `group_id`).
Status DBImpl::search(const std::string& group_id, size_t k, size_t nq,
        const float* vectors, QueryResults& results) {
    (void)group_id;
    (void)k;
    (void)nq;
    (void)vectors;
    (void)results;
    return Status::OK();
}

X
Xu Peng 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
void DBImpl::start_timer_task(int interval_) {
    std::thread bg_task(&DBImpl::background_timer_task, this, interval_);
    bg_task.detach();
}

// Timer loop: every `interval_` seconds it calls try_schedule_compaction().
// It stops once a background error has been recorded or shutdown has begun.
//
// Shutdown is checked again after the sleep because ~DBImpl() may have
// started while this thread was sleeping; scheduling a compaction then would
// hand the Env a pointer to an object that is being destroyed.
// NOTE(review): the thread running this loop is detached (start_timer_task)
// and can still wake after ~DBImpl() returns; fully fixing that needs a
// joinable thread member. _bg_error is also read here without _mutex.
void DBImpl::background_timer_task(int interval_) {
    while (true) {
        if (!_bg_error.ok()) break;
        if (_shutting_down.load(std::memory_order_acquire)) break;

        std::this_thread::sleep_for(std::chrono::seconds(interval_));

        if (_shutting_down.load(std::memory_order_acquire)) break;
        try_schedule_compaction();
    }
}

X
Xu Peng 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
void DBImpl::try_schedule_compaction() {
    if (_bg_compaction_scheduled) return;
    if (!_bg_error.ok()) return;

    _bg_compaction_scheduled = true;
    _env->schedule(&DBImpl::BGWork, this);
}

// Static trampoline passed to Env::schedule(): `db_` is the DBImpl that was
// supplied as the opaque argument, and this forwards to its background_call().
// static_cast is the named cast meant for void* -> T*; reinterpret_cast is not
// needed for this conversion.
void DBImpl::BGWork(void* db_) {
    static_cast<DBImpl*>(db_)->background_call();
}

// Body of one scheduled compaction, run with _mutex held. It performs
// background_compaction() unless a background error is already recorded.
// On every path it then clears _bg_compaction_scheduled and wakes the waiters
// on _bg_work_finish_signal.
//
// Clearing the flag on the error path matters: ~DBImpl() blocks until the
// flag is false, so returning early without clearing it would hang the
// destructor forever.
void DBImpl::background_call() {
    std::lock_guard<std::mutex> lock(_mutex);
    assert(_bg_compaction_scheduled);

    if (_bg_error.ok()) {
        background_compaction();
    }

    _bg_compaction_scheduled = false;
    _bg_work_finish_signal.notify_all();
}

103 104 105 106

// Merges `files` (group `group_id`, date `date`) into one new file:
//   1. Registers a new group file in the metadata store.
//   2. Appends every input's vectors and ids to a fresh "IDMap,Flat" faiss
//      index and writes it to the new file's location.
//   3. Marks the inputs TO_DELETE.
//   4. Marks the new file TO_INDEX when dimension * ntotal reaches
//      _options.index_trigger_size, and RAW otherwise.
// Returns the add_group_file() failure if there is one, otherwise the status
// of the final update_files().
//
// faiss::read_index() returns an owning raw pointer; it is held in a
// unique_ptr so each loaded input is freed (it used to leak every iteration).
// Inputs that are not an IDMap over an IndexFlat are skipped and logged rather
// than dereferenced as null. Skipped inputs are left as they are, not marked
// TO_DELETE.
Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
        const meta::GroupFilesSchema& files) {
    meta::GroupFileSchema group_file;
    group_file.group_id = group_id;
    group_file.date = date;
    Status status = _pMeta->add_group_file(group_file);

    if (!status.ok()) {
        std::cout << status.ToString() << std::endl;
        return status;
    }

    // NOTE(review): relies on add_group_file() having filled in `dimension`
    // and `location` of the new file.
    std::unique_ptr<faiss::Index> index(faiss::index_factory(group_file.dimension, "IDMap,Flat"));

    meta::GroupFilesSchema updated;

    for (const auto& file : files) {
        std::unique_ptr<faiss::Index> loaded(faiss::read_index(file.location.c_str()));
        auto file_index = dynamic_cast<faiss::IndexIDMap*>(loaded.get());
        auto flat = file_index ? dynamic_cast<faiss::IndexFlat*>(file_index->index) : nullptr;
        if (flat == nullptr) {
            std::cerr << "merge_files: skipping " << file.location
                      << ": not an IDMap,Flat index\n";
            continue;
        }
        index->add_with_ids(file_index->ntotal, flat->xb.data(),
                file_index->id_map.data());

        auto file_schema = file;
        file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
        updated.push_back(file_schema);
    }

    auto index_size = group_file.dimension * index->ntotal;
    faiss::write_index(index.get(), group_file.location.c_str());

    if (index_size >= _options.index_trigger_size) {
        group_file.file_type = meta::GroupFileSchema::TO_INDEX;
    } else {
        group_file.file_type = meta::GroupFileSchema::RAW;
    }
    // NOTE(review): `rows` receives dimension * ntotal (a float count, not a
    // vector count), as in build_index(); confirm that unit is intended.
    group_file.rows = index_size;
    updated.push_back(group_file);

    return _pMeta->update_files(updated);
}

// For each date of `group_id` whose raw-file count exceeds
// _options.merge_trigger_number, merges those files with merge_files().
// If anything was merged, it then calls try_build_index().
//
// Returns:
//   - the files_to_merge() failure, if that call fails;
//   - otherwise the first merge_files() failure (the remaining dates are
//     still attempted);
//   - otherwise OK.
// Merge failures used to be discarded silently.
Status DBImpl::background_merge_files(const std::string& group_id) {
    meta::DatePartionedGroupFilesSchema raw_files;
    auto status = _pMeta->files_to_merge(group_id, raw_files);
    if (!status.ok()) {
        return status;
    }

    bool has_merge = false;
    Status first_error = Status::OK();

    for (auto& kv : raw_files) {
        // Bind by reference: the file list only needs to be inspected.
        const auto& files = kv.second;
        if (files.size() <= _options.merge_trigger_number) {
            continue;
        }
        has_merge = true;
        auto merge_status = merge_files(group_id, kv.first, files);
        if (!merge_status.ok() && first_error.ok()) {
            first_error = merge_status;
        }
    }

    if (has_merge) {
        try_build_index();
    }

    return first_error;
}

// Builds an index from the stored file `file`:
//   1. Registers a new group file for the same group and date.
//   2. Loads `file`'s vectors and ids and builds an index over them with an
//      "IDMap,Flat" IndexBuilder.
//   3. Writes the result to the new file's location.
//   4. Records the new file as INDEX and `file` as TO_DELETE.
// Returns the add_group_file() failure if there is one, otherwise the status
// of update_files(). That status used to be discarded and OK always returned.
//
// The index returned by faiss::read_index() is owned by a unique_ptr so it is
// freed after the build (it previously leaked).
Status DBImpl::build_index(const meta::GroupFileSchema& file) {
    meta::GroupFileSchema group_file;
    group_file.group_id = file.group_id;
    group_file.date = file.date;
    Status status = _pMeta->add_group_file(group_file);
    if (!status.ok()) {
        return status;
    }

    auto opd = std::make_shared<Operand>();
    opd->index_type = "IDMap,Flat";
    IndexBuilderPtr pBuilder = GetIndexBuilder(opd);

    std::unique_ptr<faiss::Index> loaded(faiss::read_index(file.location.c_str()));
    auto from_index = dynamic_cast<faiss::IndexIDMap*>(loaded.get());
    // NOTE(review): assumes every file to index is an IDMap over an IndexFlat;
    // in release builds a mismatch is still a null dereference.
    assert(from_index != nullptr);
    auto flat = dynamic_cast<faiss::IndexFlat*>(from_index->index);
    assert(flat != nullptr);
    auto index = pBuilder->build_all(from_index->ntotal, flat->xb.data(),
            from_index->id_map.data());

    write_index(index, group_file.location.c_str());
    group_file.file_type = meta::GroupFileSchema::INDEX;
    group_file.rows = file.dimension * index->ntotal;

    auto to_remove = file;
    to_remove.file_type = meta::GroupFileSchema::TO_DELETE;

    meta::GroupFilesSchema update_files = {to_remove, group_file};
    return _pMeta->update_files(update_files);
}

X
Xu Peng 已提交
204
void DBImpl::background_build_index() {
X
Xu Peng 已提交
205
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
206 207
    assert(bg_build_index_started_);
    meta::GroupFilesSchema to_index_files;
X
Xu Peng 已提交
208
    _pMeta->files_to_index(to_index_files);
X
Xu Peng 已提交
209 210 211 212 213
    Status status;
    for (auto& file : to_index_files) {
        status = build_index(file);
        if (!status.ok()) {
            _bg_error = status;
X
Xu Peng 已提交
214
            return;
X
Xu Peng 已提交
215 216 217 218
        }
    }

    bg_build_index_started_ = false;
X
Xu Peng 已提交
219
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
220 221 222 223 224 225 226
}

// Starts background_build_index() on a detached thread unless a build is
// already marked as started. Always returns OK.
// NOTE(review): bg_build_index_started_ is read and set here without
// build_index_mutex_, while background_build_index() clears it under that
// mutex. This is only race-free if the member is atomic — confirm in DBImpl.h.
// The detached thread also holds a raw `this`.
Status DBImpl::try_build_index() {
    if (!bg_build_index_started_) {
        bg_build_index_started_ = true;
        std::thread(&DBImpl::background_build_index, this).detach();
    }
    return Status::OK();
}

X
Xu Peng 已提交
230
void DBImpl::background_compaction() {
231 232
    std::vector<std::string> group_ids;
    _pMemMgr->serialize(group_ids);
233

X
Xu Peng 已提交
234 235 236 237 238 239 240
    Status status;
    for (auto group_id : group_ids) {
        status = background_merge_files(group_id);
        if (!status.ok()) {
            _bg_error = status;
            return;
        }
241
    }
X
Xu Peng 已提交
242 243
}

X
Xu Peng 已提交
244
// Shutdown sequence:
//   1. Raises _shutting_down.
//   2. Waits until no compaction is scheduled.
//   3. Waits until no index build is marked as started.
//   4. Has the MemManager serialize one last time.
// NOTE(review): the detached timer thread from start_timer_task() is never
// joined and may still access this object after destruction.
DBImpl::~DBImpl() {
    {
        std::unique_lock<std::mutex> compaction_lock(_mutex);
        _shutting_down.store(true, std::memory_order_release);
        _bg_work_finish_signal.wait(compaction_lock,
                [this] { return !_bg_compaction_scheduled; });
    }
    {
        std::unique_lock<std::mutex> build_lock(build_index_mutex_);
        bg_build_index_finish_signal_.wait(build_lock,
                [this] { return !bg_build_index_started_; });
    }

    std::vector<std::string> serialized_ids;
    _pMemMgr->serialize(serialized_ids);
}

X
Xu Peng 已提交
262 263 264 265 266 267
/*
 *  DB
 */

// Out-of-line definition of the DB interface's destructor; nothing to release.
DB::~DB() = default;

X
Xu Peng 已提交
268 269
// Factory for the concrete engine. It returns a newly heap-allocated DBImpl
// built from `options`; nothing else keeps the pointer, so the caller owns it.
DB* DB::Open(const Options& options) {
    return new DBImpl(options);
}

X
Xu Peng 已提交
273 274 275
} // namespace engine
} // namespace vecwise
} // namespace zilliz