DBImpl.cpp 37.1 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/DBImpl.h"
S
starlord 已提交
19
#include "Utils.h"
S
starlord 已提交
20 21
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
S
starlord 已提交
22
#include "engine/EngineFactory.h"
S
starlord 已提交
23
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
24
#include "meta/MetaConsts.h"
S
starlord 已提交
25 26
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
27
#include "metrics/Metrics.h"
S
starlord 已提交
28
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
29
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
30 31
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
S
starlord 已提交
32
#include "utils/Log.h"
G
groot 已提交
33
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
34
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
35

X
Xu Peng 已提交
36
#include <assert.h>
S
starlord 已提交
37
#include <algorithm>
G
groot 已提交
38
#include <boost/filesystem.hpp>
S
starlord 已提交
39 40 41
#include <chrono>
#include <cstring>
#include <iostream>
G
groot 已提交
42
#include <set>
S
starlord 已提交
43
#include <thread>
G
groot 已提交
44
#include <utility>
X
Xu Peng 已提交
45

J
jinhai 已提交
46
namespace milvus {
X
Xu Peng 已提交
47
namespace engine {
X
Xu Peng 已提交
48

G
groot 已提交
49 50
namespace {

J
jinhai 已提交
51 52 53
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
54

G
groot 已提交
55 56 57 58 59 60 61 62 63 64 65
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!");

void
TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::TableFilesSchema& files_array) {
    for (auto& day_files : date_files) {
        for (auto& file : day_files.second) {
            files_array.push_back(file);
        }
    }
}

S
starlord 已提交
66
}  // namespace
G
groot 已提交
67

Y
Yu Kun 已提交
68
// Builds the meta store and the insert-buffer manager from the supplied options, then starts the service.
// shutting_down_ is initialised to true so that the Start() call below does not return early.
DBImpl::DBImpl(const DBOptions& options)
    : options_(options),
      shutting_down_(true),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    Start();
}

// Stops the service on destruction; Stop() returns immediately if the service is already stopped.
DBImpl::~DBImpl() {
    Stop();
}

S
starlord 已提交
79
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
80
// external api
S
starlord 已提交
81
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
82 83 84
// Marks the service as running and, unless this node is read-only, launches the background timer thread.
// Calling Start() while the service is already running is a no-op. Always returns Status::OK().
Status
DBImpl::Start() {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (!stopped) {
        return Status::OK();
    }

    shutting_down_.store(false, std::memory_order_release);

    // In the distributed version some nodes are read-only; those get no background thread.
    const bool read_only_node = (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY);
    if (!read_only_node) {
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

S
starlord 已提交
100 101 102
// Stops the service: flags shutdown, serializes buffered inserts, waits for the background timer
// thread to exit and, on writable nodes, asks the meta store to clean up shadow files.
// Calling Stop() while the service is already stopped is a no-op. Always returns Status::OK().
Status
DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    // make sure all in-memory data is serialized
    std::set<std::string> sync_table_ids;
    SyncMemData(sync_table_ids);

    // Wait for compaction/build-index to finish (the timer thread waits for both before exiting).
    // Start() does not create the thread in CLUSTER_READONLY mode, and std::thread::join() on a
    // non-joinable thread throws std::system_error - fatal here because Stop() runs from the destructor.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUpShadowFiles();
    }

    return Status::OK();
}

S
starlord 已提交
123 124
// Forwards to the meta store's DropAll() and returns its status.
// Unlike the other public entry points in this file, no shutdown check is performed here.
Status
DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
128
// Registers a new table in the meta store.
// The caller's schema is not modified: a copy is made and its index_file_size_ is scaled by ONE_MB
// before being stored (presumably MB -> bytes; DescribeTable applies the inverse scaling).
// Returns SHUTDOWN_ERROR while the service is stopped, otherwise the meta store's status.
Status
DBImpl::CreateTable(meta::TableSchema& table_schema) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        meta::TableSchema stored_schema = table_schema;
        stored_schema.index_file_size_ *= ONE_MB;
        return meta_ptr_->CreateTable(stored_schema);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
139
// Drops a table by delegating to DropTableRecursively(table_id, dates).
// Returns SHUTDOWN_ERROR while the service is stopped.
Status
DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return DropTableRecursively(table_id, dates);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
148
// Fills table_schema from the meta store and returns the meta store's status.
// index_file_size_ is divided by ONE_MB before returning (the inverse of the scaling in CreateTable);
// the division is applied whether or not the lookup succeeded.
// Returns SHUTDOWN_ERROR while the service is stopped.
Status
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        auto describe_status = meta_ptr_->DescribeTable(table_schema);
        table_schema.index_file_size_ /= ONE_MB;
        return describe_status;
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
159
// Asks the meta store whether table_id exists; the answer is written to has_or_not.
// Returns SHUTDOWN_ERROR while the service is stopped, otherwise the meta store's status.
Status
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->HasTable(table_id, has_or_not);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
168
// Asks the meta store to fill table_schema_array via its AllTables() call.
// Returns SHUTDOWN_ERROR while the service is stopped, otherwise the meta store's status.
Status
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->AllTables(table_schema_array);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
177
// Loads the files of a table (and of its partitions) through their execution engines, one by one,
// until all are loaded or the CPU cache budget would be exceeded.
//
// Returns:
//   SHUTDOWN_ERROR                      - the service is stopped
//   status of GetFilesToSearch          - the parent table's files could not be listed
//   Status(DB_ERROR, ...)               - an engine could not be built, or Load() threw
//   Status(SERVER_CACHE_FULL, ...)      - accumulated file size exceeds the free cache space
//   Status::OK()                        - every file was loaded
Status
DBImpl::PreloadTable(const std::string& table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all table files from parent table
    meta::DatesT dates;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition tables.
    // Partition failures do not abort the pre-load (best effort), but they are logged
    // instead of being silently overwritten.
    std::vector<meta::TableSchema> partiton_array;
    status = meta_ptr_->ShowPartitions(table_id, partiton_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << status.message();
    }
    for (auto& schema : partiton_array) {
        status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
        }
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    ENGINE_LOG_DEBUG << "Begin pre-load table:" + table_id + ", totally " << files_array.size()
                     << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load table:" + table_id);
    for (auto& file : files_array) {
        ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
                                                         (MetricType)file.metric_type_, file.nlist_);
        if (engine == nullptr) {
            ENGINE_LOG_ERROR << "Invalid engine type";
            return Status(DB_ERROR, "Invalid engine type");
        }

        // stop as soon as the running total no longer fits into the free cache space
        size += engine->PhysicalSize();
        if (size > available_size) {
            ENGINE_LOG_DEBUG << "Pre-load canceled since cache almost full";
            return Status(SERVER_CACHE_FULL, "Cache is full");
        } else {
            try {
                std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
                TimeRecorderAuto rc_1(msg);
                engine->Load(true);
            } catch (std::exception& ex) {
                std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status(DB_ERROR, msg);
            }
        }
    }

    return Status::OK();
}

S
starlord 已提交
236
// Forwards the (table_id, flag) pair to the meta store's UpdateTableFlag().
// Returns SHUTDOWN_ERROR while the service is stopped, otherwise the meta store's status.
Status
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->UpdateTableFlag(table_id, flag);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
245
// Writes the row count of table_id to row_count by delegating to GetTableRowCountRecursively().
// Returns SHUTDOWN_ERROR while the service is stopped.
Status
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return GetTableRowCountRecursively(table_id, row_count);
    }

    return SHUTDOWN_ERROR;
}

// Creates a partition of table_id with the given name and tag through the meta store.
// Returns SHUTDOWN_ERROR while the service is stopped, otherwise the meta store's status.
Status
DBImpl::CreatePartition(const std::string& table_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->CreatePartition(table_id, partition_name, partition_tag);
    }

    return SHUTDOWN_ERROR;
}

// Drops a partition: discards its buffered inserts, marks it deleted in the meta store and
// submits a DeleteJob to the scheduler, waiting for that job before returning.
//
// Returns SHUTDOWN_ERROR while the service is stopped, the meta store's status if the partition
// could not be dropped there (previously this failure was ignored and OK was returned),
// and Status::OK() otherwise.
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // not allow insert; the result was never acted upon by the original code either
    mem_mgr_->EraseMemVector(partition_name);

    // soft delete table
    auto status = meta_ptr_->DropPartition(partition_name);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << status.message();
        return status;
    }

    // scheduler will determine when to delete table files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
282
// Resolves (table_id, partition_tag) to a partition name via the meta store and drops that partition.
// Returns SHUTDOWN_ERROR while the service is stopped, the lookup status if the name cannot be
// resolved (after logging it), otherwise whatever DropPartition() returns.
Status
DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::string resolved_name;
    auto lookup_status = meta_ptr_->GetPartitionName(table_id, partition_tag, resolved_name);
    if (lookup_status.ok()) {
        return DropPartition(resolved_name);
    }

    ENGINE_LOG_ERROR << lookup_status.message();
    return lookup_status;
}

// Asks the meta store to fill partiton_schema_array with the partitions of table_id.
// Returns SHUTDOWN_ERROR while the service is stopped, otherwise the meta store's status.
Status
DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->ShowPartitions(table_id, partiton_schema_array);
    }

    return SHUTDOWN_ERROR;
}

// Inserts n vectors into table_id, or into one of its partitions when partition_tag is non-empty.
//
// The vectors are handed to the in-memory manager (mem_mgr_), which also receives vector_ids.
// Returns SHUTDOWN_ERROR while the service is stopped, the lookup status if the partition tag cannot
// be resolved (after logging it), otherwise the status of the in-memory insert.
Status
DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors,
                      IDNumbers& vector_ids) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // if partition is specified, use partition as target table;
    // GetPartitionName writes the resolved name straight into target_table_name
    Status status;
    std::string target_table_name = table_id;
    if (!partition_tag.empty()) {
        status = meta_ptr_->GetPartitionName(table_id, partition_tag, target_table_name);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
            return status;
        }
    }

    // insert vectors into target table
    // (metrics is constructed with `status`, so it must be declared before the insert assigns to it)
    milvus::server::CollectInsertMetrics metrics(n, status);
    status = mem_mgr_->InsertVectors(target_table_name, n, vectors, vector_ids);

    return status;
}

S
starlord 已提交
334
// Creates (or re-creates) the index of a table.
//
// Sequence: serialize buffered inserts, update the stored index description if it differs from the
// requested one (under build_index_mutex_), wait for running merge tasks, clear the table's
// failed-index records and finally build the index.
// Returns SHUTDOWN_ERROR while the service is stopped, an error from the describe/update steps,
// otherwise the status of BuildTableIndexRecursively().
Status
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data (the resulting status is overwritten below, as before)
    std::set<std::string> flushed_table_ids;
    auto status = SyncMemData(flushed_table_ids);

    {
        std::unique_lock<std::mutex> guard(build_index_mutex_);

        // step 1: fetch the index description currently stored for the table
        TableIndex current_index;
        status = DescribeIndex(table_id, current_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        // step 2: update index info when the request differs;
        // the metric type is never changed here, it was defined by CreateTable
        TableIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;
        const bool unchanged = utils::IsSameIndex(current_index, requested_index);
        if (!unchanged) {
            status = UpdateTableIndexRecursively(table_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let merge file thread finish to avoid duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait and build index
    status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
    status = BuildTableIndexRecursively(table_id, index);

    return status;
}

S
starlord 已提交
377
// Reads the index description of table_id from the meta store into index.
// Returns SHUTDOWN_ERROR while the service is stopped, otherwise the meta store's status.
Status
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->DescribeTableIndex(table_id, index);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
386
// Drops the index of table_id by delegating to DropTableIndexRecursively().
// Returns SHUTDOWN_ERROR while the service is stopped.
Status
DBImpl::DropIndex(const std::string& table_id) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    auto drop_status = DropTableIndexRecursively(table_id);
    return drop_status;
}

S
starlord 已提交
396
// Convenience overload of Query(): builds a one-element date list from utils::GetDate()
// and forwards everything to the overload that takes explicit dates.
// Returns SHUTDOWN_ERROR while the service is stopped.
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    meta::DatesT dates = {utils::GetDate()};
    return Query(table_id, partition_tags, k, nq, nprobe, vectors, dates, result_ids, result_distances);
}

S
starlord 已提交
408
// Searches a table for the given query vectors.
//
// With no partition tags, files are gathered from the parent table and from every partition reported
// by the meta store; otherwise only from the partitions resolved by GetPartitionsByTags().
// The gathered files are then searched through QueryAsync(), which fills result_ids/result_distances.
//
// Returns SHUTDOWN_ERROR while the service is stopped, the GetFilesToSearch status if the parent
// table's files cannot be listed, otherwise the status of QueryAsync().
// Failures while listing partitions or partition files do not abort the query (best effort, as
// before), but are now logged instead of being silently overwritten.
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
              ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id << " date range count: " << dates.size();

    Status status;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;

    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole table
        // get all table files from parent table
        status = GetFilesToSearch(table_id, ids, dates, files_array);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::TableSchema> partiton_array;
        status = meta_ptr_->ShowPartitions(table_id, partiton_array);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
        }
        for (auto& schema : partiton_array) {
            status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << status.message();
            }
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        GetPartitionsByTags(table_id, partition_tags, partition_name_array);

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, ids, dates, files_array);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << status.message();
            }
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}
X
Xu Peng 已提交
450

S
starlord 已提交
451
// Searches only the files of table_id whose numeric ids are listed (as strings) in file_ids.
//
// Returns:
//   SHUTDOWN_ERROR                       - the service is stopped
//   Status(DB_ERROR, "Invalid file id")  - an id is not a parsable number, or no file matched
//   status of GetFilesToSearch           - the files could not be listed
//   status of QueryAsync                 - otherwise
Status
DBImpl::QueryByFileID(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
                      uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id << " date range count: " << dates.size();

    // get specified files
    // std::stoul throws std::invalid_argument / std::out_of_range on malformed input; convert that
    // into the same error Status that is used below for ids matching no file, instead of letting the
    // exception escape from a Status-returning API.
    std::vector<size_t> ids;
    try {
        for (auto& id : file_ids) {
            ids.push_back(std::stoul(id));
        }
    } catch (std::exception& ex) {
        ENGINE_LOG_ERROR << "Invalid file id: " << ex.what();
        return Status(DB_ERROR, "Invalid file id");
    }

    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    if (files_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}

S
starlord 已提交
486
// Writes the size reported by the meta store's Size() call to result.
// Returns SHUTDOWN_ERROR (leaving result untouched) while the service is stopped,
// otherwise the meta store's status.
Status
DBImpl::Size(uint64_t& result) {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return meta_ptr_->Size(result);
    }

    return SHUTDOWN_ERROR;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
496
// internal methods
S
starlord 已提交
497
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
498
// Runs one search over the given files through the scheduler and blocks until it has finished.
//
// The files are marked as ongoing for the duration of the search. On success the job's ids and
// distances are copied to result_ids / result_distances and Status::OK() is returned; if the job
// reports a failure, that status is returned and the outputs are left untouched.
Status
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
                   uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    server::CollectQueryMetrics metrics(nq);

    TimeRecorder rc("");

    // step 1: construct search job
    auto status = ongoing_files_checker_.MarkOngoingFiles(files);

    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
    scheduler::SearchJobPtr search_job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
    for (auto& index_file : files) {
        // each file is handed to the job as its own shared copy
        scheduler::TableFileSchemaPtr file_copy = std::make_shared<meta::TableFileSchema>(index_file);
        search_job->AddIndexFile(file_copy);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();

    // the files are released before the job status is inspected, so they are unmarked on failure too
    status = ongoing_files_checker_.UnmarkOngoingFiles(files);
    if (!search_job->GetStatus().ok()) {
        return search_job->GetStatus();
    }

    // step 3: construct results
    result_ids = search_job->GetResultIds();
    result_distances = search_job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

S
starlord 已提交
532 533
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
534
    Status status;
Y
yu yunfeng 已提交
535
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
536
    while (true) {
S
starlord 已提交
537
        if (shutting_down_.load(std::memory_order_acquire)) {
538 539
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
540 541

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
542 543
            break;
        }
X
Xu Peng 已提交
544

G
groot 已提交
545
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
546

G
groot 已提交
547
        StartMetricTask();
G
groot 已提交
548 549 550
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
551 552
}

S
starlord 已提交
553 554
void
DBImpl::WaitMergeFileFinish() {
555
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
Y
Yu Kun 已提交
556
    for (auto& iter : compact_thread_results_) {
557 558 559 560
        iter.wait();
    }
}

S
starlord 已提交
561 562
void
DBImpl::WaitBuildIndexFinish() {
563
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
564
    for (auto& iter : index_thread_results_) {
565 566 567 568
        iter.wait();
    }
}

S
starlord 已提交
569 570
void
DBImpl::StartMetricTask() {
G
groot 已提交
571 572
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
S
starlord 已提交
573
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
574 575 576
        return;
    }

S
Shouyu Luo 已提交
577
    // ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
578

G
groot 已提交
579 580 581
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
J
JinHai-CN 已提交
582 583 584 585 586 587 588
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
589
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
590 591 592 593 594 595 596 597
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
598

K
kun yu 已提交
599
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
600 601
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
602

S
Shouyu Luo 已提交
603
    // ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
604 605
}

S
starlord 已提交
606
// Serializes the in-memory insert buffers via mem_mgr_ and adds the ids it reports to
// sync_table_ids (ids already in the set are kept).
// Calls are serialised by mem_serialize_mutex_. Always returns Status::OK().
Status
DBImpl::SyncMemData(std::set<std::string>& sync_table_ids) {
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    std::set<std::string> serialized_ids;
    mem_mgr_->Serialize(serialized_ids);
    sync_table_ids.insert(serialized_ids.begin(), serialized_ids.end());

    const bool anything_serialized = !serialized_ids.empty();
    if (anything_serialized) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

S
starlord 已提交
622 623
void
DBImpl::StartCompactionTask() {
624 625
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
S
starlord 已提交
626
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
627 628 629
        return;
    }

S
starlord 已提交
630
    // serialize memory data
G
groot 已提交
631
    SyncMemData(compact_table_ids_);
632

S
starlord 已提交
633
    // compactiong has been finished?
634 635 636 637 638 639 640
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
641 642
        }
    }
X
Xu Peng 已提交
643

S
starlord 已提交
644
    // add new compaction task
645 646 647
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
648 649 650
            // collect merge files for all tables(if compact_table_ids_ is empty) for two reasons:
            // 1. other tables may still has un-merged files
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
G
groot 已提交
651
            if (compact_table_ids_.empty()) {
652 653
                std::vector<meta::TableSchema> table_schema_array;
                meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
654
                for (auto& schema : table_schema_array) {
655 656 657 658 659
                    compact_table_ids_.insert(schema.table_id_);
                }
            }

            // start merge file thread
660
            compact_thread_results_.push_back(
G
groot 已提交
661
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
662 663
            compact_table_ids_.clear();
        }
G
groot 已提交
664
    }
X
Xu Peng 已提交
665 666
}

S
starlord 已提交
667
// Merges the given files of one table/date into a newly created table file.
//
// Steps: create a NEW_MERGE file record, merge source files into an engine built for it (stopping
// once the merged size reaches the file's index_file_size_), serialize the result, then update the
// meta store: merged sources become TO_DELETE and the new file becomes TO_INDEX or RAW.
//
// Returns the CreateTableFile status if the record cannot be created, Status(DB_ERROR, ...) if no
// engine can be built or serialization throws, otherwise the status of UpdateTableFiles().
Status
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    // step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    // step 2: merge files
    ExecutionEnginePtr index =
        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                             (MetricType)table_file.metric_type_, table_file.nlist_);

    // EngineFactory::Build can yield a null engine (PreloadTable guards against the same case);
    // without this check the Merge() call below would dereference a null pointer. The freshly
    // created file record is marked TO_DELETE, mirroring the serialization-failure path below.
    if (index == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, "Invalid engine type");
    }

    meta::TableFilesSchema updated;
    int64_t index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // stop merging once the merged index has reached the configured file size;
        // remaining source files are left for a later round
        if (index_size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        index->Serialize();
    } catch (std::exception& ex) {
        // typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    // else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                                       : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size " << index->PhysicalSize() << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

S
starlord 已提交
746
// Merge the files of one table that meta reports as mergeable.
//
// The candidate files come from meta_ptr_->FilesToMerge(), grouped per key of the
// returned map. A group is merged only when it holds at least
// options_.merge_trigger_number_ files. While a group is being merged its files are
// registered in ongoing_files_checker_ and unregistered afterwards.
//
// @param table_id  id of the table whose files are merged
// @return the error from FilesToMerge() if the candidates cannot be fetched,
//         otherwise Status::OK(); a failed merge of one group is logged and does not
//         change the return value.
Status
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        meta::TableFilesSchema& files = kv.second;
        if (files.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        status = ongoing_files_checker_.MarkOngoingFiles(files);

        // the merge result used to be dropped silently; report it so a failed merge is visible
        auto merge_status = MergeFiles(table_id, kv.first, kv.second);
        if (!merge_status.ok()) {
            ENGINE_LOG_ERROR << "Failed to merge files for table " << table_id << ": " << merge_status.ToString();
        }

        status = ongoing_files_checker_.UnmarkOngoingFiles(files);

        // checked after each group so a shutdown request stops the remaining groups
        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
774

S
starlord 已提交
775 776
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
S
Shouyu Luo 已提交
777
    // ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
778

G
groot 已提交
779
    Status status;
Y
Yu Kun 已提交
780
    for (auto& table_id : table_ids) {
G
groot 已提交
781 782
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
783
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
784
        }
S
starlord 已提交
785

S
starlord 已提交
786
        if (shutting_down_.load(std::memory_order_acquire)) {
S
starlord 已提交
787 788 789
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
790
    }
X
Xu Peng 已提交
791

G
groot 已提交
792
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
793

794
    {
G
add log  
groot 已提交
795
        uint64_t ttl = 10 * meta::SECOND;  // default: file data will be erase from cache after few seconds
G
groot 已提交
796
        meta_ptr_->CleanUpCacheWithTTL(ttl, &ongoing_files_checker_);
797 798 799
    }

    {
G
add log  
groot 已提交
800
        uint64_t ttl = 20 * meta::SECOND;  // default: file will be deleted after few seconds
801
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
802
            ttl = meta::H_SEC;
803
        }
G
groot 已提交
804
        meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_);
Z
update  
zhiru 已提交
805
    }
S
starlord 已提交
806

S
Shouyu Luo 已提交
807
    // ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
808
}
X
Xu Peng 已提交
809

S
starlord 已提交
810 811
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
812 813
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
S
starlord 已提交
814
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
815 816 817
        return;
    }

S
starlord 已提交
818
    // build index has been finished?
819 820 821 822 823 824 825
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
826 827 828
        }
    }

S
starlord 已提交
829
    // add new build index task
830 831 832
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
833
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
834
        }
G
groot 已提交
835
    }
X
Xu Peng 已提交
836 837
}

S
starlord 已提交
838 839
void
DBImpl::BackgroundBuildIndex() {
S
Shouyu Luo 已提交
840
    // ENGINE_LOG_TRACE << "Background build index thread start";
S
starlord 已提交
841

P
peng.xu 已提交
842
    std::unique_lock<std::mutex> lock(build_index_mutex_);
843
    meta::TableFilesSchema to_index_files;
G
groot 已提交
844
    meta_ptr_->FilesToIndex(to_index_files);
845
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
846

847
    if (!to_index_files.empty()) {
848 849
        status = ongoing_files_checker_.MarkOngoingFiles(to_index_files);

850
        // step 2: put build index task to scheduler
G
groot 已提交
851
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr>> job2file_map;
852
        for (auto& file : to_index_files) {
G
groot 已提交
853
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
854 855
            scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
856
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
857
            job2file_map.push_back(std::make_pair(job, file_ptr));
858
        }
G
groot 已提交
859

G
groot 已提交
860
        // step 3: wait build index finished and mark failed files
G
groot 已提交
861 862 863 864 865 866 867 868
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
            meta::TableFileSchema& file_schema = *(iter->second.get());
            job->WaitBuildIndexFinish();
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
                ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();

869
                index_failed_checker_.MarkFailedIndexFile(file_schema);
G
groot 已提交
870
            } else {
871
                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
872 873
                ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
            }
874
        }
G
groot 已提交
875

876 877
        status = ongoing_files_checker_.UnmarkOngoingFiles(to_index_files);

G
groot 已提交
878
        ENGINE_LOG_DEBUG << "Background build index thread finished";
Y
Yu Kun 已提交
879
    }
Y
Yu Kun 已提交
880

S
Shouyu Luo 已提交
881
    // ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
882 883
}

G
groot 已提交
884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902
// Collect the files of a table that still need an index built.
//
// `files` is cleared and refilled from meta_ptr_->FilesByType(). RAW files whose
// row count is below meta::BUILD_INDEX_THRESHOLD are then removed, because no
// index is built for them.
//
// @param table_id    id of the table to query
// @param file_types  file types to collect
// @param files       output; receives the files that need an index
// @return the status of the meta query; it used to be discarded in favour of an
//         unconditional Status::OK(), which hid meta failures from callers.
Status
DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
                             meta::TableFilesSchema& files) {
    files.clear();
    auto status = meta_ptr_->FilesByType(table_id, file_types, files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files to build index for table: " << table_id << ", "
                         << status.ToString();
    }

    // only build index for files that row count greater than certain threshold
    files.erase(std::remove_if(files.begin(), files.end(),
                               [](const meta::TableFileSchema& file) {
                                   return file.file_type_ == static_cast<int>(meta::TableFileSchema::RAW) &&
                                          file.row_count_ < meta::BUILD_INDEX_THRESHOLD;
                               }),
                files.end());

    return status;
}

G
groot 已提交
903
// Collect the files of a table that a search has to visit.
//
// Queries meta_ptr_->FilesToSearch() for the given file ids and dates, then passes
// the date-grouped result through TraverseFiles() to fill `files`.
//
// @return the meta error if the query fails, otherwise Status::OK()
Status
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
                         meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Collect files from table: " << table_id;

    meta::DatePartionedTableFilesSchema files_by_date;
    auto query_status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, files_by_date);
    if (query_status.ok()) {
        TraverseFiles(files_by_date, files);
        return Status::OK();
    }

    return query_status;
}

// Resolve partition tags to the table ids of the matching partitions.
//
// Every partition of `table_id` whose partition_tag_ matches one of the given tags
// (via StringHelpFunctions::IsRegexMatch) has its table_id_ inserted into
// `partition_name_array`. The set is only added to, never cleared.
//
// @return always Status::OK(); the status of ShowPartitions() is not checked
Status
DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::TableSchema> partitions;
    auto status = meta_ptr_->ShowPartitions(table_id, partitions);

    for (auto& raw_tag : partition_tags) {
        // surrounding blanks are not significant: " ab cd " is compared as "ab cd"
        std::string trimmed_tag = raw_tag;
        server::StringHelpFunctions::TrimStringBlank(trimmed_tag);

        for (auto& partition : partitions) {
            if (!server::StringHelpFunctions::IsRegexMatch(partition.partition_tag_, trimmed_tag)) {
                continue;
            }
            partition_name_array.insert(partition.table_id_);
        }
    }

    return Status::OK();
}

// Drop a table (or only its data for the given dates) together with all of its
// partitions.
//
// With an empty `dates` the whole table is dropped: its in-memory vectors are
// erased, the table is soft-deleted in meta, its failed-index records are cleaned,
// and a DeleteJob is handed to the scheduler and waited for. Otherwise only the
// data of the given dates is dropped through meta. The same operation is then
// applied to every partition reported by ShowPartitions().
//
// @return the first error returned by a recursive call on a partition, otherwise
//         Status::OK(); statuses of the calls on the table itself are not checked
Status
DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& dates) {
    // dropping by dates only removes part of the table's files; not fully supported yet
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    Status status;
    if (!dates.empty()) {
        status = meta_ptr_->DropDataByDate(table_id, dates);
    } else {
        status = mem_mgr_->EraseMemVector(table_id);  // no more inserts into this table
        status = meta_ptr_->DropTable(table_id);      // soft delete table
        index_failed_checker_.CleanFailedIndexFileOfTable(table_id);

        // the scheduler decides when the table files are actually deleted
        auto compute_resource_count = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
        scheduler::DeleteJobPtr delete_job =
            std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, compute_resource_count);
        scheduler::JobMgrInst::GetInstance()->Put(delete_job);
        delete_job->WaitAndDelete();
    }

    // apply the same drop to every partition
    std::vector<meta::TableSchema> partitions;
    status = meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        status = DropTableRecursively(partition.table_id_, dates);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Replace the index description of a table and of all its partitions.
//
// DropIndex() is called on the table first, then the new index info is stored
// through meta, and finally the same is done for every partition reported by
// ShowPartitions().
//
// @return the meta error if the update fails, the first error from a partition,
//         otherwise Status::OK(); the result of DropIndex() is not checked
Status
DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    DropIndex(table_id);

    auto status = meta_ptr_->UpdateTableIndex(table_id, index);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
        return status;
    }

    // propagate the new index description to the partitions
    std::vector<meta::TableSchema> partitions;
    status = meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        status = UpdateTableIndexRecursively(partition.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Block until the index of a table and of all its partitions has been built.
//
// The function polls: as long as GetFilesToBuildIndex() still reports files of the
// watched types, it (for non-IDMAP engines) asks meta to move the table files to
// the to-index state, sleeps with a growing delay capped at 10 seconds, and checks
// again, dropping files that index_failed_checker_ asks to ignore. Partitions are
// handled recursively afterwards.
//
// @return the first error from a partition; DB_ERROR if index_failed_checker_
//         reports failed files for this table; otherwise Status::OK()
Status
DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    const bool is_idmap = (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP));

    // IDMAP: only wait until every NEW file has been converted to a RAW file
    // others: wait until NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files have become INDEX files
    std::vector<int> file_types;
    if (is_idmap) {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::RAW),
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
            static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
            static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
        };
    }

    // poll until no file of the watched types is left
    meta::TableFilesSchema pending_files;
    auto status = GetFilesToBuildIndex(table_id, file_types, pending_files);
    int times = 1;

    while (!pending_files.empty()) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        }

        // back off a little more on every round, at most 10 seconds
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));
        GetFilesToBuildIndex(table_id, file_types, pending_files);
        ++times;

        index_failed_checker_.IgnoreFailedIndexFiles(pending_files);
    }

    // build index for partition
    std::vector<meta::TableSchema> partitions;
    status = meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        status = BuildTableIndexRecursively(partition.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    // report an error when some files of this table could not be indexed
    std::vector<std::string> failed_files;
    index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files);
    if (failed_files.empty()) {
        return Status::OK();
    }

    std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) +
                      ((failed_files.size() == 1) ? " file" : " files");
#ifdef MILVUS_CPU_VERSION
    msg += ", please double check index parameters.";
#else
    msg += ", file size is too large or gpu memory is not enough.";
#endif
    return Status(DB_ERROR, msg);
}

// Drop the index of a table and of all its partitions.
//
// The failed-index records of the table are cleaned before meta is asked to drop
// the index; partitions reported by ShowPartitions() are handled recursively.
//
// @return the meta error if dropping the index fails, the first error from a
//         partition, otherwise Status::OK()
Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    index_failed_checker_.CleanFailedIndexFileOfTable(table_id);

    auto status = meta_ptr_->DropTableIndex(table_id);
    if (!status.ok()) {
        return status;
    }

    // drop the index of every partition as well
    std::vector<meta::TableSchema> partitions;
    status = meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        status = DropTableIndexRecursively(partition.table_id_);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Count the rows of a table including the rows of all its partitions.
//
// `row_count` is reset to 0, filled with the table's own count from meta, and then
// increased by the recursively computed count of every partition reported by
// ShowPartitions().
//
// @return the first error from meta_ptr_->Count() or from a partition, otherwise
//         Status::OK(); on error `row_count` holds whatever was accumulated so far
Status
DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count) {
    row_count = 0;
    auto status = meta_ptr_->Count(table_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // add the rows held by each partition
    std::vector<meta::TableSchema> partitions;
    status = meta_ptr_->ShowPartitions(table_id, partitions);
    for (auto& partition : partitions) {
        uint64_t rows_in_partition = 0;
        status = GetTableRowCountRecursively(partition.table_id_, rows_in_partition);
        if (!status.ok()) {
            return status;
        }

        row_count += rows_in_partition;
    }

    return Status::OK();
}

S
starlord 已提交
1104 1105
}  // namespace engine
}  // namespace milvus