DBImpl.cpp 38.6 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/DBImpl.h"
S
starlord 已提交
19
#include "Utils.h"
S
starlord 已提交
20 21
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
S
starlord 已提交
22
#include "engine/EngineFactory.h"
S
starlord 已提交
23
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
24
#include "meta/MetaConsts.h"
S
starlord 已提交
25 26
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
27
#include "metrics/Metrics.h"
S
starlord 已提交
28
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
29
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
30 31
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
S
starlord 已提交
32
#include "utils/Log.h"
G
groot 已提交
33
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
34
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
35

X
Xu Peng 已提交
36
#include <assert.h>
S
starlord 已提交
37
#include <algorithm>
G
groot 已提交
38
#include <boost/filesystem.hpp>
S
starlord 已提交
39 40 41
#include <chrono>
#include <cstring>
#include <iostream>
G
groot 已提交
42
#include <set>
S
starlord 已提交
43
#include <thread>
G
groot 已提交
44
#include <utility>
X
Xu Peng 已提交
45

J
jinhai 已提交
46
namespace milvus {
X
Xu Peng 已提交
47
namespace engine {
X
Xu Peng 已提交
48

G
groot 已提交
49 50
namespace {

J
jinhai 已提交
51 52 53
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
54

G
groot 已提交
55 56
constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1;

G
groot 已提交
57 58 59 60 61 62 63 64 65 66 67
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!");

void
TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::TableFilesSchema& files_array) {
    for (auto& day_files : date_files) {
        for (auto& file : day_files.second) {
            files_array.push_back(file);
        }
    }
}

S
starlord 已提交
68
}  // namespace
G
groot 已提交
69

Y
Yu Kun 已提交
70
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    // The meta backend is built first because the memory manager is constructed on top of it.
    meta_ptr_ = MetaFactory::Build(options_.meta_, options_.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    // shutting_down_ starts as true, so Start() performs the actual startup.
    Start();
}

DBImpl::~DBImpl() {
    // Stop() returns immediately if the DB is already shut down; its Status is not needed here.
    static_cast<void>(Stop());
}

S
starlord 已提交
81
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
82
// external api
S
starlord 已提交
83
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
84 85 86
Status
DBImpl::Start() {
    // Already running: starting again is a no-op.
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (!stopped) {
        return Status::OK();
    }

    // ENGINE_LOG_TRACE << "DB service start";
    shutting_down_.store(false, std::memory_order_release);

    // In a distributed deployment some nodes are read only. Those nodes do not run the background
    // timer (metrics, compaction and index-build tasks).
    const bool read_only = (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY);
    if (!read_only) {
        // ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

S
starlord 已提交
102 103 104
Status
DBImpl::Stop() {
    // Already stopped: nothing to do. This also makes the destructor safe after an explicit Stop().
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    // make sure all in-memory data is serialized
    std::set<std::string> sync_table_ids;
    SyncMemData(sync_table_ids);

    // Wait for the background timer thread. On shutdown it waits for merge/index-build work
    // before exiting. In CLUSTER_READONLY mode Start() never launches this thread, and join()
    // on a non-joinable std::thread throws std::system_error (std::terminate when reached
    // from the destructor). So only join when there is a thread to join.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUpShadowFiles();
    }

    // ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
125 126
Status
DBImpl::DropAll() {
    // Unlike the other external APIs there is no shutting_down_ check here; the call goes
    // straight to the meta layer.
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
130
Status
DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool stopping = shutting_down_.load(std::memory_order_acquire);
    if (stopping) {
        return SHUTDOWN_ERROR;
    }

    // index_file_size_ is scaled by ONE_MB before it is stored (DescribeTable divides it back).
    // A copy is scaled so the caller's schema is left untouched.
    meta::TableSchema scaled_schema(table_schema);
    scaled_schema.index_file_size_ *= ONE_MB;
    return meta_ptr_->CreateTable(scaled_schema);
}

S
starlord 已提交
141
Status
DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        // All the work is done by DropTableRecursively (the name suggests partitions are
        // included — confirm against its definition).
        return DropTableRecursively(table_id, dates);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
150
Status
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeTable(table_schema);
    // Undo the ONE_MB scaling applied by CreateTable. Note this runs whatever describe_status is.
    table_schema.index_file_size_ /= ONE_MB;
    return describe_status;
}

S
starlord 已提交
161
Status
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    // Thin forwarder to the meta layer, refused once shutdown has begun.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->HasTable(table_id, has_or_not);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
170
Status
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    // Thin forwarder to the meta layer, refused once shutdown has begun.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->AllTables(table_schema_array);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
179
Status
DBImpl::PreloadTable(const std::string& table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Collect the files of the table itself (empty dates/ids — presumably "no filter").
    meta::DatesT dates;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    // ...and the files of every partition. Failures here used to be discarded, which could
    // preload only part of the data while still reporting success.
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to list partitions of table: " << table_id;
        return status;
    }
    for (auto& schema : partition_array) {
        status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to collect files of partition: " << schema.table_id_;
            return status;
        }
    }

    // Load files one by one until the remaining CPU cache budget would be exceeded.
    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    for (auto& file : files_array) {
        ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
                                                         (MetricType)file.metric_type_, file.nlist_);
        if (engine == nullptr) {
            ENGINE_LOG_ERROR << "Invalid engine type";
            return Status(DB_ERROR, "Invalid engine type");
        }

        size += engine->PhysicalSize();
        if (size > available_size) {
            return Status(SERVER_CACHE_FULL, "Cache is full");
        }

        try {
            // step 1: load index
            engine->Load(true);
        } catch (std::exception& ex) {
            std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
232
Status
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    // Thin forwarder to the meta layer, refused once shutdown has begun.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->UpdateTableFlag(table_id, flag);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
241
Status
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    // Row counting is delegated to GetTableRowCountRecursively; refused once shutdown has begun.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return GetTableRowCountRecursively(table_id, row_count);
    }
    return SHUTDOWN_ERROR;
}

Status
DBImpl::CreatePartition(const std::string& table_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    // Thin forwarder to the meta layer, refused once shutdown has begun.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->CreatePartition(table_id, partition_name, partition_tag);
    }
    return SHUTDOWN_ERROR;
}

Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Drop the partition's in-memory vectors (original note: "not allow insert"). Best effort:
    // this status is not checked, as before.
    auto status = mem_mgr_->EraseMemVector(partition_name);

    // Soft delete the partition in meta. If this fails, do not schedule file deletion and do not
    // report success (previously the failure was silently overwritten by Status::OK()).
    status = meta_ptr_->DropPartition(partition_name);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to drop partition " << partition_name << ": " << status.message();
        return status;
    }

    // scheduler will determine when to delete table files; block until the delete job is done
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
278
Status
DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Resolve the tag to the partition's table name, then drop that partition.
    std::string partition_name;
    auto lookup_status = meta_ptr_->GetPartitionName(table_id, partition_tag, partition_name);
    if (lookup_status.ok()) {
        return DropPartition(partition_name);
    }

    ENGINE_LOG_ERROR << lookup_status.message();
    return lookup_status;
}

Status
DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) {
    // Thin forwarder to the meta layer, refused once shutdown has begun.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->ShowPartitions(table_id, partiton_schema_array);
    }
    return SHUTDOWN_ERROR;
}

Status
DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors,
                      IDNumbers& vector_ids) {
    //    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // If a partition tag is given, insert into that partition's table instead of table_id.
    // (The previously declared local `partition_name` was never used and has been removed.)
    Status status;
    std::string target_table_name = table_id;
    if (!partition_tag.empty()) {
        status = meta_ptr_->GetPartitionName(table_id, partition_tag, target_table_name);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
            return status;
        }
    }

    // insert vectors into target table
    // `metrics` is constructed with `status` before the insert. Presumably it holds a reference and
    // reports the final result when destroyed — keep this ordering. TODO confirm.
    milvus::server::CollectInsertMetrics metrics(n, status);
    status = mem_mgr_->InsertVectors(target_table_name, n, vectors, vector_ids);

    return status;
}

S
starlord 已提交
330
Status
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Serialize in-memory data before touching the index.
    std::set<std::string> flushed_table_ids;
    auto status = SyncMemData(flushed_table_ids);

    {
        // build_index_mutex_ is held while the index definition is read and updated.
        std::unique_lock<std::mutex> index_lock(build_index_mutex_);

        // step 1: fetch the current index definition
        TableIndex current_index;
        status = DescribeIndex(table_id, current_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        // step 2: persist the requested definition only if it differs. The metric type was fixed
        // by CreateTable, so it is always carried over from the current index.
        TableIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;
        const bool unchanged = utils::IsSameIndex(current_index, requested_index);
        if (!unchanged) {
            status = UpdateTableIndexRecursively(table_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let the merge file thread finish, to avoid a duplicate-data bug
    WaitMergeFileFinish();

    // step 4: clean up failed index files, then build the index and wait for it.
    // The cleanup status is overwritten by the build status.
    status = CleanFailedIndexFileOfTable(table_id);
    status = BuildTableIndexRecursively(table_id, index);
    return status;
}

S
starlord 已提交
373
Status
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    // Thin forwarder to the meta layer, refused once shutdown has begun.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->DescribeTableIndex(table_id, index);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
382
Status
DBImpl::DropIndex(const std::string& table_id) {
    // The actual work is done by DropTableIndexRecursively; refused once shutdown has begun.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
        return DropTableIndexRecursively(table_id);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
392
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // With no explicit date range, search only the single date returned by utils::GetDate().
    const meta::DatesT default_dates = {utils::GetDate()};
    return Query(table_id, partition_tags, k, nq, nprobe, vectors, default_dates, result_ids, result_distances);
}

S
starlord 已提交
404
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
              ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id << " date range count: " << dates.size();

    Status status;
    std::vector<size_t> ids;  // empty: no file-id filter (presumably — see GetFilesToSearch)
    meta::TableFilesSchema files_array;

    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole table
        // get all table files from parent table
        status = GetFilesToSearch(table_id, ids, dates, files_array);
        if (!status.ok()) {
            return status;
        }

        // ...plus the files of every partition. Failures here were previously dropped, which
        // silently searched only part of the data.
        std::vector<meta::TableSchema> partition_array;
        status = meta_ptr_->ShowPartitions(table_id, partition_array);
        if (!status.ok()) {
            return status;
        }
        for (auto& schema : partition_array) {
            status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
            if (!status.ok()) {
                return status;
            }
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        GetPartitionsByTags(table_id, partition_tags, partition_name_array);

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, ids, dates, files_array);
            if (!status.ok()) {
                return status;
            }
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}
X
Xu Peng 已提交
446

S
starlord 已提交
447
Status
DBImpl::QueryByFileID(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
                      uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id << " date range count: " << dates.size();

    // Parse the string file ids into numeric ids. std::stoul throws std::invalid_argument or
    // std::out_of_range on bad input; report that as a Status instead of letting the exception
    // escape this Status-based API. (Parsing leniency is unchanged: a numeric prefix is accepted.)
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (const auto& id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception&) {
            std::string msg = "Invalid file id: " + id;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    if (files_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}

S
starlord 已提交
482
Status
DBImpl::Size(uint64_t& result) {
    // Thin forwarder to the meta layer. Once shutdown has begun, `result` is left untouched.
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->Size(result);
    }
    return SHUTDOWN_ERROR;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
492
// internal methods
S
starlord 已提交
493
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
494
Status
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
                   uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    server::CollectQueryMetrics metrics(nq);
    TimeRecorder rc("");

    // step 1: wrap every file to be searched into one search job
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
    scheduler::SearchJobPtr search_job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
    for (const auto& file_schema : files) {
        scheduler::TableFileSchemaPtr file_copy = std::make_shared<meta::TableFileSchema>(file_schema);
        search_job->AddIndexFile(file_copy);
    }

    // step 2: hand the job to the scheduler and block until it finishes
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();
    auto job_status = search_job->GetStatus();
    if (!job_status.ok()) {
        return job_status;
    }

    // step 3: copy the results out of the job
    result_ids = search_job->GetResultIds();
    result_distances = search_job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

S
starlord 已提交
524 525
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
526
    Status status;
Y
yu yunfeng 已提交
527
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
528
    while (true) {
S
starlord 已提交
529
        if (shutting_down_.load(std::memory_order_acquire)) {
530 531
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
532 533

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
534 535
            break;
        }
X
Xu Peng 已提交
536

G
groot 已提交
537
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
538

G
groot 已提交
539
        StartMetricTask();
G
groot 已提交
540 541 542
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
543 544
}

S
starlord 已提交
545 546
void
DBImpl::WaitMergeFileFinish() {
547
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
Y
Yu Kun 已提交
548
    for (auto& iter : compact_thread_results_) {
549 550 551 552
        iter.wait();
    }
}

S
starlord 已提交
553 554
void
DBImpl::WaitBuildIndexFinish() {
555
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
556
    for (auto& iter : index_thread_results_) {
557 558 559 560
        iter.wait();
    }
}

S
starlord 已提交
561 562
void
DBImpl::StartMetricTask() {
G
groot 已提交
563 564
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
S
starlord 已提交
565
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
566 567 568
        return;
    }

S
Shouyu Luo 已提交
569
    // ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
570

G
groot 已提交
571 572 573
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
J
JinHai-CN 已提交
574 575 576 577 578 579 580
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
581
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
582 583 584 585 586 587 588 589
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
590

K
kun yu 已提交
591
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
592 593
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
594

S
Shouyu Luo 已提交
595
    // ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
596 597
}

S
starlord 已提交
598
Status
DBImpl::SyncMemData(std::set<std::string>& sync_table_ids) {
    // Only one serialization runs at a time.
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    // Serialize buffered inserts. Serialize() fills `flushed_ids` (presumably the ids of the
    // tables it wrote); its own return value is not checked.
    std::set<std::string> flushed_ids;
    mem_mgr_->Serialize(flushed_ids);
    sync_table_ids.insert(flushed_ids.begin(), flushed_ids.end());

    if (!flushed_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

S
starlord 已提交
614 615
void
DBImpl::StartCompactionTask() {
616 617
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
S
starlord 已提交
618
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
619 620 621
        return;
    }

S
starlord 已提交
622
    // serialize memory data
G
groot 已提交
623
    SyncMemData(compact_table_ids_);
624

S
starlord 已提交
625
    // compactiong has been finished?
626 627 628 629 630 631 632
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
633 634
        }
    }
X
Xu Peng 已提交
635

S
starlord 已提交
636
    // add new compaction task
637 638 639
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
640 641 642
            // collect merge files for all tables(if compact_table_ids_ is empty) for two reasons:
            // 1. other tables may still has un-merged files
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
G
groot 已提交
643
            if (compact_table_ids_.empty()) {
644 645
                std::vector<meta::TableSchema> table_schema_array;
                meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
646
                for (auto& schema : table_schema_array) {
647 648 649 650 651
                    compact_table_ids_.insert(schema.table_id_);
                }
            }

            // start merge file thread
652
            compact_thread_results_.push_back(
G
groot 已提交
653
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
654 655
            compact_table_ids_.clear();
        }
G
groot 已提交
656
    }
X
Xu Peng 已提交
657 658
}

S
starlord 已提交
659
Status
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    // step 1: create the target (merged) table file in meta
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    // step 2: merge files
    ExecutionEnginePtr index =
        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                             (MetricType)table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        // EngineFactory::Build can return nullptr (PreloadTable guards against it too); the code
        // below would dereference it. Discard the new file from step 1 and leave the source files
        // untouched. They are only marked TO_DELETE after a successful merge.
        std::string msg = "Invalid engine type";
        ENGINE_LOG_ERROR << msg << ", discard merged file: " << table_file.file_id_;
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, msg);
    }

    meta::TableFilesSchema updated;
    int64_t index_size = 0;

    // Merge source files one by one until the merged index reaches index_file_size_.
    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        if (index_size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        index->Serialize();
    } catch (std::exception& ex) {
        // typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    // else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                                       : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size " << index->PhysicalSize() << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

S
starlord 已提交
738
/**
 * Merge the small files of one table, date group by date group.
 *
 * Fetches the mergeable files of @p table_id from meta (grouped by date). Every date group holding at
 * least options_.merge_trigger_number_ files is handed to MergeFiles(). The loop stops early once the
 * server starts shutting down.
 *
 * @param table_id  table whose files should be merged
 * @return the meta error if the mergeable files cannot be fetched, otherwise OK. A failure while
 *         merging one date group is logged and does not prevent the remaining groups from merging.
 */
Status
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        // bind by reference: the list is only inspected here, copying it was wasted work
        auto& files = kv.second;
        if (files.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        status = MergeFiles(table_id, kv.first, files);
        if (!status.ok()) {
            // keep going: one failed date group should not block merging the others
            ENGINE_LOG_ERROR << "Failed to merge files for table " << table_id << ": " << status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
764

S
starlord 已提交
765 766
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
S
Shouyu Luo 已提交
767
    // ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
768

G
groot 已提交
769
    Status status;
Y
Yu Kun 已提交
770
    for (auto& table_id : table_ids) {
G
groot 已提交
771 772
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
773
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
774
        }
S
starlord 已提交
775

S
starlord 已提交
776
        if (shutting_down_.load(std::memory_order_acquire)) {
S
starlord 已提交
777 778 779
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
780
    }
X
Xu Peng 已提交
781

G
groot 已提交
782
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
783

784 785 786 787 788 789 790 791 792 793 794
    {
        uint64_t ttl = 10 * meta::SECOND;  // default: file data will be erase from cache after few seconds
        meta_ptr_->CleanUpCacheWithTTL(ttl);
    }

    {
        uint64_t ttl = 5 * meta::M_SEC;  // default: file will be deleted after few minutes
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
            ttl = meta::D_SEC;
        }
        meta_ptr_->CleanUpFilesWithTTL(ttl);
Z
update  
zhiru 已提交
795
    }
S
starlord 已提交
796

S
Shouyu Luo 已提交
797
    // ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
798
}
X
Xu Peng 已提交
799

S
starlord 已提交
800 801
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
802 803
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
S
starlord 已提交
804
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
805 806 807
        return;
    }

S
starlord 已提交
808
    // build index has been finished?
809 810 811 812 813 814 815
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
816 817 818
        }
    }

S
starlord 已提交
819
    // add new build index task
820 821 822
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
823
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
824
        }
G
groot 已提交
825
    }
X
Xu Peng 已提交
826 827
}

S
starlord 已提交
828 829
void
DBImpl::BackgroundBuildIndex() {
S
Shouyu Luo 已提交
830
    // ENGINE_LOG_TRACE << "Background build index thread start";
S
starlord 已提交
831

P
peng.xu 已提交
832
    std::unique_lock<std::mutex> lock(build_index_mutex_);
833
    meta::TableFilesSchema to_index_files;
G
groot 已提交
834
    meta_ptr_->FilesToIndex(to_index_files);
G
groot 已提交
835
    Status status = IgnoreFailedIndexFiles(to_index_files);
836

837 838
    if (!to_index_files.empty()) {
        // step 2: put build index task to scheduler
G
groot 已提交
839
        std::map<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr> job2file_map;
840
        for (auto& file : to_index_files) {
G
groot 已提交
841
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
842 843
            scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
844 845
            scheduler::JobMgrInst::GetInstance()->Put(job);
            job2file_map.insert(std::make_pair(job, file_ptr));
846
        }
G
groot 已提交
847 848 849 850 851 852 853 854 855 856 857 858 859 860

        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
            meta::TableFileSchema& file_schema = *(iter->second.get());
            job->WaitBuildIndexFinish();
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
                ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();

                MarkFailedIndexFile(file_schema);
            } else {
                MarkSucceedIndexFile(file_schema);
                ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
            }
861
        }
G
groot 已提交
862 863

        ENGINE_LOG_DEBUG << "Background build index thread finished";
Y
Yu Kun 已提交
864
    }
Y
Yu Kun 已提交
865

S
Shouyu Luo 已提交
866
    // ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
867 868
}

G
groot 已提交
869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887
/**
 * List the files of @p table_id whose type is in @p file_types and that still need an index.
 *
 * RAW files with fewer than meta::BUILD_INDEX_THRESHOLD rows are dropped from the result, since they
 * are too small to be worth indexing.
 *
 * @param table_id    table to query
 * @param file_types  file types (meta::TableFileSchema file_type_ values) to include
 * @param files       cleared, then filled with the matching files
 * @return the FilesByType status. Previously this was swallowed and OK was always returned. The
 *         filter is applied either way.
 */
Status
DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
                             meta::TableFilesSchema& files) {
    files.clear();
    auto status = meta_ptr_->FilesByType(table_id, file_types, files);

    // only build index for files that row count greater than certain threshold
    files.erase(std::remove_if(files.begin(), files.end(),
                               [](const meta::TableFileSchema& file) {
                                   return file.file_type_ == static_cast<int>(meta::TableFileSchema::RAW) &&
                                          file.row_count_ < meta::BUILD_INDEX_THRESHOLD;
                               }),
                files.end());

    return status;
}

G
groot 已提交
888
/**
 * Collect the files a search on @p table_id has to visit.
 *
 * Queries meta for the searchable files, restricted by @p file_ids and @p dates and grouped by date.
 * On success, the date groups are passed to TraverseFiles together with @p files. TraverseFiles
 * presumably flattens them into @p files (TODO confirm).
 *
 * @return the meta error if the query fails, otherwise OK
 */
Status
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
                         meta::TableFilesSchema& files) {
    meta::DatePartionedTableFilesSchema files_by_date;
    Status query_status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, files_by_date);
    if (query_status.ok()) {
        TraverseFiles(files_by_date, files);
        return Status::OK();
    }

    return query_status;
}

/**
 * Resolve partition tags to the table ids of the matching partitions of @p table_id.
 *
 * Each tag has its surrounding blanks trimmed. It is then tested with
 * StringHelpFunctions::IsRegexMatch against the tag of every partition. The table id of every
 * matching partition is inserted into @p partition_name_array.
 *
 * @return the ShowPartitions error if the partitions cannot be listed. Previously this was swallowed
 *         and looked like "no partition matched". Otherwise OK.
 */
Status
DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::TableSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (const auto& tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        for (auto& schema : partition_array) {
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
                partition_name_array.insert(schema.table_id_);
            }
        }
    }

    return Status::OK();
}

/**
 * Drop a table (or some of its data) together with all of its partitions.
 *
 * With empty @p dates, the following steps run in order:
 *   1. erase the table's in-memory vectors;
 *   2. soft-delete the table in meta;
 *   3. forget its failed-index bookkeeping;
 *   4. submit a scheduler DeleteJob and wait for it.
 * With non-empty @p dates, only the data of those dates is dropped through meta. Either way, the same
 * operation is then applied recursively to every partition.
 *
 * @return the first meta or partition error. Previously these were overwritten or ignored and OK was
 *         always returned. Otherwise OK.
 */
Status
DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& dates) {
    // dates partly delete files of the table but currently we don't support
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    Status status;
    if (dates.empty()) {
        status = mem_mgr_->EraseMemVector(table_id);  // not allow insert
        if (!status.ok()) {
            // best effort: still drop the table from meta
            ENGINE_LOG_ERROR << "Failed to erase memory vectors of table " << table_id << ": " << status.ToString();
        }

        status = meta_ptr_->DropTable(table_id);  // soft delete table
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to drop table " << table_id << ": " << status.ToString();
            return status;
        }

        CleanFailedIndexFileOfTable(table_id);

        // scheduler will determine when to delete table files
        auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
        scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, nres);
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitAndDelete();
    } else {
        status = meta_ptr_->DropDataByDate(table_id, dates);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to drop data by date for table " << table_id << ": " << status.ToString();
            return status;
        }
    }

    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (auto& schema : partition_array) {
        status = DropTableRecursively(schema.table_id_, dates);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

/**
 * Replace the index definition of a table and, recursively, of all its partitions.
 *
 * The current index is dropped first via DropIndex(). Its status is not checked.
 * NOTE(review): confirm that ignoring it is intended. The new @p index is then stored in meta.
 *
 * @return the first meta or partition error. Previously a ShowPartitions failure was swallowed.
 *         Otherwise OK.
 */
Status
DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    DropIndex(table_id);

    auto status = meta_ptr_->UpdateTableIndex(table_id, index);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
        return status;
    }

    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (auto& schema : partition_array) {
        status = UpdateTableIndexRecursively(schema.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

/**
 * Block until the files of a table, and recursively of its partitions, have been indexed.
 *
 * Which files are waited for depends on the engine type:
 *   - FAISS_IDMAP: NEW / NEW_MERGE files only.
 *   - Other types: RAW / NEW / NEW_MERGE / NEW_INDEX / TO_INDEX files. RAW files below
 *     meta::BUILD_INDEX_THRESHOLD rows are excluded by GetFilesToBuildIndex.
 *
 * The loop polls meta with a linear back-off capped at 10 s. It skips files whose build already
 * failed INDEX_FAILED_RETRY_TIME times.
 *
 * @return the first meta or partition error. Returns DB_ERROR if some files failed to be indexed.
 *         Otherwise OK.
 */
Status
DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    const bool is_idmap = (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP));

    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if (is_idmap) {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::RAW),
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
            static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
            static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
        };
    }

    // get files to build index; files that already failed too often are filtered up front as well,
    // so we do not spend a poll round waiting for files that will never be indexed
    meta::TableFilesSchema table_files;
    auto status = GetFilesToBuildIndex(table_id, file_types, table_files);
    if (!status.ok()) {
        return status;
    }
    IgnoreFailedIndexFiles(table_files);

    int times = 1;
    while (!table_files.empty()) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
            if (!status.ok()) {
                // not fatal: the update is attempted again on the next poll
                ENGINE_LOG_ERROR << "Failed to update files to index for table " << table_id << ": "
                                 << status.ToString();
            }
        }

        // linear back-off: 100 ms per round, capped at 10 s (written so times * 100 cannot overflow)
        const int sleep_ms = (times < 100) ? times * 100 : 10 * 1000;
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));

        status = GetFilesToBuildIndex(table_id, file_types, table_files);
        if (!status.ok()) {
            return status;
        }
        times++;

        IgnoreFailedIndexFiles(table_files);
    }

    // build index for partition
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (auto& schema : partition_array) {
        status = BuildTableIndexRecursively(schema.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    // failed to build index for some files, return error
    std::vector<std::string> failed_files;
    GetFailedIndexFileOfTable(table_id, failed_files);
    if (!failed_files.empty()) {
        std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) +
                          ((failed_files.size() == 1) ? " file" : " files");
#ifdef MILVUS_CPU_VERSION
        msg += ", please double check index parameters.";
#else
        msg += ", file size is too large or gpu memory is not enough.";
#endif
        return Status(DB_ERROR, msg);
    }

    return Status::OK();
}

/**
 * Drop the index of a table and, recursively, of all its partitions.
 *
 * The table's failed-index bookkeeping is cleared first, so a later rebuild retries every file.
 *
 * @return the first meta or partition error. Previously a ShowPartitions failure was swallowed.
 *         Otherwise OK.
 */
Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    CleanFailedIndexFileOfTable(table_id);
    auto status = meta_ptr_->DropTableIndex(table_id);
    if (!status.ok()) {
        return status;
    }

    // drop partition index
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (auto& schema : partition_array) {
        status = DropTableIndexRecursively(schema.table_id_);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

/**
 * Count the rows of a table plus, recursively, the rows of all its partitions.
 *
 * @param table_id   table to count
 * @param row_count  reset to 0, then set to the accumulated total
 * @return the first meta or partition error. Previously a ShowPartitions failure was swallowed and
 *         produced a silently short count. Otherwise OK.
 */
Status
DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count) {
    row_count = 0;
    auto status = meta_ptr_->Count(table_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // get partition row count
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (auto& schema : partition_array) {
        uint64_t partition_row_count = 0;
        status = GetTableRowCountRecursively(schema.table_id_, partition_row_count);
        if (!status.ok()) {
            return status;
        }

        row_count += partition_row_count;
    }

    return Status::OK();
}

G
groot 已提交
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167
/**
 * Forget every index-build failure recorded for @p table_id, so its files are rebuilt from scratch.
 * Guarded by index_failed_mutex_. Always returns OK.
 */
Status
DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) {
    std::lock_guard<std::mutex> guard(index_failed_mutex_);

    auto table_it = index_failed_files_.find(table_id);
    if (table_it != index_failed_files_.end()) {
        index_failed_files_.erase(table_it);
    }

    return Status::OK();
}

/**
 * List the ids of every file of @p table_id that has at least one recorded index-build failure.
 *
 * @param table_id      table to look up
 * @param failed_files  cleared, then filled with the failed file ids (empty if none are recorded)
 * @return always OK
 */
Status
DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) {
    failed_files.clear();

    std::lock_guard<std::mutex> guard(index_failed_mutex_);
    const auto table_it = index_failed_files_.find(table_id);
    if (table_it == index_failed_files_.end()) {
        return Status::OK();
    }

    const FileID2FailedTimes& failures = table_it->second;
    failed_files.reserve(failures.size());
    for (const auto& entry : failures) {
        failed_files.push_back(entry.first);
    }

    return Status::OK();
}

/**
 * Record one more index-build failure for @p file. Guarded by index_failed_mutex_. Always returns OK.
 */
Status
DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) {
    std::lock_guard<std::mutex> guard(index_failed_mutex_);

    // operator[] creates a missing per-table map and a zero-initialized counter on demand, so one
    // increment covers "first failure for the table", "first failure for the file" and "repeat failure"
    ++index_failed_files_[file.table_id_][file.file_id_];

    return Status::OK();
}

/**
 * Drop the failure counter of @p file after its index was built successfully.
 * Guarded by index_failed_mutex_. Always returns OK.
 */
Status
DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) {
    std::lock_guard<std::mutex> guard(index_failed_mutex_);

    auto table_it = index_failed_files_.find(file.table_id_);
    if (table_it == index_failed_files_.end()) {
        return Status::OK();
    }

    FileID2FailedTimes& failures = table_it->second;
    failures.erase(file.file_id_);

    return Status::OK();
}

/**
 * Remove from @p table_files every file whose index build has failed INDEX_FAILED_RETRY_TIME or
 * more times.
 *
 * The files may belong to different tables. Without this filter, callers that loop until the list
 * is empty would retry such files forever. The relative order of the remaining files is preserved.
 * Guarded by index_failed_mutex_. Always returns OK.
 */
Status
DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) {
    std::lock_guard<std::mutex> guard(index_failed_mutex_);

    auto exceeded_retries = [this](const meta::TableFileSchema& file) {
        auto table_it = index_failed_files_.find(file.table_id_);
        if (table_it == index_failed_files_.end()) {
            return false;
        }
        auto file_it = table_it->second.find(file.file_id_);
        return file_it != table_it->second.end() && file_it->second >= INDEX_FAILED_RETRY_TIME;
    };

    table_files.erase(std::remove_if(table_files.begin(), table_files.end(), exceeded_retries), table_files.end());

    return Status::OK();
}

S
starlord 已提交
1168 1169
}  // namespace engine
}  // namespace milvus