// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/meta/SqliteMetaImpl.h"
X
Xu Peng 已提交
13

Y
youny626 已提交
14
#include <sqlite_orm.h>
X
Xu Peng 已提交
15
#include <unistd.h>
16

Z
Zhiru Zhu 已提交
17
#include <fiu-local.h>
X
Xu Peng 已提交
18
#include <boost/filesystem.hpp>
19
#include <chrono>
X
Xu Peng 已提交
20
#include <fstream>
Y
youny626 已提交
21
#include <iostream>
S
starlord 已提交
22
#include <map>
Y
youny626 已提交
23
#include <memory>
S
starlord 已提交
24
#include <set>
Y
youny626 已提交
25
#include <sstream>
S
starlord 已提交
26

27 28 29 30 31 32 33 34 35
#include "MetaConsts.h"
#include "db/IDGenerator.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
Z
Zhiru Zhu 已提交
36
#include "utils/ValidationUtil.h"
37

J
jinhai 已提交
38
namespace milvus {
X
Xu Peng 已提交
39
namespace engine {
40
namespace meta {
X
Xu Peng 已提交
41

X
Xu Peng 已提交
42 43
using namespace sqlite_orm;

G
groot 已提交
44 45
namespace {

S
starlord 已提交
46
Status
Y
youny626 已提交
47
HandleException(const std::string& desc, const char* what = nullptr) {
S
starlord 已提交
48
    if (what == nullptr) {
S
starlord 已提交
49 50 51 52 53 54 55
        ENGINE_LOG_ERROR << desc;
        return Status(DB_META_TRANSACTION_FAILED, desc);
    } else {
        std::string msg = desc + ":" + what;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_META_TRANSACTION_FAILED, msg);
    }
G
groot 已提交
56 57
}

Y
youny626 已提交
58
}  // namespace
G
groot 已提交
59

S
starlord 已提交
60
inline auto
G
groot 已提交
61
StoragePrototype(const std::string& path) {
62 63 64 65 66 67 68 69 70
    return make_storage(
        path,
        make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))),
        make_table(META_TABLES, make_column("id", &TableSchema::id_, primary_key()),
                   make_column("table_id", &TableSchema::table_id_, unique()),
                   make_column("state", &TableSchema::state_), make_column("dimension", &TableSchema::dimension_),
                   make_column("created_on", &TableSchema::created_on_),
                   make_column("flag", &TableSchema::flag_, default_value(0)),
                   make_column("index_file_size", &TableSchema::index_file_size_),
71
                   make_column("engine_type", &TableSchema::engine_type_),
G
groot 已提交
72
                   make_column("index_params", &TableSchema::index_params_),
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
                   make_column("metric_type", &TableSchema::metric_type_),
                   make_column("owner_table", &TableSchema::owner_table_, default_value("")),
                   make_column("partition_tag", &TableSchema::partition_tag_, default_value("")),
                   make_column("version", &TableSchema::version_, default_value(CURRENT_VERSION)),
                   make_column("flush_lsn", &TableSchema::flush_lsn_)),
        make_table(
            META_TABLEFILES, make_column("id", &TableFileSchema::id_, primary_key()),
            make_column("table_id", &TableFileSchema::table_id_),
            make_column("segment_id", &TableFileSchema::segment_id_, default_value("")),
            make_column("engine_type", &TableFileSchema::engine_type_),
            make_column("file_id", &TableFileSchema::file_id_), make_column("file_type", &TableFileSchema::file_type_),
            make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
            make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
            make_column("updated_time", &TableFileSchema::updated_time_),
            make_column("created_on", &TableFileSchema::created_on_), make_column("date", &TableFileSchema::date_),
            make_column("flush_lsn", &TableFileSchema::flush_lsn_)));
X
Xu Peng 已提交
89 90
}

X
Xu Peng 已提交
91
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
92 93
static std::unique_ptr<ConnectorT> ConnectorPtr;

Y
youny626 已提交
94
// Keeps a copy of `options` and immediately initializes the meta database.
// Initialize() reports failures by throwing (e.g. Exception(DB_INVALID_PATH) or
// Exception(DB_INCOMPATIB_META)), so construction fails if the meta store cannot be set up.
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options) : options_(options) {
    Initialize();
}

SqliteMetaImpl::~SqliteMetaImpl() = default;

S
starlord 已提交
101
// Draws the next number from SafeIDGenerator and stores its decimal string form in
// `table_id`. Always returns Status::OK().
Status
SqliteMetaImpl::NextTableId(std::string& table_id) {
    std::lock_guard<std::mutex> lock(genid_mutex_);  // avoid duplicated id

    SafeIDGenerator& generator = SafeIDGenerator::GetInstance();
    table_id = std::to_string(generator.GetNextIDNumber());
    return Status::OK();
}

S
starlord 已提交
111
// Draws the next number from SafeIDGenerator and stores its decimal string form in
// `file_id`. Always returns Status::OK().
Status
SqliteMetaImpl::NextFileId(std::string& file_id) {
    std::lock_guard<std::mutex> lock(genid_mutex_);  // avoid duplicated id

    SafeIDGenerator& generator = SafeIDGenerator::GetInstance();
    file_id = std::to_string(generator.GetNextIDNumber());
    return Status::OK();
}

S
starlord 已提交
121 122
void
SqliteMetaImpl::ValidateMetaSchema() {
S
shengjh 已提交
123 124 125
    bool is_null_connector{ConnectorPtr == nullptr};
    fiu_do_on("SqliteMetaImpl.ValidateMetaSchema.NullConnection", is_null_connector = true);
    if (is_null_connector) {
126 127 128
        return;
    }

Y
youny626 已提交
129
    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
130
    auto ret = ConnectorPtr->sync_schema_simulate();
Y
youny626 已提交
131 132
    if (ret.find(META_TABLES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
133 134
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
Y
youny626 已提交
135 136
    if (ret.find(META_TABLEFILES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
137 138 139 140
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
141 142
// Prepares the meta database:
//   1. makes sure options_.path_ exists as a directory,
//   2. opens <path>/meta.sqlite and rejects incompatible legacy schemas (ValidateMetaSchema),
//   3. synchronizes the schema, keeps the connection open and switches to WAL journaling,
//   4. calls CleanUpShadowFiles() (its returned Status is ignored).
//
// Failures are reported by throwing:
//   - Exception(DB_INVALID_PATH) if the directory cannot be created;
//   - Exception(DB_INCOMPATIB_META) from ValidateMetaSchema() for old-version schemas.
// Returns Status::OK() otherwise.
Status
SqliteMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path_)) {
        // The error_code overload makes every creation failure (missing parent directory, a regular
        // file occupying the path, permissions, ...) surface as DB_INVALID_PATH below, instead of
        // escaping as a boost::filesystem::filesystem_error.
        boost::system::error_code ec;
        auto ret = boost::filesystem::create_directory(options_.path_, ec);
        fiu_do_on("SqliteMetaImpl.Initialize.fail_create_directory", ret = false);
        if (!ret) {
            std::string msg = "Failed to create db directory " + options_.path_;
            if (ec) {
                msg += ": " + ec.message();
            }
            ENGINE_LOG_ERROR << msg;
            throw Exception(DB_INVALID_PATH, msg);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path_ + "/meta.sqlite"));

    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();                          // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUpShadowFiles();

    return Status::OK();
}

G
groot 已提交
166
// Inserts a row for `table_schema` into META_TABLES and creates the table's directory.
// An empty table_schema.table_id_ gets a freshly generated id. A non-empty id that already
// exists yields DB_ALREADY_EXIST, or DB_ERROR while that table is still marked TO_DELETE.
// On success table_schema.id_ and created_on_ are filled in.
Status
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            fiu_do_on("SqliteMetaImpl.CreateTable.throw_exception", throw std::exception());
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                const bool being_deleted = (TableSchema::TO_DELETE == std::get<0>(existing[0]));
                // an existing live table is reported as DB_ALREADY_EXIST
                return being_deleted
                           ? Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second")
                           : Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        fiu_do_on("SqliteMetaImpl.CreateTable.insert_throw_exception", throw std::exception());
        table_schema.id_ = ConnectorPtr->insert(table_schema);

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table", e.what());
    }
}

// Fills `table_schema` with the stored metadata of table table_schema.table_id_.
// Tables marked TO_DELETE are treated as absent and yield DB_NOT_FOUND.
Status
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DescribeTable.throw_exception", throw std::exception());
        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        const auto& row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.engine_type_ = std::get<6>(row);
        table_schema.index_params_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
        table_schema.owner_table_ = std::get<9>(row);
        table_schema.partition_tag_ = std::get<10>(row);
        table_schema.version_ = std::get<11>(row);
        table_schema.flush_lsn_ = std::get<12>(row);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when describe table", e.what());
    }

    return Status::OK();
}

// Sets `has_or_not` to true iff exactly one row named `table_id` exists that is not marked
// TO_DELETE. `has_or_not` stays false when the lookup throws.
Status
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    has_or_not = false;

    try {
        fiu_do_on("SqliteMetaImpl.HasTable.throw_exception", throw std::exception());
        server::MetricCollector metric;
        auto live_tables = ConnectorPtr->select(
            columns(&TableSchema::id_), where(c(&TableSchema::table_id_) == table_id and
                                              c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        has_or_not = (live_tables.size() == 1);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table", e.what());
    }

    return Status::OK();
}

// Appends to `table_schema_array` every table row that is not marked TO_DELETE and has an
// empty owner_table_. Rows with an owner table (presumably partitions) are skipped. Existing
// elements of the vector are kept, and state_ is not populated on the appended schemas.
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        fiu_do_on("SqliteMetaImpl.AllTables.throw_exception", throw std::exception());
        server::MetricCollector metric;
        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::table_id_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE) and
                  c(&TableSchema::owner_table_) == ""));

        for (const auto& row : rows) {
            TableSchema entry;
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.dimension_ = std::get<2>(row);
            entry.created_on_ = std::get<3>(row);
            entry.flag_ = std::get<4>(row);
            entry.index_file_size_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.index_params_ = std::get<7>(row);
            entry.metric_type_ = std::get<8>(row);
            entry.owner_table_ = std::get<9>(row);
            entry.partition_tag_ = std::get<10>(row);
            entry.version_ = std::get<11>(row);
            entry.flush_lsn_ = std::get<12>(row);

            table_schema_array.emplace_back(entry);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup all tables", e.what());
    }

    return Status::OK();
}

// Soft-deletes table `table_id` by switching its state to TO_DELETE. The row itself is kept.
// Returns Status::OK() even when no live table with that id matched.
Status
SqliteMetaImpl::DropTable(const std::string& table_id) {
    try {
        fiu_do_on("SqliteMetaImpl.DropTable.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        ConnectorPtr->update_all(set(c(&TableSchema::state_) = to_delete),
                                 where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != to_delete));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table", e.what());
    }

    return Status::OK();
}

// Soft-deletes every file of table `table_id` that is not already TO_DELETE: sets file_type_
// to TO_DELETE and stamps updated_time_ with the current time. Rows are kept.
Status
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        fiu_do_on("SqliteMetaImpl.DeleteTableFiles.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const int to_delete = static_cast<int>(TableFileSchema::TO_DELETE);
        const auto now = utils::GetMicroSecTimeStamp();
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = to_delete, c(&TableFileSchema::updated_time_) = now),
            where(c(&TableFileSchema::table_id_) == table_id and c(&TableFileSchema::file_type_) != to_delete));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table files", e.what());
    }

    return Status::OK();
}

// Registers a new file for table file_schema.table_id_ and creates its path on disk.
// The following fields of `file_schema` are filled in before the row is inserted into
// META_TABLEFILES:
//   - date (if EmptyDate), file id, and segment id (if empty, it becomes the new file id);
//   - zero size/row count and creation/update timestamps;
//   - attributes copied from the owning table's schema.
// Returns the DescribeTable() error if the table is missing or marked TO_DELETE.
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(owner);
    if (!status.ok()) {
        return status;
    }

    try {
        fiu_do_on("SqliteMetaImpl.CreateTableFile.throw_exception", throw std::exception());
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);
        if (file_schema.segment_id_.empty()) {
            file_schema.segment_id_ = file_schema.file_id_;
        }

        file_schema.dimension_ = owner.dimension_;
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.index_params_ = owner.index_params_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.metric_type_ = owner.metric_type_;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table file", e.what());
    }
}

S
starlord 已提交
396
// Replaces the contents of `table_files` with the non-deleted files of `table_id` whose ids
// are listed in `ids`. Table-level attributes are copied from the table's schema, and each
// file's location is resolved with utils::GetTableFilePath(), whose Status is ignored here.
// Returns the DescribeTable() error if the table is missing or marked TO_DELETE.
Status
SqliteMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                              TableFilesSchema& table_files) {
    try {
        fiu_do_on("SqliteMetaImpl.GetTableFiles.throw_exception", throw std::exception());

        table_files.clear();
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::id_, ids) and
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        TableSchema owner;
        owner.table_id_ = table_id;
        auto status = DescribeTable(owner);
        if (!status.ok()) {
            return status;
        }

        for (const auto& row : rows) {
            TableFileSchema file;
            file.table_id_ = table_id;
            file.id_ = std::get<0>(row);
            file.segment_id_ = std::get<1>(row);
            file.file_id_ = std::get<2>(row);
            file.file_type_ = std::get<3>(row);
            file.file_size_ = std::get<4>(row);
            file.row_count_ = std::get<5>(row);
            file.date_ = std::get<6>(row);
            file.engine_type_ = std::get<7>(row);
            file.created_on_ = std::get<8>(row);
            file.dimension_ = owner.dimension_;
            file.index_file_size_ = owner.index_file_size_;
            file.index_params_ = owner.index_params_;
            file.metric_type_ = owner.metric_type_;

            utils::GetTableFilePath(options_, file);

            table_files.emplace_back(file);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return Status::OK();
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table files", e.what());
    }
}

446 447 448 449 450 451
// Replaces the contents of `table_files` with all non-deleted files whose segment id is
// `segment_id`. The owning table is taken from the first matching row and its schema is
// applied to every returned file, i.e. all files of a segment are assumed to share one table.
// Returns the DescribeTable() error if that table is missing or marked TO_DELETE.
Status
SqliteMetaImpl::GetTableFilesBySegmentId(const std::string& segment_id,
                                         milvus::engine::meta::TableFilesSchema& table_files) {
    try {
        table_files.clear();
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::segment_id_) == segment_id and
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        if (!rows.empty()) {
            TableSchema owner;
            owner.table_id_ = std::get<1>(rows.front());
            auto status = DescribeTable(owner);
            if (!status.ok()) {
                return status;
            }

            for (const auto& row : rows) {
                TableFileSchema file;
                file.table_id_ = owner.table_id_;
                file.id_ = std::get<0>(row);
                file.segment_id_ = std::get<2>(row);
                file.file_id_ = std::get<3>(row);
                file.file_type_ = std::get<4>(row);
                file.file_size_ = std::get<5>(row);
                file.row_count_ = std::get<6>(row);
                file.date_ = std::get<7>(row);
                file.engine_type_ = std::get<8>(row);
                file.created_on_ = std::get<9>(row);
                file.dimension_ = owner.dimension_;
                file.index_file_size_ = owner.index_file_size_;
                file.index_params_ = owner.index_params_;
                file.metric_type_ = owner.metric_type_;

                utils::GetTableFilePath(options_, file);
                table_files.emplace_back(file);
            }
        }

        ENGINE_LOG_DEBUG << "Get table files by segment id";
        return Status::OK();
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table files by segment id", e.what());
    }
}

S
starlord 已提交
496
// Stores `flag` in the flag column of table `table_id`.
// A non-existent table matches no rows, and the call still returns Status::OK().
Status
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFlag.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here,
        // serializing this write with DropTable, DeleteTableFiles and UpdateTableFile
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // write the new flag value onto the table's row
        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag), where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
// Stores `flush_lsn` as the flush LSN of table `table_id`.
// A non-existent table matches no rows, and the call still returns Status::OK().
Status
SqliteMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_lsn) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here,
        // serializing this write with DropTable, DeleteTableFiles and UpdateTableFile
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(set(c(&TableSchema::flush_lsn_) = flush_lsn),
                                 where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flush_lsn, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table lsn: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

// Reads the stored flush LSN of table `table_id` into `flush_lsn`.
// Returns DB_NOT_FOUND if no row has that table id. The table's state is not checked, so a
// table marked TO_DELETE still reports its LSN.
Status
SqliteMetaImpl::GetTableFlushLSN(const std::string& table_id, uint64_t& flush_lsn) {
    try {
        server::MetricCollector metric;

        auto selected =
            ConnectorPtr->select(columns(&TableSchema::flush_lsn_), where(c(&TableSchema::table_id_) == table_id));

        if (selected.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }
        flush_lsn = std::get<0>(selected[0]);
    } catch (std::exception& e) {
        // the message previously said "getting table files by flush_lsn", copied from another method
        return HandleException("Encounter exception when getting table flush_lsn", e.what());
    }

    return Status::OK();
}

// Replaces the contents of `table_files` with every file whose flush_lsn equals `flush_lsn`.
// Table-level attributes come from each owning table's schema, and each table is described
// only once.
// Return value:
//   - if an owning table cannot be described (missing or TO_DELETE), that error is returned
//     immediately; files collected so far stay in `table_files`;
//   - otherwise, the last non-OK status reported by utils::GetTableFilePath(), or OK.
Status
SqliteMetaImpl::GetTableFilesByFlushLSN(uint64_t flush_lsn, TableFilesSchema& table_files) {
    table_files.clear();

    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::flush_lsn_) == flush_lsn));

        std::map<std::string, TableSchema> schema_cache;
        // deliberately reused across rows: fields not assigned below keep their previous values
        TableFileSchema table_file;
        Status path_status;

        for (auto& row : rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.segment_id_ = std::get<2>(row);
            table_file.file_id_ = std::get<3>(row);
            table_file.file_type_ = std::get<4>(row);
            table_file.file_size_ = std::get<5>(row);
            table_file.row_count_ = std::get<6>(row);
            table_file.date_ = std::get<7>(row);
            table_file.engine_type_ = std::get<8>(row);
            table_file.created_on_ = std::get<9>(row);

            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                path_status = status;
            }

            auto cached = schema_cache.find(table_file.table_id_);
            if (cached == schema_cache.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                cached = schema_cache.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema& owner = cached->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.index_params_ = owner.index_params_;
            table_file.metric_type_ = owner.metric_type_;
            table_files.push_back(table_file);
        }

        if (!rows.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << rows.size() << " files with flush_lsn = " << flush_lsn;
        }
        return path_status;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when getting table files by flush_lsn", e.what());
    }
}

S
starlord 已提交
610
// Writes all columns of `file_schema` back to its META_TABLEFILES row and refreshes
// updated_time_. If the owning table is missing or marked TO_DELETE, the file is marked
// TO_DELETE before being written, so the cleanup thread deletes it later.
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFile.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_states = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == file_schema.table_id_));

        // if the table has been deleted, just mark the table file as TO_DELETE
        // clean thread will delete the file later
        const bool owner_gone =
            owner_states.empty() || std::get<0>(owner_states[0]) == static_cast<int>(TableSchema::TO_DELETE);
        if (owner_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (std::exception& e) {
        std::string msg =
            "Exception update table file: table_id = " + file_schema.table_id_ + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e.what());
    }
    return Status::OK();
}

S
starlord 已提交
640
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFiles.throw_exception", throw std::exception());

        // Concurrent sqlite updates from several threads can raise exceptions ('bad logic', etc.),
        // so every writer serializes on meta_mutex_.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Per table id: true when the owning table exists and is not marked TO_DELETE.
        // Each distinct table is queried only once.
        std::map<std::string, bool> table_alive;
        for (auto& file : files) {
            if (table_alive.count(file.table_id_) != 0) {
                continue;
            }
            auto live_rows = ConnectorPtr->select(columns(&TableSchema::id_),
                                                  where(c(&TableSchema::table_id_) == file.table_id_ and
                                                        c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
            table_alive[file.table_id_] = !live_rows.empty();
        }

        // Write all files in one sqlite transaction.
        auto committed = ConnectorPtr->transaction([&]() mutable {
            for (auto& file : files) {
                // Files whose table is gone are demoted to TO_DELETE instead of being updated as-is.
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });
        fiu_do_on("SqliteMetaImpl.UpdateTableFiles.fail_commited", committed = false);

        if (!committed) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
        }

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
688
Status
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableIndex.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Read back every persisted column this function does NOT change. ConnectorPtr->update() rewrites
        // the whole row keyed by id_, so any field left at its default value would overwrite the stored one.
        // flush_lsn_ must be included; otherwise an index update resets the table's flush LSN.
        auto tables = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (tables.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        const auto& row = tables[0];
        meta::TableSchema table_schema;
        table_schema.id_ = std::get<0>(row);
        table_schema.table_id_ = table_id;
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.owner_table_ = std::get<6>(row);
        table_schema.partition_tag_ = std::get<7>(row);
        table_schema.version_ = std::get<8>(row);
        table_schema.flush_lsn_ = std::get<9>(row);

        // The fields actually being changed: the new index definition.
        table_schema.engine_type_ = index.engine_type_;
        table_schema.index_params_ = index.extra_params_.dump();
        table_schema.metric_type_ = index.metric_type_;

        ConnectorPtr->update(table_schema);

        // set all backup file to raw
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(c(&TableFileSchema::table_id_) == table_id and
                                       c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
739
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFilesToIndex.throw_exception", throw std::exception());

        // Concurrent sqlite updates from several threads can raise exceptions ('bad logic', etc.),
        // so every writer serializes on meta_mutex_.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Promote this table's RAW files with at least BUILD_INDEX_THRESHOLD rows to TO_INDEX.
        auto in_this_table = c(&TableFileSchema::table_id_) == table_id;
        auto big_enough = c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD;
        auto still_raw = c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW;
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_INDEX),
                                 where(in_this_table and big_enough and still_raw));

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files to to_index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
761
Status
SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DescribeTableIndex.throw_exception", throw std::exception());

        // Index definition of the live (not TO_DELETE) table with this id.
        auto index_rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_, &TableSchema::index_params_, &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        // Anything other than exactly one matching row is reported as not found.
        if (index_rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        const auto& row = index_rows[0];
        index.engine_type_ = std::get<0>(row);
        // index_params_ is stored as JSON text; a parse failure throws and is handled below.
        index.extra_params_ = milvus::json::parse(std::get<1>(row));
        index.metric_type_ = std::get<2>(row);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when describe index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
785
Status
SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DropTableIndex.throw_exception", throw std::exception());

        // Concurrent sqlite updates from several threads can raise exceptions ('bad logic', etc.),
        // so every writer serializes on meta_mutex_.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto of_table = c(&TableFileSchema::table_id_) == table_id;

        // Soft-delete the index files: mark them TO_DELETE.
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(of_table and c(&TableFileSchema::file_type_) == (int)TableFileSchema::INDEX));

        // Turn every BACKUP file back into a RAW file.
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(of_table and c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));

        // Reset the table's engine type to the raw one: FAISS_BIN_IDMAP for binary metrics,
        // otherwise DEFAULT_ENGINE_TYPE.
        auto metric_rows = ConnectorPtr->select(columns(&TableSchema::metric_type_),
                                                where(c(&TableSchema::table_id_) == table_id));

        int32_t raw_engine_type = DEFAULT_ENGINE_TYPE;
        if (metric_rows.size() == 1) {
            const int32_t metric = std::get<0>(metric_rows[0]);
            if (engine::utils::IsBinaryMetricType(metric)) {
                raw_engine_type = (int32_t)EngineType::FAISS_BIN_IDMAP;
            }
        }

        ConnectorPtr->update_all(
            set(c(&TableSchema::engine_type_) = raw_engine_type, c(&TableSchema::index_params_) = "{}"),
            where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
829
Status
SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag,
                                uint64_t lsn) {
    server::MetricCollector metric;

    // Start from the owner table's schema. The partition row inherits every field not overwritten below.
    TableSchema schema;
    schema.table_id_ = table_id;
    auto status = DescribeTable(schema);
    if (!status.ok()) {
        return status;
    }

    // A partition may only belong to a top-level table; partitions of partitions are rejected.
    if (!schema.owner_table_.empty()) {
        return Status(DB_ERROR, "Nested partition is not allowed");
    }

    // Tags are compared after trimming blanks on both sides: " ab cd " is treated as "ab cd".
    std::string valid_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    // Reject a tag that is already used. GetPartitionName's status is intentionally ignored:
    // when nothing matches it leaves existing_name empty.
    std::string existing_name;
    GetPartitionName(table_id, valid_tag, existing_name);
    if (!existing_name.empty()) {
        return Status(DB_ERROR, "Duplicate partition is not allowed");
    }

    if (partition_name.empty()) {
        // No explicit name was supplied, so generate a unique one.
        NextTableId(schema.table_id_);
    } else {
        schema.table_id_ = partition_name;
    }

    schema.id_ = -1;
    schema.flag_ = 0;
    schema.created_on_ = utils::GetMicroSecTimeStamp();
    schema.owner_table_ = table_id;
    schema.partition_tag_ = valid_tag;
    schema.flush_lsn_ = lsn;

    status = CreateTable(schema);
    if (status.code() == DB_ALREADY_EXIST) {
        return Status(DB_ALREADY_EXIST, "Partition already exists");
    }

    return status;
}

S
starlord 已提交
880
Status
SqliteMetaImpl::DropPartition(const std::string& partition_name) {
    // CreatePartition stores a partition as a regular table row (via CreateTable),
    // so dropping one is just dropping that table.
    auto status = DropTable(partition_name);
    return status;
}
884

G
groot 已提交
885
Status
SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception());

        // Every live (not TO_DELETE) table whose owner is table_id is one of its partitions.
        auto partition_rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::partition_tag_,
                    &TableSchema::version_, &TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and
                  c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        // Results are appended; the output vector is not cleared first.
        for (const auto& row : partition_rows) {
            meta::TableSchema partition_schema;
            partition_schema.id_ = std::get<0>(row);
            partition_schema.state_ = std::get<1>(row);
            partition_schema.dimension_ = std::get<2>(row);
            partition_schema.created_on_ = std::get<3>(row);
            partition_schema.flag_ = std::get<4>(row);
            partition_schema.index_file_size_ = std::get<5>(row);
            partition_schema.engine_type_ = std::get<6>(row);
            partition_schema.index_params_ = std::get<7>(row);
            partition_schema.metric_type_ = std::get<8>(row);
            partition_schema.owner_table_ = table_id;
            partition_schema.partition_tag_ = std::get<9>(row);
            partition_schema.version_ = std::get<10>(row);
            partition_schema.table_id_ = std::get<11>(row);
            partition_schema_array.emplace_back(partition_schema);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when show partitions", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
923
Status
SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.GetPartitionName.throw_exception", throw std::exception());

        // Tags are compared after trimming blanks on both sides: " ab cd " is treated as "ab cd".
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        auto matches = ConnectorPtr->select(
            columns(&TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and c(&TableSchema::partition_tag_) == valid_tag and
                  c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        // partition_name is left untouched when nothing matches.
        if (matches.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
        }
        partition_name = std::get<0>(matches[0]);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when get partition name", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
950
Status
SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) {
    files.clear();
    server::MetricCollector metric;

    try {
        fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception());

        // Per-file columns to fetch; table-level settings are filled in from the table schema below.
        auto select_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_);

        // Only RAW, TO_INDEX and INDEX files of this table are searched.
        std::vector<int> searchable_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                             (int)TableFileSchema::INDEX};
        auto belongs_to_table = c(&TableFileSchema::table_id_) == table_id;
        auto is_searchable = in(&TableFileSchema::file_type_, searchable_types);

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // Perform the query, restricted to the requested file ids when any were given.
        decltype(ConnectorPtr->select(select_columns)) selected;
        if (!ids.empty()) {
            auto is_requested = in(&TableFileSchema::id_, ids);
            selected =
                ConnectorPtr->select(select_columns, where(belongs_to_table and is_requested and is_searchable));
        } else {
            selected = ConnectorPtr->select(select_columns, where(belongs_to_table and is_searchable));
        }

        // A path-resolution failure is remembered, but the file is still returned.
        Status ret;
        for (auto& row : selected) {
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.segment_id_ = std::get<2>(row);
            table_file.file_id_ = std::get<3>(row);
            table_file.file_type_ = std::get<4>(row);
            table_file.file_size_ = std::get<5>(row);
            table_file.row_count_ = std::get<6>(row);
            table_file.date_ = std::get<7>(row);
            table_file.engine_type_ = std::get<8>(row);

            // copied from the owning table's schema
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.index_params_ = table_schema.index_params_;
            table_file.metric_type_ = table_schema.metric_type_;

            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            files.emplace_back(table_file);
        }

        // files was cleared on entry and receives one entry per selected row,
        // so it is empty exactly when the query returned nothing.
        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
        } else {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }

        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate index files", e.what());
    }
}

S
starlord 已提交
1024
Status
SqliteMetaImpl::FilesToMerge(const std::string& table_id, TableFilesSchema& files) {
    files.clear();

    try {
        fiu_do_on("SqliteMetaImpl.FilesToMerge.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // The table must exist. Its schema also provides the size limit used below.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // RAW files of this table, largest first.
        auto raw_rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW and
                  c(&TableFileSchema::table_id_) == table_id),
            order_by(&TableFileSchema::file_size_).desc());

        Status result;
        for (auto& row : raw_rows) {
            // Files already at or above index_file_size_ are not merge candidates.
            if (std::get<5>(row) >= table_schema.index_file_size_) {
                continue;
            }

            TableFileSchema table_file;
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.segment_id_ = std::get<2>(row);
            table_file.file_id_ = std::get<3>(row);
            table_file.file_type_ = std::get<4>(row);
            table_file.file_size_ = std::get<5>(row);
            table_file.row_count_ = std::get<6>(row);
            table_file.date_ = std::get<7>(row);
            table_file.created_on_ = std::get<8>(row);

            // copied from the owning table's schema
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.index_params_ = table_schema.index_params_;
            table_file.metric_type_ = table_schema.metric_type_;

            // A path-resolution failure is remembered, but the file is still returned.
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                result = path_status;
            }

            files.emplace_back(table_file);
        }

        // files was cleared on entry, so its size is the number of collected candidates.
        if (!files.empty()) {
            ENGINE_LOG_TRACE << "Collect " << files.size() << " to-merge files";
        }
        return result;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        fiu_do_on("SqliteMetaImpl.FilesToIndex.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // All TO_INDEX files across all tables.
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::TO_INDEX));

        // Cache of owning-table schemas keyed by table id, so each table is described only once.
        std::map<std::string, TableSchema> groups;

        Status ret;
        for (auto& file : selected) {
            // Build a fresh object per row, as FilesToSearch/FilesToMerge do. That way no field the loop does
            // not assign (e.g. a location GetTableFilePath may presumably leave untouched on failure) can
            // carry over from the previous file.
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.segment_id_ = std::get<2>(file);
            table_file.file_id_ = std::get<3>(file);
            table_file.file_type_ = std::get<4>(file);
            table_file.file_size_ = std::get<5>(file);
            table_file.row_count_ = std::get<6>(file);
            table_file.date_ = std::get<7>(file);
            table_file.engine_type_ = std::get<8>(file);
            table_file.created_on_ = std::get<9>(file);

            // A path-resolution failure is remembered, but the file is still returned.
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto group_iter = groups.find(table_file.table_id_);
            if (group_iter == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                fiu_do_on("SqliteMetaImpl_FilesToIndex_TableNotFound",
                          describe_status = Status(DB_NOT_FOUND, "table not found"));
                if (!describe_status.ok()) {
                    return describe_status;
                }
                group_iter = groups.emplace(table_file.table_id_, table_schema).first;
            }

            // Table-level settings are copied from the owning table's schema.
            const TableSchema& owner = group_iter->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.index_params_ = owner.index_params_;
            table_file.metric_type_ = owner.metric_type_;
            files.push_back(table_file);
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
1154
Status
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
                            TableFilesSchema& table_files) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    Status ret = Status::OK();

    // The owning table supplies the dimension/index settings copied onto every returned file.
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());

        table_files.clear();
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));

        if (!selected.empty()) {
            // Number of returned files per file type; only used for the debug summary below.
            std::map<int, int> type_counts;

            for (auto& row : selected) {
                TableFileSchema file_schema;
                file_schema.table_id_ = table_id;
                file_schema.id_ = std::get<0>(row);
                file_schema.segment_id_ = std::get<1>(row);
                file_schema.file_id_ = std::get<2>(row);
                file_schema.file_type_ = std::get<3>(row);
                file_schema.file_size_ = std::get<4>(row);
                file_schema.row_count_ = std::get<5>(row);
                file_schema.date_ = std::get<6>(row);
                file_schema.engine_type_ = std::get<7>(row);
                file_schema.created_on_ = std::get<8>(row);

                file_schema.dimension_ = table_schema.dimension_;
                file_schema.index_file_size_ = table_schema.index_file_size_;
                file_schema.index_params_ = table_schema.index_params_;
                file_schema.metric_type_ = table_schema.metric_type_;

                ++type_counts[file_schema.file_type_];

                // A path-resolution failure is remembered, but the file is still returned.
                auto path_status = utils::GetTableFilePath(options_, file_schema);
                if (!path_status.ok()) {
                    ret = path_status;
                }

                table_files.emplace_back(file_schema);
            }

            // Report the counts in the order the caller listed the types; unrecognized types are skipped.
            std::string msg = "Get table files by type.";
            for (int file_type : file_types) {
                const char* label = nullptr;
                switch (file_type) {
                    case (int)TableFileSchema::RAW:
                        label = " raw files:";
                        break;
                    case (int)TableFileSchema::NEW:
                        label = " new files:";
                        break;
                    case (int)TableFileSchema::NEW_MERGE:
                        label = " new_merge files:";
                        break;
                    case (int)TableFileSchema::NEW_INDEX:
                        label = " new_index files:";
                        break;
                    case (int)TableFileSchema::TO_INDEX:
                        label = " to_index files:";
                        break;
                    case (int)TableFileSchema::INDEX:
                        label = " index files:";
                        break;
                    case (int)TableFileSchema::BACKUP:
                        label = " backup files:";
                        break;
                    default:
                        break;
                }
                if (label != nullptr) {
                    msg = msg + label + std::to_string(type_counts[file_type]);
                }
            }
            ENGINE_LOG_DEBUG << msg;
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }

    return ret;
}

S
starlord 已提交
1258
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1259 1260
Status
SqliteMetaImpl::Archive() {
Y
youny626 已提交
1261
    auto& criterias = options_.archive_conf_.GetCriterias();
X
Xu Peng 已提交
1262 1263 1264 1265 1266
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
Y
youny626 已提交
1267 1268
        auto& criteria = kv.first;
        auto& limit = kv.second;
G
groot 已提交
1269
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
L
Lizhou Gao 已提交
1270
            int64_t usecs = limit * DAY * US_PS;
S
starlord 已提交
1271
            int64_t now = utils::GetMicroSecTimeStamp();
1272
            try {
S
shengjh 已提交
1273 1274
                fiu_do_on("SqliteMetaImpl.Archive.throw_exception", throw std::exception());

Y
youny626 已提交
1275
                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
1276 1277
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

1278 1279 1280
                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
                                         where(c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
                                               c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
G
groot 已提交
1281
            } catch (std::exception& e) {
S
starlord 已提交
1282
                return HandleException("Encounter exception when update table files", e.what());
X
Xu Peng 已提交
1283
            }
1284 1285

            ENGINE_LOG_DEBUG << "Archive old files";
X
Xu Peng 已提交
1286
        }
G
groot 已提交
1287
        if (criteria == engine::ARCHIVE_CONF_DISK) {
G
groot 已提交
1288
            uint64_t sum = 0;
X
Xu Peng 已提交
1289
            Size(sum);
X
Xu Peng 已提交
1290

Y
youny626 已提交
1291
            int64_t to_delete = (int64_t)sum - limit * G;
X
Xu Peng 已提交
1292
            DiscardFiles(to_delete);
1293 1294

            ENGINE_LOG_DEBUG << "Archive files to free disk";
X
Xu Peng 已提交
1295 1296 1297 1298 1299 1300
        }
    }

    return Status::OK();
}

S
starlord 已提交
1301
// Sum file_size_ over every table file whose type is not TO_DELETE and store it in `result`.
// `result` is reset to 0 before querying.
// Returns Status::OK() on success, or the HandleException status if the query throws.
Status
SqliteMetaImpl::Size(uint64_t& result) {
    result = 0;
    try {
        fiu_do_on("SqliteMetaImpl.Size.throw_exception", throw std::exception());

        auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
                                             where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
        for (const auto& total_size : selected) {
            const auto& partial_sum = std::get<0>(total_size);
            // SQL SUM() yields NULL when no row matches; that arrives here as an empty pointer
            if (!partial_sum) {
                continue;
            }
            result += (uint64_t)(*partial_sum);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when calculate db size", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1322
// Remove from meta every table file record whose type is NEW, NEW_INDEX or NEW_MERGE.
// Only meta records are removed here; no file on disk is touched.
// Returns an error status when the sqlite transaction fails or an exception is thrown.
Status
SqliteMetaImpl::CleanUpShadowFiles() {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> shadow_types;
        shadow_types.push_back((int)TableFileSchema::NEW);
        shadow_types.push_back((int)TableFileSchema::NEW_INDEX);
        shadow_types.push_back((int)TableFileSchema::NEW_MERGE);

        auto shadow_files = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                 where(in(&TableFileSchema::file_type_, shadow_types)));

        // delete every collected record inside one transaction
        auto remove_shadow_records = [&]() {
            for (const auto& record : shadow_files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(record));
            }
            return true;
        };
        auto commited = ConnectorPtr->transaction(remove_shadow_records);

        fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.fail_commited", commited = false);
        fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.throw_exception", throw std::exception());
        if (!commited) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!shadow_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << shadow_files.size() << " files";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean table file", ex.what());
    }

    return Status::OK();
}

1359
// Clean up expired meta data and files in three stages:
//   1. Table files of type TO_DELETE or BACKUP whose updated_time_ is older than `seconds`.
//      Files reported in use by OngoingFileChecker are skipped. The others are erased from the cache.
//      TO_DELETE ones are additionally deleted from disk (whole segment for raw engine types) and
//      removed from meta.
//   2. Tables in state TO_DELETE: their folder is deleted (empty folder only) and their meta record removed.
//   3. Tables touched in stage 1 that no longer have any file record get their folder deleted.
// Returns the first error encountered; later stages are skipped after an error.
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) {
    auto now = utils::GetMicroSecTimeStamp();
    // Expiry bound in microseconds, computed in signed arithmetic. Without the casts, the subtraction mixes
    // `now` with the unsigned `seconds * US_PS` and silently wraps around instead of going negative.
    int64_t expire_before = (int64_t)now - (int64_t)(seconds * US_PS);
    std::set<std::string> table_ids;

    // remove to_delete files
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_ThrowException", throw std::exception());

        server::MetricCollector metric;

        std::vector<int> file_types = {
            (int)TableFileSchema::TO_DELETE,
            (int)TableFileSchema::BACKUP,
        };

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // collect files to be deleted
        auto files = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::engine_type_, &TableFileSchema::file_id_, &TableFileSchema::file_type_,
                    &TableFileSchema::date_),
            where(in(&TableFileSchema::file_type_, file_types) and
                  c(&TableFileSchema::updated_time_) < expire_before));

        int64_t clean_files = 0;
        auto commited = ConnectorPtr->transaction([&]() {
            for (const auto& file : files) {
                // A fresh schema per file. Fields filled in by GetTableFilePath (e.g. location_) must never
                // carry over from the previously processed file.
                TableFileSchema table_file;
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.segment_id_ = std::get<2>(file);
                table_file.engine_type_ = std::get<3>(file);
                table_file.file_id_ = std::get<4>(file);
                table_file.file_type_ = std::get<5>(file);
                table_file.date_ = std::get<6>(file);

                // check if the file can be deleted
                if (OngoingFileChecker::GetInstance().IsIgnored(table_file)) {
                    ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
                                     << " currently is in use, not able to delete now";
                    continue;  // ignore this file, don't delete it
                }

                // erase from cache, must do this before file deleted,
                // because GetTableFilePath won't able to generate file path after the file is deleted
                // TODO(zhiru): clean up
                utils::GetTableFilePath(options_, table_file);
                server::CommonUtil::EraseFromCache(table_file.location_);

                if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) {
                    // If we are deleting a raw table file, it means it's okay to delete the entire segment directory.
                    // Else, we can only delete the single file
                    // TODO(zhiru): We determine whether a table file is raw by its engine type. This is a bit hacky
                    if (utils::IsRawIndexType(table_file.engine_type_)) {
                        utils::DeleteSegment(options_, table_file);
                        std::string segment_dir;
                        utils::GetParentPath(table_file.location_, segment_dir);
                        ENGINE_LOG_DEBUG << "Remove segment directory: " << segment_dir;
                    } else {
                        utils::DeleteTableFilePath(options_, table_file);
                        ENGINE_LOG_DEBUG << "Remove table file: " << table_file.location_;
                    }

                    // delete file from meta
                    ConnectorPtr->remove<TableFileSchema>(table_file.id_);

                    // remember the table so stage 3 can try to remove its folder
                    table_ids.insert(table_file.table_id_);

                    ++clean_files;
                }
            }
            return true;
        });
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_FailCommited", commited = false);

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (clean_files > 0) {
            ENGINE_LOG_DEBUG << "Clean " << clean_files << " files expired in " << seconds << " seconds";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove to_delete tables
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTable_ThrowException", throw std::exception());
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_),
                                           where(c(&TableSchema::state_) == (int)TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() {
            for (const auto& table : tables) {
                utils::DeleteTablePath(options_, std::get<1>(table), false);  // only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(table));
            }

            return true;
        });
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTable_Failcommited", commited = false);

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!tables.empty()) {
            ENGINE_LOG_DEBUG << "Remove " << tables.size() << " tables from meta";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTableFolder_ThrowException", throw std::exception());
        server::MetricCollector metric;

        int64_t remove_tables = 0;
        for (const auto& table_id : table_ids) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                 where(c(&TableFileSchema::table_id_) == table_id));
            if (selected.empty()) {
                utils::DeleteTablePath(options_, table_id);
                ++remove_tables;
            }
        }

        if (remove_tables) {
            ENGINE_LOG_DEBUG << "Remove " << remove_tables << " tables folder";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table folder", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1506
// Sum row_count_ over the RAW, TO_INDEX and INDEX files of `table_id` and store it in `result`.
// Returns the DescribeTable() error (leaving `result` untouched) when the table cannot be described,
// or the HandleException status if a query throws.
Status
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    try {
        fiu_do_on("SqliteMetaImpl.Count.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // validate the table first so no file query is issued for a table that cannot be described
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        std::vector<int> file_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                       (int)TableFileSchema::INDEX};
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::row_count_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));

        result = 0;
        for (const auto& file : selected) {
            result += std::get<0>(file);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when count table rows", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1537 1538
// Drop the META_TABLES and META_TABLEFILES sqlite tables.
// NOTE(review): the table backing EnvironmentSchema (used by Set/GetGlobalLastLSN) is not dropped here;
// confirm whether that is intentional.
// Returns the HandleException status if either drop throws.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    try {
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
    } catch (std::exception& ex) {
        const char* reason = ex.what();
        return HandleException("Encounter exception when drop all meta", reason);
    }

    return Status::OK();
}

G
groot 已提交
1551 1552 1553 1554 1555 1556 1557 1558 1559
// Mark table files as TO_DELETE until their accumulated file_size_ covers `to_discard_size`.
// Candidates are files not already marked TO_DELETE, taken in ascending id order in batches of at most 10.
// Does nothing when to_discard_size <= 0.
// It iterates rather than recursing and stops as soon as a batch marks nothing, so it terminates even
// when the remaining files cannot cover to_discard_size.
// Returns an error status when a transaction fails or an exception is thrown.
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        bool marked_any = false;
        try {
            fiu_do_on("SqliteMetaImpl.DiscardFiles.throw_exception", throw std::exception());

            server::MetricCollector metric;

            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() {
                auto selected =
                    ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::file_size_),
                                         where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE),
                                         order_by(&TableFileSchema::id_), limit(10));

                // keep the ids in their native type; narrowing them to int could corrupt large ids
                std::vector<decltype(TableFileSchema::id_)> ids;
                for (const auto& file : selected) {
                    if (to_discard_size <= 0) {
                        break;
                    }
                    auto file_id = std::get<0>(file);
                    auto file_size = std::get<1>(file);
                    ids.push_back(file_id);
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << file_id << " table_file.size=" << file_size;
                    to_discard_size -= file_size;
                }

                if (ids.empty()) {
                    return true;
                }

                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                             c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                         where(in(&TableFileSchema::id_, ids)));
                marked_any = true;
                return true;
            });
            fiu_do_on("SqliteMetaImpl.DiscardFiles.fail_commited", commited = false);
            if (!commited) {
                return HandleException("DiscardFiles error: sqlite transaction failed");
            }
        } catch (std::exception& e) {
            return HandleException("Encounter exception when discard table file", e.what());
        }

        if (!marked_any) {
            // no file outside TO_DELETE is left, so nothing more can be discarded
            break;
        }
    }

    return Status::OK();
}

1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652
// Persist `lsn` as the global last LSN in the EnvironmentSchema table.
// When no row exists one is inserted. Otherwise the first row's value is compared with `lsn` and, if it
// differs, global_lsn_ is rewritten via update_all (no WHERE clause, so every row is updated).
// Returns the HandleException status if any sqlite call throws.
Status
SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.empty()) {
            EnvironmentSchema env;
            env.global_lsn_ = lsn;
            ConnectorPtr->insert(env);
        } else {
            uint64_t last_lsn = std::get<0>(selected[0]);
            if (lsn == last_lsn) {
                return Status::OK();
            }

            ConnectorPtr->update_all(set(c(&EnvironmentSchema::global_lsn_) = lsn));
        }

        ENGINE_LOG_DEBUG << "Update global lsn = " << lsn;
    } catch (std::exception& e) {
        // std::to_string is required: `"literal" + lsn` would offset the char pointer by lsn (out-of-bounds read)
        std::string msg = "Exception update global lsn = " + std::to_string(lsn);
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

// Read the global last LSN from the first EnvironmentSchema row into `lsn`; 0 when no row exists.
// Returns the HandleException status if the query throws.
Status
SqliteMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.empty()) {
            lsn = 0;
        } else {
            lsn = std::get<0>(selected[0]);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when get global lsn", e.what());
    }

    return Status::OK();
}
G
groot 已提交
1653

1654 1655 1656
}  // namespace meta
}  // namespace engine
}  // namespace milvus