MySQLMetaImpl.cpp 100.9 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/meta/MySQLMetaImpl.h"
Z
update  
zhiru 已提交
13

S
shengjh 已提交
14
#include <fiu-local.h>
S
starlord 已提交
15 16
#include <mysql++/mysql++.h>
#include <string.h>
Z
update  
zhiru 已提交
17
#include <unistd.h>
18

S
starlord 已提交
19
#include <boost/filesystem.hpp>
Z
update  
zhiru 已提交
20 21
#include <chrono>
#include <fstream>
S
starlord 已提交
22 23 24
#include <iostream>
#include <map>
#include <mutex>
Z
update  
zhiru 已提交
25
#include <regex>
S
starlord 已提交
26 27
#include <set>
#include <sstream>
Z
update  
zhiru 已提交
28
#include <string>
Z
zhiru 已提交
29
#include <thread>
Z
update  
zhiru 已提交
30

31 32 33 34 35 36 37 38 39
#include "MetaConsts.h"
#include "db/IDGenerator.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
Z
Zhiru Zhu 已提交
40
#include "utils/ValidationUtil.h"
41

Z
update  
zhiru 已提交
42 43 44 45
namespace milvus {
namespace engine {
namespace meta {

46
namespace {
Z
update  
zhiru 已提交
47

S
starlord 已提交
48
Status
S
starlord 已提交
49
HandleException(const std::string& desc, const char* what = nullptr) {
S
starlord 已提交
50
    if (what == nullptr) {
S
starlord 已提交
51 52 53
        ENGINE_LOG_ERROR << desc;
        return Status(DB_META_TRANSACTION_FAILED, desc);
    }
S
starlord 已提交
54 55 56 57

    std::string msg = desc + ":" + what;
    ENGINE_LOG_ERROR << msg;
    return Status(DB_META_TRANSACTION_FAILED, msg);
58
}
Z
update  
zhiru 已提交
59

60
class MetaField {
S
starlord 已提交
61
 public:
S
starlord 已提交
62 63
    MetaField(const std::string& name, const std::string& type, const std::string& setting)
        : name_(name), type_(type), setting_(setting) {
64 65
    }

S
starlord 已提交
66 67
    std::string
    name() const {
68 69 70
        return name_;
    }

S
starlord 已提交
71 72
    std::string
    ToString() const {
73 74 75 76 77
        return name_ + " " + type_ + " " + setting_;
    }

    // mysql field type has additional information. for instance, a filed type is defined as 'BIGINT'
    // we get the type from sql is 'bigint(20)', so we need to ignore the '(20)'
S
starlord 已提交
78 79
    bool
    IsEqual(const MetaField& field) const {
80 81 82
        size_t name_len_min = field.name_.length() > name_.length() ? name_.length() : field.name_.length();
        size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length();
        return strncasecmp(field.name_.c_str(), name_.c_str(), name_len_min) == 0 &&
S
starlord 已提交
83
               strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0;
84 85
    }

S
starlord 已提交
86
 private:
87 88 89 90 91 92 93
    std::string name_;
    std::string type_;
    std::string setting_;
};

using MetaFields = std::vector<MetaField>;
class MetaSchema {
S
starlord 已提交
94
 public:
S
starlord 已提交
95
    MetaSchema(const std::string& name, const MetaFields& fields) : name_(name), fields_(fields) {
96 97
    }

S
starlord 已提交
98 99
    std::string
    name() const {
100 101 102
        return name_;
    }

S
starlord 已提交
103 104
    std::string
    ToString() const {
105
        std::string result;
S
starlord 已提交
106
        for (auto& field : fields_) {
S
starlord 已提交
107
            if (!result.empty()) {
108 109 110 111 112 113 114
                result += ",";
            }
            result += field.ToString();
        }
        return result;
    }

S
starlord 已提交
115 116 117 118
    // if the outer fields contains all this MetaSchema fields, return true
    // otherwise return false
    bool
    IsEqual(const MetaFields& fields) const {
119
        std::vector<std::string> found_field;
S
starlord 已提交
120 121
        for (const auto& this_field : fields_) {
            for (const auto& outer_field : fields) {
S
starlord 已提交
122
                if (this_field.IsEqual(outer_field)) {
123 124 125 126 127 128 129 130 131
                    found_field.push_back(this_field.name());
                    break;
                }
            }
        }

        return found_field.size() == fields_.size();
    }

S
starlord 已提交
132
 private:
133 134 135 136
    std::string name_;
    MetaFields fields_;
};

S
starlord 已提交
137
// Tables schema
138
static const MetaSchema TABLES_SCHEMA(META_TABLES, {
S
starlord 已提交
139 140 141 142 143 144 145 146
                                                       MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
                                                       MetaField("table_id", "VARCHAR(255)", "UNIQUE NOT NULL"),
                                                       MetaField("state", "INT", "NOT NULL"),
                                                       MetaField("dimension", "SMALLINT", "NOT NULL"),
                                                       MetaField("created_on", "BIGINT", "NOT NULL"),
                                                       MetaField("flag", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                       MetaField("index_file_size", "BIGINT", "DEFAULT 1024 NOT NULL"),
                                                       MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
147
                                                       MetaField("index_params", "VARCHAR(512)", "NOT NULL"),
S
starlord 已提交
148
                                                       MetaField("metric_type", "INT", "DEFAULT 1 NOT NULL"),
G
groot 已提交
149 150 151 152
                                                       MetaField("owner_table", "VARCHAR(255)", "NOT NULL"),
                                                       MetaField("partition_tag", "VARCHAR(255)", "NOT NULL"),
                                                       MetaField("version", "VARCHAR(64)",
                                                                 std::string("DEFAULT '") + CURRENT_VERSION + "'"),
153
                                                       MetaField("flush_lsn", "BIGINT", "DEFAULT 0 NOT NULL"),
S
starlord 已提交
154 155 156
                                                   });

// TableFiles schema
J
JinHai-CN 已提交
157 158 159
static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
                                                               MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
                                                               MetaField("table_id", "VARCHAR(255)", "NOT NULL"),
160
                                                               MetaField("segment_id", "VARCHAR(255)", "NOT NULL"),
J
JinHai-CN 已提交
161 162 163 164 165 166 167 168
                                                               MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
                                                               MetaField("file_id", "VARCHAR(255)", "NOT NULL"),
                                                               MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("updated_time", "BIGINT", "NOT NULL"),
                                                               MetaField("created_on", "BIGINT", "NOT NULL"),
                                                               MetaField("date", "INT", "DEFAULT -1 NOT NULL"),
169
                                                               MetaField("flush_lsn", "BIGINT", "DEFAULT 0 NOT NULL"),
J
JinHai-CN 已提交
170
                                                           });
S
starlord 已提交
171 172

}  // namespace
173

174
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
175
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions& options, const int& mode) : options_(options), mode_(mode) {
176 177 178 179 180 181
    Initialize();
}

MySQLMetaImpl::~MySQLMetaImpl() {
}

S
starlord 已提交
182
Status
S
starlord 已提交
183
MySQLMetaImpl::NextTableId(std::string& table_id) {
G
groot 已提交
184
    std::lock_guard<std::mutex> lock(genid_mutex_);  // avoid duplicated id
185
    std::stringstream ss;
J
Jin Hai 已提交
186 187
    SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
    ss << id_generator.GetNextIDNumber();
188 189 190 191
    table_id = ss.str();
    return Status::OK();
}

S
starlord 已提交
192
Status
S
starlord 已提交
193
MySQLMetaImpl::NextFileId(std::string& file_id) {
G
groot 已提交
194
    std::lock_guard<std::mutex> lock(genid_mutex_);  // avoid duplicated id
195
    std::stringstream ss;
J
Jin Hai 已提交
196 197 198

    SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
    ss << id_generator.GetNextIDNumber();
199 200 201 202
    file_id = ss.str();
    return Status::OK();
}

S
starlord 已提交
203 204 205
void
MySQLMetaImpl::ValidateMetaSchema() {
    if (nullptr == mysql_connection_pool_) {
206 207 208
        return;
    }

S
starlord 已提交
209
    mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
210 211 212 213
    if (connectionPtr == nullptr) {
        return;
    }

S
starlord 已提交
214
    auto validate_func = [&](const MetaSchema& schema) {
S
shengjh 已提交
215
        fiu_return_on("MySQLMetaImpl.ValidateMetaSchema.fail_validate", false);
S
starlord 已提交
216
        mysqlpp::Query query_statement = connectionPtr->query();
217 218 219 220 221
        query_statement << "DESC " << schema.name() << ";";

        MetaFields exist_fields;

        try {
S
starlord 已提交
222
            mysqlpp::StoreQueryResult res = query_statement.store();
223
            for (size_t i = 0; i < res.num_rows(); ++i) {
S
starlord 已提交
224
                const mysqlpp::Row& row = res[i];
225 226 227 228 229 230
                std::string name, type;
                row["Field"].to_string(name);
                row["Type"].to_string(type);

                exist_fields.push_back(MetaField(name, type, ""));
            }
S
starlord 已提交
231
        } catch (std::exception& e) {
232 233 234
            ENGINE_LOG_DEBUG << "Meta table '" << schema.name() << "' not exist and will be created";
        }

S
starlord 已提交
235
        if (exist_fields.empty()) {
236 237 238 239 240 241
            return true;
        }

        return schema.IsEqual(exist_fields);
    };

S
starlord 已提交
242
    // verify Tables
243 244 245 246
    if (!validate_func(TABLES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }

S
starlord 已提交
247
    // verufy TableFiles
248 249 250 251 252
    if (!validate_func(TABLEFILES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
253 254
Status
MySQLMetaImpl::Initialize() {
S
starlord 已提交
255
    // step 1: create db root path
S
starlord 已提交
256 257
    if (!boost::filesystem::is_directory(options_.path_)) {
        auto ret = boost::filesystem::create_directory(options_.path_);
S
shengjh 已提交
258
        fiu_do_on("MySQLMetaImpl.Initialize.fail_create_directory", ret = false);
259
        if (!ret) {
S
starlord 已提交
260
            std::string msg = "Failed to create db directory " + options_.path_;
S
starlord 已提交
261
            ENGINE_LOG_ERROR << msg;
S
shengjh 已提交
262
            throw Exception(DB_META_TRANSACTION_FAILED, msg);
Z
update  
zhiru 已提交
263 264 265
        }
    }

S
starlord 已提交
266
    std::string uri = options_.backend_uri_;
267

S
starlord 已提交
268
    // step 2: parse and check meta uri
269 270
    utils::MetaUriInfo uri_info;
    auto status = utils::ParseMetaUri(uri, uri_info);
S
starlord 已提交
271
    if (!status.ok()) {
272 273 274 275 276 277 278 279 280 281 282
        std::string msg = "Wrong URI format: " + uri;
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }

    if (strcasecmp(uri_info.dialect_.c_str(), "mysql") != 0) {
        std::string msg = "URI's dialect is not MySQL";
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }

S
starlord 已提交
283
    // step 3: connect mysql
284 285 286 287 288 289 290
    int thread_hint = std::thread::hardware_concurrency();
    int max_pool_size = (thread_hint == 0) ? 8 : thread_hint;
    unsigned int port = 0;
    if (!uri_info.port_.empty()) {
        port = std::stoi(uri_info.port_);
    }

S
starlord 已提交
291 292
    mysql_connection_pool_ = std::make_shared<MySQLConnectionPool>(
        uri_info.db_name_, uri_info.username_, uri_info.password_, uri_info.host_, port, max_pool_size);
293 294
    ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(max_pool_size);

S
starlord 已提交
295
    // step 4: validate to avoid open old version schema
296 297
    ValidateMetaSchema();

298 299 300 301
    // step 5: clean shadow files
    if (mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        CleanUpShadowFiles();
    }
302

303 304
    // step 6: try connect mysql server
    mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
305

306
    if (connectionPtr == nullptr) {
G
groot 已提交
307
        std::string msg = "Failed to connect MySQL meta server: " + uri;
308 309 310
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }
311

S
shengjh 已提交
312 313 314
    bool is_thread_aware = connectionPtr->thread_aware();
    fiu_do_on("MySQLMetaImpl.Initialize.is_thread_aware", is_thread_aware = false);
    if (!is_thread_aware) {
G
groot 已提交
315 316
        std::string msg =
            "Failed to initialize MySQL meta backend: MySQL client component wasn't built with thread awareness";
317 318 319
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }
320

321 322
    // step 7: create meta table Tables
    mysqlpp::Query InitializeQuery = connectionPtr->query();
323

324
    InitializeQuery << "CREATE TABLE IF NOT EXISTS " << TABLES_SCHEMA.name() << " (" << TABLES_SCHEMA.ToString() + ");";
Z
update  
zhiru 已提交
325

326
    ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
327

S
shengjh 已提交
328 329 330
    bool initialize_query_exec = InitializeQuery.exec();
    fiu_do_on("MySQLMetaImpl.Initialize.fail_create_table_scheme", initialize_query_exec = false);
    if (!initialize_query_exec) {
G
groot 已提交
331
        std::string msg = "Failed to create meta table 'Tables' in MySQL";
332 333 334
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_META_TRANSACTION_FAILED, msg);
    }
Z
update  
zhiru 已提交
335

336 337 338
    // step 8: create meta table TableFiles
    InitializeQuery << "CREATE TABLE IF NOT EXISTS " << TABLEFILES_SCHEMA.name() << " ("
                    << TABLEFILES_SCHEMA.ToString() + ");";
Z
update  
zhiru 已提交
339

340 341
    ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

S
shengjh 已提交
342 343 344
    initialize_query_exec = InitializeQuery.exec();
    fiu_do_on("MySQLMetaImpl.Initialize.fail_create_table_files", initialize_query_exec = false);
    if (!initialize_query_exec) {
G
groot 已提交
345
        std::string msg = "Failed to create meta table 'TableFiles' in MySQL";
346 347
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_META_TRANSACTION_FAILED, msg);
Z
update  
zhiru 已提交
348
    }
S
starlord 已提交
349 350

    return Status::OK();
351 352
}

S
starlord 已提交
353
Status
S
starlord 已提交
354
MySQLMetaImpl::CreateTable(TableSchema& table_schema) {
355
    try {
Y
Yu Kun 已提交
356
        server::MetricCollector metric;
357
        {
S
starlord 已提交
358
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
359

S
shengjh 已提交
360 361 362 363
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CreateTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CreateTable.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
364
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
365
            }
Z
update  
zhiru 已提交
366

S
starlord 已提交
367
            mysqlpp::Query createTableQuery = connectionPtr->query();
Z
update  
zhiru 已提交
368

369 370 371
            if (table_schema.table_id_.empty()) {
                NextTableId(table_schema.table_id_);
            } else {
G
groot 已提交
372 373
                createTableQuery << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                                 << table_schema.table_id_ << ";";
Z
zhiru 已提交
374

375
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
Z
update  
zhiru 已提交
376

S
starlord 已提交
377
                mysqlpp::StoreQueryResult res = createTableQuery.store();
378

379 380
                if (res.num_rows() == 1) {
                    int state = res[0]["state"];
S
shengjh 已提交
381
                    fiu_do_on("MySQLMetaImpl.CreateTableTable.schema_TO_DELETE", state = TableSchema::TO_DELETE);
382
                    if (TableSchema::TO_DELETE == state) {
S
starlord 已提交
383
                        return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
384
                    } else {
S
starlord 已提交
385
                        return Status(DB_ALREADY_EXIST, "Table already exists");
386 387 388
                    }
                }
            }
389

390 391
            table_schema.id_ = -1;
            table_schema.created_on_ = utils::GetMicroSecTimeStamp();
Z
update  
zhiru 已提交
392

S
starlord 已提交
393
            std::string id = "NULL";  // auto-increment
G
groot 已提交
394
            std::string& table_id = table_schema.table_id_;
395 396 397
            std::string state = std::to_string(table_schema.state_);
            std::string dimension = std::to_string(table_schema.dimension_);
            std::string created_on = std::to_string(table_schema.created_on_);
S
starlord 已提交
398 399
            std::string flag = std::to_string(table_schema.flag_);
            std::string index_file_size = std::to_string(table_schema.index_file_size_);
400
            std::string engine_type = std::to_string(table_schema.engine_type_);
401
            std::string& index_params = table_schema.index_params_;
S
starlord 已提交
402
            std::string metric_type = std::to_string(table_schema.metric_type_);
G
groot 已提交
403 404 405
            std::string& owner_table = table_schema.owner_table_;
            std::string& partition_tag = table_schema.partition_tag_;
            std::string& version = table_schema.version_;
406
            std::string flush_lsn = std::to_string(table_schema.flush_lsn_);
Z
update  
zhiru 已提交
407

G
groot 已提交
408 409
            createTableQuery << "INSERT INTO " << META_TABLES << " VALUES(" << id << ", " << mysqlpp::quote << table_id
                             << ", " << state << ", " << dimension << ", " << created_on << ", " << flag << ", "
410 411 412
                             << index_file_size << ", " << engine_type << ", " << mysqlpp::quote << index_params << ", "
                             << metric_type << ", " << mysqlpp::quote << owner_table << ", " << mysqlpp::quote
                             << partition_tag << ", " << mysqlpp::quote << version << ", " << flush_lsn << ");";
Z
update  
zhiru 已提交
413

414
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
415

S
starlord 已提交
416
            if (mysqlpp::SimpleResult res = createTableQuery.execute()) {
S
starlord 已提交
417
                table_schema.id_ = res.insert_id();  // Might need to use SELECT LAST_INSERT_ID()?
418

S
starlord 已提交
419
                // Consume all results to avoid "Commands out of sync" error
420
            } else {
S
starlord 已提交
421
                return HandleException("Add Table Error", createTableQuery.error());
422
            }
S
starlord 已提交
423
        }  // Scoped Connection
424

425
        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
426
        return utils::CreateTablePath(options_, table_schema.table_id_);
S
starlord 已提交
427
    } catch (std::exception& e) {
S
starlord 已提交
428
        return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
429 430
    }
}
431

S
starlord 已提交
432
Status
G
groot 已提交
433
MySQLMetaImpl::DescribeTable(TableSchema& table_schema) {
Z
zhiru 已提交
434
    try {
G
groot 已提交
435
        server::MetricCollector metric;
S
starlord 已提交
436
        mysqlpp::StoreQueryResult res;
Z
zhiru 已提交
437
        {
S
starlord 已提交
438
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
439

S
shengjh 已提交
440 441 442 443
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DescribeTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DescribeTable.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
444
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
445 446
            }

G
groot 已提交
447 448
            mysqlpp::Query describeTableQuery = connectionPtr->query();
            describeTableQuery
449 450
                << "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type, index_params"
                << " , metric_type ,owner_table, partition_tag, version, flush_lsn"
G
groot 已提交
451 452
                << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_schema.table_id_
                << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
zhiru 已提交
453

G
groot 已提交
454
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str();
Z
zhiru 已提交
455

G
groot 已提交
456
            res = describeTableQuery.store();
S
starlord 已提交
457
        }  // Scoped Connection
Z
zhiru 已提交
458

G
groot 已提交
459 460 461 462 463 464 465 466 467
        if (res.num_rows() == 1) {
            const mysqlpp::Row& resRow = res[0];
            table_schema.id_ = resRow["id"];  // implicit conversion
            table_schema.state_ = resRow["state"];
            table_schema.dimension_ = resRow["dimension"];
            table_schema.created_on_ = resRow["created_on"];
            table_schema.flag_ = resRow["flag"];
            table_schema.index_file_size_ = resRow["index_file_size"];
            table_schema.engine_type_ = resRow["engine_type"];
468
            resRow["index_params"].to_string(table_schema.index_params_);
G
groot 已提交
469 470 471 472
            table_schema.metric_type_ = resRow["metric_type"];
            resRow["owner_table"].to_string(table_schema.owner_table_);
            resRow["partition_tag"].to_string(table_schema.partition_tag_);
            resRow["version"].to_string(table_schema.version_);
473
            table_schema.flush_lsn_ = resRow["flush_lsn"];
G
groot 已提交
474 475
        } else {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
476
        }
S
starlord 已提交
477
    } catch (std::exception& e) {
G
groot 已提交
478
        return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
Z
zhiru 已提交
479 480
    }

481 482
    return Status::OK();
}
483

S
starlord 已提交
484
Status
G
groot 已提交
485
MySQLMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
S
starlord 已提交
486
    try {
Y
Yu Kun 已提交
487
        server::MetricCollector metric;
G
groot 已提交
488
        mysqlpp::StoreQueryResult res;
S
starlord 已提交
489
        {
S
starlord 已提交
490
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
491

S
shengjh 已提交
492 493 494 495
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.HasTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.HasTable.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
496
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
S
starlord 已提交
497 498
            }

G
groot 已提交
499 500 501 502 503 504 505
            mysqlpp::Query hasTableQuery = connectionPtr->query();
            // since table_id is a unique column we just need to check whether it exists or not
            hasTableQuery << "SELECT EXISTS"
                          << " (SELECT 1 FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                          << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ")"
                          << " AS " << mysqlpp::quote << "check"
                          << ";";
S
starlord 已提交
506

G
groot 已提交
507
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();
S
starlord 已提交
508

G
groot 已提交
509
            res = hasTableQuery.store();
S
starlord 已提交
510
        }  // Scoped Connection
S
starlord 已提交
511

G
groot 已提交
512 513
        int check = res[0]["check"];
        has_or_not = (check == 1);
S
starlord 已提交
514
    } catch (std::exception& e) {
G
groot 已提交
515
        return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
S
starlord 已提交
516 517
    }

518 519 520
    return Status::OK();
}

S
starlord 已提交
521
Status
G
groot 已提交
522
MySQLMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
S
starlord 已提交
523
    try {
Y
Yu Kun 已提交
524
        server::MetricCollector metric;
G
groot 已提交
525
        mysqlpp::StoreQueryResult res;
S
starlord 已提交
526
        {
S
starlord 已提交
527
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
528

S
shengjh 已提交
529 530 531 532
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.AllTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.AllTable.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
533
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
S
starlord 已提交
534 535
            }

G
groot 已提交
536
            mysqlpp::Query allTablesQuery = connectionPtr->query();
537
            allTablesQuery << "SELECT id, table_id, dimension, engine_type, index_params, index_file_size, metric_type"
538
                           << " ,owner_table, partition_tag, version, flush_lsn"
G
groot 已提交
539
                           << " FROM " << META_TABLES << " WHERE state <> " << std::to_string(TableSchema::TO_DELETE)
540
                           << " AND owner_table = \"\";";
S
starlord 已提交
541

G
groot 已提交
542
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str();
S
starlord 已提交
543

G
groot 已提交
544
            res = allTablesQuery.store();
S
starlord 已提交
545
        }  // Scoped Connection
S
starlord 已提交
546

G
groot 已提交
547 548 549 550 551 552 553
        for (auto& resRow : res) {
            TableSchema table_schema;
            table_schema.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_schema.table_id_);
            table_schema.dimension_ = resRow["dimension"];
            table_schema.index_file_size_ = resRow["index_file_size"];
            table_schema.engine_type_ = resRow["engine_type"];
554
            resRow["index_params"].to_string(table_schema.index_params_);
G
groot 已提交
555 556 557 558
            table_schema.metric_type_ = resRow["metric_type"];
            resRow["owner_table"].to_string(table_schema.owner_table_);
            resRow["partition_tag"].to_string(table_schema.partition_tag_);
            resRow["version"].to_string(table_schema.version_);
559
            table_schema.flush_lsn_ = resRow["flush_lsn"];
G
groot 已提交
560 561 562

            table_schema_array.emplace_back(table_schema);
        }
S
starlord 已提交
563
    } catch (std::exception& e) {
G
groot 已提交
564
        return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
S
starlord 已提交
565 566 567 568 569
    }

    return Status::OK();
}

S
starlord 已提交
570
Status
G
groot 已提交
571
MySQLMetaImpl::DropTable(const std::string& table_id) {
S
starlord 已提交
572
    try {
Y
Yu Kun 已提交
573
        server::MetricCollector metric;
S
starlord 已提交
574
        {
S
starlord 已提交
575
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
576

S
shengjh 已提交
577 578 579 580 581
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DropTable.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DropTable.throw_exception", throw std::exception(););

            if (is_null_connection) {
G
groot 已提交
582
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
S
starlord 已提交
583 584
            }

G
groot 已提交
585 586 587 588 589
            // soft delete table
            mysqlpp::Query deleteTableQuery = connectionPtr->query();
            //
            deleteTableQuery << "UPDATE " << META_TABLES << " SET state = " << std::to_string(TableSchema::TO_DELETE)
                             << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
S
starlord 已提交
590

G
groot 已提交
591
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str();
S
starlord 已提交
592

G
groot 已提交
593 594
            if (!deleteTableQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
S
starlord 已提交
595
            }
S
starlord 已提交
596
        }  // Scoped Connection
G
groot 已提交
597

S
shengjh 已提交
598 599 600
        bool is_writable_mode{mode_ == DBOptions::MODE::CLUSTER_WRITABLE};
        fiu_do_on("MySQLMetaImpl.DropTable.CLUSTER_WRITABLE_MODE", is_writable_mode = true);
        if (is_writable_mode) {
G
groot 已提交
601 602 603 604
            DeleteTableFiles(table_id);
        }

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
S
starlord 已提交
605
    } catch (std::exception& e) {
G
groot 已提交
606
        return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
S
starlord 已提交
607 608 609 610 611
    }

    return Status::OK();
}

S
starlord 已提交
612
Status
G
groot 已提交
613
MySQLMetaImpl::DeleteTableFiles(const std::string& table_id) {
S
starlord 已提交
614
    try {
Y
Yu Kun 已提交
615
        server::MetricCollector metric;
616
        {
S
starlord 已提交
617
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
618

S
shengjh 已提交
619 620 621 622 623
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DeleteTableFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DeleteTableFiles.throw_exception", throw std::exception(););

            if (is_null_connection) {
G
groot 已提交
624
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
625 626
            }

G
groot 已提交
627 628 629 630 631 632 633 634
            // soft delete table files
            mysqlpp::Query deleteTableFilesQuery = connectionPtr->query();
            //
            deleteTableFilesQuery << "UPDATE " << META_TABLEFILES
                                  << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                                  << " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp())
                                  << " WHERE table_id = " << mysqlpp::quote << table_id << " AND file_type <> "
                                  << std::to_string(TableFileSchema::TO_DELETE) << ";";
635

G
groot 已提交
636
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str();
637

G
groot 已提交
638 639
            if (!deleteTableFilesQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", deleteTableFilesQuery.error());
S
starlord 已提交
640
            }
S
starlord 已提交
641
        }  // Scoped Connection
S
starlord 已提交
642

G
groot 已提交
643
        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
S
starlord 已提交
644
    } catch (std::exception& e) {
G
groot 已提交
645
        return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
S
starlord 已提交
646
    }
647

S
starlord 已提交
648 649
    return Status::OK();
}
Z
update  
zhiru 已提交
650

S
starlord 已提交
651
Status
G
groot 已提交
652 653 654 655 656 657 658 659 660 661
MySQLMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }
    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }
Z
zhiru 已提交
662

G
groot 已提交
663 664
    try {
        server::MetricCollector metric;
Z
update  
zhiru 已提交
665

G
groot 已提交
666
        NextFileId(file_schema.file_id_);
667 668 669
        if (file_schema.segment_id_.empty()) {
            file_schema.segment_id_ = file_schema.file_id_;
        }
G
groot 已提交
670 671 672 673 674 675
        file_schema.dimension_ = table_schema.dimension_;
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.index_file_size_ = table_schema.index_file_size_;
676
        file_schema.index_params_ = table_schema.index_params_;
Z
Zhiru Zhu 已提交
677 678 679 680 681 682 683 684 685 686

        if (file_schema.file_type_ == TableFileSchema::FILE_TYPE::NEW ||
            file_schema.file_type_ == TableFileSchema::FILE_TYPE::NEW_MERGE) {
            file_schema.engine_type_ = server::ValidationUtil::IsBinaryMetricType(table_schema.metric_type_)
                                           ? (int32_t)EngineType::FAISS_BIN_IDMAP
                                           : (int32_t)EngineType::FAISS_IDMAP;
        } else {
            file_schema.engine_type_ = table_schema.engine_type_;
        }

G
groot 已提交
687
        file_schema.metric_type_ = table_schema.metric_type_;
688

G
groot 已提交
689 690
        std::string id = "NULL";  // auto-increment
        std::string table_id = file_schema.table_id_;
691
        std::string segment_id = file_schema.segment_id_;
G
groot 已提交
692 693 694 695 696 697 698 699
        std::string engine_type = std::to_string(file_schema.engine_type_);
        std::string file_id = file_schema.file_id_;
        std::string file_type = std::to_string(file_schema.file_type_);
        std::string file_size = std::to_string(file_schema.file_size_);
        std::string row_count = std::to_string(file_schema.row_count_);
        std::string updated_time = std::to_string(file_schema.updated_time_);
        std::string created_on = std::to_string(file_schema.created_on_);
        std::string date = std::to_string(file_schema.date_);
700
        std::string flush_lsn = std::to_string(file_schema.flush_lsn_);
G
groot 已提交
701 702 703 704

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
705 706 707 708
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CreateTableFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CreateTableFiles.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
709
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
710
            }
Z
zhiru 已提交
711

G
groot 已提交
712
            mysqlpp::Query createTableFileQuery = connectionPtr->query();
Z
update  
zhiru 已提交
713

G
groot 已提交
714
            createTableFileQuery << "INSERT INTO " << META_TABLEFILES << " VALUES(" << id << ", " << mysqlpp::quote
715 716 717 718
                                 << table_id << ", " << mysqlpp::quote << segment_id << ", " << engine_type << ", "
                                 << mysqlpp::quote << file_id << ", " << file_type << ", " << file_size << ", "
                                 << row_count << ", " << updated_time << ", " << created_on << ", " << date << ", "
                                 << flush_lsn << ");";
G
groot 已提交
719 720 721 722 723 724 725 726 727 728 729 730 731 732

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str();

            if (mysqlpp::SimpleResult res = createTableFileQuery.execute()) {
                file_schema.id_ = res.insert_id();  // Might need to use SELECT LAST_INSERT_ID()?

                // Consume all results to avoid "Commands out of sync" error
            } else {
                return HandleException("QUERY ERROR WHEN CREATING TABLE FILE", createTableFileQuery.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
S
starlord 已提交
733
    } catch (std::exception& e) {
G
groot 已提交
734
        return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
735 736
    }
}
Z
update  
zhiru 已提交
737

S
starlord 已提交
738
Status
G
groot 已提交
739 740 741 742 743 744 745 746 747 748 749 750 751
MySQLMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                             TableFilesSchema& table_files) {
    if (ids.empty()) {
        return Status::OK();
    }

    std::stringstream idSS;
    for (auto& id : ids) {
        idSS << "id = " << std::to_string(id) << " OR ";
    }
    std::string idStr = idSS.str();
    idStr = idStr.substr(0, idStr.size() - 4);  // remove the last " OR "

752
    try {
S
starlord 已提交
753
        mysqlpp::StoreQueryResult res;
754
        {
S
starlord 已提交
755
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
756

S
shengjh 已提交
757 758 759 760
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.GetTableFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.GetTableFiles.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
761
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
762
            }
Z
zhiru 已提交
763

G
groot 已提交
764
            mysqlpp::Query getTableFileQuery = connectionPtr->query();
765 766 767 768 769
            getTableFileQuery
                << "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
                << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id << " AND ("
                << idStr << ")"
                << " AND file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
770

G
groot 已提交
771
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();
Z
update  
zhiru 已提交
772

G
groot 已提交
773
            res = getTableFileQuery.store();
S
starlord 已提交
774
        }  // Scoped Connection
775

G
groot 已提交
776 777 778
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        DescribeTable(table_schema);
S
starlord 已提交
779

G
groot 已提交
780 781 782 783 784
        Status ret;
        for (auto& resRow : res) {
            TableFileSchema file_schema;
            file_schema.id_ = resRow["id"];
            file_schema.table_id_ = table_id;
785
            resRow["segment_id"].to_string(file_schema.segment_id_);
G
groot 已提交
786 787
            file_schema.index_file_size_ = table_schema.index_file_size_;
            file_schema.engine_type_ = resRow["engine_type"];
788
            file_schema.index_params_ = table_schema.index_params_;
G
groot 已提交
789 790 791 792 793 794 795 796
            file_schema.metric_type_ = table_schema.metric_type_;
            resRow["file_id"].to_string(file_schema.file_id_);
            file_schema.file_type_ = resRow["file_type"];
            file_schema.file_size_ = resRow["file_size"];
            file_schema.row_count_ = resRow["row_count"];
            file_schema.date_ = resRow["date"];
            file_schema.created_on_ = resRow["created_on"];
            file_schema.dimension_ = table_schema.dimension_;
S
starlord 已提交
797

G
groot 已提交
798 799
            utils::GetTableFilePath(options_, file_schema);
            table_files.emplace_back(file_schema);
800
        }
G
groot 已提交
801 802 803

        ENGINE_LOG_DEBUG << "Get table files by id";
        return ret;
S
starlord 已提交
804
    } catch (std::exception& e) {
G
groot 已提交
805
        return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
Z
update  
zhiru 已提交
806
    }
807
}
Z
zhiru 已提交
808

809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846
Status
MySQLMetaImpl::GetTableFilesBySegmentId(const std::string& segment_id,
                                        milvus::engine::meta::TableFilesSchema& table_files) {
    try {
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query getTableFileQuery = connectionPtr->query();
            getTableFileQuery << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
                              << "row_count, date, created_on"
                              << " FROM " << META_TABLEFILES << " WHERE segment_id = " << mysqlpp::quote << segment_id
                              << " AND file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFilesBySegmentId: " << getTableFileQuery.str();

            res = getTableFileQuery.store();
        }  // Scoped Connection

        if (!res.empty()) {
            TableSchema table_schema;
            res[0]["table_id"].to_string(table_schema.table_id_);
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }

            for (auto& resRow : res) {
                TableFileSchema file_schema;
                file_schema.id_ = resRow["id"];
                file_schema.table_id_ = table_schema.table_id_;
                resRow["segment_id"].to_string(file_schema.segment_id_);
                file_schema.index_file_size_ = table_schema.index_file_size_;
                file_schema.engine_type_ = resRow["engine_type"];
847
                file_schema.index_params_ = table_schema.index_params_;
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868
                file_schema.metric_type_ = table_schema.metric_type_;
                resRow["file_id"].to_string(file_schema.file_id_);
                file_schema.file_type_ = resRow["file_type"];
                file_schema.file_size_ = resRow["file_size"];
                file_schema.row_count_ = resRow["row_count"];
                file_schema.date_ = resRow["date"];
                file_schema.created_on_ = resRow["created_on"];
                file_schema.dimension_ = table_schema.dimension_;

                utils::GetTableFilePath(options_, file_schema);
                table_files.emplace_back(file_schema);
            }
        }

        ENGINE_LOG_DEBUG << "Get table files by segment id";
        return Status::OK();
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES BY SEGMENT ID", e.what());
    }
}

S
starlord 已提交
869
Status
G
groot 已提交
870
MySQLMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
871
    try {
Y
Yu Kun 已提交
872
        server::MetricCollector metric;
G
groot 已提交
873

874
        {
S
starlord 已提交
875
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
876

S
shengjh 已提交
877 878 879 880
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.UpdateTableIndex.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.UpdateTableIndex.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
881
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
882
            }
Z
update  
zhiru 已提交
883

G
groot 已提交
884 885 886 887
            mysqlpp::Query updateTableIndexParamQuery = connectionPtr->query();
            updateTableIndexParamQuery << "SELECT id, state, dimension, created_on"
                                       << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                                       << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
888

G
groot 已提交
889
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
890

G
groot 已提交
891 892 893 894 895 896 897 898 899 900 901 902
            mysqlpp::StoreQueryResult res = updateTableIndexParamQuery.store();

            if (res.num_rows() == 1) {
                const mysqlpp::Row& resRow = res[0];

                size_t id = resRow["id"];
                int32_t state = resRow["state"];
                uint16_t dimension = resRow["dimension"];
                int64_t created_on = resRow["created_on"];

                updateTableIndexParamQuery << "UPDATE " << META_TABLES << " SET id = " << id << " ,state = " << state
                                           << " ,dimension = " << dimension << " ,created_on = " << created_on
903 904
                                           << " ,engine_type = " << index.engine_type_
                                           << " ,index_params = " << mysqlpp::quote << index.extra_params_.dump()
G
groot 已提交
905 906 907 908 909 910 911 912 913 914 915 916
                                           << " ,metric_type = " << index.metric_type_
                                           << " WHERE table_id = " << mysqlpp::quote << table_id << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();

                if (!updateTableIndexParamQuery.exec()) {
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM",
                                           updateTableIndexParamQuery.error());
                }
            } else {
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
            }
S
starlord 已提交
917
        }  // Scoped Connection
918

G
groot 已提交
919
        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
S
starlord 已提交
920
    } catch (std::exception& e) {
G
groot 已提交
921
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
922
    }
923

924 925
    return Status::OK();
}
926

S
starlord 已提交
927
Status
G
groot 已提交
928
MySQLMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
929
    try {
Y
Yu Kun 已提交
930
        server::MetricCollector metric;
G
groot 已提交
931

932
        {
S
starlord 已提交
933
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
934

S
shengjh 已提交
935 936 937 938
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.UpdateTableFlag.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.UpdateTableFlag.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
939
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
940
            }
Z
update  
zhiru 已提交
941

G
groot 已提交
942 943 944
            mysqlpp::Query updateTableFlagQuery = connectionPtr->query();
            updateTableFlagQuery << "UPDATE " << META_TABLES << " SET flag = " << flag
                                 << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
Z
zhiru 已提交
945

G
groot 已提交
946
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();
Z
zhiru 已提交
947

G
groot 已提交
948 949 950
            if (!updateTableFlagQuery.exec()) {
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLAG", updateTableFlagQuery.error());
            }
S
starlord 已提交
951
        }  // Scoped Connection
952

G
groot 已提交
953
        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
S
starlord 已提交
954
    } catch (std::exception& e) {
G
groot 已提交
955
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
956
    }
Z
update  
zhiru 已提交
957

958 959
    return Status::OK();
}
960

961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047
Status
MySQLMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_lsn) {
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query updateTableFlagQuery = connectionPtr->query();
            updateTableFlagQuery << "UPDATE " << META_TABLES << " SET flush_lsn = " << flush_lsn
                                 << " WHERE table_id = " << mysqlpp::quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlushLSN: " << updateTableFlagQuery.str();

            if (!updateTableFlagQuery.exec()) {
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLUSH_LSN", updateTableFlagQuery.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully update table flush_lsn, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLUSH_LSN", e.what());
    }

    return Status::OK();
}

Status
MySQLMetaImpl::GetTableFlushLSN(const std::string& table_id, uint64_t& flush_lsn) {
    return Status::OK();
}

Status
MySQLMetaImpl::GetTableFilesByFlushLSN(uint64_t flush_lsn, TableFilesSchema& table_files) {
    table_files.clear();

    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query filesToIndexQuery = connectionPtr->query();
            filesToIndexQuery << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
                                 "row_count, date, created_on"
                              << " FROM " << META_TABLEFILES << " WHERE flush_lsn = " << flush_lsn << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();

            res = filesToIndexQuery.store();
        }  // Scoped Connection

        Status ret;
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;
        for (auto& resRow : res) {
            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
            resRow["segment_id"].to_string(table_file.segment_id_);
            table_file.engine_type_ = resRow["engine_type"];
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.file_type_ = resRow["file_type"];
            table_file.file_size_ = resRow["file_size"];
            table_file.row_count_ = resRow["row_count"];
            table_file.date_ = resRow["date"];
            table_file.created_on_ = resRow["created_on"];

            auto groupItr = groups.find(table_file.table_id_);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                groups[table_file.table_id_] = table_schema;
            }
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
            table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
1048
            table_file.index_params_ = groups[table_file.table_id_].index_params_;
1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;

            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                ret = status;
            }

            table_files.push_back(table_file);
        }

        if (res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " files with flush_lsn = " << flush_lsn;
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES BY LSN", e.what());
    }
}

G
groot 已提交
1068
// ZR: this function assumes all fields in file_schema have value
S
starlord 已提交
1069
Status
G
groot 已提交
1070 1071
MySQLMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1072

1073
    try {
Y
Yu Kun 已提交
1074
        server::MetricCollector metric;
G
groot 已提交
1075 1076
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
1077

S
shengjh 已提交
1078 1079 1080 1081
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.UpdateTableFile.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.UpdateTableFile.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1082
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1083
            }
Z
update  
zhiru 已提交
1084

G
groot 已提交
1085
            mysqlpp::Query updateTableFileQuery = connectionPtr->query();
1086

G
groot 已提交
1087 1088 1089 1090
            // if the table has been deleted, just mark the table file as TO_DELETE
            // clean thread will delete the file later
            updateTableFileQuery << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                                 << file_schema.table_id_ << ";";
1091

G
groot 已提交
1092
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
1093

G
groot 已提交
1094
            mysqlpp::StoreQueryResult res = updateTableFileQuery.store();
1095

G
groot 已提交
1096 1097 1098 1099 1100
            if (res.num_rows() == 1) {
                int state = res[0]["state"];
                if (state == TableSchema::TO_DELETE) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
                }
1101
            } else {
G
groot 已提交
1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127
                file_schema.file_type_ = TableFileSchema::TO_DELETE;
            }

            std::string id = std::to_string(file_schema.id_);
            std::string table_id = file_schema.table_id_;
            std::string engine_type = std::to_string(file_schema.engine_type_);
            std::string file_id = file_schema.file_id_;
            std::string file_type = std::to_string(file_schema.file_type_);
            std::string file_size = std::to_string(file_schema.file_size_);
            std::string row_count = std::to_string(file_schema.row_count_);
            std::string updated_time = std::to_string(file_schema.updated_time_);
            std::string created_on = std::to_string(file_schema.created_on_);
            std::string date = std::to_string(file_schema.date_);

            updateTableFileQuery << "UPDATE " << META_TABLEFILES << " SET table_id = " << mysqlpp::quote << table_id
                                 << " ,engine_type = " << engine_type << " ,file_id = " << mysqlpp::quote << file_id
                                 << " ,file_type = " << file_type << " ,file_size = " << file_size
                                 << " ,row_count = " << row_count << " ,updated_time = " << updated_time
                                 << " ,created_on = " << created_on << " ,date = " << date << " WHERE id = " << id
                                 << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();

            if (!updateTableFileQuery.exec()) {
                ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error());
1128
            }
S
starlord 已提交
1129
        }  // Scoped Connection
1130

G
groot 已提交
1131
        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
S
starlord 已提交
1132
    } catch (std::exception& e) {
G
groot 已提交
1133
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
1134
    }
G
groot 已提交
1135 1136

    return Status::OK();
1137
}
1138

S
starlord 已提交
1139
Status
G
groot 已提交
1140 1141 1142 1143
MySQLMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
1144 1145 1146 1147
        bool is_null_connection = (connectionPtr == nullptr);
        fiu_do_on("MySQLMetaImpl.UpdateTableFilesToIndex.null_connection", is_null_connection = true);
        fiu_do_on("MySQLMetaImpl.UpdateTableFilesToIndex.throw_exception", throw std::exception(););
        if (is_null_connection) {
G
groot 已提交
1148 1149 1150 1151 1152 1153 1154 1155
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
        }

        mysqlpp::Query updateTableFilesToIndexQuery = connectionPtr->query();

        updateTableFilesToIndexQuery << "UPDATE " << META_TABLEFILES
                                     << " SET file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                                     << " WHERE table_id = " << mysqlpp::quote << table_id
G
groot 已提交
1156
                                     << " AND row_count >= " << std::to_string(meta::BUILD_INDEX_THRESHOLD)
G
groot 已提交
1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172
                                     << " AND file_type = " << std::to_string(TableFileSchema::RAW) << ";";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();

        if (!updateTableFilesToIndexQuery.exec()) {
            return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX",
                                   updateTableFilesToIndexQuery.error());
        }

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
    }

    return Status::OK();
}
1173

G
groot 已提交
1174 1175
Status
MySQLMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
1176
    try {
Y
Yu Kun 已提交
1177
        server::MetricCollector metric;
1178
        {
S
starlord 已提交
1179
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1180

S
shengjh 已提交
1181 1182 1183 1184
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.UpdateTableFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.UpdateTableFiles.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1185
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1186
            }
1187

G
groot 已提交
1188
            mysqlpp::Query updateTableFilesQuery = connectionPtr->query();
1189

G
groot 已提交
1190 1191 1192 1193 1194
            std::map<std::string, bool> has_tables;
            for (auto& file_schema : files) {
                if (has_tables.find(file_schema.table_id_) != has_tables.end()) {
                    continue;
                }
1195

G
groot 已提交
1196 1197 1198 1199 1200 1201
                updateTableFilesQuery << "SELECT EXISTS"
                                      << " (SELECT 1 FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                                      << file_schema.table_id_ << " AND state <> "
                                      << std::to_string(TableSchema::TO_DELETE) << ")"
                                      << " AS " << mysqlpp::quote << "check"
                                      << ";";
1202

G
groot 已提交
1203
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
1204

G
groot 已提交
1205
                mysqlpp::StoreQueryResult res = updateTableFilesQuery.store();
1206

G
groot 已提交
1207 1208 1209
                int check = res[0]["check"];
                has_tables[file_schema.table_id_] = (check == 1);
            }
1210

G
groot 已提交
1211 1212 1213 1214 1215
            for (auto& file_schema : files) {
                if (!has_tables[file_schema.table_id_]) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
                }
                file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
S
starlord 已提交
1216

G
groot 已提交
1217 1218 1219 1220 1221 1222 1223 1224 1225 1226
                std::string id = std::to_string(file_schema.id_);
                std::string& table_id = file_schema.table_id_;
                std::string engine_type = std::to_string(file_schema.engine_type_);
                std::string& file_id = file_schema.file_id_;
                std::string file_type = std::to_string(file_schema.file_type_);
                std::string file_size = std::to_string(file_schema.file_size_);
                std::string row_count = std::to_string(file_schema.row_count_);
                std::string updated_time = std::to_string(file_schema.updated_time_);
                std::string created_on = std::to_string(file_schema.created_on_);
                std::string date = std::to_string(file_schema.date_);
1227

G
groot 已提交
1228 1229 1230 1231 1232 1233
                updateTableFilesQuery << "UPDATE " << META_TABLEFILES << " SET table_id = " << mysqlpp::quote
                                      << table_id << " ,engine_type = " << engine_type
                                      << " ,file_id = " << mysqlpp::quote << file_id << " ,file_type = " << file_type
                                      << " ,file_size = " << file_size << " ,row_count = " << row_count
                                      << " ,updated_time = " << updated_time << " ,created_on = " << created_on
                                      << " ,date = " << date << " WHERE id = " << id << ";";
1234

G
groot 已提交
1235
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
S
starlord 已提交
1236

G
groot 已提交
1237 1238
                if (!updateTableFilesQuery.exec()) {
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error());
1239
                }
S
starlord 已提交
1240
            }
G
groot 已提交
1241
        }  // Scoped Connection
1242

G
groot 已提交
1243
        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
S
starlord 已提交
1244
    } catch (std::exception& e) {
G
groot 已提交
1245
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
Z
update  
zhiru 已提交
1246
    }
G
groot 已提交
1247 1248

    return Status::OK();
1249 1250
}

S
starlord 已提交
1251
Status
G
groot 已提交
1252
MySQLMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
X
xj.lin 已提交
1253
    try {
Y
Yu Kun 已提交
1254
        server::MetricCollector metric;
G
groot 已提交
1255

X
xj.lin 已提交
1256
        {
S
starlord 已提交
1257
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
X
xj.lin 已提交
1258

S
shengjh 已提交
1259 1260 1261 1262
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DescribeTableIndex.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DescribeTableIndex.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1263
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
X
xj.lin 已提交
1264 1265
            }

G
groot 已提交
1266
            mysqlpp::Query describeTableIndexQuery = connectionPtr->query();
1267
            describeTableIndexQuery << "SELECT engine_type, index_params, index_file_size, metric_type"
G
groot 已提交
1268 1269
                                    << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                                    << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
X
xj.lin 已提交
1270

G
groot 已提交
1271
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTableIndex: " << describeTableIndexQuery.str();
X
xj.lin 已提交
1272

G
groot 已提交
1273
            mysqlpp::StoreQueryResult res = describeTableIndexQuery.store();
X
xj.lin 已提交
1274

G
groot 已提交
1275 1276
            if (res.num_rows() == 1) {
                const mysqlpp::Row& resRow = res[0];
X
xj.lin 已提交
1277

G
groot 已提交
1278
                index.engine_type_ = resRow["engine_type"];
1279 1280 1281
                std::string str_index_params;
                resRow["index_params"].to_string(str_index_params);
                index.extra_params_ = milvus::json::parse(str_index_params);
G
groot 已提交
1282 1283 1284
                index.metric_type_ = resRow["metric_type"];
            } else {
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
X
xj.lin 已提交
1285
            }
S
starlord 已提交
1286
        }  // Scoped Connection
G
groot 已提交
1287 1288 1289
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
    }
X
xj.lin 已提交
1290

G
groot 已提交
1291 1292
    return Status::OK();
}
X
xj.lin 已提交
1293

G
groot 已提交
1294 1295 1296 1297
Status
MySQLMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;
X
xj.lin 已提交
1298

G
groot 已提交
1299 1300
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
X
xj.lin 已提交
1301

S
shengjh 已提交
1302 1303 1304 1305
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DropTableIndex.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DropTableIndex.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1306 1307
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }
1308

G
groot 已提交
1309
            mysqlpp::Query dropTableIndexQuery = connectionPtr->query();
X
xj.lin 已提交
1310

G
groot 已提交
1311 1312 1313 1314 1315 1316
            // soft delete index files
            dropTableIndexQuery << "UPDATE " << META_TABLEFILES
                                << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                                << " ,updated_time = " << utils::GetMicroSecTimeStamp()
                                << " WHERE table_id = " << mysqlpp::quote << table_id
                                << " AND file_type = " << std::to_string(TableFileSchema::INDEX) << ";";
S
starlord 已提交
1317

G
groot 已提交
1318
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();
S
starlord 已提交
1319

G
groot 已提交
1320 1321 1322
            if (!dropTableIndexQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
            }
X
xj.lin 已提交
1323

G
groot 已提交
1324 1325 1326 1327 1328 1329
            // set all backup file to raw
            dropTableIndexQuery << "UPDATE " << META_TABLEFILES
                                << " SET file_type = " << std::to_string(TableFileSchema::RAW)
                                << " ,updated_time = " << utils::GetMicroSecTimeStamp()
                                << " WHERE table_id = " << mysqlpp::quote << table_id
                                << " AND file_type = " << std::to_string(TableFileSchema::BACKUP) << ";";
X
xj.lin 已提交
1330

G
groot 已提交
1331
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();
S
starlord 已提交
1332

G
groot 已提交
1333 1334 1335
            if (!dropTableIndexQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
            }
X
xj.lin 已提交
1336

G
groot 已提交
1337 1338 1339
            // set table index type to raw
            dropTableIndexQuery << "UPDATE " << META_TABLES
                                << " SET engine_type = " << std::to_string(DEFAULT_ENGINE_TYPE)
1340
                                << " , index_params = '{}'"
G
groot 已提交
1341
                                << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
X
xj.lin 已提交
1342

G
groot 已提交
1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
    }

    return Status::OK();
}

Status
1359 1360
MySQLMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag,
                               uint64_t lsn) {
G
groot 已提交
1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371
    server::MetricCollector metric;

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    // not allow create partition under partition
    if (!table_schema.owner_table_.empty()) {
G
groot 已提交
1372
        return Status(DB_ERROR, "Nested partition is not allowed");
G
groot 已提交
1373 1374
    }

1375 1376 1377 1378
    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string valid_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);
G
groot 已提交
1379

1380 1381 1382 1383
    // not allow duplicated partition
    std::string exist_partition;
    GetPartitionName(table_id, valid_tag, exist_partition);
    if (!exist_partition.empty()) {
G
groot 已提交
1384
        return Status(DB_ERROR, "Duplicate partition is not allowed");
1385 1386 1387 1388
    }

    if (partition_name == "") {
        // generate unique partition name
G
groot 已提交
1389 1390 1391 1392 1393 1394 1395 1396 1397
        NextTableId(table_schema.table_id_);
    } else {
        table_schema.table_id_ = partition_name;
    }

    table_schema.id_ = -1;
    table_schema.flag_ = 0;
    table_schema.created_on_ = utils::GetMicroSecTimeStamp();
    table_schema.owner_table_ = table_id;
1398
    table_schema.partition_tag_ = valid_tag;
1399
    table_schema.flush_lsn_ = lsn;
G
groot 已提交
1400

1401
    status = CreateTable(table_schema);
S
shengjh 已提交
1402
    fiu_do_on("MySQLMetaImpl.CreatePartition.aleady_exist", status = Status(DB_ALREADY_EXIST, ""));
1403 1404 1405 1406 1407
    if (status.code() == DB_ALREADY_EXIST) {
        return Status(DB_ALREADY_EXIST, "Partition already exists");
    }

    return status;
G
groot 已提交
1408 1409 1410 1411 1412 1413 1414 1415
}

Status
MySQLMetaImpl::DropPartition(const std::string& partition_name) {
    return DropTable(partition_name);
}

Status
G
groot 已提交
1416
MySQLMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
G
groot 已提交
1417 1418 1419 1420 1421 1422
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
1423 1424 1425 1426
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.ShowPartitions.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.ShowPartitions.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1427 1428 1429 1430
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query allPartitionsQuery = connectionPtr->query();
1431
            allPartitionsQuery << "SELECT table_id, id, state, dimension, created_on, flag, index_file_size,"
1432
                               << " engine_type, index_params, metric_type, partition_tag, version FROM " << META_TABLES
1433 1434
                               << " WHERE owner_table = " << mysqlpp::quote << table_id << " AND state <> "
                               << std::to_string(TableSchema::TO_DELETE) << ";";
G
groot 已提交
1435 1436 1437 1438 1439 1440 1441 1442 1443

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allPartitionsQuery.str();

            res = allPartitionsQuery.store();
        }  // Scoped Connection

        for (auto& resRow : res) {
            meta::TableSchema partition_schema;
            resRow["table_id"].to_string(partition_schema.table_id_);
1444 1445 1446 1447 1448 1449 1450
            partition_schema.id_ = resRow["id"];  // implicit conversion
            partition_schema.state_ = resRow["state"];
            partition_schema.dimension_ = resRow["dimension"];
            partition_schema.created_on_ = resRow["created_on"];
            partition_schema.flag_ = resRow["flag"];
            partition_schema.index_file_size_ = resRow["index_file_size"];
            partition_schema.engine_type_ = resRow["engine_type"];
1451
            resRow["index_params"].to_string(partition_schema.index_params_);
1452 1453 1454 1455 1456
            partition_schema.metric_type_ = resRow["metric_type"];
            partition_schema.owner_table_ = table_id;
            resRow["partition_tag"].to_string(partition_schema.partition_tag_);
            resRow["version"].to_string(partition_schema.version_);

G
groot 已提交
1457
            partition_schema_array.emplace_back(partition_schema);
G
groot 已提交
1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN SHOW PARTITIONS", e.what());
    }

    return Status::OK();
}

Status
MySQLMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
1471 1472 1473 1474 1475 1476

        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

G
groot 已提交
1477 1478 1479
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
1480 1481 1482 1483
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.GetPartitionName.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.GetPartitionName.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1484 1485 1486 1487 1488
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query allPartitionsQuery = connectionPtr->query();
            allPartitionsQuery << "SELECT table_id FROM " << META_TABLES << " WHERE owner_table = " << mysqlpp::quote
1489
                               << table_id << " AND partition_tag = " << mysqlpp::quote << valid_tag << " AND state <> "
G
groot 已提交
1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500
                               << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allPartitionsQuery.str();

            res = allPartitionsQuery.store();
        }  // Scoped Connection

        if (res.num_rows() > 0) {
            const mysqlpp::Row& resRow = res[0];
            resRow["table_id"].to_string(partition_name);
        } else {
1501
            return Status(DB_NOT_FOUND, "Partition " + valid_tag + " of table " + table_id + " not found");
G
groot 已提交
1502 1503 1504 1505 1506 1507 1508 1509 1510
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN GET PARTITION NAME", e.what());
    }

    return Status::OK();
}

Status
1511
MySQLMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) {
G
groot 已提交
1512 1513 1514 1515 1516 1517 1518 1519
    files.clear();

    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
1520 1521 1522 1523
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesToSearch.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesToSearch.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1524 1525 1526 1527
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query filesToSearchQuery = connectionPtr->query();
1528 1529 1530
            filesToSearchQuery
                << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
                << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id;
G
groot 已提交
1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561

            if (!ids.empty()) {
                std::stringstream idSS;
                for (auto& id : ids) {
                    idSS << "id = " << std::to_string(id) << " OR ";
                }
                std::string idStr = idSS.str();
                idStr = idStr.substr(0, idStr.size() - 4);  // remove the last " OR "

                filesToSearchQuery << " AND (" << idStr << ")";
            }
            // End
            filesToSearchQuery << " AND"
                               << " (file_type = " << std::to_string(TableFileSchema::RAW)
                               << " OR file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                               << " OR file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();

            res = filesToSearchQuery.store();
        }  // Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        Status ret;
        for (auto& resRow : res) {
1562
            TableFileSchema table_file;
G
groot 已提交
1563 1564
            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
1565
            resRow["segment_id"].to_string(table_file.segment_id_);
G
groot 已提交
1566 1567
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.engine_type_ = resRow["engine_type"];
1568
            table_file.index_params_ = table_schema.index_params_;
G
groot 已提交
1569 1570 1571 1572 1573 1574 1575
            table_file.metric_type_ = table_schema.metric_type_;
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.file_type_ = resRow["file_type"];
            table_file.file_size_ = resRow["file_size"];
            table_file.row_count_ = resRow["row_count"];
            table_file.date_ = resRow["date"];
            table_file.dimension_ = table_schema.dimension_;
X
xj.lin 已提交
1576

S
starlord 已提交
1577
            auto status = utils::GetTableFilePath(options_, table_file);
S
starlord 已提交
1578
            if (!status.ok()) {
S
starlord 已提交
1579
                ret = status;
S
starlord 已提交
1580
            }
1581

1582
            files.emplace_back(table_file);
1583
        }
S
starlord 已提交
1584

S
starlord 已提交
1585
        if (res.size() > 0) {
1586 1587
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-search files";
        }
S
starlord 已提交
1588
        return ret;
S
starlord 已提交
1589
    } catch (std::exception& e) {
S
starlord 已提交
1590
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
Z
update  
zhiru 已提交
1591
    }
1592
}
Z
update  
zhiru 已提交
1593

S
starlord 已提交
1594
Status
1595
MySQLMetaImpl::FilesToMerge(const std::string& table_id, TableFilesSchema& files) {
1596
    files.clear();
Z
update  
zhiru 已提交
1597

1598
    try {
Y
Yu Kun 已提交
1599
        server::MetricCollector metric;
S
starlord 已提交
1600

S
starlord 已提交
1601
        // check table existence
S
starlord 已提交
1602 1603 1604 1605 1606 1607 1608
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

S
starlord 已提交
1609
        mysqlpp::StoreQueryResult res;
1610
        {
S
starlord 已提交
1611
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
1612

S
shengjh 已提交
1613 1614 1615 1616
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesToMerge.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesToMerge.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1617
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1618
            }
Z
update  
zhiru 已提交
1619

S
starlord 已提交
1620
            mysqlpp::Query filesToMergeQuery = connectionPtr->query();
1621 1622 1623 1624 1625
            filesToMergeQuery << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date, "
                                 "engine_type, created_on"
                              << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                              << " AND file_type = " << std::to_string(TableFileSchema::RAW)
                              << " ORDER BY row_count DESC;";
Z
update  
zhiru 已提交
1626

1627
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();
1628

1629
            res = filesToMergeQuery.store();
S
starlord 已提交
1630
        }  // Scoped Connection
1631

S
starlord 已提交
1632
        Status ret;
1633
        int64_t to_merge_files = 0;
S
starlord 已提交
1634
        for (auto& resRow : res) {
S
starlord 已提交
1635 1636
            TableFileSchema table_file;
            table_file.file_size_ = resRow["file_size"];
S
starlord 已提交
1637
            if (table_file.file_size_ >= table_schema.index_file_size_) {
S
starlord 已提交
1638
                continue;  // skip large file
S
starlord 已提交
1639
            }
Z
update  
zhiru 已提交
1640

S
starlord 已提交
1641
            table_file.id_ = resRow["id"];  // implicit conversion
G
groot 已提交
1642
            resRow["table_id"].to_string(table_file.table_id_);
1643
            resRow["segment_id"].to_string(table_file.segment_id_);
G
groot 已提交
1644
            resRow["file_id"].to_string(table_file.file_id_);
1645
            table_file.file_type_ = resRow["file_type"];
S
starlord 已提交
1646
            table_file.row_count_ = resRow["row_count"];
1647
            table_file.date_ = resRow["date"];
1648
            table_file.index_file_size_ = table_schema.index_file_size_;
S
starlord 已提交
1649
            table_file.engine_type_ = resRow["engine_type"];
1650
            table_file.index_params_ = table_schema.index_params_;
S
starlord 已提交
1651
            table_file.metric_type_ = table_schema.metric_type_;
S
starlord 已提交
1652
            table_file.created_on_ = resRow["created_on"];
1653
            table_file.dimension_ = table_schema.dimension_;
Z
update  
zhiru 已提交
1654

S
starlord 已提交
1655
            auto status = utils::GetTableFilePath(options_, table_file);
S
starlord 已提交
1656
            if (!status.ok()) {
S
starlord 已提交
1657
                ret = status;
S
starlord 已提交
1658
            }
Z
update  
zhiru 已提交
1659

1660 1661
            files.emplace_back(table_file);
            ++to_merge_files;
1662
        }
Z
update  
zhiru 已提交
1663

1664 1665
        if (to_merge_files > 0) {
            ENGINE_LOG_TRACE << "Collect " << to_merge_files << " to-merge files";
1666
        }
S
starlord 已提交
1667
        return ret;
S
starlord 已提交
1668
    } catch (std::exception& e) {
S
starlord 已提交
1669
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
1670 1671 1672
    }
}

S
starlord 已提交
1673
Status
G
groot 已提交
1674 1675
MySQLMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();
Z
zhiru 已提交
1676

1677
    try {
G
groot 已提交
1678
        server::MetricCollector metric;
S
starlord 已提交
1679
        mysqlpp::StoreQueryResult res;
1680
        {
S
starlord 已提交
1681
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1682

S
shengjh 已提交
1683 1684 1685 1686
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesToIndex.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesToIndex.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1687
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1688 1689
            }

G
groot 已提交
1690
            mysqlpp::Query filesToIndexQuery = connectionPtr->query();
1691 1692 1693 1694
            filesToIndexQuery << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
                                 "row_count, date, created_on"
                              << " FROM " << META_TABLEFILES
                              << " WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";
1695

G
groot 已提交
1696
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();
1697

G
groot 已提交
1698
            res = filesToIndexQuery.store();
S
starlord 已提交
1699
        }  // Scoped Connection
1700

S
starlord 已提交
1701
        Status ret;
G
groot 已提交
1702 1703
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;
S
starlord 已提交
1704
        for (auto& resRow : res) {
G
groot 已提交
1705 1706
            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
1707
            resRow["segment_id"].to_string(table_file.segment_id_);
G
groot 已提交
1708 1709 1710 1711 1712 1713 1714
            table_file.engine_type_ = resRow["engine_type"];
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.file_type_ = resRow["file_type"];
            table_file.file_size_ = resRow["file_size"];
            table_file.row_count_ = resRow["row_count"];
            table_file.date_ = resRow["date"];
            table_file.created_on_ = resRow["created_on"];
Z
update  
zhiru 已提交
1715

G
groot 已提交
1716 1717 1718 1719 1720 1721 1722
            auto groupItr = groups.find(table_file.table_id_);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
1723
                }
G
groot 已提交
1724
                groups[table_file.table_id_] = table_schema;
Z
zhiru 已提交
1725
            }
G
groot 已提交
1726 1727
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
            table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
1728
            table_file.index_params_ = groups[table_file.table_id_].index_params_;
G
groot 已提交
1729
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
1730

G
groot 已提交
1731 1732 1733
            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                ret = status;
1734
            }
Z
zhiru 已提交
1735

G
groot 已提交
1736 1737
            files.push_back(table_file);
        }
Z
update  
zhiru 已提交
1738

G
groot 已提交
1739 1740
        if (res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-index files";
1741
        }
G
groot 已提交
1742
        return ret;
S
starlord 已提交
1743
    } catch (std::exception& e) {
G
groot 已提交
1744
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
1745 1746
    }
}
1747

S
starlord 已提交
1748
Status
G
groot 已提交
1749
MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
G
groot 已提交
1750
                           TableFilesSchema& table_files) {
G
groot 已提交
1751 1752
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
Z
update  
zhiru 已提交
1753 1754
    }

1755 1756
    Status ret = Status::OK();

1757
    try {
G
groot 已提交
1758
        table_files.clear();
G
groot 已提交
1759 1760

        mysqlpp::StoreQueryResult res;
1761
        {
S
starlord 已提交
1762
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1763

S
shengjh 已提交
1764 1765 1766 1767
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesByType.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesByType.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1768
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1769
            }
Z
zhiru 已提交
1770

G
groot 已提交
1771 1772 1773 1774
            std::string types;
            for (auto type : file_types) {
                if (!types.empty()) {
                    types += ",";
Z
update  
zhiru 已提交
1775
                }
G
groot 已提交
1776
                types += std::to_string(type);
1777
            }
Z
update  
zhiru 已提交
1778

G
groot 已提交
1779 1780
            mysqlpp::Query hasNonIndexFilesQuery = connectionPtr->query();
            // since table_id is a unique column we just need to check whether it exists or not
G
groot 已提交
1781
            hasNonIndexFilesQuery
1782
                << "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
G
groot 已提交
1783 1784
                << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                << " AND file_type in (" << types << ");";
1785

G
groot 已提交
1786
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
Z
update  
zhiru 已提交
1787

G
groot 已提交
1788
            res = hasNonIndexFilesQuery.store();
S
starlord 已提交
1789
        }  // Scoped Connection
1790

1791 1792 1793 1794 1795 1796 1797
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

G
groot 已提交
1798 1799 1800 1801
        if (res.num_rows() > 0) {
            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
            int to_index_count = 0, index_count = 0, backup_count = 0;
            for (auto& resRow : res) {
G
groot 已提交
1802 1803 1804
                TableFileSchema file_schema;
                file_schema.id_ = resRow["id"];
                file_schema.table_id_ = table_id;
1805
                resRow["segment_id"].to_string(file_schema.segment_id_);
G
groot 已提交
1806 1807 1808 1809 1810 1811 1812 1813
                file_schema.engine_type_ = resRow["engine_type"];
                resRow["file_id"].to_string(file_schema.file_id_);
                file_schema.file_type_ = resRow["file_type"];
                file_schema.file_size_ = resRow["file_size"];
                file_schema.row_count_ = resRow["row_count"];
                file_schema.date_ = resRow["date"];
                file_schema.created_on_ = resRow["created_on"];

1814
                file_schema.index_file_size_ = table_schema.index_file_size_;
1815
                file_schema.index_params_ = table_schema.index_params_;
1816 1817 1818 1819 1820 1821 1822 1823
                file_schema.metric_type_ = table_schema.metric_type_;
                file_schema.dimension_ = table_schema.dimension_;

                auto status = utils::GetTableFilePath(options_, file_schema);
                if (!status.ok()) {
                    ret = status;
                }

G
groot 已提交
1824
                table_files.emplace_back(file_schema);
1825

G
groot 已提交
1826 1827 1828
                int32_t file_type = resRow["file_type"];
                switch (file_type) {
                    case (int)TableFileSchema::RAW:
1829
                        ++raw_count;
G
groot 已提交
1830 1831
                        break;
                    case (int)TableFileSchema::NEW:
1832
                        ++new_count;
G
groot 已提交
1833 1834
                        break;
                    case (int)TableFileSchema::NEW_MERGE:
1835
                        ++new_merge_count;
G
groot 已提交
1836 1837
                        break;
                    case (int)TableFileSchema::NEW_INDEX:
1838
                        ++new_index_count;
G
groot 已提交
1839 1840
                        break;
                    case (int)TableFileSchema::TO_INDEX:
1841
                        ++to_index_count;
G
groot 已提交
1842 1843
                        break;
                    case (int)TableFileSchema::INDEX:
1844
                        ++index_count;
G
groot 已提交
1845 1846
                        break;
                    case (int)TableFileSchema::BACKUP:
1847
                        ++backup_count;
G
groot 已提交
1848 1849
                        break;
                    default:
S
shengjh 已提交
1850
                        break;
Z
update  
zhiru 已提交
1851
                }
1852 1853
            }

G
groot 已提交
1854
            std::string msg = "Get table files by type.";
G
groot 已提交
1855 1856 1857
            for (int file_type : file_types) {
                switch (file_type) {
                    case (int)TableFileSchema::RAW:
G
groot 已提交
1858
                        msg = msg + " raw files:" + std::to_string(raw_count);
G
groot 已提交
1859 1860
                        break;
                    case (int)TableFileSchema::NEW:
G
groot 已提交
1861
                        msg = msg + " new files:" + std::to_string(new_count);
G
groot 已提交
1862 1863
                        break;
                    case (int)TableFileSchema::NEW_MERGE:
G
groot 已提交
1864
                        msg = msg + " new_merge files:" + std::to_string(new_merge_count);
G
groot 已提交
1865 1866
                        break;
                    case (int)TableFileSchema::NEW_INDEX:
G
groot 已提交
1867
                        msg = msg + " new_index files:" + std::to_string(new_index_count);
G
groot 已提交
1868 1869
                        break;
                    case (int)TableFileSchema::TO_INDEX:
G
groot 已提交
1870
                        msg = msg + " to_index files:" + std::to_string(to_index_count);
G
groot 已提交
1871 1872
                        break;
                    case (int)TableFileSchema::INDEX:
G
groot 已提交
1873
                        msg = msg + " index files:" + std::to_string(index_count);
G
groot 已提交
1874 1875
                        break;
                    case (int)TableFileSchema::BACKUP:
G
groot 已提交
1876
                        msg = msg + " backup files:" + std::to_string(backup_count);
G
groot 已提交
1877
                        break;
1878
                    default:
S
shengjh 已提交
1879
                        break;
G
groot 已提交
1880 1881 1882
                }
            }
            ENGINE_LOG_DEBUG << msg;
G
groot 已提交
1883
        }
S
starlord 已提交
1884
    } catch (std::exception& e) {
G
groot 已提交
1885
        return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
1886
    }
S
starlord 已提交
1887

1888
    return ret;
1889
}
1890

G
groot 已提交
1891
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1892
Status
G
groot 已提交
1893 1894 1895 1896 1897
MySQLMetaImpl::Archive() {
    auto& criterias = options_.archive_conf_.GetCriterias();
    if (criterias.empty()) {
        return Status::OK();
    }
1898

G
groot 已提交
1899 1900 1901 1902
    for (auto& kv : criterias) {
        auto& criteria = kv.first;
        auto& limit = kv.second;
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
1903
            size_t usecs = limit * DAY * US_PS;
G
groot 已提交
1904
            int64_t now = utils::GetMicroSecTimeStamp();
Z
update  
zhiru 已提交
1905

G
groot 已提交
1906 1907
            try {
                mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
1908

S
shengjh 已提交
1909 1910 1911 1912
                bool is_null_connection = (connectionPtr == nullptr);
                fiu_do_on("MySQLMetaImpl.Archive.null_connection", is_null_connection = true);
                fiu_do_on("MySQLMetaImpl.Archive.throw_exception", throw std::exception(););
                if (is_null_connection) {
G
groot 已提交
1913 1914
                    return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
                }
1915

G
groot 已提交
1916 1917 1918 1919 1920
                mysqlpp::Query archiveQuery = connectionPtr->query();
                archiveQuery << "UPDATE " << META_TABLEFILES
                             << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                             << " WHERE created_on < " << std::to_string(now - usecs) << " AND file_type <> "
                             << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1921

G
groot 已提交
1922 1923 1924 1925 1926 1927 1928 1929 1930 1931
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();

                if (!archiveQuery.exec()) {
                    return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
                }

                ENGINE_LOG_DEBUG << "Archive old files";
            } catch (std::exception& e) {
                return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
            }
Z
fix  
zhiru 已提交
1932
        }
G
groot 已提交
1933 1934 1935
        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            Size(sum);
Z
fix  
zhiru 已提交
1936

G
groot 已提交
1937 1938 1939 1940 1941
            auto to_delete = (sum - limit * G);
            DiscardFiles(to_delete);

            ENGINE_LOG_DEBUG << "Archive files to free disk";
        }
1942
    }
Z
update  
zhiru 已提交
1943

1944 1945
    return Status::OK();
}
Z
zhiru 已提交
1946

S
starlord 已提交
1947
Status
G
groot 已提交
1948 1949 1950
MySQLMetaImpl::Size(uint64_t& result) {
    result = 0;

1951
    try {
G
groot 已提交
1952
        mysqlpp::StoreQueryResult res;
1953
        {
S
starlord 已提交
1954
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1955

S
shengjh 已提交
1956 1957 1958 1959
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.Size.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.Size.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
1960
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
1961
            }
Z
update  
zhiru 已提交
1962

G
groot 已提交
1963 1964 1965 1966
            mysqlpp::Query getSizeQuery = connectionPtr->query();
            getSizeQuery << "SELECT IFNULL(SUM(file_size),0) AS sum"
                         << " FROM " << META_TABLEFILES << " WHERE file_type <> "
                         << std::to_string(TableFileSchema::TO_DELETE) << ";";
1967

G
groot 已提交
1968
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str();
1969

G
groot 已提交
1970 1971
            res = getSizeQuery.store();
        }  // Scoped Connection
1972

G
groot 已提交
1973 1974 1975 1976 1977 1978 1979 1980
        if (res.empty()) {
            result = 0;
        } else {
            result = res[0]["sum"];
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
    }
1981

G
groot 已提交
1982 1983
    return Status::OK();
}
1984

G
groot 已提交
1985
Status
1986
MySQLMetaImpl::CleanUpShadowFiles() {
G
groot 已提交
1987 1988
    try {
        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1989

S
shengjh 已提交
1990 1991 1992 1993
        bool is_null_connection = (connectionPtr == nullptr);
        fiu_do_on("MySQLMetaImpl.CleanUpShadowFiles.null_connection", is_null_connection = true);
        fiu_do_on("MySQLMetaImpl.CleanUpShadowFiles.throw_exception", throw std::exception(););
        if (is_null_connection) {
G
groot 已提交
1994 1995
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
        }
1996

G
groot 已提交
1997 1998 1999 2000 2001
        mysqlpp::Query cleanUpQuery = connectionPtr->query();
        cleanUpQuery << "SELECT table_name"
                     << " FROM information_schema.tables"
                     << " WHERE table_schema = " << mysqlpp::quote << mysql_connection_pool_->getDB()
                     << " AND table_name = " << mysqlpp::quote << META_TABLEFILES << ";";
Z
fix  
zhiru 已提交
2002

G
groot 已提交
2003
        ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
2004

G
groot 已提交
2005
        mysqlpp::StoreQueryResult res = cleanUpQuery.store();
2006

G
groot 已提交
2007 2008 2009 2010 2011 2012 2013 2014 2015 2016
        if (!res.empty()) {
            ENGINE_LOG_DEBUG << "Remove table file type as NEW";
            cleanUpQuery << "DELETE FROM " << META_TABLEFILES << " WHERE file_type IN ("
                         << std::to_string(TableFileSchema::NEW) << "," << std::to_string(TableFileSchema::NEW_MERGE)
                         << "," << std::to_string(TableFileSchema::NEW_INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();

            if (!cleanUpQuery.exec()) {
                return HandleException("QUERY ERROR WHEN CLEANING UP FILES", cleanUpQuery.error());
2017
            }
G
groot 已提交
2018
        }
2019

G
groot 已提交
2020 2021 2022
        if (res.size() > 0) {
            ENGINE_LOG_DEBUG << "Clean " << res.size() << " files";
        }
S
starlord 已提交
2023
    } catch (std::exception& e) {
G
groot 已提交
2024
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
2025
    }
S
starlord 已提交
2026

2027 2028
    return Status::OK();
}
Z
fix  
zhiru 已提交
2029

2030
Status
2031
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) {
2032
    auto now = utils::GetMicroSecTimeStamp();
S
starlord 已提交
2033 2034
    std::set<std::string> table_ids;

S
starlord 已提交
2035
    // remove to_delete files
2036
    try {
Y
Yu Kun 已提交
2037
        server::MetricCollector metric;
2038

2039
        {
S
starlord 已提交
2040
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
2041

S
shengjh 已提交
2042 2043 2044 2045 2046
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RomoveToDeleteFiles_NullConnection",
                      is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RomoveToDeleteFiles_ThrowException", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2047
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
2048
            }
Z
zhiru 已提交
2049

2050
            mysqlpp::Query query = connectionPtr->query();
2051
            query << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date"
G
groot 已提交
2052
                  << " FROM " << META_TABLEFILES << " WHERE file_type IN ("
G
groot 已提交
2053
                  << std::to_string(TableFileSchema::TO_DELETE) << "," << std::to_string(TableFileSchema::BACKUP) << ")"
2054
                  << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
Z
update  
zhiru 已提交
2055

2056
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
Z
update  
zhiru 已提交
2057

2058
            mysqlpp::StoreQueryResult res = query.store();
Z
update  
zhiru 已提交
2059

2060 2061
            TableFileSchema table_file;
            std::vector<std::string> idsToDelete;
2062

G
groot 已提交
2063
            int64_t clean_files = 0;
S
starlord 已提交
2064 2065
            for (auto& resRow : res) {
                table_file.id_ = resRow["id"];  // implicit conversion
G
groot 已提交
2066
                resRow["table_id"].to_string(table_file.table_id_);
2067 2068
                resRow["segment_id"].to_string(table_file.segment_id_);
                table_file.engine_type_ = resRow["engine_type"];
G
groot 已提交
2069
                resRow["file_id"].to_string(table_file.file_id_);
2070
                table_file.date_ = resRow["date"];
G
groot 已提交
2071
                table_file.file_type_ = resRow["file_type"];
Z
update  
zhiru 已提交
2072

G
groot 已提交
2073
                // check if the file can be deleted
2074
                if (OngoingFileChecker::GetInstance().IsIgnored(table_file)) {
G
groot 已提交
2075 2076 2077
                    ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
                                     << " currently is in use, not able to delete now";
                    continue;  // ignore this file, don't delete it
G
groot 已提交
2078
                }
2079

G
groot 已提交
2080
                // erase file data from cache
G
groot 已提交
2081 2082
                // because GetTableFilePath won't able to generate file path after the file is deleted
                utils::GetTableFilePath(options_, table_file);
G
groot 已提交
2083 2084
                server::CommonUtil::EraseFromCache(table_file.location_);

G
groot 已提交
2085
                if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) {
2086 2087 2088
                    // If we are deleting a raw table file, it means it's okay to delete the entire segment directory.
                    // Else, we can only delete the single file
                    // TODO(zhiru): We determine whether a table file is raw by its engine type. This is a bit hacky
2089
                    if (utils::IsRawIndexType(table_file.engine_type_)) {
2090 2091 2092 2093 2094 2095 2096 2097
                        utils::DeleteSegment(options_, table_file);
                        std::string segment_dir;
                        utils::GetParentPath(table_file.location_, segment_dir);
                        ENGINE_LOG_DEBUG << "Remove segment directory: " << segment_dir;
                    } else {
                        utils::DeleteTableFilePath(options_, table_file);
                        ENGINE_LOG_DEBUG << "Remove table file: " << table_file.location_;
                    }
G
groot 已提交
2098 2099 2100 2101

                    idsToDelete.emplace_back(std::to_string(table_file.id_));
                    table_ids.insert(table_file.table_id_);

2102
                    ++clean_files;
G
typo  
groot 已提交
2103
                }
2104 2105
            }

G
groot 已提交
2106
            // delete file from meta
2107 2108
            if (!idsToDelete.empty()) {
                std::stringstream idsToDeleteSS;
S
starlord 已提交
2109
                for (auto& id : idsToDelete) {
2110
                    idsToDeleteSS << "id = " << id << " OR ";
2111
                }
2112

2113
                std::string idsToDeleteStr = idsToDeleteSS.str();
S
starlord 已提交
2114
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4);  // remove the last " OR "
2115
                query << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";";
2116

2117
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
2118

2119 2120
                if (!query.exec()) {
                    return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", query.error());
2121 2122
                }
            }
2123

G
groot 已提交
2124
            if (clean_files > 0) {
G
groot 已提交
2125
                ENGINE_LOG_DEBUG << "Clean " << clean_files << " files expired in " << seconds << " seconds";
2126
            }
S
starlord 已提交
2127 2128
        }  // Scoped Connection
    } catch (std::exception& e) {
S
starlord 已提交
2129
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
Z
update  
zhiru 已提交
2130
    }
2131

S
starlord 已提交
2132
    // remove to_delete tables
2133
    try {
Y
Yu Kun 已提交
2134
        server::MetricCollector metric;
2135 2136

        {
S
starlord 已提交
2137
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
2138

S
shengjh 已提交
2139 2140 2141 2142 2143
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RemoveToDeleteTables_NUllConnection",
                      is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RemoveToDeleteTables_ThrowException", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2144
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
Z
update  
zhiru 已提交
2145 2146
            }

2147 2148 2149
            mysqlpp::Query query = connectionPtr->query();
            query << "SELECT id, table_id"
                  << " FROM " << META_TABLES << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
2150

2151
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
2152

2153
            mysqlpp::StoreQueryResult res = query.store();
Z
update  
zhiru 已提交
2154

2155
            int64_t remove_tables = 0;
Z
update  
zhiru 已提交
2156
            if (!res.empty()) {
2157
                std::stringstream idsToDeleteSS;
S
starlord 已提交
2158
                for (auto& resRow : res) {
2159 2160 2161
                    size_t id = resRow["id"];
                    std::string table_id;
                    resRow["table_id"].to_string(table_id);
Z
update  
zhiru 已提交
2162

S
starlord 已提交
2163
                    utils::DeleteTablePath(options_, table_id, false);  // only delete empty folder
2164
                    ++remove_tables;
2165
                    idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
Z
update  
zhiru 已提交
2166
                }
2167
                std::string idsToDeleteStr = idsToDeleteSS.str();
S
starlord 已提交
2168
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4);  // remove the last " OR "
2169
                query << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";";
2170

2171
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
Z
update  
zhiru 已提交
2172

2173 2174
                if (!query.exec()) {
                    return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", query.error());
2175 2176
                }
            }
2177

2178 2179
            if (remove_tables > 0) {
                ENGINE_LOG_DEBUG << "Remove " << remove_tables << " tables from meta";
2180
            }
S
starlord 已提交
2181 2182
        }  // Scoped Connection
    } catch (std::exception& e) {
S
starlord 已提交
2183
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
Z
update  
zhiru 已提交
2184 2185
    }

S
starlord 已提交
2186 2187
    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
S
starlord 已提交
2188
    try {
Y
Yu Kun 已提交
2189
        server::MetricCollector metric;
S
starlord 已提交
2190 2191

        {
S
starlord 已提交
2192
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
2193

S
shengjh 已提交
2194 2195 2196 2197 2198 2199
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RemoveDeletedTableFolder_NUllConnection",
                      is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.CleanUpFilesWithTTL.RemoveDeletedTableFolder_ThrowException",
                      throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2200
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
S
starlord 已提交
2201 2202
            }

S
starlord 已提交
2203
            for (auto& table_id : table_ids) {
2204 2205 2206
                mysqlpp::Query query = connectionPtr->query();
                query << "SELECT file_id"
                      << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
S
starlord 已提交
2207

2208
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
S
starlord 已提交
2209

2210
                mysqlpp::StoreQueryResult res = query.store();
S
starlord 已提交
2211 2212 2213 2214 2215

                if (res.empty()) {
                    utils::DeleteTablePath(options_, table_id);
                }
            }
2216

S
starlord 已提交
2217
            if (table_ids.size() > 0) {
2218 2219
                ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
            }
S
starlord 已提交
2220
        }
S
starlord 已提交
2221
    } catch (std::exception& e) {
S
starlord 已提交
2222
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
S
starlord 已提交
2223 2224
    }

2225 2226
    return Status::OK();
}
2227

S
starlord 已提交
2228
Status
S
starlord 已提交
2229
MySQLMetaImpl::Count(const std::string& table_id, uint64_t& result) {
2230
    try {
Y
Yu Kun 已提交
2231
        server::MetricCollector metric;
2232 2233 2234 2235 2236 2237 2238

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);

        if (!status.ok()) {
            return status;
2239
        }
Z
zhiru 已提交
2240

S
starlord 已提交
2241
        mysqlpp::StoreQueryResult res;
2242
        {
S
starlord 已提交
2243
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
2244

S
shengjh 已提交
2245 2246 2247 2248
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.Count.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.Count.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2249
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
Z
update  
zhiru 已提交
2250 2251
            }

S
starlord 已提交
2252
            mysqlpp::Query countQuery = connectionPtr->query();
G
groot 已提交
2253 2254 2255 2256 2257
            countQuery << "SELECT row_count"
                       << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                       << " AND (file_type = " << std::to_string(TableFileSchema::RAW)
                       << " OR file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                       << " OR file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
Z
update  
zhiru 已提交
2258

2259
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str();
Z
update  
zhiru 已提交
2260

2261
            res = countQuery.store();
S
starlord 已提交
2262
        }  // Scoped Connection
2263 2264

        result = 0;
S
starlord 已提交
2265
        for (auto& resRow : res) {
S
starlord 已提交
2266
            size_t size = resRow["row_count"];
2267
            result += size;
2268
        }
S
starlord 已提交
2269
    } catch (std::exception& e) {
S
starlord 已提交
2270
        return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
Z
update  
zhiru 已提交
2271
    }
S
starlord 已提交
2272

2273 2274 2275
    return Status::OK();
}

S
starlord 已提交
2276 2277
Status
MySQLMetaImpl::DropAll() {
2278
    try {
S
starlord 已提交
2279
        ENGINE_LOG_DEBUG << "Drop all mysql meta";
S
starlord 已提交
2280
        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
2281

S
shengjh 已提交
2282 2283 2284 2285
        bool is_null_connection = (connectionPtr == nullptr);
        fiu_do_on("MySQLMetaImpl.DropAll.null_connection", is_null_connection = true);
        fiu_do_on("MySQLMetaImpl.DropAll.throw_exception", throw std::exception(););
        if (is_null_connection) {
G
groot 已提交
2286
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
Z
zhiru 已提交
2287
        }
2288

S
starlord 已提交
2289
        mysqlpp::Query dropTableQuery = connectionPtr->query();
2290
        dropTableQuery << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";";
2291 2292 2293 2294 2295 2296

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();

        if (dropTableQuery.exec()) {
            return Status::OK();
        }
S
starlord 已提交
2297
        return HandleException("QUERY ERROR WHEN DROPPING ALL", dropTableQuery.error());
S
starlord 已提交
2298
    } catch (std::exception& e) {
S
starlord 已提交
2299
        return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
2300 2301 2302
    }
}

G
groot 已提交
2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315
Status
MySQLMetaImpl::DiscardFiles(int64_t to_discard_size) {
    if (to_discard_size <= 0) {
        return Status::OK();
    }
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

    try {
        server::MetricCollector metric;
        bool status;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

S
shengjh 已提交
2316 2317 2318 2319
            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.DiscardFiles.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.DiscardFiles.throw_exception", throw std::exception(););
            if (is_null_connection) {
G
groot 已提交
2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query discardFilesQuery = connectionPtr->query();
            discardFilesQuery << "SELECT id, file_size"
                              << " FROM " << META_TABLEFILES << " WHERE file_type <> "
                              << std::to_string(TableFileSchema::TO_DELETE) << " ORDER BY id ASC "
                              << " LIMIT 10;";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();

            mysqlpp::StoreQueryResult res = discardFilesQuery.store();
            if (res.num_rows() == 0) {
                return Status::OK();
            }

            TableFileSchema table_file;
            std::stringstream idsToDiscardSS;
            for (auto& resRow : res) {
                if (to_discard_size <= 0) {
                    break;
                }
                table_file.id_ = resRow["id"];
                table_file.file_size_ = resRow["file_size"];
                idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR ";
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
            }

            std::string idsToDiscardStr = idsToDiscardSS.str();
            idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4);  // remove the last " OR "

            discardFilesQuery << "UPDATE " << META_TABLEFILES
                              << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                              << " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " WHERE "
                              << idsToDiscardStr << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();

            status = discardFilesQuery.exec();
            if (!status) {
                return HandleException("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error());
            }
        }  // Scoped Connection

        return DiscardFiles(to_discard_size);
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
    }
}

2372 2373 2374 2375 2376 2377 2378 2379 2380 2381
Status
MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
    return Status::OK();
}

Status
MySQLMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
    return Status::OK();
}

S
starlord 已提交
2382 2383 2384
}  // namespace meta
}  // namespace engine
}  // namespace milvus