// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

S
starlord 已提交
12
#include "db/meta/SqliteMetaImpl.h"
X
Xu Peng 已提交
13

Y
youny626 已提交
14
#include <sqlite_orm.h>
X
Xu Peng 已提交
15
#include <unistd.h>
16

X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
Y
youny626 已提交
20
#include <iostream>
S
starlord 已提交
21
#include <map>
Y
youny626 已提交
22
#include <memory>
S
starlord 已提交
23
#include <set>
Y
youny626 已提交
24
#include <sstream>
S
shengjh 已提交
25
#include <fiu-local.h>
S
starlord 已提交
26

27 28 29 30 31 32 33 34 35 36
#include "MetaConsts.h"
#include "db/IDGenerator.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"

J
jinhai 已提交
37
namespace milvus {
X
Xu Peng 已提交
38
namespace engine {
39
namespace meta {
X
Xu Peng 已提交
40

X
Xu Peng 已提交
41 42
using namespace sqlite_orm;

G
groot 已提交
43 44
namespace {

S
starlord 已提交
45
// Log a failed meta operation and convert it into a DB_META_TRANSACTION_FAILED status.
//   desc - human readable description of the operation that failed
//   what - optional detail text (typically std::exception::what()); appended as "desc:what" when present
Status
HandleException(const std::string& desc, const char* what = nullptr) {
    const std::string msg = (what != nullptr) ? desc + ":" + what : desc;
    ENGINE_LOG_ERROR << msg;
    return Status(DB_META_TRANSACTION_FAILED, msg);
}

Y
youny626 已提交
57
}  // namespace
G
groot 已提交
58

S
starlord 已提交
59
inline auto
G
groot 已提交
60
StoragePrototype(const std::string& path) {
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
    return make_storage(
        path,
        make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))),
        make_table(META_TABLES, make_column("id", &TableSchema::id_, primary_key()),
                   make_column("table_id", &TableSchema::table_id_, unique()),
                   make_column("state", &TableSchema::state_), make_column("dimension", &TableSchema::dimension_),
                   make_column("created_on", &TableSchema::created_on_),
                   make_column("flag", &TableSchema::flag_, default_value(0)),
                   make_column("index_file_size", &TableSchema::index_file_size_),
                   make_column("engine_type", &TableSchema::engine_type_), make_column("nlist", &TableSchema::nlist_),
                   make_column("metric_type", &TableSchema::metric_type_),
                   make_column("owner_table", &TableSchema::owner_table_, default_value("")),
                   make_column("partition_tag", &TableSchema::partition_tag_, default_value("")),
                   make_column("version", &TableSchema::version_, default_value(CURRENT_VERSION)),
                   make_column("flush_lsn", &TableSchema::flush_lsn_)),
        make_table(
            META_TABLEFILES, make_column("id", &TableFileSchema::id_, primary_key()),
            make_column("table_id", &TableFileSchema::table_id_),
            make_column("segment_id", &TableFileSchema::segment_id_, default_value("")),
            make_column("engine_type", &TableFileSchema::engine_type_),
            make_column("file_id", &TableFileSchema::file_id_), make_column("file_type", &TableFileSchema::file_type_),
            make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
            make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
            make_column("updated_time", &TableFileSchema::updated_time_),
            make_column("created_on", &TableFileSchema::created_on_), make_column("date", &TableFileSchema::date_),
            make_column("flush_lsn", &TableFileSchema::flush_lsn_)));
X
Xu Peng 已提交
87 88
}

X
Xu Peng 已提交
89
// Concrete storage type returned by StoragePrototype(); deduced via decltype rather than spelled out.
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
90 91
// File-scope handle to the SQLite storage. It is assigned in SqliteMetaImpl::Initialize() and used by the
// meta operations in this file; being a file-level static, it is not a per-instance member.
static std::unique_ptr<ConnectorT> ConnectorPtr;

Y
youny626 已提交
92
// Keep a copy of the meta options and run Initialize() right away.
// The Status returned by Initialize() is not inspected here; Initialize() may throw.
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options)
    : options_(options) {
    Initialize();
}

// Destructor with an empty body; no explicit cleanup is performed here.
SqliteMetaImpl::~SqliteMetaImpl() {
}

S
starlord 已提交
99
// Produce a new table id as the decimal text of the next number from SimpleIDGenerator.
//   table_id - [out] receives the generated id
// Always returns Status::OK().
Status
SqliteMetaImpl::NextTableId(std::string& table_id) {
    // id generation is serialized through genid_mutex_ (intent: avoid duplicated ids)
    std::lock_guard<std::mutex> guard(genid_mutex_);

    SimpleIDGenerator generator;
    std::stringstream buffer;
    buffer << generator.GetNextIDNumber();
    table_id = buffer.str();

    return Status::OK();
}

S
starlord 已提交
109
// Produce a new file id as the decimal text of the next number from SimpleIDGenerator.
//   file_id - [out] receives the generated id
// Always returns Status::OK().
Status
SqliteMetaImpl::NextFileId(std::string& file_id) {
    // id generation is serialized through genid_mutex_ (intent: avoid duplicated ids)
    std::lock_guard<std::mutex> guard(genid_mutex_);

    SimpleIDGenerator generator;
    std::stringstream buffer;
    buffer << generator.GetNextIDNumber();
    file_id = buffer.str();

    return Status::OK();
}

S
starlord 已提交
119 120
void
SqliteMetaImpl::ValidateMetaSchema() {
S
shengjh 已提交
121 122 123
    bool is_null_connector{ConnectorPtr == nullptr};
    fiu_do_on("SqliteMetaImpl.ValidateMetaSchema.NullConnection", is_null_connector = true);
    if (is_null_connector) {
124 125 126
        return;
    }

Y
youny626 已提交
127
    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
128
    auto ret = ConnectorPtr->sync_schema_simulate();
Y
youny626 已提交
129 130
    if (ret.find(META_TABLES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
131 132
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
Y
youny626 已提交
133 134
    if (ret.find(META_TABLEFILES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
135 136 137 138
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
139 140
// Open (creating if necessary) the SQLite meta store under options_.path_.
// Steps, in order: ensure the directory exists, build the connector on "<path>/meta.sqlite",
// validate schema compatibility, sync the schema, keep the connection open, switch to WAL journaling,
// then run CleanUpShadowFiles().
// Throws Exception(DB_INVALID_PATH) when the directory cannot be created; ValidateMetaSchema() may throw too.
// Returns Status::OK() otherwise.
Status
SqliteMetaImpl::Initialize() {
    const bool dir_exists = boost::filesystem::is_directory(options_.path_);
    if (!dir_exists) {
        bool created = boost::filesystem::create_directory(options_.path_);
        fiu_do_on("SqliteMetaImpl.Initialize.fail_create_directory", created = false);
        if (!created) {
            const std::string msg = "Failed to create db directory " + options_.path_;
            ENGINE_LOG_ERROR << msg;
            throw Exception(DB_INVALID_PATH, msg);
        }
    }

    const std::string db_file = options_.path_ + "/meta.sqlite";
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(db_file));

    // must run before sync_schema(): it refuses to continue if syncing would recreate existing tables
    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();                          // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUpShadowFiles();

    return Status::OK();
}

G
groot 已提交
164
// Insert a new row into the tables meta and create the table's directory.
//   table_schema - [in/out] when table_id_ is empty a new id is generated; id_ and created_on_ are filled in
// Returns DB_ERROR if a row with the same table_id is in TO_DELETE state, DB_ALREADY_EXIST if it exists
// in any other state, DB_META_TRANSACTION_FAILED on any std::exception, otherwise the result of
// utils::CreateTablePath().
Status
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            fiu_do_on("SqliteMetaImpl.CreateTable.throw_exception", throw std::exception());
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                const bool pending_delete = (TableSchema::TO_DELETE == std::get<0>(existing[0]));
                if (pending_delete) {
                    return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                }
                // Change from no error to already exist.
                return Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        // an exception thrown by the insert is reported by the handler below with the same message
        fiu_do_on("SqliteMetaImpl.CreateTable.insert_throw_exception", throw std::exception());
        table_schema.id_ = ConnectorPtr->insert(table_schema);

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when create table", e.what());
    }
}

// Load the meta row identified by table_schema.table_id_ into table_schema.
// Rows in TO_DELETE state are ignored.
// Returns DB_NOT_FOUND unless exactly one row matches, DB_META_TRANSACTION_FAILED on std::exception,
// Status::OK() otherwise.
Status
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DescribeTable.throw_exception", throw std::exception());

        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::nlist_, &TableSchema::metric_type_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ && c(&TableSchema::state_) != to_delete));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        // tuple positions follow the column list of the select above
        auto& row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.engine_type_ = std::get<6>(row);
        table_schema.nlist_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
        table_schema.owner_table_ = std::get<9>(row);
        table_schema.partition_tag_ = std::get<10>(row);
        table_schema.version_ = std::get<11>(row);
        table_schema.flush_lsn_ = std::get<12>(row);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when describe table", e.what());
    }

    return Status::OK();
}

// Report whether exactly one table row with `table_id` exists that is not in TO_DELETE state.
//   has_or_not - [out] set to false first, then to the lookup result
// Returns DB_META_TRANSACTION_FAILED on std::exception (has_or_not stays false), Status::OK() otherwise.
Status
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    has_or_not = false;

    try {
        fiu_do_on("SqliteMetaImpl.HasTable.throw_exception", throw std::exception());
        server::MetricCollector metric;

        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        auto matches = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id && c(&TableSchema::state_) != to_delete));

        has_or_not = (matches.size() == 1);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when lookup table", e.what());
    }

    return Status::OK();
}

// Append to table_schema_array every table row that is not in TO_DELETE state and whose owner_table is "".
// The output vector is not cleared first. The state column is not selected, so state_ of each appended
// schema keeps whatever a default-constructed TableSchema holds.
// Returns DB_META_TRANSACTION_FAILED on std::exception, Status::OK() otherwise.
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        fiu_do_on("SqliteMetaImpl.AllTables.throw_exception", throw std::exception());
        server::MetricCollector metric;

        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::table_id_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::nlist_, &TableSchema::metric_type_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::state_) != to_delete && c(&TableSchema::owner_table_) == ""));

        for (auto& row : rows) {
            // tuple positions follow the column list of the select above
            TableSchema item;
            item.id_ = std::get<0>(row);
            item.table_id_ = std::get<1>(row);
            item.dimension_ = std::get<2>(row);
            item.created_on_ = std::get<3>(row);
            item.flag_ = std::get<4>(row);
            item.index_file_size_ = std::get<5>(row);
            item.engine_type_ = std::get<6>(row);
            item.nlist_ = std::get<7>(row);
            item.metric_type_ = std::get<8>(row);
            item.owner_table_ = std::get<9>(row);
            item.partition_tag_ = std::get<10>(row);
            item.version_ = std::get<11>(row);
            item.flush_lsn_ = std::get<12>(row);

            table_schema_array.push_back(item);
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when lookup all tables", e.what());
    }

    return Status::OK();
}

// Soft-delete a table: set state to TO_DELETE on rows matching `table_id` that are not already TO_DELETE.
// Returns DB_META_TRANSACTION_FAILED on std::exception, Status::OK() otherwise.
Status
SqliteMetaImpl::DropTable(const std::string& table_id) {
    try {
        fiu_do_on("SqliteMetaImpl.DropTable.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // soft delete table
        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        ConnectorPtr->update_all(
            set(c(&TableSchema::state_) = to_delete),
            where(c(&TableSchema::table_id_) == table_id && c(&TableSchema::state_) != to_delete));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table", e.what());
    }

    return Status::OK();
}

// Soft-delete all files of a table: every file row of `table_id` not already TO_DELETE gets
// file_type = TO_DELETE and updated_time = current timestamp.
// Returns DB_META_TRANSACTION_FAILED on std::exception, Status::OK() otherwise.
Status
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        fiu_do_on("SqliteMetaImpl.DeleteTableFiles.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // soft delete table files
        const int to_delete = static_cast<int>(TableFileSchema::TO_DELETE);
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = to_delete,
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id && c(&TableFileSchema::file_type_) != to_delete));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table files", e.what());
    }

    return Status::OK();
}

// Register a new file for the table named by file_schema.table_id_.
//   file_schema - [in/out] date_ defaults to today when it equals EmptyDate; file_id_ is generated;
//                 segment_id_ defaults to file_id_ when empty; size/row counters are zeroed; timestamps are
//                 set; index parameters are copied from the owning table; id_ receives the inserted row id
// Returns the DescribeTable() error when the table lookup fails, DB_META_TRANSACTION_FAILED on
// std::exception, otherwise the result of utils::CreateTableFilePath().
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto lookup = DescribeTable(owner);
    if (!lookup.ok()) {
        return lookup;
    }

    try {
        fiu_do_on("SqliteMetaImpl.CreateTableFile.throw_exception", throw std::exception());
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);
        if (file_schema.segment_id_.empty()) {
            file_schema.segment_id_ = file_schema.file_id_;
        }

        // a fresh file starts empty
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;

        const auto now = utils::GetMicroSecTimeStamp();
        file_schema.created_on_ = now;
        file_schema.updated_time_ = now;

        // fields inherited from the owning table
        file_schema.dimension_ = owner.dimension_;
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.metric_type_ = owner.metric_type_;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when create table file", e.what());
    }
}

S
starlord 已提交
394
// Fetch the files of `table_id` whose row id is in `ids`, skipping files in TO_DELETE state.
//   table_files - [out] cleared, then filled with one entry per matching file; index parameters are copied
//                 from the owning table and utils::GetTableFilePath() is called on each entry
// Returns the DescribeTable() error when the table lookup fails, DB_META_TRANSACTION_FAILED on
// std::exception, otherwise a default-constructed Status.
// NOTE(review): the Status returned by utils::GetTableFilePath() is ignored here, while
// GetTableFilesByFlushLSN() propagates it - confirm which behaviour is intended.
Status
SqliteMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                              TableFilesSchema& table_files) {
    try {
        fiu_do_on("SqliteMetaImpl.GetTableFiles.throw_exception", throw std::exception());

        table_files.clear();

        const int to_delete = static_cast<int>(TableFileSchema::TO_DELETE);
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::table_id_) == table_id && in(&TableFileSchema::id_, ids) &&
                  c(&TableFileSchema::file_type_) != to_delete));

        TableSchema owner;
        owner.table_id_ = table_id;
        auto lookup = DescribeTable(owner);
        if (!lookup.ok()) {
            return lookup;
        }

        for (auto& row : rows) {
            TableFileSchema entry;
            entry.table_id_ = table_id;

            // tuple positions follow the column list of the select above
            entry.id_ = std::get<0>(row);
            entry.segment_id_ = std::get<1>(row);
            entry.file_id_ = std::get<2>(row);
            entry.file_type_ = std::get<3>(row);
            entry.file_size_ = std::get<4>(row);
            entry.row_count_ = std::get<5>(row);
            entry.date_ = std::get<6>(row);
            entry.engine_type_ = std::get<7>(row);
            entry.created_on_ = std::get<8>(row);

            // fields inherited from the owning table
            entry.dimension_ = owner.dimension_;
            entry.index_file_size_ = owner.index_file_size_;
            entry.nlist_ = owner.nlist_;
            entry.metric_type_ = owner.metric_type_;

            utils::GetTableFilePath(options_, entry);

            table_files.push_back(entry);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return Status();
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when lookup table files", e.what());
    }
}

444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
// Fetch every file that belongs to `segment_id`, skipping files in TO_DELETE state.
//   table_files - [out] cleared, then filled; the owning table is taken from the first matching row and its
//                 index parameters are copied onto every entry
// Returns the DescribeTable() error when the table lookup fails, DB_META_TRANSACTION_FAILED on
// std::exception, Status::OK() otherwise (including when nothing matches).
Status
SqliteMetaImpl::GetTableFilesBySegmentId(const std::string& segment_id,
                                         milvus::engine::meta::TableFilesSchema& table_files) {
    try {
        table_files.clear();

        const int to_delete = static_cast<int>(TableFileSchema::TO_DELETE);
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::segment_id_) == segment_id && c(&TableFileSchema::file_type_) != to_delete));

        if (!rows.empty()) {
            // every entry is attributed to the table of the first row
            TableSchema owner;
            owner.table_id_ = std::get<1>(rows[0]);
            auto lookup = DescribeTable(owner);
            if (!lookup.ok()) {
                return lookup;
            }

            for (auto& row : rows) {
                TableFileSchema entry;
                entry.table_id_ = owner.table_id_;

                // tuple positions follow the column list of the select above (index 1 is table_id)
                entry.id_ = std::get<0>(row);
                entry.segment_id_ = std::get<2>(row);
                entry.file_id_ = std::get<3>(row);
                entry.file_type_ = std::get<4>(row);
                entry.file_size_ = std::get<5>(row);
                entry.row_count_ = std::get<6>(row);
                entry.date_ = std::get<7>(row);
                entry.engine_type_ = std::get<8>(row);
                entry.created_on_ = std::get<9>(row);

                // fields inherited from the owning table
                entry.dimension_ = owner.dimension_;
                entry.index_file_size_ = owner.index_file_size_;
                entry.nlist_ = owner.nlist_;
                entry.metric_type_ = owner.metric_type_;

                utils::GetTableFilePath(options_, entry);
                table_files.push_back(entry);
            }
        }

        ENGINE_LOG_DEBUG << "Get table files by segment id";
        return Status::OK();
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when lookup table files by segment id", e.what());
    }
}

S
starlord 已提交
493
// Overwrite the flag column of the table row(s) matching `table_id`.
// Returns DB_META_TRANSACTION_FAILED on std::exception, Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFlag.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        // (same guard as the other write paths in this file: DropTable, DeleteTableFiles, UpdateTableFile, ...)
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // no state filter: the flag is written regardless of the table's state
        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag), where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (const std::exception& e) {
        std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
// Overwrite the flush_lsn column of the table row(s) matching `table_id`.
// Returns DB_META_TRANSACTION_FAILED on std::exception, Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_lsn) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        // (same guard as the other write paths in this file: DropTable, DeleteTableFiles, UpdateTableFile, ...)
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(set(c(&TableSchema::flush_lsn_) = flush_lsn),
                                 where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flush_lsn, table id = " << table_id;
    } catch (const std::exception& e) {
        std::string msg = "Encounter exception when update table lsn: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

// Read the flush_lsn column of the table identified by `table_id`.
//   flush_lsn - [out] value of the first matching row; left untouched when no row matches
// The lookup does not filter on table state.
// Returns DB_NOT_FOUND when no row matches, DB_META_TRANSACTION_FAILED on std::exception,
// Status::OK() otherwise.
Status
SqliteMetaImpl::GetTableFlushLSN(const std::string& table_id, uint64_t& flush_lsn) {
    try {
        server::MetricCollector metric;

        auto selected =
            ConnectorPtr->select(columns(&TableSchema::flush_lsn_), where(c(&TableSchema::table_id_) == table_id));

        if (selected.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }
        flush_lsn = std::get<0>(selected[0]);
    } catch (const std::exception& e) {
        // message names this operation; it previously repeated the GetTableFilesByFlushLSN text
        return HandleException("Encounter exception when getting table flush_lsn", e.what());
    }

    return Status::OK();
}

// Collect every file row whose flush_lsn equals `flush_lsn`, across all tables and file types.
//   table_files - [out] cleared, then filled; each entry gets its path resolved and the index parameters of
//                 its owning table (each table is described once and cached for the rest of the call)
// Returns immediately with the DescribeTable() error if an owning table cannot be described (entries
// appended so far stay in table_files). A failing utils::GetTableFilePath() does not stop the loop; the
// last such failure is returned at the end. DB_META_TRANSACTION_FAILED is returned on std::exception.
Status
SqliteMetaImpl::GetTableFilesByFlushLSN(uint64_t flush_lsn, TableFilesSchema& table_files) {
    table_files.clear();

    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::flush_lsn_) == flush_lsn));

        std::map<std::string, TableSchema> owners;  // table_id -> described table
        TableFileSchema entry;                      // reused across iterations; copied into table_files
        Status result;

        for (auto& row : rows) {
            // tuple positions follow the column list of the select above
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.segment_id_ = std::get<2>(row);
            entry.file_id_ = std::get<3>(row);
            entry.file_type_ = std::get<4>(row);
            entry.file_size_ = std::get<5>(row);
            entry.row_count_ = std::get<6>(row);
            entry.date_ = std::get<7>(row);
            entry.engine_type_ = std::get<8>(row);
            entry.created_on_ = std::get<9>(row);

            auto path_status = utils::GetTableFilePath(options_, entry);
            if (!path_status.ok()) {
                result = path_status;
            }

            auto owner_it = owners.find(entry.table_id_);
            if (owner_it == owners.end()) {
                TableSchema described;
                described.table_id_ = entry.table_id_;
                auto describe_status = DescribeTable(described);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                owner_it = owners.emplace(entry.table_id_, described).first;
            }

            // fields inherited from the owning table
            const TableSchema& owner = owner_it->second;
            entry.dimension_ = owner.dimension_;
            entry.index_file_size_ = owner.index_file_size_;
            entry.nlist_ = owner.nlist_;
            entry.metric_type_ = owner.metric_type_;

            table_files.push_back(entry);
        }

        if (!rows.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << rows.size() << " files with flush_lsn = " << flush_lsn;
        }
        return result;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when getting table files by flush_lsn", e.what());
    }
}

S
starlord 已提交
607
// Persist `file_schema` to its existing row.
//   file_schema - [in/out] updated_time_ is refreshed; file_type_ is forced to TO_DELETE when the owning
//                 table row is missing or already in TO_DELETE state
// Returns DB_META_TRANSACTION_FAILED on std::exception, Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFile.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_rows = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == file_schema.table_id_));

        // if the table has been deleted, just mark the table file as TO_DELETE
        // clean thread will delete the file later
        const bool table_gone =
            owner_rows.empty() || std::get<0>(owner_rows[0]) == static_cast<int>(TableSchema::TO_DELETE);
        if (table_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (const std::exception& e) {
        std::string msg =
            "Exception update table file: table_id = " + file_schema.table_id_ + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
637
// Persist a batch of table files inside one sqlite transaction.
//   files - [in/out] every entry gets a fresh updated_time_; entries whose owning table has no row outside
//           TO_DELETE state are forced to file_type_ = TO_DELETE before being written
// Returns DB_META_TRANSACTION_FAILED when the transaction does not commit or on std::exception,
// Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFiles.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // table_id -> whether a live (not TO_DELETE) table row exists; each table is queried once
        std::map<std::string, bool> table_alive;
        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        for (auto& file : files) {
            if (table_alive.count(file.table_id_) != 0) {
                continue;
            }
            auto live_rows =
                ConnectorPtr->select(columns(&TableSchema::id_), where(c(&TableSchema::table_id_) == file.table_id_ &&
                                                                       c(&TableSchema::state_) != to_delete));
            table_alive[file.table_id_] = (live_rows.size() >= 1);
        }

        auto committed = ConnectorPtr->transaction([&]() {
            for (auto& file : files) {
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;  // the callback itself always asks for a commit
        });
        fiu_do_on("SqliteMetaImpl.UpdateTableFiles.fail_commited", committed = false);

        if (!committed) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
        }

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when update table files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
685
// Stores new index settings (engine type, nlist, metric type) for a table and turns the table's
// BACKUP files back into RAW files.
//
// Only the three index columns of the table row are written. The row is not rebuilt from a partial
// column list and written back as a whole, because that would overwrite every column missing from the
// list with a default-constructed value.
// NOTE(review): flush_lsn_ (assigned by CreatePartition) is presumably such a mapped column - confirm
// against the storage definition.
//
// Parameters:
//   table_id - id of the table whose index settings change
//   index    - source of engine_type_, nlist_ and metric_type_
//
// Returns Status::OK() on success, DB_NOT_FOUND when no row with this table id exists outside the
// TO_DELETE state, or the HandleException() status when an exception is thrown.
Status
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableIndex.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // the row id is enough: it proves the table exists and addresses the row for the update below
        auto tables = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (tables.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        // touch only the index-related columns of the first matching row
        ConnectorPtr->update_all(set(c(&TableSchema::engine_type_) = index.engine_type_,
                                     c(&TableSchema::nlist_) = index.nlist_,
                                     c(&TableSchema::metric_type_) = index.metric_type_),
                                 where(c(&TableSchema::id_) == std::get<0>(tables[0])));

        // set all backup file to raw
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(c(&TableFileSchema::table_id_) == table_id and
                                       c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
736
// Marks the RAW files of a table as TO_INDEX when their row count is at least
// meta::BUILD_INDEX_THRESHOLD.
//
// Returns Status::OK() on success, or the HandleException() status when an exception is thrown.
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFilesToIndex.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // the three conditions a file must satisfy to be promoted
        auto of_this_table = c(&TableFileSchema::table_id_) == table_id;
        auto big_enough = c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD;
        auto still_raw = c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW;

        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_INDEX),
                                 where(of_this_table and big_enough and still_raw));

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files to to_index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
758
// Reads the index settings (engine type, nlist, metric type) of a table into `index`.
//
// Returns Status::OK() when exactly one row with this table id exists outside the TO_DELETE state,
// DB_NOT_FOUND for any other row count (`index` is left untouched in that case), or the
// HandleException() status when an exception is thrown.
Status
SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DescribeTableIndex.throw_exception", throw std::exception());

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_, &TableSchema::nlist_, &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        // anything other than a single match is reported as "not found"
        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        auto& row = rows[0];
        index.engine_type_ = std::get<0>(row);
        index.nlist_ = std::get<1>(row);
        index.metric_type_ = std::get<2>(row);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when describe index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
782
// Drops the index of a table in three steps:
//   1. INDEX files of the table are marked TO_DELETE,
//   2. BACKUP files of the table are turned back into RAW files,
//   3. the table's engine_type_ and nlist_ are reset to DEFAULT_ENGINE_TYPE / DEFAULT_NLIST.
//
// Returns Status::OK() on success, or the HandleException() status when an exception is thrown.
Status
SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DropTableIndex.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // soft delete index files
        auto index_files_of_table = where(c(&TableFileSchema::table_id_) == table_id and
                                          c(&TableFileSchema::file_type_) == (int)TableFileSchema::INDEX);
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 index_files_of_table);

        // set all backup file to raw
        auto backup_files_of_table = where(c(&TableFileSchema::table_id_) == table_id and
                                           c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP);
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 backup_files_of_table);

        // set table index type to raw
        auto this_table = where(c(&TableSchema::table_id_) == table_id);
        ConnectorPtr->update_all(
            set(c(&TableSchema::engine_type_) = DEFAULT_ENGINE_TYPE, c(&TableSchema::nlist_) = DEFAULT_NLIST),
            this_table);

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
816
// Creates a partition of `table_id`. The partition is stored through CreateTable() as a table record
// whose owner_table_ is `table_id` and whose partition_tag_ is the trimmed `tag`.
//
// Parameters:
//   table_id       - owning table; it must be describable and must not have an owner table itself
//   partition_name - table id for the partition; when empty, one is obtained from NextTableId()
//   tag            - partition tag; blanks on both sides are trimmed before use
//   lsn            - stored as the partition's flush_lsn_
//
// Returns the DescribeTable() failure when the owning table cannot be described, DB_ERROR for a nested
// or duplicate partition, DB_ALREADY_EXIST when CreateTable() reports that code, otherwise the
// CreateTable() status.
Status
SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag,
                                uint64_t lsn) {
    server::MetricCollector metric;

    // starts out as the owning table's description and is then reshaped into the partition record
    TableSchema schema;
    schema.table_id_ = table_id;
    auto status = DescribeTable(schema);
    if (!status.ok()) {
        return status;
    }

    // not allow create partition under partition
    const bool owner_is_partition = !schema.owner_table_.empty();
    if (owner_is_partition) {
        return Status(DB_ERROR, "Nested partition is not allowed");
    }

    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string valid_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    // not allow duplicated partition; only the output string is inspected, the returned status is not
    std::string exist_partition;
    GetPartitionName(table_id, valid_tag, exist_partition);
    if (!exist_partition.empty()) {
        return Status(DB_ERROR, "Duplicate partition is not allowed");
    }

    if (!partition_name.empty()) {
        schema.table_id_ = partition_name;
    } else {
        // generate unique partition name
        NextTableId(schema.table_id_);
    }

    schema.id_ = -1;
    schema.flag_ = 0;
    schema.created_on_ = utils::GetMicroSecTimeStamp();
    schema.owner_table_ = table_id;
    schema.partition_tag_ = valid_tag;
    schema.flush_lsn_ = lsn;

    status = CreateTable(schema);
    if (status.code() == DB_ALREADY_EXIST) {
        return Status(DB_ALREADY_EXIST, "Partition already exists");
    }

    return status;
}

S
starlord 已提交
867
// Drops a partition by handing its name to DropTable(); the resulting status is returned unchanged.
Status
SqliteMetaImpl::DropPartition(const std::string& partition_name) {
    auto drop_status = DropTable(partition_name);
    return drop_status;
}
871

G
groot 已提交
872
// Appends one schema per partition of `table_id` to `partition_schema_array`. A partition is any table
// row whose owner_table_ equals `table_id` and whose state is not TO_DELETE.
//
// The output vector is not cleared first; entries are appended to whatever it already holds.
//
// Returns Status::OK() on success, or the HandleException() status when an exception is thrown.
Status
SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception());

        auto owned_by_table = c(&TableSchema::owner_table_) == table_id;
        auto not_deleted = c(&TableSchema::state_) != (int)TableSchema::TO_DELETE;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::nlist_, &TableSchema::metric_type_, &TableSchema::partition_tag_,
                    &TableSchema::version_, &TableSchema::table_id_),
            where(owned_by_table and not_deleted));

        for (auto& row : rows) {
            meta::TableSchema partition;
            partition.id_ = std::get<0>(row);
            partition.state_ = std::get<1>(row);
            partition.dimension_ = std::get<2>(row);
            partition.created_on_ = std::get<3>(row);
            partition.flag_ = std::get<4>(row);
            partition.index_file_size_ = std::get<5>(row);
            partition.engine_type_ = std::get<6>(row);
            partition.nlist_ = std::get<7>(row);
            partition.metric_type_ = std::get<8>(row);
            partition.owner_table_ = table_id;  // the query filtered on this value, so it is not selected
            partition.partition_tag_ = std::get<9>(row);
            partition.version_ = std::get<10>(row);
            partition.table_id_ = std::get<11>(row);
            partition_schema_array.emplace_back(partition);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when show partitions", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
909
// Looks up the table id of the partition of `table_id` that carries the given tag and stores it in
// `partition_name`.
//
// The tag is trimmed of blanks on both sides before the comparison. When several rows match, the first
// one returned by the query is used.
//
// Returns Status::OK() on a match, DB_NOT_FOUND when nothing matches (`partition_name` is left
// untouched), or the HandleException() status when an exception is thrown.
Status
SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.GetPartitionName.throw_exception", throw std::exception());

        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        auto matches = ConnectorPtr->select(
            columns(&TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and c(&TableSchema::partition_tag_) == valid_tag and
                  c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (matches.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
        }

        partition_name = std::get<0>(matches[0]);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when get partition name", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
936
// Collects the searchable files (RAW, TO_INDEX and INDEX) of a table into `files`.
//
// Parameters:
//   table_id - table whose files are wanted
//   ids      - when non-empty, only files whose id_ is in this list are returned
//   files    - output; cleared on entry
//
// Each returned file also carries the table's dimension, index file size, nlist and metric type.
//
// Returns the DescribeTable() failure when the table cannot be described, or the HandleException()
// status when an exception is thrown. Otherwise it returns the last non-OK status from
// utils::GetTableFilePath(), if any; files whose path lookup failed are still added to `files`.
Status
SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) {
    files.clear();
    server::MetricCollector metric;

    try {
        fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception());

        // the table description supplies the per-table fields copied into every file below
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(table_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        auto select_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_);

        std::vector<int> file_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                       (int)TableFileSchema::INDEX};
        auto match_type = in(&TableFileSchema::file_type_, file_types);
        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;

        // perform query; the id filter is only applied when the caller supplied ids
        decltype(ConnectorPtr->select(select_columns)) selected;
        if (!ids.empty()) {
            auto match_fileid = in(&TableFileSchema::id_, ids);
            selected = ConnectorPtr->select(select_columns, where(match_tableid and match_fileid and match_type));
        } else {
            selected = ConnectorPtr->select(select_columns, where(match_tableid and match_type));
        }

        Status ret;
        for (auto& row : selected) {
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.segment_id_ = std::get<2>(row);
            table_file.file_id_ = std::get<3>(row);
            table_file.file_type_ = std::get<4>(row);
            table_file.file_size_ = std::get<5>(row);
            table_file.row_count_ = std::get<6>(row);
            table_file.date_ = std::get<7>(row);
            table_file.engine_type_ = std::get<8>(row);

            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            files.emplace_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate index files", e.what());
    }
}

S
starlord 已提交
1010
// Collects the RAW files of a table that are candidates for merging into `files`.
//
// Candidates are queried in descending file_size_ order. Files whose size is at least the table's
// index_file_size_ are skipped. Each returned file also carries the table's dimension, index file size,
// nlist and metric type. `files` is cleared on entry.
//
// Returns the DescribeTable() failure when the table cannot be described, or the HandleException()
// status when an exception is thrown. Otherwise it returns the last non-OK status from
// utils::GetTableFilePath(), if any; files whose path lookup failed are still added to `files`.
Status
SqliteMetaImpl::FilesToMerge(const std::string& table_id, TableFilesSchema& files) {
    files.clear();

    try {
        fiu_do_on("SqliteMetaImpl.FilesToMerge.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(table_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        // get files to merge
        auto raw_files_of_table = where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW and
                                        c(&TableFileSchema::table_id_) == table_id);
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::created_on_),
            raw_files_of_table, order_by(&TableFileSchema::file_size_).desc());

        Status result;
        int64_t collected = 0;
        for (auto& row : selected) {
            TableFileSchema table_file;

            // the size is read first so that large files can be skipped before anything else is copied
            table_file.file_size_ = std::get<5>(row);
            const bool small_enough = !(table_file.file_size_ >= table_schema.index_file_size_);
            if (!small_enough) {
                continue;  // skip large file
            }

            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.segment_id_ = std::get<2>(row);
            table_file.file_id_ = std::get<3>(row);
            table_file.file_type_ = std::get<4>(row);
            table_file.row_count_ = std::get<6>(row);
            table_file.date_ = std::get<7>(row);
            table_file.created_on_ = std::get<8>(row);

            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                result = path_status;
            }

            files.emplace_back(table_file);
            ++collected;
        }

        if (collected > 0) {
            ENGINE_LOG_TRACE << "Collect " << collected << " to-merge files";
        }
        return result;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

// Collects every file in the TO_INDEX state, across all tables, into `files`.
//
// Each file is completed with the dimension, index file size, nlist and metric type of its table. A
// table is described once and the result is reused for the other files of that table. `files` is
// cleared on entry.
//
// Returns a failing DescribeTable() status immediately when a table cannot be described (files collected
// up to that point stay in `files`), or the HandleException() status when an exception is thrown.
// Otherwise it returns the last non-OK status from utils::GetTableFilePath(), if any.
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        fiu_do_on("SqliteMetaImpl.FilesToIndex.throw_exception", throw std::exception());

        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::TO_INDEX));

        // table id -> description of that table, filled lazily
        std::map<std::string, TableSchema> described_tables;

        // one record object is reused for every row; each loop pass overwrites the selected fields
        TableFileSchema table_file;

        Status ret;
        for (auto& row : selected) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.segment_id_ = std::get<2>(row);
            table_file.file_id_ = std::get<3>(row);
            table_file.file_type_ = std::get<4>(row);
            table_file.file_size_ = std::get<5>(row);
            table_file.row_count_ = std::get<6>(row);
            table_file.date_ = std::get<7>(row);
            table_file.engine_type_ = std::get<8>(row);
            table_file.created_on_ = std::get<9>(row);

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto owner = described_tables.find(table_file.table_id_);
            if (owner == described_tables.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                fiu_do_on("SqliteMetaImpl_FilesToIndex_TableNotFound",
                          describe_status = Status(DB_NOT_FOUND, "table not found"));
                if (!describe_status.ok()) {
                    return describe_status;
                }
                owner = described_tables.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema& owner_schema = owner->second;
            table_file.dimension_ = owner_schema.dimension_;
            table_file.index_file_size_ = owner_schema.index_file_size_;
            table_file.nlist_ = owner_schema.nlist_;
            table_file.metric_type_ = owner_schema.metric_type_;
            files.push_back(table_file);
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
1140
// Collects the files of a table whose file_type_ is one of `file_types` into `table_files` and logs a
// per-type count summary.
//
// Parameters:
//   table_id    - table whose files are wanted
//   file_types  - accepted file types; must not be empty
//   table_files - output; cleared inside the try block, i.e. only after the table was described
//
// Each returned file also carries the table's dimension, index file size, nlist and metric type.
//
// Returns DB_ERROR when `file_types` is empty, the DescribeTable() failure when the table cannot be
// described, or the HandleException() status when an exception is thrown. Otherwise it returns the last
// non-OK status from utils::GetTableFilePath(), or Status::OK() when every lookup succeeded.
Status
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
                            TableFilesSchema& table_files) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    Status ret = Status::OK();

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto describe_status = DescribeTable(table_schema);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());

        table_files.clear();
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));

        // nothing matched: no files to return and no summary to log
        if (selected.empty()) {
            return ret;
        }

        // file type -> number of selected files of that type
        std::map<int, int> type_counts;
        for (auto& row : selected) {
            TableFileSchema file_schema;
            file_schema.table_id_ = table_id;
            file_schema.id_ = std::get<0>(row);
            file_schema.segment_id_ = std::get<1>(row);
            file_schema.file_id_ = std::get<2>(row);
            file_schema.file_type_ = std::get<3>(row);
            file_schema.file_size_ = std::get<4>(row);
            file_schema.row_count_ = std::get<5>(row);
            file_schema.date_ = std::get<6>(row);
            file_schema.engine_type_ = std::get<7>(row);
            file_schema.created_on_ = std::get<8>(row);

            file_schema.dimension_ = table_schema.dimension_;
            file_schema.index_file_size_ = table_schema.index_file_size_;
            file_schema.nlist_ = table_schema.nlist_;
            file_schema.metric_type_ = table_schema.metric_type_;

            ++type_counts[file_schema.file_type_];

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, file_schema);
            if (!path_status.ok()) {
                ret = path_status;
            }

            table_files.emplace_back(file_schema);
        }

        // text used in the summary for each reportable file type; nullptr for types that are not reported
        auto summary_label = [](int file_type) -> const char* {
            switch (file_type) {
                case (int)TableFileSchema::RAW:
                    return " raw files:";
                case (int)TableFileSchema::NEW:
                    return " new files:";
                case (int)TableFileSchema::NEW_MERGE:
                    return " new_merge files:";
                case (int)TableFileSchema::NEW_INDEX:
                    return " new_index files:";
                case (int)TableFileSchema::TO_INDEX:
                    return " to_index files:";
                case (int)TableFileSchema::INDEX:
                    return " index files:";
                case (int)TableFileSchema::BACKUP:
                    return " backup files:";
                default:
                    return nullptr;
            }
        };

        // the summary follows the order (and repetitions) of the requested types
        std::string msg = "Get table files by type.";
        for (int file_type : file_types) {
            const char* label = summary_label(file_type);
            if (label == nullptr) {
                continue;
            }
            auto counted = type_counts.find(file_type);
            int count = (counted == type_counts.end()) ? 0 : counted->second;
            msg += label + std::to_string(count);
        }
        ENGINE_LOG_DEBUG << msg;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }

    return ret;
}

S
starlord 已提交
1246
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1247 1248
// Applies every archive criterion returned by options_.archive_conf_.GetCriterias().
//
//  - engine::ARCHIVE_CONF_DAYS: every file whose created_on_ is older than the configured limit
//    (limit * DAY * US_PS, compared against utils::GetMicroSecTimeStamp()) and that is not
//    already TO_DELETE is switched to TO_DELETE.
//  - engine::ARCHIVE_CONF_DISK: the amount by which Size() exceeds limit * G is handed to
//    DiscardFiles().
//
// Returns the HandleException() status if the days-based update throws, Status::OK() otherwise.
// NOTE(review): the statuses returned by Size() and DiscardFiles() are not inspected here.
Status
SqliteMetaImpl::Archive() {
    auto& criterias = options_.archive_conf_.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (const auto& entry : criterias) {
        auto& criteria = entry.first;
        auto& limit = entry.second;

        const bool archive_by_days = (criteria == engine::ARCHIVE_CONF_DAYS);
        const bool archive_by_disk = (criteria == engine::ARCHIVE_CONF_DISK);

        if (archive_by_days) {
            int64_t usecs = limit * DAY * US_PS;
            int64_t now = utils::GetMicroSecTimeStamp();

            try {
                fiu_do_on("SqliteMetaImpl.Archive.throw_exception", throw std::exception());

                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
                                         where(c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
                                               c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
            } catch (std::exception& ex) {
                return HandleException("Encounter exception when update table files", ex.what());
            }

            ENGINE_LOG_DEBUG << "Archive old files";
        }

        if (archive_by_disk) {
            uint64_t total_size = 0;
            Size(total_size);

            // A non-positive excess is a no-op inside DiscardFiles().
            int64_t excess = (int64_t)total_size - limit * G;
            DiscardFiles(excess);

            ENGINE_LOG_DEBUG << "Archive files to free disk";
        }
    }

    return Status::OK();
}

S
starlord 已提交
1289
// Computes the summed file_size_ of every table file that is not flagged TO_DELETE.
//
// result: always reset to 0 first, then increased by each non-null sum returned by the query.
// Returns Status::OK() on success, or the HandleException() status if the query throws.
Status
SqliteMetaImpl::Size(uint64_t& result) {
    result = 0;

    try {
        fiu_do_on("SqliteMetaImpl.Size.throw_exception", throw std::exception());

        auto rows = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
                                         where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
        for (auto& row : rows) {
            auto& total = std::get<0>(row);
            // The aggregate may come back null; only dereference a value that is present.
            if (total) {
                result += (uint64_t)(*total);
            }
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when calculte db size", ex.what());
    }

    return Status::OK();
}

S
starlord 已提交
1310
// Removes the meta rows of all table files whose type is NEW, NEW_INDEX or NEW_MERGE.
//
// The matching ids are selected first and then removed one by one inside a single sqlite
// transaction, all while holding meta_mutex_.
// Returns Status::OK() on success, or the HandleException() status when the transaction
// reports failure or an exception is thrown.
Status
SqliteMetaImpl::CleanUpShadowFiles() {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> shadow_types = {(int)TableFileSchema::NEW, (int)TableFileSchema::NEW_INDEX,
                                         (int)TableFileSchema::NEW_MERGE};
        auto shadow_files = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                 where(in(&TableFileSchema::file_type_, shadow_types)));

        auto committed = ConnectorPtr->transaction([&]() mutable {
            for (auto& row : shadow_files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        // Fault-injection hooks used by tests.
        fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.fail_commited", committed = false);
        fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.throw_exception", throw std::exception());
        if (!committed) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!shadow_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << shadow_files.size() << " files";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean table file", ex.what());
    }

    return Status::OK();
}

1347
// Cleans up files and tables that were previously flagged for deletion. Runs three passes, each
// in its own try block; the first pass that fails returns its HandleException() status.
//
//  1. Files: selects rows typed TO_DELETE or BACKUP whose updated_time_ is older than
//     now - seconds * US_PS. Rows reported as ignored by OngoingFileChecker are skipped. Every
//     remaining row is erased from the cache; TO_DELETE rows are additionally deleted from
//     storage and removed from meta, and their table ids are remembered for pass 3.
//  2. Tables: every table whose state_ is TO_DELETE has its path deleted
//     (only when the folder is empty) and its row removed from meta.
//  3. Folders: for every table id remembered in pass 1 that has no table-file rows left,
//     the table path is deleted.
//
// seconds: age threshold applied to updated_time_ in pass 1.
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> touched_table_ids;

    // pass 1: remove to_delete files
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_ThrowException", throw std::exception());

        server::MetricCollector metric;

        std::vector<int> file_types = {
            (int)TableFileSchema::TO_DELETE,
            (int)TableFileSchema::BACKUP,
        };

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // collect the candidate files
        auto expired_files = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::engine_type_, &TableFileSchema::file_id_, &TableFileSchema::file_type_,
                    &TableFileSchema::date_),
            where(in(&TableFileSchema::file_type_, file_types) and
                  c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        int64_t removed_files = 0;
        auto committed = ConnectorPtr->transaction([&]() mutable {
            // Declared once and reused for every row.
            TableFileSchema table_file;
            for (auto& row : expired_files) {
                table_file.id_ = std::get<0>(row);
                table_file.table_id_ = std::get<1>(row);
                table_file.segment_id_ = std::get<2>(row);
                table_file.engine_type_ = std::get<3>(row);
                table_file.file_id_ = std::get<4>(row);
                table_file.file_type_ = std::get<5>(row);
                table_file.date_ = std::get<6>(row);

                // a file that is still in use must not be deleted
                if (OngoingFileChecker::GetInstance().IsIgnored(table_file)) {
                    ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
                                     << " currently is in use, not able to delete now";
                    continue;
                }

                // erase from cache, must do this before file deleted,
                // because GetTableFilePath won't able to generate file path after the file is deleted
                // TODO(zhiru): clean up
                utils::GetTableFilePath(options_, table_file);
                server::CommonUtil::EraseFromCache(table_file.location_);

                // Only TO_DELETE rows go further; the other selected type (BACKUP) stops here.
                if (table_file.file_type_ != (int)TableFileSchema::TO_DELETE) {
                    continue;
                }

                // If we are deleting a raw table file, it means it's okay to delete the entire segment directory.
                // Else, we can only delete the single file
                // TODO(zhiru): We determine whether a table file is raw by its engine type. This is a bit hacky
                const bool is_raw_file = table_file.engine_type_ == (int32_t)EngineType::FAISS_IDMAP ||
                                         table_file.engine_type_ == (int32_t)EngineType::FAISS_BIN_IDMAP;
                if (is_raw_file) {
                    utils::DeleteSegment(options_, table_file);
                    std::string segment_dir;
                    utils::GetParentPath(table_file.location_, segment_dir);
                    ENGINE_LOG_DEBUG << "Remove segment directory: " << segment_dir;
                } else {
                    utils::DeleteTableFilePath(options_, table_file);
                    ENGINE_LOG_DEBUG << "Remove table file: " << table_file.location_;
                }

                // delete file from meta
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);

                touched_table_ids.insert(table_file.table_id_);

                ++removed_files;
            }
            return true;
        });
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_FailCommited", committed = false);

        if (!committed) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (removed_files > 0) {
            ENGINE_LOG_DEBUG << "Clean " << removed_files << " files expired in " << seconds << " seconds";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean table files", ex.what());
    }

    // pass 2: remove to_delete tables
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTable_ThrowException", throw std::exception());
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto doomed_tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_),
                                                  where(c(&TableSchema::state_) == (int)TableSchema::TO_DELETE));

        auto committed = ConnectorPtr->transaction([&]() mutable {
            for (auto& row : doomed_tables) {
                utils::DeleteTablePath(options_, std::get<1>(row), false);  // only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }

            return true;
        });
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTable_Failcommited", committed = false);

        if (!committed) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!doomed_tables.empty()) {
            ENGINE_LOG_DEBUG << "Remove " << doomed_tables.size() << " tables from meta";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean table files", ex.what());
    }

    // pass 3: remove deleted table folder
    // don't remove table folder until all its files has been deleted
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTableFolder_ThrowException", throw std::exception());
        server::MetricCollector metric;

        int64_t removed_folders = 0;
        for (auto& table_id : touched_table_ids) {
            auto remaining = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                  where(c(&TableFileSchema::table_id_) == table_id));
            if (remaining.empty()) {
                utils::DeleteTablePath(options_, table_id);
                ++removed_folders;
            }
        }

        if (removed_folders > 0) {
            ENGINE_LOG_DEBUG << "Remove " << removed_folders << " tables folder";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when delete table folder", ex.what());
    }

    return Status::OK();
}

S
starlord 已提交
1495
// Sums row_count_ over the RAW, TO_INDEX and INDEX files of one table.
//
// table_id: table whose files are counted.
// result:   receives the total; left untouched when DescribeTable() fails or an exception is thrown.
// Returns the DescribeTable() status if it is not ok, the HandleException() status on
// exception, Status::OK() otherwise.
Status
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    try {
        fiu_do_on("SqliteMetaImpl.Count.throw_exception", throw std::exception());

        server::MetricCollector metric;

        std::vector<int> counted_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                          (int)TableFileSchema::INDEX};
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::row_count_),
            where(in(&TableFileSchema::file_type_, counted_types) and c(&TableFileSchema::table_id_) == table_id));

        // Bail out with DescribeTable()'s status if the lookup of this table fails.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(table_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        uint64_t total_rows = 0;
        for (auto& row : rows) {
            total_rows += std::get<0>(row);
        }
        result = total_rows;
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when calculate table file size", ex.what());
    }

    return Status::OK();
}

S
starlord 已提交
1526 1527
// Drops both meta tables (META_TABLES, then META_TABLEFILES) from the sqlite storage.
// Returns Status::OK() on success, or the HandleException() status if either drop throws.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    try {
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when drop all meta", ex.what());
    }

    return Status::OK();
}

G
groot 已提交
1540 1541 1542 1543 1544 1545 1546 1547 1548
// Flags table files as TO_DELETE, in ascending id_ order, until at least `to_discard_size`
// worth of file_size_ has been flagged.
//
// Files are processed in batches of up to 10 rows per sqlite transaction (each batch holds
// meta_mutex_); the function then calls itself with the remaining size. The recursion stops when
// the remaining size is no longer positive, or when a batch finds no file to flag - without the
// latter check the function would call itself forever with an unchanged argument once no
// eligible file is left.
//
// to_discard_size: amount still to free, in the unit of file_size_; a value <= 0 is a no-op.
// Returns Status::OK() on success, or the HandleException() status when a transaction reports
// failure or an exception is thrown.
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    if (to_discard_size <= 0) {
        return Status::OK();
    }

    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

    // Set once this batch has flagged at least one file; guards the recursive call below.
    bool discarded_any = false;

    try {
        fiu_do_on("SqliteMetaImpl.DiscardFiles.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto commited = ConnectorPtr->transaction([&]() mutable {
            auto selected =
                ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::file_size_),
                                     where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE),
                                     order_by(&TableFileSchema::id_), limit(10));

            std::vector<int> ids;
            TableFileSchema table_file;

            for (auto& file : selected) {
                if (to_discard_size <= 0) {
                    break;
                }
                table_file.id_ = std::get<0>(file);
                table_file.file_size_ = std::get<1>(file);
                ids.push_back(table_file.id_);
                // Log id_: file_id_ is never populated in this function.
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
            }

            if (ids.size() == 0) {
                return true;
            }

            ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                         c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                     where(in(&TableFileSchema::id_, ids)));

            discarded_any = true;
            return true;
        });
        fiu_do_on("SqliteMetaImpl.DiscardFiles.fail_commited", commited = false);
        if (!commited) {
            return HandleException("DiscardFiles error: sqlite transaction failed");
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when discard table file", e.what());
    }

    if (!discarded_any) {
        // Nothing left to flag: recursing with the same size would never terminate.
        ENGINE_LOG_DEBUG << "No more files to discard, remaining size=" << to_discard_size;
        return Status::OK();
    }

    return DiscardFiles(to_discard_size);
}

1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641
// Stores `lsn` as the global LSN in the environment table.
//
// Inserts a new EnvironmentSchema row when the table is empty; otherwise updates the stored
// value, skipping the write when it already equals `lsn`.
// Returns Status::OK() on success, or the HandleException() status if a storage call throws.
Status
SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.size() == 0) {
            EnvironmentSchema env;
            env.global_lsn_ = lsn;
            ConnectorPtr->insert(env);
        } else {
            uint64_t last_lsn = std::get<0>(selected[0]);
            if (lsn == last_lsn) {
                return Status::OK();
            }

            ConnectorPtr->update_all(set(c(&EnvironmentSchema::global_lsn_) = lsn));
        }

        ENGINE_LOG_DEBUG << "Update global lsn = " << lsn;
    } catch (std::exception& e) {
        // std::to_string is required: `"literal" + lsn` would offset the literal's pointer by
        // lsn characters instead of appending the number.
        std::string msg = "Exception update global lsn = " + std::to_string(lsn);
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

// Reads the global LSN from the environment table.
//
// lsn: receives the stored value, or 0 when the table has no row yet.
// Returns Status::OK() on success, or the HandleException() status if the query throws.
Status
SqliteMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.size() == 0) {
            lsn = 0;
        } else {
            lsn = std::get<0>(selected[0]);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when get global lsn", e.what());
    }

    return Status::OK();
}
G
groot 已提交
1642

1643 1644 1645
}  // namespace meta
}  // namespace engine
}  // namespace milvus