MySQLMetaImpl.cpp 86.4 KB
Newer Older
Z
update  
zhiru 已提交
1 2 3 4 5 6
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include "MySQLMetaImpl.h"
S
starlord 已提交
7 8 9
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "db/Log.h"
Z
update  
zhiru 已提交
10
#include "MetaConsts.h"
S
starlord 已提交
11
#include "db/Factories.h"
Z
update  
zhiru 已提交
12 13 14 15 16 17 18 19 20 21
#include "metrics/Metrics.h"

#include <unistd.h>
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <chrono>
#include <fstream>
#include <regex>
#include <string>
Z
zhiru 已提交
22
#include <mutex>
Z
zhiru 已提交
23
#include <thread>
Z
update  
zhiru 已提交
24 25 26

#include "mysql++/mysql++.h"

27

Z
update  
zhiru 已提交
28 29 30 31 32
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {

33
using namespace mysqlpp;
Z
update  
zhiru 已提交
34

35
namespace {
Z
update  
zhiru 已提交
36

37 38 39 40
/**
 * Logs an exception together with a caller-supplied description and converts
 * it into a DBTransactionError status.
 *
 * The exception is taken by const reference so the helper can be used from
 * handlers that catch by `const std::exception &` as well as by non-const
 * reference.
 *
 * @param desc Human-readable description of the operation that failed.
 * @param e    The caught exception; only e.what() is read.
 * @return Status::DBTransactionError built from desc and e.what().
 */
Status HandleException(const std::string &desc, const std::exception &e) {
    ENGINE_LOG_ERROR << desc << ": " << e.what();
    return Status::DBTransactionError(desc, e.what());
}
Z
update  
zhiru 已提交
41

42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
}

/**
 * Produces a fresh table id.
 *
 * The id is the number returned by SimpleIDGenerator::GetNextIDNumber(),
 * formatted through a string stream.
 *
 * @param table_id Output: receives the generated id as text.
 * @return Always Status::OK().
 */
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator id_generator;
    std::ostringstream formatted;
    formatted << id_generator.GetNextIDNumber();
    table_id = formatted.str();
    return Status::OK();
}

/**
 * Produces a fresh file id.
 *
 * The id is the number returned by SimpleIDGenerator::GetNextIDNumber(),
 * formatted through a string stream.
 *
 * @param file_id Output: receives the generated id as text.
 * @return Always Status::OK().
 */
Status MySQLMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator id_generator;
    std::ostringstream formatted;
    formatted << id_generator.GetNextIDNumber();
    file_id = formatted.str();
    return Status::OK();
}

/**
 * Stores the options and run mode, then runs Initialize().
 *
 * @param options_ Metadata options (the member of the same name is copied from it).
 * @param mode     Run mode, compared against Options::MODE values elsewhere in this class.
 */
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
    : options_(options_),
      mode_(mode) {
    // A constructor cannot hand the Status back to the caller, and some failure
    // paths in Initialize() return without logging, so record the failure here
    // instead of dropping it silently.
    auto status = Initialize();
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "MySQLMetaImpl initialization failed";
    }
}

/**
 * Prepares the MySQL metadata backend.
 *
 * Steps, in order:
 *  1. make sure options_.path exists as a directory (creating it if needed);
 *  2. split options_.backend_uri, expected as
 *     dialect://username:password@host:port/dbname;
 *  3. create the connection pool, sized from std::thread::hardware_concurrency()
 *     (8 when that reports 0);
 *  4. unless running READ_ONLY, call CleanUp();
 *  5. create the Tables and TableFiles tables if they do not exist.
 *
 * @return Status::OK() on success; an error Status when the directory cannot be
 *         created, the URI is malformed or not a MySQL URI, no connection can be
 *         obtained, MySQL++ lacks thread awareness, or a query fails.
 */
Status MySQLMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path)) {
        auto ret = boost::filesystem::create_directory(options_.path);
        if (!ret) {
            ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
            return Status::DBTransactionError("Failed to create db directory", options_.path);
        }
    }

    std::string uri = options_.backend_uri;

    // Every component is captured with a permissive "(.*)" group; only the
    // separators (://, :, @, :, /) give the URI its structure.
    std::string dialectRegex = "(.*)";
    std::string usernameRegex = "(.*)";
    std::string passwordRegex = "(.*)";
    std::string hostRegex = "(.*)";
    std::string portRegex = "(.*)";
    std::string dbNameRegex = "(.*)";
    std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
        usernameRegex + "\\:" +
        passwordRegex + "\\@" +
        hostRegex + "\\:" +
        portRegex + "\\/" +
        dbNameRegex;
    std::regex uriRegex(uriRegexStr);
    std::smatch pieces_match;

    if (std::regex_match(uri, pieces_match, uriRegex)) {
        std::string dialect = pieces_match[1].str();
        std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
        if (dialect.find("mysql") == std::string::npos) {
            return Status::Error("URI's dialect is not MySQL");
        }
        std::string username = pieces_match[2].str();
        std::string password = pieces_match[3].str();
        std::string serverAddress = pieces_match[4].str();
        unsigned int port = 0;
        const std::string portStr = pieces_match[5].str();
        if (!portStr.empty()) {
            // std::stoi throws on non-numeric or out-of-range text. The port group
            // accepts any characters, so translate the failure into a Status rather
            // than letting the exception escape through the constructor.
            try {
                port = std::stoi(portStr);
            } catch (const std::exception &e) {
                ENGINE_LOG_ERROR << "Invalid port in URI: " << portStr << ": " << e.what();
                return Status::Error("Wrong URI format");
            }
        }
        std::string dbName = pieces_match[6].str();

        int threadHint = std::thread::hardware_concurrency();
        int maxPoolSize = threadHint == 0 ? 8 : threadHint;
        mysql_connection_pool_ =
            std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);

        ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize);
        try {
            if (mode_ != Options::MODE::READ_ONLY) {
                CleanUp();
            }

            {
                ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

                if (connectionPtr == nullptr) {
                    return Status::Error("Failed to connect to database server");
                }

                if (!connectionPtr->thread_aware()) {
                    ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
                    return Status::Error("MySQL++ wasn't built with thread awareness! Can't run without it.");
                }
                Query InitializeQuery = connectionPtr->query();

                InitializeQuery << "CREATE TABLE IF NOT EXISTS Tables (" <<
                                "id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
                                "table_id VARCHAR(255) UNIQUE NOT NULL, " <<
                                "state INT NOT NULL, " <<
                                "dimension SMALLINT NOT NULL, " <<
                                "created_on BIGINT NOT NULL, " <<
                                "flag BIGINT DEFAULT 0 NOT NULL, " <<
                                "index_file_size INT DEFAULT 1024 NOT NULL, " <<
                                "engine_type INT DEFAULT 1 NOT NULL, " <<
                                "nlist INT DEFAULT 16384 NOT NULL, " <<
                                "metric_type INT DEFAULT 1 NOT NULL);";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

                if (!InitializeQuery.exec()) {
                    return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
                }

                // The same Query object is reused for the second statement.
                InitializeQuery << "CREATE TABLE IF NOT EXISTS TableFiles (" <<
                                "id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
                                "table_id VARCHAR(255) NOT NULL, " <<
                                "engine_type INT DEFAULT 1 NOT NULL, " <<
                                "file_id VARCHAR(255) NOT NULL, " <<
                                "file_type INT DEFAULT 0 NOT NULL, " <<
                                "file_size BIGINT DEFAULT 0 NOT NULL, " <<
                                "row_count BIGINT DEFAULT 0 NOT NULL, " <<
                                "updated_time BIGINT NOT NULL, " <<
                                "created_on BIGINT NOT NULL, " <<
                                "date INT DEFAULT -1 NOT NULL);";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

                if (!InitializeQuery.exec()) {
                    return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
                }
            } //Scoped Connection

        } catch (const BadQuery &er) {
            // Handle any query errors
            ENGINE_LOG_ERROR << "QUERY ERROR DURING INITIALIZATION" << ": " << er.what();
            return Status::DBTransactionError("QUERY ERROR DURING INITIALIZATION", er.what());
        } catch (const Exception &er) {
            // Catch-all for any other MySQL++ exceptions
            ENGINE_LOG_ERROR << "GENERAL ERROR DURING INITIALIZATION" << ": " << er.what();
            return Status::DBTransactionError("GENERAL ERROR DURING INITIALIZATION", er.what());
        } catch (std::exception &e) {
            return HandleException("Encounter exception during initialization", e);
        }
    } else {
        ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri;
        return Status::Error("Wrong URI format");
    }

    return Status::OK();
}

// PXU TODO: Temp solution. Will fix later
/**
 * Soft-deletes every file of a table whose date is in the given set by setting
 * its file_type to TableFileSchema::TO_DELETE.
 *
 * @param table_id Table whose partitions are dropped; it must be describable
 *                 via DescribeTable().
 * @param dates    Dates to drop. An empty set is a no-op. Any date that is not
 *                 earlier than GetDateWithDelta(-1) is rejected.
 * @return Status::OK() on success or when dates is empty; the DescribeTable()
 *         status when that fails; an error Status for a too-recent date, a
 *         missing connection or a failed query.
 */
Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                            const DatesT &dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        auto yesterday = GetDateWithDelta(-1);

        for (auto &date : dates) {
            if (date >= yesterday) {
                return Status::Error("Could not delete partitions within 2 days");
            }
        }

        // Comma-separated list for the SQL IN (...) clause; dates is known to be
        // non-empty at this point.
        std::string dateListStr;
        for (auto &date : dates) {
            if (!dateListStr.empty()) {
                dateListStr += ", ";
            }
            dateListStr += std::to_string(date);
        }

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query dropPartitionsByDatesQuery = connectionPtr->query();

            dropPartitionsByDatesQuery << "UPDATE TableFiles " <<
                                       "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "date in (" << dateListStr << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropPartitionsByDates: " << dropPartitionsByDatesQuery.str();

            if (!dropPartitionsByDatesQuery.exec()) {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES";
                return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES",
                                                  dropPartitionsByDatesQuery.error());
            }
        } //Scoped Connection
    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", er.what());
    }
    return Status::OK();
}
Z
update  
zhiru 已提交
255

256 257
/**
 * Inserts a new row into Tables and creates the table's storage path.
 *
 * When table_schema.table_id_ is empty a fresh id is generated; otherwise the
 * id is looked up first and creation is refused if a row already exists.
 *
 * @param table_schema In/out. table_id_ may be filled in; id_ receives the
 *                     auto-increment id of the inserted row and created_on_
 *                     the current timestamp.
 * @return The status of utils::CreateTablePath() on success;
 *         Status::AlreadyExist / Status::Error when the table already exists
 *         (the latter when it is pending deletion); an error Status when no
 *         connection is available or a query fails.
 */
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
        server::MetricCollector metric;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query createTableQuery = connectionPtr->query();

            if (table_schema.table_id_.empty()) {
                NextTableId(table_schema.table_id_);
            } else {
                createTableQuery << "SELECT state FROM Tables " <<
                                 "WHERE table_id = " << quote << table_schema.table_id_ << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();

                StoreQueryResult res = createTableQuery.store();

                if (res.num_rows() == 1) {
                    int state = res[0]["state"];
                    if (TableSchema::TO_DELETE == state) {
                        return Status::Error("Table already exists and it is in delete state, please wait a second");
                    } else {
                        return Status::AlreadyExist("Table already exists");
                    }
                }
            }

            table_schema.id_ = -1;
            table_schema.created_on_ = utils::GetMicroSecTimeStamp();

            std::string id = "NULL"; //auto-increment
            std::string table_id = table_schema.table_id_;
            std::string state = std::to_string(table_schema.state_);
            std::string dimension = std::to_string(table_schema.dimension_);
            std::string created_on = std::to_string(table_schema.created_on_);
            std::string engine_type = std::to_string(table_schema.engine_type_);

            // Name the target columns explicitly: the Tables table has more columns
            // than the six values supplied here, so a positional INSERT would be
            // rejected for a column-count mismatch. The unlisted columns (flag,
            // index_file_size, nlist, metric_type) take their declared defaults.
            createTableQuery << "INSERT INTO Tables " <<
                             "(id, table_id, state, dimension, created_on, engine_type) VALUES" <<
                             "(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " <<
                             created_on << ", " << engine_type << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();

            if (SimpleResult res = createTableQuery.execute()) {
                table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?

                //Consume all results to avoid "Commands out of sync" error
            } else {
                ENGINE_LOG_ERROR << "Add Table Error";
                return Status::DBTransactionError("Add Table Error", createTableQuery.error());
            }
        } //Scoped Connection

        return utils::CreateTablePath(options_, table_schema.table_id_);

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN ADDING TABLE" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what());
    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
    }
}
328

329
/**
 * Reports whether a table still has files that are not yet indexed, i.e. files
 * whose file_type is RAW, NEW, NEW_MERGE, NEW_INDEX or TO_INDEX.
 *
 * @param table_id Table to inspect.
 * @param has      Output: set to false on entry and to true when at least one
 *                 matching file exists.
 * @return Status::OK() when the check ran; an error Status when no connection
 *         is available or the query fails or yields no row.
 */
Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
    has = false;

    try {
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query hasNonIndexFilesQuery = connectionPtr->query();
            // SELECT EXISTS yields a single 0/1 value, so only the presence of a
            // matching file matters, not how many there are.
            hasNonIndexFilesQuery << "SELECT EXISTS " <<
                                  "(SELECT 1 FROM TableFiles " <<
                                  "WHERE table_id = " << quote << table_id << " AND " <<
                                  "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
                                  "file_type = " << std::to_string(TableFileSchema::NEW) << " OR " <<
                                  "file_type = " << std::to_string(TableFileSchema::NEW_MERGE) << " OR " <<
                                  "file_type = " << std::to_string(TableFileSchema::NEW_INDEX) << " OR " <<
                                  "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ")) " <<
                                  "AS " << quote << "check" << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasNonIndexFiles: " << hasNonIndexFilesQuery.str();

            res = hasNonIndexFilesQuery.store();
        } //Scoped Connection

        // Indexing res[0] on an empty result is invalid; an empty result here
        // means the query did not produce its single row.
        if (res.num_rows() == 0) {
            ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF NON INDEX FILES EXISTS" << ": " << "empty result";
            return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF NON INDEX FILES EXISTS", "empty result");
        }

        int check = res[0]["check"];
        has = (check == 1);

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF NON INDEX FILES EXISTS" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF NON INDEX FILES EXISTS", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CHECKING IF NON INDEX FILES EXISTS" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN CHECKING IF NON INDEX FILES EXISTS", er.what());
    }

    return Status::OK();
}
373

374
/**
 * Stores new index parameters (engine type, nlist, metric type) for a table.
 *
 * The table row is read first so that a missing or to-be-deleted table is
 * reported as NotFound; the values read are written back unchanged together
 * with the new index parameters.
 *
 * @param table_id Table to update.
 * @param index    Source of engine_type_, nlist_ and metric_type_.
 * @return Status::OK() on success; Status::NotFound when no live row matches
 *         table_id; an error Status when no connection is available or a
 *         query fails.
 */
Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query updateTableIndexParamQuery = connectionPtr->query();
            updateTableIndexParamQuery << "SELECT id, state, dimension, created_on " <<
                                       "FROM Tables " <<
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str();

            StoreQueryResult res = updateTableIndexParamQuery.store();

            if (res.num_rows() == 1) {
                const Row &resRow = res[0];

                size_t id = resRow["id"];
                int32_t state = resRow["state"];
                uint16_t dimension = resRow["dimension"];
                int64_t created_on = resRow["created_on"];

                // The column is named engine_type (no trailing underscore), and the
                // row must be selected by the table_id column: id is the numeric
                // auto-increment key and never equals the textual table id.
                updateTableIndexParamQuery << "UPDATE Tables " <<
                                           "SET id = " << id << ", " <<
                                           "state = " << state << ", " <<
                                           "dimension = " << dimension << ", " <<
                                           "created_on = " << created_on << ", " <<
                                           "engine_type = " << index.engine_type_ << ", " <<
                                           "nlist = " << index.nlist_ << ", " <<
                                           "metric_type = " << index.metric_type_ << " " <<
                                           "WHERE table_id = " << quote << table_id << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str();

                if (!updateTableIndexParamQuery.exec()) {
                    ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE INDEX PARAM";
                    return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM",
                                                      updateTableIndexParamQuery.error());
                }
            } else {
                return Status::NotFound("Table " + table_id + " not found");
            }

        } //Scoped Connection

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE INDEX PARAM" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", er.what());
    }

    return Status::OK();
}

S
starlord 已提交
440 441
/**
 * Sets the flag column of a table.
 *
 * @param table_id Table to update, matched against the table_id column.
 * @param flag     New flag value.
 * @return Status::OK() on success; an error Status when no connection is
 *         available or the query fails.
 */
Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    try {
        server::MetricCollector metric;

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            // Select the row by the table_id column; id is the numeric
            // auto-increment key and never equals the textual table id.
            Query updateTableFlagQuery = connectionPtr->query();
            updateTableFlagQuery << "UPDATE Tables " <<
                                 "SET flag = " << flag << " " <<
                                 "WHERE table_id = " << quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();

            if (!updateTableFlagQuery.exec()) {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FLAG";
                return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FLAG",
                                                  updateTableFlagQuery.error());
            }

        } //Scoped Connection

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FLAG" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FLAG", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FLAG" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FLAG", er.what());
    }

    return Status::OK();
}

480
/**
 * Reads the index parameters of a table that is not pending deletion.
 *
 * @param table_id Table to look up.
 * @param index    Output: engine_type_, nlist_ and metric_type_ are filled
 *                 from the row. index_file_size is selected by the query but
 *                 is not copied into index.
 * @return Status::OK() on success; Status::NotFound when no live row matches
 *         table_id; an error Status when no connection is available or the
 *         query fails.
 */
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query describeTableIndexQuery = connectionPtr->query();
            describeTableIndexQuery << "SELECT engine_type, nlist, index_file_size, metric_type " <<
                                       "FROM Tables " <<
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTableIndex: " << describeTableIndexQuery.str();

            StoreQueryResult res = describeTableIndexQuery.store();

            if (res.num_rows() == 1) {
                const Row &resRow = res[0];

                index.engine_type_ = resRow["engine_type"];
                index.nlist_ = resRow["nlist"];
                index.metric_type_ = resRow["metric_type"];
            } else {
                return Status::NotFound("Table " + table_id + " not found");
            }

        } //Scoped Connection

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBE TABLE INDEX" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBE TABLE INDEX", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBE TABLE INDEX" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBE TABLE INDEX", er.what());
    }

    return Status::OK();
}

/**
 * Drops a table's index in three statements issued on one connection:
 *  1. index files (file_type INDEX) are marked TO_DELETE;
 *  2. backup files (file_type BACKUP) are turned back into RAW;
 *  3. the table's engine_type, nlist and metric_type are reset to defaults.
 *
 * The statements are issued one after another; this function does not open an
 * explicit transaction around them.
 *
 * @param table_id Table whose index is dropped.
 * @return Status::OK() on success; an error Status when no connection is
 *         available or any of the statements fails.
 */
Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
        server::MetricCollector metric;

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query dropTableIndexQuery = connectionPtr->query();

            //soft delete index files
            dropTableIndexQuery << "UPDATE TableFiles " <<
                                "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
                                "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                "WHERE table_id = " << quote << table_id << " AND " <<
                                "file_type = " << std::to_string(TableFileSchema::INDEX) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROP TABLE INDEX";
                return Status::DBTransactionError("QUERY ERROR WHEN DROP TABLE INDEX",
                                                  dropTableIndexQuery.error());
            }

            //set all backup file to raw
            dropTableIndexQuery << "UPDATE TableFiles " <<
                                "SET file_type = " << std::to_string(TableFileSchema::RAW) << "," <<
                                "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                "WHERE table_id = " << quote << table_id << " AND " <<
                                "file_type = " << std::to_string(TableFileSchema::BACKUP) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROP TABLE INDEX";
                return Status::DBTransactionError("QUERY ERROR WHEN DROP TABLE INDEX",
                                                  dropTableIndexQuery.error());
            }

            //set table index type to raw
            // Each assignment in the SET list must be comma-separated, including
            // the one between nlist and metric_type.
            dropTableIndexQuery << "UPDATE Tables " <<
                                "SET engine_type = " << std::to_string(DEFAULT_ENGINE_TYPE) << "," <<
                                "nlist = " << std::to_string(DEFAULT_NLIST) << "," <<
                                "metric_type = " << std::to_string(DEFAULT_METRIC_TYPE) << " " <<
                                "WHERE table_id = " << quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROP TABLE INDEX";
                return Status::DBTransactionError("QUERY ERROR WHEN DROP TABLE INDEX",
                                                  dropTableIndexQuery.error());
            }

        } //Scoped Connection

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROP TABLE INDEX" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DROP TABLE INDEX", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROP TABLE INDEX" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DROP TABLE INDEX", er.what());
    }

    return Status::OK();
}
Z
update  
zhiru 已提交
598

S
starlord 已提交
599 600
/**
 * Soft-deletes a table by setting its state to TableSchema::TO_DELETE. In
 * CLUSTER mode the table's files are soft-deleted as well via
 * DeleteTableFiles().
 *
 * @param table_id Table to delete.
 * @return Status::OK() on success; an error Status when no connection is
 *         available or the query fails; in CLUSTER mode, the failing status of
 *         DeleteTableFiles() if that step fails.
 */
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
    try {
        server::MetricCollector metric;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            //soft delete table
            Query deleteTableQuery = connectionPtr->query();

            deleteTableQuery << "UPDATE Tables " <<
                             "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " <<
                             "WHERE table_id = " << quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str();

            if (!deleteTableQuery.exec()) {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE";
                return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
            }

        } //Scoped Connection

        if (mode_ == Options::MODE::CLUSTER) {
            // Report a failed file clean-up to the caller instead of returning
            // OK while the table's files are still live.
            auto status = DeleteTableFiles(table_id);
            if (!status.ok()) {
                return status;
            }
        }

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE", er.what());
    }

    return Status::OK();
}
Z
update  
zhiru 已提交
641

642 643
// Soft-deletes every file of a table: sets file_type to TO_DELETE and refreshes
// updated_time for all `TableFiles` rows of the table that are not already
// marked TO_DELETE.
//
// @param table_id  identifier of the table whose files are to be deleted
// @return Status::OK() on success; an error status when no connection can be
//         grabbed from the pool or when the UPDATE statement fails.
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
    try {
        server::MetricCollector metric;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            //soft delete table files
            Query deleteTableFilesQuery = connectionPtr->query();
            deleteTableFilesQuery << "UPDATE TableFiles " <<
                                  "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
                                  "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
                                  "WHERE table_id = " << quote << table_id << " AND " <<
                                  "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str();

            if (!deleteTableFilesQuery.exec()) {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES";
                // Status text names TABLE FILES so it matches the log line and the catch handlers.
                return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE FILES",
                                                  deleteTableFilesQuery.error());
            }
        } //Scoped Connection
    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE FILES", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE FILES" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE FILES", er.what());
    }

    return Status::OK();
}
Z
zhiru 已提交
680

681 682
// Looks up the table named by table_schema.table_id_ (ignoring rows whose state
// is TO_DELETE) and fills the remaining schema fields from the matching row.
//
// @param table_schema  in: table_id_; out: id_, state_, dimension_,
//                      index_file_size_, engine_type_, nlist_, metric_type_
// @return Status::OK() when exactly one row matched, Status::NotFound otherwise,
//         or an error status on connection/query failure.
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        server::MetricCollector metric;
        StoreQueryResult rows;
        {
            ScopedConnection conn(*mysql_connection_pool_, safe_grab);

            if (conn == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query query = conn->query();
            query << "SELECT id, state, dimension, engine_type, nlist, index_file_size, metric_type "
                  << "FROM Tables "
                  << "WHERE table_id = " << quote << table_schema.table_id_ << " "
                  << "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << query.str();

            rows = query.store();
        } //Scoped Connection

        // Anything other than a single match is reported as "not found".
        if (rows.num_rows() != 1) {
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
        }

        // Field values rely on MySQL++ implicit conversions; the assignment order
        // is kept because table_schema is an out-parameter.
        const Row &row = rows[0];
        table_schema.id_ = row["id"];
        table_schema.state_ = row["state"];
        table_schema.dimension_ = row["dimension"];
        table_schema.index_file_size_ = row["index_file_size"];
        table_schema.engine_type_ = row["engine_type"];
        table_schema.nlist_ = row["nlist"];
        table_schema.metric_type_ = row["metric_type"];

    } catch (const BadQuery &er) {
        // Query-level failure
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING TABLE" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING TABLE", er.what());
    } catch (const Exception &er) {
        // Any other MySQL++ exception
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBING TABLE" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING TABLE", er.what());
    }

    return Status::OK();
}
Z
zhiru 已提交
735

736 737
// Checks whether a table with the given id exists and is not marked TO_DELETE.
//
// @param table_id    identifier of the table to look for
// @param has_or_not  out: true when a matching row exists, false otherwise
// @return Status::OK() on success; an error status when no connection can be
//         grabbed from the pool, the query fails, or it yields no result row.
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    try {
        server::MetricCollector metric;
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query hasTableQuery = connectionPtr->query();
            //since table_id is a unique column we just need to check whether it exists or not
            hasTableQuery << "SELECT EXISTS " <<
                          "(SELECT 1 FROM Tables " <<
                          "WHERE table_id = " << quote << table_id << " " <<
                          "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
                          "AS " << quote << "check" << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();

            res = hasTableQuery.store();
        } //Scoped Connection

        // SELECT EXISTS normally yields exactly one row; guard the index below so
        // an empty result (e.g. a failed store()) is reported instead of reading res[0].
        if (res.num_rows() == 0) {
            ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF TABLE EXISTS";
            has_or_not = false;
            return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF TABLE EXISTS",
                                              "query returned no result row");
        }

        int check = res[0]["check"];
        has_or_not = (check == 1);

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF TABLE EXISTS" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF TABLE EXISTS", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CHECKING IF TABLE EXISTS" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", er.what());
    }

    return Status::OK();
}
775

776 777
// Appends one TableSchema per row of `Tables` whose state is not TO_DELETE.
// The output vector is not cleared first; entries are added to whatever it holds.
//
// @param table_schema_array  out: receives the schemas read from the database
// @return Status::OK() on success; an error status on connection/query failure.
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
    try {
        server::MetricCollector metric;
        StoreQueryResult rows;
        {
            ScopedConnection conn(*mysql_connection_pool_, safe_grab);

            if (conn == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query query = conn->query();
            query << "SELECT id, table_id, dimension, engine_type, nlist, index_file_size, metric_type "
                  << "FROM Tables "
                  << "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << query.str();

            rows = query.store();
        } //Scoped Connection

        for (auto &row : rows) {
            TableSchema schema;

            // Numeric columns rely on MySQL++ implicit conversions.
            schema.id_ = row["id"];
            row["table_id"].to_string(schema.table_id_);
            schema.dimension_ = row["dimension"];
            schema.index_file_size_ = row["index_file_size"];
            schema.engine_type_ = row["engine_type"];
            schema.nlist_ = row["nlist"];
            schema.metric_type_ = row["metric_type"];

            table_schema_array.push_back(schema);
        }
    } catch (const BadQuery &er) {
        // Query-level failure
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING ALL TABLES" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING ALL TABLES", er.what());
    } catch (const Exception &er) {
        // Any other MySQL++ exception
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBING ALL TABLES" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING ALL TABLES", er.what());
    }

    return Status::OK();
}
830

831 832 833 834 835 836 837 838 839 840
// Registers a new file for an existing table: fills the derived fields of
// file_schema from the owning table, inserts a row into `TableFiles`, and then
// creates the file's path via utils::CreateTableFilePath.
//
// @param file_schema  in: table_id_, file_type_, optionally date_ (defaults to
//                     Meta::GetDate() when equal to EmptyDate);
//                     out: id_, file_id_, sizes, timestamps and the fields
//                     copied from the table schema
// @return the status of DescribeTable when the table cannot be described, an
//         error status on connection/query failure, otherwise the status of
//         utils::CreateTableFilePath.
Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
    }
    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);
        file_schema.dimension_ = table_schema.dimension_;
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.index_file_size_ = table_schema.index_file_size_;
        file_schema.engine_type_ = table_schema.engine_type_;
        file_schema.nlist_ = table_schema.nlist_;
        file_schema.metric_type_ = table_schema.metric_type_;
        utils::GetTableFilePath(options_, file_schema);

        std::string id = "NULL"; //auto-increment
        std::string table_id = file_schema.table_id_;
        std::string engine_type = std::to_string(file_schema.engine_type_);
        std::string file_id = file_schema.file_id_;
        std::string file_type = std::to_string(file_schema.file_type_);
        std::string file_size = std::to_string(file_schema.file_size_);
        std::string row_count = std::to_string(file_schema.row_count_);
        std::string updated_time = std::to_string(file_schema.updated_time_);
        std::string created_on = std::to_string(file_schema.created_on_);
        std::string date = std::to_string(file_schema.date_);

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query createTableFileQuery = connectionPtr->query();

            // Columns are named explicitly so the statement does not depend on the
            // table's column order, and file_size (read by the SELECTs in this file)
            // is supplied instead of being silently omitted from a positional list.
            createTableFileQuery << "INSERT INTO TableFiles " <<
                                 "(id, table_id, engine_type, file_id, file_type, file_size, " <<
                                 "row_count, updated_time, created_on, date) " <<
                                 "VALUES (" << id << ", " << quote << table_id << ", " << engine_type << ", " <<
                                 quote << file_id << ", " << file_type << ", " << file_size << ", " <<
                                 row_count << ", " << updated_time << ", " << created_on << ", " << date << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str();

            if (SimpleResult res = createTableFileQuery.execute()) {
                file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?

                //Consume all results to avoid "Commands out of sync" error
            } else {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE";
                return Status::DBTransactionError("Add file Error", createTableFileQuery.error());
            }
        } // Scoped Connection

        return utils::CreateTableFilePath(options_, file_schema);

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE FILE", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN ADDING TABLE FILE" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what());
    } catch (std::exception &ex) {
        return HandleException("Encounter exception when create table file", ex);
    }
}
907

908 909
// Collects every `TableFiles` row whose file_type is TO_INDEX. Each file is
// completed with dimension, index_file_size, nlist and metric_type taken from
// its owning table, which is described once per distinct table_id.
//
// @param files  out: cleared, then filled with the files found
// @return Status::OK() on success; the failing DescribeTable status when an
//         owning table cannot be described; an error status on
//         connection/query failure.
Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        server::MetricCollector metric;
        StoreQueryResult rows;
        {
            ScopedConnection conn(*mysql_connection_pool_, safe_grab);

            if (conn == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query query = conn->query();
            query << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date, created_on "
                  << "FROM TableFiles "
                  << "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << query.str();

            rows = query.store();
        } //Scoped Connection

        // Schemas already fetched, keyed by table id.
        std::map<std::string, TableSchema> schema_cache;
        TableFileSchema table_file;
        for (auto &row : rows) {
            // Numeric columns rely on MySQL++ implicit conversions.
            table_file.id_ = row["id"];

            std::string owner_id;
            row["table_id"].to_string(owner_id);
            table_file.table_id_ = owner_id;

            table_file.engine_type_ = row["engine_type"];

            std::string file_id;
            row["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = row["file_type"];
            table_file.file_size_ = row["file_size"];
            table_file.row_count_ = row["row_count"];
            table_file.date_ = row["date"];
            table_file.created_on_ = row["created_on"];

            // Describe the owning table the first time it is seen.
            auto cached = schema_cache.find(table_file.table_id_);
            if (cached == schema_cache.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                cached = schema_cache.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema &owner = cached->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.nlist_ = owner.nlist_;
            table_file.metric_type_ = owner.metric_type_;

            utils::GetTableFilePath(options_, table_file);

            files.push_back(table_file);
        }
    } catch (const BadQuery &er) {
        // Query-level failure
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO INDEX" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO INDEX", er.what());
    } catch (const Exception &er) {
        // Any other MySQL++ exception
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", er.what());
    }

    return Status::OK();
}

// Finds the searchable files (file_type RAW, TO_INDEX or INDEX) of a table,
// optionally restricted to a set of dates, grouped by date.
//
// This overload applies no file-id filter. It forwards to the overload taking
// an id list with an empty list, which builds the same SQL text and fills
// `files` in the same way, so the query/row-mapping logic lives in one place.
//
// @param table_id   identifier of the table to search
// @param partition  dates to restrict the search to; empty means all dates
// @param files      out: cleared, then filled with the files found, keyed by date
// @return the status returned by the forwarding target.
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
                                    const DatesT &partition,
                                    DatePartionedTableFilesSchema &files) {
    const std::vector<size_t> no_id_filter;
    return FilesToSearch(table_id, no_id_filter, partition, files);
}

// Finds the searchable files (file_type RAW, TO_INDEX or INDEX) of a table,
// optionally restricted to a set of file ids and/or dates, grouped by date.
//
// @param table_id   identifier of the table to search
// @param ids        file ids to restrict the search to; empty means no id filter
// @param partition  dates to restrict the search to; empty means all dates
// @param files      out: cleared, then filled with the files found, keyed by date
// @return Status::OK() on success; the failing DescribeTable status when the
//         table cannot be described; an error status on connection/query failure.
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
                                    const std::vector<size_t> &ids,
                                    const DatesT &partition,
                                    DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        server::MetricCollector metric;
        StoreQueryResult rows;
        {
            ScopedConnection conn(*mysql_connection_pool_, safe_grab);

            if (conn == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query query = conn->query();
            query << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date "
                  << "FROM TableFiles "
                  << "WHERE table_id = " << quote << table_id;

            if (!partition.empty()) {
                // Comma-separated list of the requested dates.
                std::string date_list;
                for (auto &date : partition) {
                    if (!date_list.empty()) {
                        date_list += ", ";
                    }
                    date_list += std::to_string(date);
                }
                query << " AND " << "date IN (" << date_list << ")";
            }

            if (!ids.empty()) {
                // "id = a OR id = b ..." disjunction over the requested ids.
                std::string id_filter;
                for (auto &id : ids) {
                    if (!id_filter.empty()) {
                        id_filter += " OR ";
                    }
                    id_filter += "id = " + std::to_string(id);
                }
                query << " AND " << "(" << id_filter << ")";
            }

            query << " AND "
                  << "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR "
                  << "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR "
                  << "file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << query.str();

            rows = query.store();
        } //Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        TableFileSchema table_file;
        for (auto &row : rows) {
            // Columns read from the row (numeric ones via MySQL++ implicit conversion).
            table_file.id_ = row["id"];

            std::string owner_id;
            row["table_id"].to_string(owner_id);
            table_file.table_id_ = owner_id;

            table_file.engine_type_ = row["engine_type"];

            std::string file_id;
            row["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = row["file_type"];
            table_file.file_size_ = row["file_size"];
            table_file.row_count_ = row["row_count"];
            table_file.date_ = row["date"];

            // Fields inherited from the owning table.
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;
            table_file.dimension_ = table_schema.dimension_;

            utils::GetTableFilePath(options_, table_file);

            // operator[] creates the per-date bucket on first use.
            files[table_file.date_].push_back(table_file);
        }
    } catch (const BadQuery &er) {
        // Query-level failure
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
    } catch (const Exception &er) {
        // Any other MySQL++ exception
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
    }

    return Status::OK();
}
Z
update  
zhiru 已提交
1218

1219 1220 1221
// Collects the RAW files of a table that are candidates for merging, grouped
// by date. Rows are fetched ordered by row_count descending; files whose
// file_size is at least the table's index_file_size are skipped.
//
// @param table_id  identifier of the table whose files are examined
// @param files     out: cleared, then filled with the candidate files, keyed by date
// @return Status::OK() on success; the failing DescribeTable status when the
//         table cannot be described; an error status on connection/query failure.
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
                                   DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        server::MetricCollector metric;

        //check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query filesToMergeQuery = connectionPtr->query();
            // The column is created_on (as selected elsewhere in this file and as read
            // from the row below); the previous text "create_on" named no such column.
            filesToMergeQuery << "SELECT id, table_id, file_id, file_type, file_size, row_count, date, engine_type, created_on " <<
                              "FROM TableFiles " <<
                              "WHERE table_id = " << quote << table_id << " AND " <<
                              "file_type = " << std::to_string(TableFileSchema::RAW) << " " <<
                              "ORDER BY row_count DESC" << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();

            res = filesToMergeQuery.store();
        } //Scoped Connection

        for (auto &resRow : res) {
            TableFileSchema table_file;
            table_file.file_size_ = resRow["file_size"];
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;//skip large file
            }

            table_file.id_ = resRow["id"]; //implicit conversion

            std::string table_id_str;
            resRow["table_id"].to_string(table_id_str);
            table_file.table_id_ = table_id_str;

            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = resRow["file_type"];

            table_file.row_count_ = resRow["row_count"];

            table_file.date_ = resRow["date"];

            table_file.index_file_size_ = table_schema.index_file_size_;

            table_file.engine_type_ = resRow["engine_type"];

            table_file.nlist_ = table_schema.nlist_;

            table_file.metric_type_ = table_schema.metric_type_;

            table_file.created_on_ = resRow["created_on"];

            table_file.dimension_ = table_schema.dimension_;

            utils::GetTableFilePath(options_, table_file);

            // operator[] creates the per-date bucket on first use.
            files[table_file.date_].push_back(table_file);
        }

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO MERGE" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO MERGE", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", er.what());
    }

    return Status::OK();
}

// Fetches the TableFiles rows of `table_id` whose numeric id is listed in
// `ids` and appends one TableFileSchema per row to `table_files`.
//
// Per-file columns come from the TableFiles row; index_file_size_, nlist_,
// metric_type_ and dimension_ are copied from the owning table's schema as
// returned by DescribeTable().
//
// Returns OK right away when `ids` is empty, the DescribeTable() status when
// that call fails, and a DBTransactionError when MySQL++ throws.
Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
                                    const std::vector<size_t> &ids,
                                    TableFilesSchema &table_files) {
    if (ids.empty()) {
        return Status::OK();
    }

    // Produce "id = A OR id = B ..." by writing the separator in front of
    // every element except the first one.
    std::string id_filter;
    for (auto &wanted_id : ids) {
        if (!id_filter.empty()) {
            id_filter += " OR ";
        }
        id_filter += "id = " + std::to_string(wanted_id);
    }

    try {
        StoreQueryResult rows;
        {
            // The connection is only held while the SELECT runs.
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query selectQuery = connectionPtr->query();
            selectQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on "
                        << "FROM TableFiles "
                        << "WHERE table_id = " << quote << table_id << " AND "
                        << "(" << id_filter << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << selectQuery.str();

            rows = selectQuery.store();
        } //Scoped Connection

        // Table-level attributes are shared by every file of the table.
        TableSchema owner_schema;
        owner_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(owner_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        for (auto &row : rows) {
            TableFileSchema entry;

            // Values read from the TableFiles row.
            entry.id_ = row["id"];
            entry.engine_type_ = row["engine_type"];

            std::string file_name;
            row["file_id"].to_string(file_name);
            entry.file_id_ = file_name;

            entry.file_type_ = row["file_type"];
            entry.file_size_ = row["file_size"];
            entry.row_count_ = row["row_count"];
            entry.date_ = row["date"];
            entry.created_on_ = row["created_on"];

            // Values inherited from the owning table.
            entry.table_id_ = table_id;
            entry.index_file_size_ = owner_schema.index_file_size_;
            entry.nlist_ = owner_schema.nlist_;
            entry.metric_type_ = owner_schema.metric_type_;
            entry.dimension_ = owner_schema.dimension_;

            utils::GetTableFilePath(options_, entry);

            table_files.push_back(entry);
        }
    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING TABLE FILES" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING TABLE FILES", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING TABLE FILES" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING TABLE FILES", er.what());
    }

    return Status::OK();
}
Z
zhiru 已提交
1402

1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417
// PXU TODO: Support Swap
//
// Applies every archive criterion configured in options_.archive_conf:
//   - "days": marks every TableFiles row whose created_on is older than
//             `limit` days (and is not already TO_DELETE) as TO_DELETE.
//   - "disk": sums the size of all non-TO_DELETE files and asks
//             DiscardFiles() to drop the amount exceeding `limit` * G.
//
// Returns OK when no criteria are configured or when all of them succeed;
// otherwise returns the first failure encountered.
Status MySQLMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.empty()) {
        return Status::OK();
    }

    for (auto &kv : criterias) {
        auto &criteria = kv.first;
        auto &limit = kv.second;
        if (criteria == "days") {
            size_t usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();

            try {
                ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

                if (connectionPtr == nullptr) {
                    return Status::Error("Failed to connect to database server");
                }

                Query archiveQuery = connectionPtr->query();
                archiveQuery << "UPDATE TableFiles " <<
                             "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                             "WHERE created_on < " << std::to_string(now - usecs) << " AND " <<
                             "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();

                if (!archiveQuery.exec()) {
                    return Status::DBTransactionError("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
                }

            } catch (const BadQuery &er) {
                // Handle any query errors
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN DURING ARCHIVE" << ": " << er.what();
                return Status::DBTransactionError("QUERY ERROR WHEN DURING ARCHIVE", er.what());
            } catch (const Exception &er) {
                // Catch-all for any other MySQL++ exceptions
                ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DURING ARCHIVE" << ": " << er.what();
                return Status::DBTransactionError("GENERAL ERROR WHEN DURING ARCHIVE", er.what());
            }
        }
        if (criteria == "disk") {
            uint64_t sum = 0;
            // Do not swallow a failed size query: without a trustworthy total
            // there is nothing meaningful to discard.
            auto size_status = Size(sum);
            if (!size_status.ok()) {
                return size_status;
            }

            // Compute the excess in signed arithmetic. When usage is below the
            // limit the result is genuinely negative (DiscardFiles treats
            // <= 0 as "nothing to do") instead of relying on unsigned
            // wrap-around followed by an implicit narrowing conversion.
            const long long to_delete =
                static_cast<long long>(sum) - static_cast<long long>(limit) * static_cast<long long>(G);

            auto discard_status = DiscardFiles(to_delete);
            if (!discard_status.ok()) {
                return discard_status;
            }
        }
    }

    return Status::OK();
}
Z
zhiru 已提交
1457

1458 1459
// Stores in `result` the sum of file_size over every TableFiles row that is
// not marked TO_DELETE. `result` is reset to 0 first and stays 0 when the
// query yields no row or when an error is returned.
//
// Returns OK on success, Status::Error when no connection could be grabbed,
// and a DBTransactionError when MySQL++ throws.
Status MySQLMetaImpl::Size(uint64_t &result) {
    result = 0;

    try {
        StoreQueryResult sum_rows;
        {
            // The connection is only held while the SELECT runs.
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query sumQuery = connectionPtr->query();
            sumQuery << "SELECT IFNULL(SUM(file_size),0) AS sum "
                     << "FROM TableFiles "
                     << "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << sumQuery.str();

            sum_rows = sumQuery.store();
        } //Scoped Connection

        // `result` already holds 0, so only a non-empty answer needs handling.
        if (!sum_rows.empty()) {
            result = sum_rows[0]["sum"];
        }

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING SIZE" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING SIZE" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING SIZE", er.what());
    }

    return Status::OK();
}
1499

1500 1501 1502 1503
// Marks files as TO_DELETE until at least `to_discard_size` bytes worth of
// files have been marked.
//
// Each pass selects up to 10 non-TO_DELETE rows ordered by ascending id,
// marks as many of them as needed, then recurses with the remaining size.
// The recursion ends when the remaining size drops to <= 0 or when no
// candidate rows are left.
//
// Returns OK when nothing (more) has to be discarded, Status::Error when no
// connection could be grabbed, and a DBTransactionError when a query fails.
Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
    if (to_discard_size <= 0) {

        return Status::OK();
    }
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

    try {
        server::MetricCollector metric;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query discardFilesQuery = connectionPtr->query();
            discardFilesQuery << "SELECT id, file_size " <<
                              "FROM TableFiles " <<
                              "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                              "ORDER BY id ASC " <<
                              "LIMIT 10;";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();

            StoreQueryResult res = discardFilesQuery.store();
            if (res.num_rows() == 0) {
                return Status::OK();
            }

            TableFileSchema table_file;
            std::stringstream idsToDiscardSS;
            for (auto &resRow : res) {
                if (to_discard_size <= 0) {
                    break;
                }
                table_file.id_ = resRow["id"];
                table_file.file_size_ = resRow["file_size"];
                idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR ";
                // Log the numeric id that was just read; file_id_ is not part
                // of the SELECT above and would always print as empty.
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
            }

            std::string idsToDiscardStr = idsToDiscardSS.str();
            idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR "

            discardFilesQuery << "UPDATE TableFiles " <<
                              "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
                              "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
                              "WHERE " << idsToDiscardStr << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();

            if (!discardFilesQuery.exec()) {
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES";
                return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error());
            }
        } //Scoped Connection

        // Continue with the next batch using whatever size is still owed.
        return DiscardFiles(to_discard_size);

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DISCARDING FILES" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DISCARDING FILES", er.what());
    }
}
P
peng.xu 已提交
1574

1575 1576 1577
//ZR: this function assumes all fields in file_schema have value
//
// Writes every column of `file_schema` back to its TableFiles row (matched by
// id) after stamping updated_time_ with the current time.
//
// Before the write, the owning table's state is looked up: when the table row
// is missing (or not unique) or its state is TO_DELETE, file_type_ is forced
// to TO_DELETE so that the clean-up thread removes the file later.
//
// Returns OK on success, Status::Error when no connection could be grabbed,
// and a DBTransactionError when a query fails.
Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

    try {
        server::MetricCollector metric;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query updateTableFileQuery = connectionPtr->query();

            //if the table has been deleted, just mark the table file as TO_DELETE
            //clean thread will delete the file later
            updateTableFileQuery << "SELECT state FROM Tables "
                                 << "WHERE table_id = " << quote << file_schema.table_id_ << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();

            StoreQueryResult state_rows = updateTableFileQuery.store();

            // Assume the table is gone unless exactly one row says otherwise.
            bool table_is_gone = true;
            if (state_rows.num_rows() == 1) {
                int table_state = state_rows[0]["state"];
                table_is_gone = (table_state == TableSchema::TO_DELETE);
            }
            if (table_is_gone) {
                file_schema.file_type_ = TableFileSchema::TO_DELETE;
            }

            const std::string &owner_table = file_schema.table_id_;
            const std::string &file_name = file_schema.file_id_;

            updateTableFileQuery << "UPDATE TableFiles "
                                 << "SET table_id = " << quote << owner_table << ", "
                                 << "engine_type = " << std::to_string(file_schema.engine_type_) << ", "
                                 << "file_id = " << quote << file_name << ", "
                                 << "file_type = " << std::to_string(file_schema.file_type_) << ", "
                                 << "file_size = " << std::to_string(file_schema.file_size_) << ", "
                                 << "row_count = " << std::to_string(file_schema.row_count_) << ", "
                                 << "updated_time = " << std::to_string(file_schema.updated_time_) << ", "
                                 << "created_on = " << std::to_string(file_schema.created_on_) << ", "
                                 << "date = " << std::to_string(file_schema.date_) << " "
                                 << "WHERE id = " << std::to_string(file_schema.id_) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();

            const bool updated = updateTableFileQuery.exec();
            if (!updated) {
                ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
                ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE";
                return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE",
                                                  updateTableFileQuery.error());
            }
        } //Scoped Connection

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILE" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILE", er.what());
    }
    return Status::OK();
}
1655

1656 1657 1658
// Switches every RAW file of `table_id` to TO_INDEX with a single UPDATE.
//
// Returns OK on success, Status::Error when no connection could be grabbed,
// and a DBTransactionError when the query fails or MySQL++ throws.
Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
    try {
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

        if (connectionPtr == nullptr) {
            return Status::Error("Failed to connect to database server");
        }

        Query updateTableFilesToIndexQuery = connectionPtr->query();

        updateTableFilesToIndexQuery << "UPDATE TableFiles " <<
                                     "SET file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " " <<
                                     "WHERE table_id = " << quote << table_id << " AND " <<
                                     "file_type = " << std::to_string(TableFileSchema::RAW) << ";";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();

        if (!updateTableFilesToIndexQuery.exec()) {
            // Use the same wording as the exception handlers below so this
            // failure is not mistaken for one raised by UpdateTableFile().
            ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES TO INDEX";
            return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES TO INDEX",
                                              updateTableFilesToIndexQuery.error());
        }

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES TO INDEX" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES TO INDEX", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", er.what());
    }

    return Status::OK();
}
Z
zhiru 已提交
1691

1692 1693
// Writes every entry of `files` back to its TableFiles row (matched by id).
//
// Pass 1 checks, once per distinct table_id, whether the owning table still
// exists in a state other than TO_DELETE.
// Pass 2 forces file_type_ to TO_DELETE for files whose table failed that
// check, stamps updated_time_, and issues one UPDATE per file.
//
// Returns OK on success, Status::Error when no connection could be grabbed,
// and a DBTransactionError as soon as one query fails.
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        server::MetricCollector metric;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query updateTableFilesQuery = connectionPtr->query();

            // table_id -> "table exists and is not being deleted"
            std::map<std::string, bool> has_tables;
            for (auto &candidate : files) {

                // Each table only needs to be looked up once.
                if (has_tables.count(candidate.table_id_) > 0) {
                    continue;
                }

                updateTableFilesQuery << "SELECT EXISTS "
                                      << "(SELECT 1 FROM Tables "
                                      << "WHERE table_id = " << quote << candidate.table_id_ << " "
                                      << "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") "
                                      << "AS " << quote << "check" << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();

                StoreQueryResult exists_rows = updateTableFilesQuery.store();

                int exists_flag = exists_rows[0]["check"];
                has_tables[candidate.table_id_] = (exists_flag == 1);
            }

            for (auto &entry : files) {

                const bool table_alive = has_tables[entry.table_id_];
                if (!table_alive) {
                    entry.file_type_ = TableFileSchema::TO_DELETE;
                }
                entry.updated_time_ = utils::GetMicroSecTimeStamp();

                const std::string &owner_table = entry.table_id_;
                const std::string &file_name = entry.file_id_;

                updateTableFilesQuery << "UPDATE TableFiles "
                                      << "SET table_id = " << quote << owner_table << ", "
                                      << "engine_type = " << std::to_string(entry.engine_type_) << ", "
                                      << "file_id = " << quote << file_name << ", "
                                      << "file_type = " << std::to_string(entry.file_type_) << ", "
                                      << "file_size = " << std::to_string(entry.file_size_) << ", "
                                      << "row_count = " << std::to_string(entry.row_count_) << ", "
                                      << "updated_time = " << std::to_string(entry.updated_time_) << ", "
                                      << "created_on = " << std::to_string(entry.created_on_) << ", "
                                      << "date = " << std::to_string(entry.date_) << " "
                                      << "WHERE id = " << std::to_string(entry.id_) << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();

                const bool updated = updateTableFilesQuery.exec();
                if (!updated) {
                    ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES";
                    return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES",
                                                      updateTableFilesQuery.error());
                }
            }
        } //Scoped Connection

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILES" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILES", er.what());
    }

    return Status::OK();
}
Z
fix  
zhiru 已提交
1777

1778 1779
// Physically removes what earlier calls only marked for deletion. Runs three
// phases, each with its own connection and its own error handling:
//   1. TableFiles rows in state TO_DELETE whose updated_time is older than
//      `seconds`: delete the file path, then delete the rows.
//   2. Tables rows in state TO_DELETE: call DeleteTablePath(..., false) for
//      each, then delete the rows.
//   3. For every table touched in phase 1: when no TableFiles row references
//      it any more, call DeleteTablePath() on it.
//
// Returns OK when all phases succeed; otherwise returns the error of the
// first failing phase and skips the remaining ones.
Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> table_ids;

    //remove to_delete files
    try {
        server::MetricCollector metric;

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query cleanUpFilesWithTTLQuery = connectionPtr->query();
            cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date "
                                     << "FROM TableFiles "
                                     << "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND "
                                     << "updated_time < " << std::to_string(now - seconds * US_PS) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

            StoreQueryResult expired_files = cleanUpFilesWithTTLQuery.store();

            // Deliberately declared outside the loop, as before, so the same
            // object is reused for every row.
            TableFileSchema table_file;
            // Grows into "id = A OR id = B ..." as rows are processed.
            std::string id_filter;

            for (auto &row : expired_files) {

                table_file.id_ = row["id"]; //implicit conversion

                std::string owner_table;
                row["table_id"].to_string(owner_table);
                table_file.table_id_ = owner_table;

                std::string file_name;
                row["file_id"].to_string(file_name);
                table_file.file_id_ = file_name;

                table_file.date_ = row["date"];

                utils::DeleteTableFilePath(options_, table_file);

                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;

                if (!id_filter.empty()) {
                    id_filter += " OR ";
                }
                id_filter += "id = " + std::to_string(table_file.id_);

                table_ids.insert(table_file.table_id_);
            }

            if (!id_filter.empty()) {

                cleanUpFilesWithTTLQuery << "DELETE FROM TableFiles WHERE " << id_filter << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                const bool deleted = cleanUpFilesWithTTLQuery.exec();
                if (!deleted) {
                    ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL";
                    return Status::DBTransactionError("CleanUpFilesWithTTL Error",
                                                      cleanUpFilesWithTTLQuery.error());
                }
            }
        } //Scoped Connection

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
    }

    //remove to_delete tables
    try {
        server::MetricCollector metric;

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query cleanUpFilesWithTTLQuery = connectionPtr->query();
            cleanUpFilesWithTTLQuery << "SELECT id, table_id "
                                     << "FROM Tables "
                                     << "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

            StoreQueryResult doomed_tables = cleanUpFilesWithTTLQuery.store();

            if (!doomed_tables.empty()) {

                std::string id_filter;
                for (auto &row : doomed_tables) {
                    size_t row_id = row["id"];
                    std::string table_id;
                    row["table_id"].to_string(table_id);

                    utils::DeleteTablePath(options_, table_id, false);//only delete empty folder

                    if (!id_filter.empty()) {
                        id_filter += " OR ";
                    }
                    id_filter += "id = " + std::to_string(row_id);
                }

                cleanUpFilesWithTTLQuery << "DELETE FROM Tables WHERE " << id_filter << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                const bool deleted = cleanUpFilesWithTTLQuery.exec();
                if (!deleted) {
                    ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL";
                    return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL",
                                                      cleanUpFilesWithTTLQuery.error());
                }
            }
        } //Scoped Connection

    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
    }

    //remove deleted table folder
    //don't remove table folder until all its files has been deleted
    try {
        server::MetricCollector metric;

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            for (auto &touched_table : table_ids) {
                Query cleanUpFilesWithTTLQuery = connectionPtr->query();
                cleanUpFilesWithTTLQuery << "SELECT file_id "
                                         << "FROM TableFiles "
                                         << "WHERE table_id = " << quote << touched_table << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                StoreQueryResult remaining_files = cleanUpFilesWithTTLQuery.store();

                // Skip tables that still have file rows.
                if (!remaining_files.empty()) {
                    continue;
                }
                utils::DeleteTablePath(options_, touched_table);
            }
        }
    } catch (const BadQuery &er) {
        // Handle any query errors
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what());
    } catch (const Exception &er) {
        // Catch-all for any other MySQL++ exceptions
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", er.what());
    }

    return Status::OK();
}
1957

1958 1959 1960
// Removes leftover rows from TableFiles whose file_type is NEW, NEW_MERGE or
// NEW_INDEX.
//
// The DELETE is only issued when information_schema reports that a table named
// "TableFiles" exists in the schema returned by the connection pool's getDB().
//
// Returns Status::OK() when the table is absent or the DELETE succeeds,
// Status::Error when no connection could be grabbed, and
// Status::DBTransactionError when the DELETE fails or MySQL++ throws.
Status MySQLMetaImpl::CleanUp() {
    try {
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
        if (connectionPtr == nullptr) {
            return Status::Error("Failed to connect to database server");
        }

        Query query = connectionPtr->query();

        // Step 1: probe for the TableFiles table.
        query << "SELECT table_name "
              << "FROM information_schema.tables "
              << "WHERE table_schema = " << quote << mysql_connection_pool_->getDB() << " "
              << "AND table_name = " << quote << "TableFiles" << ";";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << query.str();

        StoreQueryResult existing = query.store();
        if (existing.empty()) {
            // Nothing to purge: the table is not there.
            return Status::OK();
        }

        // Step 2: delete the rows in the NEW* states.
        // NOTE(review): the same Query object is reused after store(); this
        // presumably relies on MySQL++ resetting the query once it has been
        // executed - confirm against the MySQL++ version in use.
        ENGINE_LOG_DEBUG << "Remove table file type as NEW";
        query << "DELETE FROM TableFiles WHERE file_type IN ("
              << std::to_string(TableFileSchema::NEW) << ","
              << std::to_string(TableFileSchema::NEW_MERGE) << ","
              << std::to_string(TableFileSchema::NEW_INDEX) << ");";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << query.str();

        if (!query.exec()) {
            ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES";
            return Status::DBTransactionError("Clean up Error", query.error());
        }
    } catch (const BadQuery &er) {
        // Query-level failures reported by MySQL++.
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES", er.what());
    } catch (const Exception &er) {
        // Any other MySQL++ exception.
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES", er.what());
    }

    return Status::OK();
}

// Computes a row count for `table_id` and stores it in `result`.
//
// The value is the sum of the `size` column over the table's files whose
// file_type is RAW, TO_INDEX or INDEX, divided by the table dimension and then
// by sizeof(float).
// NOTE(review): this presumably treats `size` as a byte count of float
// vectors - confirm against the code that writes that column.
//
// table_id  identifier of the table to count.
// result    output; overwritten once the query has completed.
//
// Returns the DescribeTable() status if that lookup fails, Status::Error when
// no connection could be grabbed or the table dimension is not positive, and
// Status::DBTransactionError when MySQL++ throws; Status::OK() otherwise.
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
        server::MetricCollector metric;

        // The schema lookup both validates the table and supplies its dimension.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(table_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        StoreQueryResult rows;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
            if (connectionPtr == nullptr) {
                return Status::Error("Failed to connect to database server");
            }

            Query query = connectionPtr->query();
            query << "SELECT size "
                  << "FROM TableFiles "
                  << "WHERE table_id = " << quote << table_id << " AND "
                  << "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR "
                  << "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR "
                  << "file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << query.str();

            rows = query.store();
        } // end of the ScopedConnection scope

        // Accumulate straight into the output parameter.
        result = 0;
        for (auto &row : rows) {
            size_t file_size = row["size"];
            result += file_size;
        }

        // Guard the division below against a non-positive dimension.
        if (table_schema.dimension_ <= 0) {
            const std::string message = "MySQLMetaImpl::Count: table dimension = " +
                                        std::to_string(table_schema.dimension_) +
                                        ", table_id = " + table_id;
            ENGINE_LOG_ERROR << message;
            return Status::Error(message);
        }

        result = (result / table_schema.dimension_) / sizeof(float);
    } catch (const BadQuery &er) {
        // Query-level failures reported by MySQL++.
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING COUNT" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING COUNT", er.what());
    } catch (const Exception &er) {
        // Any other MySQL++ exception.
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING COUNT" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING COUNT", er.what());
    }

    return Status::OK();
}

// Wipes everything this meta store manages: the directory tree at
// options_.path (when it is a directory) and the Tables / TableFiles tables.
//
// Every failure is reported through the returned Status:
//  - Status::Error when the directory cannot be removed or no connection
//    could be grabbed;
//  - Status::DBTransactionError when the DROP fails or MySQL++ throws.
// The tables are not dropped if removing the directory fails.
Status MySQLMetaImpl::DropAll() {
    // The throwing boost::filesystem overloads are used here, so translate a
    // filesystem_error into a Status instead of letting it escape a function
    // whose contract is to report failures by return value.
    try {
        if (boost::filesystem::is_directory(options_.path)) {
            boost::filesystem::remove_all(options_.path);
        }
    } catch (const boost::filesystem::filesystem_error &er) {
        const std::string message = std::string("FILESYSTEM ERROR WHEN DROPPING ALL: ") + er.what();
        ENGINE_LOG_ERROR << message;
        return Status::Error(message);
    }

    try {
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

        if (connectionPtr == nullptr) {
            return Status::Error("Failed to connect to database server");
        }

        Query dropTableQuery = connectionPtr->query();
        dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();

        if (!dropTableQuery.exec()) {
            ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING TABLE";
            return Status::DBTransactionError("DROP TABLE ERROR", dropTableQuery.error());
        }
    } catch (const BadQuery &er) {
        // Query-level failures reported by MySQL++.
        ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING TABLE" << ": " << er.what();
        return Status::DBTransactionError("QUERY ERROR WHEN DROPPING TABLE", er.what());
    } catch (const Exception &er) {
        // Any other MySQL++ exception.
        ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING TABLE" << ": " << er.what();
        return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING TABLE", er.what());
    }

    return Status::OK();
}

// Destructor: unless the instance was created in READ_ONLY mode, runs
// CleanUp() one last time.
//
// Destructors are implicitly noexcept, so an exception leaving this body would
// terminate the process. CleanUp() only handles MySQL++ exceptions itself;
// anything else is caught and logged here. A non-ok Status is logged as well,
// since it cannot be returned to anyone.
MySQLMetaImpl::~MySQLMetaImpl() {
    if (mode_ == Options::MODE::READ_ONLY) {
        return;
    }

    try {
        auto status = CleanUp();
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "MySQLMetaImpl::~MySQLMetaImpl: CleanUp did not complete successfully";
        }
    } catch (const std::exception &er) {
        ENGINE_LOG_ERROR << "MySQLMetaImpl::~MySQLMetaImpl: CleanUp threw: " << er.what();
    } catch (...) {
        ENGINE_LOG_ERROR << "MySQLMetaImpl::~MySQLMetaImpl: CleanUp threw an unknown exception";
    }
}
Z
update  
zhiru 已提交
2107 2108 2109 2110 2111

} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz