MySQLMetaImpl.cpp 100.0 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/meta/MySQLMetaImpl.h"
Z
update  
zhiru 已提交
13

S
shengjh 已提交
14
#include <fiu-local.h>
S
starlord 已提交
15 16
#include <mysql++/mysql++.h>
#include <string.h>
Z
update  
zhiru 已提交
17
#include <unistd.h>
18

S
starlord 已提交
19
#include <boost/filesystem.hpp>
Z
update  
zhiru 已提交
20 21
#include <chrono>
#include <fstream>
S
starlord 已提交
22 23 24
#include <iostream>
#include <map>
#include <mutex>
Z
update  
zhiru 已提交
25
#include <regex>
S
starlord 已提交
26 27
#include <set>
#include <sstream>
Z
update  
zhiru 已提交
28
#include <string>
Z
zhiru 已提交
29
#include <thread>
Z
update  
zhiru 已提交
30

31 32 33 34 35 36 37 38 39 40
#include "MetaConsts.h"
#include "db/IDGenerator.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"

Z
update  
zhiru 已提交
41 42 43 44
namespace milvus {
namespace engine {
namespace meta {

45
namespace {
Z
update  
zhiru 已提交
46

S
starlord 已提交
47
Status
S
starlord 已提交
48
HandleException(const std::string& desc, const char* what = nullptr) {
S
starlord 已提交
49
    if (what == nullptr) {
S
starlord 已提交
50 51 52
        ENGINE_LOG_ERROR << desc;
        return Status(DB_META_TRANSACTION_FAILED, desc);
    }
S
starlord 已提交
53 54 55 56

    std::string msg = desc + ":" + what;
    ENGINE_LOG_ERROR << msg;
    return Status(DB_META_TRANSACTION_FAILED, msg);
57
}
Z
update  
zhiru 已提交
58

59
class MetaField {
S
starlord 已提交
60
 public:
S
starlord 已提交
61 62
    MetaField(const std::string& name, const std::string& type, const std::string& setting)
        : name_(name), type_(type), setting_(setting) {
63 64
    }

S
starlord 已提交
65 66
    std::string
    name() const {
67 68 69
        return name_;
    }

S
starlord 已提交
70 71
    std::string
    ToString() const {
72 73 74 75 76
        return name_ + " " + type_ + " " + setting_;
    }

    // mysql field type has additional information. for instance, a filed type is defined as 'BIGINT'
    // we get the type from sql is 'bigint(20)', so we need to ignore the '(20)'
S
starlord 已提交
77 78
    bool
    IsEqual(const MetaField& field) const {
79 80 81
        size_t name_len_min = field.name_.length() > name_.length() ? name_.length() : field.name_.length();
        size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length();
        return strncasecmp(field.name_.c_str(), name_.c_str(), name_len_min) == 0 &&
S
starlord 已提交
82
               strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0;
83 84
    }

S
starlord 已提交
85
 private:
86 87 88 89 90 91 92
    std::string name_;
    std::string type_;
    std::string setting_;
};

using MetaFields = std::vector<MetaField>;
class MetaSchema {
S
starlord 已提交
93
 public:
S
starlord 已提交
94
    MetaSchema(const std::string& name, const MetaFields& fields) : name_(name), fields_(fields) {
95 96
    }

S
starlord 已提交
97 98
    std::string
    name() const {
99 100 101
        return name_;
    }

S
starlord 已提交
102 103
    std::string
    ToString() const {
104
        std::string result;
S
starlord 已提交
105
        for (auto& field : fields_) {
S
starlord 已提交
106
            if (!result.empty()) {
107 108 109 110 111 112 113
                result += ",";
            }
            result += field.ToString();
        }
        return result;
    }

S
starlord 已提交
114 115 116 117
    // if the outer fields contains all this MetaSchema fields, return true
    // otherwise return false
    bool
    IsEqual(const MetaFields& fields) const {
118
        std::vector<std::string> found_field;
S
starlord 已提交
119 120
        for (const auto& this_field : fields_) {
            for (const auto& outer_field : fields) {
S
starlord 已提交
121
                if (this_field.IsEqual(outer_field)) {
122 123 124 125 126 127 128 129 130
                    found_field.push_back(this_field.name());
                    break;
                }
            }
        }

        return found_field.size() == fields_.size();
    }

S
starlord 已提交
131
 private:
132 133 134 135
    std::string name_;
    MetaFields fields_;
};

S
starlord 已提交
136
// Tables schema
137
static const MetaSchema TABLES_SCHEMA(META_TABLES, {
S
starlord 已提交
138 139 140 141 142 143 144 145 146 147
                                                       MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
                                                       MetaField("table_id", "VARCHAR(255)", "UNIQUE NOT NULL"),
                                                       MetaField("state", "INT", "NOT NULL"),
                                                       MetaField("dimension", "SMALLINT", "NOT NULL"),
                                                       MetaField("created_on", "BIGINT", "NOT NULL"),
                                                       MetaField("flag", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                       MetaField("index_file_size", "BIGINT", "DEFAULT 1024 NOT NULL"),
                                                       MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
                                                       MetaField("nlist", "INT", "DEFAULT 16384 NOT NULL"),
                                                       MetaField("metric_type", "INT", "DEFAULT 1 NOT NULL"),
G
groot 已提交
148 149 150 151
                                                       MetaField("owner_table", "VARCHAR(255)", "NOT NULL"),
                                                       MetaField("partition_tag", "VARCHAR(255)", "NOT NULL"),
                                                       MetaField("version", "VARCHAR(64)",
                                                                 std::string("DEFAULT '") + CURRENT_VERSION + "'"),
152
                                                       MetaField("flush_lsn", "BIGINT", "DEFAULT 0 NOT NULL"),
S
starlord 已提交
153 154 155
                                                   });

// TableFiles schema
J
JinHai-CN 已提交
156 157 158
static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
                                                               MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
                                                               MetaField("table_id", "VARCHAR(255)", "NOT NULL"),
159
                                                               MetaField("segment_id", "VARCHAR(255)", "NOT NULL"),
J
JinHai-CN 已提交
160 161 162 163 164 165 166 167
                                                               MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
                                                               MetaField("file_id", "VARCHAR(255)", "NOT NULL"),
                                                               MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("updated_time", "BIGINT", "NOT NULL"),
                                                               MetaField("created_on", "BIGINT", "NOT NULL"),
                                                               MetaField("date", "INT", "DEFAULT -1 NOT NULL"),
168
                                                               MetaField("flush_lsn", "BIGINT", "DEFAULT 0 NOT NULL"),
J
JinHai-CN 已提交
169
                                                           });
S
starlord 已提交
170 171

}  // namespace
172

173
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
174
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions& options, const int& mode) : options_(options), mode_(mode) {
175 176 177 178 179 180
    Initialize();
}

MySQLMetaImpl::~MySQLMetaImpl() {
}

S
starlord 已提交
181
Status
S
starlord 已提交
182
MySQLMetaImpl::NextTableId(std::string& table_id) {
G
groot 已提交
183
    std::lock_guard<std::mutex> lock(genid_mutex_);  // avoid duplicated id
184 185 186 187 188 189 190
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.GetNextIDNumber();
    table_id = ss.str();
    return Status::OK();
}

S
starlord 已提交
191
Status
S
starlord 已提交
192
MySQLMetaImpl::NextFileId(std::string& file_id) {
G
groot 已提交
193
    std::lock_guard<std::mutex> lock(genid_mutex_);  // avoid duplicated id
194 195 196 197 198 199 200
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.GetNextIDNumber();
    file_id = ss.str();
    return Status::OK();
}

S
starlord 已提交
201 202 203
void
MySQLMetaImpl::ValidateMetaSchema() {
    if (nullptr == mysql_connection_pool_) {
204 205 206
        return;
    }

S
starlord 已提交
207
    mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
208 209 210 211
    if (connectionPtr == nullptr) {
        return;
    }

S
starlord 已提交
212
    auto validate_func = [&](const MetaSchema& schema) {
S
shengjh 已提交
213
        fiu_return_on("MySQLMetaImpl.ValidateMetaSchema.fail_validate", false);
S
starlord 已提交
214
        mysqlpp::Query query_statement = connectionPtr->query();
215 216 217 218 219
        query_statement << "DESC " << schema.name() << ";";

        MetaFields exist_fields;

        try {
S
starlord 已提交
220
            mysqlpp::StoreQueryResult res = query_statement.store();
221
            for (size_t i = 0; i < res.num_rows(); ++i) {
S
starlord 已提交
222
                const mysqlpp::Row& row = res[i];
223 224 225 226 227 228
                std::string name, type;
                row["Field"].to_string(name);
                row["Type"].to_string(type);

                exist_fields.push_back(MetaField(name, type, ""));
            }
S
starlord 已提交
229
        } catch (std::exception& e) {
230 231 232
            ENGINE_LOG_DEBUG << "Meta table '" << schema.name() << "' not exist and will be created";
        }

S
starlord 已提交
233
        if (exist_fields.empty()) {
234 235 236 237 238 239
            return true;
        }

        return schema.IsEqual(exist_fields);
    };

S
starlord 已提交
240
    // verify Tables
241 242 243 244
    if (!validate_func(TABLES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }

S
starlord 已提交
245
    // verufy TableFiles
246 247 248 249 250
    if (!validate_func(TABLEFILES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
251 252
Status
MySQLMetaImpl::Initialize() {
S
starlord 已提交
253
    // step 1: create db root path
S
starlord 已提交
254 255
    if (!boost::filesystem::is_directory(options_.path_)) {
        auto ret = boost::filesystem::create_directory(options_.path_);
S
shengjh 已提交
256
        fiu_do_on("MySQLMetaImpl.Initialize.fail_create_directory", ret = false);
257
        if (!ret) {
S
starlord 已提交
258
            std::string msg = "Failed to create db directory " + options_.path_;
S
starlord 已提交
259
            ENGINE_LOG_ERROR << msg;
S
shengjh 已提交
260
            throw Exception(DB_META_TRANSACTION_FAILED, msg);
Z
update  
zhiru 已提交
261 262 263
        }
    }

S
starlord 已提交
264
    std::string uri = options_.backend_uri_;
265

S
starlord 已提交
266
    // step 2: parse and check meta uri
267 268
    utils::MetaUriInfo uri_info;
    auto status = utils::ParseMetaUri(uri, uri_info);
S
starlord 已提交
269
    if (!status.ok()) {
270 271 272 273 274 275 276 277 278 279 280
        std::string msg = "Wrong URI format: " + uri;
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }

    if (strcasecmp(uri_info.dialect_.c_str(), "mysql") != 0) {
        std::string msg = "URI's dialect is not MySQL";
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }

S
starlord 已提交
281
    // step 3: connect mysql
282 283 284 285 286 287 288
    int thread_hint = std::thread::hardware_concurrency();
    int max_pool_size = (thread_hint == 0) ? 8 : thread_hint;
    unsigned int port = 0;
    if (!uri_info.port_.empty()) {
        port = std::stoi(uri_info.port_);
    }

S
starlord 已提交
289 290
    mysql_connection_pool_ = std::make_shared<MySQLConnectionPool>(
        uri_info.db_name_, uri_info.username_, uri_info.password_, uri_info.host_, port, max_pool_size);
291 292
    ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(max_pool_size);

S
starlord 已提交
293
    // step 4: validate to avoid open old version schema
294 295
    ValidateMetaSchema();

296 297 298 299
    // step 5: clean shadow files
    if (mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        CleanUpShadowFiles();
    }
300

301 302
    // step 6: try connect mysql server
    mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
303

304
    if (connectionPtr == nullptr) {
G
groot 已提交
305
        std::string msg = "Failed to connect MySQL meta server: " + uri;
306 307 308
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }
309

S
shengjh 已提交
310 311 312
    bool is_thread_aware = connectionPtr->thread_aware();
    fiu_do_on("MySQLMetaImpl.Initialize.is_thread_aware", is_thread_aware = false);
    if (!is_thread_aware) {
G
groot 已提交
313 314
        std::string msg =
            "Failed to initialize MySQL meta backend: MySQL client component wasn't built with thread awareness";
315 316 317
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }
318

319 320
    // step 7: create meta table Tables
    mysqlpp::Query InitializeQuery = connectionPtr->query();
321

322
    InitializeQuery << "CREATE TABLE IF NOT EXISTS " << TABLES_SCHEMA.name() << " (" << TABLES_SCHEMA.ToString() + ");";
Z
update  
zhiru 已提交
323

324
    ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
325

S
shengjh 已提交
326 327 328
    bool initialize_query_exec = InitializeQuery.exec();
    fiu_do_on("MySQLMetaImpl.Initialize.fail_create_table_scheme", initialize_query_exec = false);
    if (!initialize_query_exec) {
G
groot 已提交
329
        std::string msg = "Failed to create meta table 'Tables' in MySQL";
330 331 332
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_META_TRANSACTION_FAILED, msg);
    }
Z
update  
zhiru 已提交
333

334 335 336
    // step 8: create meta table TableFiles
    InitializeQuery << "CREATE TABLE IF NOT EXISTS " << TABLEFILES_SCHEMA.name() << " ("
                    << TABLEFILES_SCHEMA.ToString() + ");";
Z
update  
zhiru 已提交
337

338 339
    ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

S
shengjh 已提交
340 341 342
    initialize_query_exec = InitializeQuery.exec();
    fiu_do_on("MySQLMetaImpl.Initialize.fail_create_table_files", initialize_query_exec = false);
    if (!initialize_query_exec) {
G
groot 已提交
343
        std::string msg = "Failed to create meta table 'TableFiles' in MySQL";
344 345
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_META_TRANSACTION_FAILED, msg);
Z
update  
zhiru 已提交
346
    }
S
starlord 已提交
347 348

    return Status::OK();
349 350
}

S
starlord 已提交
351
Status
S
starlord 已提交
352
MySQLMetaImpl::CreateTable(TableSchema& table_schema) {
353
    try {
Y
Yu Kun 已提交
354
        server::MetricCollector metric;
355
        {
S
starlord 已提交
356
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
357

S
shengjh 已提交
358 359 360 361
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CreateTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CreateTable.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
362
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
363
            }
Z
update  
zhiru 已提交
364

S
starlord 已提交
365
            mysqlpp::Query createTableQuery = connectionPtr->query();
Z
update  
zhiru 已提交
366

367 368 369
            if (table_schema.table_id_.empty()) {
                NextTableId(table_schema.table_id_);
            } else {
G
groot 已提交
370 371
                createTableQuery << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                                 << table_schema.table_id_ << ";";
Z
zhiru 已提交
372

373
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
Z
update  
zhiru 已提交
374

S
starlord 已提交
375
                mysqlpp::StoreQueryResult res = createTableQuery.store();
376

377 378
                if (res.num_rows() == 1) {
                    int state = res[0]["state"];
S
shengjh 已提交
379
                    fiu_do_on("MySQLMetaImpl.CreateTableTable.schema_TO_DELETE", state = TableSchema::TO_DELETE);
380
                    if (TableSchema::TO_DELETE == state) {
S
starlord 已提交
381
                        return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
382
                    } else {
S
starlord 已提交
383
                        return Status(DB_ALREADY_EXIST, "Table already exists");
384 385 386
                    }
                }
            }
387

388 389
            table_schema.id_ = -1;
            table_schema.created_on_ = utils::GetMicroSecTimeStamp();
Z
update  
zhiru 已提交
390

S
starlord 已提交
391
            std::string id = "NULL";  // auto-increment
G
groot 已提交
392
            std::string& table_id = table_schema.table_id_;
393 394 395
            std::string state = std::to_string(table_schema.state_);
            std::string dimension = std::to_string(table_schema.dimension_);
            std::string created_on = std::to_string(table_schema.created_on_);
S
starlord 已提交
396 397
            std::string flag = std::to_string(table_schema.flag_);
            std::string index_file_size = std::to_string(table_schema.index_file_size_);
398
            std::string engine_type = std::to_string(table_schema.engine_type_);
S
starlord 已提交
399 400
            std::string nlist = std::to_string(table_schema.nlist_);
            std::string metric_type = std::to_string(table_schema.metric_type_);
G
groot 已提交
401 402 403
            std::string& owner_table = table_schema.owner_table_;
            std::string& partition_tag = table_schema.partition_tag_;
            std::string& version = table_schema.version_;
404
            std::string flush_lsn = std::to_string(table_schema.flush_lsn_);
Z
update  
zhiru 已提交
405

G
groot 已提交
406 407 408 409
            createTableQuery << "INSERT INTO " << META_TABLES << " VALUES(" << id << ", " << mysqlpp::quote << table_id
                             << ", " << state << ", " << dimension << ", " << created_on << ", " << flag << ", "
                             << index_file_size << ", " << engine_type << ", " << nlist << ", " << metric_type << ", "
                             << mysqlpp::quote << owner_table << ", " << mysqlpp::quote << partition_tag << ", "
410
                             << mysqlpp::quote << version << ", " << flush_lsn << ");";
Z
update  
zhiru 已提交
411

412
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
413

S
starlord 已提交
414
            if (mysqlpp::SimpleResult res = createTableQuery.execute()) {
S
starlord 已提交
415
                table_schema.id_ = res.insert_id();  // Might need to use SELECT LAST_INSERT_ID()?
416

S
starlord 已提交
417
                // Consume all results to avoid "Commands out of sync" error
418
            } else {
S
starlord 已提交
419
                return HandleException("Add Table Error", createTableQuery.error());
420
            }
S
starlord 已提交
421
        }  // Scoped Connection
422

423
        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
424
        return utils::CreateTablePath(options_, table_schema.table_id_);
S
starlord 已提交
425
    } catch (std::exception& e) {
S
starlord 已提交
426
        return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
427 428
    }
}
429

S
starlord 已提交
430
Status
G
groot 已提交
431
MySQLMetaImpl::DescribeTable(TableSchema& table_schema) {
Z
zhiru 已提交
432
    try {
G
groot 已提交
433
        server::MetricCollector metric;
S
starlord 已提交
434
        mysqlpp::StoreQueryResult res;
Z
zhiru 已提交
435
        {
S
starlord 已提交
436
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
437

S
shengjh 已提交
438 439 440 441
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DescribeTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DescribeTable.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
442
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
443 444
            }

G
groot 已提交
445 446 447
            mysqlpp::Query describeTableQuery = connectionPtr->query();
            describeTableQuery
                << "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type, nlist, metric_type"
448
                << " ,owner_table, partition_tag, version, flush_lsn"
G
groot 已提交
449 450
                << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_schema.table_id_
                << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
zhiru 已提交
451

G
groot 已提交
452
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str();
Z
zhiru 已提交
453

G
groot 已提交
454
            res = describeTableQuery.store();
S
starlord 已提交
455
        }  // Scoped Connection
Z
zhiru 已提交
456

G
groot 已提交
457 458 459 460 461 462 463 464 465 466 467 468 469 470
        if (res.num_rows() == 1) {
            const mysqlpp::Row& resRow = res[0];
            table_schema.id_ = resRow["id"];  // implicit conversion
            table_schema.state_ = resRow["state"];
            table_schema.dimension_ = resRow["dimension"];
            table_schema.created_on_ = resRow["created_on"];
            table_schema.flag_ = resRow["flag"];
            table_schema.index_file_size_ = resRow["index_file_size"];
            table_schema.engine_type_ = resRow["engine_type"];
            table_schema.nlist_ = resRow["nlist"];
            table_schema.metric_type_ = resRow["metric_type"];
            resRow["owner_table"].to_string(table_schema.owner_table_);
            resRow["partition_tag"].to_string(table_schema.partition_tag_);
            resRow["version"].to_string(table_schema.version_);
471
            table_schema.flush_lsn_ = resRow["flush_lsn"];
G
groot 已提交
472 473
        } else {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
474
        }
S
starlord 已提交
475
    } catch (std::exception& e) {
G
groot 已提交
476
        return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
Z
zhiru 已提交
477 478
    }

479 480
    return Status::OK();
}
481

S
starlord 已提交
482
Status
G
groot 已提交
483
MySQLMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
S
starlord 已提交
484
    try {
Y
Yu Kun 已提交
485
        server::MetricCollector metric;
G
groot 已提交
486
        mysqlpp::StoreQueryResult res;
S
starlord 已提交
487
        {
S
starlord 已提交
488
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
489

S
shengjh 已提交
490 491 492 493
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.HasTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.HasTable.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
494
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
S
starlord 已提交
495 496
            }

G
groot 已提交
497 498 499 500 501 502 503
            mysqlpp::Query hasTableQuery = connectionPtr->query();
            // since table_id is a unique column we just need to check whether it exists or not
            hasTableQuery << "SELECT EXISTS"
                          << " (SELECT 1 FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                          << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ")"
                          << " AS " << mysqlpp::quote << "check"
                          << ";";
S
starlord 已提交
504

G
groot 已提交
505
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();
S
starlord 已提交
506

G
groot 已提交
507
            res = hasTableQuery.store();
S
starlord 已提交
508
        }  // Scoped Connection
S
starlord 已提交
509

G
groot 已提交
510 511
        int check = res[0]["check"];
        has_or_not = (check == 1);
S
starlord 已提交
512
    } catch (std::exception& e) {
G
groot 已提交
513
        return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
S
starlord 已提交
514 515
    }

516 517 518
    return Status::OK();
}

S
starlord 已提交
519
Status
G
groot 已提交
520
MySQLMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
S
starlord 已提交
521
    try {
Y
Yu Kun 已提交
522
        server::MetricCollector metric;
G
groot 已提交
523
        mysqlpp::StoreQueryResult res;
S
starlord 已提交
524
        {
S
starlord 已提交
525
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
526

S
shengjh 已提交
527 528 529 530
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.AllTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.AllTable.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
531
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
S
starlord 已提交
532 533
            }

G
groot 已提交
534 535
            mysqlpp::Query allTablesQuery = connectionPtr->query();
            allTablesQuery << "SELECT id, table_id, dimension, engine_type, nlist, index_file_size, metric_type"
536
                           << " ,owner_table, partition_tag, version, flush_lsn"
G
groot 已提交
537
                           << " FROM " << META_TABLES << " WHERE state <> " << std::to_string(TableSchema::TO_DELETE)
538
                           << " AND owner_table = \"\";";
S
starlord 已提交
539

G
groot 已提交
540
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str();
S
starlord 已提交
541

G
groot 已提交
542
            res = allTablesQuery.store();
S
starlord 已提交
543
        }  // Scoped Connection
S
starlord 已提交
544

G
groot 已提交
545 546 547 548 549 550 551 552 553 554 555 556
        for (auto& resRow : res) {
            TableSchema table_schema;
            table_schema.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_schema.table_id_);
            table_schema.dimension_ = resRow["dimension"];
            table_schema.index_file_size_ = resRow["index_file_size"];
            table_schema.engine_type_ = resRow["engine_type"];
            table_schema.nlist_ = resRow["nlist"];
            table_schema.metric_type_ = resRow["metric_type"];
            resRow["owner_table"].to_string(table_schema.owner_table_);
            resRow["partition_tag"].to_string(table_schema.partition_tag_);
            resRow["version"].to_string(table_schema.version_);
557
            table_schema.flush_lsn_ = resRow["flush_lsn"];
G
groot 已提交
558 559 560

            table_schema_array.emplace_back(table_schema);
        }
S
starlord 已提交
561
    } catch (std::exception& e) {
G
groot 已提交
562
        return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
S
starlord 已提交
563 564 565 566 567
    }

    return Status::OK();
}

S
starlord 已提交
568
Status
G
groot 已提交
569
MySQLMetaImpl::DropTable(const std::string& table_id) {
S
starlord 已提交
570
    try {
Y
Yu Kun 已提交
571
        server::MetricCollector metric;
S
starlord 已提交
572
        {
S
starlord 已提交
573
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
574

S
shengjh 已提交
575 576 577 578 579
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DropTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DropTable.throw_exception", throw std::exception(););

            if (is_null_connection) {
G
groot 已提交
580
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
S
starlord 已提交
581 582
            }

G
groot 已提交
583 584 585 586 587
            // soft delete table
            mysqlpp::Query deleteTableQuery = connectionPtr->query();
            //
            deleteTableQuery << "UPDATE " << META_TABLES << " SET state = " << std::to_string(TableSchema::TO_DELETE)
                             << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
S
starlord 已提交
588

G
groot 已提交
589
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str();
S
starlord 已提交
590

G
groot 已提交
591 592
            if (!deleteTableQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
S
starlord 已提交
593
            }
S
starlord 已提交
594
        }  // Scoped Connection
G
groot 已提交
595

S
shengjh 已提交
596 597 598
        bool is_writable_mode{mode_ == DBOptions::MODE::CLUSTER_WRITABLE};
        fiu_do_on("MySQLMetaImpl.DropTable.CLUSTER_WRITABLE_MODE", is_writable_mode = true);
        if (is_writable_mode) {
G
groot 已提交
599 600 601 602
            DeleteTableFiles(table_id);
        }

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
S
starlord 已提交
603
    } catch (std::exception& e) {
G
groot 已提交
604
        return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
S
starlord 已提交
605 606 607 608 609
    }

    return Status::OK();
}

S
starlord 已提交
610
Status
G
groot 已提交
611
MySQLMetaImpl::DeleteTableFiles(const std::string& table_id) {
S
starlord 已提交
612
    try {
Y
Yu Kun 已提交
613
        server::MetricCollector metric;
614
        {
S
starlord 已提交
615
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
616

S
shengjh 已提交
617 618 619 620 621
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DeleteTableFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DeleteTableFiles.throw_exception", throw std::exception(););

            if (is_null_connection) {
G
groot 已提交
622
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
623 624
            }

G
groot 已提交
625 626 627 628 629 630 631 632
            // soft delete table files
            mysqlpp::Query deleteTableFilesQuery = connectionPtr->query();
            //
            deleteTableFilesQuery << "UPDATE " << META_TABLEFILES
                                  << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                                  << " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp())
                                  << " WHERE table_id = " << mysqlpp::quote << table_id << " AND file_type <> "
                                  << std::to_string(TableFileSchema::TO_DELETE) << ";";
633

G
groot 已提交
634
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str();
635

G
groot 已提交
636 637
            if (!deleteTableFilesQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", deleteTableFilesQuery.error());
S
starlord 已提交
638
            }
S
starlord 已提交
639
        }  // Scoped Connection
S
starlord 已提交
640

G
groot 已提交
641
        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
S
starlord 已提交
642
    } catch (std::exception& e) {
G
groot 已提交
643
        return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
S
starlord 已提交
644
    }
645

S
starlord 已提交
646 647
    return Status::OK();
}
Z
update  
zhiru 已提交
648

S
starlord 已提交
649
Status
G
groot 已提交
650 651 652 653 654 655 656 657 658 659
MySQLMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }
    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }
Z
zhiru 已提交
660

G
groot 已提交
661 662
    try {
        server::MetricCollector metric;
Z
update  
zhiru 已提交
663

G
groot 已提交
664
        NextFileId(file_schema.file_id_);
665 666 667
        if (file_schema.segment_id_.empty()) {
            file_schema.segment_id_ = file_schema.file_id_;
        }
G
groot 已提交
668 669 670 671 672 673 674 675 676
        file_schema.dimension_ = table_schema.dimension_;
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.index_file_size_ = table_schema.index_file_size_;
        file_schema.engine_type_ = table_schema.engine_type_;
        file_schema.nlist_ = table_schema.nlist_;
        file_schema.metric_type_ = table_schema.metric_type_;
677

G
groot 已提交
678 679
        std::string id = "NULL";  // auto-increment
        std::string table_id = file_schema.table_id_;
680
        std::string segment_id = file_schema.segment_id_;
G
groot 已提交
681 682 683 684 685 686 687 688
        std::string engine_type = std::to_string(file_schema.engine_type_);
        std::string file_id = file_schema.file_id_;
        std::string file_type = std::to_string(file_schema.file_type_);
        std::string file_size = std::to_string(file_schema.file_size_);
        std::string row_count = std::to_string(file_schema.row_count_);
        std::string updated_time = std::to_string(file_schema.updated_time_);
        std::string created_on = std::to_string(file_schema.created_on_);
        std::string date = std::to_string(file_schema.date_);
689
        std::string flush_lsn = std::to_string(file_schema.flush_lsn_);
G
groot 已提交
690 691 692 693

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
694 695 696 697
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CreateTableFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CreateTableFiles.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
698
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
699
            }
Z
zhiru 已提交
700

G
groot 已提交
701
            mysqlpp::Query createTableFileQuery = connectionPtr->query();
Z
update  
zhiru 已提交
702

G
groot 已提交
703
            createTableFileQuery << "INSERT INTO " << META_TABLEFILES << " VALUES(" << id << ", " << mysqlpp::quote
704 705 706 707
                                 << table_id << ", " << mysqlpp::quote << segment_id << ", " << engine_type << ", "
                                 << mysqlpp::quote << file_id << ", " << file_type << ", " << file_size << ", "
                                 << row_count << ", " << updated_time << ", " << created_on << ", " << date << ", "
                                 << flush_lsn << ");";
G
groot 已提交
708 709 710 711 712 713 714 715 716 717 718 719 720 721

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str();

            if (mysqlpp::SimpleResult res = createTableFileQuery.execute()) {
                file_schema.id_ = res.insert_id();  // Might need to use SELECT LAST_INSERT_ID()?

                // Consume all results to avoid "Commands out of sync" error
            } else {
                return HandleException("QUERY ERROR WHEN CREATING TABLE FILE", createTableFileQuery.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
S
starlord 已提交
722
    } catch (std::exception& e) {
G
groot 已提交
723
        return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
724 725
    }
}
Z
update  
zhiru 已提交
726

S
starlord 已提交
727
Status
G
groot 已提交
728 729 730 731 732 733 734 735 736 737 738 739 740
MySQLMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                             TableFilesSchema& table_files) {
    if (ids.empty()) {
        return Status::OK();
    }

    std::stringstream idSS;
    for (auto& id : ids) {
        idSS << "id = " << std::to_string(id) << " OR ";
    }
    std::string idStr = idSS.str();
    idStr = idStr.substr(0, idStr.size() - 4);  // remove the last " OR "

741
    try {
S
starlord 已提交
742
        mysqlpp::StoreQueryResult res;
743
        {
S
starlord 已提交
744
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
745

S
shengjh 已提交
746 747 748 749
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.GetTableFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.GetTableFiles.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
750
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
751
            }
Z
zhiru 已提交
752

G
groot 已提交
753
            mysqlpp::Query getTableFileQuery = connectionPtr->query();
754 755 756 757 758
            getTableFileQuery
                << "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
                << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id << " AND ("
                << idStr << ")"
                << " AND file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
759

G
groot 已提交
760
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();
Z
update  
zhiru 已提交
761

G
groot 已提交
762
            res = getTableFileQuery.store();
S
starlord 已提交
763
        }  // Scoped Connection
764

G
groot 已提交
765 766 767
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        DescribeTable(table_schema);
S
starlord 已提交
768

G
groot 已提交
769 770 771 772 773
        Status ret;
        for (auto& resRow : res) {
            TableFileSchema file_schema;
            file_schema.id_ = resRow["id"];
            file_schema.table_id_ = table_id;
774
            resRow["segment_id"].to_string(file_schema.segment_id_);
G
groot 已提交
775 776 777 778 779 780 781 782 783 784 785
            file_schema.index_file_size_ = table_schema.index_file_size_;
            file_schema.engine_type_ = resRow["engine_type"];
            file_schema.nlist_ = table_schema.nlist_;
            file_schema.metric_type_ = table_schema.metric_type_;
            resRow["file_id"].to_string(file_schema.file_id_);
            file_schema.file_type_ = resRow["file_type"];
            file_schema.file_size_ = resRow["file_size"];
            file_schema.row_count_ = resRow["row_count"];
            file_schema.date_ = resRow["date"];
            file_schema.created_on_ = resRow["created_on"];
            file_schema.dimension_ = table_schema.dimension_;
S
starlord 已提交
786

G
groot 已提交
787 788
            utils::GetTableFilePath(options_, file_schema);
            table_files.emplace_back(file_schema);
789
        }
G
groot 已提交
790 791 792

        ENGINE_LOG_DEBUG << "Get table files by id";
        return ret;
S
starlord 已提交
793
    } catch (std::exception& e) {
G
groot 已提交
794
        return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
Z
update  
zhiru 已提交
795
    }
796
}
Z
zhiru 已提交
797

798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857
Status
MySQLMetaImpl::GetTableFilesBySegmentId(const std::string& segment_id,
                                        milvus::engine::meta::TableFilesSchema& table_files) {
    try {
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query getTableFileQuery = connectionPtr->query();
            getTableFileQuery << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
                              << "row_count, date, created_on"
                              << " FROM " << META_TABLEFILES << " WHERE segment_id = " << mysqlpp::quote << segment_id
                              << " AND file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFilesBySegmentId: " << getTableFileQuery.str();

            res = getTableFileQuery.store();
        }  // Scoped Connection

        if (!res.empty()) {
            TableSchema table_schema;
            res[0]["table_id"].to_string(table_schema.table_id_);
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }

            for (auto& resRow : res) {
                TableFileSchema file_schema;
                file_schema.id_ = resRow["id"];
                file_schema.table_id_ = table_schema.table_id_;
                resRow["segment_id"].to_string(file_schema.segment_id_);
                file_schema.index_file_size_ = table_schema.index_file_size_;
                file_schema.engine_type_ = resRow["engine_type"];
                file_schema.nlist_ = table_schema.nlist_;
                file_schema.metric_type_ = table_schema.metric_type_;
                resRow["file_id"].to_string(file_schema.file_id_);
                file_schema.file_type_ = resRow["file_type"];
                file_schema.file_size_ = resRow["file_size"];
                file_schema.row_count_ = resRow["row_count"];
                file_schema.date_ = resRow["date"];
                file_schema.created_on_ = resRow["created_on"];
                file_schema.dimension_ = table_schema.dimension_;

                utils::GetTableFilePath(options_, file_schema);
                table_files.emplace_back(file_schema);
            }
        }

        ENGINE_LOG_DEBUG << "Get table files by segment id";
        return Status::OK();
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES BY SEGMENT ID", e.what());
    }
}

S
starlord 已提交
858
Status
G
groot 已提交
859
MySQLMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
860
    try {
Y
Yu Kun 已提交
861
        server::MetricCollector metric;
G
groot 已提交
862

863
        {
S
starlord 已提交
864
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
865

S
shengjh 已提交
866 867 868 869
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.UpdateTableIndex.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.UpdateTableIndex.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
870
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
871
            }
Z
update  
zhiru 已提交
872

G
groot 已提交
873 874 875 876
            mysqlpp::Query updateTableIndexParamQuery = connectionPtr->query();
            updateTableIndexParamQuery << "SELECT id, state, dimension, created_on"
                                       << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                                       << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
877

G
groot 已提交
878
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
879

G
groot 已提交
880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904
            mysqlpp::StoreQueryResult res = updateTableIndexParamQuery.store();

            if (res.num_rows() == 1) {
                const mysqlpp::Row& resRow = res[0];

                size_t id = resRow["id"];
                int32_t state = resRow["state"];
                uint16_t dimension = resRow["dimension"];
                int64_t created_on = resRow["created_on"];

                updateTableIndexParamQuery << "UPDATE " << META_TABLES << " SET id = " << id << " ,state = " << state
                                           << " ,dimension = " << dimension << " ,created_on = " << created_on
                                           << " ,engine_type = " << index.engine_type_ << " ,nlist = " << index.nlist_
                                           << " ,metric_type = " << index.metric_type_
                                           << " WHERE table_id = " << mysqlpp::quote << table_id << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();

                if (!updateTableIndexParamQuery.exec()) {
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM",
                                           updateTableIndexParamQuery.error());
                }
            } else {
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
            }
S
starlord 已提交
905
        }  // Scoped Connection
906

G
groot 已提交
907
        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
S
starlord 已提交
908
    } catch (std::exception& e) {
G
groot 已提交
909
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
910
    }
911

912 913
    return Status::OK();
}
914

S
starlord 已提交
915
Status
G
groot 已提交
916
MySQLMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
917
    try {
Y
Yu Kun 已提交
918
        server::MetricCollector metric;
G
groot 已提交
919

920
        {
S
starlord 已提交
921
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
922

S
shengjh 已提交
923 924 925 926
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.UpdateTableFlag.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.UpdateTableFlag.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
927
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
928
            }
Z
update  
zhiru 已提交
929

G
groot 已提交
930 931 932
            mysqlpp::Query updateTableFlagQuery = connectionPtr->query();
            updateTableFlagQuery << "UPDATE " << META_TABLES << " SET flag = " << flag
                                 << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
Z
zhiru 已提交
933

G
groot 已提交
934
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();
Z
zhiru 已提交
935

G
groot 已提交
936 937 938
            if (!updateTableFlagQuery.exec()) {
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLAG", updateTableFlagQuery.error());
            }
S
starlord 已提交
939
        }  // Scoped Connection
940

G
groot 已提交
941
        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
S
starlord 已提交
942
    } catch (std::exception& e) {
G
groot 已提交
943
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
944
    }
Z
update  
zhiru 已提交
945

946 947
    return Status::OK();
}
948

949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055
Status
MySQLMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_lsn) {
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query updateTableFlagQuery = connectionPtr->query();
            updateTableFlagQuery << "UPDATE " << META_TABLES << " SET flush_lsn = " << flush_lsn
                                 << " WHERE table_id = " << mysqlpp::quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlushLSN: " << updateTableFlagQuery.str();

            if (!updateTableFlagQuery.exec()) {
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLUSH_LSN", updateTableFlagQuery.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully update table flush_lsn, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLUSH_LSN", e.what());
    }

    return Status::OK();
}

Status
MySQLMetaImpl::GetTableFlushLSN(const std::string& table_id, uint64_t& flush_lsn) {
    return Status::OK();
}

Status
MySQLMetaImpl::GetTableFilesByFlushLSN(uint64_t flush_lsn, TableFilesSchema& table_files) {
    table_files.clear();

    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query filesToIndexQuery = connectionPtr->query();
            filesToIndexQuery << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
                                 "row_count, date, created_on"
                              << " FROM " << META_TABLEFILES << " WHERE flush_lsn = " << flush_lsn << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();

            res = filesToIndexQuery.store();
        }  // Scoped Connection

        Status ret;
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;
        for (auto& resRow : res) {
            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
            resRow["segment_id"].to_string(table_file.segment_id_);
            table_file.engine_type_ = resRow["engine_type"];
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.file_type_ = resRow["file_type"];
            table_file.file_size_ = resRow["file_size"];
            table_file.row_count_ = resRow["row_count"];
            table_file.date_ = resRow["date"];
            table_file.created_on_ = resRow["created_on"];

            auto groupItr = groups.find(table_file.table_id_);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                groups[table_file.table_id_] = table_schema;
            }
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
            table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
            table_file.nlist_ = groups[table_file.table_id_].nlist_;
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;

            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                ret = status;
            }

            table_files.push_back(table_file);
        }

        if (res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " files with flush_lsn = " << flush_lsn;
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES BY LSN", e.what());
    }
}

G
groot 已提交
1056
// ZR: this function assumes all fields in file_schema have value
S
starlord 已提交
1057
Status
G
groot 已提交
1058 1059
MySQLMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1060

1061
    try {
Y
Yu Kun 已提交
1062
        server::MetricCollector metric;
G
groot 已提交
1063 1064
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
1065

S
shengjh 已提交
1066 1067 1068 1069
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.UpdateTableFile.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.UpdateTableFile.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1070
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1071
            }
Z
update  
zhiru 已提交
1072

G
groot 已提交
1073
            mysqlpp::Query updateTableFileQuery = connectionPtr->query();
1074

G
groot 已提交
1075 1076 1077 1078
            // if the table has been deleted, just mark the table file as TO_DELETE
            // clean thread will delete the file later
            updateTableFileQuery << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                                 << file_schema.table_id_ << ";";
1079

G
groot 已提交
1080
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
1081

G
groot 已提交
1082
            mysqlpp::StoreQueryResult res = updateTableFileQuery.store();
1083

G
groot 已提交
1084 1085 1086 1087 1088
            if (res.num_rows() == 1) {
                int state = res[0]["state"];
                if (state == TableSchema::TO_DELETE) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
                }
1089
            } else {
G
groot 已提交
1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
                file_schema.file_type_ = TableFileSchema::TO_DELETE;
            }

            std::string id = std::to_string(file_schema.id_);
            std::string table_id = file_schema.table_id_;
            std::string engine_type = std::to_string(file_schema.engine_type_);
            std::string file_id = file_schema.file_id_;
            std::string file_type = std::to_string(file_schema.file_type_);
            std::string file_size = std::to_string(file_schema.file_size_);
            std::string row_count = std::to_string(file_schema.row_count_);
            std::string updated_time = std::to_string(file_schema.updated_time_);
            std::string created_on = std::to_string(file_schema.created_on_);
            std::string date = std::to_string(file_schema.date_);

            updateTableFileQuery << "UPDATE " << META_TABLEFILES << " SET table_id = " << mysqlpp::quote << table_id
                                 << " ,engine_type = " << engine_type << " ,file_id = " << mysqlpp::quote << file_id
                                 << " ,file_type = " << file_type << " ,file_size = " << file_size
                                 << " ,row_count = " << row_count << " ,updated_time = " << updated_time
                                 << " ,created_on = " << created_on << " ,date = " << date << " WHERE id = " << id
                                 << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();

            if (!updateTableFileQuery.exec()) {
                ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error());
1116
            }
S
starlord 已提交
1117
        }  // Scoped Connection
1118

G
groot 已提交
1119
        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
S
starlord 已提交
1120
    } catch (std::exception& e) {
G
groot 已提交
1121
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
1122
    }
G
groot 已提交
1123 1124

    return Status::OK();
1125
}
1126

S
starlord 已提交
1127
Status
G
groot 已提交
1128 1129 1130 1131
MySQLMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
1132 1133 1134 1135
        bool is_null_connection = (connectionPtr == nullptr);
        fiu_do_on("MySQLMetaImpl.UpdateTableFilesToIndex.null_connection", is_null_connection = true);
        fiu_do_on("MySQLMetaImpl.UpdateTableFilesToIndex.throw_exception", throw std::exception(););
        if (is_null_connection) {
G
groot 已提交
1136 1137 1138 1139 1140 1141 1142 1143
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
        }

        mysqlpp::Query updateTableFilesToIndexQuery = connectionPtr->query();

        updateTableFilesToIndexQuery << "UPDATE " << META_TABLEFILES
                                     << " SET file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                                     << " WHERE table_id = " << mysqlpp::quote << table_id
G
groot 已提交
1144
                                     << " AND row_count >= " << std::to_string(meta::BUILD_INDEX_THRESHOLD)
G
groot 已提交
1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160
                                     << " AND file_type = " << std::to_string(TableFileSchema::RAW) << ";";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();

        if (!updateTableFilesToIndexQuery.exec()) {
            return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX",
                                   updateTableFilesToIndexQuery.error());
        }

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
    }

    return Status::OK();
}
1161

G
groot 已提交
1162 1163
Status
MySQLMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
1164
    try {
Y
Yu Kun 已提交
1165
        server::MetricCollector metric;
1166
        {
S
starlord 已提交
1167
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1168

S
shengjh 已提交
1169 1170 1171 1172
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.UpdateTableFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.UpdateTableFiles.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1173
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1174
            }
1175

G
groot 已提交
1176
            mysqlpp::Query updateTableFilesQuery = connectionPtr->query();
1177

G
groot 已提交
1178 1179 1180 1181 1182
            std::map<std::string, bool> has_tables;
            for (auto& file_schema : files) {
                if (has_tables.find(file_schema.table_id_) != has_tables.end()) {
                    continue;
                }
1183

G
groot 已提交
1184 1185 1186 1187 1188 1189
                updateTableFilesQuery << "SELECT EXISTS"
                                      << " (SELECT 1 FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                                      << file_schema.table_id_ << " AND state <> "
                                      << std::to_string(TableSchema::TO_DELETE) << ")"
                                      << " AS " << mysqlpp::quote << "check"
                                      << ";";
1190

G
groot 已提交
1191
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
1192

G
groot 已提交
1193
                mysqlpp::StoreQueryResult res = updateTableFilesQuery.store();
1194

G
groot 已提交
1195 1196 1197
                int check = res[0]["check"];
                has_tables[file_schema.table_id_] = (check == 1);
            }
1198

G
groot 已提交
1199 1200 1201 1202 1203
            for (auto& file_schema : files) {
                if (!has_tables[file_schema.table_id_]) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
                }
                file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
S
starlord 已提交
1204

G
groot 已提交
1205 1206 1207 1208 1209 1210 1211 1212 1213 1214
                std::string id = std::to_string(file_schema.id_);
                std::string& table_id = file_schema.table_id_;
                std::string engine_type = std::to_string(file_schema.engine_type_);
                std::string& file_id = file_schema.file_id_;
                std::string file_type = std::to_string(file_schema.file_type_);
                std::string file_size = std::to_string(file_schema.file_size_);
                std::string row_count = std::to_string(file_schema.row_count_);
                std::string updated_time = std::to_string(file_schema.updated_time_);
                std::string created_on = std::to_string(file_schema.created_on_);
                std::string date = std::to_string(file_schema.date_);
1215

G
groot 已提交
1216 1217 1218 1219 1220 1221
                updateTableFilesQuery << "UPDATE " << META_TABLEFILES << " SET table_id = " << mysqlpp::quote
                                      << table_id << " ,engine_type = " << engine_type
                                      << " ,file_id = " << mysqlpp::quote << file_id << " ,file_type = " << file_type
                                      << " ,file_size = " << file_size << " ,row_count = " << row_count
                                      << " ,updated_time = " << updated_time << " ,created_on = " << created_on
                                      << " ,date = " << date << " WHERE id = " << id << ";";
1222

G
groot 已提交
1223
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
S
starlord 已提交
1224

G
groot 已提交
1225 1226
                if (!updateTableFilesQuery.exec()) {
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error());
1227
                }
S
starlord 已提交
1228
            }
G
groot 已提交
1229
        }  // Scoped Connection
1230

G
groot 已提交
1231
        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
S
starlord 已提交
1232
    } catch (std::exception& e) {
G
groot 已提交
1233
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
Z
update  
zhiru 已提交
1234
    }
G
groot 已提交
1235 1236

    return Status::OK();
1237 1238
}

S
starlord 已提交
1239
Status
G
groot 已提交
1240
MySQLMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
X
xj.lin 已提交
1241
    try {
Y
Yu Kun 已提交
1242
        server::MetricCollector metric;
G
groot 已提交
1243

X
xj.lin 已提交
1244
        {
S
starlord 已提交
1245
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
X
xj.lin 已提交
1246

S
shengjh 已提交
1247 1248 1249 1250
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DescribeTableIndex.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DescribeTableIndex.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1251
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
X
xj.lin 已提交
1252 1253
            }

G
groot 已提交
1254 1255 1256 1257
            mysqlpp::Query describeTableIndexQuery = connectionPtr->query();
            describeTableIndexQuery << "SELECT engine_type, nlist, index_file_size, metric_type"
                                    << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                                    << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
X
xj.lin 已提交
1258

G
groot 已提交
1259
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTableIndex: " << describeTableIndexQuery.str();
X
xj.lin 已提交
1260

G
groot 已提交
1261
            mysqlpp::StoreQueryResult res = describeTableIndexQuery.store();
X
xj.lin 已提交
1262

G
groot 已提交
1263 1264
            if (res.num_rows() == 1) {
                const mysqlpp::Row& resRow = res[0];
X
xj.lin 已提交
1265

G
groot 已提交
1266 1267 1268 1269 1270
                index.engine_type_ = resRow["engine_type"];
                index.nlist_ = resRow["nlist"];
                index.metric_type_ = resRow["metric_type"];
            } else {
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
X
xj.lin 已提交
1271
            }
S
starlord 已提交
1272
        }  // Scoped Connection
G
groot 已提交
1273 1274 1275
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
    }
X
xj.lin 已提交
1276

G
groot 已提交
1277 1278
    return Status::OK();
}
X
xj.lin 已提交
1279

G
groot 已提交
1280 1281 1282 1283
Status
MySQLMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;
X
xj.lin 已提交
1284

G
groot 已提交
1285 1286
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
X
xj.lin 已提交
1287

S
shengjh 已提交
1288 1289 1290 1291
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DropTableIndex.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DropTableIndex.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1292 1293
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }
1294

G
groot 已提交
1295
            mysqlpp::Query dropTableIndexQuery = connectionPtr->query();
X
xj.lin 已提交
1296

G
groot 已提交
1297 1298 1299 1300 1301 1302
            // soft delete index files
            dropTableIndexQuery << "UPDATE " << META_TABLEFILES
                                << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                                << " ,updated_time = " << utils::GetMicroSecTimeStamp()
                                << " WHERE table_id = " << mysqlpp::quote << table_id
                                << " AND file_type = " << std::to_string(TableFileSchema::INDEX) << ";";
S
starlord 已提交
1303

G
groot 已提交
1304
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();
S
starlord 已提交
1305

G
groot 已提交
1306 1307 1308
            if (!dropTableIndexQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
            }
X
xj.lin 已提交
1309

G
groot 已提交
1310 1311 1312 1313 1314 1315
            // set all backup file to raw
            dropTableIndexQuery << "UPDATE " << META_TABLEFILES
                                << " SET file_type = " << std::to_string(TableFileSchema::RAW)
                                << " ,updated_time = " << utils::GetMicroSecTimeStamp()
                                << " WHERE table_id = " << mysqlpp::quote << table_id
                                << " AND file_type = " << std::to_string(TableFileSchema::BACKUP) << ";";
X
xj.lin 已提交
1316

G
groot 已提交
1317
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();
S
starlord 已提交
1318

G
groot 已提交
1319 1320 1321
            if (!dropTableIndexQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
            }
X
xj.lin 已提交
1322

G
groot 已提交
1323 1324 1325
            // set table index type to raw
            dropTableIndexQuery << "UPDATE " << META_TABLES
                                << " SET engine_type = " << std::to_string(DEFAULT_ENGINE_TYPE)
1326
                                << " ,nlist = " << std::to_string(DEFAULT_NLIST)
G
groot 已提交
1327
                                << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
X
xj.lin 已提交
1328

G
groot 已提交
1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
    }

    return Status::OK();
}

Status
1345 1346
MySQLMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag,
                               uint64_t lsn) {
G
groot 已提交
1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357
    server::MetricCollector metric;

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    // not allow create partition under partition
    if (!table_schema.owner_table_.empty()) {
G
groot 已提交
1358
        return Status(DB_ERROR, "Nested partition is not allowed");
G
groot 已提交
1359 1360
    }

1361 1362 1363 1364
    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string valid_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);
G
groot 已提交
1365

1366 1367 1368 1369
    // not allow duplicated partition
    std::string exist_partition;
    GetPartitionName(table_id, valid_tag, exist_partition);
    if (!exist_partition.empty()) {
G
groot 已提交
1370
        return Status(DB_ERROR, "Duplicate partition is not allowed");
1371 1372 1373 1374
    }

    if (partition_name == "") {
        // generate unique partition name
G
groot 已提交
1375 1376 1377 1378 1379 1380 1381 1382 1383
        NextTableId(table_schema.table_id_);
    } else {
        table_schema.table_id_ = partition_name;
    }

    table_schema.id_ = -1;
    table_schema.flag_ = 0;
    table_schema.created_on_ = utils::GetMicroSecTimeStamp();
    table_schema.owner_table_ = table_id;
1384
    table_schema.partition_tag_ = valid_tag;
1385
    table_schema.flush_lsn_ = lsn;
G
groot 已提交
1386

1387
    status = CreateTable(table_schema);
S
shengjh 已提交
1388
    fiu_do_on("MySQLMetaImpl.CreatePartition.aleady_exist", status = Status(DB_ALREADY_EXIST, ""));
1389 1390 1391 1392 1393
    if (status.code() == DB_ALREADY_EXIST) {
        return Status(DB_ALREADY_EXIST, "Partition already exists");
    }

    return status;
G
groot 已提交
1394 1395 1396 1397 1398 1399 1400 1401
}

Status
MySQLMetaImpl::DropPartition(const std::string& partition_name) {
    return DropTable(partition_name);
}

Status
G
groot 已提交
1402
MySQLMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
G
groot 已提交
1403 1404 1405 1406 1407 1408
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
1409 1410 1411 1412
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.ShowPartitions.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.ShowPartitions.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1413 1414 1415 1416
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query allPartitionsQuery = connectionPtr->query();
1417 1418 1419 1420
            allPartitionsQuery << "SELECT table_id, id, state, dimension, created_on, flag, index_file_size,"
                               << " engine_type, nlist, metric_type, partition_tag, version FROM " << META_TABLES
                               << " WHERE owner_table = " << mysqlpp::quote << table_id << " AND state <> "
                               << std::to_string(TableSchema::TO_DELETE) << ";";
G
groot 已提交
1421 1422 1423 1424 1425 1426 1427 1428 1429

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allPartitionsQuery.str();

            res = allPartitionsQuery.store();
        }  // Scoped Connection

        for (auto& resRow : res) {
            meta::TableSchema partition_schema;
            resRow["table_id"].to_string(partition_schema.table_id_);
1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442
            partition_schema.id_ = resRow["id"];  // implicit conversion
            partition_schema.state_ = resRow["state"];
            partition_schema.dimension_ = resRow["dimension"];
            partition_schema.created_on_ = resRow["created_on"];
            partition_schema.flag_ = resRow["flag"];
            partition_schema.index_file_size_ = resRow["index_file_size"];
            partition_schema.engine_type_ = resRow["engine_type"];
            partition_schema.nlist_ = resRow["nlist"];
            partition_schema.metric_type_ = resRow["metric_type"];
            partition_schema.owner_table_ = table_id;
            resRow["partition_tag"].to_string(partition_schema.partition_tag_);
            resRow["version"].to_string(partition_schema.version_);

G
groot 已提交
1443
            partition_schema_array.emplace_back(partition_schema);
G
groot 已提交
1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN SHOW PARTITIONS", e.what());
    }

    return Status::OK();
}

Status
MySQLMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
1457 1458 1459 1460 1461 1462

        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

G
groot 已提交
1463 1464 1465
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
1466 1467 1468 1469
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.GetPartitionName.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.GetPartitionName.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1470 1471 1472 1473 1474
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query allPartitionsQuery = connectionPtr->query();
            allPartitionsQuery << "SELECT table_id FROM " << META_TABLES << " WHERE owner_table = " << mysqlpp::quote
1475
                               << table_id << " AND partition_tag = " << mysqlpp::quote << valid_tag << " AND state <> "
G
groot 已提交
1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486
                               << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allPartitionsQuery.str();

            res = allPartitionsQuery.store();
        }  // Scoped Connection

        if (res.num_rows() > 0) {
            const mysqlpp::Row& resRow = res[0];
            resRow["table_id"].to_string(partition_name);
        } else {
1487
            return Status(DB_NOT_FOUND, "Partition " + valid_tag + " of table " + table_id + " not found");
G
groot 已提交
1488 1489 1490 1491 1492 1493 1494 1495 1496
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN GET PARTITION NAME", e.what());
    }

    return Status::OK();
}

Status
1497
MySQLMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) {
G
groot 已提交
1498 1499 1500 1501 1502 1503 1504 1505
    files.clear();

    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
1506 1507 1508 1509
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesToSearch.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesToSearch.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1510 1511 1512 1513
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query filesToSearchQuery = connectionPtr->query();
1514 1515 1516
            filesToSearchQuery
                << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
                << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id;
G
groot 已提交
1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547

            if (!ids.empty()) {
                std::stringstream idSS;
                for (auto& id : ids) {
                    idSS << "id = " << std::to_string(id) << " OR ";
                }
                std::string idStr = idSS.str();
                idStr = idStr.substr(0, idStr.size() - 4);  // remove the last " OR "

                filesToSearchQuery << " AND (" << idStr << ")";
            }
            // End
            filesToSearchQuery << " AND"
                               << " (file_type = " << std::to_string(TableFileSchema::RAW)
                               << " OR file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                               << " OR file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();

            res = filesToSearchQuery.store();
        }  // Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        Status ret;
        for (auto& resRow : res) {
1548
            TableFileSchema table_file;
G
groot 已提交
1549 1550
            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
1551
            resRow["segment_id"].to_string(table_file.segment_id_);
G
groot 已提交
1552 1553 1554 1555 1556 1557 1558 1559 1560 1561
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.engine_type_ = resRow["engine_type"];
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.file_type_ = resRow["file_type"];
            table_file.file_size_ = resRow["file_size"];
            table_file.row_count_ = resRow["row_count"];
            table_file.date_ = resRow["date"];
            table_file.dimension_ = table_schema.dimension_;
X
xj.lin 已提交
1562

S
starlord 已提交
1563
            auto status = utils::GetTableFilePath(options_, table_file);
S
starlord 已提交
1564
            if (!status.ok()) {
S
starlord 已提交
1565
                ret = status;
S
starlord 已提交
1566
            }
1567

1568
            files.emplace_back(table_file);
1569
        }
S
starlord 已提交
1570

S
starlord 已提交
1571
        if (res.size() > 0) {
1572 1573
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-search files";
        }
S
starlord 已提交
1574
        return ret;
S
starlord 已提交
1575
    } catch (std::exception& e) {
S
starlord 已提交
1576
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
Z
update  
zhiru 已提交
1577
    }
1578
}
Z
update  
zhiru 已提交
1579

S
starlord 已提交
1580
Status
1581
MySQLMetaImpl::FilesToMerge(const std::string& table_id, TableFilesSchema& files) {
1582
    files.clear();
Z
update  
zhiru 已提交
1583

1584
    try {
Y
Yu Kun 已提交
1585
        server::MetricCollector metric;
S
starlord 已提交
1586

S
starlord 已提交
1587
        // check table existence
S
starlord 已提交
1588 1589 1590 1591 1592 1593 1594
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

S
starlord 已提交
1595
        mysqlpp::StoreQueryResult res;
1596
        {
S
starlord 已提交
1597
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
1598

S
shengjh 已提交
1599 1600 1601 1602
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesToMerge.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesToMerge.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1603
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1604
            }
Z
update  
zhiru 已提交
1605

S
starlord 已提交
1606
            mysqlpp::Query filesToMergeQuery = connectionPtr->query();
1607 1608 1609 1610 1611
            filesToMergeQuery << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date, "
                                 "engine_type, created_on"
                              << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                              << " AND file_type = " << std::to_string(TableFileSchema::RAW)
                              << " ORDER BY row_count DESC;";
Z
update  
zhiru 已提交
1612

1613
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();
1614

1615
            res = filesToMergeQuery.store();
S
starlord 已提交
1616
        }  // Scoped Connection
1617

S
starlord 已提交
1618
        Status ret;
1619
        int64_t to_merge_files = 0;
S
starlord 已提交
1620
        for (auto& resRow : res) {
S
starlord 已提交
1621 1622
            TableFileSchema table_file;
            table_file.file_size_ = resRow["file_size"];
S
starlord 已提交
1623
            if (table_file.file_size_ >= table_schema.index_file_size_) {
S
starlord 已提交
1624
                continue;  // skip large file
S
starlord 已提交
1625
            }
Z
update  
zhiru 已提交
1626

S
starlord 已提交
1627
            table_file.id_ = resRow["id"];  // implicit conversion
G
groot 已提交
1628
            resRow["table_id"].to_string(table_file.table_id_);
1629
            resRow["segment_id"].to_string(table_file.segment_id_);
G
groot 已提交
1630
            resRow["file_id"].to_string(table_file.file_id_);
1631
            table_file.file_type_ = resRow["file_type"];
S
starlord 已提交
1632
            table_file.row_count_ = resRow["row_count"];
1633
            table_file.date_ = resRow["date"];
1634
            table_file.index_file_size_ = table_schema.index_file_size_;
S
starlord 已提交
1635
            table_file.engine_type_ = resRow["engine_type"];
S
starlord 已提交
1636
            table_file.nlist_ = table_schema.nlist_;
S
starlord 已提交
1637
            table_file.metric_type_ = table_schema.metric_type_;
S
starlord 已提交
1638
            table_file.created_on_ = resRow["created_on"];
1639
            table_file.dimension_ = table_schema.dimension_;
Z
update  
zhiru 已提交
1640

S
starlord 已提交
1641
            auto status = utils::GetTableFilePath(options_, table_file);
S
starlord 已提交
1642
            if (!status.ok()) {
S
starlord 已提交
1643
                ret = status;
S
starlord 已提交
1644
            }
Z
update  
zhiru 已提交
1645

1646 1647
            files.emplace_back(table_file);
            ++to_merge_files;
1648
        }
Z
update  
zhiru 已提交
1649

1650 1651
        if (to_merge_files > 0) {
            ENGINE_LOG_TRACE << "Collect " << to_merge_files << " to-merge files";
1652
        }
S
starlord 已提交
1653
        return ret;
S
starlord 已提交
1654
    } catch (std::exception& e) {
S
starlord 已提交
1655
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
1656 1657 1658
    }
}

S
starlord 已提交
1659
Status
G
groot 已提交
1660 1661
MySQLMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();
Z
zhiru 已提交
1662

1663
    try {
G
groot 已提交
1664
        server::MetricCollector metric;
S
starlord 已提交
1665
        mysqlpp::StoreQueryResult res;
1666
        {
S
starlord 已提交
1667
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1668

S
shengjh 已提交
1669 1670 1671 1672
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesToIndex.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesToIndex.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1673
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1674 1675
            }

G
groot 已提交
1676
            mysqlpp::Query filesToIndexQuery = connectionPtr->query();
1677 1678 1679 1680
            filesToIndexQuery << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
                                 "row_count, date, created_on"
                              << " FROM " << META_TABLEFILES
                              << " WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";
1681

G
groot 已提交
1682
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();
1683

G
groot 已提交
1684
            res = filesToIndexQuery.store();
S
starlord 已提交
1685
        }  // Scoped Connection
1686

S
starlord 已提交
1687
        Status ret;
G
groot 已提交
1688 1689
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;
S
starlord 已提交
1690
        for (auto& resRow : res) {
G
groot 已提交
1691 1692
            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
1693
            resRow["segment_id"].to_string(table_file.segment_id_);
G
groot 已提交
1694 1695 1696 1697 1698 1699 1700
            table_file.engine_type_ = resRow["engine_type"];
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.file_type_ = resRow["file_type"];
            table_file.file_size_ = resRow["file_size"];
            table_file.row_count_ = resRow["row_count"];
            table_file.date_ = resRow["date"];
            table_file.created_on_ = resRow["created_on"];
Z
update  
zhiru 已提交
1701

G
groot 已提交
1702 1703 1704 1705 1706 1707 1708
            auto groupItr = groups.find(table_file.table_id_);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
1709
                }
G
groot 已提交
1710
                groups[table_file.table_id_] = table_schema;
Z
zhiru 已提交
1711
            }
G
groot 已提交
1712 1713 1714 1715
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
            table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
            table_file.nlist_ = groups[table_file.table_id_].nlist_;
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
1716

G
groot 已提交
1717 1718 1719
            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                ret = status;
1720
            }
Z
zhiru 已提交
1721

G
groot 已提交
1722 1723
            files.push_back(table_file);
        }
Z
update  
zhiru 已提交
1724

G
groot 已提交
1725 1726
        if (res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-index files";
1727
        }
G
groot 已提交
1728
        return ret;
S
starlord 已提交
1729
    } catch (std::exception& e) {
G
groot 已提交
1730
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
1731 1732
    }
}
1733

S
starlord 已提交
1734
Status
G
groot 已提交
1735
MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
G
groot 已提交
1736
                           TableFilesSchema& table_files) {
G
groot 已提交
1737 1738
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
Z
update  
zhiru 已提交
1739 1740
    }

1741 1742
    Status ret = Status::OK();

1743
    try {
G
groot 已提交
1744
        table_files.clear();
G
groot 已提交
1745 1746

        mysqlpp::StoreQueryResult res;
1747
        {
S
starlord 已提交
1748
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1749

S
shengjh 已提交
1750 1751 1752 1753
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesByType.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesByType.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1754
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1755
            }
Z
zhiru 已提交
1756

G
groot 已提交
1757 1758 1759 1760
            std::string types;
            for (auto type : file_types) {
                if (!types.empty()) {
                    types += ",";
Z
update  
zhiru 已提交
1761
                }
G
groot 已提交
1762
                types += std::to_string(type);
1763
            }
Z
update  
zhiru 已提交
1764

G
groot 已提交
1765 1766
            mysqlpp::Query hasNonIndexFilesQuery = connectionPtr->query();
            // since table_id is a unique column we just need to check whether it exists or not
G
groot 已提交
1767
            hasNonIndexFilesQuery
1768
                << "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
G
groot 已提交
1769 1770
                << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                << " AND file_type in (" << types << ");";
1771

G
groot 已提交
1772
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
Z
update  
zhiru 已提交
1773

G
groot 已提交
1774
            res = hasNonIndexFilesQuery.store();
S
starlord 已提交
1775
        }  // Scoped Connection
1776

1777 1778 1779 1780 1781 1782 1783
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

G
groot 已提交
1784 1785 1786 1787
        if (res.num_rows() > 0) {
            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
            int to_index_count = 0, index_count = 0, backup_count = 0;
            for (auto& resRow : res) {
G
groot 已提交
1788 1789 1790
                TableFileSchema file_schema;
                file_schema.id_ = resRow["id"];
                file_schema.table_id_ = table_id;
1791
                resRow["segment_id"].to_string(file_schema.segment_id_);
G
groot 已提交
1792 1793 1794 1795 1796 1797 1798 1799
                file_schema.engine_type_ = resRow["engine_type"];
                resRow["file_id"].to_string(file_schema.file_id_);
                file_schema.file_type_ = resRow["file_type"];
                file_schema.file_size_ = resRow["file_size"];
                file_schema.row_count_ = resRow["row_count"];
                file_schema.date_ = resRow["date"];
                file_schema.created_on_ = resRow["created_on"];

1800 1801 1802 1803 1804 1805 1806 1807 1808 1809
                file_schema.index_file_size_ = table_schema.index_file_size_;
                file_schema.nlist_ = table_schema.nlist_;
                file_schema.metric_type_ = table_schema.metric_type_;
                file_schema.dimension_ = table_schema.dimension_;

                auto status = utils::GetTableFilePath(options_, file_schema);
                if (!status.ok()) {
                    ret = status;
                }

G
groot 已提交
1810
                table_files.emplace_back(file_schema);
1811

G
groot 已提交
1812 1813 1814
                int32_t file_type = resRow["file_type"];
                switch (file_type) {
                    case (int)TableFileSchema::RAW:
1815
                        ++raw_count;
G
groot 已提交
1816 1817
                        break;
                    case (int)TableFileSchema::NEW:
1818
                        ++new_count;
G
groot 已提交
1819 1820
                        break;
                    case (int)TableFileSchema::NEW_MERGE:
1821
                        ++new_merge_count;
G
groot 已提交
1822 1823
                        break;
                    case (int)TableFileSchema::NEW_INDEX:
1824
                        ++new_index_count;
G
groot 已提交
1825 1826
                        break;
                    case (int)TableFileSchema::TO_INDEX:
1827
                        ++to_index_count;
G
groot 已提交
1828 1829
                        break;
                    case (int)TableFileSchema::INDEX:
1830
                        ++index_count;
G
groot 已提交
1831 1832
                        break;
                    case (int)TableFileSchema::BACKUP:
1833
                        ++backup_count;
G
groot 已提交
1834 1835
                        break;
                    default:
S
shengjh 已提交
1836
                        break;
Z
update  
zhiru 已提交
1837
                }
1838 1839
            }

G
groot 已提交
1840
            std::string msg = "Get table files by type.";
G
groot 已提交
1841 1842 1843
            for (int file_type : file_types) {
                switch (file_type) {
                    case (int)TableFileSchema::RAW:
G
groot 已提交
1844
                        msg = msg + " raw files:" + std::to_string(raw_count);
G
groot 已提交
1845 1846
                        break;
                    case (int)TableFileSchema::NEW:
G
groot 已提交
1847
                        msg = msg + " new files:" + std::to_string(new_count);
G
groot 已提交
1848 1849
                        break;
                    case (int)TableFileSchema::NEW_MERGE:
G
groot 已提交
1850
                        msg = msg + " new_merge files:" + std::to_string(new_merge_count);
G
groot 已提交
1851 1852
                        break;
                    case (int)TableFileSchema::NEW_INDEX:
G
groot 已提交
1853
                        msg = msg + " new_index files:" + std::to_string(new_index_count);
G
groot 已提交
1854 1855
                        break;
                    case (int)TableFileSchema::TO_INDEX:
G
groot 已提交
1856
                        msg = msg + " to_index files:" + std::to_string(to_index_count);
G
groot 已提交
1857 1858
                        break;
                    case (int)TableFileSchema::INDEX:
G
groot 已提交
1859
                        msg = msg + " index files:" + std::to_string(index_count);
G
groot 已提交
1860 1861
                        break;
                    case (int)TableFileSchema::BACKUP:
G
groot 已提交
1862
                        msg = msg + " backup files:" + std::to_string(backup_count);
G
groot 已提交
1863
                        break;
1864
                    default:
S
shengjh 已提交
1865
                        break;
G
groot 已提交
1866 1867 1868
                }
            }
            ENGINE_LOG_DEBUG << msg;
G
groot 已提交
1869
        }
S
starlord 已提交
1870
    } catch (std::exception& e) {
G
groot 已提交
1871
        return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
1872
    }
S
starlord 已提交
1873

1874
    return ret;
1875
}
1876

G
groot 已提交
1877
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1878
Status
G
groot 已提交
1879 1880 1881 1882 1883
MySQLMetaImpl::Archive() {
    auto& criterias = options_.archive_conf_.GetCriterias();
    if (criterias.empty()) {
        return Status::OK();
    }
1884

G
groot 已提交
1885 1886 1887 1888
    for (auto& kv : criterias) {
        auto& criteria = kv.first;
        auto& limit = kv.second;
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
1889
            size_t usecs = limit * DAY * US_PS;
G
groot 已提交
1890
            int64_t now = utils::GetMicroSecTimeStamp();
Z
update  
zhiru 已提交
1891

G
groot 已提交
1892 1893
            try {
                mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
1894

S
shengjh 已提交
1895 1896 1897 1898
                bool is_null_connection = (connectionPtr == nullptr);
                fiu_do_on("MySQLMetaImpl.Archive.null_connection", is_null_connection = true);
                fiu_do_on("MySQLMetaImpl.Archive.throw_exception", throw std::exception(););
                if (is_null_connection) {
G
groot 已提交
1899 1900
                    return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
                }
1901

G
groot 已提交
1902 1903 1904 1905 1906
                mysqlpp::Query archiveQuery = connectionPtr->query();
                archiveQuery << "UPDATE " << META_TABLEFILES
                             << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                             << " WHERE created_on < " << std::to_string(now - usecs) << " AND file_type <> "
                             << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1907

G
groot 已提交
1908 1909 1910 1911 1912 1913 1914 1915 1916 1917
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();

                if (!archiveQuery.exec()) {
                    return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
                }

                ENGINE_LOG_DEBUG << "Archive old files";
            } catch (std::exception& e) {
                return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
            }
Z
fix  
zhiru 已提交
1918
        }
G
groot 已提交
1919 1920 1921
        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            Size(sum);
Z
fix  
zhiru 已提交
1922

G
groot 已提交
1923 1924 1925 1926 1927
            auto to_delete = (sum - limit * G);
            DiscardFiles(to_delete);

            ENGINE_LOG_DEBUG << "Archive files to free disk";
        }
1928
    }
Z
update  
zhiru 已提交
1929

1930 1931
    return Status::OK();
}
Z
zhiru 已提交
1932

S
starlord 已提交
1933
Status
G
groot 已提交
1934 1935 1936
MySQLMetaImpl::Size(uint64_t& result) {
    result = 0;

1937
    try {
G
groot 已提交
1938
        mysqlpp::StoreQueryResult res;
1939
        {
S
starlord 已提交
1940
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1941

S
shengjh 已提交
1942 1943 1944 1945
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.Size.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.Size.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1946
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1947
            }
Z
update  
zhiru 已提交
1948

G
groot 已提交
1949 1950 1951 1952
            mysqlpp::Query getSizeQuery = connectionPtr->query();
            getSizeQuery << "SELECT IFNULL(SUM(file_size),0) AS sum"
                         << " FROM " << META_TABLEFILES << " WHERE file_type <> "
                         << std::to_string(TableFileSchema::TO_DELETE) << ";";
1953

G
groot 已提交
1954
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str();
1955

G
groot 已提交
1956 1957
            res = getSizeQuery.store();
        }  // Scoped Connection
1958

G
groot 已提交
1959 1960 1961 1962 1963 1964 1965 1966
        if (res.empty()) {
            result = 0;
        } else {
            result = res[0]["sum"];
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
    }
1967

G
groot 已提交
1968 1969
    return Status::OK();
}
1970

G
groot 已提交
1971
Status
1972
MySQLMetaImpl::CleanUpShadowFiles() {
G
groot 已提交
1973 1974
    try {
        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1975

S
shengjh 已提交
1976 1977 1978 1979
        bool is_null_connection = (connectionPtr == nullptr);
        fiu_do_on("MySQLMetaImpl.CleanUpShadowFiles.null_connection", is_null_connection = true);
        fiu_do_on("MySQLMetaImpl.CleanUpShadowFiles.throw_exception", throw std::exception(););
        if (is_null_connection) {
G
groot 已提交
1980 1981
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
        }
1982

G
groot 已提交
1983 1984 1985 1986 1987
        mysqlpp::Query cleanUpQuery = connectionPtr->query();
        cleanUpQuery << "SELECT table_name"
                     << " FROM information_schema.tables"
                     << " WHERE table_schema = " << mysqlpp::quote << mysql_connection_pool_->getDB()
                     << " AND table_name = " << mysqlpp::quote << META_TABLEFILES << ";";
Z
fix  
zhiru 已提交
1988

G
groot 已提交
1989
        ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
1990

G
groot 已提交
1991
        mysqlpp::StoreQueryResult res = cleanUpQuery.store();
1992

G
groot 已提交
1993 1994 1995 1996 1997 1998 1999 2000 2001 2002
        if (!res.empty()) {
            ENGINE_LOG_DEBUG << "Remove table file type as NEW";
            cleanUpQuery << "DELETE FROM " << META_TABLEFILES << " WHERE file_type IN ("
                         << std::to_string(TableFileSchema::NEW) << "," << std::to_string(TableFileSchema::NEW_MERGE)
                         << "," << std::to_string(TableFileSchema::NEW_INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();

            if (!cleanUpQuery.exec()) {
                return HandleException("QUERY ERROR WHEN CLEANING UP FILES", cleanUpQuery.error());
2003
            }
G
groot 已提交
2004
        }
2005

G
groot 已提交
2006 2007 2008
        if (res.size() > 0) {
            ENGINE_LOG_DEBUG << "Clean " << res.size() << " files";
        }
S
starlord 已提交
2009
    } catch (std::exception& e) {
G
groot 已提交
2010
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
2011
    }
S
starlord 已提交
2012

2013 2014
    return Status::OK();
}
Z
fix  
zhiru 已提交
2015

2016
Status
2017
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) {
2018
    auto now = utils::GetMicroSecTimeStamp();
S
starlord 已提交
2019 2020
    std::set<std::string> table_ids;

S
starlord 已提交
2021
    // remove to_delete files
2022
    try {
Y
Yu Kun 已提交
2023
        server::MetricCollector metric;
2024

2025
        {
S
starlord 已提交
2026
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
2027

S
shengjh 已提交
2028 2029 2030 2031 2032
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RomoveToDeleteFiles_NullConnection",
                      is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RomoveToDeleteFiles_ThrowException", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2033
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
2034
            }
Z
zhiru 已提交
2035

2036
            mysqlpp::Query query = connectionPtr->query();
2037
            query << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date"
G
groot 已提交
2038
                  << " FROM " << META_TABLEFILES << " WHERE file_type IN ("
G
groot 已提交
2039
                  << std::to_string(TableFileSchema::TO_DELETE) << "," << std::to_string(TableFileSchema::BACKUP) << ")"
2040
                  << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
Z
update  
zhiru 已提交
2041

2042
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
Z
update  
zhiru 已提交
2043

2044
            mysqlpp::StoreQueryResult res = query.store();
Z
update  
zhiru 已提交
2045

2046 2047
            TableFileSchema table_file;
            std::vector<std::string> idsToDelete;
2048

G
groot 已提交
2049
            int64_t clean_files = 0;
S
starlord 已提交
2050 2051
            for (auto& resRow : res) {
                table_file.id_ = resRow["id"];  // implicit conversion
G
groot 已提交
2052
                resRow["table_id"].to_string(table_file.table_id_);
2053 2054
                resRow["segment_id"].to_string(table_file.segment_id_);
                table_file.engine_type_ = resRow["engine_type"];
G
groot 已提交
2055
                resRow["file_id"].to_string(table_file.file_id_);
2056
                table_file.date_ = resRow["date"];
G
groot 已提交
2057
                table_file.file_type_ = resRow["file_type"];
Z
update  
zhiru 已提交
2058

G
groot 已提交
2059
                // check if the file can be deleted
2060
                if (OngoingFileChecker::GetInstance().IsIgnored(table_file)) {
G
groot 已提交
2061 2062 2063
                    ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
                                     << " currently is in use, not able to delete now";
                    continue;  // ignore this file, don't delete it
G
groot 已提交
2064
                }
2065

G
groot 已提交
2066
                // erase file data from cache
G
groot 已提交
2067 2068
                // because GetTableFilePath won't able to generate file path after the file is deleted
                utils::GetTableFilePath(options_, table_file);
G
groot 已提交
2069 2070
                server::CommonUtil::EraseFromCache(table_file.location_);

G
groot 已提交
2071
                if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) {
2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084
                    // If we are deleting a raw table file, it means it's okay to delete the entire segment directory.
                    // Else, we can only delete the single file
                    // TODO(zhiru): We determine whether a table file is raw by its engine type. This is a bit hacky
                    if (table_file.engine_type_ == (int32_t)EngineType::FAISS_IDMAP ||
                        table_file.engine_type_ == (int32_t)EngineType::FAISS_BIN_IDMAP) {
                        utils::DeleteSegment(options_, table_file);
                        std::string segment_dir;
                        utils::GetParentPath(table_file.location_, segment_dir);
                        ENGINE_LOG_DEBUG << "Remove segment directory: " << segment_dir;
                    } else {
                        utils::DeleteTableFilePath(options_, table_file);
                        ENGINE_LOG_DEBUG << "Remove table file: " << table_file.location_;
                    }
G
groot 已提交
2085 2086 2087 2088

                    idsToDelete.emplace_back(std::to_string(table_file.id_));
                    table_ids.insert(table_file.table_id_);

2089
                    ++clean_files;
G
typo  
groot 已提交
2090
                }
2091 2092
            }

G
groot 已提交
2093
            // delete file from meta
2094 2095
            if (!idsToDelete.empty()) {
                std::stringstream idsToDeleteSS;
S
starlord 已提交
2096
                for (auto& id : idsToDelete) {
2097
                    idsToDeleteSS << "id = " << id << " OR ";
2098
                }
2099

2100
                std::string idsToDeleteStr = idsToDeleteSS.str();
S
starlord 已提交
2101
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4);  // remove the last " OR "
2102
                query << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";";
2103

2104
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
2105

2106 2107
                if (!query.exec()) {
                    return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", query.error());
2108 2109
                }
            }
2110

G
groot 已提交
2111
            if (clean_files > 0) {
G
groot 已提交
2112
                ENGINE_LOG_DEBUG << "Clean " << clean_files << " files expired in " << seconds << " seconds";
2113
            }
S
starlord 已提交
2114 2115
        }  // Scoped Connection
    } catch (std::exception& e) {
S
starlord 已提交
2116
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
Z
update  
zhiru 已提交
2117
    }
2118

S
starlord 已提交
2119
    // remove to_delete tables
2120
    try {
Y
Yu Kun 已提交
2121
        server::MetricCollector metric;
2122 2123

        {
S
starlord 已提交
2124
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
2125

S
shengjh 已提交
2126 2127 2128 2129 2130
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RemoveToDeleteTables_NUllConnection",
                      is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RemoveToDeleteTables_ThrowException", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2131
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
Z
update  
zhiru 已提交
2132 2133
            }

2134 2135 2136
            mysqlpp::Query query = connectionPtr->query();
            query << "SELECT id, table_id"
                  << " FROM " << META_TABLES << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
2137

2138
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
2139

2140
            mysqlpp::StoreQueryResult res = query.store();
Z
update  
zhiru 已提交
2141

2142
            int64_t remove_tables = 0;
Z
update  
zhiru 已提交
2143
            if (!res.empty()) {
2144
                std::stringstream idsToDeleteSS;
S
starlord 已提交
2145
                for (auto& resRow : res) {
2146 2147 2148
                    size_t id = resRow["id"];
                    std::string table_id;
                    resRow["table_id"].to_string(table_id);
Z
update  
zhiru 已提交
2149

S
starlord 已提交
2150
                    utils::DeleteTablePath(options_, table_id, false);  // only delete empty folder
2151
                    ++remove_tables;
2152
                    idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
Z
update  
zhiru 已提交
2153
                }
2154
                std::string idsToDeleteStr = idsToDeleteSS.str();
S
starlord 已提交
2155
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4);  // remove the last " OR "
2156
                query << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";";
2157

2158
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
Z
update  
zhiru 已提交
2159

2160 2161
                if (!query.exec()) {
                    return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", query.error());
2162 2163
                }
            }
2164

2165 2166
            if (remove_tables > 0) {
                ENGINE_LOG_DEBUG << "Remove " << remove_tables << " tables from meta";
2167
            }
S
starlord 已提交
2168 2169
        }  // Scoped Connection
    } catch (std::exception& e) {
S
starlord 已提交
2170
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
Z
update  
zhiru 已提交
2171 2172
    }

S
starlord 已提交
2173 2174
    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
S
starlord 已提交
2175
    try {
Y
Yu Kun 已提交
2176
        server::MetricCollector metric;
S
starlord 已提交
2177 2178

        {
S
starlord 已提交
2179
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
2180

S
shengjh 已提交
2181 2182 2183 2184 2185 2186
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RemoveDeletedTableFolder_NUllConnection",
                      is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RemoveDeletedTableFolder_ThrowException",
                      throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2187
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
S
starlord 已提交
2188 2189
            }

S
starlord 已提交
2190
            for (auto& table_id : table_ids) {
2191 2192 2193
                mysqlpp::Query query = connectionPtr->query();
                query << "SELECT file_id"
                      << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
S
starlord 已提交
2194

2195
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
S
starlord 已提交
2196

2197
                mysqlpp::StoreQueryResult res = query.store();
S
starlord 已提交
2198 2199 2200 2201 2202

                if (res.empty()) {
                    utils::DeleteTablePath(options_, table_id);
                }
            }
2203

S
starlord 已提交
2204
            if (table_ids.size() > 0) {
2205 2206
                ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
            }
S
starlord 已提交
2207
        }
S
starlord 已提交
2208
    } catch (std::exception& e) {
S
starlord 已提交
2209
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
S
starlord 已提交
2210 2211
    }

2212 2213
    return Status::OK();
}
2214

S
starlord 已提交
2215
Status
S
starlord 已提交
2216
MySQLMetaImpl::Count(const std::string& table_id, uint64_t& result) {
2217
    try {
Y
Yu Kun 已提交
2218
        server::MetricCollector metric;
2219 2220 2221 2222 2223 2224 2225

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);

        if (!status.ok()) {
            return status;
2226
        }
Z
zhiru 已提交
2227

S
starlord 已提交
2228
        mysqlpp::StoreQueryResult res;
2229
        {
S
starlord 已提交
2230
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
2231

S
shengjh 已提交
2232 2233 2234 2235
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.Count.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.Count.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2236
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
Z
update  
zhiru 已提交
2237 2238
            }

S
starlord 已提交
2239
            mysqlpp::Query countQuery = connectionPtr->query();
G
groot 已提交
2240 2241 2242 2243 2244
            countQuery << "SELECT row_count"
                       << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                       << " AND (file_type = " << std::to_string(TableFileSchema::RAW)
                       << " OR file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                       << " OR file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
Z
update  
zhiru 已提交
2245

2246
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str();
Z
update  
zhiru 已提交
2247

2248
            res = countQuery.store();
S
starlord 已提交
2249
        }  // Scoped Connection
2250 2251

        result = 0;
S
starlord 已提交
2252
        for (auto& resRow : res) {
S
starlord 已提交
2253
            size_t size = resRow["row_count"];
2254
            result += size;
2255
        }
S
starlord 已提交
2256
    } catch (std::exception& e) {
S
starlord 已提交
2257
        return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
Z
update  
zhiru 已提交
2258
    }
S
starlord 已提交
2259

2260 2261 2262
    return Status::OK();
}

S
starlord 已提交
2263 2264
Status
MySQLMetaImpl::DropAll() {
2265
    try {
S
starlord 已提交
2266
        ENGINE_LOG_DEBUG << "Drop all mysql meta";
S
starlord 已提交
2267
        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
2268

S
shengjh 已提交
2269 2270 2271 2272
        bool is_null_connection = (connectionPtr == nullptr);
        fiu_do_on("MySQLMetaImpl.DropAll.null_connection", is_null_connection = true);
        fiu_do_on("MySQLMetaImpl.DropAll.throw_exception", throw std::exception(););
        if (is_null_connection) {
G
groot 已提交
2273
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
Z
zhiru 已提交
2274
        }
2275

S
starlord 已提交
2276
        mysqlpp::Query dropTableQuery = connectionPtr->query();
2277
        dropTableQuery << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";";
2278 2279 2280 2281 2282 2283

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();

        if (dropTableQuery.exec()) {
            return Status::OK();
        }
S
starlord 已提交
2284
        return HandleException("QUERY ERROR WHEN DROPPING ALL", dropTableQuery.error());
S
starlord 已提交
2285
    } catch (std::exception& e) {
S
starlord 已提交
2286
        return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
2287 2288 2289
    }
}

G
groot 已提交
2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302
Status
MySQLMetaImpl::DiscardFiles(int64_t to_discard_size) {
    if (to_discard_size <= 0) {
        return Status::OK();
    }
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

    try {
        server::MetricCollector metric;
        bool status;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
2303 2304 2305 2306
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DiscardFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DiscardFiles.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query discardFilesQuery = connectionPtr->query();
            discardFilesQuery << "SELECT id, file_size"
                              << " FROM " << META_TABLEFILES << " WHERE file_type <> "
                              << std::to_string(TableFileSchema::TO_DELETE) << " ORDER BY id ASC "
                              << " LIMIT 10;";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();

            mysqlpp::StoreQueryResult res = discardFilesQuery.store();
            if (res.num_rows() == 0) {
                return Status::OK();
            }

            TableFileSchema table_file;
            std::stringstream idsToDiscardSS;
            for (auto& resRow : res) {
                if (to_discard_size <= 0) {
                    break;
                }
                table_file.id_ = resRow["id"];
                table_file.file_size_ = resRow["file_size"];
                idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR ";
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
            }

            std::string idsToDiscardStr = idsToDiscardSS.str();
            idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4);  // remove the last " OR "

            discardFilesQuery << "UPDATE " << META_TABLEFILES
                              << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                              << " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " WHERE "
                              << idsToDiscardStr << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();

            status = discardFilesQuery.exec();
            if (!status) {
                return HandleException("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error());
            }
        }  // Scoped Connection

        return DiscardFiles(to_discard_size);
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
    }
}

2359 2360 2361 2362 2363 2364 2365 2366 2367 2368
Status
MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
    return Status::OK();
}

Status
MySQLMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
    return Status::OK();
}

S
starlord 已提交
2369 2370 2371
}  // namespace meta
}  // namespace engine
}  // namespace milvus