SqliteMetaImpl.cpp 67.6 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/meta/SqliteMetaImpl.h"
X
Xu Peng 已提交
13

Y
youny626 已提交
14
#include <sqlite_orm.h>
X
Xu Peng 已提交
15
#include <unistd.h>
16

Z
Zhiru Zhu 已提交
17
#include <fiu-local.h>
X
Xu Peng 已提交
18
#include <boost/filesystem.hpp>
19
#include <chrono>
X
Xu Peng 已提交
20
#include <fstream>
Y
youny626 已提交
21
#include <iostream>
S
starlord 已提交
22
#include <map>
Y
youny626 已提交
23
#include <memory>
S
starlord 已提交
24
#include <set>
Y
youny626 已提交
25
#include <sstream>
S
starlord 已提交
26

27 28 29 30 31 32 33 34 35
#include "MetaConsts.h"
#include "db/IDGenerator.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
Z
Zhiru Zhu 已提交
36
#include "utils/ValidationUtil.h"
37

J
jinhai 已提交
38
namespace milvus {
X
Xu Peng 已提交
39
namespace engine {
40
namespace meta {
X
Xu Peng 已提交
41

X
Xu Peng 已提交
42 43
using namespace sqlite_orm;

G
groot 已提交
44 45
namespace {

S
starlord 已提交
46
Status
Y
youny626 已提交
47
HandleException(const std::string& desc, const char* what = nullptr) {
S
starlord 已提交
48
    if (what == nullptr) {
S
starlord 已提交
49 50 51 52 53 54 55
        ENGINE_LOG_ERROR << desc;
        return Status(DB_META_TRANSACTION_FAILED, desc);
    } else {
        std::string msg = desc + ":" + what;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_META_TRANSACTION_FAILED, msg);
    }
G
groot 已提交
56 57
}

Y
youny626 已提交
58
}  // namespace
G
groot 已提交
59

S
starlord 已提交
60
inline auto
G
groot 已提交
61
StoragePrototype(const std::string& path) {
62 63 64 65 66 67 68 69 70
    return make_storage(
        path,
        make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))),
        make_table(META_TABLES, make_column("id", &TableSchema::id_, primary_key()),
                   make_column("table_id", &TableSchema::table_id_, unique()),
                   make_column("state", &TableSchema::state_), make_column("dimension", &TableSchema::dimension_),
                   make_column("created_on", &TableSchema::created_on_),
                   make_column("flag", &TableSchema::flag_, default_value(0)),
                   make_column("index_file_size", &TableSchema::index_file_size_),
71
                   make_column("engine_type", &TableSchema::engine_type_),
G
groot 已提交
72
                   make_column("index_params", &TableSchema::index_params_, default_value("")),
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
                   make_column("metric_type", &TableSchema::metric_type_),
                   make_column("owner_table", &TableSchema::owner_table_, default_value("")),
                   make_column("partition_tag", &TableSchema::partition_tag_, default_value("")),
                   make_column("version", &TableSchema::version_, default_value(CURRENT_VERSION)),
                   make_column("flush_lsn", &TableSchema::flush_lsn_)),
        make_table(
            META_TABLEFILES, make_column("id", &TableFileSchema::id_, primary_key()),
            make_column("table_id", &TableFileSchema::table_id_),
            make_column("segment_id", &TableFileSchema::segment_id_, default_value("")),
            make_column("engine_type", &TableFileSchema::engine_type_),
            make_column("file_id", &TableFileSchema::file_id_), make_column("file_type", &TableFileSchema::file_type_),
            make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
            make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
            make_column("updated_time", &TableFileSchema::updated_time_),
            make_column("created_on", &TableFileSchema::created_on_), make_column("date", &TableFileSchema::date_),
            make_column("flush_lsn", &TableFileSchema::flush_lsn_)));
X
Xu Peng 已提交
89 90
}

X
Xu Peng 已提交
91
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
92 93
static std::unique_ptr<ConnectorT> ConnectorPtr;

Y
youny626 已提交
94
// Copies the options and immediately opens the meta database through Initialize(),
// which may throw (e.g. invalid path or incompatible meta schema).
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options) : options_(options) {
    Initialize();
}

// Nothing to release per object: the connection lives in the file-scope ConnectorPtr.
SqliteMetaImpl::~SqliteMetaImpl() = default;

S
starlord 已提交
101
// Writes a freshly generated table id into table_id. The id is the next number from
// SafeIDGenerator, rendered in decimal. Always returns OK.
Status
SqliteMetaImpl::NextTableId(std::string& table_id) {
    // serialize id generation across callers (avoid duplicated id)
    std::lock_guard<std::mutex> lock(genid_mutex_);
    const auto next_number = SafeIDGenerator::GetInstance().GetNextIDNumber();
    table_id = std::to_string(next_number);
    return Status::OK();
}

S
starlord 已提交
111
// Writes a freshly generated table-file id into file_id. The id is the next number from
// SafeIDGenerator, rendered in decimal. Always returns OK.
Status
SqliteMetaImpl::NextFileId(std::string& file_id) {
    std::lock_guard<std::mutex> lock(genid_mutex_);  // avoid duplicated id
    SafeIDGenerator& generator = SafeIDGenerator::GetInstance();
    file_id = std::to_string(generator.GetNextIDNumber());
    return Status::OK();
}

S
starlord 已提交
121 122
void
SqliteMetaImpl::ValidateMetaSchema() {
S
shengjh 已提交
123 124 125
    bool is_null_connector{ConnectorPtr == nullptr};
    fiu_do_on("SqliteMetaImpl.ValidateMetaSchema.NullConnection", is_null_connector = true);
    if (is_null_connector) {
126 127 128
        return;
    }

Y
youny626 已提交
129
    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
130
    auto ret = ConnectorPtr->sync_schema_simulate();
Y
youny626 已提交
131 132
    if (ret.find(META_TABLES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
133 134
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
Y
youny626 已提交
135 136
    if (ret.find(META_TABLEFILES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
137 138 139 140
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
141 142
// Opens the sqlite meta database "<options_.path_>/meta.sqlite", creating the directory
// (including missing parent directories) when it does not exist yet.
// Then, in order:
//   - rejects incompatible old schemas (ValidateMetaSchema),
//   - syncs the schema,
//   - keeps the connection open,
//   - switches the journal to WAL,
//   - cleans up shadow files.
// Throws Exception(DB_INVALID_PATH) if the directory cannot be created. ValidateMetaSchema()
// may throw Exception(DB_INCOMPATIB_META). Returns OK otherwise.
Status
SqliteMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path_)) {
        // create_directories() also creates missing parents. The error_code overload keeps
        // filesystem failures from escaping as boost::filesystem_error, so callers always
        // get DB_INVALID_PATH for an unusable path.
        boost::system::error_code ec;
        boost::filesystem::create_directories(options_.path_, ec);
        // Re-check rather than trusting the return value: "false" also means the directory
        // already exists (e.g. it was created concurrently), which is not a failure.
        bool created = !ec && boost::filesystem::is_directory(options_.path_, ec);
        fiu_do_on("SqliteMetaImpl.Initialize.fail_create_directory", created = false);
        if (!created) {
            std::string msg = "Failed to create db directory " + options_.path_;
            ENGINE_LOG_ERROR << msg << (ec ? " (" + ec.message() + ")" : std::string());
            throw Exception(DB_INVALID_PATH, msg);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path_ + "/meta.sqlite"));

    // Must run before sync_schema(), which could drop and recreate tables whose schema changed.
    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();                          // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUpShadowFiles();

    return Status::OK();
}

G
groot 已提交
166
// Registers a new table in META_TABLES and creates its directory on disk.
// An empty table_schema.table_id_ gets a generated id, which is written back. For a given
// id, an existing row yields:
//   - DB_ERROR while that table is still marked TO_DELETE,
//   - DB_ALREADY_EXIST otherwise.
// On success table_schema.id_ and created_on_ are filled in and the result of
// utils::CreateTablePath is returned. Exceptions are converted by HandleException.
Status
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            fiu_do_on("SqliteMetaImpl.CreateTable.throw_exception", throw std::exception());
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                const bool being_deleted = (std::get<0>(existing[0]) == TableSchema::TO_DELETE);
                if (being_deleted) {
                    return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                }
                // Change from no error to already exist.
                return Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        // an insertion failure is reported by the catch below with the same message
        fiu_do_on("SqliteMetaImpl.CreateTable.insert_throw_exception", throw std::exception());
        table_schema.id_ = ConnectorPtr->insert(table_schema);

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table", e.what());
    }
}

// Fills table_schema with the stored settings of table_schema.table_id_.
// Rows marked TO_DELETE are ignored, so a soft-deleted table yields DB_NOT_FOUND, as does
// a missing one. Exceptions are converted by HandleException.
Status
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DescribeTable.throw_exception", throw std::exception());
        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ and
                  c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        // unpack the single row; the member order mirrors the columns(...) list above
        std::tie(table_schema.id_, table_schema.state_, table_schema.dimension_, table_schema.created_on_,
                 table_schema.flag_, table_schema.index_file_size_, table_schema.engine_type_,
                 table_schema.index_params_, table_schema.metric_type_, table_schema.owner_table_,
                 table_schema.partition_tag_, table_schema.version_, table_schema.flush_lsn_) = rows[0];
    } catch (std::exception& e) {
        return HandleException("Encounter exception when describe table", e.what());
    }

    return Status::OK();
}

// Sets has_or_not to true when exactly one non-deleted META_TABLES row has table_id.
// has_or_not is reset to false up front, so it is false whenever an error status is returned.
Status
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    has_or_not = false;

    try {
        fiu_do_on("SqliteMetaImpl.HasTable.throw_exception", throw std::exception());
        server::MetricCollector metric;
        auto matches = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        has_or_not = (matches.size() == 1);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table", e.what());
    }

    return Status::OK();
}

// Appends to table_schema_array every non-deleted table whose owner_table_ is empty. Rows
// with an owner table (presumably partitions — confirm) are skipped.
// The vector is NOT cleared first. state_ is not read and stays at its default value.
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        fiu_do_on("SqliteMetaImpl.AllTables.throw_exception", throw std::exception());
        server::MetricCollector metric;
        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::table_id_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE and c(&TableSchema::owner_table_) == ""));

        for (const auto& row : rows) {
            TableSchema schema;
            // member order mirrors the columns(...) list above
            std::tie(schema.id_, schema.table_id_, schema.dimension_, schema.created_on_, schema.flag_,
                     schema.index_file_size_, schema.engine_type_, schema.index_params_, schema.metric_type_,
                     schema.owner_table_, schema.partition_tag_, schema.version_, schema.flush_lsn_) = row;
            table_schema_array.push_back(std::move(schema));
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup all tables", e.what());
    }

    return Status::OK();
}

// Soft-deletes table_id: its META_TABLES row is marked TO_DELETE but kept.
// Returns OK even when no live row matches. Exceptions are converted by HandleException.
Status
SqliteMetaImpl::DropTable(const std::string& table_id) {
    try {
        fiu_do_on("SqliteMetaImpl.DropTable.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // soft delete: only the state column changes
        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        ConnectorPtr->update_all(
            set(c(&TableSchema::state_) = to_delete),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != to_delete));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table", e.what());
    }

    return Status::OK();
}

// Soft-deletes every file of table_id: files not yet TO_DELETE are marked TO_DELETE and
// their updated_time_ is refreshed. Rows are kept. Exceptions are converted by HandleException.
Status
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        fiu_do_on("SqliteMetaImpl.DeleteTableFiles.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const int to_delete = static_cast<int>(TableFileSchema::TO_DELETE);
        auto now = utils::GetMicroSecTimeStamp();
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = to_delete, c(&TableFileSchema::updated_time_) = now),
            where(c(&TableFileSchema::table_id_) == table_id and c(&TableFileSchema::file_type_) != to_delete));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table files", e.what());
    }

    return Status::OK();
}

// Inserts a new, empty file row for file_schema.table_id_ and creates the file's path.
// Fills in date_ (if empty), file_id_, segment_id_ (defaults to the new file id), sizes,
// timestamps, id_, and the index/engine settings copied from the owning table.
// Returns DescribeTable's error when the table is missing or soft-deleted. Otherwise it
// returns the result of utils::CreateTableFilePath. Exceptions are converted by HandleException.
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        fiu_do_on("SqliteMetaImpl.CreateTableFile.throw_exception", throw std::exception());
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);
        if (file_schema.segment_id_.empty()) {
            file_schema.segment_id_ = file_schema.file_id_;
        }

        // a new file starts empty ...
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        // ... and inherits its layout/index settings from the table
        file_schema.dimension_ = table_schema.dimension_;
        file_schema.index_file_size_ = table_schema.index_file_size_;
        file_schema.index_params_ = table_schema.index_params_;
        file_schema.engine_type_ = table_schema.engine_type_;
        file_schema.metric_type_ = table_schema.metric_type_;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table file", e.what());
    }
}

S
starlord 已提交
396
// Loads into table_files (cleared first) the non-deleted files of table_id whose ids are in
// `ids`. Each entry gets the table's dimension/index settings and its location from
// utils::GetTableFilePath.
// Returns DescribeTable's error (e.g. DB_NOT_FOUND) when the table is missing or soft-deleted.
// Exceptions are converted by HandleException.
Status
SqliteMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                              TableFilesSchema& table_files) {
    try {
        fiu_do_on("SqliteMetaImpl.GetTableFiles.throw_exception", throw std::exception());

        table_files.clear();

        // Resolve the owning table first: when it does not exist there is nothing to query.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        auto files = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::id_, ids) and
                  c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));

        for (auto& file : files) {
            TableFileSchema file_schema;
            file_schema.table_id_ = table_id;
            file_schema.id_ = std::get<0>(file);
            file_schema.segment_id_ = std::get<1>(file);
            file_schema.file_id_ = std::get<2>(file);
            file_schema.file_type_ = std::get<3>(file);
            file_schema.file_size_ = std::get<4>(file);
            file_schema.row_count_ = std::get<5>(file);
            file_schema.date_ = std::get<6>(file);
            file_schema.engine_type_ = std::get<7>(file);
            file_schema.created_on_ = std::get<8>(file);
            file_schema.dimension_ = table_schema.dimension_;
            file_schema.index_file_size_ = table_schema.index_file_size_;
            file_schema.index_params_ = table_schema.index_params_;
            file_schema.metric_type_ = table_schema.metric_type_;

            // NOTE(review): the returned status is deliberately ignored here (as before);
            // GetTableFilesByFlushLSN propagates it instead — confirm which is intended.
            utils::GetTableFilePath(options_, file_schema);

            table_files.emplace_back(std::move(file_schema));
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return Status::OK();
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table files", e.what());
    }
}

446 447 448 449 450 451
// Loads into table_files (cleared first) every non-deleted file whose segment_id_ equals
// segment_id. Each entry gets its location and the owning table's dimension/index settings.
// The table is resolved from the first row only (assumes a segment never spans tables —
// confirm). Returns DescribeTable's error if that table cannot be described.
// Exceptions are converted by HandleException.
Status
SqliteMetaImpl::GetTableFilesBySegmentId(const std::string& segment_id,
                                         milvus::engine::meta::TableFilesSchema& table_files) {
    try {
        table_files.clear();
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::segment_id_) == segment_id and
                  c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));

        if (!rows.empty()) {
            TableSchema owner;
            owner.table_id_ = std::get<1>(rows[0]);
            auto status = DescribeTable(owner);
            if (!status.ok()) {
                return status;
            }

            for (const auto& row : rows) {
                TableFileSchema file_schema;
                // member order mirrors the columns(...) list; the table_id column is taken from owner
                std::tie(file_schema.id_, std::ignore, file_schema.segment_id_, file_schema.file_id_,
                         file_schema.file_type_, file_schema.file_size_, file_schema.row_count_, file_schema.date_,
                         file_schema.engine_type_, file_schema.created_on_) = row;
                file_schema.table_id_ = owner.table_id_;
                file_schema.dimension_ = owner.dimension_;
                file_schema.index_file_size_ = owner.index_file_size_;
                file_schema.index_params_ = owner.index_params_;
                file_schema.metric_type_ = owner.metric_type_;

                utils::GetTableFilePath(options_, file_schema);
                table_files.emplace_back(file_schema);
            }
        }

        ENGINE_LOG_DEBUG << "Get table files by segment id";
        return Status::OK();
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table files by segment id", e.what());
    }
}

S
starlord 已提交
496
// Sets the flag_ column of table_id's META_TABLES row(s) to `flag`. Rows are not filtered
// by state, and the call returns OK when nothing matches.
// Exceptions are converted by HandleException.
Status
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFlag.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here,
        // like the other meta writers in this file
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag), where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
// Stores flush_lsn in the flush_lsn_ column of table_id's META_TABLES row(s). Returns OK
// when nothing matches. Exceptions are converted by HandleException.
Status
SqliteMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_lsn) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here,
        // like the other meta writers in this file
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(set(c(&TableSchema::flush_lsn_) = flush_lsn),
                                 where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flush_lsn, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table lsn: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

// Reads the flush_lsn_ column of table_id's first META_TABLES row into flush_lsn.
// Soft-deleted rows are not filtered out. Returns DB_NOT_FOUND when no row exists.
// Exceptions are converted by HandleException.
Status
SqliteMetaImpl::GetTableFlushLSN(const std::string& table_id, uint64_t& flush_lsn) {
    try {
        server::MetricCollector metric;

        auto selected =
            ConnectorPtr->select(columns(&TableSchema::flush_lsn_), where(c(&TableSchema::table_id_) == table_id));

        if (selected.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }
        flush_lsn = std::get<0>(selected[0]);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when getting table flush_lsn", e.what());
    }

    return Status::OK();
}

// Loads into table_files (cleared first) every META_TABLEFILES row whose flush_lsn_ equals
// flush_lsn; file type is not filtered. Each entry gets its location (utils::GetTableFilePath)
// and the owning table's dimension/index settings. Each table is described once and cached.
// Returns:
//   - DescribeTable's error immediately if a table cannot be described;
//   - otherwise the last failing GetTableFilePath status (the entry is still added);
//   - otherwise OK.
// Exceptions are converted by HandleException.
Status
SqliteMetaImpl::GetTableFilesByFlushLSN(uint64_t flush_lsn, TableFilesSchema& table_files) {
    table_files.clear();

    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::flush_lsn_) == flush_lsn));

        std::map<std::string, TableSchema> groups;  // table_id -> described table schema
        Status ret;
        for (auto& file : selected) {
            // A fresh object per row: no field can leak from the previous file, e.g. if
            // GetTableFilePath leaves location_ untouched on failure (NOTE(review): unverified).
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.segment_id_ = std::get<2>(file);
            table_file.file_id_ = std::get<3>(file);
            table_file.file_type_ = std::get<4>(file);
            table_file.file_size_ = std::get<5>(file);
            table_file.row_count_ = std::get<6>(file);
            table_file.date_ = std::get<7>(file);
            table_file.engine_type_ = std::get<8>(file);
            table_file.created_on_ = std::get<9>(file);

            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto group_itr = groups.find(table_file.table_id_);
            if (group_itr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                group_itr = groups.emplace(table_file.table_id_, std::move(table_schema)).first;
            }

            const TableSchema& owner = group_itr->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.index_params_ = owner.index_params_;
            table_file.metric_type_ = owner.metric_type_;
            table_files.push_back(std::move(table_file));
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " files with flush_lsn = " << flush_lsn;
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when getting table files by flush_lsn", e.what());
    }
}

S
starlord 已提交
610
// Saves file_schema to META_TABLEFILES (sqlite_orm update, matched by its primary key id_),
// after refreshing updated_time_. If the owning table is missing or marked TO_DELETE, the file
// is switched to TO_DELETE before saving so the clean thread deletes it later.
// updated_time_ (and possibly file_type_) are modified in file_schema even when an error is
// returned. Exceptions are converted by HandleException.
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFile.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_state = ConnectorPtr->select(columns(&TableSchema::state_),
                                                where(c(&TableSchema::table_id_) == file_schema.table_id_));

        const bool owner_alive =
            !owner_state.empty() && std::get<0>(owner_state[0]) != (int)TableSchema::TO_DELETE;
        if (!owner_alive) {
            // table is gone: keep the row but let the clean thread delete the file later
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (std::exception& e) {
        std::string msg =
            "Exception update table file: table_id = " + file_schema.table_id_ + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e.what());
    }
    return Status::OK();
}

S
starlord 已提交
640
Status
G
groot 已提交
641
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
642
    try {
Y
Yu Kun 已提交
643
        server::MetricCollector metric;
S
shengjh 已提交
644
        fiu_do_on("SqliteMetaImpl.UpdateTableFiles.throw_exception", throw std::exception());
G
groot 已提交
645

646
        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
G
groot 已提交
647
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
G
groot 已提交
648

G
groot 已提交
649
        std::map<std::string, bool> has_tables;
G
groot 已提交
650
        for (auto& file : files) {
G
groot 已提交
651 652 653 654
            if (has_tables.find(file.table_id_) != has_tables.end()) {
                continue;
            }
            auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
655 656
                                               where(c(&TableSchema::table_id_) == file.table_id_ and
                                                     c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
G
groot 已提交
657 658 659 660 661
            if (tables.size() >= 1) {
                has_tables[file.table_id_] = true;
            } else {
                has_tables[file.table_id_] = false;
            }
662
        }
P
peng.xu 已提交
663

G
groot 已提交
664
        auto commited = ConnectorPtr->transaction([&]() mutable {
G
groot 已提交
665
            for (auto& file : files) {
G
groot 已提交
666 667
                if (!has_tables[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
668
                }
G
groot 已提交
669 670 671

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
672
            }
G
groot 已提交
673 674
            return true;
        });
S
shengjh 已提交
675
        fiu_do_on("SqliteMetaImpl.UpdateTableFiles.fail_commited", commited = false);
676

G
groot 已提交
677 678
        if (!commited) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
P
peng.xu 已提交
679
        }
G
groot 已提交
680 681

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
G
groot 已提交
682
    } catch (std::exception& e) {
G
groot 已提交
683
        return HandleException("Encounter exception when update table files", e.what());
P
peng.xu 已提交
684 685 686 687
    }
    return Status::OK();
}

S
starlord 已提交
688
Status
Y
youny626 已提交
689
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
690
    try {
Y
Yu Kun 已提交
691
        server::MetricCollector metric;
S
shengjh 已提交
692
        fiu_do_on("SqliteMetaImpl.UpdateTableIndex.throw_exception", throw std::exception());
693

Y
youny626 已提交
694
        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
695 696
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

697 698 699 700 701
        auto tables = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
702

S
starlord 已提交
703
        if (tables.size() > 0) {
704 705 706 707 708 709
            meta::TableSchema table_schema;
            table_schema.id_ = std::get<0>(tables[0]);
            table_schema.table_id_ = table_id;
            table_schema.state_ = std::get<1>(tables[0]);
            table_schema.dimension_ = std::get<2>(tables[0]);
            table_schema.created_on_ = std::get<3>(tables[0]);
S
starlord 已提交
710
            table_schema.flag_ = std::get<4>(tables[0]);
711
            table_schema.index_file_size_ = std::get<5>(tables[0]);
G
groot 已提交
712 713 714
            table_schema.owner_table_ = std::get<6>(tables[0]);
            table_schema.partition_tag_ = std::get<7>(tables[0]);
            table_schema.version_ = std::get<8>(tables[0]);
715
            table_schema.engine_type_ = index.engine_type_;
716
            table_schema.index_params_ = index.extra_params_.dump();
S
starlord 已提交
717
            table_schema.metric_type_ = index.metric_type_;
718 719 720

            ConnectorPtr->update(table_schema);
        } else {
S
starlord 已提交
721
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
722 723
        }

724 725 726 727 728
        // set all backup file to raw
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(c(&TableFileSchema::table_id_) == table_id and
                                       c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));
729

730
        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
Y
youny626 已提交
731
    } catch (std::exception& e) {
732
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
S
starlord 已提交
733
        return HandleException(msg, e.what());
734
    }
S
starlord 已提交
735 736 737 738

    return Status::OK();
}

S
starlord 已提交
739
Status
G
groot 已提交
740
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
S
starlord 已提交
741
    try {
Y
Yu Kun 已提交
742
        server::MetricCollector metric;
S
shengjh 已提交
743
        fiu_do_on("SqliteMetaImpl.UpdateTableFilesToIndex.throw_exception", throw std::exception());
S
starlord 已提交
744

745
        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
G
groot 已提交
746 747
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

748 749 750 751
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_INDEX),
                                 where(c(&TableFileSchema::table_id_) == table_id and
                                       c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD and
                                       c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW));
G
groot 已提交
752 753

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
G
groot 已提交
754
    } catch (std::exception& e) {
G
groot 已提交
755
        return HandleException("Encounter exception when update table files to to_index", e.what());
S
starlord 已提交
756 757
    }

758 759 760
    return Status::OK();
}

S
starlord 已提交
761
Status
Y
youny626 已提交
762
SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
763
    try {
Y
Yu Kun 已提交
764
        server::MetricCollector metric;
S
shengjh 已提交
765
        fiu_do_on("SqliteMetaImpl.DescribeTableIndex.throw_exception", throw std::exception());
766

767
        auto groups = ConnectorPtr->select(
768
            columns(&TableSchema::engine_type_, &TableSchema::index_params_, &TableSchema::metric_type_),
769
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
770 771 772

        if (groups.size() == 1) {
            index.engine_type_ = std::get<0>(groups[0]);
773
            index.extra_params_ = milvus::json::parse(std::get<1>(groups[0]));
S
starlord 已提交
774
            index.metric_type_ = std::get<2>(groups[0]);
775
        } else {
S
starlord 已提交
776
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
777
        }
Y
youny626 已提交
778
    } catch (std::exception& e) {
S
starlord 已提交
779
        return HandleException("Encounter exception when describe index", e.what());
780 781 782 783 784
    }

    return Status::OK();
}

S
starlord 已提交
785
Status
Y
youny626 已提交
786
SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
787
    try {
Y
Yu Kun 已提交
788
        server::MetricCollector metric;
S
shengjh 已提交
789
        fiu_do_on("SqliteMetaImpl.DropTableIndex.throw_exception", throw std::exception());
790

Y
youny626 已提交
791
        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
792 793
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

794 795 796 797 798 799 800 801 802 803 804 805 806
        // soft delete index files
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(c(&TableFileSchema::table_id_) == table_id and
                                       c(&TableFileSchema::file_type_) == (int)TableFileSchema::INDEX));

        // set all backup file to raw
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(c(&TableFileSchema::table_id_) == table_id and
                                       c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));

        // set table index type to raw
G
groot 已提交
807
        ConnectorPtr->update_all(
808
            set(c(&TableSchema::engine_type_) = DEFAULT_ENGINE_TYPE, c(&TableSchema::index_params_) = "{}"),
809
            where(c(&TableSchema::table_id_) == table_id));
810

811
        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
G
groot 已提交
812
    } catch (std::exception& e) {
S
starlord 已提交
813
        return HandleException("Encounter exception when delete table index files", e.what());
814 815 816 817 818
    }

    return Status::OK();
}

S
starlord 已提交
819
Status
820 821
SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag,
                                uint64_t lsn) {
G
groot 已提交
822
    server::MetricCollector metric;
823

G
groot 已提交
824 825 826 827 828
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
G
groot 已提交
829
    }
G
groot 已提交
830

G
groot 已提交
831
    // not allow create partition under partition
G
groot 已提交
832
    if (!table_schema.owner_table_.empty()) {
G
groot 已提交
833
        return Status(DB_ERROR, "Nested partition is not allowed");
G
groot 已提交
834
    }
G
groot 已提交
835

836 837 838 839 840 841 842 843
    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string valid_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    // not allow duplicated partition
    std::string exist_partition;
    GetPartitionName(table_id, valid_tag, exist_partition);
G
groot 已提交
844
    if (!exist_partition.empty()) {
G
groot 已提交
845
        return Status(DB_ERROR, "Duplicate partition is not allowed");
846
    }
G
groot 已提交
847

848 849
    if (partition_name == "") {
        // generate unique partition name
G
groot 已提交
850 851 852
        NextTableId(table_schema.table_id_);
    } else {
        table_schema.table_id_ = partition_name;
X
Xu Peng 已提交
853
    }
G
groot 已提交
854

G
groot 已提交
855 856 857 858
    table_schema.id_ = -1;
    table_schema.flag_ = 0;
    table_schema.created_on_ = utils::GetMicroSecTimeStamp();
    table_schema.owner_table_ = table_id;
859
    table_schema.partition_tag_ = valid_tag;
860
    table_schema.flush_lsn_ = lsn;
861 862 863 864 865

    status = CreateTable(table_schema);
    if (status.code() == DB_ALREADY_EXIST) {
        return Status(DB_ALREADY_EXIST, "Partition already exists");
    }
G
groot 已提交
866

867
    return status;
X
Xu Peng 已提交
868 869
}

S
starlord 已提交
870
Status
G
groot 已提交
871 872 873
SqliteMetaImpl::DropPartition(const std::string& partition_name) {
    return DropTable(partition_name);
}
874

G
groot 已提交
875
Status
G
groot 已提交
876
SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
G
groot 已提交
877
    try {
Y
Yu Kun 已提交
878
        server::MetricCollector metric;
S
shengjh 已提交
879
        fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception());
G
groot 已提交
880

881 882 883 884 885 886 887 888
        auto partitions = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::partition_tag_,
                    &TableSchema::version_, &TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and
                  c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

889
        for (size_t i = 0; i < partitions.size(); i++) {
G
groot 已提交
890
            meta::TableSchema partition_schema;
891 892 893 894 895 896 897
            partition_schema.id_ = std::get<0>(partitions[i]);
            partition_schema.state_ = std::get<1>(partitions[i]);
            partition_schema.dimension_ = std::get<2>(partitions[i]);
            partition_schema.created_on_ = std::get<3>(partitions[i]);
            partition_schema.flag_ = std::get<4>(partitions[i]);
            partition_schema.index_file_size_ = std::get<5>(partitions[i]);
            partition_schema.engine_type_ = std::get<6>(partitions[i]);
898
            partition_schema.index_params_ = std::get<7>(partitions[i]);
899 900 901 902 903
            partition_schema.metric_type_ = std::get<8>(partitions[i]);
            partition_schema.owner_table_ = table_id;
            partition_schema.partition_tag_ = std::get<9>(partitions[i]);
            partition_schema.version_ = std::get<10>(partitions[i]);
            partition_schema.table_id_ = std::get<11>(partitions[i]);
G
groot 已提交
904
            partition_schema_array.emplace_back(partition_schema);
G
groot 已提交
905
        }
G
groot 已提交
906
    } catch (std::exception& e) {
G
groot 已提交
907
        return HandleException("Encounter exception when show partitions", e.what());
908 909
    }

X
Xu Peng 已提交
910
    return Status::OK();
X
Xu Peng 已提交
911 912
}

S
starlord 已提交
913
Status
G
groot 已提交
914
SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
915
    try {
Y
Yu Kun 已提交
916
        server::MetricCollector metric;
S
shengjh 已提交
917
        fiu_do_on("SqliteMetaImpl.GetPartitionName.throw_exception", throw std::exception());
G
groot 已提交
918

919 920 921 922 923
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

924 925 926 927
        auto name = ConnectorPtr->select(
            columns(&TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and c(&TableSchema::partition_tag_) == valid_tag and
                  c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
G
groot 已提交
928 929 930
        if (name.size() > 0) {
            partition_name = std::get<0>(name[0]);
        } else {
931
            return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
932
        }
G
groot 已提交
933
    } catch (std::exception& e) {
G
groot 已提交
934
        return HandleException("Encounter exception when get partition name", e.what());
X
Xu Peng 已提交
935
    }
G
groot 已提交
936 937

    return Status::OK();
X
Xu Peng 已提交
938 939
}

S
starlord 已提交
940
Status
941
SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) {
X
xj.lin 已提交
942
    files.clear();
Y
Yu Kun 已提交
943
    server::MetricCollector metric;
X
xj.lin 已提交
944 945

    try {
S
shengjh 已提交
946 947
        fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception());

Y
youny626 已提交
948
        auto select_columns =
949 950 951
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_);
X
xj.lin 已提交
952 953

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;
X
xj.lin 已提交
954

Y
youny626 已提交
955 956
        std::vector<int> file_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                       (int)TableFileSchema::INDEX};
S
starlord 已提交
957
        auto match_type = in(&TableFileSchema::file_type_, file_types);
X
xj.lin 已提交
958 959 960 961

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
Y
youny626 已提交
962 963 964
        if (!status.ok()) {
            return status;
        }
X
xj.lin 已提交
965

Y
youny626 已提交
966
        // perform query
967
        decltype(ConnectorPtr->select(select_columns)) selected;
968
        if (ids.empty()) {
X
xj.lin 已提交
969
            auto filter = where(match_tableid and match_type);
970
            selected = ConnectorPtr->select(select_columns, filter);
971
        } else {
X
xj.lin 已提交
972
            auto match_fileid = in(&TableFileSchema::id_, ids);
X
xj.lin 已提交
973
            auto filter = where(match_tableid and match_fileid and match_type);
974
            selected = ConnectorPtr->select(select_columns, filter);
X
xj.lin 已提交
975 976
        }

S
starlord 已提交
977
        Status ret;
Y
youny626 已提交
978
        for (auto& file : selected) {
979
            TableFileSchema table_file;
X
xj.lin 已提交
980 981
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
982 983 984 985 986 987 988
            table_file.segment_id_ = std::get<2>(file);
            table_file.file_id_ = std::get<3>(file);
            table_file.file_type_ = std::get<4>(file);
            table_file.file_size_ = std::get<5>(file);
            table_file.row_count_ = std::get<6>(file);
            table_file.date_ = std::get<7>(file);
            table_file.engine_type_ = std::get<8>(file);
X
xj.lin 已提交
989
            table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
990
            table_file.index_file_size_ = table_schema.index_file_size_;
991
            table_file.index_params_ = table_schema.index_params_;
S
starlord 已提交
992 993
            table_file.metric_type_ = table_schema.metric_type_;

S
starlord 已提交
994
            auto status = utils::GetTableFilePath(options_, table_file);
S
starlord 已提交
995
            if (!status.ok()) {
S
starlord 已提交
996
                ret = status;
S
starlord 已提交
997 998
            }

999
            files.emplace_back(table_file);
X
xj.lin 已提交
1000
        }
S
starlord 已提交
1001
        if (files.empty()) {
S
starlord 已提交
1002
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
1003
        }
S
starlord 已提交
1004

S
starlord 已提交
1005
        if (selected.size() > 0) {
1006 1007
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }
S
starlord 已提交
1008
        return ret;
Y
youny626 已提交
1009
    } catch (std::exception& e) {
S
starlord 已提交
1010
        return HandleException("Encounter exception when iterate index files", e.what());
X
xj.lin 已提交
1011 1012 1013
    }
}

S
starlord 已提交
1014
Status
1015
SqliteMetaImpl::FilesToMerge(const std::string& table_id, TableFilesSchema& files) {
X
Xu Peng 已提交
1016
    files.clear();
X
Xu Peng 已提交
1017

1018
    try {
S
shengjh 已提交
1019 1020
        fiu_do_on("SqliteMetaImpl.FilesToMerge.throw_exception", throw std::exception());

Y
Yu Kun 已提交
1021
        server::MetricCollector metric;
G
groot 已提交
1022

Y
youny626 已提交
1023
        // check table existence
S
starlord 已提交
1024 1025 1026 1027 1028 1029 1030
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

Y
youny626 已提交
1031 1032
        // get files to merge
        auto selected = ConnectorPtr->select(
1033 1034 1035
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::created_on_),
Y
youny626 已提交
1036 1037 1038
            where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW and
                  c(&TableFileSchema::table_id_) == table_id),
            order_by(&TableFileSchema::file_size_).desc());
G
groot 已提交
1039

S
starlord 已提交
1040
        Status result;
1041
        int64_t to_merge_files = 0;
Y
youny626 已提交
1042
        for (auto& file : selected) {
S
starlord 已提交
1043
            TableFileSchema table_file;
1044
            table_file.file_size_ = std::get<5>(file);
S
starlord 已提交
1045
            if (table_file.file_size_ >= table_schema.index_file_size_) {
Y
youny626 已提交
1046
                continue;  // skip large file
S
starlord 已提交
1047 1048
            }

G
groot 已提交
1049 1050
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
1051 1052 1053 1054 1055 1056
            table_file.segment_id_ = std::get<2>(file);
            table_file.file_id_ = std::get<3>(file);
            table_file.file_type_ = std::get<4>(file);
            table_file.row_count_ = std::get<6>(file);
            table_file.date_ = std::get<7>(file);
            table_file.created_on_ = std::get<8>(file);
G
groot 已提交
1057
            table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
1058
            table_file.index_file_size_ = table_schema.index_file_size_;
1059
            table_file.index_params_ = table_schema.index_params_;
S
starlord 已提交
1060 1061
            table_file.metric_type_ = table_schema.metric_type_;

S
starlord 已提交
1062
            auto status = utils::GetTableFilePath(options_, table_file);
S
starlord 已提交
1063
            if (!status.ok()) {
S
starlord 已提交
1064
                result = status;
S
starlord 已提交
1065 1066
            }

1067
            files.emplace_back(table_file);
1068
            ++to_merge_files;
X
Xu Peng 已提交
1069
        }
S
starlord 已提交
1070

1071 1072
        if (to_merge_files > 0) {
            ENGINE_LOG_TRACE << "Collect " << to_merge_files << " to-merge files";
1073
        }
G
groot 已提交
1074 1075 1076 1077 1078 1079 1080
        return result;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

/// Collect every TO_INDEX file across all tables, filled with its owner table's index settings.
///
/// @param files  output; cleared on entry
/// @return the owner-table lookup error (aborts immediately), the last path-resolution failure
///         (files are still collected), the HandleException status on any exception, or OK.
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        fiu_do_on("SqliteMetaImpl.FilesToIndex.throw_exception", throw std::exception());

        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::TO_INDEX));

        // owner table schemas, described once per table id
        std::map<std::string, TableSchema> groups;

        Status ret;
        for (auto& file : selected) {
            // a fresh object per row, so nothing written for a previous row leaks into this one
            // (e.g. location_, should GetTableFilePath() fail without setting it)
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.segment_id_ = std::get<2>(file);
            table_file.file_id_ = std::get<3>(file);
            table_file.file_type_ = std::get<4>(file);
            table_file.file_size_ = std::get<5>(file);
            table_file.row_count_ = std::get<6>(file);
            table_file.date_ = std::get<7>(file);
            table_file.engine_type_ = std::get<8>(file);
            table_file.created_on_ = std::get<9>(file);

            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto group_itr = groups.find(table_file.table_id_);
            if (group_itr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                fiu_do_on("SqliteMetaImpl_FilesToIndex_TableNotFound",
                          status = Status(DB_NOT_FOUND, "table not found"));
                if (!status.ok()) {
                    return status;
                }
                group_itr = groups.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema& owner_schema = group_itr->second;
            table_file.dimension_ = owner_schema.dimension_;
            table_file.index_file_size_ = owner_schema.index_file_size_;
            table_file.index_params_ = owner_schema.index_params_;
            table_file.metric_type_ = owner_schema.metric_type_;
            files.push_back(table_file);
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
1144
Status
1145
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
G
groot 已提交
1146
                            TableFilesSchema& table_files) {
G
groot 已提交
1147 1148 1149
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }
1150

1151 1152 1153 1154 1155 1156 1157 1158 1159
    Status ret = Status::OK();

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
1160
    try {
S
shengjh 已提交
1161 1162
        fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());

G
groot 已提交
1163
        table_files.clear();
1164 1165 1166 1167 1168
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));
S
starlord 已提交
1169

G
groot 已提交
1170 1171 1172
        if (selected.size() >= 1) {
            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
            int to_index_count = 0, index_count = 0, backup_count = 0;
G
groot 已提交
1173 1174 1175 1176
            for (auto& file : selected) {
                TableFileSchema file_schema;
                file_schema.table_id_ = table_id;
                file_schema.id_ = std::get<0>(file);
1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187
                file_schema.segment_id_ = std::get<1>(file);
                file_schema.file_id_ = std::get<2>(file);
                file_schema.file_type_ = std::get<3>(file);
                file_schema.file_size_ = std::get<4>(file);
                file_schema.row_count_ = std::get<5>(file);
                file_schema.date_ = std::get<6>(file);
                file_schema.engine_type_ = std::get<7>(file);
                file_schema.created_on_ = std::get<8>(file);

                file_schema.dimension_ = table_schema.dimension_;
                file_schema.index_file_size_ = table_schema.index_file_size_;
1188
                file_schema.index_params_ = table_schema.index_params_;
1189
                file_schema.metric_type_ = table_schema.metric_type_;
G
groot 已提交
1190 1191

                switch (file_schema.file_type_) {
Z
Zhiru Zhu 已提交
1192 1193
                    case (int)TableFileSchema::RAW:
                        ++raw_count;
G
groot 已提交
1194
                        break;
Z
Zhiru Zhu 已提交
1195 1196
                    case (int)TableFileSchema::NEW:
                        ++new_count;
G
groot 已提交
1197
                        break;
Z
Zhiru Zhu 已提交
1198 1199
                    case (int)TableFileSchema::NEW_MERGE:
                        ++new_merge_count;
G
groot 已提交
1200
                        break;
Z
Zhiru Zhu 已提交
1201 1202
                    case (int)TableFileSchema::NEW_INDEX:
                        ++new_index_count;
G
groot 已提交
1203
                        break;
Z
Zhiru Zhu 已提交
1204 1205
                    case (int)TableFileSchema::TO_INDEX:
                        ++to_index_count;
G
groot 已提交
1206
                        break;
Z
Zhiru Zhu 已提交
1207 1208
                    case (int)TableFileSchema::INDEX:
                        ++index_count;
G
groot 已提交
1209
                        break;
Z
Zhiru Zhu 已提交
1210 1211
                    case (int)TableFileSchema::BACKUP:
                        ++backup_count;
1212 1213
                        break;
                    default:
G
groot 已提交
1214
                        break;
1215 1216 1217 1218 1219
                }

                auto status = utils::GetTableFilePath(options_, file_schema);
                if (!status.ok()) {
                    ret = status;
G
groot 已提交
1220
                }
G
groot 已提交
1221 1222

                table_files.emplace_back(file_schema);
G
groot 已提交
1223
            }
1224

G
groot 已提交
1225
            std::string msg = "Get table files by type.";
G
groot 已提交
1226 1227
            for (int file_type : file_types) {
                switch (file_type) {
Z
Zhiru Zhu 已提交
1228 1229
                    case (int)TableFileSchema::RAW:
                        msg = msg + " raw files:" + std::to_string(raw_count);
G
groot 已提交
1230
                        break;
Z
Zhiru Zhu 已提交
1231 1232
                    case (int)TableFileSchema::NEW:
                        msg = msg + " new files:" + std::to_string(new_count);
G
groot 已提交
1233
                        break;
Z
Zhiru Zhu 已提交
1234 1235
                    case (int)TableFileSchema::NEW_MERGE:
                        msg = msg + " new_merge files:" + std::to_string(new_merge_count);
G
groot 已提交
1236
                        break;
Z
Zhiru Zhu 已提交
1237 1238
                    case (int)TableFileSchema::NEW_INDEX:
                        msg = msg + " new_index files:" + std::to_string(new_index_count);
G
groot 已提交
1239
                        break;
Z
Zhiru Zhu 已提交
1240 1241
                    case (int)TableFileSchema::TO_INDEX:
                        msg = msg + " to_index files:" + std::to_string(to_index_count);
G
groot 已提交
1242
                        break;
Z
Zhiru Zhu 已提交
1243 1244
                    case (int)TableFileSchema::INDEX:
                        msg = msg + " index files:" + std::to_string(index_count);
G
groot 已提交
1245
                        break;
Z
Zhiru Zhu 已提交
1246 1247
                    case (int)TableFileSchema::BACKUP:
                        msg = msg + " backup files:" + std::to_string(backup_count);
G
groot 已提交
1248
                        break;
1249 1250
                    default:
                        break;
G
groot 已提交
1251 1252 1253
                }
            }
            ENGINE_LOG_DEBUG << msg;
X
Xu Peng 已提交
1254
        }
G
groot 已提交
1255
    } catch (std::exception& e) {
G
groot 已提交
1256
        return HandleException("Encounter exception when check non index files", e.what());
X
Xu Peng 已提交
1257
    }
1258 1259

    return ret;
X
Xu Peng 已提交
1260 1261
}

S
starlord 已提交
1262
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1263 1264
Status
SqliteMetaImpl::Archive() {
Y
youny626 已提交
1265
    auto& criterias = options_.archive_conf_.GetCriterias();
X
Xu Peng 已提交
1266 1267 1268 1269 1270
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
Y
youny626 已提交
1271 1272
        auto& criteria = kv.first;
        auto& limit = kv.second;
G
groot 已提交
1273
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
L
Lizhou Gao 已提交
1274
            int64_t usecs = limit * DAY * US_PS;
S
starlord 已提交
1275
            int64_t now = utils::GetMicroSecTimeStamp();
1276
            try {
S
shengjh 已提交
1277 1278
                fiu_do_on("SqliteMetaImpl.Archive.throw_exception", throw std::exception());

Y
youny626 已提交
1279
                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
1280 1281
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

1282 1283 1284
                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
                                         where(c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
                                               c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
G
groot 已提交
1285
            } catch (std::exception& e) {
S
starlord 已提交
1286
                return HandleException("Encounter exception when update table files", e.what());
X
Xu Peng 已提交
1287
            }
1288 1289

            ENGINE_LOG_DEBUG << "Archive old files";
X
Xu Peng 已提交
1290
        }
G
groot 已提交
1291
        if (criteria == engine::ARCHIVE_CONF_DISK) {
G
groot 已提交
1292
            uint64_t sum = 0;
X
Xu Peng 已提交
1293
            Size(sum);
X
Xu Peng 已提交
1294

Y
youny626 已提交
1295
            int64_t to_delete = (int64_t)sum - limit * G;
X
Xu Peng 已提交
1296
            DiscardFiles(to_delete);
1297 1298

            ENGINE_LOG_DEBUG << "Archive files to free disk";
X
Xu Peng 已提交
1299 1300 1301 1302 1303 1304
        }
    }

    return Status::OK();
}

S
starlord 已提交
1305
// Store in `result` the summed file_size_ of every file that is not marked TO_DELETE.
// `result` is reset to 0 up front, so it reads 0 when the query throws.
Status
SqliteMetaImpl::Size(uint64_t& result) {
    result = 0;
    try {
        fiu_do_on("SqliteMetaImpl.Size.throw_exception", throw std::exception());

        auto rows = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
                                         where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
        for (auto& row : rows) {
            // The SUM() column may come back empty (SQL NULL); such rows contribute nothing.
            const auto& partial_sum = std::get<0>(row);
            if (partial_sum) {
                result += static_cast<uint64_t>(*partial_sum);
            }
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when calculte db size", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1326
// Remove the meta records of files whose type is NEW, NEW_INDEX or NEW_MERGE.
// Only meta rows are removed here; this function deletes nothing from disk.
Status
SqliteMetaImpl::CleanUpShadowFiles() {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const std::vector<int> shadow_types = {
            (int)TableFileSchema::NEW,
            (int)TableFileSchema::NEW_INDEX,
            (int)TableFileSchema::NEW_MERGE,
        };
        auto shadow_files = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                 where(in(&TableFileSchema::file_type_, shadow_types)));

        // All removals happen inside one sqlite transaction.
        auto remove_all = [&]() {
            for (const auto& row : shadow_files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        };
        auto committed = ConnectorPtr->transaction(remove_all);

        fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.fail_commited", committed = false);
        fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.throw_exception", throw std::exception());
        if (!committed) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!shadow_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << shadow_files.size() << " files";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table file", e.what());
    }

    return Status::OK();
}

1363
// Garbage-collect expired meta data in three consecutive phases:
//  1. File records of type TO_DELETE or BACKUP whose updated_time_ is older than `seconds` are
//     scanned. Each one not reported as in use by OngoingFileChecker is evicted from the cache;
//     the TO_DELETE ones are additionally deleted from disk and removed from meta.
//  2. Tables whose state is TO_DELETE are removed from meta (DeleteTablePath is called with
//     force=false for their folders).
//  3. For every table that lost a file in phase 1 and has no file records left, the table
//     folder is deleted.
// The first failing phase returns its error status and the later phases are skipped.
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> touched_table_ids;

    // Phase 1: remove expired to_delete files
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_ThrowException", throw std::exception());

        server::MetricCollector metric;

        std::vector<int> expired_types = {(int)TableFileSchema::TO_DELETE, (int)TableFileSchema::BACKUP};

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // candidates: files of the types above whose last update is older than the TTL
        auto candidates = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::engine_type_, &TableFileSchema::file_id_, &TableFileSchema::file_type_,
                    &TableFileSchema::date_),
            where(in(&TableFileSchema::file_type_, expired_types) and
                  c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        int64_t removed_files = 0;
        auto purge_expired = [&]() {
            // One schema object is reused for every row; only the selected columns are refreshed per row.
            TableFileSchema table_file;
            for (auto& row : candidates) {
                table_file.id_ = std::get<0>(row);
                table_file.table_id_ = std::get<1>(row);
                table_file.segment_id_ = std::get<2>(row);
                table_file.engine_type_ = std::get<3>(row);
                table_file.file_id_ = std::get<4>(row);
                table_file.file_type_ = std::get<5>(row);
                table_file.date_ = std::get<6>(row);

                // files that are currently in use are left untouched
                if (OngoingFileChecker::GetInstance().IsIgnored(table_file)) {
                    ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
                                     << " currently is in use, not able to delete now";
                    continue;
                }

                // Resolve the path and evict from cache before anything is deleted, because
                // GetTableFilePath can no longer generate the path once the file is gone.
                // TODO(zhiru): clean up
                utils::GetTableFilePath(options_, table_file);
                server::CommonUtil::EraseFromCache(table_file.location_);

                // Only TO_DELETE files go further; BACKUP files are merely evicted from cache here.
                if (table_file.file_type_ != (int)TableFileSchema::TO_DELETE) {
                    continue;
                }

                // A raw file (judged by its engine type, which is a bit hacky -- TODO(zhiru)) means the
                // whole segment directory may go; otherwise only this single file is deleted.
                if (utils::IsRawIndexType(table_file.engine_type_)) {
                    utils::DeleteSegment(options_, table_file);
                    std::string segment_dir;
                    utils::GetParentPath(table_file.location_, segment_dir);
                    ENGINE_LOG_DEBUG << "Remove segment directory: " << segment_dir;
                } else {
                    utils::DeleteTableFilePath(options_, table_file);
                    ENGINE_LOG_DEBUG << "Remove table file: " << table_file.location_;
                }

                // drop the record from meta and remember the table for phase 3
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);
                touched_table_ids.insert(table_file.table_id_);
                ++removed_files;
            }
            return true;
        };
        auto committed = ConnectorPtr->transaction(purge_expired);
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_FailCommited", committed = false);

        if (!committed) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (removed_files > 0) {
            ENGINE_LOG_DEBUG << "Clean " << removed_files << " files expired in " << seconds << " seconds";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // Phase 2: remove tables marked TO_DELETE
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTable_ThrowException", throw std::exception());
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto doomed_tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_),
                                                  where(c(&TableSchema::state_) == (int)TableSchema::TO_DELETE));

        auto drop_doomed = [&]() {
            for (auto& table : doomed_tables) {
                utils::DeleteTablePath(options_, std::get<1>(table), false);  // only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(table));
            }
            return true;
        };
        auto committed = ConnectorPtr->transaction(drop_doomed);
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTable_Failcommited", committed = false);

        if (!committed) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!doomed_tables.empty()) {
            ENGINE_LOG_DEBUG << "Remove " << doomed_tables.size() << " tables from meta";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // Phase 3: remove the folder of each table touched in phase 1, but only once the table
    // has no file records left.
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTableFolder_ThrowException", throw std::exception());
        server::MetricCollector metric;

        int64_t removed_folders = 0;
        for (const auto& table_id : touched_table_ids) {
            auto remaining = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                  where(c(&TableFileSchema::table_id_) == table_id));
            if (!remaining.empty()) {
                continue;
            }
            utils::DeleteTablePath(options_, table_id);
            ++removed_folders;
        }

        if (removed_folders) {
            ENGINE_LOG_DEBUG << "Remove " << removed_folders << " tables folder";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table folder", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1510
// Sum row_count_ over the RAW, TO_INDEX and INDEX files of `table_id` and store it in `result`.
// Returns the DescribeTable() status unchanged when the table cannot be described.
Status
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    try {
        fiu_do_on("SqliteMetaImpl.Count.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // Validate the table first so an unknown table does not cost a file query.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        std::vector<int> file_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                       (int)TableFileSchema::INDEX};
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::row_count_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));

        result = 0;
        for (auto& file : selected) {
            result += std::get<0>(file);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when count table rows", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1541 1542
// Drop the META_TABLES and META_TABLEFILES sqlite tables.
// NOTE(review): the table holding EnvironmentSchema (used by Set/GetGlobalLastLSN) is not
// dropped here -- confirm that this is intended.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    Status outcome = Status::OK();
    try {
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
    } catch (std::exception& e) {
        outcome = HandleException("Encounter exception when drop all meta", e.what());
    }

    return outcome;
}

G
groot 已提交
1555 1556 1557 1558 1559 1560 1561 1562 1563
// Mark files TO_DELETE until at least `to_discard_size` bytes have been marked.
// Files not yet TO_DELETE are taken in ascending id order, at most 10 per transaction, and their
// updated_time_ is set to now (CleanUpFilesWithTTL compares against updated_time_).
// The loop stops with OK as soon as a batch finds nothing left to discard. Previously the function
// recursed with an unchanged size in that case, which never terminated.
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        bool discarded_any = false;
        try {
            fiu_do_on("SqliteMetaImpl.DiscardFiles.throw_exception", throw std::exception());

            server::MetricCollector metric;

            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected =
                    ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::file_size_),
                                         where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE),
                                         order_by(&TableFileSchema::id_), limit(10));

                std::vector<int> ids;
                TableFileSchema table_file;

                for (auto& file : selected) {
                    if (to_discard_size <= 0) {
                        break;
                    }
                    table_file.id_ = std::get<0>(file);
                    table_file.file_size_ = std::get<1>(file);
                    ids.push_back(table_file.id_);
                    // only id_ and file_size_ are selected, so log the numeric id (file_id_ is unset here)
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                     << " table_file.size=" << table_file.file_size_;
                    to_discard_size -= table_file.file_size_;
                }

                if (ids.empty()) {
                    return true;
                }

                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                             c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                         where(in(&TableFileSchema::id_, ids)));
                discarded_any = true;

                return true;
            });
            fiu_do_on("SqliteMetaImpl.DiscardFiles.fail_commited", commited = false);
            if (!commited) {
                return HandleException("DiscardFiles error: sqlite transaction failed");
            }
        } catch (std::exception& e) {
            return HandleException("Encounter exception when discard table file", e.what());
        }

        if (!discarded_any) {
            // No candidate files remain, so the rest of the requested size cannot be freed.
            ENGINE_LOG_DEBUG << "No more files to discard, remaining size=" << to_discard_size;
            break;
        }
    }

    return Status::OK();
}

1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656
// Persist `lsn` as the global LSN in the EnvironmentSchema record.
// The record is inserted when none exists. Otherwise it is updated, and the write is skipped
// when the stored value already equals `lsn`.
Status
SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.size() == 0) {
            EnvironmentSchema env;
            env.global_lsn_ = lsn;
            ConnectorPtr->insert(env);
        } else {
            uint64_t last_lsn = std::get<0>(selected[0]);
            if (lsn == last_lsn) {
                return Status::OK();
            }

            ConnectorPtr->update_all(set(c(&EnvironmentSchema::global_lsn_) = lsn));
        }

        ENGINE_LOG_DEBUG << "Update global lsn = " << lsn;
    } catch (std::exception& e) {
        // `"literal" + lsn` would advance the char pointer by lsn (out-of-bounds UB) rather than
        // append the number, so convert explicitly.
        std::string msg = "Exception update global lsn = " + std::to_string(lsn);
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

// Read the global LSN stored in the EnvironmentSchema record into `lsn`.
// `lsn` is set to 0 when no such record exists yet.
Status
SqliteMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.size() == 0) {
            lsn = 0;
        } else {
            lsn = std::get<0>(selected[0]);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when get global lsn", e.what());
    }

    return Status::OK();
}
G
groot 已提交
1657

1658 1659 1660
}  // namespace meta
}  // namespace engine
}  // namespace milvus