DBImpl.cpp 12.2 KB
Newer Older
X
Xu Peng 已提交
1
#include <assert.h>
X
Xu Peng 已提交
2
#include <chrono>
X
Xu Peng 已提交
3
#include <thread>
4
#include <iostream>
X
xj.lin 已提交
5 6
#include <cstring>
#include <wrapper/Topk.h>
7
#include <easylogging++.h>
X
Xu Peng 已提交
8
#include <cache/CpuCacheMgr.h>
9

X
Xu Peng 已提交
10 11 12
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
13
#include "FaissExecutionEngine.h"
X
Xu Peng 已提交
14

X
Xu Peng 已提交
15 16 17
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
18

19 20
/**
 * Builds the DB engine from @p options: creates the metadata store
 * (meta::DBMetaImpl) and the memory manager, then starts the detached
 * background timer thread (see start_timer_task).
 *
 * NOTE(review): _pMeta and _pMemMgr are constructed from `_options`, which is
 * only safe if `_options` is declared before them in DBImpl.h (members are
 * initialized in declaration order, not initializer-list order) — confirm.
 */
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
    : _env(options.env)
    , _options(options)
    , _bg_compaction_scheduled(false)
    , _shutting_down(false)
    , bg_build_index_started_(false)
    , _pMeta(new meta::DBMetaImpl(_options.meta))
    , _pMemMgr(new MemManager<EngineT>(_pMeta, _options))
{
    // Periodically triggers compaction; interval comes from the options.
    start_timer_task(_options.memory_sync_interval);
}

31 32
/// Creates a group by forwarding @p group_info to the metadata store.
/// @return whatever status the metadata layer reports.
template<typename EngineT>
Status DBImpl<EngineT>::add_group(meta::GroupSchema& group_info) {
    Status outcome = _pMeta->add_group(group_info);
    return outcome;
}

36 37
/// Looks up a group's schema via the metadata store; @p group_info is passed
/// through unchanged to meta (it is both the query and the out-parameter).
/// @return the metadata layer's status.
template<typename EngineT>
Status DBImpl<EngineT>::get_group(meta::GroupSchema& group_info) {
    Status outcome = _pMeta->get_group(group_info);
    return outcome;
}

41 42
/// Reports in @p has_or_not_ whether group @p group_id_ exists, as answered
/// by the metadata store. @return the metadata layer's status.
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not_) {
    Status outcome = _pMeta->has_group(group_id_, has_or_not_);
    return outcome;
}

46 47
/// Fetches the files of group @p group_id into @p group_files_info, forwarding
/// @p date_delta to the metadata store unchanged.
/// @return the metadata layer's status.
template<typename EngineT>
Status DBImpl<EngineT>::get_group_files(const std::string& group_id,
                                        const int date_delta,
                                        meta::GroupFilesSchema& group_files_info) {
    Status outcome = _pMeta->get_group_files(group_id, date_delta, group_files_info);
    return outcome;
}

54 55
/**
 * Hands @p n vectors for group @p group_id_ to the memory manager.
 *
 * @param group_id_   target group.
 * @param n           number of vectors in @p vectors.
 * @param vectors     raw vector data (layout defined by MemManager — presumably
 *                    n * dimension floats; TODO confirm).
 * @param vector_ids_ receives the ids assigned by the memory manager.
 * @return the memory manager's status (OK on success).
 */
template<typename EngineT>
Status DBImpl<EngineT>::add_vectors(const std::string& group_id_,
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
    // Always return the status: falling off the end of a non-void function
    // on the success path is undefined behaviour.
    return _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
}

63 64
/// Convenience overload: searches only the partition for meta::Meta::GetDate()
/// by forwarding to the date-ranged search() with a one-element date list.
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string &group_id, size_t k, size_t nq,
                      const float *vectors, QueryResults &results) {
    const meta::DatesT current_day{meta::Meta::GetDate()};
    return search(group_id, k, nq, vectors, current_day, results);
}

70 71
/**
 * Searches every file of @p group_id in the partitions listed in @p dates and
 * merges the per-file hits into a global top-@p k for each query.
 *
 * @param group_id group to search.
 * @param k        neighbours requested per query.
 * @param nq       number of query vectors in @p vectors.
 * @param vectors  query data (presumably nq * dimension floats — TODO confirm).
 * @param dates    partitions to search.
 * @param results  one QueryResult (k ids) per query is appended.
 * @return the files_to_search error if that lookup fails; OK when there is
 *         nothing to search or results were produced; NotFound when no
 *         results were produced.
 */
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {

    meta::DatePartionedGroupFilesSchema files;
    auto status = _pMeta->files_to_search(group_id, dates, files);
    if (!status.ok()) { return status; }

    /* LOG(DEBUG) << "Search DateT Size=" << files.size(); */

    // Split candidate files into built indexes and everything else.
    meta::GroupFilesSchema index_files;
    meta::GroupFilesSchema raw_files;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            if (file.file_type == meta::GroupFileSchema::INDEX) {
                index_files.push_back(file);
            } else {
                raw_files.push_back(file);
            }
        }
    }

    if (index_files.empty() && raw_files.empty()) {
        return Status::OK();
    }

    {
        // Per-query accumulation of all files' candidates: {ids, distances}.
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

        // Scratch output for one engine call (k hits per query). Owned by
        // std::vector so it is released even if an engine call throws.
        const size_t buffer_size = k * nq;
        std::vector<float> output_distence(buffer_size, 0.0f);
        std::vector<long> output_ids(buffer_size, 0L);

        auto reset_buffers = [&]() -> void {
            output_distence.assign(buffer_size, 0.0f);
            output_ids.assign(buffer_size, 0L);
        };

        // Prepend one file's k hits for each query to that query's lists.
        auto cluster = [&](const long *nns, const float *dis) -> void {
            for (size_t i = 0; i < nq; ++i) {
                auto &ids = batchresult[i].first;
                auto &distances = batchresult[i].second;
                ids.insert(ids.cbegin(), nns + i * k, nns + i * k + k);
                distances.insert(distances.cbegin(), dis + i * k, dis + i * k + k);
            }
        };

        long search_set_size = 0; // total MB of all files searched

        auto search_in_index = [&](meta::GroupFilesSchema& file_vec) -> void {
            for (auto &file : file_vec) {
                EngineT index(file.dimension, file.location);
                index.Load();
                auto file_size = index.PhysicalSize()/(1024*1024);
                search_set_size += file_size;
                LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
                    << file_size << " M";
                index.Search(nq, vectors, k, output_distence.data(), output_ids.data());
                cluster(output_ids.data(), output_distence.data()); // cluster to each query
                reset_buffers();
            }
        };

        // Reduce each query's accumulated candidates to its global top-k;
        // TopK yields positions into `dis`, mapped back to ids through `nns`.
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
                TopK(dis.data(), dis.size(), k, output_distence.data(), output_ids.data());
                for (size_t i = 0; i < k; ++i) {
                    res.emplace_back(nns[output_ids[i]]); // mapping
                }
                results.push_back(res); // append to result list
                res.clear();
                reset_buffers();
            }
        };

        search_in_index(raw_files);
        search_in_index(index_files);

        LOG(DEBUG) << "Search Overall Set Size=" << search_set_size << " M";
        cluster_topk();
    }

    if (results.empty()) {
        return Status::NotFound("Group " + group_id + ", search result not found!");
    }

    return Status::OK();
}

169 170 171
/// Runs background_timer_task(@p interval_) on a new thread that is detached
/// immediately.
/// NOTE(review): the detached thread holds `this` and is never joined; the
/// destructor does not wait for it — confirm the DB outlives the thread.
template<typename EngineT>
void DBImpl<EngineT>::start_timer_task(int interval_) {
    std::thread(&DBImpl<EngineT>::background_timer_task, this, interval_).detach();
}

175 176
/**
 * Timer loop: every @p interval_ seconds asks try_schedule_compaction() to
 * queue background work. It exits once a background error has been recorded
 * or shutdown has begun.
 *
 * NOTE(review): this runs on a detached thread that ~DBImpl() does not wait
 * for, so members are read after each sleep without lifetime protection.
 */
template<typename EngineT>
void DBImpl<EngineT>::background_timer_task(int interval_) {
    while (true) {
        if (!_bg_error.ok()) break;
        if (_shutting_down.load(std::memory_order_acquire)) break;

        std::this_thread::sleep_for(std::chrono::seconds(interval_));

        // Shutdown may have started while sleeping; don't queue new work then.
        if (_shutting_down.load(std::memory_order_acquire)) break;

        try_schedule_compaction();
    }
}

188 189
/**
 * Queues one background compaction (BGWork) on the Env, unless one is already
 * scheduled, a background error has been recorded, or shutdown has begun.
 */
template<typename EngineT>
void DBImpl<EngineT>::try_schedule_compaction() {
    if (_bg_compaction_scheduled) return;
    if (!_bg_error.ok()) return;
    // Consistent with try_build_index(): no new background work once the
    // destructor has started shutting down.
    if (_shutting_down.load(std::memory_order_acquire)) return;

    _bg_compaction_scheduled = true;
    _env->schedule(&DBImpl<EngineT>::BGWork, this);
}

197 198
/**
 * Trampoline passed to Env::schedule(): @p db_ is the `this` pointer supplied
 * by try_schedule_compaction(), so it is converted back and
 * background_call() is invoked on it.
 */
template<typename EngineT>
void DBImpl<EngineT>::BGWork(void* db_) {
    // static_cast is the correct (and sufficient) cast from void* to T*.
    static_cast<DBImpl*>(db_)->background_call();
}

202 203
/**
 * Body of one scheduled background run. Under _mutex, it runs
 * background_compaction() unless an error is recorded or shutdown has begun.
 * It then clears _bg_compaction_scheduled and wakes waiters.
 */
template<typename EngineT>
void DBImpl<EngineT>::background_call() {
    std::lock_guard<std::mutex> lock(_mutex);
    assert(_bg_compaction_scheduled);

    if (_bg_error.ok() && !_shutting_down.load(std::memory_order_acquire)) {
        background_compaction();
    }

    // Always clear the flag and notify, including when the run was skipped:
    // ~DBImpl() blocks until _bg_compaction_scheduled is false, so leaving it
    // set on the skip path deadlocks shutdown.
    _bg_compaction_scheduled = false;
    _bg_work_finish_signal.notify_all();
}

216

217 218
/**
 * Merges @p files of group @p group_id (partition @p date) into a newly
 * registered group file, stopping once the merged size reaches
 * _options.index_trigger_size.
 *
 * Each merged input is re-registered as TO_DELETE. The new file is marked
 * TO_INDEX if it reached the trigger size, RAW otherwise, and its rows are
 * set to the merged Size(). The result is serialized and cached.
 *
 * @return the add_group_file error (logged) if registration fails,
 *         otherwise the status of update_files.
 */
template<typename EngineT>
Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::DateT& date,
        const meta::GroupFilesSchema& files) {
    meta::GroupFileSchema group_file;
    group_file.group_id = group_id;
    group_file.date = date;

    Status status = _pMeta->add_group_file(group_file);
    if (!status.ok()) {
        LOG(INFO) << status.ToString() << std::endl;
        return status;
    }

    EngineT merged(group_file.dimension, group_file.location);
    meta::GroupFilesSchema updated;
    long index_size = 0;

    for (const auto& source : files) {
        merged.Merge(source.location);

        auto retired = source;
        retired.file_type = meta::GroupFileSchema::TO_DELETE;
        updated.push_back(retired);

        index_size = merged.Size();
        if (index_size >= _options.index_trigger_size) break;
    }

    merged.Serialize();

    const bool reached_trigger = index_size >= _options.index_trigger_size;
    group_file.file_type = reached_trigger ? meta::GroupFileSchema::TO_INDEX
                                           : meta::GroupFileSchema::RAW;
    group_file.rows = index_size;

    updated.push_back(group_file);
    status = _pMeta->update_files(updated);

    merged.Cache();
    return status;
}

265 266
/**
 * One merge pass for @p group_id. For each date partition returned by
 * files_to_merge() that holds more than _options.merge_trigger_number files,
 * it merges them via merge_files(). It then kicks off index building and
 * cleans up TTL files.
 *
 * @return the files_to_merge error if that lookup fails, otherwise OK.
 *         Individual merge failures are logged and do not abort the pass.
 */
template<typename EngineT>
Status DBImpl<EngineT>::background_merge_files(const std::string& group_id) {
    meta::DatePartionedGroupFilesSchema raw_files;
    auto status = _pMeta->files_to_merge(group_id, raw_files);
    if (!status.ok()) {
        return status;
    }

    for (const auto& kv : raw_files) {
        const auto& files = kv.second;  // no copy needed just to check size
        if (files.size() <= _options.merge_trigger_number) {
            continue;
        }
        status = merge_files(group_id, kv.first, files);
        if (!status.ok()) {
            // Merging stays best-effort (the pass still returns OK), but the
            // failure is no longer silently discarded.
            LOG(ERROR) << "Merge files of group " << group_id << " failed: "
                       << status.ToString();
        }
    }

    try_build_index();

    _pMeta->cleanup_ttl_files(1);

    return Status::OK();
}

295 296
/**
 * Builds an index for @p file:
 *  1. registers a new group file;
 *  2. loads @p file and builds the index into the new file's location;
 *  3. records the new file as INDEX and @p file as TO_DELETE;
 *  4. caches the index.
 *
 * @return the add_group_file or update_files error on failure, else OK.
 */
template<typename EngineT>
Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
    meta::GroupFileSchema group_file;
    group_file.group_id = file.group_id;
    group_file.date = file.date;
    Status status = _pMeta->add_group_file(group_file);
    if (!status.ok()) {
        return status;
    }

    EngineT to_index(file.dimension, file.location);

    to_index.Load();
    auto index = to_index.BuildIndex(group_file.location);

    group_file.file_type = meta::GroupFileSchema::INDEX;
    group_file.rows = index->Size();

    auto to_remove = file;
    to_remove.file_type = meta::GroupFileSchema::TO_DELETE;

    meta::GroupFilesSchema update_files = {to_remove, group_file};
    // Propagate metadata failures instead of reporting success regardless;
    // an unregistered index is not worth caching.
    status = _pMeta->update_files(update_files);
    if (!status.ok()) {
        return status;
    }

    index->Cache();

    return Status::OK();
}

324 325
/**
 * Background index-building thread body (started by try_build_index()).
 * Under build_index_mutex_, it builds an index for every file returned by
 * files_to_index(). The first failure is recorded in _bg_error and stops the
 * run. In all cases it clears bg_build_index_started_ and wakes waiters.
 */
template<typename EngineT>
void DBImpl<EngineT>::background_build_index() {
    std::lock_guard<std::mutex> lock(build_index_mutex_);
    assert(bg_build_index_started_);

    meta::GroupFilesSchema to_index_files;
    Status status = _pMeta->files_to_index(to_index_files);
    if (!status.ok()) {
        _bg_error = status;
    } else {
        for (auto& file : to_index_files) {
            /* LOG(DEBUG) << "Buiding index for " << file.location; */
            status = build_index(file);
            if (!status.ok()) {
                _bg_error = status;
                break;
            }
        }
    }

    // Must run on the error path too: ~DBImpl() blocks until
    // bg_build_index_started_ is false, so an early return here would
    // deadlock shutdown.
    bg_build_index_started_ = false;
    bg_build_index_finish_signal_.notify_all();
}

345 346
/// Starts background_build_index() on a detached thread, unless a build is
/// already in progress or shutdown has begun. Always returns OK.
template<typename EngineT>
Status DBImpl<EngineT>::try_build_index() {
    const bool skip = bg_build_index_started_ ||
                      _shutting_down.load(std::memory_order_acquire);
    if (!skip) {
        bg_build_index_started_ = true;
        std::thread(&DBImpl<EngineT>::background_build_index, this).detach();
    }
    return Status::OK();
}

355 356
/**
 * Serializes the memory manager's buffered data, then runs a merge pass for
 * each group id it reports. The first failing pass is recorded in _bg_error
 * and stops the loop.
 */
template<typename EngineT>
void DBImpl<EngineT>::background_compaction() {
    std::vector<std::string> group_ids;
    _pMemMgr->serialize(group_ids);

    // Bind by const reference: the original copied every id string.
    for (const auto& group_id : group_ids) {
        Status status = background_merge_files(group_id);
        if (!status.ok()) {
            _bg_error = status;
            return;
        }
    }
}

370 371
/// Forwards to the metadata store's drop_all(); returns its status.
template<typename EngineT>
Status DBImpl<EngineT>::drop_all() {
    Status outcome = _pMeta->drop_all();
    return outcome;
}

375 376
/// Writes into @p result the count the metadata store reports for
/// @p group_id; returns the metadata layer's status.
template<typename EngineT>
Status DBImpl<EngineT>::count(const std::string& group_id, long& result) {
    Status outcome = _pMeta->count(group_id, result);
    return outcome;
}

380 381
/**
 * Shutdown sequence:
 *  1. sets _shutting_down (under _mutex);
 *  2. waits until no compaction is scheduled;
 *  3. waits until no index build is running;
 *  4. serializes the memory manager one last time.
 *
 * NOTE(review): the detached timer thread from start_timer_task() is not
 * waited for here.
 */
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
    {
        std::unique_lock<std::mutex> guard(_mutex);
        _shutting_down.store(true, std::memory_order_release);
        _bg_work_finish_signal.wait(guard, [this] { return !_bg_compaction_scheduled; });
    }
    {
        std::unique_lock<std::mutex> guard(build_index_mutex_);
        bg_build_index_finish_signal_.wait(guard, [this] { return !bg_build_index_started_; });
    }
    std::vector<std::string> flushed_group_ids;
    _pMemMgr->serialize(flushed_group_ids);
}

X
Xu Peng 已提交
399 400 401 402 403 404
/*
 *  DB
 */

// Out-of-line, trivially-defaulted destructor for the DB interface.
DB::~DB() = default;

X
Xu Peng 已提交
405 406
void DB::Open(const Options& options, DB** dbptr) {
    *dbptr = nullptr;
407
    *dbptr = new DBImpl<FaissExecutionEngine>(options);
X
Xu Peng 已提交
408
    return;
X
Xu Peng 已提交
409 410
}

X
Xu Peng 已提交
411 412 413
} // namespace engine
} // namespace vecwise
} // namespace zilliz