DBImpl.cpp 33.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/DBImpl.h"
S
starlord 已提交
19
#include "Utils.h"
S
starlord 已提交
20 21
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
S
starlord 已提交
22
#include "engine/EngineFactory.h"
S
starlord 已提交
23
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
24
#include "meta/MetaConsts.h"
S
starlord 已提交
25 26
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
27
#include "metrics/Metrics.h"
S
starlord 已提交
28
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
29
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
30 31
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
S
starlord 已提交
32
#include "utils/Log.h"
G
groot 已提交
33
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
34
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
35

X
Xu Peng 已提交
36
#include <assert.h>
S
starlord 已提交
37
#include <algorithm>
G
groot 已提交
38
#include <boost/filesystem.hpp>
S
starlord 已提交
39 40 41
#include <chrono>
#include <cstring>
#include <iostream>
G
groot 已提交
42
#include <set>
S
starlord 已提交
43
#include <thread>
X
Xu Peng 已提交
44

J
jinhai 已提交
45
namespace milvus {
X
Xu Peng 已提交
46
namespace engine {
X
Xu Peng 已提交
47

G
groot 已提交
48 49
namespace {

J
jinhai 已提交
50 51 52
// Tick intervals for the periodic background tasks. Each task keeps its own tick counter and
// runs only when (tick % INTERVAL) == 0. The background timer loop in this file sleeps one
// second per iteration, so a value of 1 means "run on every iteration".
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;  // presumably consumed by the build-index task (not visible here) - TODO confirm
G
groot 已提交
53

G
groot 已提交
54 55 56 57 58 59 60 61 62 63 64
// Status returned by the public DBImpl methods when they are called while shutting_down_ is set.
// The message previously misspelled the product name ("Milsvus").
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");

// Flattens a date-partitioned file collection: every file of every date bucket is appended to
// files_array in iteration order. Entries already present in files_array are kept.
void
TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::TableFilesSchema& files_array) {
    for (auto bucket = date_files.begin(); bucket != date_files.end(); ++bucket) {
        const auto& bucket_files = bucket->second;
        for (auto entry = bucket_files.begin(); entry != bucket_files.end(); ++entry) {
            files_array.push_back(*entry);
        }
    }
}

S
starlord 已提交
65
}  // namespace
G
groot 已提交
66

Y
Yu Kun 已提交
67
// Builds the meta backend and the insert memory manager from the given options, then starts
// the service. shutting_down_ is initialised to true so that the Start() call at the end of
// the constructor does not take its "already running" early return.
DBImpl::DBImpl(const DBOptions& options)
    : options_(options),
      shutting_down_(true),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    Start();
}

// Stops the service on destruction. Stop() returns immediately when the service has already
// been stopped, so an explicit Stop() before destruction is harmless.
DBImpl::~DBImpl() {
    Stop();
}

S
starlord 已提交
78
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
79
// external api
S
starlord 已提交
80
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
81 82 83
// Starts the DB service.
// Returns Status::OK() immediately when the service is already running. Otherwise clears the
// shutdown flag and, unless this node runs in CLUSTER_READONLY mode, launches the background
// timer thread. Always returns Status::OK().
Status
DBImpl::Start() {
    const bool already_running = !shutting_down_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    ENGINE_LOG_TRACE << "DB service start";
    shutting_down_.store(false, std::memory_order_release);

    // for the distributed version, some nodes are read only and run no background tasks
    const bool read_only_node = (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY);
    if (!read_only_node) {
        ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

S
starlord 已提交
99 100 101
// Stops the DB service.
// Returns Status::OK() immediately when the service is already stopped. Otherwise sets the
// shutdown flag, serializes pending in-memory data, waits for the background timer thread to
// exit and, on nodes that are not CLUSTER_READONLY, cleans up the meta backend.
// Always returns Status::OK().
Status
DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    // make sure all memory data is serialized
    MemSerialize();

    // wait for compaction/build-index to finish.
    // Start() only launches the timer thread on nodes that are not read-only; joining a thread
    // that was never started throws std::system_error, so the join must be guarded.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUp();
    }

    ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
121 122
// Forwards to the meta backend's DropAll() and returns its status.
// NOTE(review): unlike the other public methods in this file, no shutdown check is performed
// here - confirm this is intentional.
Status
DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
126
// Creates a table through the meta backend.
// The caller's schema is not modified: a copy is made, its index_file_size_ is multiplied by
// ONE_MB, and the copy is handed to the meta backend.
// Returns SHUTDOWN_ERROR while the service is shut down, otherwise the meta backend's status.
Status
DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    meta::TableSchema scaled_schema = table_schema;
    scaled_schema.index_file_size_ *= ONE_MB;  // caller passes the size in units of ONE_MB
    auto create_status = meta_ptr_->CreateTable(scaled_schema);
    return create_status;
}

S
starlord 已提交
137
// Drops a table by delegating to DropTableRecursively().
// Returns SHUTDOWN_ERROR while the service is shut down.
Status
DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto drop_status = DropTableRecursively(table_id, dates);
    return drop_status;
}

S
starlord 已提交
146
// Fills table_schema from the meta backend and converts index_file_size_ back to units of
// ONE_MB (the inverse of the scaling done in CreateTable). The division is applied regardless
// of whether the lookup succeeded.
// Returns SHUTDOWN_ERROR while the service is shut down, otherwise the meta backend's status.
Status
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeTable(table_schema);
    table_schema.index_file_size_ /= ONE_MB;  // report the size in units of ONE_MB
    return describe_status;
}

S
starlord 已提交
157
// Asks the meta backend whether table_id exists; the answer is written to has_or_not.
// Returns SHUTDOWN_ERROR while the service is shut down, otherwise the meta backend's status.
Status
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

S
starlord 已提交
166
// Retrieves the table schemas known to the meta backend into table_schema_array.
// Returns SHUTDOWN_ERROR while the service is shut down, otherwise the meta backend's status.
Status
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto list_status = meta_ptr_->AllTables(table_schema_array);
    return list_status;
}

S
starlord 已提交
175
// Loads the files of a table and of its partitions into the CPU cache.
// Files are loaded one by one; loading stops with SERVER_CACHE_FULL as soon as the accumulated
// physical size exceeds the free cache capacity measured before the first load.
// Returns SHUTDOWN_ERROR while the service is shut down, DB_ERROR for an unknown engine type or
// a load exception, otherwise Status::OK().
Status
DBImpl::PreloadTable(const std::string& table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // collect the files of the parent table first
    meta::DatesT dates;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    // then append the files of every partition table.
    // NOTE(review): the statuses produced here are overwritten and never checked - confirm that
    // best-effort collection is intended.
    std::vector<meta::TableSchema> partiton_array;
    status = meta_ptr_->ShowPartitions(table_id, partiton_array);
    for (auto& partition_schema : partiton_array) {
        status = GetFilesToSearch(partition_schema.table_id_, ids, dates, files_array);
    }

    int64_t loaded_size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    const int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    const int64_t available_size = cache_total - cache_usage;

    for (auto& file : files_array) {
        ExecutionEnginePtr engine =
            EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                                 static_cast<MetricType>(file.metric_type_), file.nlist_);
        if (engine == nullptr) {
            ENGINE_LOG_ERROR << "Invalid engine type";
            return Status(DB_ERROR, "Invalid engine type");
        }

        loaded_size += engine->PhysicalSize();
        if (loaded_size > available_size) {
            return Status(SERVER_CACHE_FULL, "Cache is full");
        }

        try {
            // load the index file into the cache
            engine->Load(true);
        } catch (std::exception& ex) {
            std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
228
// Stores a new flag value for table_id through the meta backend.
// Returns SHUTDOWN_ERROR while the service is shut down, otherwise the meta backend's status.
Status
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto update_status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return update_status;
}

S
starlord 已提交
237
// Computes the row count of table_id by delegating to GetTableRowCountRecursively().
// Returns SHUTDOWN_ERROR while the service is shut down.
Status
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto count_status = GetTableRowCountRecursively(table_id, row_count);
    return count_status;
}

// Creates a partition of table_id with the given name and tag through the meta backend.
// Returns SHUTDOWN_ERROR while the service is shut down, otherwise the meta backend's status.
Status
DBImpl::CreatePartition(const std::string& table_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto create_status = meta_ptr_->CreatePartition(table_id, partition_name, partition_tag);
    return create_status;
}

// Drops the partition table named partition_name.
// Pending in-memory vectors of the partition are erased, the partition is dropped in the meta
// backend, and a DeleteJob is submitted to the scheduler and waited for.
// Returns SHUTDOWN_ERROR while the service is shut down, the meta backend's error when the drop
// fails (previously that error was discarded and OK was returned), otherwise Status::OK().
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // stop accepting inserts for this partition; the result is not checked, as before
    mem_mgr_->EraseMemVector(partition_name);

    // soft delete the partition table; do not schedule file deletion if this fails
    auto status = meta_ptr_->DropPartition(partition_name);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << status.message();
        return status;
    }

    // scheduler will determine when to delete table files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
274
// Resolves the partition of table_id identified by partition_tag and drops it.
// Returns SHUTDOWN_ERROR while the service is shut down and the lookup error when the tag
// cannot be resolved (previously the lookup result was ignored and DropPartition() was called
// with an empty name); otherwise the status of DropPartition().
Status
DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::string partition_name;
    auto status = meta_ptr_->GetPartitionName(table_id, partition_tag, partition_name);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << status.message();
        return status;
    }

    return DropPartition(partition_name);
}

// Retrieves the partition schemas of table_id from the meta backend.
// Returns SHUTDOWN_ERROR while the service is shut down, otherwise the meta backend's status.
Status
DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto show_status = meta_ptr_->ShowPartitions(table_id, partiton_schema_array);
    return show_status;
}

// Inserts n vectors into table_id, or into one of its partitions when partition_tag is not empty.
// The partition tag is resolved to the partition table name through the meta backend, and the
// vectors are then handed to the memory manager together with vector_ids.
// Returns SHUTDOWN_ERROR while the service is shut down, the lookup error when the tag cannot
// be resolved, otherwise the memory manager's status.
Status
DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors,
                      IDNumbers& vector_ids) {
    //    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // if partition is specified, use partition as target table
    // (the unused local `partition_name` of the original was removed)
    Status status;
    std::string target_table_name = table_id;
    if (!partition_tag.empty()) {
        status = meta_ptr_->GetPartitionName(table_id, partition_tag, target_table_name);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << status.message();
            return status;
        }
    }

    // insert vectors into target table.
    // The metrics collector is constructed from `status` before the insert assigns to it, so it
    // must stay declared ahead of the InsertVectors call.
    milvus::server::CollectInsertMetrics metrics(n, status);
    status = mem_mgr_->InsertVectors(target_table_name, n, vectors, vector_ids);

    return status;
}

S
starlord 已提交
321
// Creates (or re-creates) the index of table_id.
// Under build_index_mutex_ the stored index description is compared with the requested one and
// updated when they differ; the metric type always stays the one stored for the table. After
// that the function waits for running merge tasks and builds the index.
// Returns SHUTDOWN_ERROR while the service is shut down, the first failing status of the steps
// above, otherwise the status of BuildTableIndexRecursively().
Status
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        // step 1: fetch the index currently stored for the table
        TableIndex current_index;
        status = DescribeIndex(table_id, current_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        // step 2: update the stored index info when the request differs.
        // The metric type is not changed here; it was defined by CreateTable.
        TableIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;
        const bool index_changed = !utils::IsSameIndex(current_index, requested_index);
        if (index_changed) {
            status = UpdateTableIndexRecursively(table_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let the merge file thread finish, to avoid a duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait and build index
    // NOTE(review): the caller's `index` is passed here, not the copy whose metric type was
    // overridden in step 2 - confirm this is intended.
    return BuildTableIndexRecursively(table_id, index);
}

S
starlord 已提交
360
// Reads the index description of table_id from the meta backend into index.
// Returns SHUTDOWN_ERROR while the service is shut down, otherwise the meta backend's status.
Status
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeTableIndex(table_id, index);
    return describe_status;
}

S
starlord 已提交
369
// Drops the index of table_id by delegating to DropTableIndexRecursively().
// Returns SHUTDOWN_ERROR while the service is shut down.
Status
DBImpl::DropIndex(const std::string& table_id) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    auto drop_status = DropTableIndexRecursively(table_id);
    return drop_status;
}

S
starlord 已提交
379
// Convenience overload of Query() without an explicit date list: the search is forwarded to the
// date-aware overload with a single-element list holding utils::GetDate().
// Returns SHUTDOWN_ERROR while the service is shut down.
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    meta::DatesT dates = {utils::GetDate()};
    return Query(table_id, partition_tags, k, nq, nprobe, vectors, dates, result_ids, result_distances);
}

S
starlord 已提交
391
// Searches table_id for the given query vectors, restricted to the given dates.
// With no partition tags the files of the table and of all its partitions are searched;
// otherwise only the files of the partitions matching the tags. The collected files are passed
// to QueryAsync(), whose status is returned.
// Returns SHUTDOWN_ERROR while the service is shut down.
// NOTE(review): statuses of the per-partition file lookups are overwritten and never checked.
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
              ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id << " date range count: " << dates.size();

    Status status;
    std::vector<size_t> ids;
    meta::TableFilesSchema files_array;

    if (!partition_tags.empty()) {
        // collect files from the specified partitions only
        std::set<std::string> partition_name_array;
        GetPartitionsByTags(table_id, partition_tags, partition_name_array);

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, ids, dates, files_array);
        }
    } else {
        // no partition tag specified: search the whole table, starting with the parent table
        status = GetFilesToSearch(table_id, ids, dates, files_array);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::TableSchema> partiton_array;
        status = meta_ptr_->ShowPartitions(table_id, partiton_array);
        for (auto& partition_schema : partiton_array) {
            status = GetFilesToSearch(partition_schema.table_id_, ids, dates, files_array);
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}
X
Xu Peng 已提交
433

S
starlord 已提交
434
// Searches only the files of table_id whose numeric ids are listed in file_ids.
// Each entry of file_ids is parsed as an unsigned integer; the matching files are looked up and
// passed to QueryAsync(), whose status is returned.
// Returns SHUTDOWN_ERROR while the service is shut down and DB_ERROR "Invalid file id" when an
// id cannot be parsed or no file matches.
Status
DBImpl::QueryByFileID(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
                      uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id << " date range count: " << dates.size();

    // get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    try {
        for (const auto& id : file_ids) {
            ids.push_back(std::stoul(id));
        }
    } catch (const std::exception& ex) {
        // std::stoul throws on non-numeric or out-of-range text; report it through the Status
        // channel like every other failure of this method instead of leaking the exception
        ENGINE_LOG_ERROR << "Invalid file id: " << ex.what();
        return Status(DB_ERROR, "Invalid file id");
    }

    meta::TableFilesSchema files_array;
    auto status = GetFilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    if (files_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}

S
starlord 已提交
469
// Queries the meta backend for the data size and writes it to result.
// Returns SHUTDOWN_ERROR while the service is shut down (result is left untouched in that
// case), otherwise the meta backend's status.
Status
DBImpl::Size(uint64_t& result) {
    const bool is_down = shutting_down_.load(std::memory_order_acquire);
    if (is_down) {
        return SHUTDOWN_ERROR;
    }

    auto size_status = meta_ptr_->Size(result);
    return size_status;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
479
// internal methods
S
starlord 已提交
480
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
481
// Runs one search over the given index files through the scheduler and blocks until it is done.
// A SearchJob is created for (k, nq, nprobe, vectors), every file is attached to it, the job is
// submitted and waited for. On success the job's ids and distances are copied to result_ids and
// result_distances.
// Returns the job's status when it failed, otherwise Status::OK().
Status
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
                   uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    server::CollectQueryMetrics metrics(nq);

    TimeRecorder rc("");

    // step 1: attach the files to search to a new search job
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
    scheduler::SearchJobPtr search_job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
    for (auto file_it = files.begin(); file_it != files.end(); ++file_it) {
        scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(*file_it);
        search_job->AddIndexFile(file_ptr);
    }

    // step 2: hand the job to the scheduler and wait for it
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();
    if (!search_job->GetStatus().ok()) {
        return search_job->GetStatus();
    }

    // step 3: copy the results out of the job
    result_ids = search_job->GetResultIds();
    result_distances = search_job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

S
starlord 已提交
511 512
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
513
    Status status;
Y
yu yunfeng 已提交
514
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
515
    while (true) {
S
starlord 已提交
516
        if (shutting_down_.load(std::memory_order_acquire)) {
517 518
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
519 520

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
521 522
            break;
        }
X
Xu Peng 已提交
523

G
groot 已提交
524
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
525

G
groot 已提交
526
        StartMetricTask();
G
groot 已提交
527 528 529
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
530 531
}

S
starlord 已提交
532 533
void
DBImpl::WaitMergeFileFinish() {
534
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
Y
Yu Kun 已提交
535
    for (auto& iter : compact_thread_results_) {
536 537 538 539
        iter.wait();
    }
}

S
starlord 已提交
540 541
void
DBImpl::WaitBuildIndexFinish() {
542
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
543
    for (auto& iter : index_thread_results_) {
544 545 546 547
        iter.wait();
    }
}

S
starlord 已提交
548 549
void
DBImpl::StartMetricTask() {
G
groot 已提交
550 551
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
S
starlord 已提交
552
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
553 554 555
        return;
    }

556
    ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
557

G
groot 已提交
558 559 560
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
J
JinHai-CN 已提交
561 562 563 564 565 566 567
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
568
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
569 570 571 572 573 574 575 576
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
577

K
kun yu 已提交
578
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
579 580
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
581

582
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
583 584
}

S
starlord 已提交
585 586
// Serializes the insert cache through the memory manager and records the ids it reports in
// compact_table_ids_. Runs under mem_serialize_mutex_. Always returns Status::OK().
Status
DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    std::set<std::string> serialized_ids;
    mem_mgr_->Serialize(serialized_ids);
    compact_table_ids_.insert(serialized_ids.begin(), serialized_ids.end());

    const bool anything_serialized = !serialized_ids.empty();
    if (anything_serialized) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

S
starlord 已提交
601 602
void
DBImpl::StartCompactionTask() {
603 604
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
S
starlord 已提交
605
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
606 607 608
        return;
    }

S
starlord 已提交
609
    // serialize memory data
610 611
    MemSerialize();

S
starlord 已提交
612
    // compactiong has been finished?
613 614 615 616 617 618 619
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
620 621
        }
    }
X
Xu Peng 已提交
622

S
starlord 已提交
623
    // add new compaction task
624 625 626
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
627 628 629
            // collect merge files for all tables(if compact_table_ids_ is empty) for two reasons:
            // 1. other tables may still has un-merged files
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
G
groot 已提交
630
            if (compact_table_ids_.empty()) {
631 632
                std::vector<meta::TableSchema> table_schema_array;
                meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
633
                for (auto& schema : table_schema_array) {
634 635 636 637 638
                    compact_table_ids_.insert(schema.table_id_);
                }
            }

            // start merge file thread
639
            compact_thread_results_.push_back(
G
groot 已提交
640
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
641 642
            compact_table_ids_.clear();
        }
G
groot 已提交
643
    }
X
Xu Peng 已提交
644 645
}

S
starlord 已提交
646
// Merges the given files of table_id/date into one newly created table file.
// Steps: create a NEW_MERGE table file in the meta backend, merge the source files into an
// engine built for it (stopping once the merged index reaches the index file size of the file
// merged last), serialize it to disk, then update the meta backend: merged sources become
// TO_DELETE and the new file becomes TO_INDEX or RAW.
// Returns the failing status when the table file cannot be created, DB_ERROR when the engine
// cannot be built or serialization throws, otherwise the status of the final meta update.
Status
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    // step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    // step 2: merge files
    ExecutionEnginePtr index =
        EngineFactory::Build(table_file.dimension_, table_file.location_,
                             static_cast<EngineType>(table_file.engine_type_),
                             static_cast<MetricType>(table_file.metric_type_), table_file.nlist_);
    if (index == nullptr) {
        // PreloadTable guards against a null engine from the same factory; without this check
        // the merge loop below would dereference a null pointer. The freshly created table file
        // is marked for deletion, as in the serialization failure path.
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, "Invalid engine type");
    }

    meta::TableFilesSchema updated;
    int64_t index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // stop merging once the merged index is large enough
        if (index_size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        index->Serialize();
    } catch (std::exception& ex) {
        // typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    // else set file type to RAW, no need to build index
    if (table_file.engine_type_ != static_cast<int>(EngineType::FAISS_IDMAP)) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                                       : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size " << index->PhysicalSize() << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

S
starlord 已提交
725
// Merges the raw files of table_id, one date bucket at a time.
// Buckets holding fewer than options_.merge_trigger_number_ files are skipped. After each merge
// the shutdown flag is checked, and the remaining buckets are skipped when it is set.
// Returns the failing status when the files to merge cannot be fetched, otherwise Status::OK();
// the result of the individual MergeFiles() calls is not propagated.
Status
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        // only the element count is needed, so read it in place instead of copying the bucket
        if (kv.second.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        MergeFiles(table_id, kv.first, kv.second);

        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
751

S
starlord 已提交
752 753
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
754
    ENGINE_LOG_TRACE << "Background compaction thread start";
S
starlord 已提交
755

G
groot 已提交
756
    Status status;
Y
Yu Kun 已提交
757
    for (auto& table_id : table_ids) {
G
groot 已提交
758 759
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
760
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
761
        }
S
starlord 已提交
762

S
starlord 已提交
763
        if (shutting_down_.load(std::memory_order_acquire)) {
S
starlord 已提交
764 765 766
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
767
    }
X
Xu Peng 已提交
768

G
groot 已提交
769
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
770

S
starlord 已提交
771
    int ttl = 5 * meta::M_SEC;  // default: file will be deleted after 5 minutes
Y
yudong.cai 已提交
772
    if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
Z
update  
zhiru 已提交
773 774 775
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
S
starlord 已提交
776

777
    ENGINE_LOG_TRACE << "Background compaction thread exit";
G
groot 已提交
778
}
X
Xu Peng 已提交
779

S
starlord 已提交
780 781
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
782 783
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
S
starlord 已提交
784
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
785 786 787
        return;
    }

S
starlord 已提交
788
    // build index has been finished?
789 790 791 792 793 794 795
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
796 797 798
        }
    }

S
starlord 已提交
799
    // add new build index task
800 801 802
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
803
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
804
        }
G
groot 已提交
805
    }
X
Xu Peng 已提交
806 807
}

S
starlord 已提交
808 809
void
DBImpl::BackgroundBuildIndex() {
S
starlord 已提交
810
    ENGINE_LOG_TRACE << "Background build index thread start";
S
starlord 已提交
811

P
peng.xu 已提交
812
    std::unique_lock<std::mutex> lock(build_index_mutex_);
813
    meta::TableFilesSchema to_index_files;
G
groot 已提交
814
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
815
    Status status;
816

817
    if (!to_index_files.empty()) {
W
wxyu 已提交
818
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
Y
Yu Kun 已提交
819

820 821 822 823 824 825 826 827 828 829 830
        // step 2: put build index task to scheduler
        for (auto& file : to_index_files) {
            scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            job->AddToIndexFiles(file_ptr);
        }
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitBuildIndexFinish();
        if (!job->GetStatus().ok()) {
            Status status = job->GetStatus();
            ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
        }
Y
Yu Kun 已提交
831
    }
Y
Yu Kun 已提交
832

S
starlord 已提交
833
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
834 835
}

G
groot 已提交
836
// Collect the files to search for a table.
//
// Queries the meta store with the given file ids and dates, then passes the date-grouped result
// and `files` to TraverseFiles().
//
// @param table_id  table to search
// @param file_ids  file ids forwarded to the meta store query
// @param dates     dates forwarded to the meta store query
// @param files     [out] passed to TraverseFiles() together with the query result
// @return the FilesToSearch() status on failure, otherwise Status::OK()
Status
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
                         meta::TableFilesSchema& files) {
    meta::DatePartionedTableFilesSchema files_by_date;
    auto query_status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, files_by_date);
    if (query_status.ok()) {
        TraverseFiles(files_by_date, files);
        return Status::OK();
    }

    return query_status;
}

// Resolve partition tags to partition table ids.
//
// Lists the partitions of `table_id` and inserts into `partition_name_array` the table_id_ of every
// partition for which IsRegexMatch(partition_tag_, tag) holds for one of the requested tags.
//
// NOTE: the status returned by ShowPartitions() is not inspected; this function always returns OK.
//
// @param table_id              owner table
// @param partition_tags        tags passed to IsRegexMatch() as its second argument
// @param partition_name_array  [out] receives the matching partition table ids
Status
DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::TableSchema> partitions;
    meta_ptr_->ShowPartitions(table_id, partitions);

    for (auto& wanted_tag : partition_tags) {
        for (auto& partition : partitions) {
            const bool matched = server::StringHelpFunctions::IsRegexMatch(partition.partition_tag_, wanted_tag);
            if (!matched) {
                continue;
            }
            partition_name_array.insert(partition.table_id_);
        }
    }

    return Status::OK();
}

// Drop a table (or only its data for the given dates), then do the same for each of its partitions.
//
// With an empty `dates`: the table's in-memory vectors are erased, the table is dropped in the meta
// store, and a DeleteJob is handed to the scheduler and waited for. With non-empty `dates` only
// DropDataByDate() is called. Every partition listed by ShowPartitions() is then processed
// recursively with the same `dates`.
//
// @param table_id  table (or partition) to drop
// @param dates     when non-empty, only data of these dates is dropped
// @return the first failing status from the meta store or from a partition, otherwise Status::OK()
Status
DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& dates) {
    // dates partly delete files of the table but currently we don't support
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    Status status;
    if (dates.empty()) {
        status = mem_mgr_->EraseMemVector(table_id);  // not allow insert
        if (!status.ok()) {
            // best effort: report it and still attempt the drop
            ENGINE_LOG_ERROR << "Failed to erase memory vectors of table " << table_id << ": " << status.ToString();
        }

        status = meta_ptr_->DropTable(table_id);  // soft delete table
        if (!status.ok()) {
            // do not schedule file deletion for a table that could not be marked as dropped
            ENGINE_LOG_ERROR << "Failed to drop table " << table_id << ": " << status.ToString();
            return status;
        }

        // scheduler will determine when to delete table files
        auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
        scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, nres);
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitAndDelete();
    } else {
        status = meta_ptr_->DropDataByDate(table_id, dates);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to drop data by date for table " << table_id << ": " << status.ToString();
            return status;
        }
    }

    std::vector<meta::TableSchema> partiton_array;
    status = meta_ptr_->ShowPartitions(table_id, partiton_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to list partitions of table " << table_id << ": " << status.ToString();
        return status;
    }

    for (auto& schema : partiton_array) {
        status = DropTableRecursively(schema.table_id_, dates);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Replace the index definition of a table and of each of its partitions.
//
// Calls DropIndex() for the table (its result is not inspected), stores `index` through the meta
// store, then repeats the whole procedure for every partition listed by ShowPartitions().
//
// @param table_id  table (or partition) to update
// @param index     new index description
// @return the first failing UpdateTableIndex()/partition status, otherwise Status::OK()
Status
DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    DropIndex(table_id);

    auto update_status = meta_ptr_->UpdateTableIndex(table_id, index);
    if (!update_status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
        return update_status;
    }

    // the ShowPartitions() status is not checked here
    std::vector<meta::TableSchema> partitions;
    meta_ptr_->ShowPartitions(table_id, partitions);

    for (auto& partition : partitions) {
        auto partition_status = UpdateTableIndexRecursively(partition.table_id_, index);
        if (!partition_status.ok()) {
            return partition_status;
        }
    }

    return Status::OK();
}

// Block until every file of the table (and of its partitions) has reached its final file type.
//
// Polls the meta store for files still in a "pending" type. Between polls (non-IDMAP only) it asks
// the meta store to mark the table's files as to-index, then sleeps with a linear back-off
// (100ms per round, capped at 10s). Once no pending file remains, the same is done for every
// partition listed by ShowPartitions().
//
// @param table_id  table (or partition) to wait for
// @param index     index description; only engine_type_ is read here
// @return the first failing meta-store/partition status, otherwise Status::OK()
Status
DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
    const bool is_idmap = (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP));

    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if (is_idmap) {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::RAW),
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
            static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
            static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
        };
    }

    // get files to build index
    std::vector<std::string> file_ids;
    auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
    if (!status.ok()) {
        // without a reliable file list we can neither wait nor claim the index is built
        ENGINE_LOG_ERROR << "Failed to get files by type for table " << table_id << ": " << status.ToString();
        return status;
    }

    int times = 1;
    while (!file_ids.empty()) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
            if (!status.ok()) {
                // files that cannot be marked to-index would keep this loop polling
                ENGINE_LOG_ERROR << "Failed to update files to index for table " << table_id << ": "
                                 << status.ToString();
                return status;
            }
        }

        // clamp the round counter before multiplying so the product cannot overflow
        const int wait_ms = std::min(times, 100) * 100;
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));

        status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get files by type for table " << table_id << ": " << status.ToString();
            return status;
        }
        times++;
    }

    // build index for partition
    std::vector<meta::TableSchema> partiton_array;
    status = meta_ptr_->ShowPartitions(table_id, partiton_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to list partitions of table " << table_id << ": " << status.ToString();
        return status;
    }

    for (auto& schema : partiton_array) {
        status = BuildTableIndexRecursively(schema.table_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Drop the index of a table and of each of its partitions.
//
// @param table_id  table (or partition) whose index is dropped
// @return the first failing DropTableIndex()/partition status, otherwise Status::OK()
Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    auto drop_status = meta_ptr_->DropTableIndex(table_id);
    if (!drop_status.ok()) {
        return drop_status;
    }

    // recurse into the partitions; the ShowPartitions() status is not checked here
    std::vector<meta::TableSchema> partitions;
    meta_ptr_->ShowPartitions(table_id, partitions);

    for (auto& partition : partitions) {
        auto partition_status = DropTableIndexRecursively(partition.table_id_);
        if (!partition_status.ok()) {
            return partition_status;
        }
    }

    return Status::OK();
}

// Count the rows of a table including all of its partitions.
//
// `row_count` is reset to 0, filled by the meta store's Count() for the table itself, and then
// increased by the recursive count of every partition listed by ShowPartitions(). On failure the
// function returns immediately and `row_count` holds whatever had been accumulated so far.
//
// @param table_id   table (or partition) to count
// @param row_count  [out] accumulated row count
// @return the first failing Count()/partition status, otherwise Status::OK()
Status
DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count) {
    row_count = 0;

    auto count_status = meta_ptr_->Count(table_id, row_count);
    if (!count_status.ok()) {
        return count_status;
    }

    // add the rows of each partition; the ShowPartitions() status is not checked here
    std::vector<meta::TableSchema> partitions;
    meta_ptr_->ShowPartitions(table_id, partitions);

    for (auto& partition : partitions) {
        uint64_t rows_in_partition = 0;
        auto partition_status = GetTableRowCountRecursively(partition.table_id_, rows_in_partition);
        if (!partition_status.ok()) {
            return partition_status;
        }

        row_count += rows_in_partition;
    }

    return Status::OK();
}

S
starlord 已提交
1013 1014
}  // namespace engine
}  // namespace milvus