DBImpl.cpp 37.8 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/DBImpl.h"
Z
Zhiru Zhu 已提交
19 20 21 22 23 24 25 26 27 28 29 30

#include <assert.h>

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
#include <iostream>
#include <set>
#include <thread>
#include <utility>

S
starlord 已提交
31
#include "Utils.h"
S
starlord 已提交
32 33
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
S
starlord 已提交
34
#include "engine/EngineFactory.h"
S
starlord 已提交
35
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
36
#include "meta/MetaConsts.h"
S
starlord 已提交
37 38
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
39
#include "metrics/Metrics.h"
S
starlord 已提交
40
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
41
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
42 43
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
S
starlord 已提交
44
#include "utils/Log.h"
G
groot 已提交
45
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
46
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
47

J
jinhai 已提交
48
namespace milvus {
X
Xu Peng 已提交
49
namespace engine {
X
Xu Peng 已提交
50

G
groot 已提交
51 52
namespace {

J
jinhai 已提交
53 54 55
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
56

G
groot 已提交
57
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
G
groot 已提交
58 59 60 61 62 63 64 65 66 67

void
TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::TableFilesSchema& files_array) {
    for (auto& day_files : date_files) {
        for (auto& file : day_files.second) {
            files_array.push_back(file);
        }
    }
}

S
starlord 已提交
68
}  // namespace
G
groot 已提交
69

Y
Yu Kun 已提交
70
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    // The meta backend is built first because the memory manager is constructed on top of it.
    meta_ptr_ = MetaFactory::Build(options_.meta_, options_.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    // The instance is started as part of construction.
    Start();
}

DBImpl::~DBImpl() {
    // Flush in-memory data and shut down the background timer thread (see Stop()).
    Stop();
}

S
starlord 已提交
81
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
82
// external api
S
starlord 已提交
83
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
84 85
Status
DBImpl::Start() {
    // Starting an already-started instance is a no-op.
    const bool already_started = initialized_.load(std::memory_order_acquire);
    if (already_started) {
        return Status::OK();
    }

    // ENGINE_LOG_TRACE << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // In a distributed deployment some nodes are read-only. Those nodes get no background
    // timer loop, so no metric/compaction/index tasks are scheduled from here.
    const bool read_only = (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY);
    if (!read_only) {
        // ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

S
starlord 已提交
102 103
Status
DBImpl::Stop() {
    // Stopping an already-stopped instance is a no-op.
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }
    // Clearing the flag makes the public APIs return SHUTDOWN_ERROR and tells the background
    // timer loop to drain its work and exit.
    initialized_.store(false, std::memory_order_release);

    // make sure all memory data serialized
    std::set<std::string> sync_table_ids;
    SyncMemData(sync_table_ids);

    // Wait for the background loop to finish compaction/index building and exit.
    // In CLUSTER_READONLY mode Start() never launches this thread. Joining a non-joinable
    // std::thread throws std::system_error, which would call std::terminate when Stop() runs
    // from the (implicitly noexcept) destructor. So only join a thread that actually exists.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUpShadowFiles();
    }

    // ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
124 125
Status
DBImpl::DropAll() {
    // Delegated entirely to the meta layer. Unlike the other public APIs, there is no
    // initialized_ check here.
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
129
Status
DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // Callers pass index_file_size_ in MB; it is scaled by ONE_MB before being stored.
    // The scaling is applied to a copy, so the caller's schema keeps its MB value.
    meta::TableSchema schema_to_store(table_schema);
    schema_to_store.index_file_size_ *= ONE_MB;
    return meta_ptr_->CreateTable(schema_to_store);
}

S
starlord 已提交
140
Status
DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // All the work happens in DropTableRecursively (defined elsewhere); going by its name,
    // it presumably also handles the table's partitions.
    return DropTableRecursively(table_id, dates);
}

S
starlord 已提交
149
Status
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeTable(table_schema);
    // Inverse of CreateTable: the stored index_file_size_ is scaled by ONE_MB, so divide it
    // back to MB for the caller. This is applied even when the lookup failed.
    table_schema.index_file_size_ /= ONE_MB;
    return describe_status;
}

S
starlord 已提交
160
Status
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // has_or_not is written by the meta layer.
    auto lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

S
starlord 已提交
169
Status
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::TableSchema> tables;
    auto status = meta_ptr_->AllTables(tables);

    // Partition tables are listed by the meta layer too; they carry a non-empty owner_table_.
    // Drop them (keeping the original order) so callers only see top-level tables.
    tables.erase(std::remove_if(tables.begin(), tables.end(),
                                [](const meta::TableSchema& schema) { return !schema.owner_table_.empty(); }),
                 tables.end());
    table_schema_array = std::move(tables);

    return status;
}

S
starlord 已提交
189
// Loads every file of `table_id` and of its partition tables, one by one, via
// ExecutionEngine::Load(true).
// Returns SERVER_CACHE_FULL as soon as the cumulative PhysicalSize() of the files processed
// exceeds the CPU cache space that was free when the call began. Files loaded before that point
// are not unloaded. Returns DB_ERROR on an unknown engine type or a load exception.
// Failures while collecting partition files are logged and skipped (best effort).
Status
DBImpl::PreloadTable(const std::string& table_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all table files from parent table
    // (dates/ids are left empty -- presumably meaning "no filter"; confirm in GetFilesToSearch)
    meta::DatesT dates;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition tables.
    // Previously these statuses were overwritten without ever being checked. Keep the
    // best-effort behaviour, but report the failures instead of swallowing them.
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions of table " << table_id << ": " << status.message();
    }
    for (auto& schema : partition_array) {
        status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get files of partition " << schema.table_id_ << ": " << status.message();
        }
    }

    // Free CPU cache space at the start of the pre-load; used as the budget below.
    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    ENGINE_LOG_DEBUG << "Begin pre-load table:" + table_id + ", totally " << files_array.size()
                     << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load table:" + table_id);
    for (auto& file : files_array) {
        ExecutionEnginePtr engine =
            EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                                 static_cast<MetricType>(file.metric_type_), file.nlist_);
        if (engine == nullptr) {
            ENGINE_LOG_ERROR << "Invalid engine type";
            return Status(DB_ERROR, "Invalid engine type");
        }

        // Stop once the running total would exceed the free cache space measured above.
        size += engine->PhysicalSize();
        if (size > available_size) {
            ENGINE_LOG_DEBUG << "Pre-load canceled since cache almost full";
            return Status(SERVER_CACHE_FULL, "Cache is full");
        }

        try {
            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            engine->Load(true);
        } catch (std::exception& ex) {
            std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
248
Status
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    // Refuse to touch metadata once the DB has been stopped.
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto update_status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return update_status;
}

S
starlord 已提交
257
Status
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // The counting itself is done by GetTableRowCountRecursively (defined elsewhere);
    // going by its name, it presumably includes the table's partitions.
    auto count_status = GetTableRowCountRecursively(table_id, row_count);
    return count_status;
}

Status
DBImpl::CreatePartition(const std::string& table_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // Partition bookkeeping lives entirely in the meta layer.
    auto create_status = meta_ptr_->CreatePartition(table_id, partition_name, partition_tag);
    return create_status;
}

// Drops a partition table: stops in-memory inserts into it, soft-deletes it in meta, then runs
// a DeleteJob and waits for it (the scheduler decides when the files are removed).
// Returns the meta-layer error if the soft delete fails; in that case no delete job is scheduled.
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Not allow insert. Best effort: this result is not treated as fatal (unchanged behaviour).
    auto status = mem_mgr_->EraseMemVector(partition_name);

    // Soft delete the partition table. Previously this result was discarded, so the call
    // reported success and scheduled a DeleteJob even when the partition was not dropped.
    status = meta_ptr_->DropPartition(partition_name);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << status.message();
        return status;
    }

    // scheduler will determine when to delete table files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
294
Status
DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Resolve the tag to the partition's table name, then drop it through DropPartition.
    std::string partition_name;
    auto lookup_status = meta_ptr_->GetPartitionName(table_id, partition_tag, partition_name);
    if (lookup_status.ok()) {
        return DropPartition(partition_name);
    }

    ENGINE_LOG_ERROR << lookup_status.message();
    return lookup_status;
}

Status
DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // partition_schema_array is filled by the meta layer.
    auto show_status = meta_ptr_->ShowPartitions(table_id, partition_schema_array);
    return show_status;
}

// Inserts `vectors` into `table_id`. If `partition_tag` is non-empty, the vectors go into the
// partition table registered under that tag instead.
// Returns the partition-lookup error, or the status of the memory-manager insert.
Status
DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_tag, VectorsData& vectors) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // if partition is specified, use partition as target table
    Status status;
    std::string target_table_name = table_id;
    if (!partition_tag.empty()) {
        status = meta_ptr_->GetPartitionName(table_id, partition_tag, target_table_name);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
            return status;
        }
    }

    // insert vectors into target table
    // NOTE: `metrics` is given `status` before the insert runs, and the insert result is then
    // assigned into that same object. Presumably the collector keeps a reference and reports
    // the final status when it goes out of scope -- keep this ordering.
    milvus::server::CollectInsertMetrics metrics(vectors.vector_count_, status);
    status = mem_mgr_->InsertVectors(target_table_name, vectors);

    return status;
}

S
starlord 已提交
345
Status
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data before touching the index
    std::set<std::string> flushed_table_ids;
    auto status = SyncMemData(flushed_table_ids);

    {
        // build_index_mutex_ is held while the stored index definition is compared and updated.
        std::unique_lock<std::mutex> index_lock(build_index_mutex_);

        // step 1: check index difference
        TableIndex current_index;
        status = DescribeIndex(table_id, current_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        // step 2: update index info. The metric type was fixed by CreateTable, so it is always
        // taken from the existing definition rather than from the request.
        TableIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;
        const bool index_changed = !utils::IsSameIndex(current_index, requested_index);
        if (index_changed) {
            status = UpdateTableIndexRecursively(table_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let the merge file thread finish first, to avoid a duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait and build index. CleanFailedIndexFileOfTable presumably forgets earlier
    // failed builds for this table; its status is overwritten, as before.
    status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
    status = BuildTableIndexRecursively(table_id, index);

    return status;
}

S
starlord 已提交
388
Status
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // The index definition is read straight from the meta layer.
    auto describe_status = meta_ptr_->DescribeTableIndex(table_id, index);
    return describe_status;
}

S
starlord 已提交
397
Status
DBImpl::DropIndex(const std::string& table_id) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    // All the work happens in DropTableIndexRecursively (defined elsewhere).
    auto drop_status = DropTableIndexRecursively(table_id);
    return drop_status;
}

S
starlord 已提交
407
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
              const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nprobe, const VectorsData& vectors,
              ResultIds& result_ids, ResultDistances& result_distances) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // With no explicit date range, restrict the search to the single date returned by
    // utils::GetDate() (presumably the current day) and forward to the date-range overload.
    meta::DatesT single_date = {utils::GetDate()};
    return Query(context, table_id, partition_tags, k, nprobe, vectors, single_date, result_ids, result_distances);
}

S
starlord 已提交
420
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
              const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nprobe, const VectorsData& vectors,
              const meta::DatesT& dates, ResultIds& result_ids, ResultDistances& result_distances) {
    // Child tracing span for this call. It is finished after QueryAsync; the early returns
    // above that point leave it unfinished.
    auto query_ctx = context->Child("Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id << " date range count: " << dates.size();

    Status status;
    std::vector<size_t> ids;  // left empty: presumably "no file-id filter" for GetFilesToSearch
    meta::TableFilesSchema files_array;

    if (!partition_tags.empty()) {
        // only search the partitions whose tags were given
        std::set<std::string> partition_name_array;
        GetPartitionsByTags(table_id, partition_tags, partition_name_array);
        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, ids, dates, files_array);
        }
    } else {
        // whole-table search: parent table files first, then the files of every partition
        status = GetFilesToSearch(table_id, ids, dates, files_array);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::TableSchema> partition_array;
        status = meta_ptr_->ShowPartitions(table_id, partition_array);
        for (auto& schema : partition_array) {
            status = GetFilesToSearch(schema.table_id_, ids, dates, files_array);
        }
    }

    // Cache statistics are printed on both sides of the search.
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(query_ctx, table_id, files_array, k, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    query_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}
X
Xu Peng 已提交
467

S
starlord 已提交
468
// Searches only the files of `table_id` whose numeric ids are listed in `file_ids`, within
// `dates`. Returns DB_ERROR "Invalid file id: <id>" when an id is not a valid unsigned number.
// Returns DB_ERROR "Invalid file id" when no matching file is found. Otherwise returns the
// search status.
Status
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::string& table_id,
                      const std::vector<std::string>& file_ids, uint64_t k, uint64_t nprobe, const VectorsData& vectors,
                      const meta::DatesT& dates, ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_ctx = context->Child("Query by file id");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id << " date range count: " << dates.size();

    // get specified files.
    // The ids come from the caller as strings. std::stoul throws std::invalid_argument /
    // std::out_of_range on malformed input; report that as an error Status instead of letting
    // the exception escape the query path. Only the parse sits inside the try, so unrelated
    // failures are not mislabelled.
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto& id : file_ids) {
        size_t file_id = 0;
        try {
            file_id = std::stoul(id);
        } catch (std::exception& ex) {
            std::string msg = "Invalid file id: " + id;
            ENGINE_LOG_ERROR << msg << ", " << ex.what();
            return Status(DB_ERROR, msg);
        }
        ids.push_back(file_id);
    }

    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    if (files_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(query_ctx, table_id, files_array, k, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    query_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}

S
starlord 已提交
508
Status
DBImpl::Size(uint64_t& result) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        // NOTE: `result` is not written on this path.
        return SHUTDOWN_ERROR;
    }

    // The total size figure comes from the meta layer.
    auto size_status = meta_ptr_->Size(result);
    return size_status;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
518
// internal methods
S
starlord 已提交
519
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
520
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
                   const meta::TableFilesSchema& files, uint64_t k, uint64_t nprobe, const VectorsData& vectors,
                   ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_async_ctx = context->Child("Query Async");

    server::CollectQueryMetrics metrics(vectors.vector_count_);
    TimeRecorder rc("");

    // step 1: mark the files as in use for the duration of the search, then build the job
    auto status = ongoing_files_checker_.MarkOngoingFiles(files);

    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
    scheduler::SearchJobPtr search_job = std::make_shared<scheduler::SearchJob>(query_async_ctx, k, nprobe, vectors);
    for (const auto& index_file : files) {
        scheduler::TableFileSchemaPtr index_file_ptr = std::make_shared<meta::TableFileSchema>(index_file);
        search_job->AddIndexFile(index_file_ptr);
    }

    // step 2: hand the job to the scheduler and block until it finishes
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();

    // The files are unmarked whether or not the search succeeded.
    status = ongoing_files_checker_.UnmarkOngoingFiles(files);
    if (!search_job->GetStatus().ok()) {
        return search_job->GetStatus();
    }

    // step 3: copy the results out of the job
    result_ids = search_job->GetResultIds();
    result_distances = search_job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    query_async_ctx->GetTraceContext()->GetSpan()->Finish();

    return Status::OK();
}

S
starlord 已提交
559 560
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
561
    Status status;
Y
yu yunfeng 已提交
562
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
563
    while (true) {
564
        if (!initialized_.load(std::memory_order_acquire)) {
565 566
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
567 568

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
569 570
            break;
        }
X
Xu Peng 已提交
571

G
groot 已提交
572
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
573

G
groot 已提交
574
        StartMetricTask();
G
groot 已提交
575 576 577
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
578 579
}

S
starlord 已提交
580 581
void
DBImpl::WaitMergeFileFinish() {
582
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
Y
Yu Kun 已提交
583
    for (auto& iter : compact_thread_results_) {
584 585 586 587
        iter.wait();
    }
}

S
starlord 已提交
588 589
void
DBImpl::WaitBuildIndexFinish() {
590
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
591
    for (auto& iter : index_thread_results_) {
592 593 594 595
        iter.wait();
    }
}

S
starlord 已提交
596 597
void
DBImpl::StartMetricTask() {
G
groot 已提交
598
    static uint64_t metric_clock_tick = 0;
599
    ++metric_clock_tick;
S
starlord 已提交
600
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
601 602 603
        return;
    }

S
Shouyu Luo 已提交
604
    // ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
605

G
groot 已提交
606 607 608
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
J
JinHai-CN 已提交
609 610 611 612 613 614 615
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
616
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
617 618 619 620 621 622 623 624
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
625

K
kun yu 已提交
626
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
627 628
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
629
    server::Metrics::GetInstance().PushToGateway();
K
kun yu 已提交
630

S
Shouyu Luo 已提交
631
    // ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
632 633
}

S
starlord 已提交
634
Status
DBImpl::SyncMemData(std::set<std::string>& sync_table_ids) {
    // Only one caller at a time may serialize the memory manager.
    std::lock_guard<std::mutex> serialize_guard(mem_serialize_mutex_);

    std::set<std::string> serialized_ids;
    mem_mgr_->Serialize(serialized_ids);
    // The ids of the serialized tables are added to (not substituted for) the caller's set.
    sync_table_ids.insert(serialized_ids.begin(), serialized_ids.end());

    if (!serialized_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    // The Serialize() result is not inspected; this always reports success.
    return Status::OK();
}

S
starlord 已提交
650 651
void
DBImpl::StartCompactionTask() {
652
    static uint64_t compact_clock_tick = 0;
653
    ++compact_clock_tick;
S
starlord 已提交
654
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
655 656 657
        return;
    }

S
starlord 已提交
658
    // serialize memory data
G
groot 已提交
659
    SyncMemData(compact_table_ids_);
660

S
starlord 已提交
661
    // compactiong has been finished?
662 663 664 665 666 667 668
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
669 670
        }
    }
X
Xu Peng 已提交
671

S
starlord 已提交
672
    // add new compaction task
673 674 675
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
676 677 678
            // collect merge files for all tables(if compact_table_ids_ is empty) for two reasons:
            // 1. other tables may still has un-merged files
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
G
groot 已提交
679
            if (compact_table_ids_.empty()) {
680 681
                std::vector<meta::TableSchema> table_schema_array;
                meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
682
                for (auto& schema : table_schema_array) {
683 684 685 686 687
                    compact_table_ids_.insert(schema.table_id_);
                }
            }

            // start merge file thread
688
            compact_thread_results_.push_back(
G
groot 已提交
689
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
690 691
            compact_table_ids_.clear();
        }
G
groot 已提交
692
    }
X
Xu Peng 已提交
693 694
}

S
starlord 已提交
695
Status
Y
Yu Kun 已提交
696
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
    // Merge `files` (all from `table_id` and partition date `date`) into one newly created table file.
    //   step 1: register a NEW_MERGE file in meta
    //   step 2: merge source files into an engine until its size reaches a source file's index_file_size_
    //   step 3: serialize the merged engine to disk
    //   step 4: in one meta update, mark the consumed sources TO_DELETE and the merged file RAW/TO_INDEX
    // On any failure after step 1 the merged file is marked TO_DELETE, the source files are not touched
    // in meta, and the failing Status is returned.
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    // step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    // Mark the (partial) merged file TO_DELETE. The result of that meta update is only logged, so the
    // caller still receives the error that caused the merge to be abandoned.
    auto discard_merged_file = [&]() {
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        Status mark_status = meta_ptr_->UpdateTableFile(table_file);
        if (!mark_status.ok()) {
            ENGINE_LOG_ERROR << "Failed to mark merged file " << table_file.file_id_
                             << " to to_delete: " << mark_status.ToString();
        }
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
    };

    // step 2: merge files
    ExecutionEnginePtr index = EngineFactory::Build(
        table_file.dimension_, table_file.location_, static_cast<EngineType>(table_file.engine_type_),
        static_cast<MetricType>(table_file.metric_type_), table_file.nlist_);
    if (index == nullptr) {
        std::string msg = "Failed to create execution engine for merged file: " + table_file.file_id_;
        ENGINE_LOG_ERROR << msg;
        discard_merged_file();
        return Status(DB_ERROR, msg);
    }

    meta::TableFilesSchema updated;
    int64_t index_size = 0;

    for (const auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        status = index->Merge(file.location_);
        if (!status.ok()) {
            // A source that could not be merged must not be marked TO_DELETE, otherwise its data would be lost.
            ENGINE_LOG_ERROR << "Failed to merge file " << file.location_ << " into " << table_file.location_ << ": "
                             << status.ToString();
            discard_merged_file();
            return status;
        }

        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        index_size = index->Size();

        if (index_size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = index->Serialize();
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
        }
    } catch (const std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        discard_merged_file();

        ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_
                         << ", possible out of disk space or memory";

        // Return the serialization error itself, not the result of the cleanup meta update.
        return status;
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    const auto physical_size = index->PhysicalSize();
    if (table_file.engine_type_ != static_cast<int>(EngineType::FAISS_IDMAP)) {
        table_file.file_type_ = (physical_size >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                               : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = physical_size;
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size " << physical_size << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

S
starlord 已提交
780
Status
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    // Fetch the mergeable files of `table_id` grouped by date and merge every date group that holds at
    // least merge_trigger_number_ files. While a group is being merged its files are registered with
    // ongoing_files_checker_ (the same checker BackgroundCompaction hands to CleanUpFilesWithTTL).
    // Stops early when initialized_ is cleared. Only a failure to list the files is reported; the
    // results of the individual merges are not propagated.
    meta::DatePartionedTableFilesSchema date_groups;
    auto status = meta_ptr_->FilesToMerge(table_id, date_groups);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& group : date_groups) {
        const auto& group_date = group.first;
        meta::TableFilesSchema& candidates = group.second;

        const bool too_few_files = candidates.size() < options_.merge_trigger_number_;
        if (too_few_files) {
            ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        status = ongoing_files_checker_.MarkOngoingFiles(candidates);
        MergeFiles(table_id, group_date, candidates);
        status = ongoing_files_checker_.UnmarkOngoingFiles(candidates);

        const bool shutting_down = !initialized_.load(std::memory_order_acquire);
        if (shutting_down) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
808

S
starlord 已提交
809 810
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
S
Shouyu Luo 已提交
811
    // ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
812

G
groot 已提交
813
    Status status;
Y
Yu Kun 已提交
814
    for (auto& table_id : table_ids) {
G
groot 已提交
815 816
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
817
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
818
        }
S
starlord 已提交
819

820
        if (!initialized_.load(std::memory_order_acquire)) {
S
starlord 已提交
821 822 823
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
824
    }
X
Xu Peng 已提交
825

G
groot 已提交
826
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
827

828
    {
G
groot 已提交
829
        uint64_t ttl = 10 * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
830
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
831
            ttl = meta::HOUR;
832
        }
G
groot 已提交
833

G
groot 已提交
834
        meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_);
Z
update  
zhiru 已提交
835
    }
S
starlord 已提交
836

S
Shouyu Luo 已提交
837
    // ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
838
}
X
Xu Peng 已提交
839

S
starlord 已提交
840 841
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
842
    static uint64_t index_clock_tick = 0;
843
    ++index_clock_tick;
S
starlord 已提交
844
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
845 846 847
        return;
    }

S
starlord 已提交
848
    // build index has been finished?
849 850 851 852 853 854 855
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
856 857 858
        }
    }

S
starlord 已提交
859
    // add new build index task
860 861 862
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
863
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
864
        }
G
groot 已提交
865
    }
X
Xu Peng 已提交
866 867
}

S
starlord 已提交
868 869
void
DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
870
    std::unique_lock<std::mutex> lock(build_index_mutex_);
871
    meta::TableFilesSchema to_index_files;
G
groot 已提交
872
    meta_ptr_->FilesToIndex(to_index_files);
873
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
874

875
    if (!to_index_files.empty()) {
G
groot 已提交
876
        ENGINE_LOG_DEBUG << "Background build index thread begin";
877 878
        status = ongoing_files_checker_.MarkOngoingFiles(to_index_files);

879
        // step 2: put build index task to scheduler
G
groot 已提交
880
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr>> job2file_map;
881
        for (auto& file : to_index_files) {
G
groot 已提交
882
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
883 884
            scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
885
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
886
            job2file_map.push_back(std::make_pair(job, file_ptr));
887
        }
G
groot 已提交
888

G
groot 已提交
889
        // step 3: wait build index finished and mark failed files
G
groot 已提交
890 891 892 893 894 895 896 897
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
            meta::TableFileSchema& file_schema = *(iter->second.get());
            job->WaitBuildIndexFinish();
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
                ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();

898
                index_failed_checker_.MarkFailedIndexFile(file_schema);
G
groot 已提交
899 900
            } else {
                ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
G
groot 已提交
901 902

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
903
            }
G
groot 已提交
904
            status = ongoing_files_checker_.UnmarkOngoingFile(file_schema);
905
        }
G
groot 已提交
906 907

        ENGINE_LOG_DEBUG << "Background build index thread finished";
Y
Yu Kun 已提交
908
    }
X
Xu Peng 已提交
909 910
}

G
groot 已提交
911 912 913 914 915 916 917 918 919 920 921 922
Status
DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
                             meta::TableFilesSchema& files) {
    // Replace `files` with the files of `table_id` whose type is in `file_types`, excluding RAW files
    // whose row_count_ is below meta::BUILD_INDEX_THRESHOLD. Returns the status of the meta query.
    // The filter is applied either way, so callers that ignore the status see the same contents.
    files.clear();
    auto status = meta_ptr_->FilesByType(table_id, file_types, files);

    // only build index for files that row count greater than certain threshold
    auto too_small_to_index = [](const meta::TableFileSchema& file) {
        return file.file_type_ == static_cast<int>(meta::TableFileSchema::RAW) &&
               file.row_count_ < meta::BUILD_INDEX_THRESHOLD;
    };
    // erase/remove_if keeps the relative order and is linear, unlike erasing one element at a time.
    files.erase(std::remove_if(files.begin(), files.end(), too_small_to_index), files.end());

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files to build index for table: " << table_id << ", "
                         << status.ToString();
    }
    return status;
}

G
groot 已提交
930
Status
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
                         meta::TableFilesSchema& files) {
    // Ask meta for the searchable files of `table_id` (filtered by `file_ids` and `dates`), grouped by
    // date. On success, hand the groups to TraverseFiles to fill `files`. A meta error is returned as is.
    ENGINE_LOG_DEBUG << "Collect files from table: " << table_id;

    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, files_by_date);
    if (status.ok()) {
        TraverseFiles(files_by_date, files);
        return Status::OK();
    }
    return status;
}

Status
DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    // Insert into `partition_name_array` the table ids of the partitions of `table_id` whose
    // partition_tag_ matches any of `partition_tags`. Each tag is blank-trimmed and then passed as the
    // pattern argument of StringHelpFunctions::IsRegexMatch. Fails if the partitions cannot be listed;
    // previously such a failure was reported as OK with no matches.
    std::vector<meta::TableSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions of table: " << table_id << ", " << status.ToString();
        return status;
    }

    for (auto& tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);
        for (auto& schema : partition_array) {
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
                partition_name_array.insert(schema.table_id_);
            }
        }
    }

    return Status::OK();
}

Status
DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& dates) {
    // Drop `table_id`, then recursively its partitions with the same `dates`.
    // With empty `dates`: erase the in-memory vectors, soft-delete the table in meta, clear its
    // failed-index records, and run a DeleteJob, waiting until it finishes.
    // Otherwise only the data for `dates` is dropped via DropDataByDate.
    // Partitions are still processed if the drop above fails; that failure is returned at the end.
    // dates partly delete files of the table but currently we don't support
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    Status drop_status;
    if (dates.empty()) {
        drop_status = mem_mgr_->EraseMemVector(table_id);  // not allow insert
        drop_status = meta_ptr_->DropTable(table_id);      // soft delete table
        index_failed_checker_.CleanFailedIndexFileOfTable(table_id);

        // scheduler will determine when to delete table files
        auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
        scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, nres);
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitAndDelete();
    } else {
        drop_status = meta_ptr_->DropDataByDate(table_id, dates);
    }

    if (!drop_status.ok()) {
        ENGINE_LOG_ERROR << "Failed to drop table: " << table_id << ", " << drop_status.ToString();
    }

    std::vector<meta::TableSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions of table: " << table_id << ", " << status.ToString();
        return status;
    }

    for (auto& schema : partition_array) {
        status = DropTableRecursively(schema.table_id_, dates);
        if (!status.ok()) {
            return status;
        }
    }

    return drop_status;
}

Status
DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    // Drop the current index of `table_id`, store `index` as its index description in meta, then do
    // the same for every partition. DropIndex's result is not checked. Failure to list the partitions
    // is now returned instead of silently skipping them.
    DropIndex(table_id);

    auto status = meta_ptr_->UpdateTableIndex(table_id, index);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
        return status;
    }

    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions of table: " << table_id << ", " << status.ToString();
        return status;
    }

    for (auto& schema : partition_array) {
        status = UpdateTableIndexRecursively(schema.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

Status
DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    // Poll until GetFilesToBuildIndex (minus files index_failed_checker_ skips) reports nothing left for
    // `table_id`, then recurse into its partitions.
    // Returns an error if the partitions cannot be listed, if a partition fails, or if
    // index_failed_checker_ records failed files for this table.
    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    const bool is_idmap = (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP));
    std::vector<int> file_types;
    if (is_idmap) {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::RAW),
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
            static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
            static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
        };
    }

    // get files to build index
    meta::TableFilesSchema table_files;
    auto status = GetFilesToBuildIndex(table_id, file_types, table_files);
    int times = 1;

    while (!table_files.empty()) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        }

        // wait 100 ms per round so far, capped at 10 s
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));
        GetFilesToBuildIndex(table_id, file_types, table_files);
        ++times;

        index_failed_checker_.IgnoreFailedIndexFiles(table_files);
    }

    // build index for partition
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions of table: " << table_id << ", " << status.ToString();
        return status;
    }

    for (auto& schema : partition_array) {
        status = BuildTableIndexRecursively(schema.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    // failed to build index for some files, return error
    std::vector<std::string> failed_files;
    index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files);
    if (!failed_files.empty()) {
        std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) +
                          ((failed_files.size() == 1) ? " file" : " files");
        msg += ", please double check index parameters.";
        return Status(DB_ERROR, msg);
    }

    return Status::OK();
}

Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
    // Clear the failed-index records of `table_id`, drop its index in meta, then do the same for every
    // partition. Failure to list the partitions is returned instead of silently skipping them.
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
    auto status = meta_ptr_->DropTableIndex(table_id);
    if (!status.ok()) {
        return status;
    }

    // drop partition index
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions of table: " << table_id << ", " << status.ToString();
        return status;
    }

    for (auto& schema : partition_array) {
        status = DropTableIndexRecursively(schema.table_id_);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

Status
DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count) {
    // Set `row_count` to the row count of `table_id` plus, recursively, the row counts of its partitions.
    // Any meta failure, including failure to list partitions, is returned; `row_count` may then be
    // incomplete. Previously a partition-listing failure gave an undercount reported as OK.
    row_count = 0;
    auto status = meta_ptr_->Count(table_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // get partition row count
    std::vector<meta::TableSchema> partition_array;
    status = meta_ptr_->ShowPartitions(table_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions of table: " << table_id << ", " << status.ToString();
        return status;
    }

    for (auto& schema : partition_array) {
        uint64_t partition_row_count = 0;
        status = GetTableRowCountRecursively(schema.table_id_, partition_row_count);
        if (!status.ok()) {
            return status;
        }

        row_count += partition_row_count;
    }

    return Status::OK();
}

S
starlord 已提交
1127 1128
}  // namespace engine
}  // namespace milvus