DBImpl.cpp 32.3 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/DBImpl.h"
S
starlord 已提交
19
#include "Utils.h"
S
starlord 已提交
20 21
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
S
starlord 已提交
22
#include "engine/EngineFactory.h"
S
starlord 已提交
23
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
24
#include "meta/MetaConsts.h"
S
starlord 已提交
25 26
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
27
#include "metrics/Metrics.h"
S
starlord 已提交
28
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
29
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
30 31
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
S
starlord 已提交
32
#include "utils/Log.h"
S
starlord 已提交
33
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
34

X
Xu Peng 已提交
35
#include <assert.h>
S
starlord 已提交
36
#include <algorithm>
G
groot 已提交
37
#include <boost/filesystem.hpp>
S
starlord 已提交
38 39 40 41
#include <chrono>
#include <cstring>
#include <iostream>
#include <thread>
X
Xu Peng 已提交
42

J
jinhai 已提交
43
namespace milvus {
X
Xu Peng 已提交
44
namespace engine {
X
Xu Peng 已提交
45

G
groot 已提交
46 47
namespace {

// Tick divisors for the periodic background tasks: a task returns early on every tick
// where (tick % interval) != 0, so a value of 1 means "run on every tick".
constexpr uint64_t METRIC_ACTION_INTERVAL{1};   // consulted by StartMetricTask
constexpr uint64_t COMPACT_ACTION_INTERVAL{1};  // consulted by StartCompactionTask
constexpr uint64_t INDEX_ACTION_INTERVAL{1};    // presumably consulted by the build-index task - TODO confirm

}  // namespace
G
groot 已提交
53

Y
Yu Kun 已提交
54
// Builds the meta store and the insert memory manager from `options`, then starts the service.
// shutting_down_ is initialised to true so the Start() call below does not take its
// "already running" early return.
DBImpl::DBImpl(const DBOptions& options)
    : options_(options),
      shutting_down_(true),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    Start();
}

// Stops the service when the object is destroyed. Stop() returns immediately when the
// service has already been stopped, so an explicit Stop() before destruction is harmless.
DBImpl::~DBImpl() {
    Stop();
}

S
starlord 已提交
65
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
66
// external api
S
starlord 已提交
67
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
68 69 70
// Starts the DB service. Does nothing when the service is already running.
// Always returns Status::OK().
Status
DBImpl::Start() {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        ENGINE_LOG_TRACE << "DB service start";
        shutting_down_.store(false, std::memory_order_release);

        // in the distributed version some nodes are read only; those get no background timer thread
        const bool read_only = (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY);
        if (!read_only) {
            ENGINE_LOG_TRACE << "StartTimerTasks";
            bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
        }
    }

    return Status::OK();
}

S
starlord 已提交
86 87 88
// Stops the DB service: flushes the insert buffers, waits for the background timer thread
// to exit, and (on nodes that are not read only) cleans up the meta store.
// Does nothing when the service is already stopped. Always returns Status::OK().
Status
DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    // make sure all in-memory data is serialized
    MemSerialize();

    // wait for compaction/build-index to finish.
    // Start() does not launch the timer thread in CLUSTER_READONLY mode, and join() on a
    // thread that is not joinable throws std::system_error, so the call has to be guarded.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUp();
    }

    ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
108 109
// Forwards to the meta store's DropAll() and returns its status.
// Unlike most public calls in this file, there is no shutdown check here.
Status
DBImpl::DropAll() {
    auto status = meta_ptr_->DropAll();
    return status;
}

S
starlord 已提交
113
// Registers a new table in the meta store.
// The caller's schema is left untouched: a copy with index_file_size_ multiplied by ONE_MB
// is what gets passed on (DescribeTable divides by ONE_MB again on the way out).
// Returns DB_ERROR when the service is shut down, otherwise the meta store's status.
Status
DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::TableSchema scaled_schema = table_schema;
    scaled_schema.index_file_size_ *= ONE_MB;  // caller value is scaled by ONE_MB before storing

    auto status = meta_ptr_->CreateTable(scaled_schema);
    return status;
}

S
starlord 已提交
124
// Deletes a table, or only some of its date partitions.
//   table_id - table to delete
//   dates    - when empty the whole table is deleted; otherwise only the partitions for
//              these dates are dropped through the meta store
// Returns DB_ERROR when the service is shut down. For a partial delete the meta store's
// status is returned (it used to be discarded); a whole-table delete returns Status::OK().
Status
DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        // partial delete: report a meta store failure instead of pretending it succeeded
        return meta_ptr_->DropPartitionsByDates(table_id, dates);
    }

    mem_mgr_->EraseMemVector(table_id);  // not allow insert
    meta_ptr_->DeleteTable(table_id);    // soft delete table

    // scheduler will determine when to delete table files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(0, table_id, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
149
// Fills `table_schema` from the meta store.
// index_file_size_ is divided by ONE_MB before returning, the inverse of the scaling
// CreateTable applies; the division happens even when the meta store reports a failure.
// Returns DB_ERROR when the service is shut down, otherwise the meta store's status.
Status
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto status = meta_ptr_->DescribeTable(table_schema);
    table_schema.index_file_size_ /= ONE_MB;  // undo the ONE_MB scaling for the caller

    return status;
}

S
starlord 已提交
160
// Forwards to the meta store's HasTable(); the answer is written to `has_or_not`.
// Returns DB_ERROR when the service is shut down, otherwise the meta store's status.
Status
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto status = meta_ptr_->HasTable(table_id, has_or_not);
    return status;
}

S
starlord 已提交
169
// Forwards to the meta store's AllTables(); the schemas are written to `table_schema_array`.
// Returns DB_ERROR when the service is shut down, otherwise the meta store's status.
Status
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto status = meta_ptr_->AllTables(table_schema_array);
    return status;
}

S
starlord 已提交
178
// Loads the files of `table_id` through their execution engines ahead of queries.
// The files come from FilesToSearch() with empty id and date filters. Loading stops with
// SERVER_CACHE_FULL as soon as the accumulated physical size of the files exceeds the CPU
// cache space that was free when the call started.
// Returns DB_ERROR when the service is shut down, when an engine cannot be built, or when
// loading throws; otherwise Status::OK().
Status
DBImpl::PreloadTable(const std::string& table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::DatePartionedTableFilesSchema files;
    meta::DatesT dates;
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    const int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    const int64_t available_size = cache_total - cache_usage;

    for (auto& day_files : files) {
        for (auto& file : day_files.second) {
            ExecutionEnginePtr engine =
                EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                                     static_cast<MetricType>(file.metric_type_), file.nlist_);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            // the budget is checked before the load, against the running total
            size += engine->PhysicalSize();
            if (size > available_size) {
                return Status(SERVER_CACHE_FULL, "Cache is full");
            }

            try {
                // step 1: load index
                engine->Load(true);
            } catch (const std::exception& ex) {
                std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status(DB_ERROR, msg);
            }
        }
    }

    return Status::OK();
}

S
starlord 已提交
226
// Forwards `flag` for `table_id` to the meta store's UpdateTableFlag().
// Returns DB_ERROR when the service is shut down, otherwise the meta store's status.
Status
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return status;
}

S
starlord 已提交
235
// Forwards to the meta store's Count(); the result is written to `row_count`.
// Returns DB_ERROR when the service is shut down, otherwise the meta store's status.
Status
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto status = meta_ptr_->Count(table_id, row_count);
    return status;
}

S
starlord 已提交
244
// Hands `n` vectors for `table_id` to the insert memory manager.
//   vectors    - raw float data, passed through unchanged
//   vector_ids - passed through to the memory manager by reference
// Returns DB_ERROR when the service is shut down, otherwise the memory manager's status.
Status
DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vectors, IDNumbers& vector_ids) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    // `status` is declared before the metrics collector because the collector is constructed
    // from it; presumably it reports when it goes out of scope - TODO confirm
    Status status;
    milvus::server::CollectInsertMetrics metrics(n, status);

    status = mem_mgr_->InsertVectors(table_id, n, vectors, vector_ids);
    return status;
}

S
starlord 已提交
263
// Sets the index definition of `table_id` and blocks until no file of the table is left
// in a not-yet-indexed state.
//   index - requested index; its metric type is ignored, the table keeps the metric type
//           it already has
// Returns DB_ERROR when the service is (or becomes) shut down, a meta store error when the
// index info or file lists cannot be read or updated, otherwise Status::OK().
Status
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    // same guard as the other public calls in this file
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        // step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        // step 2: update index info
        TableIndex new_index = index;
        new_index.metric_type_ = old_index.metric_type_;  // dont change metric type, it was defined by CreateTable
        if (!utils::IsSameIndex(old_index, new_index)) {
            DropIndex(table_id);

            status = meta_ptr_->UpdateTableIndex(table_id, new_index);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
                return status;
            }
        }
    }

    // step 3: let merge file thread finish
    // to avoid duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait and build index
    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    const bool is_idmap = (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP));
    std::vector<int> file_types;
    if (is_idmap) {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::TableFileSchema::RAW),
            static_cast<int32_t>(meta::TableFileSchema::NEW),
            static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
            static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
            static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
        };
    }

    std::vector<std::string> file_ids;
    auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files of table: " << table_id << ", " << status.ToString();
        return status;
    }

    int times = 1;
    while (!file_ids.empty()) {
        // once the service is shutting down the background thread no longer converts files,
        // so waiting any longer would never end
        if (shutting_down_.load(std::memory_order_acquire)) {
            return Status(DB_ERROR, "Milsvus server is shutdown!");
        }

        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
            if (!status.ok()) {
                // keep polling: the next round retries the update
                ENGINE_LOG_ERROR << "Failed to mark files to index for table: " << table_id;
            }
        }

        // back off 100 ms more per round, capped at 10 seconds
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));

        status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
        if (!status.ok()) {
            // without this, a failing meta store would leave file_ids unchanged and the loop spinning
            ENGINE_LOG_ERROR << "Failed to get files of table: " << table_id << ", " << status.ToString();
            return status;
        }
        times++;
    }

    return Status::OK();
}

S
starlord 已提交
331
// Forwards to the meta store's DescribeTableIndex(); the result is written to `index`.
// There is no shutdown check here.
Status
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    auto status = meta_ptr_->DescribeTableIndex(table_id, index);
    return status;
}

S
starlord 已提交
336
// Logs the request and forwards to the meta store's DropTableIndex().
// There is no shutdown check here.
Status
DBImpl::DropIndex(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    auto status = meta_ptr_->DropTableIndex(table_id);
    return status;
}

S
starlord 已提交
342
// Query overload without a date filter: delegates to the dated overload with a date set
// that holds only the value returned by utils::GetDate().
// Returns DB_ERROR when the service is shut down, otherwise the delegated call's status.
Status
DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
              QueryResults& results) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::DatesT dates = {utils::GetDate()};
    return Query(table_id, k, nq, nprobe, vectors, dates, results);
}

S
starlord 已提交
355
// Query overload with a date filter: collects every file FilesToSearch() returns for
// `table_id` and `dates` (no file-id filter) and searches them through QueryAsync().
// Returns DB_ERROR when the service is shut down, the meta store's status when the file
// lookup fails, otherwise QueryAsync()'s status.
Status
DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
              const meta::DatesT& dates, QueryResults& results) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;

    // fetch the table's files, grouped by date; the empty id list applies no id filter
    std::vector<size_t> ids;
    meta::DatePartionedTableFilesSchema files;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
    if (!status.ok()) {
        return status;
    }

    // flatten the per-date groups into one list
    meta::TableFilesSchema file_id_array;
    for (const auto& day_files : files) {
        const auto& day_list = day_files.second;
        file_id_array.insert(file_id_array.end(), day_list.begin(), day_list.end());
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // cache state before the query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // cache state after the query

    return status;
}
X
Xu Peng 已提交
384

S
starlord 已提交
385
// Query overload restricted to specific files.
//   file_ids - decimal file ids as strings; each one is parsed with std::stoul
//   dates    - date filter handed to FilesToSearch() together with the parsed ids
// Returns DB_ERROR when the service is shut down, when an id cannot be parsed, or when no
// file matches; the meta store's status when the lookup fails; otherwise QueryAsync()'s status.
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;

    // get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    try {
        for (const auto& id : file_ids) {
            ids.push_back(std::stoul(id));
        }
    } catch (const std::exception& ex) {
        // std::stoul throws std::invalid_argument / std::out_of_range for malformed ids;
        // report that as an error status instead of letting the exception escape
        ENGINE_LOG_ERROR << "Invalid file id: " << ex.what();
        return Status(DB_ERROR, "Invalid file id");
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto& day_files : files_array) {
        for (auto& file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
    return status;
}

S
starlord 已提交
426
// Forwards to the meta store's Size(); the value is written to `result`.
// Returns DB_ERROR (leaving `result` untouched) when the service is shut down,
// otherwise the meta store's status.
Status
DBImpl::Size(uint64_t& result) {
    const bool is_shutdown = shutting_down_.load(std::memory_order_acquire);
    if (is_shutdown) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto status = meta_ptr_->Size(result);
    return status;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
436
// internal methods
S
starlord 已提交
437
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
438
// Runs one search job over `files` through the scheduler and blocks until it finishes.
//   files   - files to search; each one is copied into a shared schema object for the job
//   dates   - only used for the log line here
//   results - overwritten with the job's result on success
// Returns the job's status when it failed, otherwise Status::OK().
Status
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
                   uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    server::CollectQueryMetrics metrics(nq);
    TimeRecorder rc("");

    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size()
                     << " date range count: " << dates.size();

    // step 1: build the search job and attach every file to it
    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
    for (const auto& file : files) {
        scheduler::TableFileSchemaPtr shared_file = std::make_shared<meta::TableFileSchema>(file);
        job->AddIndexFile(shared_file);
    }

    // step 2: hand the job to the scheduler and wait for it
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();

    auto job_status = job->GetStatus();
    if (!job_status.ok()) {
        return job_status;
    }

    // step 3: publish the results and log the total time
    results = job->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

S
starlord 已提交
493 494
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
495
    Status status;
Y
yu yunfeng 已提交
496
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
497
    while (true) {
S
starlord 已提交
498
        if (shutting_down_.load(std::memory_order_acquire)) {
499 500
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
501 502

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
503 504
            break;
        }
X
Xu Peng 已提交
505

G
groot 已提交
506
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
507

G
groot 已提交
508
        StartMetricTask();
G
groot 已提交
509 510 511
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
512 513
}

S
starlord 已提交
514 515
void
DBImpl::WaitMergeFileFinish() {
516
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
Y
Yu Kun 已提交
517
    for (auto& iter : compact_thread_results_) {
518 519 520 521
        iter.wait();
    }
}

S
starlord 已提交
522 523
void
DBImpl::WaitBuildIndexFinish() {
524
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
525
    for (auto& iter : index_thread_results_) {
526 527 528 529
        iter.wait();
    }
}

S
starlord 已提交
530 531
void
DBImpl::StartMetricTask() {
G
groot 已提交
532 533
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
S
starlord 已提交
534
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
535 536 537
        return;
    }

538
    ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
539

G
groot 已提交
540 541 542
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
S
starlord 已提交
543
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage * 100 / cache_total);
Y
Yu Kun 已提交
544
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
545 546 547 548 549 550 551 552
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
553

K
kun yu 已提交
554
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
555 556
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
557

558
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
559 560
}

S
starlord 已提交
561 562
// Asks the insert memory manager to serialize its data and remembers the table ids it
// reports in compact_table_ids_, from where StartCompactionTask picks them up.
// Holds mem_serialize_mutex_ for the whole call. Always returns Status::OK().
Status
DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    std::set<std::string> serialized_ids;
    mem_mgr_->Serialize(serialized_ids);

    compact_table_ids_.insert(serialized_ids.begin(), serialized_ids.end());

    const bool anything_serialized = !serialized_ids.empty();
    if (anything_serialized) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

S
starlord 已提交
577 578
void
DBImpl::StartCompactionTask() {
579 580
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
S
starlord 已提交
581
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
582 583 584
        return;
    }

S
starlord 已提交
585
    // serialize memory data
586 587
    MemSerialize();

S
starlord 已提交
588
    // compactiong has been finished?
589 590 591 592 593 594 595
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
596 597
        }
    }
X
Xu Peng 已提交
598

S
starlord 已提交
599
    // add new compaction task
600 601 602 603
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
            compact_thread_results_.push_back(
G
groot 已提交
604
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
605 606
            compact_table_ids_.clear();
        }
G
groot 已提交
607
    }
X
Xu Peng 已提交
608 609
}

S
starlord 已提交
610
// Merges `files` of one date partition into a newly created table file.
//   table_id - owning table
//   date     - date recorded on the new file
//   files    - candidates; merging stops early once the merged index reaches the
//              index_file_size_ of the file just merged
// Merged source files are marked TO_DELETE. The new file becomes TO_INDEX or RAW depending
// on engine type and size. When the engine cannot be built or serialization throws, the
// new file is marked TO_DELETE and DB_ERROR is returned. Otherwise the status of the final
// meta update is returned.
Status
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    // step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    // step 2: merge files
    ExecutionEnginePtr index = EngineFactory::Build(table_file.dimension_, table_file.location_,
                                                    static_cast<EngineType>(table_file.engine_type_),
                                                    static_cast<MetricType>(table_file.metric_type_), table_file.nlist_);
    if (index == nullptr) {
        // the factory can return nullptr (PreloadTable checks for it); do not dereference it,
        // and do not leave the freshly created file record behind
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, "Invalid engine type");
    }

    meta::TableFilesSchema updated;
    int64_t index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        if (index_size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        // typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    // else set file type to RAW, no need to build index
    if (table_file.engine_type_ != static_cast<int32_t>(EngineType::FAISS_IDMAP)) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                                       : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size " << index->PhysicalSize() << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

S
starlord 已提交
689
// Merges the mergeable files of `table_id`, one date partition at a time.
// Partitions with fewer files than options_.merge_trigger_number_ are skipped, and the loop
// stops once the service starts shutting down.
// Returns the meta store's status when the file lookup fails, otherwise Status::OK();
// a failed merge of a single partition is logged and does not stop the remaining ones.
Status
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        // kv.first is the date, kv.second the files of that date (no copy needed for the size check)
        if (kv.second.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        auto merge_status = MergeFiles(table_id, kv.first, kv.second);
        if (!merge_status.ok()) {
            // still best effort: record the failure and carry on with the next partition
            ENGINE_LOG_ERROR << "Failed to merge files for table: " << table_id << ", " << merge_status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
716

S
starlord 已提交
717 718
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
719
    ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
720

G
groot 已提交
721
    Status status;
Y
Yu Kun 已提交
722
    for (auto& table_id : table_ids) {
G
groot 已提交
723 724
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
725
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
726
        }
S
starlord 已提交
727

S
starlord 已提交
728
        if (shutting_down_.load(std::memory_order_acquire)) {
S
starlord 已提交
729 730 731
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
732
    }
X
Xu Peng 已提交
733

G
groot 已提交
734
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
735

S
starlord 已提交
736
    int ttl = 5 * meta::M_SEC;  // default: file will be deleted after 5 minutes
Y
yudong.cai 已提交
737
    if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
Z
update  
zhiru 已提交
738 739 740
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
S
starlord 已提交
741

742
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
743
}
X
Xu Peng 已提交
744

S
starlord 已提交
745 746
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
747 748
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
S
starlord 已提交
749
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
750 751 752
        return;
    }

S
starlord 已提交
753
    // build index has been finished?
754 755 756 757 758 759 760
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
761 762 763
        }
    }

S
starlord 已提交
764
    // add new build index task
765 766 767
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
768
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
769
        }
G
groot 已提交
770
    }
X
Xu Peng 已提交
771 772
}

S
starlord 已提交
773
// Builds an index file from the given raw table file.
//
// Steps: load the source file, register a NEW_INDEX file in meta, build the
// index into it, serialize it, then switch the new file to INDEX and the
// source file to BACKUP in one meta update.
//
// Returns Status::OK() on success, and also when the table no longer exists
// after the build (nothing is saved in that case). Returns an error status when:
//   - the engine cannot be created, loaded, or the new meta entry cannot be created;
//   - building or serializing throws (the new file is marked TO_DELETE);
//   - the final meta update fails (the source file is put back to TO_INDEX and
//     the new file is marked TO_DELETE).
Status
DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index =
        EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                             static_cast<MetricType>(file.metric_type_), file.nlist_);
    if (to_index == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status(DB_ERROR, "Invalid engine type");
    }

    // Marks a file as TO_DELETE in meta and logs it; returns the status of the meta update.
    auto mark_to_delete = [this](meta::TableFileSchema& discarded) -> Status {
        discarded.file_type_ = meta::TableFileSchema::TO_DELETE;
        Status update_status = meta_ptr_->UpdateTableFile(discarded);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << discarded.file_id_ << " to to_delete";
        return update_status;
    };

    try {
        // step 1: load index
        Status status = to_index->Load(options_.insert_cache_immediately_);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
            return status;
        }

        // step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        // for multi-db-path, distribute index files evenly across the paths
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX;
        status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
            return status;
        }

        // step 3: build index
        std::shared_ptr<ExecutionEngine> index;

        try {
            // metrics object lives for the duration of the build attempt
            server::CollectBuildIndexMetrics metrics;
            index = to_index->BuildIndex(table_file.location_, static_cast<EngineType>(table_file.engine_type_));
            if (index == nullptr) {
                // NOTE(review): this returns the status of the meta update, so a null index is
                // reported as OK whenever that update succeeds - confirm this is intended.
                return mark_to_delete(table_file);
            }
        } catch (const std::exception& ex) {
            // typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_to_delete(table_file);

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
                      << std::endl;

            return Status(DB_ERROR, msg);
        }

        // step 4: if table has been deleted, don't save index file
        // NOTE(review): the result of HasTable() is not checked; if the lookup itself fails,
        // has_table stays false and DeleteTableFiles() is invoked - confirm this is acceptable.
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if (!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        // step 5: save index file
        try {
            index->Serialize();
        } catch (const std::exception& ex) {
            // typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_to_delete(table_file);

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                      << ", possible out of disk space" << std::endl;

            return Status(DB_ERROR, msg);
        }

        // step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();

        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {table_file, origin_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (status.ok()) {
            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
                             << " bytes"
                             << " from file " << origin_file.file_id_;

            if (options_.insert_cache_immediately_) {
                index->Cache();
            }
        } else {
            // failed to update meta, mark the new file as to_delete, don't delete old file
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";

            mark_to_delete(table_file);

            // report the original meta failure instead of falling through to Status::OK()
            return status;
        }
    } catch (const std::exception& ex) {
        std::string msg = "Build index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status(DB_ERROR, msg);
    }

    return Status::OK();
}

S
starlord 已提交
894 895
void
DBImpl::BackgroundBuildIndex() {
S
starlord 已提交
896
    ENGINE_LOG_TRACE << "Background build index thread start";
S
starlord 已提交
897

P
peng.xu 已提交
898
    std::unique_lock<std::mutex> lock(build_index_mutex_);
899
    meta::TableFilesSchema to_index_files;
G
groot 已提交
900
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
901
    Status status;
902

903 904
    if (!to_index_files.empty()) {
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
Y
Yu Kun 已提交
905

906 907 908 909 910 911 912 913 914 915 916
        // step 2: put build index task to scheduler
        for (auto& file : to_index_files) {
            scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            job->AddToIndexFiles(file_ptr);
        }
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitBuildIndexFinish();
        if (!job->GetStatus().ok()) {
            Status status = job->GetStatus();
            ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
        }
Y
Yu Kun 已提交
917
    }
Y
Yu Kun 已提交
918 919 920 921 922 923 924 925 926 927 928
    //    for (auto &file : to_index_files) {
    //        status = BuildIndex(file);
    //        if (!status.ok()) {
    //            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
    //        }
    //
    //        if (shutting_down_.load(std::memory_order_acquire)) {
    //            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
    //            break;
    //        }
    //    }
Y
Yu Kun 已提交
929

S
starlord 已提交
930
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
931 932
}

S
starlord 已提交
933 934
}  // namespace engine
}  // namespace milvus