DBImpl.cpp 37.9 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/DBImpl.h"
Z
Zhiru Zhu 已提交
19 20 21 22 23 24 25 26 27 28 29 30

#include <assert.h>

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
#include <iostream>
#include <set>
#include <thread>
#include <utility>

S
starlord 已提交
31
#include "Utils.h"
S
starlord 已提交
32 33
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
S
starlord 已提交
34
#include "engine/EngineFactory.h"
S
starlord 已提交
35
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
36
#include "meta/MetaConsts.h"
S
starlord 已提交
37 38
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
39
#include "metrics/Metrics.h"
S
starlord 已提交
40
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
41
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
42 43
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
S
starlord 已提交
44
#include "utils/Log.h"
G
groot 已提交
45
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
46
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
47

J
jinhai 已提交
48
namespace milvus {
X
Xu Peng 已提交
49
namespace engine {
X
Xu Peng 已提交
50

G
groot 已提交
51 52
namespace {

J
jinhai 已提交
53 54 55
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
56

G
groot 已提交
57
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
G
groot 已提交
58 59 60 61 62 63 64 65 66 67

void
TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::TableFilesSchema& files_array) {
    for (auto& day_files : date_files) {
        for (auto& file : day_files.second) {
            files_array.push_back(file);
        }
    }
}

S
starlord 已提交
68
}  // namespace
G
groot 已提交
69

Y
Yu Kun 已提交
70
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    // The metadata backend is built first because the memory manager is constructed on top of it.
    meta_ptr_ = MetaFactory::Build(options_.meta_, options_.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    // The instance is started as part of construction.
    Start();
}

DBImpl::~DBImpl() {
    // Flush in-memory data and stop the background timer thread before members are torn down.
    Stop();
}

S
starlord 已提交
81
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
82
// external api
S
starlord 已提交
83
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
84 85
// Marks the DB as running and, except on read-only cluster nodes, launches the background timer thread.
// Calling Start() on an already-running instance is a no-op that returns OK.
Status
DBImpl::Start() {
    const bool already_running = initialized_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    initialized_.store(true, std::memory_order_release);

    // For the distributed version some nodes are read only: they run no metric/compaction/index-build loop.
    const bool read_only_node = (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY);
    if (!read_only_node) {
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

S
starlord 已提交
102 103
// Stops the DB: flushes in-memory data, waits for the background timer thread (which itself drains pending
// merge/index-build work before exiting) and cleans up shadow files. Stopping a stopped DB returns OK.
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }
    // Clearing the flag makes BackgroundTimerTask wait for outstanding merge/index jobs and leave its loop.
    initialized_.store(false, std::memory_order_release);

    // make sure all memory data is serialized
    std::set<std::string> sync_table_ids;
    SyncMemData(sync_table_ids);

    // wait compaction/buildindex finish.
    // The timer thread is never started in CLUSTER_READONLY mode (see Start()); calling join() on a
    // non-joinable std::thread throws std::system_error, which would escape ~DBImpl() and terminate.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUpShadowFiles();
    }

    return Status::OK();
}

S
starlord 已提交
124 125
// Drops everything through the meta layer. Unlike most public calls here, this one performs no
// initialized_ (shutdown) check.
Status
DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
129
// Creates a table. The caller supplies index_file_size_ in MB; the value handed to the meta layer is
// multiplied by ONE_MB (i.e. stored in bytes). A copy is used so the caller's schema keeps its MB value.
Status
DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    meta::TableSchema schema_to_store = table_schema;
    schema_to_store.index_file_size_ *= ONE_MB;
    return meta_ptr_->CreateTable(schema_to_store);
}

S
starlord 已提交
140
// Drops a table (restricted to `dates`) via DropTableRecursively; refused once the DB is shut down.
Status
DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    return DropTableRecursively(table_id, dates);
}

S
starlord 已提交
149
// Fills table_schema from the meta layer and converts index_file_size_ back to MB for the caller.
// The division is applied whether or not the meta lookup succeeded; the meta status is returned as-is.
Status
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeTable(table_schema);
    table_schema.index_file_size_ /= ONE_MB;  // return as MB
    return describe_status;
}

S
starlord 已提交
160
// Reports through has_or_not whether table_id exists; the answer comes from the meta layer.
Status
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    auto lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

S
starlord 已提交
169
// Lists top-level tables only: schemas with a non-empty owner_table_ (partition tables) are filtered out.
// table_schema_array is cleared first; the meta layer's status is returned even if it reported an error.
Status
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::TableSchema> every_table;
    auto status = meta_ptr_->AllTables(every_table);

    table_schema_array.clear();
    for (const auto& candidate : every_table) {
        const bool is_partition = !candidate.owner_table_.empty();
        if (is_partition) {
            continue;
        }
        table_schema_array.push_back(candidate);
    }

    return status;
}

S
starlord 已提交
189
// Loads the files of a table and of all its partitions into the CPU cache, one file at a time.
// Returns:
//   - SHUTDOWN_ERROR when the DB is stopped;
//   - the parent table's GetFilesToSearch error;
//   - DB_ERROR for an unbuildable engine or a load exception;
//   - SERVER_CACHE_FULL once the cumulative physical size exceeds the cache space free at the start.
// Partition lookups are best effort: failures are logged and skipped.
Status
DBImpl::PreloadTable(const std::string& table_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all table files from parent table
    // NOTE(review): empty dates/ids are passed through; presumably meaning "no filter" — confirm in GetFilesToSearch.
    meta::DatesT dates;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition tables (best effort: failures are logged, not propagated)
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions of table " << table_id << ": " << status.message();
    }
    for (auto& schema : partition_array) {
        status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get files of partition " << schema.table_id_ << ": " << status.message();
        }
    }

    // Free cache space is sampled once; each loaded file's physical size is charged against it.
    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    ENGINE_LOG_DEBUG << "Begin pre-load table:" + table_id + ", totally " << files_array.size()
                     << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load table:" + table_id);
    for (auto& file : files_array) {
        ExecutionEnginePtr engine =
            EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                                 static_cast<MetricType>(file.metric_type_), file.nlist_);
        if (engine == nullptr) {
            ENGINE_LOG_ERROR << "Invalid engine type";
            return Status(DB_ERROR, "Invalid engine type");
        }

        size += engine->PhysicalSize();
        if (size > available_size) {
            ENGINE_LOG_DEBUG << "Pre-load canceled since cache almost full";
            return Status(SERVER_CACHE_FULL, "Cache is full");
        }

        try {
            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            engine->Load(true);
        } catch (std::exception& ex) {
            std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
248
// Forwards a table flag update to the meta layer unchanged.
Status
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    auto update_status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return update_status;
}

S
starlord 已提交
257
// Writes the row count of table_id into row_count via GetTableRowCountRecursively
// (by its name, presumably including partitions — confirm there).
Status
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    return GetTableRowCountRecursively(table_id, row_count);
}

// Creates partition `partition_name` with tag `partition_tag` under table_id; handled by the meta layer.
Status
DBImpl::CreatePartition(const std::string& table_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    auto create_status = meta_ptr_->CreatePartition(table_id, partition_name, partition_tag);
    return create_status;
}

// Drops a partition:
//   1. remove its in-memory vectors so no further inserts land there;
//   2. soft-delete it in the meta layer;
//   3. hand file deletion to the scheduler and wait for that job.
// If the meta-layer drop fails, its status is returned and no delete job is scheduled.
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    mem_mgr_->EraseMemVector(partition_name);                // not allow insert
    auto status = meta_ptr_->DropPartition(partition_name);  // soft delete table
    if (!status.ok()) {
        ENGINE_LOG_ERROR << status.message();
        return status;
    }

    // scheduler will determine when to delete table files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
294
// Resolves partition_tag of table_id to a partition name, then drops that partition via DropPartition.
// A failed tag lookup is logged and returned.
Status
DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    std::string partition_name;
    auto lookup_status = meta_ptr_->GetPartitionName(table_id, partition_tag, partition_name);
    if (lookup_status.ok()) {
        return DropPartition(partition_name);
    }

    ENGINE_LOG_ERROR << lookup_status.message();
    return lookup_status;
}

// Fills partition_schema_array with the partitions of table_id, as reported by the meta layer.
Status
DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    auto show_status = meta_ptr_->ShowPartitions(table_id, partition_schema_array);
    return show_status;
}

// Inserts n vectors into table_id. When partition_tag is non-empty, the tag is resolved to its partition
// table and the vectors go there instead. Assigned ids are reported through vector_ids by the memory manager.
Status
DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors,
                      IDNumbers& vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // if a partition tag is specified, resolve it and use the partition as target table
    Status status;
    std::string target_table_name = table_id;
    if (!partition_tag.empty()) {
        status = meta_ptr_->GetPartitionName(table_id, partition_tag, target_table_name);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
            return status;
        }
    }

    // insert vectors into target table
    // NOTE(review): metrics is handed `status` before the insert; presumably it keeps a reference and
    // reads the final status when destroyed — keep this declaration ahead of the assignment below.
    milvus::server::CollectInsertMetrics metrics(n, status);
    status = mem_mgr_->InsertVectors(target_table_name, n, vectors, vector_ids);

    return status;
}

S
starlord 已提交
346
// Creates (or changes) the index of table_id and builds it. Steps:
//   1. flush in-memory data;
//   2. under build_index_mutex_, update the stored index definition if it differs (metric type is preserved);
//   3. wait for running merge jobs;
//   4. build the index.
Status
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data
    std::set<std::string> flushed_table_ids;
    auto status = SyncMemData(flushed_table_ids);

    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        // step 1: fetch the index currently recorded for the table
        TableIndex current_index;
        status = DescribeIndex(table_id, current_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        // step 2: persist the requested index if it differs;
        // the metric type is never changed here, it was defined by CreateTable
        TableIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;
        const bool index_changed = !utils::IsSameIndex(current_index, requested_index);
        if (index_changed) {
            status = UpdateTableIndexRecursively(table_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let merge file thread finish, to avoid duplicate data bug
    WaitMergeFileFinish();

    // step 4: reset this table's failed-index bookkeeping, then wait and build index
    status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
    status = BuildTableIndexRecursively(table_id, index);
    return status;
}

S
starlord 已提交
389
// Reads the index definition recorded for table_id from the meta layer into `index`.
Status
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeTableIndex(table_id, index);
    return describe_status;
}

S
starlord 已提交
398
// Drops the index of table_id via DropTableIndexRecursively (by its name, presumably partitions too).
Status
DBImpl::DropIndex(const std::string& table_id) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    auto drop_status = DropTableIndexRecursively(table_id);
    return drop_status;
}

S
starlord 已提交
408
// Date-less query: restricts the search to the single date returned by utils::GetDate()
// (presumably the current date) and delegates to the date-aware overload.
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
              const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
              const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    meta::DatesT dates = {utils::GetDate()};
    return Query(context, table_id, partition_tags, k, nq, nprobe, vectors, dates, result_ids, result_distances);
}

S
starlord 已提交
422
// Searches table_id (restricted to `dates`) and reports the k nearest results per query vector.
// Without partition tags, the parent table and all its partitions are searched. With tags, only the
// matching partitions are searched. Only the parent-table file lookup error is propagated; partition lookup
// errors are overwritten by the QueryAsync status.
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
              const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
              const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
              ResultDistances& result_distances) {
    auto query_ctx = context->Child("Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id << " date range count: " << dates.size();

    Status status;
    std::vector<size_t> ids;  // intentionally empty: no file-id restriction is requested here
    meta::TableFilesSchema files_array;

    if (!partition_tags.empty()) {
        // only the partitions whose tags match are searched
        std::set<std::string> partition_name_array;
        GetPartitionsByTags(table_id, partition_tags, partition_name_array);

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, ids, dates, files_array);
        }
    } else {
        // no partition tag: search the whole table, i.e. the parent table plus every partition
        status = GetFilesToSearch(table_id, ids, dates, files_array);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::TableSchema> partition_array;
        status = meta_ptr_->ShowPartitions(table_id, partition_array);
        for (auto& schema : partition_array) {
            status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(query_ctx, table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    query_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}
X
Xu Peng 已提交
470

S
starlord 已提交
471
// Searches only the given files of table_id (restricted to `dates`).
// file_ids are decimal strings parsed with std::stoul. Returns DB_ERROR "Invalid file id" when:
//   - a file id cannot be parsed;
//   - none of the ids resolves to a searchable file.
Status
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::string& table_id,
                      const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    auto query_ctx = context->Child("Query by file id");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id << " date range count: " << dates.size();

    // get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto& id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (std::exception& ex) {
            // std::stoul throws std::invalid_argument / std::out_of_range on malformed or oversized input;
            // turn that into an error status instead of letting the exception escape to the caller.
            ENGINE_LOG_ERROR << "Invalid file id: " << id << ", " << ex.what();
            return Status(DB_ERROR, "Invalid file id");
        }
    }

    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    if (files_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(query_ctx, table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    query_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}

S
starlord 已提交
512
// Reports the data size computed by the meta layer through `result`.
// When the DB is shut down, SHUTDOWN_ERROR is returned and `result` is left untouched.
Status
DBImpl::Size(uint64_t& result) {
    const bool accepting_requests = initialized_.load(std::memory_order_acquire);
    if (!accepting_requests) {
        return SHUTDOWN_ERROR;
    }

    auto size_status = meta_ptr_->Size(result);
    return size_status;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
522
// internal methods
S
starlord 已提交
523
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
524
// Runs a search job over `files` on the scheduler and blocks until it completes.
// The files are marked as ongoing for the duration of the job.
// Returns the job's error status if it failed; otherwise copies the results out and returns OK.
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
                   const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                   ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_async_ctx = context->Child("Query Async");

    server::CollectQueryMetrics metrics(nq);
    TimeRecorder rc("");

    // step 1: mark the files as in use, then build a search job covering all of them
    auto status = ongoing_files_checker_.MarkOngoingFiles(files);

    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(query_async_ctx, k, nq, nprobe, vectors);
    for (const auto& file : files) {
        scheduler::TableFileSchemaPtr index_file = std::make_shared<meta::TableFileSchema>(file);
        job->AddIndexFile(index_file);
    }

    // step 2: hand the job to the scheduler and wait for its result
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();

    status = ongoing_files_checker_.UnmarkOngoingFiles(files);
    if (!job->GetStatus().ok()) {
        return job->GetStatus();
    }

    // step 3: copy the results out of the finished job
    result_ids = job->GetResultIds();
    result_distances = job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    query_async_ctx->GetTraceContext()->GetSpan()->Finish();

    return Status::OK();
}

S
starlord 已提交
563 564
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
565
    Status status;
Y
yu yunfeng 已提交
566
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
567
    while (true) {
568
        if (!initialized_.load(std::memory_order_acquire)) {
569 570
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
571 572

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
573 574
            break;
        }
X
Xu Peng 已提交
575

G
groot 已提交
576
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
577

G
groot 已提交
578
        StartMetricTask();
G
groot 已提交
579 580 581
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
582 583
}

S
starlord 已提交
584 585
void
DBImpl::WaitMergeFileFinish() {
586
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
Y
Yu Kun 已提交
587
    for (auto& iter : compact_thread_results_) {
588 589 590 591
        iter.wait();
    }
}

S
starlord 已提交
592 593
void
DBImpl::WaitBuildIndexFinish() {
594
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
595
    for (auto& iter : index_thread_results_) {
596 597 598 599
        iter.wait();
    }
}

S
starlord 已提交
600 601
void
DBImpl::StartMetricTask() {
G
groot 已提交
602
    static uint64_t metric_clock_tick = 0;
603
    ++metric_clock_tick;
S
starlord 已提交
604
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
605 606 607
        return;
    }

S
Shouyu Luo 已提交
608
    // ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
609

G
groot 已提交
610 611 612
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
J
JinHai-CN 已提交
613 614 615 616 617 618 619
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
620
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
621 622 623 624 625 626 627 628
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
629

K
kun yu 已提交
630
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
631 632
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
633
    server::Metrics::GetInstance().PushToGateway();
K
kun yu 已提交
634

S
Shouyu Luo 已提交
635
    // ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
636 637
}

S
starlord 已提交
638
// Serializes the memory manager's buffered data under mem_serialize_mutex_. The table ids reported by
// Serialize() are merged into sync_table_ids; existing entries are kept, not cleared.
// NOTE(review): Serialize()'s own result is not inspected; this function always returns OK.
Status
DBImpl::SyncMemData(std::set<std::string>& sync_table_ids) {
    std::lock_guard<std::mutex> serialize_guard(mem_serialize_mutex_);

    std::set<std::string> serialized_ids;
    mem_mgr_->Serialize(serialized_ids);
    sync_table_ids.insert(serialized_ids.begin(), serialized_ids.end());

    if (!serialized_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

S
starlord 已提交
654 655
void
DBImpl::StartCompactionTask() {
656
    static uint64_t compact_clock_tick = 0;
657
    ++compact_clock_tick;
S
starlord 已提交
658
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
659 660 661
        return;
    }

S
starlord 已提交
662
    // serialize memory data
G
groot 已提交
663
    SyncMemData(compact_table_ids_);
664

S
starlord 已提交
665
    // compactiong has been finished?
666 667 668 669 670 671 672
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
673 674
        }
    }
X
Xu Peng 已提交
675

S
starlord 已提交
676
    // add new compaction task
677 678 679
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
680 681 682
            // collect merge files for all tables(if compact_table_ids_ is empty) for two reasons:
            // 1. other tables may still has un-merged files
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
G
groot 已提交
683
            if (compact_table_ids_.empty()) {
684 685
                std::vector<meta::TableSchema> table_schema_array;
                meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
686
                for (auto& schema : table_schema_array) {
687 688 689 690 691
                    compact_table_ids_.insert(schema.table_id_);
                }
            }

            // start merge file thread
692
            compact_thread_results_.push_back(
G
groot 已提交
693
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
694 695
            compact_table_ids_.clear();
        }
G
groot 已提交
696
    }
X
Xu Peng 已提交
697 698
}

S
starlord 已提交
699
Status
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
    // Merge a group of files of one table/date into a single new file.
    // Source files are merged in the given order until the merged engine's Size() reaches the
    // source file's index_file_size_; every source file merged so far is marked TO_DELETE and the
    // new file is marked TO_INDEX or RAW depending on engine type and physical size.
    // Returns the first error encountered, otherwise the status of the final meta update.
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    // step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    // step 2: merge files
    ExecutionEnginePtr index = EngineFactory::Build(
        table_file.dimension_, table_file.location_, static_cast<EngineType>(table_file.engine_type_),
        static_cast<MetricType>(table_file.metric_type_), table_file.nlist_);
    if (index == nullptr) {
        // no engine to merge into: discard the empty NEW_MERGE record created above and report the failure
        std::string msg = "Failed to build execution engine to merge files of table: " + table_id;
        ENGINE_LOG_ERROR << msg;
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, msg);
    }

    meta::TableFilesSchema updated;
    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);

        // stop once the merged data is large enough to be indexed on its own
        const int64_t index_size = index->Size();
        if (index_size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = index->Serialize();
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
        }
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        // Mark the merged file TO_DELETE, but keep the serialize error as the return value:
        // overwriting it with the mark result would report a failed merge as success.
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        Status mark_status = meta_ptr_->UpdateTableFile(table_file);
        if (mark_status.ok()) {
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        } else {
            ENGINE_LOG_ERROR << "Failed to mark merged file " << table_file.file_id_
                             << " as to_delete: " << mark_status.ToString();
        }

        ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_
                         << ", possible out of disk space or memory";

        return status;
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    // else set file type to RAW, no need to build index
    const auto physical_size = index->PhysicalSize();
    if (table_file.engine_type_ != static_cast<int32_t>(EngineType::FAISS_IDMAP)) {
        table_file.file_type_ = (physical_size >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                               : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = physical_size;
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update merged file states for table " << table_id << ": " << status.ToString();
    }
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size " << physical_size << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

S
starlord 已提交
784
Status
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    // Merge small files of one table, date partition by date.
    // Partitions with fewer than options_.merge_trigger_number_ files are skipped.
    // Per-partition merge failures are logged, not returned; only a failure to list the
    // candidate files is reported to the caller.
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        meta::TableFilesSchema& files = kv.second;
        if (files.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        // register the source files with ongoing_files_checker_ for the duration of the merge
        status = ongoing_files_checker_.MarkOngoingFiles(files);
        Status merge_status = MergeFiles(table_id, kv.first, files);
        if (!merge_status.ok()) {
            // previously discarded silently; keep best-effort behavior but make the failure visible
            ENGINE_LOG_ERROR << "Failed to merge files of table " << table_id << ": " << merge_status.ToString();
        }
        status = ongoing_files_checker_.UnmarkOngoingFiles(files);

        if (!initialized_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
812

S
starlord 已提交
813 814
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
S
Shouyu Luo 已提交
815
    // ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
816

G
groot 已提交
817
    Status status;
Y
Yu Kun 已提交
818
    for (auto& table_id : table_ids) {
G
groot 已提交
819 820
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
821
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
822
        }
S
starlord 已提交
823

824
        if (!initialized_.load(std::memory_order_acquire)) {
S
starlord 已提交
825 826 827
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
828
    }
X
Xu Peng 已提交
829

G
groot 已提交
830
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
831

832
    {
G
groot 已提交
833
        uint64_t ttl = 10 * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
834
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
835
            ttl = meta::HOUR;
836
        }
G
groot 已提交
837

G
groot 已提交
838
        meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_);
Z
update  
zhiru 已提交
839
    }
S
starlord 已提交
840

S
Shouyu Luo 已提交
841
    // ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
842
}
X
Xu Peng 已提交
843

S
starlord 已提交
844 845
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
846
    static uint64_t index_clock_tick = 0;
847
    ++index_clock_tick;
S
starlord 已提交
848
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
849 850 851
        return;
    }

S
starlord 已提交
852
    // build index has been finished?
853 854 855 856 857 858 859
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
860 861 862
        }
    }

S
starlord 已提交
863
    // add new build index task
864 865 866
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
867
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
868
        }
G
groot 已提交
869
    }
X
Xu Peng 已提交
870 871
}

S
starlord 已提交
872 873
void
DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
874
    std::unique_lock<std::mutex> lock(build_index_mutex_);
875
    meta::TableFilesSchema to_index_files;
G
groot 已提交
876
    meta_ptr_->FilesToIndex(to_index_files);
877
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
878

879
    if (!to_index_files.empty()) {
G
groot 已提交
880
        ENGINE_LOG_DEBUG << "Background build index thread begin";
881 882
        status = ongoing_files_checker_.MarkOngoingFiles(to_index_files);

883
        // step 2: put build index task to scheduler
G
groot 已提交
884
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr>> job2file_map;
885
        for (auto& file : to_index_files) {
G
groot 已提交
886
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
887 888
            scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
889
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
890
            job2file_map.push_back(std::make_pair(job, file_ptr));
891
        }
G
groot 已提交
892

G
groot 已提交
893
        // step 3: wait build index finished and mark failed files
G
groot 已提交
894 895 896 897 898 899 900 901
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
            meta::TableFileSchema& file_schema = *(iter->second.get());
            job->WaitBuildIndexFinish();
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
                ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();

902
                index_failed_checker_.MarkFailedIndexFile(file_schema);
G
groot 已提交
903 904
            } else {
                ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
G
groot 已提交
905 906

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
907
            }
G
groot 已提交
908
            status = ongoing_files_checker_.UnmarkOngoingFile(file_schema);
909
        }
G
groot 已提交
910 911

        ENGINE_LOG_DEBUG << "Background build index thread finished";
Y
Yu Kun 已提交
912
    }
X
Xu Peng 已提交
913 914
}

G
groot 已提交
915 916 917 918 919 920 921 922 923 924 925 926
Status
DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
                             meta::TableFilesSchema& files) {
    // Collect the table's files whose type is in `file_types`, dropping RAW files whose row count
    // is below meta::BUILD_INDEX_THRESHOLD (too small to be worth indexing).
    // Returns the status of the meta lookup; previously a lookup failure was silently reported as OK.
    files.clear();
    auto status = meta_ptr_->FilesByType(table_id, file_types, files);

    // only build index for files that row count greater than certain threshold
    // (erase-remove idiom: linear instead of one erase per removed element)
    files.erase(std::remove_if(files.begin(), files.end(),
                               [](const meta::TableFileSchema& file) {
                                   return file.file_type_ == static_cast<int>(meta::TableFileSchema::RAW) &&
                                          file.row_count_ < meta::BUILD_INDEX_THRESHOLD;
                               }),
                files.end());

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files by type for table " << table_id << ": " << status.ToString();
    }
    return status;
}

G
groot 已提交
934
Status
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
                         meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Collect files from table: " << table_id;

    // Ask the meta for candidate files grouped by date, then flatten the groups into `files`.
    // On a meta error `files` is left untouched and the error is returned.
    meta::DatePartionedTableFilesSchema files_by_date;
    auto meta_status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, files_by_date);
    if (meta_status.ok()) {
        TraverseFiles(files_by_date, files);
        return Status::OK();
    }

    return meta_status;
}

Status
DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    // Add to `partition_name_array` the table id of every partition of `table_id` whose tag matches
    // (via StringHelpFunctions::IsRegexMatch) any of `partition_tags` after blank-trimming.
    // Returns the ShowPartitions error if the partition list cannot be read; previously that
    // failure was ignored and looked like "no matching partitions".
    std::vector<meta::TableSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get partitions of table " << table_id << ": " << status.ToString();
        return status;
    }

    for (const auto& tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);
        for (auto& schema : partition_array) {
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
                partition_name_array.insert(schema.table_id_);
            }
        }
    }

    return Status::OK();
}

Status
DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& dates) {
    // Drop a table (when `dates` is empty) or its data for the given dates, then repeat for every
    // partition of the table.
    // Deletion stays best-effort (later steps still run after a failure), but the first failure of
    // the drop itself is now returned instead of being overwritten and reported as OK.
    // dates partly delete files of the table but currently we don't support
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    Status result = Status::OK();
    Status status;
    if (dates.empty()) {
        status = mem_mgr_->EraseMemVector(table_id);  // not allow insert
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to erase in-memory data of table " << table_id << ": " << status.ToString();
        }

        status = meta_ptr_->DropTable(table_id);  // soft delete table
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to drop table " << table_id << ": " << status.ToString();
            result = status;
        }
        index_failed_checker_.CleanFailedIndexFileOfTable(table_id);

        // scheduler will determine when to delete table files
        auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
        scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, nres);
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitAndDelete();
    } else {
        status = meta_ptr_->DropDataByDate(table_id, dates);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to drop data by date of table " << table_id << ": " << status.ToString();
            result = status;
        }
    }

    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get partitions of table " << table_id << ": " << status.ToString();
        return status;
    }
    for (auto& schema : partition_array) {
        status = DropTableRecursively(schema.table_id_, dates);
        if (!status.ok()) {
            return status;
        }
    }

    return result;
}

Status
DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    // Replace the index description of a table and of all its partitions (recursively).
    // The existing index is dropped first; a failed drop is logged but does not stop the update.
    Status drop_status = DropIndex(table_id);
    if (!drop_status.ok()) {
        ENGINE_LOG_ERROR << "Failed to drop old index of table " << table_id << ": " << drop_status.ToString();
    }

    auto status = meta_ptr_->UpdateTableIndex(table_id, index);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
        return status;
    }

    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        // previously ignored: partitions would silently keep their old index info
        ENGINE_LOG_ERROR << "Failed to get partitions of table " << table_id << ": " << status.ToString();
        return status;
    }
    for (auto& schema : partition_array) {
        status = UpdateTableIndexRecursively(schema.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

Status
DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    // Block until the table's pending files have been converted, then do the same for each partition.
    // Returns DB_ERROR when index_failed_checker_ reports failed files for this table.
    // for IDMAP type, only wait all NEW/NEW_MERGE files converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    const bool is_idmap = (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP));
    std::vector<int> file_types;
    if (is_idmap) {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::RAW),
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
            static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
            static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
        };
    }

    // get files to build index
    meta::TableFilesSchema table_files;
    auto status = GetFilesToBuildIndex(table_id, file_types, table_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files to build index for table " << table_id << ": " << status.ToString();
        return status;
    }

    // Poll until no pending files remain. The sleep grows by 100 ms per round, capped at 10 s.
    // Each round re-reads the file list and filters it through index_failed_checker_.
    int times = 1;
    while (!table_files.empty()) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));
        GetFilesToBuildIndex(table_id, file_types, table_files);
        ++times;

        index_failed_checker_.IgnoreFailedIndexFiles(table_files);
    }

    // build index for partition
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        // previously ignored: partitions would silently be left without an index
        ENGINE_LOG_ERROR << "Failed to get partitions of table " << table_id << ": " << status.ToString();
        return status;
    }
    for (auto& schema : partition_array) {
        status = BuildTableIndexRecursively(schema.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    // failed to build index for some files, return error
    std::vector<std::string> failed_files;
    index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files);
    if (!failed_files.empty()) {
        std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) +
                          ((failed_files.size() == 1) ? " file" : " files");
        msg += ", please double check index parameters.";
        return Status(DB_ERROR, msg);
    }

    return Status::OK();
}

Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
    // Drop the index of a table and then of each of its partitions (recursively).
    // The table's failed-index records are also cleared from index_failed_checker_.
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
    auto status = meta_ptr_->DropTableIndex(table_id);
    if (!status.ok()) {
        return status;
    }

    // drop partition index
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        // previously ignored: partitions would silently keep their index
        ENGINE_LOG_ERROR << "Failed to get partitions of table " << table_id << ": " << status.ToString();
        return status;
    }
    for (auto& schema : partition_array) {
        status = DropTableIndexRecursively(schema.table_id_);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

Status
DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count) {
    // row_count = rows of the table itself (meta Count) + rows of all its partitions, recursively.
    // Any failure is returned. Previously, a failure to list partitions silently produced a partial count.
    row_count = 0;
    auto status = meta_ptr_->Count(table_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // get partition row count
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get partitions of table " << table_id << ": " << status.ToString();
        return status;
    }
    for (auto& schema : partition_array) {
        uint64_t partition_row_count = 0;
        status = GetTableRowCountRecursively(schema.table_id_, partition_row_count);
        if (!status.ok()) {
            return status;
        }

        row_count += partition_row_count;
    }

    return Status::OK();
}

S
starlord 已提交
1131 1132
}  // namespace engine
}  // namespace milvus