// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

S
starlord 已提交
12
#include "db/meta/SqliteMetaImpl.h"
X
Xu Peng 已提交
13

Y
youny626 已提交
14
#include <sqlite_orm.h>
X
Xu Peng 已提交
15
#include <unistd.h>
16

Z
Zhiru Zhu 已提交
17
#include <fiu-local.h>
X
Xu Peng 已提交
18
#include <boost/filesystem.hpp>
19
#include <chrono>
X
Xu Peng 已提交
20
#include <fstream>
Y
youny626 已提交
21
#include <iostream>
S
starlord 已提交
22
#include <map>
Y
youny626 已提交
23
#include <memory>
S
starlord 已提交
24
#include <set>
Y
youny626 已提交
25
#include <sstream>
S
starlord 已提交
26

27 28 29 30 31 32 33 34 35
#include "MetaConsts.h"
#include "db/IDGenerator.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
Z
Zhiru Zhu 已提交
36
#include "utils/ValidationUtil.h"
37

J
jinhai 已提交
38
namespace milvus {
X
Xu Peng 已提交
39
namespace engine {
40
namespace meta {
X
Xu Peng 已提交
41

X
Xu Peng 已提交
42 43
// Bring the sqlite_orm query helpers (make_storage, make_table, columns, where, c, set, in, ...) into scope;
// they are used unqualified throughout this file.
using namespace sqlite_orm;

G
groot 已提交
44 45
namespace {

S
starlord 已提交
46
// Logs a meta-layer failure and converts it into a Status.
//   desc - description of the operation that failed
//   what - optional detail text (callers pass std::exception::what()); when non-null the
//          reported message becomes "<desc>:<what>", otherwise it is just <desc>
// Returns a Status carrying DB_META_TRANSACTION_FAILED and the logged message.
Status
HandleException(const std::string& desc, const char* what = nullptr) {
    const std::string message = (what != nullptr) ? desc + ":" + what : desc;
    ENGINE_LOG_ERROR << message;
    return Status(DB_META_TRANSACTION_FAILED, message);
}

Y
youny626 已提交
58
}  // namespace
G
groot 已提交
59

S
starlord 已提交
60
inline auto
G
groot 已提交
61
StoragePrototype(const std::string& path) {
62 63 64 65 66 67 68 69 70
    return make_storage(
        path,
        make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))),
        make_table(META_TABLES, make_column("id", &TableSchema::id_, primary_key()),
                   make_column("table_id", &TableSchema::table_id_, unique()),
                   make_column("state", &TableSchema::state_), make_column("dimension", &TableSchema::dimension_),
                   make_column("created_on", &TableSchema::created_on_),
                   make_column("flag", &TableSchema::flag_, default_value(0)),
                   make_column("index_file_size", &TableSchema::index_file_size_),
71
                   make_column("engine_type", &TableSchema::engine_type_),
G
groot 已提交
72
                   make_column("index_params", &TableSchema::index_params_),
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
                   make_column("metric_type", &TableSchema::metric_type_),
                   make_column("owner_table", &TableSchema::owner_table_, default_value("")),
                   make_column("partition_tag", &TableSchema::partition_tag_, default_value("")),
                   make_column("version", &TableSchema::version_, default_value(CURRENT_VERSION)),
                   make_column("flush_lsn", &TableSchema::flush_lsn_)),
        make_table(
            META_TABLEFILES, make_column("id", &TableFileSchema::id_, primary_key()),
            make_column("table_id", &TableFileSchema::table_id_),
            make_column("segment_id", &TableFileSchema::segment_id_, default_value("")),
            make_column("engine_type", &TableFileSchema::engine_type_),
            make_column("file_id", &TableFileSchema::file_id_), make_column("file_type", &TableFileSchema::file_type_),
            make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
            make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
            make_column("updated_time", &TableFileSchema::updated_time_),
            make_column("created_on", &TableFileSchema::created_on_), make_column("date", &TableFileSchema::date_),
            make_column("flush_lsn", &TableFileSchema::flush_lsn_)));
X
Xu Peng 已提交
89 90
}

X
Xu Peng 已提交
91
// Concrete storage type produced by StoragePrototype(); obtained with decltype because that
// function's return type is deduced (`auto`).
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
92 93
// File-scope handle to the sqlite storage. It is assigned in SqliteMetaImpl::Initialize() and used by
// the member functions below. Because it is a static at file scope, it is not per-instance state:
// every SqliteMetaImpl object in this translation unit goes through the same handle.
static std::unique_ptr<ConnectorT> ConnectorPtr;

Y
youny626 已提交
94
// Stores the meta options and sets up the sqlite storage right away.
// The Status returned by Initialize() is intentionally not inspected here; Initialize() and
// ValidateMetaSchema() report their failures by throwing Exception.
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options) : options_(options) {
    static_cast<void>(Initialize());
}

// Nothing to release explicitly; the storage handle lives in the file-scope ConnectorPtr.
SqliteMetaImpl::~SqliteMetaImpl() = default;

S
starlord 已提交
101
// Produces a new table id as the decimal text of the next number from SafeIDGenerator.
//   table_id - output; overwritten with the generated id
// Always returns Status::OK().
Status
SqliteMetaImpl::NextTableId(std::string& table_id) {
    // Serialize id generation so that two callers cannot end up with a duplicated id.
    std::lock_guard<std::mutex> guard(genid_mutex_);

    std::ostringstream id_text;
    id_text << SafeIDGenerator::GetInstance().GetNextIDNumber();
    table_id = id_text.str();

    return Status::OK();
}

S
starlord 已提交
111
// Produces a new file id as the decimal text of the next number from SafeIDGenerator.
//   file_id - output; overwritten with the generated id
// Always returns Status::OK().
Status
SqliteMetaImpl::NextFileId(std::string& file_id) {
    // Serialize id generation so that two callers cannot end up with a duplicated id.
    std::lock_guard<std::mutex> guard(genid_mutex_);

    std::ostringstream id_text;
    id_text << SafeIDGenerator::GetInstance().GetNextIDNumber();
    file_id = id_text.str();

    return Status::OK();
}

S
starlord 已提交
121 122
void
SqliteMetaImpl::ValidateMetaSchema() {
S
shengjh 已提交
123 124 125
    bool is_null_connector{ConnectorPtr == nullptr};
    fiu_do_on("SqliteMetaImpl.ValidateMetaSchema.NullConnection", is_null_connector = true);
    if (is_null_connector) {
126 127 128
        return;
    }

Y
youny626 已提交
129
    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
130
    auto ret = ConnectorPtr->sync_schema_simulate();
Y
youny626 已提交
131 132
    if (ret.find(META_TABLES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
133 134
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
Y
youny626 已提交
135 136
    if (ret.find(META_TABLEFILES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
137 138 139 140
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
141 142
// Prepares the meta store under options_.path_:
//   1. creates the directory when it does not exist yet,
//   2. builds the storage handle for "<path>/meta.sqlite",
//   3. validates the existing schema, then syncs it and opens the connection,
//   4. switches the journal mode to WAL and cleans up shadow files.
// Returns Status::OK() on completion. Throws Exception(DB_INVALID_PATH, ...) when the directory
// cannot be created; ValidateMetaSchema() may throw Exception(DB_INCOMPATIB_META, ...).
Status
SqliteMetaImpl::Initialize() {
    const bool directory_exists = boost::filesystem::is_directory(options_.path_);
    if (!directory_exists) {
        auto created = boost::filesystem::create_directory(options_.path_);
        fiu_do_on("SqliteMetaImpl.Initialize.fail_create_directory", created = false);
        if (!created) {
            const std::string msg = "Failed to create db directory " + options_.path_;
            ENGINE_LOG_ERROR << msg;
            throw Exception(DB_INVALID_PATH, msg);
        }
    }

    const std::string db_file = options_.path_ + "/meta.sqlite";
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(db_file));

    // Must run before sync_schema(): an incompatible old meta is rejected instead of being recreated.
    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();                          // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUpShadowFiles();

    return Status::OK();
}

G
groot 已提交
166
// Inserts a new row into the tables meta and creates the table's path on disk.
//   table_schema - in/out. When table_id_ is empty a fresh id is generated; id_ and created_on_
//                  are overwritten with the inserted row id and the current timestamp.
// Returns:
//   - DB_ERROR          when a table with this id exists and is in TO_DELETE state,
//   - DB_ALREADY_EXIST  when a table with this id exists in any other state,
//   - the result of HandleException() when the storage layer throws,
//   - otherwise the Status of utils::CreateTablePath().
Status
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so writers are serialized
        std::lock_guard<std::mutex> guard(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            fiu_do_on("SqliteMetaImpl.CreateTable.throw_exception", throw std::exception());
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                const bool pending_delete = (TableSchema::TO_DELETE == std::get<0>(existing[0]));
                if (pending_delete) {
                    return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                }
                // Reported as "already exists" rather than silently succeeding.
                return Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            fiu_do_on("SqliteMetaImpl.CreateTable.insert_throw_exception", throw std::exception());
            table_schema.id_ = ConnectorPtr->insert(table_schema);
        } catch (const std::exception& insert_error) {
            return HandleException("Encounter exception when create table", insert_error.what());
        }

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when create table", error.what());
    }
}

// Loads the meta of the table named by table_schema.table_id_ into table_schema.
// Rows in TO_DELETE state are ignored.
//   table_schema - in: table_id_ selects the row; out: all other selected columns are filled in.
// Returns DB_NOT_FOUND when there is not exactly one matching row, the result of HandleException()
// when the storage layer throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DescribeTable.throw_exception", throw std::exception());

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        // Tuple positions follow the order of the columns(...) list above.
        const auto& row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.engine_type_ = std::get<6>(row);
        table_schema.index_params_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
        table_schema.owner_table_ = std::get<9>(row);
        table_schema.partition_tag_ = std::get<10>(row);
        table_schema.version_ = std::get<11>(row);
        table_schema.flush_lsn_ = std::get<12>(row);
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when describe table", error.what());
    }

    return Status::OK();
}

// Reports whether exactly one table row with the given id exists that is not in TO_DELETE state.
//   table_id   - id to look up
//   has_or_not - output; false on entry and on any failure
// Returns the result of HandleException() when the storage layer throws, Status::OK() otherwise.
Status
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    has_or_not = false;

    try {
        fiu_do_on("SqliteMetaImpl.HasTable.throw_exception", throw std::exception());
        server::MetricCollector metric;

        auto matches = ConnectorPtr->select(columns(&TableSchema::id_),
                                            where(c(&TableSchema::table_id_) == table_id and
                                                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        has_or_not = (matches.size() == 1);
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when lookup table", error.what());
    }

    return Status::OK();
}

// Collects every table row that is not in TO_DELETE state and whose owner_table_ is empty.
//   table_schema_array - output; matching schemas are appended (the vector is not cleared first).
//                        state_ is not part of the selected columns, so it keeps its default value
//                        in each appended schema.
// Returns the result of HandleException() when the storage layer throws, Status::OK() otherwise.
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        fiu_do_on("SqliteMetaImpl.AllTables.throw_exception", throw std::exception());
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::table_id_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE) and
                  c(&TableSchema::owner_table_) == ""));

        // Tuple positions follow the order of the columns(...) list above.
        for (const auto& row : rows) {
            TableSchema entry;
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.dimension_ = std::get<2>(row);
            entry.created_on_ = std::get<3>(row);
            entry.flag_ = std::get<4>(row);
            entry.index_file_size_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.index_params_ = std::get<7>(row);
            entry.metric_type_ = std::get<8>(row);
            entry.owner_table_ = std::get<9>(row);
            entry.partition_tag_ = std::get<10>(row);
            entry.version_ = std::get<11>(row);
            entry.flush_lsn_ = std::get<12>(row);

            table_schema_array.emplace_back(entry);
        }
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when lookup all tables", error.what());
    }

    return Status::OK();
}

// Soft-deletes a table: the state of the row with the given id is set to TO_DELETE
// (rows already in that state are left untouched). The row itself is not removed here.
//   table_id - id of the table to drop
// Returns the result of HandleException() when the storage layer throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DropTable(const std::string& table_id) {
    try {
        fiu_do_on("SqliteMetaImpl.DropTable.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so writers are serialized
        std::lock_guard<std::mutex> guard(meta_mutex_);

        const int deleted_state = static_cast<int>(TableSchema::TO_DELETE);
        ConnectorPtr->update_all(
            set(c(&TableSchema::state_) = deleted_state),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != deleted_state));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when delete table", error.what());
    }

    return Status::OK();
}

// Soft-deletes all files of a table: every file row of table_id that is not yet TO_DELETE gets
// file_type_ = TO_DELETE and a refreshed updated_time_. The rows themselves are not removed here.
//   table_id - id of the table whose files are marked
// Returns the result of HandleException() when the storage layer throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        fiu_do_on("SqliteMetaImpl.DeleteTableFiles.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so writers are serialized
        std::lock_guard<std::mutex> guard(meta_mutex_);

        const int deleted_type = static_cast<int>(TableFileSchema::TO_DELETE);
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = deleted_type,
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id and c(&TableFileSchema::file_type_) != deleted_type));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when delete table files", error.what());
    }

    return Status::OK();
}

// Registers a new file for an existing table and creates its path on disk.
//   file_schema - in/out. table_id_ must name an existing table. date_ is defaulted to today when it
//                 equals EmptyDate; file_id_ is always regenerated; segment_id_ defaults to file_id_
//                 when empty; size/row counters are reset to 0; timestamps are set to now; the
//                 dimension/index/engine/metric settings are copied from the owning table; id_ receives
//                 the inserted row id.
// Returns the failing Status of DescribeTable() when the table cannot be described, the result of
// HandleException() when the storage layer throws, otherwise the Status of utils::CreateTableFilePath().
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto describe_status = DescribeTable(owner);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        fiu_do_on("SqliteMetaImpl.CreateTableFile.throw_exception", throw std::exception());
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);
        if (file_schema.segment_id_.empty()) {
            file_schema.segment_id_ = file_schema.file_id_;
        }

        // A new file starts empty.
        file_schema.dimension_ = owner.dimension_;
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        // Settings inherited from the owning table.
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.index_params_ = owner.index_params_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.metric_type_ = owner.metric_type_;

        // multi-threads call sqlite update may get exception('bad logic', etc), so writers are serialized
        std::lock_guard<std::mutex> guard(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when create table file", error.what());
    }
}

S
starlord 已提交
396
// Fetches the file rows of a table whose row ids are listed in `ids`, skipping files in TO_DELETE state.
//   table_id    - owning table
//   ids         - row ids (TableFileSchema::id_) to fetch
//   table_files - output; cleared first, then filled with one entry per matching row. Each entry also
//                 carries the table-level dimension/index/metric settings and has its path resolved
//                 through utils::GetTableFilePath() (whose Status is not inspected here).
// Returns the failing Status of DescribeTable() when the table cannot be described, the result of
// HandleException() when the storage layer throws, and a default-constructed Status otherwise.
Status
SqliteMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                              TableFilesSchema& table_files) {
    try {
        fiu_do_on("SqliteMetaImpl.GetTableFiles.throw_exception", throw std::exception());

        table_files.clear();
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::id_, ids) and
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        TableSchema owner;
        owner.table_id_ = table_id;
        auto describe_status = DescribeTable(owner);
        if (!describe_status.ok()) {
            return describe_status;
        }

        Status result;
        // Tuple positions follow the order of the columns(...) list above.
        for (const auto& row : rows) {
            TableFileSchema entry;
            entry.table_id_ = table_id;
            entry.id_ = std::get<0>(row);
            entry.segment_id_ = std::get<1>(row);
            entry.file_id_ = std::get<2>(row);
            entry.file_type_ = std::get<3>(row);
            entry.file_size_ = std::get<4>(row);
            entry.row_count_ = std::get<5>(row);
            entry.date_ = std::get<6>(row);
            entry.engine_type_ = std::get<7>(row);
            entry.created_on_ = std::get<8>(row);

            // Table-level settings are not stored per file; take them from the owning table.
            entry.dimension_ = owner.dimension_;
            entry.index_file_size_ = owner.index_file_size_;
            entry.index_params_ = owner.index_params_;
            entry.metric_type_ = owner.metric_type_;

            utils::GetTableFilePath(options_, entry);

            table_files.emplace_back(entry);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return result;
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when lookup table files", error.what());
    }
}

446 447 448 449 450 451
// Fetches all file rows that belong to a segment, skipping files in TO_DELETE state.
//   segment_id  - segment to look up
//   table_files - output; cleared first, then filled with one entry per matching row. The owning table
//                 is taken from the first row and applied to every entry.
// Returns the failing Status of DescribeTable() when that table cannot be described, the result of
// HandleException() when the storage layer throws, Status::OK() otherwise (including "no rows").
Status
SqliteMetaImpl::GetTableFilesBySegmentId(const std::string& segment_id,
                                         milvus::engine::meta::TableFilesSchema& table_files) {
    try {
        table_files.clear();
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::segment_id_) == segment_id and
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        if (!rows.empty()) {
            TableSchema owner;
            owner.table_id_ = std::get<1>(rows[0]);
            auto describe_status = DescribeTable(owner);
            if (!describe_status.ok()) {
                return describe_status;
            }

            // Tuple positions follow the order of the columns(...) list above.
            for (const auto& row : rows) {
                TableFileSchema entry;
                entry.table_id_ = owner.table_id_;
                entry.id_ = std::get<0>(row);
                entry.segment_id_ = std::get<2>(row);
                entry.file_id_ = std::get<3>(row);
                entry.file_type_ = std::get<4>(row);
                entry.file_size_ = std::get<5>(row);
                entry.row_count_ = std::get<6>(row);
                entry.date_ = std::get<7>(row);
                entry.engine_type_ = std::get<8>(row);
                entry.created_on_ = std::get<9>(row);

                // Table-level settings are not stored per file; take them from the owning table.
                entry.dimension_ = owner.dimension_;
                entry.index_file_size_ = owner.index_file_size_;
                entry.index_params_ = owner.index_params_;
                entry.metric_type_ = owner.metric_type_;

                utils::GetTableFilePath(options_, entry);
                table_files.emplace_back(entry);
            }
        }

        ENGINE_LOG_DEBUG << "Get table files by segment id";
        return Status::OK();
    } catch (const std::exception& error) {
        return HandleException("Encounter exception when lookup table files by segment id", error.what());
    }
}

S
starlord 已提交
496
// Sets the `flag` column of the table row identified by table_id.
//   table_id - id of the table to update
//   flag     - new flag value
// The number of affected rows is not checked, so an unknown table_id is not reported as an error.
// Returns the result of HandleException() when the storage layer throws, Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFlag.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so take the same lock
        // the other writers in this file take before touching the storage.
        // NOTE(review): meta_mutex_ is a non-recursive std::mutex; this assumes no caller already holds it - confirm.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag), where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (const std::exception& e) {
        const std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
// Sets the `flush_lsn` column of the table row identified by table_id.
//   table_id  - id of the table to update
//   flush_lsn - new flush LSN value
// The number of affected rows is not checked, so an unknown table_id is not reported as an error.
// Returns the result of HandleException() when the storage layer throws, Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_lsn) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so take the same lock
        // the other writers in this file take before touching the storage.
        // NOTE(review): meta_mutex_ is a non-recursive std::mutex; this assumes no caller already holds it - confirm.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(set(c(&TableSchema::flush_lsn_) = flush_lsn),
                                 where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flush_lsn, table id = " << table_id;
    } catch (const std::exception& e) {
        const std::string msg = "Encounter exception when update table lsn: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

// Reads the `flush_lsn` column of the table row identified by table_id.
//   table_id  - id of the table to read
//   flush_lsn - output; written only when a row is found
// The lookup does not filter on the table state.
// Returns DB_NOT_FOUND when no row matches, the result of HandleException() when the storage layer
// throws, Status::OK() otherwise.
Status
SqliteMetaImpl::GetTableFlushLSN(const std::string& table_id, uint64_t& flush_lsn) {
    try {
        server::MetricCollector metric;

        auto selected =
            ConnectorPtr->select(columns(&TableSchema::flush_lsn_), where(c(&TableSchema::table_id_) == table_id));

        if (selected.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }
        flush_lsn = std::get<0>(selected[0]);
    } catch (const std::exception& e) {
        // The message names this operation; it previously repeated the text of GetTableFilesByFlushLSN.
        return HandleException("Encounter exception when getting table flush_lsn", e.what());
    }

    return Status::OK();
}

// Collects every file row whose flush_lsn equals the given value, across all tables.
//   flush_lsn   - LSN to match
//   table_files - output; cleared first, then one entry is appended per matching row. Each entry also
//                 carries the dimension/index/metric settings of its owning table.
// Returns:
//   - the failing Status of DescribeTable() as soon as an owning table cannot be described
//     (entries appended before that point stay in table_files),
//   - the result of HandleException() when the storage layer throws,
//   - otherwise the last non-ok Status of utils::GetTableFilePath(), or a default Status when all succeeded.
Status
SqliteMetaImpl::GetTableFilesByFlushLSN(uint64_t flush_lsn, TableFilesSchema& table_files) {
    table_files.clear();

    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::flush_lsn_) == flush_lsn));

        // Cache of owning-table schemas so each table is described only once.
        std::map<std::string, TableSchema> groups;

        Status ret;
        for (const auto& file : selected) {
            // A fresh object per row: nothing set while handling the previous row can leak into this one.
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.segment_id_ = std::get<2>(file);
            table_file.file_id_ = std::get<3>(file);
            table_file.file_type_ = std::get<4>(file);
            table_file.file_size_ = std::get<5>(file);
            table_file.row_count_ = std::get<6>(file);
            table_file.date_ = std::get<7>(file);
            table_file.engine_type_ = std::get<8>(file);
            table_file.created_on_ = std::get<9>(file);

            // A path failure is remembered but does not stop the collection.
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto group_itr = groups.find(table_file.table_id_);
            if (group_itr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                group_itr = groups.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema& owner = group_itr->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.index_params_ = owner.index_params_;
            table_file.metric_type_ = owner.metric_type_;
            table_files.push_back(table_file);
        }

        if (!selected.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " files with flush_lsn = " << flush_lsn;
        }
        return ret;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when getting table files by flush_lsn", e.what());
    }
}

S
starlord 已提交
610
// Writes one file row back to the meta store.
//   file_schema - in/out. updated_time_ is refreshed first. When the owning table has no row or is in
//                 TO_DELETE state, file_type_ is forced to TO_DELETE before the row is updated.
// Returns the result of HandleException() when the storage layer throws, Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFile.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so writers are serialized
        std::lock_guard<std::mutex> guard(meta_mutex_);

        auto owner_rows = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == file_schema.table_id_));

        // if the table has been deleted, just mark the table file as TO_DELETE
        // clean thread will delete the file later
        const bool owner_gone =
            owner_rows.empty() || std::get<0>(owner_rows[0]) == static_cast<int>(TableSchema::TO_DELETE);
        if (owner_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (const std::exception& error) {
        const std::string msg =
            "Exception update table file: table_id = " + file_schema.table_id_ + " file_id = " + file_schema.file_id_;
        return HandleException(msg, error.what());
    }
    return Status::OK();
}

S
starlord 已提交
640
// Persists a batch of table-file records through a single ConnectorPtr->transaction() call.
//
// Every entry of files is modified in place: updated_time_ is refreshed, and file_type_ becomes
// TO_DELETE when the owning table has no row that is not marked TO_DELETE.
// Returns Status::OK() on success; a HandleException status if the transaction reports failure
// or an exception is thrown.
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFiles.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // table id -> "a live (not TO_DELETE) table row exists"; each table id is queried only once
        std::map<std::string, bool> table_alive;
        for (auto& file : files) {
            if (table_alive.count(file.table_id_) > 0) {
                continue;
            }

            auto live_rows = ConnectorPtr->select(
                columns(&TableSchema::id_),
                where(c(&TableSchema::table_id_) == file.table_id_ and
                      c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
            table_alive[file.table_id_] = !live_rows.empty();
        }

        auto committed = ConnectorPtr->transaction([&]() mutable {
            for (auto& file : files) {
                const bool owner_alive = table_alive[file.table_id_];
                if (!owner_alive) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });
        fiu_do_on("SqliteMetaImpl.UpdateTableFiles.fail_commited", committed = false);

        if (!committed) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
        }

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files", e.what());
    }

    return Status::OK();
}

688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707
// Writes the row_count_ of every entry in files to the table-file row with the same file_id_,
// refreshing that row's updated_time_ at the same time. The entries themselves are not modified.
// Returns Status::OK() on success, or a HandleException status if an exception is thrown.
Status
SqliteMetaImpl::UpdateTableFilesRowCount(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        for (auto& entry : files) {
            // the timestamp is taken per file, right before its row is updated
            ConnectorPtr->update_all(
                set(c(&TableFileSchema::row_count_) = entry.row_count_,
                    c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                where(c(&TableFileSchema::file_id_) == entry.file_id_));

            ENGINE_LOG_DEBUG << "Update file " << entry.file_id_ << " row count to " << entry.row_count_;
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files row count", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
708
// Stores new index settings (engine type, index parameters, metric type) for a table and
// turns the table's BACKUP files back into RAW files.
//
// table_id: id of the table to update; it must have a row that is not marked TO_DELETE.
// index:    settings to store; extra_params_ is serialized with dump().
//
// Returns DB_NOT_FOUND when no such table exists, Status::OK() on success, or a HandleException
// status if an exception is thrown.
Status
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableIndex.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // existence check only: the row content is not needed to apply the change
        auto tables = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        if (tables.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        // Write only the index columns. Rebuilding a TableSchema from a partial select and calling
        // update() on it would overwrite every member that was not selected with a default value.
        // NOTE(review): presumably flush_lsn_ is such a persisted column - confirm against the
        // storage definition.
        ConnectorPtr->update_all(
            set(c(&TableSchema::engine_type_) = index.engine_type_,
                c(&TableSchema::index_params_) = index.extra_params_.dump(),
                c(&TableSchema::metric_type_) = index.metric_type_),
            where(c(&TableSchema::table_id_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        // set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
759
// Marks the RAW files of a table as TO_INDEX, but only those whose row_count_ is at least
// meta::BUILD_INDEX_THRESHOLD.
// Returns Status::OK() on success, or a HandleException status if an exception is thrown.
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.UpdateTableFilesToIndex.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // candidates: raw files of this table that hold enough rows
        auto eligible_files = where(c(&TableFileSchema::table_id_) == table_id and
                                    c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD and
                                    c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW));

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_INDEX)), eligible_files);

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files to to_index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
781
// Reads the stored index settings of a table into index.
//
// index is filled (engine_type_, extra_params_ parsed from the stored JSON text, metric_type_)
// only when exactly one table row that is not TO_DELETE matches table_id; otherwise DB_NOT_FOUND
// is returned and index is left untouched. Exceptions, including any raised while parsing the
// stored parameters, are turned into a HandleException status.
Status
SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DescribeTableIndex.throw_exception", throw std::exception());

        auto index_rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_, &TableSchema::index_params_, &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (index_rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        auto& row = index_rows[0];
        index.engine_type_ = std::get<0>(row);
        index.extra_params_ = milvus::json::parse(std::get<1>(row));
        index.metric_type_ = std::get<2>(row);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when describe index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
805
// Drops the index of a table in three steps: INDEX files become TO_DELETE, BACKUP files become
// RAW again, and the table row gets a raw engine type with empty ("{}") index parameters.
// Returns Status::OK() on success, or a HandleException status if an exception is thrown.
Status
SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DropTableIndex.throw_exception", throw std::exception());

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // soft delete index files
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)));

        // set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        // set table index type to raw; tables with a binary metric use the binary id-map engine
        auto metric_rows = ConnectorPtr->select(columns(&TableSchema::metric_type_),
                                                where(c(&TableSchema::table_id_) == table_id));

        int32_t raw_engine_type = DEFAULT_ENGINE_TYPE;
        if (metric_rows.size() == 1) {
            const int32_t table_metric = std::get<0>(metric_rows[0]);
            if (engine::utils::IsBinaryMetricType(table_metric)) {
                raw_engine_type = static_cast<int32_t>(EngineType::FAISS_BIN_IDMAP);
            }
        }

        ConnectorPtr->update_all(
            set(c(&TableSchema::engine_type_) = raw_engine_type, c(&TableSchema::index_params_) = "{}"),
            where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
849
// Creates a partition of table_id. The partition is stored through CreateTable() as a table
// row whose owner_table_ is table_id.
//
// table_id:       owning table; it must exist and must not itself be a partition.
// partition_name: table id to use for the partition; when empty, NextTableId() generates one.
// tag:            partition tag; leading/trailing blanks are trimmed before it is compared/stored.
// lsn:            value stored in the new row's flush_lsn_.
//
// Returns the DescribeTable() error if the owning table cannot be described, DB_ERROR for a
// nested or duplicate partition, DB_ALREADY_EXIST when CreateTable() reports the name as taken,
// the lookup error if the duplicate check itself failed, otherwise the CreateTable() status.
Status
SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag,
                                uint64_t lsn) {
    server::MetricCollector metric;

    // the new row starts as a copy of the owner's schema
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    // not allow create partition under partition
    if (!table_schema.owner_table_.empty()) {
        return Status(DB_ERROR, "Nested partition is not allowed");
    }

    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string valid_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    // not allow duplicated partition
    std::string exist_partition;
    auto lookup_status = GetPartitionName(table_id, valid_tag, exist_partition);
    if (!exist_partition.empty()) {
        return Status(DB_ERROR, "Duplicate partition is not allowed");
    }
    // DB_NOT_FOUND only means the tag is free. Any other failure means the duplicate check could
    // not be carried out, so do not go on and risk creating a second partition with the same tag.
    if (!lookup_status.ok() && lookup_status.code() != DB_NOT_FOUND) {
        return lookup_status;
    }

    if (partition_name.empty()) {
        // generate unique partition name
        NextTableId(table_schema.table_id_);
    } else {
        table_schema.table_id_ = partition_name;
    }

    table_schema.id_ = -1;
    table_schema.flag_ = 0;
    table_schema.created_on_ = utils::GetMicroSecTimeStamp();
    table_schema.owner_table_ = table_id;
    table_schema.partition_tag_ = valid_tag;
    table_schema.flush_lsn_ = lsn;

    status = CreateTable(table_schema);
    if (status.code() == DB_ALREADY_EXIST) {
        return Status(DB_ALREADY_EXIST, "Partition already exists");
    }

    return status;
}

S
starlord 已提交
900
// Drops a partition. Partitions are created through CreateTable(), so removing one is simply
// DropTable() on the partition's own table id; its status is returned unchanged.
Status
SqliteMetaImpl::DropPartition(const std::string& partition_name) {
    Status drop_status = DropTable(partition_name);
    return drop_status;
}
904

G
groot 已提交
905
// Appends to partition_schema_array one TableSchema for every table row whose owner_table_ is
// table_id and whose state is not TO_DELETE. The output vector is not cleared first.
// Returns Status::OK() on success, or a HandleException status if an exception is thrown.
Status
SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception());

        auto partition_rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::index_params_, &TableSchema::metric_type_, &TableSchema::partition_tag_,
                    &TableSchema::version_, &TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        for (auto& row : partition_rows) {
            meta::TableSchema partition_schema;

            // the owner is the filter value, not a selected column
            partition_schema.owner_table_ = table_id;

            // remaining fields, in the order of the selected columns
            partition_schema.id_ = std::get<0>(row);
            partition_schema.state_ = std::get<1>(row);
            partition_schema.dimension_ = std::get<2>(row);
            partition_schema.created_on_ = std::get<3>(row);
            partition_schema.flag_ = std::get<4>(row);
            partition_schema.index_file_size_ = std::get<5>(row);
            partition_schema.engine_type_ = std::get<6>(row);
            partition_schema.index_params_ = std::get<7>(row);
            partition_schema.metric_type_ = std::get<8>(row);
            partition_schema.partition_tag_ = std::get<9>(row);
            partition_schema.version_ = std::get<10>(row);
            partition_schema.table_id_ = std::get<11>(row);

            partition_schema_array.emplace_back(partition_schema);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when show partitions", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
943
// Looks up the table id of the partition of table_id that carries the given tag.
//
// The tag is compared after trimming leading/trailing blanks; partitions marked TO_DELETE are
// ignored. On a match partition_name receives the table id of the first matching row and
// Status::OK() is returned. Without a match partition_name is left untouched and DB_NOT_FOUND
// is returned. Exceptions are turned into a HandleException status.
Status
SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.GetPartitionName.throw_exception", throw std::exception());

        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        auto matches = ConnectorPtr->select(
            columns(&TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and c(&TableSchema::partition_tag_) == valid_tag and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (matches.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
        }

        partition_name = std::get<0>(matches[0]);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when get partition name", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
970
// Collects the searchable files (RAW, TO_INDEX and INDEX) of a table.
//
// table_id: table whose files are wanted; it must be describable through DescribeTable().
// ids:      when non-empty, only files whose row id_ is listed are returned.
// files:    cleared first, then filled; table-level fields (dimension, index file size, index
//           params, metric type) are copied from the table schema into every entry.
//
// Returns the DescribeTable() error if the table cannot be described. Otherwise returns the
// status of the last failed utils::GetTableFilePath() call, or a default-constructed Status when
// none failed; files whose path lookup failed are still added. Exceptions are turned into a
// HandleException status.
Status
SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) {
    files.clear();
    server::MetricCollector metric;

    try {
        fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception());

        auto select_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_);

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;

        std::vector<int> searchable_types = {static_cast<int>(TableFileSchema::RAW),
                                             static_cast<int>(TableFileSchema::TO_INDEX),
                                             static_cast<int>(TableFileSchema::INDEX)};
        auto match_type = in(&TableFileSchema::file_type_, searchable_types);

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(table_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        // perform query, restricted to the requested row ids when any were given
        decltype(ConnectorPtr->select(select_columns)) selected;
        if (!ids.empty()) {
            auto match_fileid = in(&TableFileSchema::id_, ids);
            auto filter = where(match_tableid and match_fileid and match_type);
            selected = ConnectorPtr->select(select_columns, filter);
        } else {
            auto filter = where(match_tableid and match_type);
            selected = ConnectorPtr->select(select_columns, filter);
        }

        Status ret;
        for (auto& row : selected) {
            TableFileSchema table_file;

            // per-file columns, in select order
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.segment_id_ = std::get<2>(row);
            table_file.file_id_ = std::get<3>(row);
            table_file.file_type_ = std::get<4>(row);
            table_file.file_size_ = std::get<5>(row);
            table_file.row_count_ = std::get<6>(row);
            table_file.date_ = std::get<7>(row);
            table_file.engine_type_ = std::get<8>(row);

            // table-level settings shared by all files of the table
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.index_params_ = table_schema.index_params_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            files.emplace_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate index files", e.what());
    }
}

S
starlord 已提交
1044
// Collects the RAW files of a table that are still smaller than the table's index_file_size_.
// The query orders the rows by file_size_ descending.
//
// files is cleared first, then filled; table-level fields (dimension, index file size, index
// params, metric type) are copied from the table schema into every entry.
//
// Returns the DescribeTable() error if the table cannot be described. Otherwise returns the
// status of the last failed utils::GetTableFilePath() call, or a default-constructed Status when
// none failed; files whose path lookup failed are still added. Exceptions are turned into a
// HandleException status.
Status
SqliteMetaImpl::FilesToMerge(const std::string& table_id, TableFilesSchema& files) {
    files.clear();

    try {
        fiu_do_on("SqliteMetaImpl.FilesToMerge.throw_exception", throw std::exception());

        server::MetricCollector metric;

        // check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(table_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        // get files to merge
        auto raw_rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW) and
                  c(&TableFileSchema::table_id_) == table_id),
            order_by(&TableFileSchema::file_size_).desc());

        Status result;
        for (auto& row : raw_rows) {
            TableFileSchema table_file;

            // the size is read first so that large files can be skipped cheaply
            table_file.file_size_ = std::get<5>(row);
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;  // skip large file
            }

            // per-file columns
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.segment_id_ = std::get<2>(row);
            table_file.file_id_ = std::get<3>(row);
            table_file.file_type_ = std::get<4>(row);
            table_file.row_count_ = std::get<6>(row);
            table_file.date_ = std::get<7>(row);
            table_file.created_on_ = std::get<8>(row);

            // table-level settings shared by all files of the table
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.index_params_ = table_schema.index_params_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                result = path_status;
            }

            files.emplace_back(table_file);
        }

        // files was cleared on entry, so its size is exactly the number collected here
        if (!files.empty()) {
            ENGINE_LOG_TRACE << "Collect " << files.size() << " to-merge files";
        }
        return result;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

// Collects every file whose type is TO_INDEX, across all tables.
//
// files is cleared first, then filled; table-level fields (dimension, index file size, index
// params, metric type) are copied from the owning table's schema, which is fetched through
// DescribeTable() once per table id.
//
// Returns the DescribeTable() error as soon as an owning table cannot be described (files then
// holds the entries gathered so far). Otherwise returns the status of the last failed
// utils::GetTableFilePath() call, or a default-constructed Status when none failed; files whose
// path lookup failed are still added. Exceptions are turned into a HandleException status.
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        fiu_do_on("SqliteMetaImpl.FilesToIndex.throw_exception", throw std::exception());

        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX)));

        // table id -> schema of the owning table, so each table is described only once
        std::map<std::string, TableSchema> groups;

        Status ret;
        for (auto& file : selected) {
            // A fresh object per row (as FilesToSearch/FilesToMerge do): a single object reused
            // across iterations would keep whatever the previous file left in fields that are not
            // reassigned below.
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.segment_id_ = std::get<2>(file);
            table_file.file_id_ = std::get<3>(file);
            table_file.file_type_ = std::get<4>(file);
            table_file.file_size_ = std::get<5>(file);
            table_file.row_count_ = std::get<6>(file);
            table_file.date_ = std::get<7>(file);
            table_file.engine_type_ = std::get<8>(file);
            table_file.created_on_ = std::get<9>(file);

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto group_itr = groups.find(table_file.table_id_);
            if (group_itr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                fiu_do_on("SqliteMetaImpl_FilesToIndex_TableNotFound",
                          describe_status = Status(DB_NOT_FOUND, "table not found"));
                if (!describe_status.ok()) {
                    return describe_status;
                }
                group_itr = groups.emplace(table_file.table_id_, table_schema).first;
            }

            // table-level settings shared by all files of the table
            const TableSchema& owner_schema = group_itr->second;
            table_file.dimension_ = owner_schema.dimension_;
            table_file.index_file_size_ = owner_schema.index_file_size_;
            table_file.index_params_ = owner_schema.index_params_;
            table_file.metric_type_ = owner_schema.metric_type_;
            files.push_back(table_file);
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
1174
// Collects the files of a table whose type is one of file_types and logs how many files of each
// requested type were found.
//
// table_id:    table whose files are wanted; it must be describable through DescribeTable().
// file_types:  accepted file types; must not be empty.
// table_files: cleared inside the query section (so it is left untouched when the function
//              returns early), then filled; table-level fields are copied from the table schema.
//
// Returns DB_ERROR for an empty file_types, the DescribeTable() error if the table cannot be
// described, otherwise the status of the last failed utils::GetTableFilePath() call or
// Status::OK(). Exceptions are turned into a HandleException status.
Status
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
                            TableFilesSchema& table_files) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    Status ret = Status::OK();

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto describe_status = DescribeTable(table_schema);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());

        table_files.clear();
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));

        if (!selected.empty()) {
            // file type -> number of selected files of that type (absent types read back as 0)
            std::map<int, int> type_counts;

            for (auto& row : selected) {
                TableFileSchema file_schema;
                file_schema.table_id_ = table_id;

                // per-file columns, in select order
                file_schema.id_ = std::get<0>(row);
                file_schema.segment_id_ = std::get<1>(row);
                file_schema.file_id_ = std::get<2>(row);
                file_schema.file_type_ = std::get<3>(row);
                file_schema.file_size_ = std::get<4>(row);
                file_schema.row_count_ = std::get<5>(row);
                file_schema.date_ = std::get<6>(row);
                file_schema.engine_type_ = std::get<7>(row);
                file_schema.created_on_ = std::get<8>(row);

                // table-level settings shared by all files of the table
                file_schema.dimension_ = table_schema.dimension_;
                file_schema.index_file_size_ = table_schema.index_file_size_;
                file_schema.index_params_ = table_schema.index_params_;
                file_schema.metric_type_ = table_schema.metric_type_;

                ++type_counts[file_schema.file_type_];

                // a failed path lookup is remembered but does not drop the file from the result
                auto path_status = utils::GetTableFilePath(options_, file_schema);
                if (!path_status.ok()) {
                    ret = path_status;
                }

                table_files.emplace_back(file_schema);
            }

            // one fragment per requested type, in request order; types without a label are skipped
            std::string msg = "Get table files by type.";
            for (int file_type : file_types) {
                const char* label = nullptr;
                switch (file_type) {
                    case static_cast<int>(TableFileSchema::RAW):
                        label = " raw files:";
                        break;
                    case static_cast<int>(TableFileSchema::NEW):
                        label = " new files:";
                        break;
                    case static_cast<int>(TableFileSchema::NEW_MERGE):
                        label = " new_merge files:";
                        break;
                    case static_cast<int>(TableFileSchema::NEW_INDEX):
                        label = " new_index files:";
                        break;
                    case static_cast<int>(TableFileSchema::TO_INDEX):
                        label = " to_index files:";
                        break;
                    case static_cast<int>(TableFileSchema::INDEX):
                        label = " index files:";
                        break;
                    case static_cast<int>(TableFileSchema::BACKUP):
                        label = " backup files:";
                        break;
                    default:
                        break;
                }

                if (label != nullptr) {
                    msg = msg + label + std::to_string(type_counts[file_type]);
                }
            }
            ENGINE_LOG_DEBUG << msg;
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }

    return ret;
}

S
starlord 已提交
1280
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1281 1282
// Apply every configured archive criteria to the table files.
//
// - ARCHIVE_CONF_DAYS: files whose created_on_ is older than `limit` days
//   (presumably DAY * US_PS converts days to microseconds) are marked TO_DELETE.
// - ARCHIVE_CONF_DISK: if the total size reported by Size() exceeds `limit` * G,
//   the surplus is handed to DiscardFiles().
//
// Returns Status::OK() when no criteria is configured or all criteria were applied;
// otherwise the first failure status encountered.
Status
SqliteMetaImpl::Archive() {
    auto& criterias = options_.archive_conf_.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    // Iterate by const reference so the (criteria, limit) pair is not copied on each pass.
    for (const auto& kv : criterias) {
        const auto& criteria = kv.first;
        const auto& limit = kv.second;

        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            int64_t usecs = limit * DAY * US_PS;
            int64_t now = utils::GetMicroSecTimeStamp();
            try {
                fiu_do_on("SqliteMetaImpl.Archive.throw_exception", throw std::exception());

                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
                                         where(c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
                                               c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
            } catch (std::exception& e) {
                return HandleException("Encounter exception when update table files", e.what());
            }

            ENGINE_LOG_DEBUG << "Archive old files";
        }

        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            // Do not silently ignore a failed size query: with sum == 0 nothing would be
            // discarded and the caller would be told the archive succeeded.
            auto size_status = Size(sum);
            if (!size_status.ok()) {
                return size_status;
            }

            int64_t to_delete = (int64_t)sum - limit * G;
            auto discard_status = DiscardFiles(to_delete);
            if (!discard_status.ok()) {
                return discard_status;
            }

            ENGINE_LOG_DEBUG << "Archive files to free disk";
        }
    }

    return Status::OK();
}

S
starlord 已提交
1323
// Compute the accumulated file_size_ of all table files that are not marked TO_DELETE.
//
// `result` is reset to 0 first and receives the total; on exception it holds whatever
// was accumulated so far and a failure status is returned.
Status
SqliteMetaImpl::Size(uint64_t& result) {
    result = 0;
    try {
        fiu_do_on("SqliteMetaImpl.Size.throw_exception", throw std::exception());

        auto rows = ConnectorPtr->select(
            columns(sum(&TableFileSchema::file_size_)),
            where(c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        for (auto& row : rows) {
            auto& total = std::get<0>(row);
            // The aggregate comes back as a nullable handle; only add it when a value is present.
            if (total) {
                result += static_cast<uint64_t>(*total);
            }
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when calculte db size", ex.what());
    }

    return Status::OK();
}

S
starlord 已提交
1344
// Delete the meta records of every table file still in a NEW, NEW_INDEX or NEW_MERGE state.
//
// The candidate ids are selected first and then removed inside a single sqlite transaction.
// Returns a failure status if the transaction does not commit or an exception is raised.
Status
SqliteMetaImpl::CleanUpShadowFiles() {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> shadow_types = {static_cast<int>(TableFileSchema::NEW),
                                         static_cast<int>(TableFileSchema::NEW_INDEX),
                                         static_cast<int>(TableFileSchema::NEW_MERGE)};
        auto shadow_files = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                 where(in(&TableFileSchema::file_type_, shadow_types)));

        auto committed = ConnectorPtr->transaction([&]() mutable {
            for (auto& row : shadow_files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.fail_commited", committed = false);
        fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.throw_exception", throw std::exception());
        if (!committed) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!shadow_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << shadow_files.size() << " files";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean table file", ex.what());
    }

    return Status::OK();
}

1381
// Garbage-collect table files, tables and their folders.
//
// The work is done in four independent stages, each with its own try/catch so a failure
// is reported with a stage-specific message:
//   1. Files of type TO_DELETE or BACKUP whose updated_time_ is older than `seconds` are
//      evicted from the cache; TO_DELETE files are additionally removed from meta and disk.
//   2. Tables in state TO_DELETE are removed from meta.
//   3. For every table touched in stage 1, the table folder is deleted once no file
//      records remain for it.
//   4. For every segment touched in stage 1, the segment folder is deleted once no file
//      records remain for it.
//
// `seconds` is the time-to-live applied to updated_time_ in stage 1.
// Returns Status::OK() on success, or the failure status of the first stage that failed.
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> table_ids;                     // tables that lost at least one file in stage 1
    std::map<std::string, TableFileSchema> segment_ids;  // segment id -> one removed file of that segment

    // remove to_delete files
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_ThrowException", throw std::exception());

        server::MetricCollector metric;

        std::vector<int> file_types = {
            (int)TableFileSchema::TO_DELETE,
            (int)TableFileSchema::BACKUP,
        };

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // collect files to be deleted
        auto files = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::engine_type_, &TableFileSchema::file_id_, &TableFileSchema::file_type_,
                    &TableFileSchema::date_),
            where(in(&TableFileSchema::file_type_, file_types) and
                  c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        int64_t clean_files = 0;
        auto commited = ConnectorPtr->transaction([&]() mutable {
            TableFileSchema table_file;
            for (auto& file : files) {
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.segment_id_ = std::get<2>(file);
                table_file.engine_type_ = std::get<3>(file);
                table_file.file_id_ = std::get<4>(file);
                table_file.file_type_ = std::get<5>(file);
                table_file.date_ = std::get<6>(file);

                // check if the file can be deleted
                if (OngoingFileChecker::GetInstance().IsIgnored(table_file)) {
                    ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
                                     << " currently is in use, not able to delete now";
                    continue;  // ignore this file, don't delete it
                }

                // erase from cache, must do this before file deleted,
                // because GetTableFilePath won't able to generate file path after the file is deleted
                // TODO(zhiru): clean up
                utils::GetTableFilePath(options_, table_file);
                server::CommonUtil::EraseFromCache(table_file.location_);

                // BACKUP files only get evicted from the cache above; their record and data stay.
                if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) {
                    // delete file from meta
                    ConnectorPtr->remove<TableFileSchema>(table_file.id_);

                    // delete file from disk storage
                    utils::DeleteTableFilePath(options_, table_file);

                    ENGINE_LOG_DEBUG << "Remove file id:" << table_file.file_id_ << " location:"
                                     << table_file.location_;
                    table_ids.insert(table_file.table_id_);
                    segment_ids.insert(std::make_pair(table_file.segment_id_, table_file));

                    ++clean_files;
                }
            }
            return true;
        });
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_FailCommited", commited = false);

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (clean_files > 0) {
            ENGINE_LOG_DEBUG << "Clean " << clean_files << " files expired in " << seconds << " seconds";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove to_delete tables
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTable_ThrowException", throw std::exception());
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_),
                                           where(c(&TableSchema::state_) == (int)TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& table : tables) {
                utils::DeleteTablePath(options_, std::get<1>(table), false);  // only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(table));
            }

            return true;
        });
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTable_Failcommited", commited = false);

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (tables.size() > 0) {
            ENGINE_LOG_DEBUG << "Remove " << tables.size() << " tables from meta";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveTableFolder_ThrowException", throw std::exception());
        server::MetricCollector metric;

        int64_t remove_tables = 0;
        for (auto& table_id : table_ids) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                 where(c(&TableFileSchema::table_id_) == table_id));
            if (selected.size() == 0) {
                utils::DeleteTablePath(options_, table_id);
                ++remove_tables;
            }
        }

        if (remove_tables) {
            ENGINE_LOG_DEBUG << "Remove " << remove_tables << " tables folder";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table folder", e.what());
    }

    // remove deleted segment folder
    // don't remove segment folder until all its tablefiles has been deleted
    try {
        fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveSegmentFolder_ThrowException", throw std::exception());
        server::MetricCollector metric;

        int64_t remove_segments = 0;
        for (auto& segment_id : segment_ids) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                 where(c(&TableFileSchema::segment_id_) == segment_id.first));
            if (selected.size() == 0) {
                utils::DeleteSegment(options_, segment_id.second);
                std::string segment_dir;
                utils::GetParentPath(segment_id.second.location_, segment_dir);
                ENGINE_LOG_DEBUG << "Remove segment directory: " << segment_dir;
                ++remove_segments;
            }
        }

        if (remove_segments > 0) {
            ENGINE_LOG_DEBUG << "Remove " << remove_segments << " segments folder";
        }
    } catch (std::exception& e) {
        // This stage handles segment folders; report it as such instead of the table-folder message.
        return HandleException("Encounter exception when delete segment folder", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1548
// Sum row_count_ over the RAW, TO_INDEX and INDEX files of table `table_id`.
//
// `result` is only written when DescribeTable() succeeds for the table; otherwise the
// DescribeTable() status is returned and `result` is left untouched.
Status
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    try {
        fiu_do_on("SqliteMetaImpl.Count.throw_exception", throw std::exception());

        server::MetricCollector metric;

        std::vector<int> countable_types = {static_cast<int>(TableFileSchema::RAW),
                                            static_cast<int>(TableFileSchema::TO_INDEX),
                                            static_cast<int>(TableFileSchema::INDEX)};
        auto rows = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
                                         where(in(&TableFileSchema::file_type_, countable_types) and
                                               c(&TableFileSchema::table_id_) == table_id));

        // The table itself must be describable before a count is reported for it.
        TableSchema schema;
        schema.table_id_ = table_id;
        auto describe_status = DescribeTable(schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        uint64_t total_rows = 0;
        for (auto& row : rows) {
            total_rows += std::get<0>(row);
        }
        result = total_rows;
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when calculate table file size", ex.what());
    }
    return Status::OK();
}

S
starlord 已提交
1579 1580
// Drop both sqlite meta tables (META_TABLES, then META_TABLEFILES).
//
// Returns Status::OK() when both drops complete, or a failure status if either raises
// a std::exception.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    try {
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
        return Status::OK();
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when drop all meta", ex.what());
    }
}

G
groot 已提交
1593 1594 1595 1596 1597 1598 1599 1600 1601
// Mark table files as TO_DELETE until at least `to_discard_size` bytes have been discarded.
//
// Files are processed in batches of at most 10, ordered by id, skipping those already
// marked TO_DELETE. Each batch runs in its own sqlite transaction under meta_mutex_.
//
// The batches are driven by a loop rather than by self-recursion, and the loop stops when
// a pass finds nothing left to discard. Previously that situation (no candidate rows while
// `to_discard_size` was still positive) recursed without bound.
//
// Returns Status::OK() when the requested size was discarded or no candidates remain,
// otherwise the failure status of the batch that failed.
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        bool discarded_any = false;
        try {
            fiu_do_on("SqliteMetaImpl.DiscardFiles.throw_exception", throw std::exception());

            server::MetricCollector metric;

            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected =
                    ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::file_size_),
                                         where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE),
                                         order_by(&TableFileSchema::id_), limit(10));

                std::vector<int> ids;
                TableFileSchema table_file;

                for (auto& file : selected) {
                    if (to_discard_size <= 0)
                        break;
                    table_file.id_ = std::get<0>(file);
                    table_file.file_size_ = std::get<1>(file);
                    ids.push_back(table_file.id_);
                    // Log the row id that was actually selected; file_id_ is never populated here.
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                     << " table_file.size=" << table_file.file_size_;
                    to_discard_size -= table_file.file_size_;
                }

                if (ids.size() == 0) {
                    return true;
                }

                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                             c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                         where(in(&TableFileSchema::id_, ids)));
                discarded_any = true;

                return true;
            });
            fiu_do_on("SqliteMetaImpl.DiscardFiles.fail_commited", commited = false);
            if (!commited) {
                return HandleException("DiscardFiles error: sqlite transaction failed");
            }
        } catch (std::exception& e) {
            return HandleException("Encounter exception when discard table file", e.what());
        }

        // Nothing was left to discard: further passes cannot make progress.
        if (!discarded_any) {
            break;
        }
    }

    return Status::OK();
}

1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694
// Persist `lsn` as the global last LSN in the environment table.
//
// Inserts a new environment row when none exists; otherwise updates the stored value
// unless it already equals `lsn`.
// Returns Status::OK() on success, or a failure status if a std::exception is raised.
//
// NOTE(review): unlike the other writers in this file, this update is not guarded by
// meta_mutex_ — confirm whether that is intentional.
Status
SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.size() == 0) {
            EnvironmentSchema env;
            env.global_lsn_ = lsn;
            ConnectorPtr->insert(env);
        } else {
            uint64_t last_lsn = std::get<0>(selected[0]);
            if (lsn == last_lsn) {
                return Status::OK();
            }

            ConnectorPtr->update_all(set(c(&EnvironmentSchema::global_lsn_) = lsn));
        }

        ENGINE_LOG_DEBUG << "Update global lsn = " << lsn;
    } catch (std::exception& e) {
        // Format the number explicitly: `"literal" + lsn` would offset the literal's pointer
        // by `lsn` characters instead of appending the value.
        std::string msg = "Exception update global lsn = " + std::to_string(lsn);
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

// Read the global last LSN from the environment table into `lsn`.
//
// `lsn` is set to 0 when no environment row exists, otherwise to the value of the first row.
// Returns Status::OK() on success, or a failure status if a std::exception is raised.
Status
SqliteMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.size() == 0) {
            lsn = 0;
        } else {
            lsn = std::get<0>(selected[0]);
        }
    } catch (std::exception& e) {
        // Report the operation that actually failed (the old text was copied from folder cleanup).
        return HandleException("Encounter exception when get global lsn", e.what());
    }

    return Status::OK();
}
G
groot 已提交
1695

1696 1697 1698
}  // namespace meta
}  // namespace engine
}  // namespace milvus