MySQLMetaImpl.cpp 84.8 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/meta/MySQLMetaImpl.h"
S
starlord 已提交
19
#include "MetaConsts.h"
S
starlord 已提交
20 21
#include "db/IDGenerator.h"
#include "db/Utils.h"
Z
update  
zhiru 已提交
22
#include "metrics/Metrics.h"
23
#include "utils/CommonUtil.h"
S
starlord 已提交
24 25
#include "utils/Exception.h"
#include "utils/Log.h"
26
#include "utils/StringHelpFunctions.h"
Z
update  
zhiru 已提交
27

S
starlord 已提交
28 29
#include <mysql++/mysql++.h>
#include <string.h>
Z
update  
zhiru 已提交
30
#include <unistd.h>
S
starlord 已提交
31
#include <boost/filesystem.hpp>
Z
update  
zhiru 已提交
32 33
#include <chrono>
#include <fstream>
S
starlord 已提交
34 35 36
#include <iostream>
#include <map>
#include <mutex>
Z
update  
zhiru 已提交
37
#include <regex>
S
starlord 已提交
38 39
#include <set>
#include <sstream>
Z
update  
zhiru 已提交
40
#include <string>
Z
zhiru 已提交
41
#include <thread>
Z
update  
zhiru 已提交
42 43 44 45 46

namespace milvus {
namespace engine {
namespace meta {

47
namespace {
Z
update  
zhiru 已提交
48

S
starlord 已提交
49
/// Logs a meta operation failure and wraps it in a DB_META_TRANSACTION_FAILED status.
///
/// @param desc  Description of the operation that failed.
/// @param what  Optional low-level detail; when given it is appended to `desc` after a ':'.
/// @return      A Status carrying DB_META_TRANSACTION_FAILED and the composed message.
Status
HandleException(const std::string& desc, const char* what = nullptr) {
    // Only attach the detail part when the caller actually supplied one.
    const std::string text = (what != nullptr) ? (desc + ":" + what) : desc;

    ENGINE_LOG_ERROR << text;
    return Status(DB_META_TRANSACTION_FAILED, text);
}
Z
update  
zhiru 已提交
60

61
/// Describes one column of a meta table: its name, its SQL type and any extra column settings.
class MetaField {
 public:
    /// @param name     Column name.
    /// @param type     SQL type of the column, e.g. "BIGINT" or "VARCHAR(255)".
    /// @param setting  Remaining column definition, e.g. "NOT NULL".
    MetaField(const std::string& name, const std::string& type, const std::string& setting)
        : name_(name), type_(type), setting_(setting) {
    }

    /// Returns the column name.
    std::string
    name() const {
        return name_;
    }

    /// Returns "<name> <type> <setting>", the form used inside a CREATE TABLE statement.
    std::string
    ToString() const {
        return name_ + " " + type_ + " " + setting_;
    }

    /// Tells whether `field` describes the same column as this one.
    ///
    /// Names must match in full (case-insensitively). A prefix comparison would treat
    /// "id" and "id_extra" as the same column, and an empty name as matching anything.
    ///
    /// Types are compared case-insensitively over the length of the shorter one, because
    /// the MySQL field type has additional information: a field type defined as 'BIGINT'
    /// is reported by SQL as 'bigint(20)', so the trailing '(20)' has to be ignored.
    bool
    IsEqual(const MetaField& field) const {
        if (name_.length() != field.name_.length() || strcasecmp(field.name_.c_str(), name_.c_str()) != 0) {
            return false;
        }

        size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length();
        return strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0;
    }

 private:
    std::string name_;
    std::string type_;
    std::string setting_;
};

using MetaFields = std::vector<MetaField>;
class MetaSchema {
S
starlord 已提交
95
 public:
S
starlord 已提交
96
    MetaSchema(const std::string& name, const MetaFields& fields) : name_(name), fields_(fields) {
97 98
    }

S
starlord 已提交
99 100
    std::string
    name() const {
101 102 103
        return name_;
    }

S
starlord 已提交
104 105
    std::string
    ToString() const {
106
        std::string result;
S
starlord 已提交
107
        for (auto& field : fields_) {
S
starlord 已提交
108
            if (!result.empty()) {
109 110 111 112 113 114 115
                result += ",";
            }
            result += field.ToString();
        }
        return result;
    }

S
starlord 已提交
116 117 118 119
    // if the outer fields contains all this MetaSchema fields, return true
    // otherwise return false
    bool
    IsEqual(const MetaFields& fields) const {
120
        std::vector<std::string> found_field;
S
starlord 已提交
121 122
        for (const auto& this_field : fields_) {
            for (const auto& outer_field : fields) {
S
starlord 已提交
123
                if (this_field.IsEqual(outer_field)) {
124 125 126 127 128 129 130 131 132
                    found_field.push_back(this_field.name());
                    break;
                }
            }
        }

        return found_field.size() == fields_.size();
    }

S
starlord 已提交
133
 private:
134 135 136 137
    std::string name_;
    MetaFields fields_;
};

S
starlord 已提交
138
// Tables schema
139
static const MetaSchema TABLES_SCHEMA(META_TABLES, {
S
starlord 已提交
140 141 142 143 144 145 146 147 148 149
                                                       MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
                                                       MetaField("table_id", "VARCHAR(255)", "UNIQUE NOT NULL"),
                                                       MetaField("state", "INT", "NOT NULL"),
                                                       MetaField("dimension", "SMALLINT", "NOT NULL"),
                                                       MetaField("created_on", "BIGINT", "NOT NULL"),
                                                       MetaField("flag", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                       MetaField("index_file_size", "BIGINT", "DEFAULT 1024 NOT NULL"),
                                                       MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
                                                       MetaField("nlist", "INT", "DEFAULT 16384 NOT NULL"),
                                                       MetaField("metric_type", "INT", "DEFAULT 1 NOT NULL"),
G
groot 已提交
150 151 152 153
                                                       MetaField("owner_table", "VARCHAR(255)", "NOT NULL"),
                                                       MetaField("partition_tag", "VARCHAR(255)", "NOT NULL"),
                                                       MetaField("version", "VARCHAR(64)",
                                                                 std::string("DEFAULT '") + CURRENT_VERSION + "'"),
S
starlord 已提交
154 155 156
                                                   });

// TableFiles schema
J
JinHai-CN 已提交
157 158 159 160 161 162 163 164 165 166 167 168
static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
                                                               MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
                                                               MetaField("table_id", "VARCHAR(255)", "NOT NULL"),
                                                               MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
                                                               MetaField("file_id", "VARCHAR(255)", "NOT NULL"),
                                                               MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"),
                                                               MetaField("updated_time", "BIGINT", "NOT NULL"),
                                                               MetaField("created_on", "BIGINT", "NOT NULL"),
                                                               MetaField("date", "INT", "DEFAULT -1 NOT NULL"),
                                                           });
S
starlord 已提交
169 170

}  // namespace
171

172
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
173
/// Stores the meta options and running mode, then runs Initialize().
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions& options, const int& mode)
    : options_(options), mode_(mode) {
    // The Status returned by Initialize() is not inspected here.
    Initialize();
}

/// Nothing to release explicitly; members clean up after themselves.
MySQLMetaImpl::~MySQLMetaImpl() = default;

S
starlord 已提交
180
/// Generates a new table id and writes its decimal text form into `table_id`.
///
/// @param table_id  Receives the generated id.
/// @return          Always Status::OK().
Status
MySQLMetaImpl::NextTableId(std::string& table_id) {
    // generation is serialized through genid_mutex_ to avoid duplicated ids
    std::lock_guard<std::mutex> guard(genid_mutex_);

    std::ostringstream id_text;
    SimpleIDGenerator generator;
    id_text << generator.GetNextIDNumber();

    table_id = id_text.str();
    return Status::OK();
}

S
starlord 已提交
190
/// Generates a new file id and writes its decimal text form into `file_id`.
///
/// @param file_id  Receives the generated id.
/// @return         Always Status::OK().
Status
MySQLMetaImpl::NextFileId(std::string& file_id) {
    // generation is serialized through genid_mutex_ to avoid duplicated ids
    std::lock_guard<std::mutex> guard(genid_mutex_);

    std::ostringstream id_text;
    SimpleIDGenerator generator;
    id_text << generator.GetNextIDNumber();

    file_id = id_text.str();
    return Status::OK();
}

S
starlord 已提交
200 201 202
void
MySQLMetaImpl::ValidateMetaSchema() {
    if (nullptr == mysql_connection_pool_) {
203 204 205
        return;
    }

S
starlord 已提交
206
    mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
207 208 209 210
    if (connectionPtr == nullptr) {
        return;
    }

S
starlord 已提交
211
    auto validate_func = [&](const MetaSchema& schema) {
S
starlord 已提交
212
        mysqlpp::Query query_statement = connectionPtr->query();
213 214 215 216 217
        query_statement << "DESC " << schema.name() << ";";

        MetaFields exist_fields;

        try {
S
starlord 已提交
218
            mysqlpp::StoreQueryResult res = query_statement.store();
219
            for (size_t i = 0; i < res.num_rows(); i++) {
S
starlord 已提交
220
                const mysqlpp::Row& row = res[i];
221 222 223 224 225 226
                std::string name, type;
                row["Field"].to_string(name);
                row["Type"].to_string(type);

                exist_fields.push_back(MetaField(name, type, ""));
            }
S
starlord 已提交
227
        } catch (std::exception& e) {
228 229 230
            ENGINE_LOG_DEBUG << "Meta table '" << schema.name() << "' not exist and will be created";
        }

S
starlord 已提交
231
        if (exist_fields.empty()) {
232 233 234 235 236 237
            return true;
        }

        return schema.IsEqual(exist_fields);
    };

S
starlord 已提交
238
    // verify Tables
239 240 241 242
    if (!validate_func(TABLES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }

S
starlord 已提交
243
    // verufy TableFiles
244 245 246 247 248
    if (!validate_func(TABLEFILES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
249 250
/// Prepares the MySQL backed meta store.
///
/// Steps: create the db root directory, parse and check the backend URI, build the
/// connection pool, validate any existing meta schema, then create the meta tables
/// if they do not exist.
///
/// @return Status::OK() on success, otherwise a Status describing the failure.
/// @throws Exception(DB_INVALID_META_URI) when the URI cannot be parsed, its dialect is
///         not MySQL, or its port is not a number in [0, 65535].
/// @throws Exception(DB_INCOMPATIB_META) propagated from ValidateMetaSchema().
Status
MySQLMetaImpl::Initialize() {
    // step 1: create db root path
    if (!boost::filesystem::is_directory(options_.path_)) {
        auto ret = boost::filesystem::create_directory(options_.path_);
        if (!ret) {
            std::string msg = "Failed to create db directory " + options_.path_;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_META_TRANSACTION_FAILED, msg);
        }
    }

    std::string uri = options_.backend_uri_;

    // step 2: parse and check meta uri
    utils::MetaUriInfo uri_info;
    auto status = utils::ParseMetaUri(uri, uri_info);
    if (!status.ok()) {
        std::string msg = "Wrong URI format: " + uri;
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }

    if (strcasecmp(uri_info.dialect_.c_str(), "mysql") != 0) {
        std::string msg = "URI's dialect is not MySQL";
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }

    // step 3: connect mysql
    int thread_hint = std::thread::hardware_concurrency();
    int max_pool_size = (thread_hint == 0) ? 8 : thread_hint;
    unsigned int port = 0;
    if (!uri_info.port_.empty()) {
        // std::stoi throws on non-numeric or oversized input; report such a port as an
        // invalid URI, like the checks above, instead of leaking a raw std exception.
        int parsed_port = -1;
        try {
            parsed_port = std::stoi(uri_info.port_);
        } catch (const std::exception&) {
            parsed_port = -1;
        }

        // a negative value would otherwise wrap around when stored as unsigned
        if (parsed_port < 0 || parsed_port > 65535) {
            std::string msg = "Invalid port in meta URI: " + uri_info.port_;
            ENGINE_LOG_ERROR << msg;
            throw Exception(DB_INVALID_META_URI, msg);
        }
        port = static_cast<unsigned int>(parsed_port);
    }

    mysql_connection_pool_ = std::make_shared<MySQLConnectionPool>(
        uri_info.db_name_, uri_info.username_, uri_info.password_, uri_info.host_, port, max_pool_size);
    ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(max_pool_size);

    // step 4: validate to avoid open old version schema
    ValidateMetaSchema();

    // step 5: create meta tables
    try {
        if (mode_ != DBOptions::MODE::CLUSTER_READONLY) {
            CleanUpShadowFiles();
        }

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            if (!connectionPtr->thread_aware()) {
                ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
                return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it.");
            }
            mysqlpp::Query InitializeQuery = connectionPtr->query();

            InitializeQuery << "CREATE TABLE IF NOT EXISTS " << TABLES_SCHEMA.name() << " ("
                            << TABLES_SCHEMA.ToString() + ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

            if (!InitializeQuery.exec()) {
                return HandleException("Initialization Error", InitializeQuery.error());
            }

            // the same query object is reused for the second CREATE TABLE statement
            InitializeQuery << "CREATE TABLE IF NOT EXISTS " << TABLEFILES_SCHEMA.name() << " ("
                            << TABLEFILES_SCHEMA.ToString() + ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

            if (!InitializeQuery.exec()) {
                return HandleException("Initialization Error", InitializeQuery.error());
            }
        }  // Scoped Connection
    } catch (const std::exception& e) {
        return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
337
/// Inserts a new row into the Tables meta table and creates the table's path.
///
/// When `table_schema.table_id_` is empty a fresh id is generated; otherwise the id is
/// looked up first and the call fails if a row with that id already exists.
/// On success `id_` and `created_on_` of `table_schema` are filled in.
///
/// @param table_schema  Description of the table to create (in/out).
/// @return DB_ALREADY_EXIST / DB_ERROR when the id is taken, a failure status when the
///         insert fails, otherwise the result of utils::CreateTablePath().
Status
MySQLMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query table_query = connectionPtr->query();

            if (!table_schema.table_id_.empty()) {
                // caller picked the id: make sure it is not in use
                table_query << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                            << table_schema.table_id_ << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << table_query.str();

                mysqlpp::StoreQueryResult existing = table_query.store();
                if (existing.num_rows() == 1) {
                    int current_state = existing[0]["state"];
                    if (current_state == TableSchema::TO_DELETE) {
                        return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                    }
                    return Status(DB_ALREADY_EXIST, "Table already exists");
                }
            } else {
                NextTableId(table_schema.table_id_);
            }

            table_schema.id_ = -1;
            table_schema.created_on_ = utils::GetMicroSecTimeStamp();

            // the leading NULL lets the auto-increment column pick the row id
            table_query << "INSERT INTO " << META_TABLES << " VALUES("
                        << "NULL"
                        << ", " << mysqlpp::quote << table_schema.table_id_ << ", "
                        << std::to_string(table_schema.state_) << ", " << std::to_string(table_schema.dimension_)
                        << ", " << std::to_string(table_schema.created_on_) << ", "
                        << std::to_string(table_schema.flag_) << ", "
                        << std::to_string(table_schema.index_file_size_) << ", "
                        << std::to_string(table_schema.engine_type_) << ", " << std::to_string(table_schema.nlist_)
                        << ", " << std::to_string(table_schema.metric_type_) << ", " << mysqlpp::quote
                        << table_schema.owner_table_ << ", " << mysqlpp::quote << table_schema.partition_tag_
                        << ", " << mysqlpp::quote << table_schema.version_ << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << table_query.str();

            mysqlpp::SimpleResult inserted = table_query.execute();
            if (!inserted) {
                return HandleException("Add Table Error", table_query.error());
            }
            table_schema.id_ = inserted.insert_id();  // Might need to use SELECT LAST_INSERT_ID()?
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
    }
}
410

S
starlord 已提交
411
/// Loads the meta row of the table named by `table_schema.table_id_`, skipping tables
/// that are marked TO_DELETE, and copies its columns into `table_schema`.
///
/// @param table_schema  `table_id_` selects the table; the other fields are filled in.
/// @return DB_NOT_FOUND when exactly one matching row is not found, a failure status on
///         error, otherwise Status::OK().
Status
MySQLMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult rows;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query select_query = connectionPtr->query();
            select_query
                << "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type, nlist, metric_type"
                << " ,owner_table, partition_tag, version"
                << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_schema.table_id_
                << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << select_query.str();

            rows = select_query.store();
        }  // Scoped Connection

        if (rows.num_rows() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        // numeric columns rely on the row field's implicit conversion
        const mysqlpp::Row& record = rows[0];
        table_schema.id_ = record["id"];
        table_schema.state_ = record["state"];
        table_schema.dimension_ = record["dimension"];
        table_schema.created_on_ = record["created_on"];
        table_schema.flag_ = record["flag"];
        table_schema.index_file_size_ = record["index_file_size"];
        table_schema.engine_type_ = record["engine_type"];
        table_schema.nlist_ = record["nlist"];
        table_schema.metric_type_ = record["metric_type"];
        record["owner_table"].to_string(table_schema.owner_table_);
        record["partition_tag"].to_string(table_schema.partition_tag_);
        record["version"].to_string(table_schema.version_);
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
    }

    return Status::OK();
}
458

S
starlord 已提交
459
/// Checks whether a table with the given id exists and is not marked TO_DELETE.
///
/// @param table_id    Id of the table to look for.
/// @param has_or_not  Set to true when such a table exists, false otherwise. Left
///                    untouched when an error status is returned.
/// @return Status::OK() when the check was carried out, otherwise a failure status.
Status
MySQLMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query hasTableQuery = connectionPtr->query();
            // since table_id is a unique column we just need to check whether it exists or not
            hasTableQuery << "SELECT EXISTS"
                          << " (SELECT 1 FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                          << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ")"
                          << " AS " << mysqlpp::quote << "check"
                          << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();

            res = hasTableQuery.store();
        }  // Scoped Connection

        // SELECT EXISTS is expected to yield one row. If the result is empty the query
        // did not succeed, and reading res[0] would access a row that is not there.
        if (res.num_rows() == 0) {
            return HandleException("QUERY ERROR WHEN CHECKING IF TABLE EXISTS");
        }

        int check = res[0]["check"];
        has_or_not = (check == 1);
    } catch (const std::exception& e) {
        return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
493
/// Appends one TableSchema per table that is not marked TO_DELETE.
///
/// Only the selected columns (id, table_id, dimension, engine_type, nlist,
/// index_file_size, metric_type, owner_table, partition_tag, version) are filled in.
///
/// @param table_schema_array  Receives the schemas; existing entries are kept.
/// @return Status::OK() on success, otherwise a failure status.
Status
MySQLMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult rows;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query select_query = connectionPtr->query();
            select_query << "SELECT id, table_id, dimension, engine_type, nlist, index_file_size, metric_type"
                         << " ,owner_table, partition_tag, version"
                         << " FROM " << META_TABLES << " WHERE state <> " << std::to_string(TableSchema::TO_DELETE)
                         << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << select_query.str();

            rows = select_query.store();
        }  // Scoped Connection

        for (auto& record : rows) {
            TableSchema schema;

            // numeric columns rely on the row field's implicit conversion
            schema.id_ = record["id"];
            schema.dimension_ = record["dimension"];
            schema.index_file_size_ = record["index_file_size"];
            schema.engine_type_ = record["engine_type"];
            schema.nlist_ = record["nlist"];
            schema.metric_type_ = record["metric_type"];

            // text columns are copied out explicitly
            record["table_id"].to_string(schema.table_id_);
            record["owner_table"].to_string(schema.owner_table_);
            record["partition_tag"].to_string(schema.partition_tag_);
            record["version"].to_string(schema.version_);

            table_schema_array.push_back(schema);
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
538
/// Soft-deletes a table by setting its state to TO_DELETE.
///
/// In CLUSTER_WRITABLE mode the table's files are soft-deleted as well, and a failure
/// of that step is reported to the caller.
///
/// @param table_id  Id of the table to drop.
/// @return Status::OK() on success, otherwise a failure status.
Status
MySQLMetaImpl::DropTable(const std::string& table_id) {
    try {
        server::MetricCollector metric;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            // soft delete table
            mysqlpp::Query deleteTableQuery = connectionPtr->query();
            deleteTableQuery << "UPDATE " << META_TABLES << " SET state = " << std::to_string(TableSchema::TO_DELETE)
                             << " WHERE table_id = " << mysqlpp::quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTable: " << deleteTableQuery.str();

            if (!deleteTableQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
            }
        }  // Scoped Connection

        if (mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
            // do not report success when the files could not be marked for deletion
            auto status = DeleteTableFiles(table_id);
            if (!status.ok()) {
                return status;
            }
        }

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
574
/// Soft-deletes all files of a table: every file row of `table_id` that is not already
/// TO_DELETE gets file_type TO_DELETE and a refreshed updated_time.
///
/// @param table_id  Id of the table whose files are to be marked.
/// @return Status::OK() on success, otherwise a failure status.
Status
MySQLMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        server::MetricCollector metric;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            const std::string to_delete = std::to_string(TableFileSchema::TO_DELETE);

            // soft delete table files
            mysqlpp::Query update_query = connectionPtr->query();
            update_query << "UPDATE " << META_TABLEFILES << " SET file_type = " << to_delete
                         << " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp())
                         << " WHERE table_id = " << mysqlpp::quote << table_id << " AND file_type <> " << to_delete
                         << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << update_query.str();

            const bool executed = update_query.exec();
            if (!executed) {
                return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", update_query.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
    }

    return Status::OK();
}
Z
update  
zhiru 已提交
608

S
starlord 已提交
609
/// Registers a new file for an existing table and creates the file's path.
///
/// The owning table is looked up first; its dimension, index_file_size, engine_type,
/// nlist and metric_type are copied into `file_schema`. A new file id is generated,
/// sizes and row count are reset to 0 and both timestamps are set to "now".
///
/// @param file_schema  `table_id_` (and optionally `date_`, `file_type_`) are inputs;
///                     the remaining fields, including `id_`, are filled in.
/// @return The DescribeTable() failure when the table cannot be described, a failure
///         status when the insert fails, otherwise the result of
///         utils::CreateTableFilePath().
Status
MySQLMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    // files without an explicit date belong to the current date
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(owner);
    if (!status.ok()) {
        return status;
    }

    try {
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);
        file_schema.dimension_ = owner.dimension_;
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.metric_type_ = owner.metric_type_;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query insert_query = connectionPtr->query();

            // the leading NULL lets the auto-increment column pick the row id
            insert_query << "INSERT INTO " << META_TABLEFILES << " VALUES("
                         << "NULL"
                         << ", " << mysqlpp::quote << file_schema.table_id_ << ", "
                         << std::to_string(file_schema.engine_type_) << ", " << mysqlpp::quote
                         << file_schema.file_id_ << ", " << std::to_string(file_schema.file_type_) << ", "
                         << std::to_string(file_schema.file_size_) << ", "
                         << std::to_string(file_schema.row_count_) << ", "
                         << std::to_string(file_schema.updated_time_) << ", "
                         << std::to_string(file_schema.created_on_) << ", " << std::to_string(file_schema.date_)
                         << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << insert_query.str();

            mysqlpp::SimpleResult inserted = insert_query.execute();
            if (!inserted) {
                return HandleException("QUERY ERROR WHEN CREATING TABLE FILE", insert_query.error());
            }
            file_schema.id_ = inserted.insert_id();  // Might need to use SELECT LAST_INSERT_ID()?
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
    }
}
Z
update  
zhiru 已提交
677

G
groot 已提交
678
// TODO(myh): Delete single vector by id
S
starlord 已提交
679
// Marks every file of `table_id` whose date is listed in `dates` as TO_DELETE
// and refreshes its updated_time.
//
// Returns OK immediately when `dates` is empty, the DescribeTable() status when
// the table lookup fails, and an error status when the connection or the UPDATE
// statement fails.
Status
MySQLMetaImpl::DropDataByDate(const std::string& table_id, const DatesT& dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    // the table must be describable before any of its files are touched
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        // comma separated list of the dates, used inside the SQL "in (...)" clause
        std::string date_list;
        for (auto& date : dates) {
            if (!date_list.empty()) {
                date_list += ", ";
            }
            date_list += std::to_string(date);
        }

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query query = connectionPtr->query();

            query << "UPDATE " << META_TABLEFILES;
            query << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE);
            query << " ,updated_time = " << utils::GetMicroSecTimeStamp();
            query << " WHERE table_id = " << mysqlpp::quote << table_id;
            query << " AND date in (" << date_list << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropDataByDate: " << query.str();

            const bool executed = query.exec();
            if (!executed) {
                return HandleException("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", query.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully drop data by date, table id = " << table_schema.table_id_;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
    }

    return Status::OK();
}
Z
zhiru 已提交
729

S
starlord 已提交
730
// Loads the meta records of the files of `table_id` whose ids appear in `ids`
// (files already marked TO_DELETE are excluded) and appends them to `table_files`.
//
// Table-level attributes (dimension, index_file_size, nlist, metric_type) are
// copied from the table's own schema, so the table must be describable:
// when DescribeTable() fails its status is returned and nothing is appended.
//
// Returns OK immediately when `ids` is empty.
Status
MySQLMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                             TableFilesSchema& table_files) {
    if (ids.empty()) {
        return Status::OK();
    }

    try {
        // built inside the try block so that an allocation failure is reported
        // through HandleException like every other error of this function
        std::stringstream idSS;
        for (auto& id : ids) {
            idSS << "id = " << std::to_string(id) << " OR ";
        }
        std::string idStr = idSS.str();
        idStr = idStr.substr(0, idStr.size() - 4);  // remove the last " OR "

        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query getTableFileQuery = connectionPtr->query();
            getTableFileQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
                              << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                              << " AND (" << idStr << ")"
                              << " AND file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();

            res = getTableFileQuery.store();
        }  // Scoped Connection

        // DescribeTable is called after the scoped connection above has been released.
        // Its status must be checked: on failure table_schema holds no valid
        // dimension/nlist/metric_type and must not be copied into the file schemas.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        Status ret;
        for (auto& resRow : res) {
            TableFileSchema file_schema;
            file_schema.id_ = resRow["id"];
            file_schema.table_id_ = table_id;
            file_schema.index_file_size_ = table_schema.index_file_size_;
            file_schema.engine_type_ = resRow["engine_type"];
            file_schema.nlist_ = table_schema.nlist_;
            file_schema.metric_type_ = table_schema.metric_type_;
            resRow["file_id"].to_string(file_schema.file_id_);
            file_schema.file_type_ = resRow["file_type"];
            file_schema.file_size_ = resRow["file_size"];
            file_schema.row_count_ = resRow["row_count"];
            file_schema.date_ = resRow["date"];
            file_schema.created_on_ = resRow["created_on"];
            file_schema.dimension_ = table_schema.dimension_;

            utils::GetTableFilePath(options_, file_schema);
            table_files.emplace_back(file_schema);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return ret;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
    }
}
Z
zhiru 已提交
795

S
starlord 已提交
796
// Stores the index parameters (engine_type, nlist, metric_type) of `index`
// on the meta record of `table_id`.
//
// Returns DB_NOT_FOUND when no single record of the table exists that is not in
// TO_DELETE state, and an error status when the connection or a statement fails.
Status
MySQLMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            // existence check: the table must be present and not marked for deletion
            mysqlpp::Query updateTableIndexParamQuery = connectionPtr->query();
            updateTableIndexParamQuery << "SELECT id"
                                       << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                                       << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();

            mysqlpp::StoreQueryResult res = updateTableIndexParamQuery.store();

            if (res.num_rows() != 1) {
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
            }

            // Only the index columns are written. Writing back id/state/dimension/created_on
            // with the values read above changes nothing in the normal case and would undo
            // a state change (e.g. TO_DELETE) made by another client between the two statements.
            // NOTE(review): the query object is reused after store(), exactly as before this
            // change; this relies on MySQL++ clearing the query buffer after execution.
            updateTableIndexParamQuery << "UPDATE " << META_TABLES << " SET engine_type = " << index.engine_type_
                                       << " ,nlist = " << index.nlist_ << " ,metric_type = " << index.metric_type_
                                       << " WHERE table_id = " << mysqlpp::quote << table_id
                                       << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();

            if (!updateTableIndexParamQuery.exec()) {
                return HandleException("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM",
                                       updateTableIndexParamQuery.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
    }

    return Status::OK();
}
849

S
starlord 已提交
850
// Writes `flag` into the flag column of the meta record of `table_id`.
//
// Returns an error status when the connection or the UPDATE statement fails,
// OK otherwise.
Status
MySQLMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query query = connectionPtr->query();
            query << "UPDATE " << META_TABLES;
            query << " SET flag = " << flag;
            query << " WHERE table_id = " << mysqlpp::quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << query.str();

            const bool executed = query.exec();
            if (!executed) {
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLAG", query.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
    }

    return Status::OK();
}
880

G
groot 已提交
881
// ZR: this function assumes all fields in file_schema have value
S
starlord 已提交
882
Status
G
groot 已提交
883 884
MySQLMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
885

886
    try {
Y
Yu Kun 已提交
887
        server::MetricCollector metric;
G
groot 已提交
888 889
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
890

891
            if (connectionPtr == nullptr) {
G
groot 已提交
892
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
893
            }
Z
update  
zhiru 已提交
894

G
groot 已提交
895
            mysqlpp::Query updateTableFileQuery = connectionPtr->query();
896

G
groot 已提交
897 898 899 900
            // if the table has been deleted, just mark the table file as TO_DELETE
            // clean thread will delete the file later
            updateTableFileQuery << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                                 << file_schema.table_id_ << ";";
901

G
groot 已提交
902
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
903

G
groot 已提交
904
            mysqlpp::StoreQueryResult res = updateTableFileQuery.store();
905

G
groot 已提交
906 907 908 909 910
            if (res.num_rows() == 1) {
                int state = res[0]["state"];
                if (state == TableSchema::TO_DELETE) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
                }
911
            } else {
G
groot 已提交
912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937
                file_schema.file_type_ = TableFileSchema::TO_DELETE;
            }

            std::string id = std::to_string(file_schema.id_);
            std::string table_id = file_schema.table_id_;
            std::string engine_type = std::to_string(file_schema.engine_type_);
            std::string file_id = file_schema.file_id_;
            std::string file_type = std::to_string(file_schema.file_type_);
            std::string file_size = std::to_string(file_schema.file_size_);
            std::string row_count = std::to_string(file_schema.row_count_);
            std::string updated_time = std::to_string(file_schema.updated_time_);
            std::string created_on = std::to_string(file_schema.created_on_);
            std::string date = std::to_string(file_schema.date_);

            updateTableFileQuery << "UPDATE " << META_TABLEFILES << " SET table_id = " << mysqlpp::quote << table_id
                                 << " ,engine_type = " << engine_type << " ,file_id = " << mysqlpp::quote << file_id
                                 << " ,file_type = " << file_type << " ,file_size = " << file_size
                                 << " ,row_count = " << row_count << " ,updated_time = " << updated_time
                                 << " ,created_on = " << created_on << " ,date = " << date << " WHERE id = " << id
                                 << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();

            if (!updateTableFileQuery.exec()) {
                ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error());
938
            }
S
starlord 已提交
939
        }  // Scoped Connection
940

G
groot 已提交
941
        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
S
starlord 已提交
942
    } catch (std::exception& e) {
G
groot 已提交
943
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
944
    }
G
groot 已提交
945 946

    return Status::OK();
947
}
948

S
starlord 已提交
949
// Switches every RAW file of `table_id` whose row_count is at least
// meta::BUILD_INDEX_THRESHOLD to the TO_INDEX file type.
//
// Returns an error status when the connection or the UPDATE statement fails,
// OK otherwise.
Status
MySQLMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

        if (connectionPtr == nullptr) {
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
        }

        mysqlpp::Query query = connectionPtr->query();

        query << "UPDATE " << META_TABLEFILES;
        query << " SET file_type = " << std::to_string(TableFileSchema::TO_INDEX);
        query << " WHERE table_id = " << mysqlpp::quote << table_id;
        query << " AND row_count >= " << std::to_string(meta::BUILD_INDEX_THRESHOLD);
        query << " AND file_type = " << std::to_string(TableFileSchema::RAW) << ";";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << query.str();

        const bool executed = query.exec();
        if (!executed) {
            return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX", query.error());
        }

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
    }

    return Status::OK();
}
980

G
groot 已提交
981 982
// Writes every file schema in `files` back to its record (matched by id) in the
// table-files meta table, over a single connection.
//
// Side effects on each element of `files`: updated_time_ is refreshed, and
// file_type_ is forced to TO_DELETE when the owning table has no record that is
// not in TO_DELETE state.
//
// Returns an error status as soon as the connection, an existence check or an
// UPDATE statement fails; files processed before the failure stay updated.
Status
MySQLMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query updateTableFilesQuery = connectionPtr->query();

            // table id -> "a live (not TO_DELETE) record of the table exists";
            // each distinct table is checked only once
            std::map<std::string, bool> has_tables;
            for (auto& file_schema : files) {
                if (has_tables.find(file_schema.table_id_) != has_tables.end()) {
                    continue;
                }

                updateTableFilesQuery << "SELECT EXISTS"
                                      << " (SELECT 1 FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
                                      << file_schema.table_id_ << " AND state <> "
                                      << std::to_string(TableSchema::TO_DELETE) << ")"
                                      << " AS " << mysqlpp::quote << "check"
                                      << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();

                mysqlpp::StoreQueryResult res = updateTableFilesQuery.store();

                // SELECT EXISTS yields exactly one row when it succeeds; an empty result
                // means the statement failed, and indexing res[0] would be out of range
                if (res.num_rows() == 0) {
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error());
                }

                int check = res[0]["check"];
                has_tables[file_schema.table_id_] = (check == 1);
            }

            for (auto& file_schema : files) {
                if (!has_tables[file_schema.table_id_]) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
                }
                file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

                std::string id = std::to_string(file_schema.id_);
                std::string& table_id = file_schema.table_id_;
                std::string engine_type = std::to_string(file_schema.engine_type_);
                std::string& file_id = file_schema.file_id_;
                std::string file_type = std::to_string(file_schema.file_type_);
                std::string file_size = std::to_string(file_schema.file_size_);
                std::string row_count = std::to_string(file_schema.row_count_);
                std::string updated_time = std::to_string(file_schema.updated_time_);
                std::string created_on = std::to_string(file_schema.created_on_);
                std::string date = std::to_string(file_schema.date_);

                updateTableFilesQuery << "UPDATE " << META_TABLEFILES << " SET table_id = " << mysqlpp::quote
                                      << table_id << " ,engine_type = " << engine_type
                                      << " ,file_id = " << mysqlpp::quote << file_id << " ,file_type = " << file_type
                                      << " ,file_size = " << file_size << " ,row_count = " << row_count
                                      << " ,updated_time = " << updated_time << " ,created_on = " << created_on
                                      << " ,date = " << date << " WHERE id = " << id << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();

                if (!updateTableFilesQuery.exec()) {
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error());
                }
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1055
// Reads the index parameters (engine_type, nlist, metric_type) of `table_id`
// from its meta record into `index`.
//
// Returns DB_NOT_FOUND when no single record of the table exists that is not in
// TO_DELETE state, and an error status when the connection or the query fails.
Status
MySQLMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            // only the columns that are copied into `index` below are selected
            mysqlpp::Query describeTableIndexQuery = connectionPtr->query();
            describeTableIndexQuery << "SELECT engine_type, nlist, metric_type"
                                    << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                                    << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTableIndex: " << describeTableIndexQuery.str();

            mysqlpp::StoreQueryResult res = describeTableIndexQuery.store();

            if (res.num_rows() == 1) {
                const mysqlpp::Row& resRow = res[0];

                index.engine_type_ = resRow["engine_type"];
                index.nlist_ = resRow["nlist"];
                index.metric_type_ = resRow["metric_type"];
            } else {
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
            }
        }  // Scoped Connection
    } catch (std::exception& e) {
        // message names this operation (it previously carried the text of UpdateTableFlag)
        return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE INDEX", e.what());
    }

    return Status::OK();
}
X
xj.lin 已提交
1092

G
groot 已提交
1093 1094 1095 1096
// Removes the index of `table_id` with three consecutive UPDATE statements:
//   1. INDEX files of the table are marked TO_DELETE,
//   2. BACKUP files of the table are turned back into RAW files,
//   3. the table's engine_type/nlist/metric_type are reset to their defaults.
//
// The statements are executed one after another; the first failure is returned
// and the remaining statements are not executed.
Status
MySQLMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query query = connectionPtr->query();

            // logs the statement currently held by the query, then executes it
            auto log_and_exec = [&query]() -> bool {
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << query.str();
                return query.exec();
            };

            // soft delete index files
            query << "UPDATE " << META_TABLEFILES;
            query << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE);
            query << " ,updated_time = " << utils::GetMicroSecTimeStamp();
            query << " WHERE table_id = " << mysqlpp::quote << table_id;
            query << " AND file_type = " << std::to_string(TableFileSchema::INDEX) << ";";

            if (!log_and_exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", query.error());
            }

            // set all backup file to raw
            query << "UPDATE " << META_TABLEFILES;
            query << " SET file_type = " << std::to_string(TableFileSchema::RAW);
            query << " ,updated_time = " << utils::GetMicroSecTimeStamp();
            query << " WHERE table_id = " << mysqlpp::quote << table_id;
            query << " AND file_type = " << std::to_string(TableFileSchema::BACKUP) << ";";

            if (!log_and_exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", query.error());
            }

            // set table index type to raw
            query << "UPDATE " << META_TABLES;
            query << " SET engine_type = " << std::to_string(DEFAULT_ENGINE_TYPE);
            query << " ,nlist = " << std::to_string(DEFAULT_NLIST);
            query << " ,metric_type = " << std::to_string(DEFAULT_METRIC_TYPE);
            query << " WHERE table_id = " << mysqlpp::quote << table_id << ";";

            if (!log_and_exec()) {
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", query.error());
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
    }

    return Status::OK();
}

// Creates a partition of table `table_id`. The partition is stored through
// CreateTable() as a table record whose owner_table_ is `table_id` and whose
// partition_tag_ is `tag` with the blanks on both sides trimmed.
//
// `partition_name` becomes the record's table id; when it is empty an id is
// generated with NextTableId().
//
// Fails with the DescribeTable() status when the owner table cannot be described,
// with DB_ERROR when `table_id` is itself a partition or a partition with the same
// tag already exists, and with DB_ALREADY_EXIST when CreateTable() reports so.
Status
MySQLMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag) {
    server::MetricCollector metric;

    // starts as the description of the owner table, then is turned into the partition record
    TableSchema schema;
    schema.table_id_ = table_id;
    auto status = DescribeTable(schema);
    if (!status.ok()) {
        return status;
    }

    // not allow create partition under partition
    const bool owner_is_partition = !schema.owner_table_.empty();
    if (owner_is_partition) {
        return Status(DB_ERROR, "Nested partition is not allowed");
    }

    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string trimmed_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(trimmed_tag);

    // not allow duplicated partition
    std::string existing_name;
    GetPartitionName(table_id, trimmed_tag, existing_name);
    if (!existing_name.empty()) {
        return Status(DB_ERROR, "Duplicate partition is not allowed");
    }

    if (partition_name.empty()) {
        // generate unique partition name
        NextTableId(schema.table_id_);
    } else {
        schema.table_id_ = partition_name;
    }

    schema.owner_table_ = table_id;
    schema.partition_tag_ = trimmed_tag;
    schema.id_ = -1;
    schema.flag_ = 0;
    schema.created_on_ = utils::GetMicroSecTimeStamp();

    status = CreateTable(schema);
    if (status.code() != DB_ALREADY_EXIST) {
        return status;
    }

    return Status(DB_ALREADY_EXIST, "Partition already exists");
}

// Drops the partition named `partition_name` by delegating to DropTable();
// the status of DropTable() is returned unchanged.
Status
MySQLMetaImpl::DropPartition(const std::string& partition_name) {
    auto drop_status = DropTable(partition_name);
    return drop_status;
}

// Appends to `partiton_schema_array` the schema of every partition of `table_id`,
// i.e. every table record whose owner_table is `table_id` and whose state is not
// TO_DELETE. Each schema is filled in with DescribeTable().
//
// Returns an error status when the connection fails or an exception is raised,
// OK otherwise.
Status
MySQLMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) {
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query allPartitionsQuery = connectionPtr->query();
            allPartitionsQuery << "SELECT table_id FROM " << META_TABLES << " WHERE owner_table = " << mysqlpp::quote
                               << table_id << " AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            // log tag names this function (it previously read "AllTables")
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::ShowPartitions: " << allPartitionsQuery.str();

            res = allPartitionsQuery.store();
        }  // Scoped Connection

        // DescribeTable is called only after the scoped connection above has been released
        for (auto& resRow : res) {
            meta::TableSchema partition_schema;
            resRow["table_id"].to_string(partition_schema.table_id_);
            DescribeTable(partition_schema);
            partiton_schema_array.emplace_back(partition_schema);
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN SHOW PARTITIONS", e.what());
    }

    return Status::OK();
}

// Looks up the partition of `table_id` that carries the tag `tag` and stores its
// table id in `partition_name`. Blanks on both sides of `tag` are trimmed before
// the comparison.
//
// Returns DB_NOT_FOUND (leaving `partition_name` untouched) when no partition that
// is not in TO_DELETE state matches, and an error status when the connection fails
// or an exception is raised.
Status
MySQLMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;

        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query allPartitionsQuery = connectionPtr->query();
            allPartitionsQuery << "SELECT table_id FROM " << META_TABLES << " WHERE owner_table = " << mysqlpp::quote
                               << table_id << " AND partition_tag = " << mysqlpp::quote << valid_tag << " AND state <> "
                               << std::to_string(TableSchema::TO_DELETE) << ";";

            // log tag names this function (it previously read "AllTables")
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetPartitionName: " << allPartitionsQuery.str();

            res = allPartitionsQuery.store();
        }  // Scoped Connection

        if (res.num_rows() > 0) {
            // when several rows match, the first one is used
            const mysqlpp::Row& resRow = res[0];
            resRow["table_id"].to_string(partition_name);
        } else {
            return Status(DB_NOT_FOUND, "Partition " + valid_tag + " of table " + table_id + " not found");
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN GET PARTITION NAME", e.what());
    }

    return Status::OK();
}

// Collect the files of a table that are eligible for searching, grouped by date.
//
// Only files whose type is RAW, TO_INDEX or INDEX are selected. `dates` and `ids`
// act as optional filters: an empty container adds no restriction to the query.
//
// @param table_id  table whose files are queried
// @param ids       when non-empty, only files with one of these ids are returned
// @param dates     when non-empty, only files with one of these dates are returned
// @param files     output; cleared first, then filled as date -> files of that date
// @return a connection error, the DescribeTable() error, the status produced by
//         HandleException() on an exception, the last failed file-path lookup,
//         or a default-constructed Status when nothing went wrong
Status
MySQLMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& dates,
                             DatePartionedTableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult rows;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            // "d1, d2, ..." for the optional date filter
            std::string date_filter;
            for (auto& date : dates) {
                if (!date_filter.empty()) {
                    date_filter += ", ";
                }
                date_filter += std::to_string(date);
            }

            // "id = a OR id = b ..." for the optional id filter
            std::string id_filter;
            for (auto& id : ids) {
                if (!id_filter.empty()) {
                    id_filter += " OR ";
                }
                id_filter += "id = " + std::to_string(id);
            }

            mysqlpp::Query searchQuery = connectionPtr->query();
            searchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date"
                        << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id;

            if (!date_filter.empty()) {
                searchQuery << " AND date IN (" << date_filter << ")";
            }

            if (!id_filter.empty()) {
                searchQuery << " AND (" << id_filter << ")";
            }

            searchQuery << " AND"
                        << " (file_type = " << std::to_string(TableFileSchema::RAW)
                        << " OR file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                        << " OR file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << searchQuery.str();

            rows = searchQuery.store();
        }  // Scoped Connection

        // table-level attributes are copied into every returned file record
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(table_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        Status ret;
        TableFileSchema table_file;  // one record, refilled for every row
        for (auto& row : rows) {
            // columns read from the files table
            table_file.id_ = row["id"];  // implicit conversion
            row["table_id"].to_string(table_file.table_id_);
            row["file_id"].to_string(table_file.file_id_);
            table_file.engine_type_ = row["engine_type"];
            table_file.file_type_ = row["file_type"];
            table_file.file_size_ = row["file_size"];
            table_file.row_count_ = row["row_count"];
            table_file.date_ = row["date"];

            // attributes inherited from the owning table
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            // operator[] creates the date bucket on first use
            files[table_file.date_].push_back(table_file);
        }

        if (!rows.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << rows.size() << " to-search files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
    }
}
Z
update  
zhiru 已提交
1379

S
starlord 已提交
1380
// Collect the RAW files of a table that are candidates for merging, grouped by date.
//
// Rows are fetched ordered by row_count descending. Files whose size already reaches
// the table's index_file_size_ are skipped.
//
// @param table_id  table whose files are queried; it must be known to DescribeTable()
// @param files     output; cleared first, then filled as date -> files of that date
// @return the DescribeTable() error, a connection error, the status produced by
//         HandleException() on an exception, the last failed file-path lookup,
//         or a default-constructed Status when nothing went wrong
Status
MySQLMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        // check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query filesToMergeQuery = connectionPtr->query();
            filesToMergeQuery
                << "SELECT id, table_id, file_id, file_type, file_size, row_count, date, engine_type, created_on"
                << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                << " AND file_type = " << std::to_string(TableFileSchema::RAW) << " ORDER BY row_count DESC;";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();

            res = filesToMergeQuery.store();
        }  // Scoped Connection

        Status ret;
        int64_t to_merge_files = 0;
        for (auto& resRow : res) {
            TableFileSchema table_file;
            table_file.file_size_ = resRow["file_size"];
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;  // skip large file
            }

            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.file_type_ = resRow["file_type"];
            table_file.row_count_ = resRow["row_count"];
            table_file.date_ = resRow["date"];
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.engine_type_ = resRow["engine_type"];
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;
            table_file.created_on_ = resRow["created_on"];
            table_file.dimension_ = table_schema.dimension_;

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            // operator[] creates the date bucket on first use
            files[table_file.date_].push_back(table_file);

            // count every collected file, not every distinct date, so the log below is accurate
            to_merge_files++;
        }

        if (to_merge_files > 0) {
            ENGINE_LOG_TRACE << "Collect " << to_merge_files << " to-merge files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
    }
}

S
starlord 已提交
1459
// Collect every file, across all tables, whose type is TO_INDEX.
//
// Each returned record is completed with the dimension, index file size, nlist and
// metric type of its owning table. Table schemas are fetched through DescribeTable()
// once per distinct table id and cached for the remaining rows.
//
// @param files  output; cleared first, then appended to in result-set order
// @return a connection error, the first DescribeTable() error (files collected before
//         it remain in `files`), the status produced by HandleException() on an
//         exception, the last failed file-path lookup, or a default-constructed Status
Status
MySQLMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult rows;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query toIndexQuery = connectionPtr->query();
            toIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
                         << " FROM " << META_TABLEFILES
                         << " WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << toIndexQuery.str();

            rows = toIndexQuery.store();
        }  // Scoped Connection

        Status ret;
        std::map<std::string, TableSchema> schema_cache;  // table id -> schema already described
        TableFileSchema table_file;                       // one record, refilled for every row
        for (auto& row : rows) {
            table_file.id_ = row["id"];  // implicit conversion
            row["table_id"].to_string(table_file.table_id_);
            row["file_id"].to_string(table_file.file_id_);
            table_file.engine_type_ = row["engine_type"];
            table_file.file_type_ = row["file_type"];
            table_file.file_size_ = row["file_size"];
            table_file.row_count_ = row["row_count"];
            table_file.date_ = row["date"];
            table_file.created_on_ = row["created_on"];

            // describe the owning table only the first time it is seen
            auto cached = schema_cache.find(table_file.table_id_);
            if (cached == schema_cache.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                cached = schema_cache.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema& owner = cached->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.nlist_ = owner.nlist_;
            table_file.metric_type_ = owner.metric_type_;

            // a failed path lookup is remembered but does not drop the file from the result
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            files.push_back(table_file);
        }

        if (!rows.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << rows.size() << " to-index files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
    }
}
1529

S
starlord 已提交
1530
// Fetch the files of a table whose type is one of `file_types`.
//
// Only the columns stored in the files table are filled in on each returned record;
// the table id is taken from the `table_id` argument. A per-type summary of the
// result is written to the debug log.
//
// @param table_id     table whose files are queried
// @param file_types   file types to select; must not be empty
// @param table_files  output; cleared before the query runs
// @return DB_ERROR when `file_types` is empty or no connection is available, the
//         status produced by HandleException() on an exception, Status::OK() otherwise
Status
MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
                           TableFilesSchema& table_files) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    try {
        table_files.clear();

        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            // comma separated list for the IN (...) clause
            std::string types;
            for (auto type : file_types) {
                if (!types.empty()) {
                    types += ",";
                }
                types += std::to_string(type);
            }

            mysqlpp::Query filesByTypeQuery = connectionPtr->query();
            filesByTypeQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
                             << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                             << " AND file_type in (" << types << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << filesByTypeQuery.str();

            res = filesByTypeQuery.store();
        }  // Scoped Connection

        if (res.num_rows() > 0) {
            // log label for each file type that is reported; nullptr for anything else
            auto type_label = [](int file_type) -> const char* {
                switch (file_type) {
                    case static_cast<int>(TableFileSchema::RAW):
                        return "raw files:";
                    case static_cast<int>(TableFileSchema::NEW):
                        return "new files:";
                    case static_cast<int>(TableFileSchema::NEW_MERGE):
                        return "new_merge files:";
                    case static_cast<int>(TableFileSchema::NEW_INDEX):
                        return "new_index files:";
                    case static_cast<int>(TableFileSchema::TO_INDEX):
                        return "to_index files:";
                    case static_cast<int>(TableFileSchema::INDEX):
                        return "index files:";
                    case static_cast<int>(TableFileSchema::BACKUP):
                        return "backup files:";
                    default:
                        return nullptr;
                }
            };

            std::map<int, int> type_counts;  // file type -> number of rows of that type
            for (auto& resRow : res) {
                TableFileSchema file_schema;
                file_schema.id_ = resRow["id"];
                file_schema.table_id_ = table_id;
                file_schema.engine_type_ = resRow["engine_type"];
                resRow["file_id"].to_string(file_schema.file_id_);
                file_schema.file_type_ = resRow["file_type"];
                file_schema.file_size_ = resRow["file_size"];
                file_schema.row_count_ = resRow["row_count"];
                file_schema.date_ = resRow["date"];
                file_schema.created_on_ = resRow["created_on"];

                table_files.emplace_back(file_schema);

                int32_t file_type = resRow["file_type"];
                ++type_counts[file_type];
            }

            // report each requested type with its own count, separated by spaces
            std::string msg = "Get table files by type.";
            for (int file_type : file_types) {
                const char* label = type_label(file_type);
                if (label == nullptr) {
                    continue;
                }
                msg += " ";
                msg += label;
                msg += std::to_string(type_counts[file_type]);
            }
            ENGINE_LOG_DEBUG << msg;
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
    }

    return Status::OK();
}
1649

G
groot 已提交
1650
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1651
Status
G
groot 已提交
1652 1653 1654 1655 1656
MySQLMetaImpl::Archive() {
    auto& criterias = options_.archive_conf_.GetCriterias();
    if (criterias.empty()) {
        return Status::OK();
    }
1657

G
groot 已提交
1658 1659 1660 1661 1662 1663
    for (auto& kv : criterias) {
        auto& criteria = kv.first;
        auto& limit = kv.second;
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            size_t usecs = limit * D_SEC * US_PS;
            int64_t now = utils::GetMicroSecTimeStamp();
Z
update  
zhiru 已提交
1664

G
groot 已提交
1665 1666
            try {
                mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
1667

G
groot 已提交
1668 1669 1670
                if (connectionPtr == nullptr) {
                    return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
                }
1671

G
groot 已提交
1672 1673 1674 1675 1676
                mysqlpp::Query archiveQuery = connectionPtr->query();
                archiveQuery << "UPDATE " << META_TABLEFILES
                             << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                             << " WHERE created_on < " << std::to_string(now - usecs) << " AND file_type <> "
                             << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1677

G
groot 已提交
1678 1679 1680 1681 1682 1683 1684 1685 1686 1687
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();

                if (!archiveQuery.exec()) {
                    return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
                }

                ENGINE_LOG_DEBUG << "Archive old files";
            } catch (std::exception& e) {
                return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
            }
Z
fix  
zhiru 已提交
1688
        }
G
groot 已提交
1689 1690 1691
        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            Size(sum);
Z
fix  
zhiru 已提交
1692

G
groot 已提交
1693 1694 1695 1696 1697
            auto to_delete = (sum - limit * G);
            DiscardFiles(to_delete);

            ENGINE_LOG_DEBUG << "Archive files to free disk";
        }
1698
    }
Z
update  
zhiru 已提交
1699

1700 1701
    return Status::OK();
}
Z
zhiru 已提交
1702

S
starlord 已提交
1703
// Compute the summed file_size of every file that is not marked TO_DELETE.
//
// @param result  output; set to 0 first, then to the sum returned by the query
//                (left at 0 when the query yields no row)
// @return DB_ERROR when no connection is available, the status produced by
//         HandleException() on an exception, Status::OK() otherwise
Status
MySQLMetaImpl::Size(uint64_t& result) {
    result = 0;

    try {
        mysqlpp::StoreQueryResult rows;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query sizeQuery = connectionPtr->query();
            sizeQuery << "SELECT IFNULL(SUM(file_size),0) AS sum"
                      << " FROM " << META_TABLEFILES
                      << " WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << sizeQuery.str();

            rows = sizeQuery.store();
        }  // Scoped Connection

        // result already holds 0, so only a returned row needs handling
        if (!rows.empty()) {
            result = rows[0]["sum"];
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
    }

    return Status::OK();
}
1737

G
groot 已提交
1738
// Delete the rows of the files table whose type is NEW, NEW_MERGE or NEW_INDEX.
//
// The files table is first looked up in information_schema; the DELETE is issued
// only when that lookup returns a row.
//
// @return DB_ERROR when no connection is available, the status produced by
//         HandleException() on a failed DELETE or an exception, Status::OK() otherwise
Status
MySQLMetaImpl::CleanUpShadowFiles() {
    try {
        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

        if (connectionPtr == nullptr) {
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
        }

        mysqlpp::Query tableExistsQuery = connectionPtr->query();
        tableExistsQuery << "SELECT table_name"
                         << " FROM information_schema.tables"
                         << " WHERE table_schema = " << mysqlpp::quote << mysql_connection_pool_->getDB()
                         << " AND table_name = " << mysqlpp::quote << META_TABLEFILES << ";";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << tableExistsQuery.str();

        mysqlpp::StoreQueryResult res = tableExistsQuery.store();

        if (!res.empty()) {
            ENGINE_LOG_DEBUG << "Remove table file type as NEW";

            // a separate query object keeps the DELETE text independent of the lookup above
            mysqlpp::Query cleanUpQuery = connectionPtr->query();
            cleanUpQuery << "DELETE FROM " << META_TABLEFILES << " WHERE file_type IN ("
                         << std::to_string(TableFileSchema::NEW) << "," << std::to_string(TableFileSchema::NEW_MERGE)
                         << "," << std::to_string(TableFileSchema::NEW_INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();

            if (!cleanUpQuery.exec()) {
                return HandleException("QUERY ERROR WHEN CLEANING UP FILES", cleanUpQuery.error());
            }

            // report the rows removed by the DELETE, not the size of the existence check
            auto removed_files = cleanUpQuery.affected_rows();
            if (removed_files > 0) {
                ENGINE_LOG_DEBUG << "Clean " << removed_files << " files";
            }
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
    }

    return Status::OK();
}
Z
fix  
zhiru 已提交
1779

S
starlord 已提交
1780
// Erase from the cache the files that are marked TO_DELETE or BACKUP and whose
// updated_time is older than `seconds` seconds.
//
// @param seconds  minimum age, in seconds, of the last update for a file to be evicted
// @return DB_ERROR when no connection is available, the status produced by
//         HandleException() on an exception, Status::OK() otherwise
Status
MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();

    // erase deleted/backup files from cache
    try {
        server::MetricCollector metric;

        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

        if (connectionPtr == nullptr) {
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
        }

        mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
        cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date"
                                 << " FROM " << META_TABLEFILES << " WHERE file_type IN ("
                                 << std::to_string(TableFileSchema::TO_DELETE) << ","
                                 << std::to_string(TableFileSchema::BACKUP) << ")"
                                 << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";

        mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();

        for (auto& resRow : res) {
            // a fresh record per row, so a location resolved for one file can never be
            // evicted again on behalf of another
            TableFileSchema table_file;
            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.date_ = resRow["date"];

            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                continue;  // location could not be resolved, nothing reliable to evict
            }

            server::CommonUtil::EraseFromCache(table_file.location_);
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
    }

    return Status::OK();
}

// Physically clean up what has been marked for deletion. Runs three steps in order:
//   1. files marked TO_DELETE whose updated_time is older than `seconds` seconds are
//      passed to utils::DeleteTableFilePath() and their rows are deleted;
//   2. tables whose state is TO_DELETE are passed to utils::DeleteTablePath() (with
//      the flag the code documents as "only delete empty folder") and their rows are
//      deleted;
//   3. for every table touched in step 1 that has no file row left,
//      utils::DeleteTablePath() is called on the table.
//
// @param seconds  minimum age, in seconds, of the last update for a file to be removed
// @return DB_ERROR when no connection is available, the status produced by
//         HandleException() on a failed DELETE or an exception (later steps are then
//         not run), Status::OK() otherwise
Status
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> table_ids;  // tables that lost at least one file in step 1

    // remove to_delete files
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query selectFilesQuery = connectionPtr->query();
            selectFilesQuery << "SELECT id, table_id, file_id, date"
                             << " FROM " << META_TABLEFILES
                             << " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                             << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << selectFilesQuery.str();

            mysqlpp::StoreQueryResult res = selectFilesQuery.store();

            TableFileSchema table_file;
            std::string idsToDeleteStr;  // "id = a OR id = b ..."

            for (auto& resRow : res) {
                table_file.id_ = resRow["id"];  // implicit conversion
                resRow["table_id"].to_string(table_file.table_id_);
                resRow["file_id"].to_string(table_file.file_id_);
                table_file.date_ = resRow["date"];

                utils::DeleteTableFilePath(options_, table_file);

                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;

                if (!idsToDeleteStr.empty()) {
                    idsToDeleteStr += " OR ";
                }
                idsToDeleteStr += "id = " + std::to_string(table_file.id_);
                table_ids.insert(table_file.table_id_);
            }

            if (!idsToDeleteStr.empty()) {
                mysqlpp::Query deleteFilesQuery = connectionPtr->query();
                deleteFilesQuery << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << deleteFilesQuery.str();

                if (!deleteFilesQuery.exec()) {
                    return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", deleteFilesQuery.error());
                }
            }

            if (res.size() > 0) {
                ENGINE_LOG_DEBUG << "Clean " << res.size() << " files deleted in " << seconds << " seconds";
            }
        }  // Scoped Connection
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
    }

    // remove to_delete tables
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query selectTablesQuery = connectionPtr->query();
            selectTablesQuery << "SELECT id, table_id"
                              << " FROM " << META_TABLES
                              << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << selectTablesQuery.str();

            mysqlpp::StoreQueryResult res = selectTablesQuery.store();

            int64_t remove_tables = 0;
            if (!res.empty()) {
                std::string idsToDeleteStr;  // "id = a OR id = b ..."
                for (auto& resRow : res) {
                    size_t id = resRow["id"];
                    std::string table_id;
                    resRow["table_id"].to_string(table_id);

                    utils::DeleteTablePath(options_, table_id, false);  // only delete empty folder
                    remove_tables++;

                    if (!idsToDeleteStr.empty()) {
                        idsToDeleteStr += " OR ";
                    }
                    idsToDeleteStr += "id = " + std::to_string(id);
                }

                mysqlpp::Query deleteTablesQuery = connectionPtr->query();
                deleteTablesQuery << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << deleteTablesQuery.str();

                if (!deleteTablesQuery.exec()) {
                    return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", deleteTablesQuery.error());
                }
            }

            if (remove_tables > 0) {
                ENGINE_LOG_DEBUG << "Remove " << remove_tables << " tables from meta";
            }
        }  // Scoped Connection
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
    }

    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
    try {
        server::MetricCollector metric;

        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            int64_t removed_folders = 0;
            for (auto& table_id : table_ids) {
                mysqlpp::Query remainingFilesQuery = connectionPtr->query();
                remainingFilesQuery << "SELECT file_id"
                                    << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote
                                    << table_id << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << remainingFilesQuery.str();

                mysqlpp::StoreQueryResult res = remainingFilesQuery.store();

                if (res.empty()) {
                    utils::DeleteTablePath(options_, table_id);
                    removed_folders++;
                }
            }

            // report only the folders that were actually handed to DeleteTablePath(),
            // not every candidate table
            if (removed_folders > 0) {
                ENGINE_LOG_DEBUG << "Remove " << removed_folders << " tables folder";
            }
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
    }

    return Status::OK();
}
1980

S
starlord 已提交
1981
// Sum the `row_count` column over the table files of `table_id` whose
// file_type is RAW, TO_INDEX or INDEX, and store the total in `result`.
//
// Returns:
//   - the status of DescribeTable() if it does not succeed,
//   - a DB_ERROR status if no MySQL connection could be obtained,
//   - the HandleException() status if a std::exception is thrown,
//   - Status::OK() otherwise.
// `result` is only reset/written once the query has been executed.
Status
MySQLMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    try {
        server::MetricCollector metric;

        // Give up early with DescribeTable's own status when the table cannot be described.
        TableSchema schema;
        schema.table_id_ = table_id;
        auto describe_status = DescribeTable(schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        // The result set outlives the connection: the connection is released
        // at the end of the inner scope, the rows are consumed afterwards.
        mysqlpp::StoreQueryResult rows;
        {
            mysqlpp::ScopedConnection conn(*mysql_connection_pool_, safe_grab_);
            if (conn == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query query = conn->query();
            query << "SELECT row_count"
                  << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
                  << " AND (file_type = " << std::to_string(TableFileSchema::RAW)
                  << " OR file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                  << " OR file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << query.str();

            rows = query.store();
        }  // Scoped Connection

        // Accumulate directly into the out-parameter, one file at a time.
        result = 0;
        for (auto& row : rows) {
            size_t file_rows = row["row_count"];
            result += file_rows;
        }

        return Status::OK();
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
    }
}

S
starlord 已提交
2026 2027
// Drop both meta tables (TABLES_SCHEMA and TABLEFILES_SCHEMA) with a single
// "DROP TABLE IF EXISTS" statement.
//
// Returns:
//   - a DB_ERROR status if no MySQL connection could be obtained,
//   - the HandleException() status if the statement fails or a
//     std::exception is thrown,
//   - Status::OK() when the statement executed successfully.
Status
MySQLMetaImpl::DropAll() {
    try {
        ENGINE_LOG_DEBUG << "Drop all mysql meta";

        mysqlpp::ScopedConnection conn(*mysql_connection_pool_, safe_grab_);
        if (conn == nullptr) {
            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
        }

        mysqlpp::Query query = conn->query();
        query << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << query.str();

        // Failure path first; success falls through.
        if (!query.exec()) {
            return HandleException("QUERY ERROR WHEN DROPPING ALL", query.error());
        }
        return Status::OK();
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
    }
}

G
groot 已提交
2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115
// Flag table files as TO_DELETE, lowest id first, until the summed
// `file_size` of the flagged files reaches `to_discard_size`.
//
// Files are processed in batches of at most 10 rows (the LIMIT of the SELECT);
// after each batch the function calls itself with the remaining size, so the
// work ends when either the remaining size drops to <= 0 or no file that is
// not already TO_DELETE is left.
//
// @param to_discard_size  amount still to discard, in the same unit as the
//                         `file_size` column; values <= 0 are a no-op.
// @return Status::OK() when nothing (more) has to be discarded,
//         a DB_ERROR status if no MySQL connection could be obtained,
//         or the HandleException() status if a query fails or a
//         std::exception is thrown.
Status
MySQLMetaImpl::DiscardFiles(int64_t to_discard_size) {
    if (to_discard_size <= 0) {
        return Status::OK();
    }
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

    try {
        server::MetricCollector metric;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            // Fetch the next batch of candidates: files not yet marked TO_DELETE.
            mysqlpp::Query discardFilesQuery = connectionPtr->query();
            discardFilesQuery << "SELECT id, file_size"
                              << " FROM " << META_TABLEFILES << " WHERE file_type <> "
                              << std::to_string(TableFileSchema::TO_DELETE) << " ORDER BY id ASC "
                              << " LIMIT 10;";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();

            mysqlpp::StoreQueryResult res = discardFilesQuery.store();
            if (res.num_rows() == 0) {
                return Status::OK();
            }

            // Build "id = A OR id = B ..." for every file selected in this batch.
            // A separator is emitted *before* each id except the first, so no
            // trailing " OR " has to be trimmed afterwards.
            TableFileSchema table_file;
            std::stringstream idsToDiscardSS;
            const char* separator = "";
            for (auto& resRow : res) {
                if (to_discard_size <= 0) {
                    break;
                }
                table_file.id_ = resRow["id"];
                table_file.file_size_ = resRow["file_size"];
                idsToDiscardSS << separator << "id = " << std::to_string(table_file.id_);
                separator = " OR ";
                // Log the row id that was just read; file_id_ is never populated here.
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
            }

            // to_discard_size > 0 on entry and the result set is non-empty, so at
            // least one id has been collected at this point.
            const std::string idsToDiscardStr = idsToDiscardSS.str();

            // NOTE(review): the same Query object is reused for the UPDATE; this
            // relies on mysqlpp::Query being reset after store() - confirm against
            // the MySQL++ version in use.
            discardFilesQuery << "UPDATE " << META_TABLEFILES
                              << " SET file_type = " << std::to_string(TableFileSchema::TO_DELETE)
                              << " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " WHERE "
                              << idsToDiscardStr << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();

            if (!discardFilesQuery.exec()) {
                return HandleException("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error());
            }
        }  // Scoped Connection

        // The connection has been released; continue with whatever is left.
        return DiscardFiles(to_discard_size);
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
    }
}

S
starlord 已提交
2116 2117 2118
}  // namespace meta
}  // namespace engine
}  // namespace milvus