DBImpl.cpp 13.4 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#ifndef DBIMPL_CPP__
#define DBIMPL_CPP__
X
Xu Peng 已提交
8

X
Xu Peng 已提交
9
#include <assert.h>
X
Xu Peng 已提交
10
#include <chrono>
X
Xu Peng 已提交
11
#include <thread>
12
#include <iostream>
X
xj.lin 已提交
13
#include <cstring>
14
#include <easylogging++.h>
X
Xu Peng 已提交
15
#include <cache/CpuCacheMgr.h>
16

X
Xu Peng 已提交
17 18 19
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
X
Xu Peng 已提交
20

X
Xu Peng 已提交
21 22 23
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
24

25 26
// Construct the database from `options`.
//
// Copies the options, creates the meta store (meta::DBMetaImpl) and the memory
// manager, clears the background-state flags, and finally starts the timer
// thread using `memory_sync_interval` from the stored options.
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
    : _env(options.env),
      _options(options),
      _bg_compaction_scheduled(false),
      _shutting_down(false),
      bg_build_index_started_(false),
      _pMeta(new meta::DBMetaImpl(_options.meta)),
      _pMemMgr(new MemManager<EngineT>(_pMeta, _options)) {
    // The timer thread is started last, once every member above is initialised.
    const auto& sync_interval = _options.memory_sync_interval;
    start_timer_task(sync_interval);
}

37
template<typename EngineT>
38 39
Status DBImpl<EngineT>::add_group(meta::TableSchema& table_schema) {
    return _pMeta->CreateTable(table_schema);
40 41
}

42
template<typename EngineT>
43
Status DBImpl<EngineT>::get_group(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
44
    return _pMeta->DescribeTable(table_schema);
45 46
}

47
template<typename EngineT>
48 49
Status DBImpl<EngineT>::has_group(const std::string& table_id, bool& has_or_not) {
    return _pMeta->HasTable(table_id, has_or_not);
50 51
}

52
template<typename EngineT>
53
Status DBImpl<EngineT>::add_vectors(const std::string& table_id_,
54
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
55
    Status status = _pMemMgr->add_vectors(table_id_, n, vectors, vector_ids_);
X
Xu Peng 已提交
56 57 58 59 60
    if (!status.ok()) {
        return status;
    }
}

61
template<typename EngineT>
62
Status DBImpl<EngineT>::search(const std::string &table_id, size_t k, size_t nq,
X
xj.lin 已提交
63
                      const float *vectors, QueryResults &results) {
X
Xu Peng 已提交
64
    meta::DatesT dates = {meta::Meta::GetDate()};
65
    return search(table_id, k, nq, vectors, dates, results);
X
Xu Peng 已提交
66 67
}

68
template<typename EngineT>
69
Status DBImpl<EngineT>::search(const std::string& table_id, size_t k, size_t nq,
X
Xu Peng 已提交
70 71
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {

72
    meta::DatePartionedTableFilesSchema files;
X
Xu Peng 已提交
73
    auto status = _pMeta->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
74 75
    if (!status.ok()) { return status; }

G
groot 已提交
76
    LOG(DEBUG) << "Search DateT Size=" << files.size();
X
Xu Peng 已提交
77

78 79
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
80 81
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
82
            file.file_type == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
83
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
84 85 86
        }
    }

X
xj.lin 已提交
87 88 89 90 91 92
    int dim = 0;
    if (!index_files.empty()) {
        dim = index_files[0].dimension;
    } else if (!raw_files.empty()) {
        dim = raw_files[0].dimension;
    } else {
G
groot 已提交
93
        LOG(DEBUG) << "no files to search";
X
xj.lin 已提交
94 95
        return Status::OK();
    }
X
xj.lin 已提交
96 97

    {
X
xj.lin 已提交
98 99 100 101
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
102
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
103 104 105 106 107 108 109 110 111
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
112 113
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
114 115 116 117
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
118

X
Xu Peng 已提交
119 120
        long search_set_size = 0;

121
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
122
            for (auto &file : file_vec) {
123
                EngineT index(file.dimension, file.location);
124 125
                index.Load();
                auto file_size = index.PhysicalSize()/(1024*1024);
X
Xu Peng 已提交
126
                search_set_size += file_size;
X
Xu Peng 已提交
127
                LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
X
Xu Peng 已提交
128
                    << file_size << " M";
X
xj.lin 已提交
129 130 131 132

                int inner_k = index.Count() < k ? index.Count() : k;
                index.Search(nq, vectors, inner_k, output_distence, output_ids);
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
133 134
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
135
            }
X
xj.lin 已提交
136
        };
X
xj.lin 已提交
137

X
xj.lin 已提交
138 139 140 141
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
142
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
143
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
144 145 146 147 148 149 150
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
151 152 153
            }

            int count = 0;
X
xj.lin 已提交
154 155 156 157 158
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
159
                    if (++count == k) break;
X
xj.lin 已提交
160
                }
X
xj.lin 已提交
161 162
            }
        };
X
xj.lin 已提交
163 164 165 166 167
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
168

X
xj.lin 已提交
169
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
170 171 172

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
X
xj.lin 已提交
173 174 175 176
                    res.emplace_back(nns[output_ids[i]]); // mapping
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
177 178
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
179 180
            }
        };
X
xj.lin 已提交
181 182 183

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
184 185

        LOG(DEBUG) << "Search Overall Set Size=" << search_set_size << " M";
X
xj.lin 已提交
186
        cluster_topk();
X
xj.lin 已提交
187 188 189 190 191

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
192
    if (results.empty()) {
193
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
194
    }
X
Xu Peng 已提交
195 196 197
    return Status::OK();
}

198 199
// Launch the background timer thread; it runs background_timer_task on this
// object with `interval_` as its argument. The thread handle is kept in
// bg_timer_thread_.
template<typename EngineT>
void DBImpl<EngineT>::start_timer_task(int interval_) {
    std::thread timer_worker(&DBImpl<EngineT>::background_timer_task, this, interval_);
    bg_timer_thread_ = std::move(timer_worker);
}

203 204
// Body of the timer thread: every `interval_` seconds, try to schedule a
// compaction. The loop ends once a background error has been recorded or
// shutdown has been requested.
//
// @param interval_  sleep time between scheduling attempts, in seconds
template<typename EngineT>
void DBImpl<EngineT>::background_timer_task(int interval_) {
    while (_bg_error.ok() && !_shutting_down.load(std::memory_order_acquire)) {
        std::this_thread::sleep_for(std::chrono::seconds(interval_));

        // Shutdown may have started while this thread was asleep; do not queue
        // new work against an object that is being torn down.
        if (_shutting_down.load(std::memory_order_acquire)) break;

        try_schedule_compaction();
    }
}

216 217
// Queue one background job (BGWork) on the environment, unless a job is
// already scheduled or a background error has been recorded.
template<typename EngineT>
void DBImpl<EngineT>::try_schedule_compaction() {
    const bool may_schedule = !_bg_compaction_scheduled && _bg_error.ok();
    if (!may_schedule) {
        return;
    }

    // Mark the job as pending before handing it to the environment.
    _bg_compaction_scheduled = true;
    _env->Schedule(&DBImpl<EngineT>::BGWork, this);
}

225 226
// Entry point handed to Env::Schedule: `db_` is the DBImpl pointer that was
// passed alongside it; recover the object and run background_call on it.
template<typename EngineT>
void DBImpl<EngineT>::BGWork(void* db_) {
    DBImpl* self = reinterpret_cast<DBImpl*>(db_);
    self->background_call();
}

230 231
// Run one scheduled background job while holding _mutex.
//
// Compaction is skipped when a background error has been recorded or shutdown
// has been requested. In every case the "scheduled" flag is cleared and
// waiters on _bg_work_finish_signal are woken: the destructor waits on exactly
// that flag, so leaving it set on the skip path would block it forever.
template<typename EngineT>
void DBImpl<EngineT>::background_call() {
    std::lock_guard<std::mutex> lock(_mutex);
    assert(_bg_compaction_scheduled);

    const bool should_run =
        _bg_error.ok() && !_shutting_down.load(std::memory_order_acquire);
    if (should_run) {
        background_compaction();
    }

    _bg_compaction_scheduled = false;
    _bg_work_finish_signal.notify_all();
}

244

245
template<typename EngineT>
246
Status DBImpl<EngineT>::merge_files(const std::string& table_id, const meta::DateT& date,
247
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
248 249 250 251
    meta::TableFileSchema table_file;
    table_file.table_id = table_id;
    table_file.date = date;
    Status status = _pMeta->CreateTableFile(table_file);
X
Xu Peng 已提交
252

253
    if (!status.ok()) {
254
        LOG(INFO) << status.ToString() << std::endl;
255 256 257
        return status;
    }

X
Xu Peng 已提交
258
    EngineT index(table_file.dimension, table_file.location);
259

260
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
261
    long  index_size = 0;
262 263

    for (auto& file : files) {
X
Xu Peng 已提交
264
        index.Merge(file.location);
265
        auto file_schema = file;
266
        file_schema.file_type = meta::TableFileSchema::TO_DELETE;
267
        updated.push_back(file_schema);
X
Xu Peng 已提交
268
        LOG(DEBUG) << "Merging file " << file_schema.file_id;
X
Xu Peng 已提交
269
        index_size = index.Size();
X
Xu Peng 已提交
270 271

        if (index_size >= _options.index_trigger_size) break;
272 273
    }

X
Xu Peng 已提交
274
    index.Serialize();
X
Xu Peng 已提交
275 276

    if (index_size >= _options.index_trigger_size) {
X
Xu Peng 已提交
277
        table_file.file_type = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
278
    } else {
X
Xu Peng 已提交
279
        table_file.file_type = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
280
    }
X
Xu Peng 已提交
281 282
    table_file.size = index_size;
    updated.push_back(table_file);
X
Xu Peng 已提交
283
    status = _pMeta->UpdateTableFiles(updated);
X
Xu Peng 已提交
284
    LOG(DEBUG) << "New merged file " << table_file.file_id <<
X
Xu Peng 已提交
285
        " of size=" << index.PhysicalSize()/(1024*1024) << " M";
286

X
Xu Peng 已提交
287
    index.Cache();
X
Xu Peng 已提交
288

289 290 291
    return status;
}

292
template<typename EngineT>
293
Status DBImpl<EngineT>::background_merge_files(const std::string& table_id) {
294
    meta::DatePartionedTableFilesSchema raw_files;
X
Xu Peng 已提交
295
    auto status = _pMeta->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
296 297 298
    if (!status.ok()) {
        return status;
    }
299

300 301 302
    /* if (raw_files.size() == 0) { */
    /*     return Status::OK(); */
    /* } */
303

X
Xu Peng 已提交
304 305
    bool has_merge = false;

306
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
307
        auto files = kv.second;
X
Xu Peng 已提交
308
        if (files.size() <= _options.merge_trigger_number) {
X
Xu Peng 已提交
309 310
            continue;
        }
X
Xu Peng 已提交
311
        has_merge = true;
312
        merge_files(table_id, kv.first, kv.second);
313
    }
X
Xu Peng 已提交
314

X
Xu Peng 已提交
315
    _pMeta->Archive();
316

317
    try_build_index();
X
Xu Peng 已提交
318

319
    _pMeta->CleanUpFilesWithTTL(1);
X
Xu Peng 已提交
320

X
Xu Peng 已提交
321 322 323
    return Status::OK();
}

324
template<typename EngineT>
325
Status DBImpl<EngineT>::build_index(const meta::TableFileSchema& file) {
X
Xu Peng 已提交
326 327 328 329
    meta::TableFileSchema table_file;
    table_file.table_id = file.table_id;
    table_file.date = file.date;
    Status status = _pMeta->CreateTableFile(table_file);
X
Xu Peng 已提交
330 331 332 333
    if (!status.ok()) {
        return status;
    }

334
    EngineT to_index(file.dimension, file.location);
335 336

    to_index.Load();
X
Xu Peng 已提交
337
    auto index = to_index.BuildIndex(table_file.location);
338

X
Xu Peng 已提交
339 340
    table_file.file_type = meta::TableFileSchema::INDEX;
    table_file.size = index->Size();
X
Xu Peng 已提交
341

X
Xu Peng 已提交
342
    auto to_remove = file;
343
    to_remove.file_type = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
344

X
Xu Peng 已提交
345
    meta::TableFilesSchema update_files = {to_remove, table_file};
X
Xu Peng 已提交
346
    _pMeta->UpdateTableFiles(update_files);
X
Xu Peng 已提交
347

X
Xu Peng 已提交
348
    LOG(DEBUG) << "New index file " << table_file.file_id << " of size "
X
Xu Peng 已提交
349 350 351
        << index->PhysicalSize()/(1024*1024) << " M"
        << " from file " << to_remove.file_id;

352
    index->Cache();
X
Xu Peng 已提交
353
    _pMeta->Archive();
X
Xu Peng 已提交
354

X
Xu Peng 已提交
355 356 357
    return Status::OK();
}

358 359
// Body of the index-building thread: build an index for every file the meta
// store reports via FilesToIndex, holding build_index_mutex_ throughout.
//
// The first build failure is recorded in _bg_error and stops the loop. In
// every case the "started" flag is cleared and waiters on
// bg_build_index_finish_signal_ are woken: the destructor waits on exactly
// that flag, so returning early without clearing it would block it forever.
template<typename EngineT>
void DBImpl<EngineT>::background_build_index() {
    std::lock_guard<std::mutex> lock(build_index_mutex_);
    assert(bg_build_index_started_);

    meta::TableFilesSchema to_index_files;
    _pMeta->FilesToIndex(to_index_files);

    for (auto& file : to_index_files) {
        Status status = build_index(file);
        if (!status.ok()) {
            _bg_error = status;
            break;
        }
    }

    bg_build_index_started_ = false;
    bg_build_index_finish_signal_.notify_all();
}

379 380
// Start a detached thread running background_build_index, unless one has
// already been started or shutdown has been requested. Always returns OK.
template<typename EngineT>
Status DBImpl<EngineT>::try_build_index() {
    if (bg_build_index_started_ || _shutting_down.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // Set the flag before the worker starts; the worker asserts it.
    bg_build_index_started_ = true;
    std::thread index_worker(&DBImpl<EngineT>::background_build_index, this);
    index_worker.detach();

    return Status::OK();
}

389 390
// One compaction pass: have the memory manager serialize its data (it fills
// `table_ids`), then run background_merge_files for each of those tables.
// The first failing table records its status in _bg_error and ends the pass.
template<typename EngineT>
void DBImpl<EngineT>::background_compaction() {
    std::vector<std::string> table_ids;
    _pMemMgr->serialize(table_ids);

    // Iterate by const reference: the ids are only read.
    for (const auto& table_id : table_ids) {
        Status status = background_merge_files(table_id);
        if (!status.ok()) {
            _bg_error = status;
            return;
        }
    }
}

404 405
// Forward to the meta store's DropAll and return the status it reports.
template<typename EngineT>
Status DBImpl<EngineT>::drop_all() {
    Status drop_status = _pMeta->DropAll();
    return drop_status;
}

X
Xu Peng 已提交
409 410
// Forward to the meta store's Size, which writes into `result`, and return
// the status it reports.
template<typename EngineT>
Status DBImpl<EngineT>::size(long& result) {
    Status size_status = _pMeta->Size(result);
    return size_status;
}

414 415
// Shut the database down:
//   1. set the shutting-down flag and wait until no compaction is scheduled,
//   2. wait until no index build is marked as started,
//   3. join the timer thread,
//   4. ask the memory manager to serialize once more,
//   5. stop the environment.
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
    {
        std::unique_lock<std::mutex> compaction_lock(_mutex);
        _shutting_down.store(true, std::memory_order_release);
        _bg_work_finish_signal.wait(compaction_lock, [this]() {
            return !_bg_compaction_scheduled;
        });
    }
    {
        std::unique_lock<std::mutex> build_lock(build_index_mutex_);
        bg_build_index_finish_signal_.wait(build_lock, [this]() {
            return !bg_build_index_started_;
        });
    }

    bg_timer_thread_.join();

    std::vector<std::string> flushed_ids;
    _pMemMgr->serialize(flushed_ids);

    _env->Stop();
}

X
Xu Peng 已提交
435 436 437
} // namespace engine
} // namespace vecwise
} // namespace zilliz
438 439

#endif