DBImpl.cpp 34.4 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/DBImpl.h"
S
starlord 已提交
19
#include "Utils.h"
S
starlord 已提交
20 21
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
S
starlord 已提交
22
#include "engine/EngineFactory.h"
S
starlord 已提交
23
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
24
#include "meta/MetaConsts.h"
S
starlord 已提交
25 26
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
27
#include "metrics/Metrics.h"
S
starlord 已提交
28
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
29
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
30 31
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
S
starlord 已提交
32
#include "utils/Log.h"
G
groot 已提交
33
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
34
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
35

X
Xu Peng 已提交
36
#include <assert.h>
S
starlord 已提交
37
#include <algorithm>
G
groot 已提交
38
#include <boost/filesystem.hpp>
S
starlord 已提交
39 40 41
#include <chrono>
#include <cstring>
#include <iostream>
G
groot 已提交
42
#include <set>
S
starlord 已提交
43
#include <thread>
X
Xu Peng 已提交
44

J
jinhai 已提交
45
namespace milvus {
X
Xu Peng 已提交
46
namespace engine {
X
Xu Peng 已提交
47

G
groot 已提交
48 49
namespace {

J
jinhai 已提交
50 51 52
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
53

G
groot 已提交
54 55 56 57 58 59 60 61 62 63 64
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!");

void
TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::TableFilesSchema& files_array) {
    for (auto& day_files : date_files) {
        for (auto& file : day_files.second) {
            files_array.push_back(file);
        }
    }
}

S
starlord 已提交
65
}  // namespace
G
groot 已提交
66

Y
Yu Kun 已提交
67
// Builds the meta store and the memory manager from the given options and starts the service.
// shutting_down_ is initialised to true so that the Start() call below performs the start-up.
DBImpl::DBImpl(const DBOptions& options)
    : options_(options),
      shutting_down_(true),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    Start();
}

// Stops the service on destruction; Stop() is a no-op when the DB is already stopped.
DBImpl::~DBImpl() {
    Stop();
}

S
starlord 已提交
78
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
79
// external api
S
starlord 已提交
80
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
81 82 83
// Marks the DB as running and, unless this node is CLUSTER_READONLY, launches the
// background timer thread. Calling Start() on a running DB is a no-op.
// Always returns Status::OK().
Status
DBImpl::Start() {
    const bool already_running = !shutting_down_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    // ENGINE_LOG_TRACE << "DB service start";
    shutting_down_.store(false, std::memory_order_release);

    // in the distributed version some nodes are read only and run no background tasks
    const bool is_writable_node = options_.mode_ != DBOptions::MODE::CLUSTER_READONLY;
    if (is_writable_node) {
        // ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

S
starlord 已提交
99 100 101
// Stops the DB: flags shutdown, serializes in-memory data, waits for the background timer
// thread to exit and, on nodes that are not CLUSTER_READONLY, cleans up shadow files.
// Calling Stop() on an already stopped DB is a no-op. Always returns Status::OK().
Status
DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    // make sure all memory data is serialized
    std::set<std::string> sync_table_ids;
    SyncMemData(sync_table_ids);

    // Wait for compaction/build-index to finish: the timer thread waits for them before exiting.
    // Start() only launches the thread on nodes that are not CLUSTER_READONLY, and joining a
    // std::thread that was never started throws std::system_error, hence the joinable() guard.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUpShadowFiles();
    }

    // ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
122 123
// Forwards to the meta store's DropAll() and returns its status.
// Unlike the other public calls, no shutdown check is made here.
Status
DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
127
// Creates a table in the meta store from a copy of table_schema whose index_file_size_
// has been multiplied by ONE_MB. The caller's schema object is not modified.
// Returns SHUTDOWN_ERROR when the DB is stopped, otherwise the meta store's status.
Status
DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    meta::TableSchema stored_schema = table_schema;
    stored_schema.index_file_size_ *= ONE_MB;  // caller value is scaled by ONE_MB before storing
    return meta_ptr_->CreateTable(stored_schema);
}

S
starlord 已提交
138
// Drops a table through DropTableRecursively().
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return DropTableRecursively(table_id, dates);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
147
// Fills table_schema from the meta store and divides index_file_size_ by ONE_MB before
// returning it (the division is applied whatever status the meta store reports).
// Returns SHUTDOWN_ERROR when the DB is stopped, otherwise the meta store's status.
Status
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeTable(table_schema);
    table_schema.index_file_size_ /= ONE_MB;  // undo the ONE_MB scaling applied by CreateTable
    return describe_status;
}

S
starlord 已提交
158
// Asks the meta store whether table_id exists; the answer is written to has_or_not.
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->HasTable(table_id, has_or_not);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
167
// Retrieves the table schemas from the meta store into table_schema_array.
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->AllTables(table_schema_array);
}

S
starlord 已提交
176
// Pre-loads the files of a table and of its partitions by calling Load(true) on an engine
// built for each file.
//
// The budget is the CPU cache capacity minus its usage, sampled once up front. The physical
// sizes of the files are accumulated, and as soon as the sum exceeds the budget the call stops
// with SERVER_CACHE_FULL (files loaded so far stay loaded).
//
// Returns SHUTDOWN_ERROR when the DB is stopped, the lookup status if the parent table's files
// cannot be collected, DB_ERROR on an invalid engine type or a load exception, OK otherwise.
Status
DBImpl::PreloadTable(const std::string& table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // get all table files from parent table
    meta::DatesT dates;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    // get files from partition tables
    // a failure here does not abort the preload (same as before), but it is no longer silent
    std::vector<meta::TableSchema> partiton_array;
    status = meta_ptr_->ShowPartitions(table_id, partiton_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get partitions of table " << table_id << ": " << status.message();
    }
    for (auto& schema : partiton_array) {
        status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get files of partition " << schema.table_id_ << ": " << status.message();
        }
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    for (auto& file : files_array) {
        ExecutionEnginePtr engine =
            EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                                 static_cast<MetricType>(file.metric_type_), file.nlist_);
        if (engine == nullptr) {
            ENGINE_LOG_ERROR << "Invalid engine type";
            return Status(DB_ERROR, "Invalid engine type");
        }

        size += engine->PhysicalSize();
        if (size > available_size) {
            return Status(SERVER_CACHE_FULL, "Cache is full");
        }

        try {
            // step 1: load index
            engine->Load(true);
        } catch (const std::exception& ex) {
            std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
229
// Stores a new flag value for table_id through the meta store.
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->UpdateTableFlag(table_id, flag);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
238
// Writes the row count obtained from GetTableRowCountRecursively() into row_count.
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    return GetTableRowCountRecursively(table_id, row_count);
}

// Registers a partition (name and tag) of table_id in the meta store.
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::CreatePartition(const std::string& table_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->CreatePartition(table_id, partition_name, partition_tag);
    }

    return SHUTDOWN_ERROR;
}

// Drops a partition: erases its in-memory vectors, drops it in the meta store, then submits
// a DeleteJob to the scheduler and waits for it.
//
// Returns SHUTDOWN_ERROR when the DB is stopped and the meta store's status when the drop
// fails there (previously that failure was discarded and OK was returned).
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // not allow insert
    // NOTE(review): the result of EraseMemVector is still ignored, as before -- confirm whether
    // a failure here should abort the drop.
    mem_mgr_->EraseMemVector(partition_name);

    // soft delete table
    auto status = meta_ptr_->DropPartition(partition_name);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << status.message();
        return status;
    }

    // scheduler will determine when to delete table files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
275
// Resolves (table_id, partition_tag) to a partition name through the meta store and drops
// that partition. Returns SHUTDOWN_ERROR when the DB is stopped, or the lookup status when
// the name cannot be resolved.
Status
DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    std::string resolved_name;
    auto lookup_status = meta_ptr_->GetPartitionName(table_id, partition_tag, resolved_name);
    if (lookup_status.ok()) {
        return DropPartition(resolved_name);
    }

    ENGINE_LOG_ERROR << lookup_status.message();
    return lookup_status;
}

// Retrieves the partition schemas of table_id from the meta store.
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->ShowPartitions(table_id, partiton_schema_array);
    }

    return SHUTDOWN_ERROR;
}

// Hands n vectors to the memory manager for insertion into table_id or, when partition_tag
// is not empty, into the partition that the tag resolves to.
//
// Returns SHUTDOWN_ERROR when the DB is stopped, the lookup status when the partition tag
// cannot be resolved, otherwise the memory manager's insert status.
Status
DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors,
                      IDNumbers& vector_ids) {
    //    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // if partition is specified, use partition as target table
    Status status;
    std::string target_table_name = table_id;
    if (!partition_tag.empty()) {
        status = meta_ptr_->GetPartitionName(table_id, partition_tag, target_table_name);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
            return status;
        }
    }

    // insert vectors into target table
    // the metrics collector is handed `status`, so it is declared before the insert call and
    // goes out of scope after it
    milvus::server::CollectInsertMetrics metrics(n, status);
    status = mem_mgr_->InsertVectors(target_table_name, n, vectors, vector_ids);

    return status;
}

S
starlord 已提交
327
// Creates an index for table_id:
//   1. serializes in-memory data,
//   2. under build_index_mutex_, compares the requested index with the stored one and
//      updates the stored index info when they differ (the metric type is always kept),
//   3. waits for running merge tasks,
//   4. builds the index.
// Returns SHUTDOWN_ERROR when the DB is stopped, or the first failing step's status.
Status
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data (the result of this call is overwritten below)
    std::set<std::string> flushed_table_ids;
    Status status = SyncMemData(flushed_table_ids);

    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        // step 1: check index difference
        TableIndex current_index;
        status = DescribeIndex(table_id, current_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        // step 2: update index info
        // the metric type is not changed here, it was defined by CreateTable
        TableIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;

        const bool index_changed = !utils::IsSameIndex(current_index, requested_index);
        if (index_changed) {
            status = UpdateTableIndexRecursively(table_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let merge file thread finish
    // to avoid duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait and build index
    return BuildTableIndexRecursively(table_id, index);
}

S
starlord 已提交
369
// Reads the index description of table_id from the meta store into index.
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->DescribeTableIndex(table_id, index);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
378
// Drops the index of table_id through DropTableIndexRecursively().
// Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::DropIndex(const std::string& table_id) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    return DropTableIndexRecursively(table_id);
}

S
starlord 已提交
388
// Convenience overload: runs the date-based Query() with a single date, the one returned
// by utils::GetDate(). Returns SHUTDOWN_ERROR when the DB is stopped.
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    meta::DatesT dates = {utils::GetDate()};
    return Query(table_id, partition_tags, k, nq, nprobe, vectors, dates, result_ids, result_distances);
}

S
starlord 已提交
400
// Searches table_id restricted to the given dates.
//
// With no partition tags, the files of the parent table and of all its partitions are
// searched. Otherwise only the files of the partitions resolved from the tags are searched.
// Only a failure to collect the parent table's files aborts the query; statuses from the
// partition lookups are overwritten by the search status.
// Returns SHUTDOWN_ERROR when the DB is stopped, otherwise the status of QueryAsync().
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
              ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id << " date range count: " << dates.size();

    Status status;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;

    const bool search_specified_partitions = !partition_tags.empty();
    if (search_specified_partitions) {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        GetPartitionsByTags(table_id, partition_tags, partition_name_array);

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, ids, dates, files_array);
        }
    } else {
        // no partition tag specified, means search in whole table
        // get all table files from parent table
        status = GetFilesToSearch(table_id, ids, dates, files_array);
        if (!status.ok()) {
            return status;
        }

        // then add the files of every partition
        std::vector<meta::TableSchema> partiton_array;
        status = meta_ptr_->ShowPartitions(table_id, partiton_array);
        for (auto& schema : partiton_array) {
            status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}
X
Xu Peng 已提交
442

S
starlord 已提交
443
// Searches only the files of table_id whose ids are listed in file_ids.
//
// Each id string is parsed with std::stoul. An id that cannot be parsed is rejected with
// DB_ERROR "Invalid file id" instead of letting the std::invalid_argument or
// std::out_of_range exception escape to the caller.
// Returns SHUTDOWN_ERROR when the DB is stopped, the lookup status when the files cannot be
// collected, DB_ERROR when no file matches, otherwise the status of QueryAsync().
Status
DBImpl::QueryByFileID(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
                      uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id << " date range count: " << dates.size();

    // get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (const auto& id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception&) {
            ENGINE_LOG_ERROR << "Invalid file id: " << id;
            return Status(DB_ERROR, "Invalid file id");
        }
    }

    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    if (files_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}

S
starlord 已提交
478
// Writes the size reported by the meta store into result.
// Returns SHUTDOWN_ERROR (leaving result untouched) when the DB is stopped.
Status
DBImpl::Size(uint64_t& result) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->Size(result);
    }

    return SHUTDOWN_ERROR;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
488
// internal methods
S
starlord 已提交
489
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
490
// Runs one search over the given files: wraps them in a SearchJob, submits the job to the
// scheduler's job manager, blocks until the result is available and copies the ids and
// distances into the output parameters.
// Returns the job's status when the job failed, Status::OK() otherwise.
Status
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
                   uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    server::CollectQueryMetrics metrics(nq);

    TimeRecorder rc("");

    // step 1: collect the files to search into a search job
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
    scheduler::SearchJobPtr search_job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
    for (const auto& file : files) {
        search_job->AddIndexFile(std::make_shared<meta::TableFileSchema>(file));
    }

    // step 2: put search task to scheduler and wait for it
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();
    if (!search_job->GetStatus().ok()) {
        return search_job->GetStatus();
    }

    // step 3: construct results
    result_ids = search_job->GetResultIds();
    result_distances = search_job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

S
starlord 已提交
520 521
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
522
    Status status;
Y
yu yunfeng 已提交
523
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
524
    while (true) {
S
starlord 已提交
525
        if (shutting_down_.load(std::memory_order_acquire)) {
526 527
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
528 529

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
530 531
            break;
        }
X
Xu Peng 已提交
532

G
groot 已提交
533
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
534

G
groot 已提交
535
        StartMetricTask();
G
groot 已提交
536 537 538
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
539 540
}

S
starlord 已提交
541 542
void
DBImpl::WaitMergeFileFinish() {
543
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
Y
Yu Kun 已提交
544
    for (auto& iter : compact_thread_results_) {
545 546 547 548
        iter.wait();
    }
}

S
starlord 已提交
549 550
void
DBImpl::WaitBuildIndexFinish() {
551
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
552
    for (auto& iter : index_thread_results_) {
553 554 555 556
        iter.wait();
    }
}

S
starlord 已提交
557 558
void
DBImpl::StartMetricTask() {
G
groot 已提交
559 560
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
S
starlord 已提交
561
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
562 563 564
        return;
    }

S
Shouyu Luo 已提交
565
    // ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
566

G
groot 已提交
567 568 569
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
J
JinHai-CN 已提交
570 571 572 573 574 575 576
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
577
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
578 579 580 581 582 583 584 585
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
586

K
kun yu 已提交
587
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
588 589
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
590

S
Shouyu Luo 已提交
591
    // ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
592 593
}

S
starlord 已提交
594
// Serializes the memory manager's data under mem_serialize_mutex_ and adds the ids of the
// serialized tables to sync_table_ids (existing entries are kept).
// Always returns Status::OK().
Status
DBImpl::SyncMemData(std::set<std::string>& sync_table_ids) {
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    std::set<std::string> serialized_ids;
    mem_mgr_->Serialize(serialized_ids);
    sync_table_ids.insert(serialized_ids.begin(), serialized_ids.end());

    if (!serialized_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

S
starlord 已提交
610 611
void
DBImpl::StartCompactionTask() {
612 613
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
S
starlord 已提交
614
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
615 616 617
        return;
    }

S
starlord 已提交
618
    // serialize memory data
G
groot 已提交
619
    SyncMemData(compact_table_ids_);
620

S
starlord 已提交
621
    // compactiong has been finished?
622 623 624 625 626 627 628
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
629 630
        }
    }
X
Xu Peng 已提交
631

S
starlord 已提交
632
    // add new compaction task
633 634 635
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
636 637 638
            // collect merge files for all tables(if compact_table_ids_ is empty) for two reasons:
            // 1. other tables may still has un-merged files
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
G
groot 已提交
639
            if (compact_table_ids_.empty()) {
640 641
                std::vector<meta::TableSchema> table_schema_array;
                meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
642
                for (auto& schema : table_schema_array) {
643 644 645 646 647
                    compact_table_ids_.insert(schema.table_id_);
                }
            }

            // start merge file thread
648
            compact_thread_results_.push_back(
G
groot 已提交
649
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
650 651
            compact_table_ids_.clear();
        }
G
groot 已提交
652
    }
X
Xu Peng 已提交
653 654
}

S
starlord 已提交
655
// Merges files of table_id (one date bucket) into a newly created NEW_MERGE table file.
//
// Input files are merged in order until the merged index size reaches the input file's
// index_file_size_. Every merged input file is marked TO_DELETE. The merged file becomes
// TO_INDEX (non-IDMAP engine and physical size >= index_file_size_) or RAW.
//
// Returns the meta store's status when the new table file cannot be created, DB_ERROR when
// no engine can be built for it or when serialization throws (the new file is marked
// TO_DELETE in both cases), otherwise the status of the final meta update.
Status
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    // step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    // step 2: merge files
    ExecutionEnginePtr index =
        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                             (MetricType)table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        // The factory can hand back a null engine (PreloadTable checks for it as well);
        // without this guard the loop below would dereference a null pointer.
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, "Invalid engine type");
    }

    meta::TableFilesSchema updated;
    int64_t index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // stop merging once the merged index is large enough
        if (index_size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        // typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    // else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                                       : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size " << index->PhysicalSize() << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

S
starlord 已提交
734
// Merge the mergeable files of one table, one date partition at a time.
//
// table_id: table whose files are queried through the meta layer.
// Returns the meta error if the file list cannot be fetched; otherwise OK.
// A failed merge of a single partition is logged and does not abort the
// remaining partitions (the function stays best-effort, as before).
Status
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        // inspect the partition's file list in place; copying it just to read its size is wasteful
        if (kv.second.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        // do not drop the merge result silently: report it, then go on with the next partition
        auto merge_status = MergeFiles(table_id, kv.first, kv.second);
        if (!merge_status.ok()) {
            ENGINE_LOG_ERROR << "Failed to merge files for table: " << table_id << ": " << merge_status.ToString();
        }

        // checked after each merge so a shutdown request stops the remaining partitions
        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
760

S
starlord 已提交
761 762
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
S
Shouyu Luo 已提交
763
    // ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
764

G
groot 已提交
765
    Status status;
Y
Yu Kun 已提交
766
    for (auto& table_id : table_ids) {
G
groot 已提交
767 768
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
769
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
770
        }
S
starlord 已提交
771

S
starlord 已提交
772
        if (shutting_down_.load(std::memory_order_acquire)) {
S
starlord 已提交
773 774 775
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
776
    }
X
Xu Peng 已提交
777

G
groot 已提交
778
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
779

780 781 782 783 784 785 786 787 788 789 790
    {
        uint64_t ttl = 10 * meta::SECOND;  // default: file data will be erase from cache after few seconds
        meta_ptr_->CleanUpCacheWithTTL(ttl);
    }

    {
        uint64_t ttl = 5 * meta::M_SEC;  // default: file will be deleted after few minutes
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
            ttl = meta::D_SEC;
        }
        meta_ptr_->CleanUpFilesWithTTL(ttl);
Z
update  
zhiru 已提交
791
    }
S
starlord 已提交
792

S
Shouyu Luo 已提交
793
    // ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
794
}
X
Xu Peng 已提交
795

S
starlord 已提交
796 797
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
798 799
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
S
starlord 已提交
800
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
801 802 803
        return;
    }

S
starlord 已提交
804
    // build index has been finished?
805 806 807 808 809 810 811
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
812 813 814
        }
    }

S
starlord 已提交
815
    // add new build index task
816 817 818
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
819
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
820
        }
G
groot 已提交
821
    }
X
Xu Peng 已提交
822 823
}

S
starlord 已提交
824 825
void
DBImpl::BackgroundBuildIndex() {
S
Shouyu Luo 已提交
826
    // ENGINE_LOG_TRACE << "Background build index thread start";
S
starlord 已提交
827

P
peng.xu 已提交
828
    std::unique_lock<std::mutex> lock(build_index_mutex_);
829
    meta::TableFilesSchema to_index_files;
G
groot 已提交
830
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
831
    Status status;
832

833
    if (!to_index_files.empty()) {
W
wxyu 已提交
834
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
Y
Yu Kun 已提交
835

836 837 838 839 840 841 842 843 844 845 846
        // step 2: put build index task to scheduler
        for (auto& file : to_index_files) {
            scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            job->AddToIndexFiles(file_ptr);
        }
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitBuildIndexFinish();
        if (!job->GetStatus().ok()) {
            Status status = job->GetStatus();
            ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
        }
Y
Yu Kun 已提交
847
    }
Y
Yu Kun 已提交
848

S
Shouyu Luo 已提交
849
    // ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
850 851
}

G
groot 已提交
852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870
// Collect the files of a table that still need an index.
//
// table_id:   table to query.
// file_types: file types passed to the meta layer as the filter.
// files:      output; cleared first, then filled by the meta layer. RAW files
//             with fewer rows than meta::BUILD_INDEX_THRESHOLD are removed.
// Returns the status of the meta query (previously it was dropped and OK was
// always returned). The output list is filtered the same way in either case.
Status
DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
                             meta::TableFilesSchema& files) {
    files.clear();
    auto status = meta_ptr_->FilesByType(table_id, file_types, files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id << ": " << status.ToString();
    }

    // only build index for files that row count greater than certain threshold
    files.erase(std::remove_if(files.begin(), files.end(),
                               [](const meta::TableFileSchema& file) {
                                   return file.file_type_ == static_cast<int>(meta::TableFileSchema::RAW) &&
                                          file.row_count_ < meta::BUILD_INDEX_THRESHOLD;
                               }),
                files.end());

    return status;
}

G
groot 已提交
871
// Resolve the files to search for a table.
//
// table_id / file_ids / dates: forwarded unchanged to the meta layer.
// files: output; filled by TraverseFiles from the date-partitioned result.
// Returns the meta error on failure, OK otherwise.
Status
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
                         meta::TableFilesSchema& files) {
    meta::DatePartionedTableFilesSchema files_by_date;
    auto query_status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, files_by_date);
    if (query_status.ok()) {
        TraverseFiles(files_by_date, files);
        return Status::OK();
    }

    return query_status;
}

// Find the partitions of a table whose tag matches any of the given tags.
//
// table_id:             table whose partitions are listed via the meta layer.
// partition_tags:       tags to match; each is trimmed of side blanks and
//                       handed to IsRegexMatch together with the partition tag.
// partition_name_array: output set; the table_id_ of every matching partition
//                       is inserted (existing entries are kept).
// Always returns OK.
Status
DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::TableSchema> partitions;
    auto status = meta_ptr_->ShowPartitions(table_id, partitions);

    for (auto& raw_tag : partition_tags) {
        // compare only the valid characters: " ab cd " is treated as "ab cd"
        std::string trimmed_tag = raw_tag;
        server::StringHelpFunctions::TrimStringBlank(trimmed_tag);

        for (auto& partition : partitions) {
            if (!server::StringHelpFunctions::IsRegexMatch(partition.partition_tag_, trimmed_tag)) {
                continue;
            }
            partition_name_array.insert(partition.table_id_);
        }
    }

    return Status::OK();
}

// Drop a table (or only the data of the given dates) and then do the same for
// each of its partitions.
//
// table_id: table to drop.
// dates:    when empty the whole table is dropped; otherwise only the data of
//           these dates is dropped through the meta layer.
// Returns the first failing status from a partition's recursive drop, else OK.
// Results of the calls made for this table itself are not checked.
Status
DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& dates) {
    // dates partly delete files of the table but currently we don't support
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        meta_ptr_->DropDataByDate(table_id, dates);
    } else {
        mem_mgr_->EraseMemVector(table_id);  // not allow insert
        meta_ptr_->DropTable(table_id);      // soft delete table

        // scheduler will determine when to delete table files
        auto resource_count = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
        scheduler::DeleteJobPtr delete_job =
            std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, resource_count);
        scheduler::JobMgrInst::GetInstance()->Put(delete_job);
        delete_job->WaitAndDelete();
    }

    // recurse into the partitions of this table
    std::vector<meta::TableSchema> partitions;
    meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        Status partition_status = DropTableRecursively(partition.table_id_, dates);
        if (!partition_status.ok()) {
            return partition_status;
        }
    }

    return Status::OK();
}

// Replace the index description of a table and of each of its partitions.
//
// table_id: table to update; its current index is dropped first via DropIndex.
// index:    new index description written through the meta layer.
// Returns the meta error if the update fails, the first failing status from a
// partition, or OK.
Status
DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    DropIndex(table_id);

    auto update_status = meta_ptr_->UpdateTableIndex(table_id, index);
    if (!update_status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
        return update_status;
    }

    // apply the same index description to every partition
    std::vector<meta::TableSchema> partitions;
    meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        Status partition_status = UpdateTableIndexRecursively(partition.table_id_, index);
        if (!partition_status.ok()) {
            return partition_status;
        }
    }

    return Status::OK();
}

// Block until every file of a table (and of its partitions) has reached its
// final state for the requested index type.
//
// table_id: table to process.
// index:    index description; only engine_type_ is read here.
// Returns the failing status of GetFilesToBuildIndex (so a failed meta query
// is no longer reported as success), the first failing status from a
// partition, or OK.
Status
DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    const bool is_idmap = index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP);

    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if (is_idmap) {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::RAW),
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
            static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
            static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
        };
    }

    // get files to build index
    meta::TableFilesSchema table_files;
    auto status = GetFilesToBuildIndex(table_id, file_types, table_files);
    if (!status.ok()) {
        return status;
    }

    int times = 1;
    while (!table_files.empty()) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to mark files to index for table: " << table_id << ": "
                                 << status.ToString();
            }
        }

        // poll with a growing delay: 100 ms more per round, capped at 10 seconds
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));

        status = GetFilesToBuildIndex(table_id, file_types, table_files);
        if (!status.ok()) {
            return status;
        }
        times++;
    }

    // build index for partition
    std::vector<meta::TableSchema> partiton_array;
    status = meta_ptr_->ShowPartitions(table_id, partiton_array);
    for (auto& schema : partiton_array) {
        status = BuildTableIndexRecursively(schema.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Drop the index of a table and then of each of its partitions.
//
// table_id: table whose index is dropped through the meta layer.
// Returns the meta error if the drop fails, the first failing status from a
// partition, or OK.
Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    auto drop_status = meta_ptr_->DropTableIndex(table_id);
    if (!drop_status.ok()) {
        return drop_status;
    }

    // drop partition index
    std::vector<meta::TableSchema> partitions;
    meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        Status partition_status = DropTableIndexRecursively(partition.table_id_);
        if (!partition_status.ok()) {
            return partition_status;
        }
    }

    return Status::OK();
}

// Count the rows of a table including the rows of all its partitions.
//
// table_id:  table to count.
// row_count: output; reset to 0, set by the meta layer for this table, then
//            increased by each partition's recursive count.
// Returns the first failing status (row_count may then be partial), else OK.
Status
DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count) {
    row_count = 0;
    auto count_status = meta_ptr_->Count(table_id, row_count);
    if (!count_status.ok()) {
        return count_status;
    }

    // add the row count of every partition
    std::vector<meta::TableSchema> partitions;
    meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        uint64_t rows_in_partition = 0;
        Status partition_status = GetTableRowCountRecursively(partition.table_id_, rows_in_partition);
        if (!partition_status.ok()) {
            return partition_status;
        }

        row_count += rows_in_partition;
    }

    return Status::OK();
}

S
starlord 已提交
1052 1053
}  // namespace engine
}  // namespace milvus