MySQLMetaImpl.cpp 75.9 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

Z
update  
zhiru 已提交
18
#include "MySQLMetaImpl.h"
S
starlord 已提交
19 20
#include "db/IDGenerator.h"
#include "db/Utils.h"
S
starlord 已提交
21
#include "utils/Log.h"
22
#include "utils/Exception.h"
Z
update  
zhiru 已提交
23 24 25 26 27 28 29 30 31 32
#include "MetaConsts.h"
#include "metrics/Metrics.h"

#include <unistd.h>
#include <sstream>
#include <iostream>
#include <chrono>
#include <fstream>
#include <regex>
#include <string>
Z
zhiru 已提交
33
#include <mutex>
Z
zhiru 已提交
34
#include <thread>
35 36 37
#include <string.h>
#include <boost/filesystem.hpp>
#include <mysql++/mysql++.h>
Z
update  
zhiru 已提交
38

39

Z
update  
zhiru 已提交
40 41 42 43 44
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {

45
using namespace mysqlpp;
Z
update  
zhiru 已提交
46

47
namespace {
Z
update  
zhiru 已提交
48

S
starlord 已提交
49 50 51 52 53 54 55 56 57
Status HandleException(const std::string &desc, const char* what = nullptr) {
    if(what == nullptr) {
        ENGINE_LOG_ERROR << desc;
        return Status(DB_META_TRANSACTION_FAILED, desc);
    } else {
        std::string msg = desc + ":" + what;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_META_TRANSACTION_FAILED, msg);
    }
58
}
Z
update  
zhiru 已提交
59

60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
class MetaField {
public:
    MetaField(const std::string& name, const std::string& type, const std::string& setting)
        : name_(name),
          type_(type),
          setting_(setting) {
    }

    std::string name() const {
        return name_;
    }

    std::string ToString() const {
        return name_ + " " + type_ + " " + setting_;
    }

    // mysql field type has additional information. for instance, a filed type is defined as 'BIGINT'
    // we get the type from sql is 'bigint(20)', so we need to ignore the '(20)'
    bool IsEqual(const MetaField& field) const {
        size_t name_len_min = field.name_.length() > name_.length() ? name_.length() : field.name_.length();
        size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length();
        return strncasecmp(field.name_.c_str(), name_.c_str(), name_len_min) == 0 &&
                strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0;
    }

private:
    std::string name_;
    std::string type_;
    std::string setting_;
};

using MetaFields = std::vector<MetaField>;
class MetaSchema {
public:
    MetaSchema(const std::string& name, const MetaFields& fields)
        : name_(name),
          fields_(fields) {
    }

    std::string name() const {
        return name_;
    }

    std::string ToString() const {
        std::string result;
        for(auto& field : fields_) {
            if(!result.empty()) {
                result += ",";
            }
            result += field.ToString();
        }
        return result;
    }

    //if the outer fields contains all this MetaSchema fields, return true
    //otherwise return false
    bool IsEqual(const MetaFields& fields) const {
        std::vector<std::string> found_field;
        for(const auto& this_field : fields_) {
            for(const auto& outer_field : fields) {
                if(this_field.IsEqual(outer_field)) {
                    found_field.push_back(this_field.name());
                    break;
                }
            }
        }

        return found_field.size() == fields_.size();
    }

private:
    std::string name_;
    MetaFields fields_;
};

//Tables schema
static const MetaSchema TABLES_SCHEMA(META_TABLES, {
    MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
    MetaField("table_id", "VARCHAR(255)", "UNIQUE NOT NULL"),
    MetaField("state", "INT", "NOT NULL"),
    MetaField("dimension", "SMALLINT", "NOT NULL"),
    MetaField("created_on", "BIGINT", "NOT NULL"),
    MetaField("flag", "BIGINT", "DEFAULT 0 NOT NULL"),
    MetaField("index_file_size", "BIGINT", "DEFAULT 1024 NOT NULL"),
    MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
    MetaField("nlist", "INT", "DEFAULT 16384 NOT NULL"),
    MetaField("metric_type", "INT", "DEFAULT 1 NOT NULL"),
});

//TableFiles schema
static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
    MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
    MetaField("table_id", "VARCHAR(255)", "NOT NULL"),
    MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
    MetaField("file_id", "VARCHAR(255)", "NOT NULL"),
    MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"),
    MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"),
    MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"),
    MetaField("updated_time", "BIGINT", "NOT NULL"),
    MetaField("created_on", "BIGINT", "NOT NULL"),
    MetaField("date", "INT", "DEFAULT -1 NOT NULL"),
});

163 164
}

165
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
166 167 168 169 170 171 172
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
    : options_(options_),
      mode_(mode) {
    Initialize();
}

MySQLMetaImpl::~MySQLMetaImpl() {
S
starlord 已提交
173

174 175
}

176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.GetNextIDNumber();
    table_id = ss.str();
    return Status::OK();
}

Status MySQLMetaImpl::NextFileId(std::string &file_id) {
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.GetNextIDNumber();
    file_id = ss.str();
    return Status::OK();
}

192 193 194 195 196
void MySQLMetaImpl::ValidateMetaSchema() {
    if(nullptr == mysql_connection_pool_) {
        return;
    }

S
starlord 已提交
197
    ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
    if (connectionPtr == nullptr) {
        return;
    }

    auto validate_func = [&](const MetaSchema& schema) {
        Query query_statement = connectionPtr->query();
        query_statement << "DESC " << schema.name() << ";";

        MetaFields exist_fields;

        try {
            StoreQueryResult res = query_statement.store();
            for (size_t i = 0; i < res.num_rows(); i++) {
                const Row &row = res[i];
                std::string name, type;
                row["Field"].to_string(name);
                row["Type"].to_string(type);

                exist_fields.push_back(MetaField(name, type, ""));
            }
        } catch (std::exception &e) {
            ENGINE_LOG_DEBUG << "Meta table '" << schema.name() << "' not exist and will be created";
        }

        if(exist_fields.empty()) {
            return true;
        }

        return schema.IsEqual(exist_fields);
    };

    //verify Tables
    if (!validate_func(TABLES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }

    //verufy TableFiles
    if (!validate_func(TABLEFILES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

240
Status MySQLMetaImpl::Initialize() {
241
    //step 1: create db root path
S
starlord 已提交
242 243
    if (!boost::filesystem::is_directory(options_.path_)) {
        auto ret = boost::filesystem::create_directory(options_.path_);
244
        if (!ret) {
S
starlord 已提交
245
            std::string msg = "Failed to create db directory " + options_.path_;
S
starlord 已提交
246 247
            ENGINE_LOG_ERROR << msg;
            return Status(DB_META_TRANSACTION_FAILED, msg);
Z
update  
zhiru 已提交
248 249 250
        }
    }

S
starlord 已提交
251
    std::string uri = options_.backend_uri_;
252

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
    //step 2: parse and check meta uri
    utils::MetaUriInfo uri_info;
    auto status = utils::ParseMetaUri(uri, uri_info);
    if(!status.ok()) {
        std::string msg = "Wrong URI format: " + uri;
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }

    if (strcasecmp(uri_info.dialect_.c_str(), "mysql") != 0) {
        std::string msg = "URI's dialect is not MySQL";
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_INVALID_META_URI, msg);
    }

    //step 3: connect mysql
    int thread_hint = std::thread::hardware_concurrency();
    int max_pool_size = (thread_hint == 0) ? 8 : thread_hint;
    unsigned int port = 0;
    if (!uri_info.port_.empty()) {
        port = std::stoi(uri_info.port_);
    }

    mysql_connection_pool_ =
            std::make_shared<MySQLConnectionPool>(uri_info.db_name_, uri_info.username_,
                    uri_info.password_, uri_info.host_, port, max_pool_size);
    ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(max_pool_size);

    //step 4: validate to avoid open old version schema
    ValidateMetaSchema();

    //step 5: create meta tables
    try {

        if (mode_ != DBOptions::MODE::READ_ONLY) {
            CleanUp();
289 290
        }

291
        {
S
starlord 已提交
292
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
293

294 295 296
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to database server");
            }
297 298


299 300 301
            if (!connectionPtr->thread_aware()) {
                ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
                return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it.");
302
            }
303
            Query InitializeQuery = connectionPtr->query();
304

305 306
            InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
                            TABLES_SCHEMA.name() << " (" << TABLES_SCHEMA.ToString() + ");";
307

308
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
Z
update  
zhiru 已提交
309

310 311 312
            if (!InitializeQuery.exec()) {
                return HandleException("Initialization Error", InitializeQuery.error());
            }
313

314 315
            InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
                            TABLEFILES_SCHEMA.name() << " (" << TABLEFILES_SCHEMA.ToString() + ");";
Z
update  
zhiru 已提交
316

317
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
Z
update  
zhiru 已提交
318

319 320 321 322
            if (!InitializeQuery.exec()) {
                return HandleException("Initialization Error", InitializeQuery.error());
            }
        } //Scoped Connection
Z
update  
zhiru 已提交
323

324 325
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
Z
update  
zhiru 已提交
326
    }
S
starlord 已提交
327 328

    return Status::OK();
329 330 331 332 333 334
}

// PXU TODO: Temp solution. Will fix later
Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                            const DatesT &dates) {
    if (dates.empty()) {
P
peng.xu 已提交
335 336 337
        return Status::OK();
    }

338 339 340 341 342 343
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }
Z
zhiru 已提交
344

345 346 347 348 349 350 351
    try {
        std::stringstream dateListSS;
        for (auto &date : dates) {
            dateListSS << std::to_string(date) << ", ";
        }
        std::string dateListStr = dateListSS.str();
        dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", "
Z
update  
zhiru 已提交
352

353
        {
S
starlord 已提交
354
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
355

356
            if (connectionPtr == nullptr) {
S
starlord 已提交
357
                return Status(DB_ERROR, "Failed to connect to database server");
358
            }
Z
update  
zhiru 已提交
359

Z
update  
zhiru 已提交
360

361
            Query dropPartitionsByDatesQuery = connectionPtr->query();
362

363 364
            dropPartitionsByDatesQuery << "UPDATE " <<
                                       META_TABLEFILES << " " <<
365 366
                                       "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
                                       "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
367 368
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "date in (" << dateListStr << ");";
Z
update  
zhiru 已提交
369

370
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropPartitionsByDates: " << dropPartitionsByDatesQuery.str();
Z
update  
zhiru 已提交
371

372
            if (!dropPartitionsByDatesQuery.exec()) {
S
starlord 已提交
373
                return HandleException("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", dropPartitionsByDatesQuery.error());
Z
update  
zhiru 已提交
374
            }
375
        } //Scoped Connection
376 377 378

        ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;

S
starlord 已提交
379 380
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
Z
update  
zhiru 已提交
381
    }
382 383
    return Status::OK();
}
Z
update  
zhiru 已提交
384

385 386
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
Y
Yu Kun 已提交
387
        server::MetricCollector metric;
388
        {
S
starlord 已提交
389
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
390

391
            if (connectionPtr == nullptr) {
S
starlord 已提交
392
                return Status(DB_ERROR, "Failed to connect to database server");
393
            }
Z
update  
zhiru 已提交
394

395
            Query createTableQuery = connectionPtr->query();
Z
update  
zhiru 已提交
396

397 398 399
            if (table_schema.table_id_.empty()) {
                NextTableId(table_schema.table_id_);
            } else {
400 401
                createTableQuery << "SELECT state FROM " <<
                                 META_TABLES << " " <<
402
                                 "WHERE table_id = " << quote << table_schema.table_id_ << ";";
Z
zhiru 已提交
403

404
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
Z
update  
zhiru 已提交
405

406
                StoreQueryResult res = createTableQuery.store();
407

408 409 410
                if (res.num_rows() == 1) {
                    int state = res[0]["state"];
                    if (TableSchema::TO_DELETE == state) {
S
starlord 已提交
411
                        return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
412
                    } else {
S
starlord 已提交
413
                        return Status(DB_ALREADY_EXIST, "Table already exists");
414 415 416
                    }
                }
            }
417

418 419
            table_schema.id_ = -1;
            table_schema.created_on_ = utils::GetMicroSecTimeStamp();
Z
update  
zhiru 已提交
420

421 422 423 424 425
            std::string id = "NULL"; //auto-increment
            std::string table_id = table_schema.table_id_;
            std::string state = std::to_string(table_schema.state_);
            std::string dimension = std::to_string(table_schema.dimension_);
            std::string created_on = std::to_string(table_schema.created_on_);
S
starlord 已提交
426 427
            std::string flag = std::to_string(table_schema.flag_);
            std::string index_file_size = std::to_string(table_schema.index_file_size_);
428
            std::string engine_type = std::to_string(table_schema.engine_type_);
S
starlord 已提交
429 430
            std::string nlist = std::to_string(table_schema.nlist_);
            std::string metric_type = std::to_string(table_schema.metric_type_);
Z
update  
zhiru 已提交
431

432 433 434
            createTableQuery << "INSERT INTO " <<
                             META_TABLES << " " <<
                             "VALUES(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " <<
S
starlord 已提交
435 436
                             created_on << ", " << flag << ", " << index_file_size << ", " << engine_type << ", " <<
                             nlist << ", " << metric_type << ");";
Z
update  
zhiru 已提交
437

438
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
439

440 441
            if (SimpleResult res = createTableQuery.execute()) {
                table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
442

443 444
                //Consume all results to avoid "Commands out of sync" error
            } else {
S
starlord 已提交
445
                return HandleException("Add Table Error", createTableQuery.error());
446
            }
447
        } //Scoped Connection
448

449
        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
450
        return utils::CreateTablePath(options_, table_schema.table_id_);
Z
update  
zhiru 已提交
451

452
    } catch (std::exception &e) {
S
starlord 已提交
453
        return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
454 455
    }
}
456

457 458 459 460
Status MySQLMetaImpl::FilesByType(const std::string &table_id,
                                  const std::vector<int> &file_types,
                                  std::vector<std::string> &file_ids) {
    if(file_types.empty()) {
S
starlord 已提交
461
        return Status(DB_ERROR, "file types array is empty");
462
    }
Z
zhiru 已提交
463 464

    try {
465 466
        file_ids.clear();

Z
zhiru 已提交
467 468
        StoreQueryResult res;
        {
S
starlord 已提交
469
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
470 471

            if (connectionPtr == nullptr) {
S
starlord 已提交
472
                return Status(DB_ERROR, "Failed to connect to database server");
Z
zhiru 已提交
473 474
            }

475 476 477 478 479 480 481 482
            std::string types;
            for(auto type : file_types) {
                if(!types.empty()) {
                    types += ",";
                }
                types += std::to_string(type);
            }

Z
zhiru 已提交
483 484
            Query hasNonIndexFilesQuery = connectionPtr->query();
            //since table_id is a unique column we just need to check whether it exists or not
485 486
            hasNonIndexFilesQuery << "SELECT file_id, file_type FROM " <<
                                  META_TABLEFILES << " " <<
Z
fix  
zhiru 已提交
487
                                  "WHERE table_id = " << quote << table_id << " AND " <<
488
                                  "file_type in (" << types << ");";
Z
zhiru 已提交
489

490
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
Z
zhiru 已提交
491 492 493 494

            res = hasNonIndexFilesQuery.store();
        } //Scoped Connection

495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
        if (res.num_rows() > 0) {
            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
            int to_index_count = 0, index_count = 0, backup_count = 0;
            for (auto &resRow : res) {
                std::string file_id;
                resRow["file_id"].to_string(file_id);
                file_ids.push_back(file_id);

                int32_t file_type = resRow["file_type"];
                switch (file_type) {
                    case (int) TableFileSchema::RAW:
                        raw_count++;
                        break;
                    case (int) TableFileSchema::NEW:
                        new_count++;
                        break;
                    case (int) TableFileSchema::NEW_MERGE:
                        new_merge_count++;
                        break;
                    case (int) TableFileSchema::NEW_INDEX:
                        new_index_count++;
                        break;
                    case (int) TableFileSchema::TO_INDEX:
                        to_index_count++;
                        break;
                    case (int) TableFileSchema::INDEX:
                        index_count++;
                        break;
                    case (int) TableFileSchema::BACKUP:
                        backup_count++;
                        break;
                    default:
                        break;
                }
            }

            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
                             << " new files:" << new_count << " new_merge files:" << new_merge_count
                             << " new_index files:" << new_index_count << " to_index files:" << to_index_count
                             << " index files:" << index_count << " backup files:" << backup_count;
        }
Z
zhiru 已提交
536

S
starlord 已提交
537 538
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
Z
zhiru 已提交
539 540
    }

541 542
    return Status::OK();
}
543

S
starlord 已提交
544
Status MySQLMetaImpl::UpdateTableIndex(const std::string &table_id, const TableIndex& index) {
S
starlord 已提交
545
    try {
Y
Yu Kun 已提交
546
        server::MetricCollector metric;
S
starlord 已提交
547 548

        {
S
starlord 已提交
549
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
550 551

            if (connectionPtr == nullptr) {
S
starlord 已提交
552
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
553 554 555
            }

            Query updateTableIndexParamQuery = connectionPtr->query();
556 557
            updateTableIndexParamQuery << "SELECT id, state, dimension, created_on FROM " <<
                                       META_TABLES << " " <<
S
starlord 已提交
558 559 560
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

S
starlord 已提交
561
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
S
starlord 已提交
562 563 564 565 566 567 568 569 570 571 572

            StoreQueryResult res = updateTableIndexParamQuery.store();

            if (res.num_rows() == 1) {
                const Row &resRow = res[0];

                size_t id = resRow["id"];
                int32_t state = resRow["state"];
                uint16_t dimension = resRow["dimension"];
                int64_t created_on = resRow["created_on"];

573 574
                updateTableIndexParamQuery << "UPDATE " <<
                                           META_TABLES << " " <<
S
starlord 已提交
575 576 577 578
                                           "SET id = " << id << ", " <<
                                           "state = " << state << ", " <<
                                           "dimension = " << dimension << ", " <<
                                           "created_on = " << created_on << ", " <<
S
starlord 已提交
579
                                           "engine_type = " << index.engine_type_ << ", " <<
S
starlord 已提交
580
                                           "nlist = " << index.nlist_ << ", " <<
S
starlord 已提交
581
                                           "metric_type = " << index.metric_type_ << " " <<
S
starlord 已提交
582
                                           "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
583

S
starlord 已提交
584
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
S
starlord 已提交
585 586 587


                if (!updateTableIndexParamQuery.exec()) {
S
starlord 已提交
588
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM", updateTableIndexParamQuery.error());
S
starlord 已提交
589 590
                }
            } else {
S
starlord 已提交
591
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
S
starlord 已提交
592 593 594 595
            }

        } //Scoped Connection

596 597
        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;

S
starlord 已提交
598 599
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
S
starlord 已提交
600 601
    }

602 603 604
    return Status::OK();
}

S
starlord 已提交
605 606
Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    try {
Y
Yu Kun 已提交
607
        server::MetricCollector metric;
S
starlord 已提交
608 609

        {
S
starlord 已提交
610
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
611 612

            if (connectionPtr == nullptr) {
S
starlord 已提交
613
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
614 615 616
            }

            Query updateTableFlagQuery = connectionPtr->query();
617 618
            updateTableFlagQuery << "UPDATE " <<
                                 META_TABLES << " " <<
S
starlord 已提交
619
                                 "SET flag = " << flag << " " <<
S
starlord 已提交
620
                                 "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
621 622 623 624

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();

            if (!updateTableFlagQuery.exec()) {
S
starlord 已提交
625
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLAG", updateTableFlagQuery.error());
S
starlord 已提交
626 627 628 629
            }

        } //Scoped Connection

630 631
        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;

S
starlord 已提交
632 633
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
S
starlord 已提交
634 635 636 637 638
    }

    return Status::OK();
}

639
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
S
starlord 已提交
640
    try {
Y
Yu Kun 已提交
641
        server::MetricCollector metric;
642

S
starlord 已提交
643
        {
S
starlord 已提交
644
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
645

S
starlord 已提交
646
            if (connectionPtr == nullptr) {
S
starlord 已提交
647
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
648 649 650
            }

            Query describeTableIndexQuery = connectionPtr->query();
651 652
            describeTableIndexQuery << "SELECT engine_type, nlist, index_file_size, metric_type FROM " <<
                                       META_TABLES << " " <<
S
starlord 已提交
653 654
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
655

S
starlord 已提交
656
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTableIndex: " << describeTableIndexQuery.str();
Z
update  
zhiru 已提交
657

S
starlord 已提交
658 659 660 661 662 663 664 665 666
            StoreQueryResult res = describeTableIndexQuery.store();

            if (res.num_rows() == 1) {
                const Row &resRow = res[0];

                index.engine_type_ = resRow["engine_type"];
                index.nlist_ = resRow["nlist"];
                index.metric_type_ = resRow["metric_type"];
            } else {
S
starlord 已提交
667
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
S
starlord 已提交
668 669 670
            }

        } //Scoped Connection
Z
update  
zhiru 已提交
671

S
starlord 已提交
672 673
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
S
starlord 已提交
674 675 676 677 678 679 680
    }

    return Status::OK();
}

Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
681
        server::MetricCollector metric;
Z
update  
zhiru 已提交
682

683
        {
S
starlord 已提交
684
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
685

686
            if (connectionPtr == nullptr) {
S
starlord 已提交
687
                return Status(DB_ERROR, "Failed to connect to database server");
688
            }
689

S
starlord 已提交
690 691
            Query dropTableIndexQuery = connectionPtr->query();

692
            //soft delete index files
693 694
            dropTableIndexQuery << "UPDATE " <<
                                META_TABLEFILES << " " <<
S
starlord 已提交
695 696 697 698 699 700 701 702
                                "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
                                "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                "WHERE table_id = " << quote << table_id << " AND " <<
                                "file_type = " << std::to_string(TableFileSchema::INDEX) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
703
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
S
starlord 已提交
704 705
            }

706
            //set all backup file to raw
707 708
            dropTableIndexQuery << "UPDATE " <<
                                META_TABLEFILES << " " <<
S
starlord 已提交
709 710 711 712 713 714 715 716
                                "SET file_type = " << std::to_string(TableFileSchema::RAW) << "," <<
                                "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                "WHERE table_id = " << quote << table_id << " AND " <<
                                "file_type = " << std::to_string(TableFileSchema::BACKUP) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
717
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
718 719 720
            }

            //set table index type to raw
721 722
            dropTableIndexQuery << "UPDATE " <<
                                META_TABLES << " " <<
723
                                "SET engine_type = " << std::to_string(DEFAULT_ENGINE_TYPE) << "," <<
S
starlord 已提交
724
                                "nlist = " << std::to_string(DEFAULT_NLIST) << ", " <<
725 726 727 728 729 730
                                "metric_type = " << std::to_string(DEFAULT_METRIC_TYPE) << " " <<
                                "WHERE table_id = " << quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
731
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
S
starlord 已提交
732 733 734 735
            }

        } //Scoped Connection

736 737
        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;

S
starlord 已提交
738 739
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
S
starlord 已提交
740
    }
741

S
starlord 已提交
742 743
    return Status::OK();
}
Z
update  
zhiru 已提交
744

S
starlord 已提交
745 746
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
747
        server::MetricCollector metric;
S
starlord 已提交
748
        {
S
starlord 已提交
749
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
750

S
starlord 已提交
751
            if (connectionPtr == nullptr) {
S
starlord 已提交
752
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
753
            }
Z
zhiru 已提交
754

755 756 757
            //soft delete table
            Query deleteTableQuery = connectionPtr->query();
//
758 759
            deleteTableQuery << "UPDATE " <<
                             META_TABLES << " " <<
760 761
                             "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " <<
                             "WHERE table_id = " << quote << table_id << ";";
Z
update  
zhiru 已提交
762

763
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str();
764

765
            if (!deleteTableQuery.exec()) {
S
starlord 已提交
766
                return HandleException("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
767
            }
768

769
        } //Scoped Connection
Z
zhiru 已提交
770

S
starlord 已提交
771
        if (mode_ == DBOptions::MODE::CLUSTER) {
772 773
            DeleteTableFiles(table_id);
        }
Z
update  
zhiru 已提交
774

775 776
        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;

S
starlord 已提交
777 778
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
779
    }
Z
update  
zhiru 已提交
780

781 782
    return Status::OK();
}
Z
update  
zhiru 已提交
783

784 785
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
786
        server::MetricCollector metric;
787
        {
S
starlord 已提交
788
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
789 790

            if (connectionPtr == nullptr) {
S
starlord 已提交
791
                return Status(DB_ERROR, "Failed to connect to database server");
792
            }
793

794 795 796
            //soft delete table files
            Query deleteTableFilesQuery = connectionPtr->query();
            //
797 798
            deleteTableFilesQuery << "UPDATE " <<
                                  META_TABLEFILES << " " <<
799 800 801 802
                                  "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
                                  "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
                                  "WHERE table_id = " << quote << table_id << " AND " <<
                                  "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
803

804
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str();
805

806
            if (!deleteTableFilesQuery.exec()) {
S
starlord 已提交
807
                return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", deleteTableFilesQuery.error());
808
            }
809
        } //Scoped Connection
810 811 812

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;

S
starlord 已提交
813 814
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
Z
update  
zhiru 已提交
815 816
    }

817 818
    return Status::OK();
}
Z
zhiru 已提交
819

820 821
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
Y
Yu Kun 已提交
822
        server::MetricCollector metric;
823 824
        StoreQueryResult res;
        {
S
starlord 已提交
825
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
826

827
            if (connectionPtr == nullptr) {
S
starlord 已提交
828
                return Status(DB_ERROR, "Failed to connect to database server");
829
            }
Z
zhiru 已提交
830

831
            Query describeTableQuery = connectionPtr->query();
832 833
            describeTableQuery << "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type, nlist, metric_type FROM " <<
                               META_TABLES << " " <<
834 835 836 837
                               "WHERE table_id = " << quote << table_schema.table_id_ << " " <<
                               "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str();
Z
update  
zhiru 已提交
838

839 840
            res = describeTableQuery.store();
        } //Scoped Connection
841

842 843
        if (res.num_rows() == 1) {
            const Row &resRow = res[0];
844

845
            table_schema.id_ = resRow["id"]; //implicit conversion
Z
update  
zhiru 已提交
846

S
starlord 已提交
847 848
            table_schema.state_ = resRow["state"];

849
            table_schema.dimension_ = resRow["dimension"];
850

S
starlord 已提交
851 852 853 854
            table_schema.created_on_ = resRow["created_on"];

            table_schema.flag_ = resRow["flag"];

855 856
            table_schema.index_file_size_ = resRow["index_file_size"];

857
            table_schema.engine_type_ = resRow["engine_type"];
S
starlord 已提交
858 859 860 861

            table_schema.nlist_ = resRow["nlist"];

            table_schema.metric_type_ = resRow["metric_type"];
862
        } else {
S
starlord 已提交
863
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
864
        }
Z
update  
zhiru 已提交
865

S
starlord 已提交
866 867
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
Z
update  
zhiru 已提交
868 869
    }

870 871
    return Status::OK();
}
Z
zhiru 已提交
872

873 874
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    try {
Y
Yu Kun 已提交
875
        server::MetricCollector metric;
876 877
        StoreQueryResult res;
        {
S
starlord 已提交
878
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
879

880
            if (connectionPtr == nullptr) {
S
starlord 已提交
881
                return Status(DB_ERROR, "Failed to connect to database server");
882
            }
Z
update  
zhiru 已提交
883

884 885 886
            Query hasTableQuery = connectionPtr->query();
            //since table_id is a unique column we just need to check whether it exists or not
            hasTableQuery << "SELECT EXISTS " <<
887 888
                          "(SELECT 1 FROM " <<
                          META_TABLES << " " <<
889 890 891
                          "WHERE table_id = " << quote << table_id << " " <<
                          "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
                          "AS " << quote << "check" << ";";
Z
update  
zhiru 已提交
892

893
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();
894

895 896
            res = hasTableQuery.store();
        } //Scoped Connection
897

898 899
        int check = res[0]["check"];
        has_or_not = (check == 1);
900

S
starlord 已提交
901 902
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
903
    }
904

905 906
    return Status::OK();
}
907

908 909
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
    try {
Y
Yu Kun 已提交
910
        server::MetricCollector metric;
911 912
        StoreQueryResult res;
        {
S
starlord 已提交
913
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
914

915
            if (connectionPtr == nullptr) {
S
starlord 已提交
916
                return Status(DB_ERROR, "Failed to connect to database server");
917
            }
Z
update  
zhiru 已提交
918

919
            Query allTablesQuery = connectionPtr->query();
920 921
            allTablesQuery << "SELECT id, table_id, dimension, engine_type, nlist, index_file_size, metric_type FROM " <<
                           META_TABLES << " " <<
922
                           "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
zhiru 已提交
923

924
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str();
Z
zhiru 已提交
925

926 927
            res = allTablesQuery.store();
        } //Scoped Connection
928

929 930
        for (auto &resRow : res) {
            TableSchema table_schema;
Z
update  
zhiru 已提交
931

932
            table_schema.id_ = resRow["id"]; //implicit conversion
933

934 935 936
            std::string table_id;
            resRow["table_id"].to_string(table_id);
            table_schema.table_id_ = table_id;
937

938
            table_schema.dimension_ = resRow["dimension"];
939

940 941
            table_schema.index_file_size_ = resRow["index_file_size"];

942
            table_schema.engine_type_ = resRow["engine_type"];
943

S
starlord 已提交
944 945 946 947
            table_schema.nlist_ = resRow["nlist"];

            table_schema.metric_type_ = resRow["metric_type"];

948 949
            table_schema_array.emplace_back(table_schema);
        }
S
starlord 已提交
950 951
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
952
    }
Z
update  
zhiru 已提交
953

954 955
    return Status::OK();
}
956

957 958
Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
959
        file_schema.date_ = utils::GetDate();
960 961 962 963 964 965 966
    }
    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }
967

968
    try {
Y
Yu Kun 已提交
969
        server::MetricCollector metric;
970 971 972

        NextFileId(file_schema.file_id_);
        file_schema.dimension_ = table_schema.dimension_;
973 974
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
975 976
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
977
        file_schema.index_file_size_ = table_schema.index_file_size_;
978
        file_schema.engine_type_ = table_schema.engine_type_;
S
starlord 已提交
979 980
        file_schema.nlist_ = table_schema.nlist_;
        file_schema.metric_type_ = table_schema.metric_type_;
981 982 983 984 985 986

        std::string id = "NULL"; //auto-increment
        std::string table_id = file_schema.table_id_;
        std::string engine_type = std::to_string(file_schema.engine_type_);
        std::string file_id = file_schema.file_id_;
        std::string file_type = std::to_string(file_schema.file_type_);
S
starlord 已提交
987
        std::string file_size = std::to_string(file_schema.file_size_);
988
        std::string row_count = std::to_string(file_schema.row_count_);
989 990 991 992 993
        std::string updated_time = std::to_string(file_schema.updated_time_);
        std::string created_on = std::to_string(file_schema.created_on_);
        std::string date = std::to_string(file_schema.date_);

        {
S
starlord 已提交
994
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
995

996
            if (connectionPtr == nullptr) {
S
starlord 已提交
997
                return Status(DB_ERROR, "Failed to connect to database server");
998
            }
Z
update  
zhiru 已提交
999

1000
            Query createTableFileQuery = connectionPtr->query();
1001

1002 1003 1004
            createTableFileQuery << "INSERT INTO " <<
                                 META_TABLEFILES << " " <<
                                 "VALUES(" << id << ", " << quote << table_id << ", " << engine_type << ", " <<
S
starlord 已提交
1005 1006
                                 quote << file_id << ", " << file_type << ", " << file_size << ", " <<
                                 row_count << ", " << updated_time << ", " << created_on << ", " << date << ");";
1007 1008 1009 1010 1011 1012 1013 1014

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str();

            if (SimpleResult res = createTableFileQuery.execute()) {
                file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?

                //Consume all results to avoid "Commands out of sync" error
            } else {
S
starlord 已提交
1015
                return HandleException("QUERY ERROR WHEN CREATING TABLE FILE", createTableFileQuery.error());
1016
            }
1017 1018
        } // Scoped Connection

1019
        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
1020 1021
        return utils::CreateTableFilePath(options_, file_schema);

S
starlord 已提交
1022 1023
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
1024 1025
    }
}
1026

1027 1028
Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();
1029

1030
    try {
Y
Yu Kun 已提交
1031
        server::MetricCollector metric;
1032 1033
        StoreQueryResult res;
        {
S
starlord 已提交
1034
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1035 1036

            if (connectionPtr == nullptr) {
S
starlord 已提交
1037
                return Status(DB_ERROR, "Failed to connect to database server");
1038
            }
1039

1040
            Query filesToIndexQuery = connectionPtr->query();
1041 1042
            filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date, created_on FROM " <<
                              META_TABLEFILES << " " <<
1043
                              "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";
1044

1045
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();
1046

1047 1048 1049
            res = filesToIndexQuery.store();
        } //Scoped Connection

S
starlord 已提交
1050
        Status ret;
1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;
        for (auto &resRow : res) {

            table_file.id_ = resRow["id"]; //implicit conversion

            std::string table_id;
            resRow["table_id"].to_string(table_id);
            table_file.table_id_ = table_id;

            table_file.engine_type_ = resRow["engine_type"];

            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = resRow["file_type"];

S
starlord 已提交
1069 1070
            table_file.file_size_ = resRow["file_size"];

1071
            table_file.row_count_ = resRow["row_count"];
1072 1073 1074

            table_file.date_ = resRow["date"];

S
starlord 已提交
1075 1076
            table_file.created_on_ = resRow["created_on"];

1077 1078 1079 1080 1081 1082 1083
            auto groupItr = groups.find(table_file.table_id_);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
1084
                }
1085
                groups[table_file.table_id_] = table_schema;
1086 1087

            }
1088
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
S
starlord 已提交
1089
            table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
1090
            table_file.nlist_ = groups[table_file.table_id_].nlist_;
S
starlord 已提交
1091
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
Z
update  
zhiru 已提交
1092

S
starlord 已提交
1093 1094
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
1095
                ret = status;
S
starlord 已提交
1096
            }
1097 1098 1099

            files.push_back(table_file);
        }
S
starlord 已提交
1100

1101 1102 1103
        if(res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-index files";
        }
S
starlord 已提交
1104 1105
        return ret;

S
starlord 已提交
1106 1107
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
Z
update  
zhiru 已提交
1108
    }
1109 1110
}

X
xj.lin 已提交
1111 1112 1113 1114 1115 1116 1117
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
                                    const std::vector<size_t> &ids,
                                    const DatesT &partition,
                                    DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
Y
Yu Kun 已提交
1118
        server::MetricCollector metric;
X
xj.lin 已提交
1119 1120
        StoreQueryResult res;
        {
S
starlord 已提交
1121
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
X
xj.lin 已提交
1122 1123

            if (connectionPtr == nullptr) {
S
starlord 已提交
1124
                return Status(DB_ERROR, "Failed to connect to database server");
X
xj.lin 已提交
1125 1126 1127
            }

            Query filesToSearchQuery = connectionPtr->query();
1128 1129
            filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date FROM " <<
                               META_TABLEFILES << " " <<
X
xj.lin 已提交
1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171
                               "WHERE table_id = " << quote << table_id;

            if (!partition.empty()) {
                std::stringstream partitionListSS;
                for (auto &date : partition) {
                    partitionListSS << std::to_string(date) << ", ";
                }
                std::string partitionListStr = partitionListSS.str();

                partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", "
                filesToSearchQuery << " AND " << "date IN (" << partitionListStr << ")";
            }

            if (!ids.empty()) {
                std::stringstream idSS;
                for (auto &id : ids) {
                    idSS << "id = " << std::to_string(id) << " OR ";
                }
                std::string idStr = idSS.str();
                idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "

                filesToSearchQuery  << " AND " << "(" << idStr << ")";

            }
            // End
            filesToSearchQuery << " AND " <<
                               "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
                               "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
                               "file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();

            res = filesToSearchQuery.store();
        } //Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

S
starlord 已提交
1172
        Status ret;
X
xj.lin 已提交
1173 1174 1175 1176 1177 1178 1179 1180 1181
        TableFileSchema table_file;
        for (auto &resRow : res) {

            table_file.id_ = resRow["id"]; //implicit conversion

            std::string table_id_str;
            resRow["table_id"].to_string(table_id_str);
            table_file.table_id_ = table_id_str;

1182 1183
            table_file.index_file_size_ = table_schema.index_file_size_;

X
xj.lin 已提交
1184 1185
            table_file.engine_type_ = resRow["engine_type"];

S
starlord 已提交
1186 1187
            table_file.nlist_ = table_schema.nlist_;

S
starlord 已提交
1188 1189
            table_file.metric_type_ = table_schema.metric_type_;

X
xj.lin 已提交
1190 1191 1192 1193 1194 1195
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = resRow["file_type"];

S
starlord 已提交
1196 1197
            table_file.file_size_ = resRow["file_size"];

1198
            table_file.row_count_ = resRow["row_count"];
X
xj.lin 已提交
1199 1200 1201 1202 1203

            table_file.date_ = resRow["date"];

            table_file.dimension_ = table_schema.dimension_;

S
starlord 已提交
1204 1205
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
1206
                ret = status;
S
starlord 已提交
1207
            }
1208

1209 1210 1211
            auto dateItr = files.find(table_file.date_);
            if (dateItr == files.end()) {
                files[table_file.date_] = TableFilesSchema();
1212 1213
            }

1214
            files[table_file.date_].push_back(table_file);
1215
        }
S
starlord 已提交
1216

1217 1218 1219
        if(res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-search files";
        }
S
starlord 已提交
1220
        return ret;
S
starlord 已提交
1221 1222
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
Z
update  
zhiru 已提交
1223
    }
1224
}
Z
update  
zhiru 已提交
1225

1226 1227 1228
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
                                   DatePartionedTableFilesSchema &files) {
    files.clear();
Z
update  
zhiru 已提交
1229

1230
    try {
Y
Yu Kun 已提交
1231
        server::MetricCollector metric;
S
starlord 已提交
1232 1233 1234 1235 1236 1237 1238 1239 1240

        //check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

1241 1242
        StoreQueryResult res;
        {
S
starlord 已提交
1243
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
1244

1245
            if (connectionPtr == nullptr) {
S
starlord 已提交
1246
                return Status(DB_ERROR, "Failed to connect to database server");
1247
            }
Z
update  
zhiru 已提交
1248

1249
            Query filesToMergeQuery = connectionPtr->query();
1250 1251
            filesToMergeQuery << "SELECT id, table_id, file_id, file_type, file_size, row_count, date, engine_type, created_on FROM " <<
                              META_TABLEFILES << " " <<
1252 1253
                              "WHERE table_id = " << quote << table_id << " AND " <<
                              "file_type = " << std::to_string(TableFileSchema::RAW) << " " <<
1254
                              "ORDER BY row_count DESC" << ";";
Z
update  
zhiru 已提交
1255

1256
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();
1257

1258 1259
            res = filesToMergeQuery.store();
        } //Scoped Connection
1260

S
starlord 已提交
1261
        Status ret;
1262
        for (auto &resRow : res) {
S
starlord 已提交
1263 1264 1265 1266 1267
            TableFileSchema table_file;
            table_file.file_size_ = resRow["file_size"];
            if(table_file.file_size_ >= table_schema.index_file_size_) {
                continue;//skip large file
            }
Z
update  
zhiru 已提交
1268

1269
            table_file.id_ = resRow["id"]; //implicit conversion
1270

1271 1272 1273
            std::string table_id_str;
            resRow["table_id"].to_string(table_id_str);
            table_file.table_id_ = table_id_str;
Z
update  
zhiru 已提交
1274

1275 1276 1277
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;
1278

1279
            table_file.file_type_ = resRow["file_type"];
1280

S
starlord 已提交
1281 1282
            table_file.row_count_ = resRow["row_count"];

1283
            table_file.date_ = resRow["date"];
Z
update  
zhiru 已提交
1284

1285 1286
            table_file.index_file_size_ = table_schema.index_file_size_;

S
starlord 已提交
1287 1288
            table_file.engine_type_ = resRow["engine_type"];

S
starlord 已提交
1289 1290
            table_file.nlist_ = table_schema.nlist_;

S
starlord 已提交
1291 1292
            table_file.metric_type_ = table_schema.metric_type_;

S
starlord 已提交
1293 1294
            table_file.created_on_ = resRow["created_on"];

1295
            table_file.dimension_ = table_schema.dimension_;
Z
update  
zhiru 已提交
1296

S
starlord 已提交
1297 1298
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
1299
                ret = status;
S
starlord 已提交
1300
            }
Z
update  
zhiru 已提交
1301

1302 1303 1304
            auto dateItr = files.find(table_file.date_);
            if (dateItr == files.end()) {
                files[table_file.date_] = TableFilesSchema();
1305
            }
1306 1307

            files[table_file.date_].push_back(table_file);
1308
        }
Z
update  
zhiru 已提交
1309

1310 1311 1312
        if(res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-merge files";
        }
S
starlord 已提交
1313 1314
        return ret;

S
starlord 已提交
1315 1316
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
1317 1318 1319 1320 1321 1322 1323
    }
}

Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
                                    const std::vector<size_t> &ids,
                                    TableFilesSchema &table_files) {
    if (ids.empty()) {
Z
update  
zhiru 已提交
1324 1325 1326
        return Status::OK();
    }

1327 1328 1329 1330 1331 1332
    std::stringstream idSS;
    for (auto &id : ids) {
        idSS << "id = " << std::to_string(id) << " OR ";
    }
    std::string idStr = idSS.str();
    idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "
Z
zhiru 已提交
1333

1334 1335 1336
    try {
        StoreQueryResult res;
        {
S
starlord 已提交
1337
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1338 1339

            if (connectionPtr == nullptr) {
S
starlord 已提交
1340
                return Status(DB_ERROR, "Failed to connect to database server");
1341 1342 1343
            }

            Query getTableFileQuery = connectionPtr->query();
1344 1345
            getTableFileQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on FROM " <<
                              META_TABLEFILES << " " <<
1346
                              "WHERE table_id = " << quote << table_id << " AND " <<
S
starlord 已提交
1347 1348
                              "(" << idStr << ") AND " <<
                              "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
1349 1350 1351 1352 1353 1354 1355 1356

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();

            res = getTableFileQuery.store();
        } //Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
S
starlord 已提交
1357
        DescribeTable(table_schema);
1358

S
starlord 已提交
1359
        Status ret;
1360
        for (auto &resRow : res) {
Z
zhiru 已提交
1361

1362
            TableFileSchema file_schema;
1363

1364
            file_schema.id_ = resRow["id"];
Z
zhiru 已提交
1365

1366
            file_schema.table_id_ = table_id;
Z
update  
zhiru 已提交
1367

1368 1369
            file_schema.index_file_size_ = table_schema.index_file_size_;

1370
            file_schema.engine_type_ = resRow["engine_type"];
Z
update  
zhiru 已提交
1371

S
starlord 已提交
1372 1373
            file_schema.nlist_ = table_schema.nlist_;

S
starlord 已提交
1374 1375
            file_schema.metric_type_ = table_schema.metric_type_;

1376 1377 1378
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            file_schema.file_id_ = file_id;
Z
update  
zhiru 已提交
1379

1380
            file_schema.file_type_ = resRow["file_type"];
Z
update  
zhiru 已提交
1381

1382 1383 1384
            file_schema.file_size_ = resRow["file_size"];

            file_schema.row_count_ = resRow["row_count"];
1385

1386
            file_schema.date_ = resRow["date"];
1387

S
starlord 已提交
1388 1389
            file_schema.created_on_ = resRow["created_on"];

1390
            file_schema.dimension_ = table_schema.dimension_;
Z
update  
zhiru 已提交
1391

S
starlord 已提交
1392
            utils::GetTableFilePath(options_, file_schema);
1393 1394 1395

            table_files.emplace_back(file_schema);
        }
S
starlord 已提交
1396

1397
        ENGINE_LOG_DEBUG << "Get table files by id";
S
starlord 已提交
1398 1399
        return ret;

S
starlord 已提交
1400 1401
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
Z
update  
zhiru 已提交
1402
    }
1403
}
Z
zhiru 已提交
1404

1405 1406
// PXU TODO: Support Swap
Status MySQLMetaImpl::Archive() {
S
starlord 已提交
1407
    auto &criterias = options_.archive_conf_.GetCriterias();
1408 1409 1410 1411 1412 1413 1414
    if (criterias.empty()) {
        return Status::OK();
    }

    for (auto &kv : criterias) {
        auto &criteria = kv.first;
        auto &limit = kv.second;
1415
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
1416 1417 1418 1419
            size_t usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();

            try {
S
starlord 已提交
1420
                ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
1421

Z
update  
zhiru 已提交
1422
                if (connectionPtr == nullptr) {
S
starlord 已提交
1423
                    return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1424 1425
                }

1426
                Query archiveQuery = connectionPtr->query();
1427 1428
                archiveQuery << "UPDATE " <<
                             META_TABLEFILES << " " <<
1429 1430 1431
                             "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                             "WHERE created_on < " << std::to_string(now - usecs) << " AND " <<
                             "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1432

1433
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();
Z
update  
zhiru 已提交
1434

1435
                if (!archiveQuery.exec()) {
S
starlord 已提交
1436
                    return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
1437
                }
1438

1439 1440
                ENGINE_LOG_DEBUG << "Archive old files";

S
starlord 已提交
1441 1442
            } catch (std::exception &e) {
                return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
Z
zhiru 已提交
1443
            }
1444
        }
1445
        if (criteria == engine::ARCHIVE_CONF_DISK) {
1446 1447
            uint64_t sum = 0;
            Size(sum);
Z
update  
zhiru 已提交
1448

1449 1450
            auto to_delete = (sum - limit * G);
            DiscardFiles(to_delete);
1451 1452

            ENGINE_LOG_DEBUG << "Archive files to free disk";
1453
        }
Z
update  
zhiru 已提交
1454 1455
    }

1456 1457
    return Status::OK();
}
Z
zhiru 已提交
1458

1459 1460
Status MySQLMetaImpl::Size(uint64_t &result) {
    result = 0;
1461

S
starlord 已提交
1462
    try {
1463 1464
        StoreQueryResult res;
        {
S
starlord 已提交
1465
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1466

1467
            if (connectionPtr == nullptr) {
S
starlord 已提交
1468
                return Status(DB_ERROR, "Failed to connect to database server");
1469
            }
Z
zhiru 已提交
1470

1471
            Query getSizeQuery = connectionPtr->query();
1472 1473
            getSizeQuery << "SELECT IFNULL(SUM(file_size),0) AS sum FROM " <<
                         META_TABLEFILES << " " <<
1474
                         "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1475

1476
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str();
Z
update  
zhiru 已提交
1477

1478 1479
            res = getSizeQuery.store();
        } //Scoped Connection
Z
update  
zhiru 已提交
1480

1481 1482 1483 1484 1485
        if (res.empty()) {
            result = 0;
        } else {
            result = res[0]["sum"];
        }
Z
update  
zhiru 已提交
1486

S
starlord 已提交
1487 1488
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
1489
    }
1490

1491 1492
    return Status::OK();
}
1493

1494 1495 1496 1497
Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
    if (to_discard_size <= 0) {

        return Status::OK();
Z
update  
zhiru 已提交
1498
    }
1499
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
Z
update  
zhiru 已提交
1500

1501
    try {
Y
Yu Kun 已提交
1502
        server::MetricCollector metric;
1503 1504
        bool status;
        {
S
starlord 已提交
1505
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1506

1507
            if (connectionPtr == nullptr) {
S
starlord 已提交
1508
                return Status(DB_ERROR, "Failed to connect to database server");
1509
            }
Z
zhiru 已提交
1510

1511
            Query discardFilesQuery = connectionPtr->query();
1512 1513
            discardFilesQuery << "SELECT id, file_size FROM " <<
                              META_TABLEFILES << " " <<
1514 1515 1516
                              "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                              "ORDER BY id ASC " <<
                              "LIMIT 10;";
Z
update  
zhiru 已提交
1517

1518
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
1519

1520 1521 1522 1523
            StoreQueryResult res = discardFilesQuery.store();
            if (res.num_rows() == 0) {
                return Status::OK();
            }
1524

1525 1526 1527 1528 1529
            TableFileSchema table_file;
            std::stringstream idsToDiscardSS;
            for (auto &resRow : res) {
                if (to_discard_size <= 0) {
                    break;
Z
update  
zhiru 已提交
1530
                }
1531
                table_file.id_ = resRow["id"];
1532
                table_file.file_size_ = resRow["file_size"];
1533 1534
                idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR ";
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
1535 1536
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
1537
            }
Z
update  
zhiru 已提交
1538

1539 1540
            std::string idsToDiscardStr = idsToDiscardSS.str();
            idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR "
1541

1542 1543
            discardFilesQuery << "UPDATE " <<
                              META_TABLEFILES << " " <<
1544 1545 1546
                              "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
                              "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
                              "WHERE " << idsToDiscardStr << ";";
1547

1548
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
Z
update  
zhiru 已提交
1549

1550 1551
            status = discardFilesQuery.exec();
            if (!status) {
S
starlord 已提交
1552
                return HandleException("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error());
1553 1554 1555 1556 1557
            }
        } //Scoped Connection

        return DiscardFiles(to_discard_size);

S
starlord 已提交
1558 1559
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
P
peng.xu 已提交
1560
    }
1561
}
P
peng.xu 已提交
1562

1563 1564 1565
//ZR: this function assumes all fields in file_schema have value
Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1566

S
starlord 已提交
1567
    try {
Y
Yu Kun 已提交
1568
        server::MetricCollector metric;
1569
        {
S
starlord 已提交
1570
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
1571

1572
            if (connectionPtr == nullptr) {
S
starlord 已提交
1573
                return Status(DB_ERROR, "Failed to connect to database server");
1574
            }
Z
update  
zhiru 已提交
1575

1576
            Query updateTableFileQuery = connectionPtr->query();
Z
update  
zhiru 已提交
1577

1578 1579
            //if the table has been deleted, just mark the table file as TO_DELETE
            //clean thread will delete the file later
1580 1581
            updateTableFileQuery << "SELECT state FROM " <<
                                 META_TABLES << " " <<
1582
                                 "WHERE table_id = " << quote << file_schema.table_id_ << ";";
Z
update  
zhiru 已提交
1583

1584
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
Z
update  
zhiru 已提交
1585

1586
            StoreQueryResult res = updateTableFileQuery.store();
1587

1588 1589 1590 1591
            if (res.num_rows() == 1) {
                int state = res[0]["state"];
                if (state == TableSchema::TO_DELETE) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
Z
update  
zhiru 已提交
1592
                }
1593 1594 1595 1596 1597 1598 1599 1600 1601
            } else {
                file_schema.file_type_ = TableFileSchema::TO_DELETE;
            }

            std::string id = std::to_string(file_schema.id_);
            std::string table_id = file_schema.table_id_;
            std::string engine_type = std::to_string(file_schema.engine_type_);
            std::string file_id = file_schema.file_id_;
            std::string file_type = std::to_string(file_schema.file_type_);
1602 1603
            std::string file_size = std::to_string(file_schema.file_size_);
            std::string row_count = std::to_string(file_schema.row_count_);
1604 1605 1606
            std::string updated_time = std::to_string(file_schema.updated_time_);
            std::string created_on = std::to_string(file_schema.created_on_);
            std::string date = std::to_string(file_schema.date_);
Z
update  
zhiru 已提交
1607

1608 1609
            updateTableFileQuery << "UPDATE " <<
                                 META_TABLEFILES << " " <<
1610 1611 1612 1613
                                 "SET table_id = " << quote << table_id << ", " <<
                                 "engine_type = " << engine_type << ", " <<
                                 "file_id = " << quote << file_id << ", " <<
                                 "file_type = " << file_type << ", " <<
1614 1615
                                 "file_size = " << file_size << ", " <<
                                 "row_count = " << row_count << ", " <<
1616 1617 1618 1619 1620 1621 1622 1623 1624
                                 "updated_time = " << updated_time << ", " <<
                                 "created_on = " << created_on << ", " <<
                                 "date = " << date << " " <<
                                 "WHERE id = " << id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();

            if (!updateTableFileQuery.exec()) {
                ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
S
starlord 已提交
1625
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error());
1626 1627 1628
            }
        } //Scoped Connection

1629 1630
        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;

S
starlord 已提交
1631 1632
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
1633
    }
S
starlord 已提交
1634

1635 1636
    return Status::OK();
}
1637

1638 1639
Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
    try {
S
starlord 已提交
1640
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1641

1642
        if (connectionPtr == nullptr) {
S
starlord 已提交
1643
            return Status(DB_ERROR, "Failed to connect to database server");
1644
        }
Z
update  
zhiru 已提交
1645

1646
        Query updateTableFilesToIndexQuery = connectionPtr->query();
Z
zhiru 已提交
1647

1648 1649
        updateTableFilesToIndexQuery << "UPDATE " <<
                                     META_TABLEFILES << " " <<
Z
fix  
zhiru 已提交
1650 1651 1652
                                     "SET file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " " <<
                                     "WHERE table_id = " << quote << table_id << " AND " <<
                                     "file_type = " << std::to_string(TableFileSchema::RAW) << ";";
1653

Z
zhiru 已提交
1654
        ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();
Z
update  
zhiru 已提交
1655

Z
fix  
zhiru 已提交
1656
        if (!updateTableFilesToIndexQuery.exec()) {
S
starlord 已提交
1657
            return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX", updateTableFilesToIndexQuery.error());
Z
fix  
zhiru 已提交
1658 1659
        }

1660 1661
        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;

S
starlord 已提交
1662 1663
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
1664
    }
Z
update  
zhiru 已提交
1665

1666 1667
    return Status::OK();
}
Z
zhiru 已提交
1668

1669 1670
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
Y
Yu Kun 已提交
1671
        server::MetricCollector metric;
1672
        {
S
starlord 已提交
1673
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1674 1675

            if (connectionPtr == nullptr) {
S
starlord 已提交
1676
                return Status(DB_ERROR, "Failed to connect to database server");
1677
            }
Z
update  
zhiru 已提交
1678

1679
            Query updateTableFilesQuery = connectionPtr->query();
1680

1681 1682
            std::map<std::string, bool> has_tables;
            for (auto &file_schema : files) {
1683

1684 1685 1686
                if (has_tables.find(file_schema.table_id_) != has_tables.end()) {
                    continue;
                }
1687

1688
                updateTableFilesQuery << "SELECT EXISTS " <<
1689 1690
                                      "(SELECT 1 FROM " <<
                                      META_TABLES << " " <<
1691 1692 1693
                                      "WHERE table_id = " << quote << file_schema.table_id_ << " " <<
                                      "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
                                      "AS " << quote << "check" << ";";
1694

1695
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
1696

1697
                StoreQueryResult res = updateTableFilesQuery.store();
1698

1699 1700 1701
                int check = res[0]["check"];
                has_tables[file_schema.table_id_] = (check == 1);
            }
1702

1703
            for (auto &file_schema : files) {
1704

1705 1706
                if (!has_tables[file_schema.table_id_]) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
1707
                }
1708
                file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1709

1710 1711 1712 1713 1714
                std::string id = std::to_string(file_schema.id_);
                std::string table_id = file_schema.table_id_;
                std::string engine_type = std::to_string(file_schema.engine_type_);
                std::string file_id = file_schema.file_id_;
                std::string file_type = std::to_string(file_schema.file_type_);
1715 1716
                std::string file_size = std::to_string(file_schema.file_size_);
                std::string row_count = std::to_string(file_schema.row_count_);
1717 1718 1719
                std::string updated_time = std::to_string(file_schema.updated_time_);
                std::string created_on = std::to_string(file_schema.created_on_);
                std::string date = std::to_string(file_schema.date_);
Z
fix  
zhiru 已提交
1720

1721 1722
                updateTableFilesQuery << "UPDATE " <<
                                      META_TABLEFILES << " " <<
1723 1724 1725 1726
                                      "SET table_id = " << quote << table_id << ", " <<
                                      "engine_type = " << engine_type << ", " <<
                                      "file_id = " << quote << file_id << ", " <<
                                      "file_type = " << file_type << ", " <<
1727 1728
                                      "file_size = " << file_size << ", " <<
                                      "row_count = " << row_count << ", " <<
1729 1730 1731 1732 1733 1734 1735 1736
                                      "updated_time = " << updated_time << ", " <<
                                      "created_on = " << created_on << ", " <<
                                      "date = " << date << " " <<
                                      "WHERE id = " << id << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();

                if (!updateTableFilesQuery.exec()) {
S
starlord 已提交
1737
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error());
1738 1739 1740 1741
                }
            }
        } //Scoped Connection

1742 1743
        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";

S
starlord 已提交
1744 1745
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
1746
    }
S
starlord 已提交
1747

1748 1749
    return Status::OK();
}
Z
fix  
zhiru 已提交
1750

1751 1752
Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
S
starlord 已提交
1753 1754 1755
    std::set<std::string> table_ids;

    //remove to_delete files
1756
    try {
Y
Yu Kun 已提交
1757
        server::MetricCollector metric;
1758

1759
        {
S
starlord 已提交
1760
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
update  
zhiru 已提交
1761

1762
            if (connectionPtr == nullptr) {
S
starlord 已提交
1763
                return Status(DB_ERROR, "Failed to connect to database server");
1764
            }
Z
zhiru 已提交
1765

1766
            Query cleanUpFilesWithTTLQuery = connectionPtr->query();
1767 1768
            cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date FROM " <<
                                     META_TABLEFILES << " " <<
1769 1770
                                     "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " <<
                                     "updated_time < " << std::to_string(now - seconds * US_PS) << ";";
Z
update  
zhiru 已提交
1771

1772
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
Z
update  
zhiru 已提交
1773

1774
            StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
Z
update  
zhiru 已提交
1775

1776 1777
            TableFileSchema table_file;
            std::vector<std::string> idsToDelete;
1778

1779
            for (auto &resRow : res) {
1780

1781
                table_file.id_ = resRow["id"]; //implicit conversion
1782

1783 1784 1785
                std::string table_id;
                resRow["table_id"].to_string(table_id);
                table_file.table_id_ = table_id;
Z
fix  
zhiru 已提交
1786

1787 1788 1789
                std::string file_id;
                resRow["file_id"].to_string(file_id);
                table_file.file_id_ = file_id;
Z
update  
zhiru 已提交
1790

1791
                table_file.date_ = resRow["date"];
Z
update  
zhiru 已提交
1792

1793 1794
                utils::DeleteTableFilePath(options_, table_file);

S
starlord 已提交
1795
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;
1796 1797

                idsToDelete.emplace_back(std::to_string(table_file.id_));
S
starlord 已提交
1798 1799

                table_ids.insert(table_file.table_id_);
1800 1801 1802 1803 1804 1805 1806
            }

            if (!idsToDelete.empty()) {

                std::stringstream idsToDeleteSS;
                for (auto &id : idsToDelete) {
                    idsToDeleteSS << "id = " << id << " OR ";
1807
                }
1808

1809 1810
                std::string idsToDeleteStr = idsToDeleteSS.str();
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
1811 1812 1813
                cleanUpFilesWithTTLQuery << "DELETE FROM " <<
                                         META_TABLEFILES << " " <<
                                         "WHERE " << idsToDeleteStr << ";";
1814

1815 1816 1817
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                if (!cleanUpFilesWithTTLQuery.exec()) {
S
starlord 已提交
1818
                    return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", cleanUpFilesWithTTLQuery.error());
1819 1820
                }
            }
1821 1822 1823 1824 1825

            if(res.size() > 0) {
                ENGINE_LOG_DEBUG << "Clean " << res.size() << " files deleted in " << seconds << " seconds";
            }

1826 1827
        } //Scoped Connection

S
starlord 已提交
1828 1829
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
Z
update  
zhiru 已提交
1830
    }
1831

S
starlord 已提交
1832
    //remove to_delete tables
1833
    try {
Y
Yu Kun 已提交
1834
        server::MetricCollector metric;
1835 1836

        {
S
starlord 已提交
1837
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
1838

Z
update  
zhiru 已提交
1839
            if (connectionPtr == nullptr) {
S
starlord 已提交
1840
                return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1841 1842
            }

1843
            Query cleanUpFilesWithTTLQuery = connectionPtr->query();
1844 1845
            cleanUpFilesWithTTLQuery << "SELECT id, table_id FROM " <<
                                     META_TABLES << " " <<
1846
                                     "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
1847

1848 1849 1850
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

            StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
Z
update  
zhiru 已提交
1851

Z
update  
zhiru 已提交
1852 1853
            if (!res.empty()) {

1854 1855 1856 1857 1858
                std::stringstream idsToDeleteSS;
                for (auto &resRow : res) {
                    size_t id = resRow["id"];
                    std::string table_id;
                    resRow["table_id"].to_string(table_id);
Z
update  
zhiru 已提交
1859

S
starlord 已提交
1860
                    utils::DeleteTablePath(options_, table_id, false);//only delete empty folder
1861 1862

                    idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
Z
update  
zhiru 已提交
1863
                }
1864 1865
                std::string idsToDeleteStr = idsToDeleteSS.str();
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
1866 1867 1868
                cleanUpFilesWithTTLQuery << "DELETE FROM " <<
                                         META_TABLES << " " <<
                                         "WHERE " << idsToDeleteStr << ";";
1869

1870
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
Z
update  
zhiru 已提交
1871

1872
                if (!cleanUpFilesWithTTLQuery.exec()) {
S
starlord 已提交
1873
                    return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", cleanUpFilesWithTTLQuery.error());
1874 1875
                }
            }
1876 1877 1878 1879

            if(res.size() > 0) {
                ENGINE_LOG_DEBUG << "Remove " << res.size() << " tables from meta";
            }
1880 1881
        } //Scoped Connection

S
starlord 已提交
1882 1883
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
Z
update  
zhiru 已提交
1884 1885
    }

S
starlord 已提交
1886 1887 1888
    //remove deleted table folder
    //don't remove table folder until all its files has been deleted
    try {
Y
Yu Kun 已提交
1889
        server::MetricCollector metric;
S
starlord 已提交
1890 1891

        {
S
starlord 已提交
1892
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
S
starlord 已提交
1893 1894

            if (connectionPtr == nullptr) {
S
starlord 已提交
1895
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
1896 1897 1898 1899
            }

            for(auto& table_id : table_ids) {
                Query cleanUpFilesWithTTLQuery = connectionPtr->query();
1900 1901
                cleanUpFilesWithTTLQuery << "SELECT file_id FROM " <<
                                         META_TABLEFILES << " " <<
S
starlord 已提交
1902
                                         "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
1903 1904 1905 1906 1907 1908 1909 1910 1911

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                StoreQueryResult res = cleanUpFilesWithTTLQuery.store();

                if (res.empty()) {
                    utils::DeleteTablePath(options_, table_id);
                }
            }
1912 1913 1914 1915

            if(table_ids.size() > 0) {
                ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
            }
S
starlord 已提交
1916
        }
S
starlord 已提交
1917 1918
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
S
starlord 已提交
1919 1920
    }

1921 1922
    return Status::OK();
}
1923

1924 1925
Status MySQLMetaImpl::CleanUp() {
    try {
S
starlord 已提交
1926
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
1927

1928
        if (connectionPtr == nullptr) {
S
starlord 已提交
1929
            return Status(DB_ERROR, "Failed to connect to database server");
1930
        }
1931

1932 1933 1934 1935
        Query cleanUpQuery = connectionPtr->query();
        cleanUpQuery << "SELECT table_name " <<
                     "FROM information_schema.tables " <<
                     "WHERE table_schema = " << quote << mysql_connection_pool_->getDB() << " " <<
1936
                     "AND table_name = " << quote << META_TABLEFILES << ";";
Z
update  
zhiru 已提交
1937

1938
        ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
Z
update  
zhiru 已提交
1939

1940
        StoreQueryResult res = cleanUpQuery.store();
Z
update  
zhiru 已提交
1941

1942 1943
        if (!res.empty()) {
            ENGINE_LOG_DEBUG << "Remove table file type as NEW";
1944
            cleanUpQuery << "DELETE FROM " << META_TABLEFILES << " WHERE file_type IN ("
1945 1946 1947
                    << std::to_string(TableFileSchema::NEW) << ","
                    << std::to_string(TableFileSchema::NEW_MERGE) << ","
                    << std::to_string(TableFileSchema::NEW_INDEX) << ");";
1948

1949
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
1950

1951
            if (!cleanUpQuery.exec()) {
S
starlord 已提交
1952
                return HandleException("QUERY ERROR WHEN CLEANING UP FILES", cleanUpQuery.error());
Z
update  
zhiru 已提交
1953
            }
1954
        }
1955

1956 1957 1958
        if(res.size() > 0) {
            ENGINE_LOG_DEBUG << "Clean " << res.size() << " files";
        }
S
starlord 已提交
1959 1960
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
Z
update  
zhiru 已提交
1961 1962
    }

1963 1964 1965 1966 1967
    return Status::OK();
}

Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
Y
Yu Kun 已提交
1968
        server::MetricCollector metric;
1969 1970 1971 1972 1973 1974 1975

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);

        if (!status.ok()) {
            return status;
1976
        }
Z
zhiru 已提交
1977

1978 1979
        StoreQueryResult res;
        {
S
starlord 已提交
1980
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
Z
zhiru 已提交
1981

Z
update  
zhiru 已提交
1982
            if (connectionPtr == nullptr) {
S
starlord 已提交
1983
                return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1984 1985
            }

Z
update  
zhiru 已提交
1986

1987
            Query countQuery = connectionPtr->query();
1988 1989
            countQuery << "SELECT row_count FROM " <<
                       META_TABLEFILES << " " <<
1990 1991 1992 1993
                       "WHERE table_id = " << quote << table_id << " AND " <<
                       "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
                       "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
                       "file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
Z
update  
zhiru 已提交
1994

1995
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str();
Z
update  
zhiru 已提交
1996

1997 1998 1999 2000 2001
            res = countQuery.store();
        } //Scoped Connection

        result = 0;
        for (auto &resRow : res) {
S
starlord 已提交
2002
            size_t size = resRow["row_count"];
2003
            result += size;
2004
        }
2005

S
starlord 已提交
2006 2007
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
Z
update  
zhiru 已提交
2008
    }
S
starlord 已提交
2009

2010 2011 2012 2013 2014
    return Status::OK();
}

Status MySQLMetaImpl::DropAll() {
    try {
S
starlord 已提交
2015
        ENGINE_LOG_DEBUG << "Drop all mysql meta";
S
starlord 已提交
2016
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
2017 2018

        if (connectionPtr == nullptr) {
S
starlord 已提交
2019
            return Status(DB_ERROR, "Failed to connect to database server");
Z
zhiru 已提交
2020
        }
2021 2022

        Query dropTableQuery = connectionPtr->query();
2023
        dropTableQuery << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";";
2024 2025 2026 2027 2028 2029

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();

        if (dropTableQuery.exec()) {
            return Status::OK();
        } else {
S
starlord 已提交
2030
            return HandleException("QUERY ERROR WHEN DROPPING ALL", dropTableQuery.error());
2031
        }
S
starlord 已提交
2032 2033
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
2034 2035 2036
    }
}

Z
update  
zhiru 已提交
2037 2038 2039 2040
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz