MySQLMetaImpl.cpp 69.1 KB
Newer Older
Z
update  
zhiru 已提交
1 2 3 4 5 6
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include "MySQLMetaImpl.h"
S
starlord 已提交
7 8 9
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "db/Log.h"
Z
update  
zhiru 已提交
10
#include "MetaConsts.h"
S
starlord 已提交
11
#include "db/Factories.h"
Z
update  
zhiru 已提交
12 13 14 15 16 17 18 19 20 21
#include "metrics/Metrics.h"

#include <unistd.h>
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <chrono>
#include <fstream>
#include <regex>
#include <string>
Z
zhiru 已提交
22
#include <mutex>
Z
zhiru 已提交
23
#include <thread>
Z
update  
zhiru 已提交
24 25 26

#include "mysql++/mysql++.h"

27

Z
update  
zhiru 已提交
28 29 30 31 32
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {

33
using namespace mysqlpp;
Z
update  
zhiru 已提交
34

35
namespace {
Z
update  
zhiru 已提交
36

S
starlord 已提交
37 38 39 40 41 42 43 44 45
Status HandleException(const std::string &desc, const char* what = nullptr) {
    if(what == nullptr) {
        ENGINE_LOG_ERROR << desc;
        return Status(DB_META_TRANSACTION_FAILED, desc);
    } else {
        std::string msg = desc + ":" + what;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_META_TRANSACTION_FAILED, msg);
    }
46
}
Z
update  
zhiru 已提交
47

48 49
}

50 51 52 53 54 55 56
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
    : options_(options_),
      mode_(mode) {
    Initialize();
}

MySQLMetaImpl::~MySQLMetaImpl() {
S
starlord 已提交
57

58 59
}

60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.GetNextIDNumber();
    table_id = ss.str();
    return Status::OK();
}

Status MySQLMetaImpl::NextFileId(std::string &file_id) {
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.GetNextIDNumber();
    file_id = ss.str();
    return Status::OK();
}

Status MySQLMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path)) {
        auto ret = boost::filesystem::create_directory(options_.path);
        if (!ret) {
S
starlord 已提交
80 81 82
            std::string msg = "Failed to create db directory " + options_.path;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_META_TRANSACTION_FAILED, msg);
Z
update  
zhiru 已提交
83 84 85
        }
    }

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
    std::string uri = options_.backend_uri;

    std::string dialectRegex = "(.*)";
    std::string usernameRegex = "(.*)";
    std::string passwordRegex = "(.*)";
    std::string hostRegex = "(.*)";
    std::string portRegex = "(.*)";
    std::string dbNameRegex = "(.*)";
    std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
        usernameRegex + "\\:" +
        passwordRegex + "\\@" +
        hostRegex + "\\:" +
        portRegex + "\\/" +
        dbNameRegex;
    std::regex uriRegex(uriRegexStr);
    std::smatch pieces_match;

    if (std::regex_match(uri, pieces_match, uriRegex)) {
        std::string dialect = pieces_match[1].str();
        std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
        if (dialect.find("mysql") == std::string::npos) {
S
starlord 已提交
107
            return Status(DB_ERROR, "URI's dialect is not MySQL");
108
        }
109 110 111 112 113 114
        std::string username = pieces_match[2].str();
        std::string password = pieces_match[3].str();
        std::string serverAddress = pieces_match[4].str();
        unsigned int port = 0;
        if (!pieces_match[5].str().empty()) {
            port = std::stoi(pieces_match[5].str());
115
        }
116
        std::string dbName = pieces_match[6].str();
117 118


119 120 121 122
        int threadHint = std::thread::hardware_concurrency();
        int maxPoolSize = threadHint == 0 ? 8 : threadHint;
        mysql_connection_pool_ =
            std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
123

124 125
        ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize);
        try {
126

127 128
            if (mode_ != Options::MODE::READ_ONLY) {
                CleanUp();
129 130
            }

131
            {
Z
update  
zhiru 已提交
132
                ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
133

Z
update  
zhiru 已提交
134
                if (connectionPtr == nullptr) {
S
starlord 已提交
135
                    return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
136 137
                }

138

139 140
                if (!connectionPtr->thread_aware()) {
                    ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
S
starlord 已提交
141
                    return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it.");
142
                }
143
                Query InitializeQuery = connectionPtr->query();
Z
update  
zhiru 已提交
144

145 146 147 148 149 150
                InitializeQuery << "CREATE TABLE IF NOT EXISTS Tables (" <<
                                "id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
                                "table_id VARCHAR(255) UNIQUE NOT NULL, " <<
                                "state INT NOT NULL, " <<
                                "dimension SMALLINT NOT NULL, " <<
                                "created_on BIGINT NOT NULL, " <<
S
starlord 已提交
151
                                "flag BIGINT DEFAULT 0 NOT NULL, " <<
S
starlord 已提交
152
                                "index_file_size BIGINT DEFAULT 1024 NOT NULL, " <<
153
                                "engine_type INT DEFAULT 1 NOT NULL, " <<
154 155
                                "nlist INT DEFAULT 16384 NOT NULL, " <<
                                "metric_type INT DEFAULT 1 NOT NULL);";
Z
zhiru 已提交
156

157
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
Z
update  
zhiru 已提交
158

159
                if (!InitializeQuery.exec()) {
S
starlord 已提交
160
                    return HandleException("Initialization Error", InitializeQuery.error());
Z
update  
zhiru 已提交
161 162
                }

163 164 165 166 167 168
                InitializeQuery << "CREATE TABLE IF NOT EXISTS TableFiles (" <<
                                "id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
                                "table_id VARCHAR(255) NOT NULL, " <<
                                "engine_type INT DEFAULT 1 NOT NULL, " <<
                                "file_id VARCHAR(255) NOT NULL, " <<
                                "file_type INT DEFAULT 0 NOT NULL, " <<
169 170
                                "file_size BIGINT DEFAULT 0 NOT NULL, " <<
                                "row_count BIGINT DEFAULT 0 NOT NULL, " <<
171 172 173 174 175 176 177
                                "updated_time BIGINT NOT NULL, " <<
                                "created_on BIGINT NOT NULL, " <<
                                "date INT DEFAULT -1 NOT NULL);";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

                if (!InitializeQuery.exec()) {
S
starlord 已提交
178
                    return HandleException("Initialization Error", InitializeQuery.error());
179
                }
180
            } //Scoped Connection
Z
update  
zhiru 已提交
181

Z
zhiru 已提交
182
        } catch (std::exception &e) {
S
starlord 已提交
183
            return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
184
        }
185 186
    } else {
        ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri;
S
starlord 已提交
187
        return Status(DB_ERROR, "Wrong URI format");
Z
update  
zhiru 已提交
188
    }
S
starlord 已提交
189 190

    return Status::OK();
191 192 193 194 195 196
}

// PXU TODO: Temp solution. Will fix later
Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                            const DatesT &dates) {
    if (dates.empty()) {
P
peng.xu 已提交
197 198 199
        return Status::OK();
    }

200 201 202 203 204 205
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }
Z
zhiru 已提交
206

207 208 209 210 211 212 213
    try {
        std::stringstream dateListSS;
        for (auto &date : dates) {
            dateListSS << std::to_string(date) << ", ";
        }
        std::string dateListStr = dateListSS.str();
        dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", "
Z
update  
zhiru 已提交
214

215 216
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
217

218
            if (connectionPtr == nullptr) {
S
starlord 已提交
219
                return Status(DB_ERROR, "Failed to connect to database server");
220
            }
Z
update  
zhiru 已提交
221

Z
update  
zhiru 已提交
222

223
            Query dropPartitionsByDatesQuery = connectionPtr->query();
224

225
            dropPartitionsByDatesQuery << "UPDATE TableFiles " <<
226 227
                                       "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
                                       "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
228 229
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "date in (" << dateListStr << ");";
Z
update  
zhiru 已提交
230

231
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropPartitionsByDates: " << dropPartitionsByDatesQuery.str();
Z
update  
zhiru 已提交
232

233
            if (!dropPartitionsByDatesQuery.exec()) {
S
starlord 已提交
234
                return HandleException("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", dropPartitionsByDatesQuery.error());
Z
update  
zhiru 已提交
235
            }
236
        } //Scoped Connection
S
starlord 已提交
237 238
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
Z
update  
zhiru 已提交
239
    }
240 241
    return Status::OK();
}
Z
update  
zhiru 已提交
242

243 244
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
Y
Yu Kun 已提交
245
        server::MetricCollector metric;
246 247
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
248

249
            if (connectionPtr == nullptr) {
S
starlord 已提交
250
                return Status(DB_ERROR, "Failed to connect to database server");
251
            }
Z
update  
zhiru 已提交
252

253
            Query createTableQuery = connectionPtr->query();
Z
update  
zhiru 已提交
254

255 256 257 258 259
            if (table_schema.table_id_.empty()) {
                NextTableId(table_schema.table_id_);
            } else {
                createTableQuery << "SELECT state FROM Tables " <<
                                 "WHERE table_id = " << quote << table_schema.table_id_ << ";";
Z
zhiru 已提交
260

261
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
Z
update  
zhiru 已提交
262

263
                StoreQueryResult res = createTableQuery.store();
264

265 266 267
                if (res.num_rows() == 1) {
                    int state = res[0]["state"];
                    if (TableSchema::TO_DELETE == state) {
S
starlord 已提交
268
                        return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
269
                    } else {
S
starlord 已提交
270
                        return Status(DB_ALREADY_EXIST, "Table already exists");
271 272 273
                    }
                }
            }
274

275 276
            table_schema.id_ = -1;
            table_schema.created_on_ = utils::GetMicroSecTimeStamp();
Z
update  
zhiru 已提交
277

278 279 280 281 282
            std::string id = "NULL"; //auto-increment
            std::string table_id = table_schema.table_id_;
            std::string state = std::to_string(table_schema.state_);
            std::string dimension = std::to_string(table_schema.dimension_);
            std::string created_on = std::to_string(table_schema.created_on_);
S
starlord 已提交
283 284
            std::string flag = std::to_string(table_schema.flag_);
            std::string index_file_size = std::to_string(table_schema.index_file_size_);
285
            std::string engine_type = std::to_string(table_schema.engine_type_);
S
starlord 已提交
286 287
            std::string nlist = std::to_string(table_schema.nlist_);
            std::string metric_type = std::to_string(table_schema.metric_type_);
Z
update  
zhiru 已提交
288

289 290
            createTableQuery << "INSERT INTO Tables VALUES" <<
                             "(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " <<
S
starlord 已提交
291 292
                             created_on << ", " << flag << ", " << index_file_size << ", " << engine_type << ", " <<
                             nlist << ", " << metric_type << ");";
Z
update  
zhiru 已提交
293

294
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
295

296 297
            if (SimpleResult res = createTableQuery.execute()) {
                table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
298

299 300
                //Consume all results to avoid "Commands out of sync" error
            } else {
S
starlord 已提交
301
                return HandleException("Add Table Error", createTableQuery.error());
302
            }
303
        } //Scoped Connection
304

305
        return utils::CreateTablePath(options_, table_schema.table_id_);
Z
update  
zhiru 已提交
306

307
    } catch (std::exception &e) {
S
starlord 已提交
308
        return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
309 310
    }
}
311

312 313 314 315
Status MySQLMetaImpl::FilesByType(const std::string &table_id,
                                  const std::vector<int> &file_types,
                                  std::vector<std::string> &file_ids) {
    if(file_types.empty()) {
S
starlord 已提交
316
        return Status(DB_ERROR, "file types array is empty");
317
    }
Z
zhiru 已提交
318 319

    try {
320 321
        file_ids.clear();

Z
zhiru 已提交
322 323 324 325 326
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
327
                return Status(DB_ERROR, "Failed to connect to database server");
Z
zhiru 已提交
328 329
            }

330 331 332 333 334 335 336 337
            std::string types;
            for(auto type : file_types) {
                if(!types.empty()) {
                    types += ",";
                }
                types += std::to_string(type);
            }

Z
zhiru 已提交
338 339
            Query hasNonIndexFilesQuery = connectionPtr->query();
            //since table_id is a unique column we just need to check whether it exists or not
340
            hasNonIndexFilesQuery << "SELECT file_id, file_type FROM TableFiles " <<
Z
fix  
zhiru 已提交
341
                                  "WHERE table_id = " << quote << table_id << " AND " <<
342
                                  "file_type in (" << types << ");";
Z
zhiru 已提交
343

344
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
Z
zhiru 已提交
345 346 347 348

            res = hasNonIndexFilesQuery.store();
        } //Scoped Connection

349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
        if (res.num_rows() > 0) {
            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
            int to_index_count = 0, index_count = 0, backup_count = 0;
            for (auto &resRow : res) {
                std::string file_id;
                resRow["file_id"].to_string(file_id);
                file_ids.push_back(file_id);

                int32_t file_type = resRow["file_type"];
                switch (file_type) {
                    case (int) TableFileSchema::RAW:
                        raw_count++;
                        break;
                    case (int) TableFileSchema::NEW:
                        new_count++;
                        break;
                    case (int) TableFileSchema::NEW_MERGE:
                        new_merge_count++;
                        break;
                    case (int) TableFileSchema::NEW_INDEX:
                        new_index_count++;
                        break;
                    case (int) TableFileSchema::TO_INDEX:
                        to_index_count++;
                        break;
                    case (int) TableFileSchema::INDEX:
                        index_count++;
                        break;
                    case (int) TableFileSchema::BACKUP:
                        backup_count++;
                        break;
                    default:
                        break;
                }
            }

            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
                             << " new files:" << new_count << " new_merge files:" << new_merge_count
                             << " new_index files:" << new_index_count << " to_index files:" << to_index_count
                             << " index files:" << index_count << " backup files:" << backup_count;
        }
Z
zhiru 已提交
390

S
starlord 已提交
391 392
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
Z
zhiru 已提交
393 394
    }

395 396
    return Status::OK();
}
397

S
starlord 已提交
398
Status MySQLMetaImpl::UpdateTableIndex(const std::string &table_id, const TableIndex& index) {
S
starlord 已提交
399
    try {
Y
Yu Kun 已提交
400
        server::MetricCollector metric;
S
starlord 已提交
401 402 403 404 405

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
406
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
407 408 409 410 411 412 413 414
            }

            Query updateTableIndexParamQuery = connectionPtr->query();
            updateTableIndexParamQuery << "SELECT id, state, dimension, created_on " <<
                                       "FROM Tables " <<
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

S
starlord 已提交
415
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
S
starlord 已提交
416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431

            StoreQueryResult res = updateTableIndexParamQuery.store();

            if (res.num_rows() == 1) {
                const Row &resRow = res[0];

                size_t id = resRow["id"];
                int32_t state = resRow["state"];
                uint16_t dimension = resRow["dimension"];
                int64_t created_on = resRow["created_on"];

                updateTableIndexParamQuery << "UPDATE Tables " <<
                                           "SET id = " << id << ", " <<
                                           "state = " << state << ", " <<
                                           "dimension = " << dimension << ", " <<
                                           "created_on = " << created_on << ", " <<
S
starlord 已提交
432
                                           "engine_type = " << index.engine_type_ << ", " <<
S
starlord 已提交
433
                                           "nlist = " << index.nlist_ << ", " <<
S
starlord 已提交
434
                                           "metric_type = " << index.metric_type_ << " " <<
S
starlord 已提交
435
                                           "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
436

S
starlord 已提交
437
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
S
starlord 已提交
438 439 440


                if (!updateTableIndexParamQuery.exec()) {
S
starlord 已提交
441
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM", updateTableIndexParamQuery.error());
S
starlord 已提交
442 443
                }
            } else {
S
starlord 已提交
444
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
S
starlord 已提交
445 446 447 448
            }

        } //Scoped Connection

S
starlord 已提交
449 450
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
S
starlord 已提交
451 452
    }

453 454 455
    return Status::OK();
}

S
starlord 已提交
456 457
Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    try {
Y
Yu Kun 已提交
458
        server::MetricCollector metric;
S
starlord 已提交
459 460 461 462 463

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
464
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
465 466 467 468 469
            }

            Query updateTableFlagQuery = connectionPtr->query();
            updateTableFlagQuery << "UPDATE Tables " <<
                                 "SET flag = " << flag << " " <<
S
starlord 已提交
470
                                 "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
471 472 473 474

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();

            if (!updateTableFlagQuery.exec()) {
S
starlord 已提交
475
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLAG", updateTableFlagQuery.error());
S
starlord 已提交
476 477 478 479
            }

        } //Scoped Connection

S
starlord 已提交
480 481
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
S
starlord 已提交
482 483 484 485 486
    }

    return Status::OK();
}

487
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
S
starlord 已提交
488
    try {
Y
Yu Kun 已提交
489
        server::MetricCollector metric;
490

S
starlord 已提交
491 492
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
493

S
starlord 已提交
494
            if (connectionPtr == nullptr) {
S
starlord 已提交
495
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
496 497 498 499 500 501 502
            }

            Query describeTableIndexQuery = connectionPtr->query();
            describeTableIndexQuery << "SELECT engine_type, nlist, index_file_size, metric_type " <<
                                       "FROM Tables " <<
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
503

S
starlord 已提交
504
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTableIndex: " << describeTableIndexQuery.str();
Z
update  
zhiru 已提交
505

S
starlord 已提交
506 507 508 509 510 511 512 513 514
            StoreQueryResult res = describeTableIndexQuery.store();

            if (res.num_rows() == 1) {
                const Row &resRow = res[0];

                index.engine_type_ = resRow["engine_type"];
                index.nlist_ = resRow["nlist"];
                index.metric_type_ = resRow["metric_type"];
            } else {
S
starlord 已提交
515
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
S
starlord 已提交
516 517 518
            }

        } //Scoped Connection
Z
update  
zhiru 已提交
519

S
starlord 已提交
520 521
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
S
starlord 已提交
522 523 524 525 526 527 528
    }

    return Status::OK();
}

Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
529
        server::MetricCollector metric;
Z
update  
zhiru 已提交
530

531 532
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
533

534
            if (connectionPtr == nullptr) {
S
starlord 已提交
535
                return Status(DB_ERROR, "Failed to connect to database server");
536
            }
537

S
starlord 已提交
538 539
            Query dropTableIndexQuery = connectionPtr->query();

540
            //soft delete index files
S
starlord 已提交
541 542 543 544 545 546 547 548 549
            dropTableIndexQuery << "UPDATE TableFiles " <<
                                "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
                                "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                "WHERE table_id = " << quote << table_id << " AND " <<
                                "file_type = " << std::to_string(TableFileSchema::INDEX) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
550
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
S
starlord 已提交
551 552
            }

553
            //set all backup file to raw
S
starlord 已提交
554 555 556 557 558 559 560 561 562
            dropTableIndexQuery << "UPDATE TableFiles " <<
                                "SET file_type = " << std::to_string(TableFileSchema::RAW) << "," <<
                                "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                "WHERE table_id = " << quote << table_id << " AND " <<
                                "file_type = " << std::to_string(TableFileSchema::BACKUP) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
563
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
564 565 566 567 568
            }

            //set table index type to raw
            dropTableIndexQuery << "UPDATE Tables " <<
                                "SET engine_type = " << std::to_string(DEFAULT_ENGINE_TYPE) << "," <<
S
starlord 已提交
569
                                "nlist = " << std::to_string(DEFAULT_NLIST) << ", " <<
570 571 572 573 574 575
                                "metric_type = " << std::to_string(DEFAULT_METRIC_TYPE) << " " <<
                                "WHERE table_id = " << quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
576
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
S
starlord 已提交
577 578 579 580
            }

        } //Scoped Connection

S
starlord 已提交
581 582
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
S
starlord 已提交
583
    }
584

S
starlord 已提交
585 586
    return Status::OK();
}
Z
update  
zhiru 已提交
587

S
starlord 已提交
588 589
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
590
        server::MetricCollector metric;
S
starlord 已提交
591 592
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
zhiru 已提交
593

S
starlord 已提交
594
            if (connectionPtr == nullptr) {
S
starlord 已提交
595
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
596
            }
Z
zhiru 已提交
597

598 599 600 601 602 603
            //soft delete table
            Query deleteTableQuery = connectionPtr->query();
//
            deleteTableQuery << "UPDATE Tables " <<
                             "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " <<
                             "WHERE table_id = " << quote << table_id << ";";
Z
update  
zhiru 已提交
604

605
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str();
606

607
            if (!deleteTableQuery.exec()) {
S
starlord 已提交
608
                return HandleException("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
609
            }
610

611
        } //Scoped Connection
Z
zhiru 已提交
612

613 614 615
        if (mode_ == Options::MODE::CLUSTER) {
            DeleteTableFiles(table_id);
        }
Z
update  
zhiru 已提交
616

S
starlord 已提交
617 618
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
619
    }
Z
update  
zhiru 已提交
620

621 622
    return Status::OK();
}
Z
update  
zhiru 已提交
623

624 625
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
626
        server::MetricCollector metric;
627 628 629 630
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
631
                return Status(DB_ERROR, "Failed to connect to database server");
632
            }
633

634 635 636 637 638 639 640 641
            //soft delete table files
            Query deleteTableFilesQuery = connectionPtr->query();
            //
            deleteTableFilesQuery << "UPDATE TableFiles " <<
                                  "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
                                  "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
                                  "WHERE table_id = " << quote << table_id << " AND " <<
                                  "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
642

643
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str();
644

645
            if (!deleteTableFilesQuery.exec()) {
S
starlord 已提交
646
                return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", deleteTableFilesQuery.error());
647
            }
648
        } //Scoped Connection
S
starlord 已提交
649 650
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
Z
update  
zhiru 已提交
651 652
    }

653 654
    return Status::OK();
}
Z
zhiru 已提交
655

656 657
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
Y
Yu Kun 已提交
658
        server::MetricCollector metric;
659 660 661
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
662

663
            if (connectionPtr == nullptr) {
S
starlord 已提交
664
                return Status(DB_ERROR, "Failed to connect to database server");
665
            }
Z
zhiru 已提交
666

667
            Query describeTableQuery = connectionPtr->query();
S
starlord 已提交
668 669
            describeTableQuery << "SELECT id, state, dimension, created_on, " <<
                               "flag, index_file_size, engine_type, nlist, metric_type " <<
670 671 672 673 674
                               "FROM Tables " <<
                               "WHERE table_id = " << quote << table_schema.table_id_ << " " <<
                               "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str();
Z
update  
zhiru 已提交
675

676 677
            res = describeTableQuery.store();
        } //Scoped Connection
678

679 680
        if (res.num_rows() == 1) {
            const Row &resRow = res[0];
681

682
            table_schema.id_ = resRow["id"]; //implicit conversion
Z
update  
zhiru 已提交
683

S
starlord 已提交
684 685
            table_schema.state_ = resRow["state"];

686
            table_schema.dimension_ = resRow["dimension"];
687

S
starlord 已提交
688 689 690 691
            table_schema.created_on_ = resRow["created_on"];

            table_schema.flag_ = resRow["flag"];

692 693
            table_schema.index_file_size_ = resRow["index_file_size"];

694
            table_schema.engine_type_ = resRow["engine_type"];
S
starlord 已提交
695 696 697 698

            table_schema.nlist_ = resRow["nlist"];

            table_schema.metric_type_ = resRow["metric_type"];
699
        } else {
S
starlord 已提交
700
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
701
        }
Z
update  
zhiru 已提交
702

S
starlord 已提交
703 704
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
Z
update  
zhiru 已提交
705 706
    }

707 708
    return Status::OK();
}
Z
zhiru 已提交
709

710 711
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    try {
Y
Yu Kun 已提交
712
        server::MetricCollector metric;
713 714 715
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
716

717
            if (connectionPtr == nullptr) {
S
starlord 已提交
718
                return Status(DB_ERROR, "Failed to connect to database server");
719
            }
Z
update  
zhiru 已提交
720

721 722 723 724 725 726 727
            Query hasTableQuery = connectionPtr->query();
            //since table_id is a unique column we just need to check whether it exists or not
            hasTableQuery << "SELECT EXISTS " <<
                          "(SELECT 1 FROM Tables " <<
                          "WHERE table_id = " << quote << table_id << " " <<
                          "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
                          "AS " << quote << "check" << ";";
Z
update  
zhiru 已提交
728

729
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();
730

731 732
            res = hasTableQuery.store();
        } //Scoped Connection
733

734 735
        int check = res[0]["check"];
        has_or_not = (check == 1);
736

S
starlord 已提交
737 738
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
739
    }
740

741 742
    return Status::OK();
}
743

744 745
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
    try {
Y
Yu Kun 已提交
746
        server::MetricCollector metric;
747 748 749
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
750

751
            if (connectionPtr == nullptr) {
S
starlord 已提交
752
                return Status(DB_ERROR, "Failed to connect to database server");
753
            }
Z
update  
zhiru 已提交
754

755
            Query allTablesQuery = connectionPtr->query();
S
starlord 已提交
756
            allTablesQuery << "SELECT id, table_id, dimension, engine_type, nlist, index_file_size, metric_type " <<
757 758
                           "FROM Tables " <<
                           "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
zhiru 已提交
759

760
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str();
Z
zhiru 已提交
761

762 763
            res = allTablesQuery.store();
        } //Scoped Connection
764

765 766
        for (auto &resRow : res) {
            TableSchema table_schema;
Z
update  
zhiru 已提交
767

768
            table_schema.id_ = resRow["id"]; //implicit conversion
769

770 771 772
            std::string table_id;
            resRow["table_id"].to_string(table_id);
            table_schema.table_id_ = table_id;
773

774
            table_schema.dimension_ = resRow["dimension"];
775

776 777
            table_schema.index_file_size_ = resRow["index_file_size"];

778
            table_schema.engine_type_ = resRow["engine_type"];
779

S
starlord 已提交
780 781 782 783
            table_schema.nlist_ = resRow["nlist"];

            table_schema.metric_type_ = resRow["metric_type"];

784 785
            table_schema_array.emplace_back(table_schema);
        }
S
starlord 已提交
786 787
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
788
    }
Z
update  
zhiru 已提交
789

790 791
    return Status::OK();
}
792

793 794
Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
795
        file_schema.date_ = utils::GetDate();
796 797 798 799 800 801 802
    }
    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }
803

804
    try {
Y
Yu Kun 已提交
805
        server::MetricCollector metric;
806 807 808

        NextFileId(file_schema.file_id_);
        file_schema.dimension_ = table_schema.dimension_;
809 810
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
811 812
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
813
        file_schema.index_file_size_ = table_schema.index_file_size_;
814
        file_schema.engine_type_ = table_schema.engine_type_;
S
starlord 已提交
815 816
        file_schema.nlist_ = table_schema.nlist_;
        file_schema.metric_type_ = table_schema.metric_type_;
817 818 819 820 821 822

        std::string id = "NULL"; //auto-increment
        std::string table_id = file_schema.table_id_;
        std::string engine_type = std::to_string(file_schema.engine_type_);
        std::string file_id = file_schema.file_id_;
        std::string file_type = std::to_string(file_schema.file_type_);
S
starlord 已提交
823
        std::string file_size = std::to_string(file_schema.file_size_);
824
        std::string row_count = std::to_string(file_schema.row_count_);
825 826 827 828 829 830
        std::string updated_time = std::to_string(file_schema.updated_time_);
        std::string created_on = std::to_string(file_schema.created_on_);
        std::string date = std::to_string(file_schema.date_);

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
831

832
            if (connectionPtr == nullptr) {
S
starlord 已提交
833
                return Status(DB_ERROR, "Failed to connect to database server");
834
            }
Z
update  
zhiru 已提交
835

836
            Query createTableFileQuery = connectionPtr->query();
837

838 839
            createTableFileQuery << "INSERT INTO TableFiles VALUES" <<
                                 "(" << id << ", " << quote << table_id << ", " << engine_type << ", " <<
S
starlord 已提交
840 841
                                 quote << file_id << ", " << file_type << ", " << file_size << ", " <<
                                 row_count << ", " << updated_time << ", " << created_on << ", " << date << ");";
842 843 844 845 846 847 848 849

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str();

            if (SimpleResult res = createTableFileQuery.execute()) {
                file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?

                //Consume all results to avoid "Commands out of sync" error
            } else {
S
starlord 已提交
850
                return HandleException("QUERY ERROR WHEN CREATING TABLE FILE", createTableFileQuery.error());
851
            }
852 853 854 855
        } // Scoped Connection

        return utils::CreateTableFilePath(options_, file_schema);

S
starlord 已提交
856 857
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
858 859
    }
}
860

861 862
Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();
863

864
    try {
Y
Yu Kun 已提交
865
        server::MetricCollector metric;
866 867 868 869 870
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
871
                return Status(DB_ERROR, "Failed to connect to database server");
872
            }
873

874
            Query filesToIndexQuery = connectionPtr->query();
S
starlord 已提交
875
            filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date, created_on " <<
876 877
                              "FROM TableFiles " <<
                              "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";
878

879
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();
880

881 882 883
            res = filesToIndexQuery.store();
        } //Scoped Connection

S
starlord 已提交
884
        Status ret;
885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;
        for (auto &resRow : res) {

            table_file.id_ = resRow["id"]; //implicit conversion

            std::string table_id;
            resRow["table_id"].to_string(table_id);
            table_file.table_id_ = table_id;

            table_file.engine_type_ = resRow["engine_type"];

            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = resRow["file_type"];

S
starlord 已提交
903 904
            table_file.file_size_ = resRow["file_size"];

905
            table_file.row_count_ = resRow["row_count"];
906 907 908

            table_file.date_ = resRow["date"];

S
starlord 已提交
909 910
            table_file.created_on_ = resRow["created_on"];

911 912 913 914 915 916 917
            auto groupItr = groups.find(table_file.table_id_);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
918
                }
919
                groups[table_file.table_id_] = table_schema;
920 921

            }
922
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
S
starlord 已提交
923
            table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
924
            table_file.nlist_ = groups[table_file.table_id_].nlist_;
S
starlord 已提交
925
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
Z
update  
zhiru 已提交
926

S
starlord 已提交
927 928
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
929
                ret = status;
S
starlord 已提交
930
            }
931 932 933

            files.push_back(table_file);
        }
S
starlord 已提交
934 935 936

        return ret;

S
starlord 已提交
937 938
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
Z
update  
zhiru 已提交
939
    }
940 941
}

X
xj.lin 已提交
942 943 944 945 946 947 948
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
                                    const std::vector<size_t> &ids,
                                    const DatesT &partition,
                                    DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
Y
Yu Kun 已提交
949
        server::MetricCollector metric;
X
xj.lin 已提交
950 951 952 953 954
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
955
                return Status(DB_ERROR, "Failed to connect to database server");
X
xj.lin 已提交
956 957 958
            }

            Query filesToSearchQuery = connectionPtr->query();
S
starlord 已提交
959
            filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
X
xj.lin 已提交
960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002
                               "FROM TableFiles " <<
                               "WHERE table_id = " << quote << table_id;

            if (!partition.empty()) {
                std::stringstream partitionListSS;
                for (auto &date : partition) {
                    partitionListSS << std::to_string(date) << ", ";
                }
                std::string partitionListStr = partitionListSS.str();

                partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", "
                filesToSearchQuery << " AND " << "date IN (" << partitionListStr << ")";
            }

            if (!ids.empty()) {
                std::stringstream idSS;
                for (auto &id : ids) {
                    idSS << "id = " << std::to_string(id) << " OR ";
                }
                std::string idStr = idSS.str();
                idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "

                filesToSearchQuery  << " AND " << "(" << idStr << ")";

            }
            // End
            filesToSearchQuery << " AND " <<
                               "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
                               "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
                               "file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();

            res = filesToSearchQuery.store();
        } //Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

S
starlord 已提交
1003
        Status ret;
X
xj.lin 已提交
1004 1005 1006 1007 1008 1009 1010 1011 1012
        TableFileSchema table_file;
        for (auto &resRow : res) {

            table_file.id_ = resRow["id"]; //implicit conversion

            std::string table_id_str;
            resRow["table_id"].to_string(table_id_str);
            table_file.table_id_ = table_id_str;

1013 1014
            table_file.index_file_size_ = table_schema.index_file_size_;

X
xj.lin 已提交
1015 1016
            table_file.engine_type_ = resRow["engine_type"];

S
starlord 已提交
1017 1018
            table_file.nlist_ = table_schema.nlist_;

S
starlord 已提交
1019 1020
            table_file.metric_type_ = table_schema.metric_type_;

X
xj.lin 已提交
1021 1022 1023 1024 1025 1026
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = resRow["file_type"];

S
starlord 已提交
1027 1028
            table_file.file_size_ = resRow["file_size"];

1029
            table_file.row_count_ = resRow["row_count"];
X
xj.lin 已提交
1030 1031 1032 1033 1034

            table_file.date_ = resRow["date"];

            table_file.dimension_ = table_schema.dimension_;

S
starlord 已提交
1035 1036
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
1037
                ret = status;
S
starlord 已提交
1038
            }
1039

1040 1041 1042
            auto dateItr = files.find(table_file.date_);
            if (dateItr == files.end()) {
                files[table_file.date_] = TableFilesSchema();
1043 1044
            }

1045
            files[table_file.date_].push_back(table_file);
1046
        }
S
starlord 已提交
1047 1048

        return ret;
S
starlord 已提交
1049 1050
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
Z
update  
zhiru 已提交
1051
    }
1052
}
Z
update  
zhiru 已提交
1053

1054 1055 1056
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
                                   DatePartionedTableFilesSchema &files) {
    files.clear();
Z
update  
zhiru 已提交
1057

1058
    try {
Y
Yu Kun 已提交
1059
        server::MetricCollector metric;
S
starlord 已提交
1060 1061 1062 1063 1064 1065 1066 1067 1068

        //check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

1069 1070 1071
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
1072

1073
            if (connectionPtr == nullptr) {
S
starlord 已提交
1074
                return Status(DB_ERROR, "Failed to connect to database server");
1075
            }
Z
update  
zhiru 已提交
1076

1077
            Query filesToMergeQuery = connectionPtr->query();
S
starlord 已提交
1078
            filesToMergeQuery << "SELECT id, table_id, file_id, file_type, file_size, row_count, date, engine_type, created_on " <<
1079 1080 1081
                              "FROM TableFiles " <<
                              "WHERE table_id = " << quote << table_id << " AND " <<
                              "file_type = " << std::to_string(TableFileSchema::RAW) << " " <<
1082
                              "ORDER BY row_count DESC" << ";";
Z
update  
zhiru 已提交
1083

1084
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();
1085

1086 1087
            res = filesToMergeQuery.store();
        } //Scoped Connection
1088

S
starlord 已提交
1089
        Status ret;
1090
        for (auto &resRow : res) {
S
starlord 已提交
1091 1092 1093 1094 1095
            TableFileSchema table_file;
            table_file.file_size_ = resRow["file_size"];
            if(table_file.file_size_ >= table_schema.index_file_size_) {
                continue;//skip large file
            }
Z
update  
zhiru 已提交
1096

1097
            table_file.id_ = resRow["id"]; //implicit conversion
1098

1099 1100 1101
            std::string table_id_str;
            resRow["table_id"].to_string(table_id_str);
            table_file.table_id_ = table_id_str;
Z
update  
zhiru 已提交
1102

1103 1104 1105
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;
1106

1107
            table_file.file_type_ = resRow["file_type"];
1108

S
starlord 已提交
1109 1110
            table_file.row_count_ = resRow["row_count"];

1111
            table_file.date_ = resRow["date"];
Z
update  
zhiru 已提交
1112

1113 1114
            table_file.index_file_size_ = table_schema.index_file_size_;

S
starlord 已提交
1115 1116
            table_file.engine_type_ = resRow["engine_type"];

S
starlord 已提交
1117 1118
            table_file.nlist_ = table_schema.nlist_;

S
starlord 已提交
1119 1120
            table_file.metric_type_ = table_schema.metric_type_;

S
starlord 已提交
1121 1122
            table_file.created_on_ = resRow["created_on"];

1123
            table_file.dimension_ = table_schema.dimension_;
Z
update  
zhiru 已提交
1124

S
starlord 已提交
1125 1126
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
1127
                ret = status;
S
starlord 已提交
1128
            }
Z
update  
zhiru 已提交
1129

1130 1131 1132
            auto dateItr = files.find(table_file.date_);
            if (dateItr == files.end()) {
                files[table_file.date_] = TableFilesSchema();
1133
            }
1134 1135

            files[table_file.date_].push_back(table_file);
1136
        }
Z
update  
zhiru 已提交
1137

S
starlord 已提交
1138 1139
        return ret;

S
starlord 已提交
1140 1141
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
1142 1143 1144 1145 1146 1147 1148
    }
}

Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
                                    const std::vector<size_t> &ids,
                                    TableFilesSchema &table_files) {
    if (ids.empty()) {
Z
update  
zhiru 已提交
1149 1150 1151
        return Status::OK();
    }

1152 1153 1154 1155 1156 1157
    std::stringstream idSS;
    for (auto &id : ids) {
        idSS << "id = " << std::to_string(id) << " OR ";
    }
    std::string idStr = idSS.str();
    idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "
Z
zhiru 已提交
1158

1159 1160 1161 1162 1163 1164
    try {
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
1165
                return Status(DB_ERROR, "Failed to connect to database server");
1166 1167 1168
            }

            Query getTableFileQuery = connectionPtr->query();
S
starlord 已提交
1169
            getTableFileQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on " <<
1170 1171
                              "FROM TableFiles " <<
                              "WHERE table_id = " << quote << table_id << " AND " <<
S
starlord 已提交
1172 1173
                              "(" << idStr << ") AND " <<
                              "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
1174 1175 1176 1177 1178 1179 1180 1181

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();

            res = getTableFileQuery.store();
        } //Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
S
starlord 已提交
1182
        DescribeTable(table_schema);
1183

S
starlord 已提交
1184
        Status ret;
1185
        for (auto &resRow : res) {
Z
zhiru 已提交
1186

1187
            TableFileSchema file_schema;
1188

1189
            file_schema.id_ = resRow["id"];
Z
zhiru 已提交
1190

1191
            file_schema.table_id_ = table_id;
Z
update  
zhiru 已提交
1192

1193 1194
            file_schema.index_file_size_ = table_schema.index_file_size_;

1195
            file_schema.engine_type_ = resRow["engine_type"];
Z
update  
zhiru 已提交
1196

S
starlord 已提交
1197 1198
            file_schema.nlist_ = table_schema.nlist_;

S
starlord 已提交
1199 1200
            file_schema.metric_type_ = table_schema.metric_type_;

1201 1202 1203
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            file_schema.file_id_ = file_id;
Z
update  
zhiru 已提交
1204

1205
            file_schema.file_type_ = resRow["file_type"];
Z
update  
zhiru 已提交
1206

1207 1208 1209
            file_schema.file_size_ = resRow["file_size"];

            file_schema.row_count_ = resRow["row_count"];
1210

1211
            file_schema.date_ = resRow["date"];
1212

S
starlord 已提交
1213 1214
            file_schema.created_on_ = resRow["created_on"];

1215
            file_schema.dimension_ = table_schema.dimension_;
Z
update  
zhiru 已提交
1216

S
starlord 已提交
1217
            utils::GetTableFilePath(options_, file_schema);
1218 1219 1220

            table_files.emplace_back(file_schema);
        }
S
starlord 已提交
1221 1222 1223

        return ret;

S
starlord 已提交
1224 1225
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
Z
update  
zhiru 已提交
1226
    }
1227
}
Z
zhiru 已提交
1228

1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243
// PXU TODO: Support Swap
Status MySQLMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.empty()) {
        return Status::OK();
    }

    for (auto &kv : criterias) {
        auto &criteria = kv.first;
        auto &limit = kv.second;
        if (criteria == "days") {
            size_t usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();

            try {
Z
update  
zhiru 已提交
1244
                ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
zhiru 已提交
1245

Z
update  
zhiru 已提交
1246
                if (connectionPtr == nullptr) {
S
starlord 已提交
1247
                    return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1248 1249
                }

1250 1251 1252 1253 1254
                Query archiveQuery = connectionPtr->query();
                archiveQuery << "UPDATE TableFiles " <<
                             "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                             "WHERE created_on < " << std::to_string(now - usecs) << " AND " <<
                             "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1255

1256
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();
Z
update  
zhiru 已提交
1257

1258
                if (!archiveQuery.exec()) {
S
starlord 已提交
1259
                    return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
1260
                }
1261

S
starlord 已提交
1262 1263
            } catch (std::exception &e) {
                return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
Z
zhiru 已提交
1264
            }
1265
        }
1266 1267 1268
        if (criteria == "disk") {
            uint64_t sum = 0;
            Size(sum);
Z
update  
zhiru 已提交
1269

1270 1271 1272
            auto to_delete = (sum - limit * G);
            DiscardFiles(to_delete);
        }
Z
update  
zhiru 已提交
1273 1274
    }

1275 1276
    return Status::OK();
}
Z
zhiru 已提交
1277

1278 1279
Status MySQLMetaImpl::Size(uint64_t &result) {
    result = 0;
1280

S
starlord 已提交
1281
    try {
1282 1283 1284
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
1285

1286
            if (connectionPtr == nullptr) {
S
starlord 已提交
1287
                return Status(DB_ERROR, "Failed to connect to database server");
1288
            }
Z
zhiru 已提交
1289

1290
            Query getSizeQuery = connectionPtr->query();
1291
            getSizeQuery << "SELECT IFNULL(SUM(file_size),0) AS sum " <<
1292 1293
                         "FROM TableFiles " <<
                         "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1294

1295
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str();
Z
update  
zhiru 已提交
1296

1297 1298
            res = getSizeQuery.store();
        } //Scoped Connection
Z
update  
zhiru 已提交
1299

1300 1301 1302 1303 1304
        if (res.empty()) {
            result = 0;
        } else {
            result = res[0]["sum"];
        }
Z
update  
zhiru 已提交
1305

S
starlord 已提交
1306 1307
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
1308
    }
1309

1310 1311
    return Status::OK();
}
1312

1313 1314 1315 1316
Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
    if (to_discard_size <= 0) {

        return Status::OK();
Z
update  
zhiru 已提交
1317
    }
1318
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
Z
update  
zhiru 已提交
1319

1320
    try {
Y
Yu Kun 已提交
1321
        server::MetricCollector metric;
1322 1323 1324
        bool status;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
1325

1326
            if (connectionPtr == nullptr) {
S
starlord 已提交
1327
                return Status(DB_ERROR, "Failed to connect to database server");
1328
            }
Z
zhiru 已提交
1329

1330
            Query discardFilesQuery = connectionPtr->query();
1331
            discardFilesQuery << "SELECT id, file_size " <<
1332 1333 1334 1335
                              "FROM TableFiles " <<
                              "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                              "ORDER BY id ASC " <<
                              "LIMIT 10;";
Z
update  
zhiru 已提交
1336

1337
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
1338

1339 1340 1341 1342
            StoreQueryResult res = discardFilesQuery.store();
            if (res.num_rows() == 0) {
                return Status::OK();
            }
1343

1344 1345 1346 1347 1348
            TableFileSchema table_file;
            std::stringstream idsToDiscardSS;
            for (auto &resRow : res) {
                if (to_discard_size <= 0) {
                    break;
Z
update  
zhiru 已提交
1349
                }
1350
                table_file.id_ = resRow["id"];
1351
                table_file.file_size_ = resRow["file_size"];
1352 1353
                idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR ";
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
1354 1355
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
1356
            }
Z
update  
zhiru 已提交
1357

1358 1359
            std::string idsToDiscardStr = idsToDiscardSS.str();
            idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR "
1360

1361 1362 1363 1364
            discardFilesQuery << "UPDATE TableFiles " <<
                              "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
                              "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
                              "WHERE " << idsToDiscardStr << ";";
1365

1366
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
Z
update  
zhiru 已提交
1367

1368 1369
            status = discardFilesQuery.exec();
            if (!status) {
S
starlord 已提交
1370
                return HandleException("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error());
1371 1372 1373 1374 1375
            }
        } //Scoped Connection

        return DiscardFiles(to_discard_size);

S
starlord 已提交
1376 1377
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
P
peng.xu 已提交
1378
    }
1379
}
P
peng.xu 已提交
1380

1381 1382 1383
//ZR: this function assumes all fields in file_schema have value
Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1384

S
starlord 已提交
1385
    try {
Y
Yu Kun 已提交
1386
        server::MetricCollector metric;
1387 1388
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
1389

1390
            if (connectionPtr == nullptr) {
S
starlord 已提交
1391
                return Status(DB_ERROR, "Failed to connect to database server");
1392
            }
Z
update  
zhiru 已提交
1393

1394
            Query updateTableFileQuery = connectionPtr->query();
Z
update  
zhiru 已提交
1395

1396 1397 1398 1399
            //if the table has been deleted, just mark the table file as TO_DELETE
            //clean thread will delete the file later
            updateTableFileQuery << "SELECT state FROM Tables " <<
                                 "WHERE table_id = " << quote << file_schema.table_id_ << ";";
Z
update  
zhiru 已提交
1400

1401
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
Z
update  
zhiru 已提交
1402

1403
            StoreQueryResult res = updateTableFileQuery.store();
1404

1405 1406 1407 1408
            if (res.num_rows() == 1) {
                int state = res[0]["state"];
                if (state == TableSchema::TO_DELETE) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
Z
update  
zhiru 已提交
1409
                }
1410 1411 1412 1413 1414 1415 1416 1417 1418
            } else {
                file_schema.file_type_ = TableFileSchema::TO_DELETE;
            }

            std::string id = std::to_string(file_schema.id_);
            std::string table_id = file_schema.table_id_;
            std::string engine_type = std::to_string(file_schema.engine_type_);
            std::string file_id = file_schema.file_id_;
            std::string file_type = std::to_string(file_schema.file_type_);
1419 1420
            std::string file_size = std::to_string(file_schema.file_size_);
            std::string row_count = std::to_string(file_schema.row_count_);
1421 1422 1423
            std::string updated_time = std::to_string(file_schema.updated_time_);
            std::string created_on = std::to_string(file_schema.created_on_);
            std::string date = std::to_string(file_schema.date_);
Z
update  
zhiru 已提交
1424

1425 1426 1427 1428 1429
            updateTableFileQuery << "UPDATE TableFiles " <<
                                 "SET table_id = " << quote << table_id << ", " <<
                                 "engine_type = " << engine_type << ", " <<
                                 "file_id = " << quote << file_id << ", " <<
                                 "file_type = " << file_type << ", " <<
1430 1431
                                 "file_size = " << file_size << ", " <<
                                 "row_count = " << row_count << ", " <<
1432 1433 1434 1435 1436 1437 1438 1439 1440
                                 "updated_time = " << updated_time << ", " <<
                                 "created_on = " << created_on << ", " <<
                                 "date = " << date << " " <<
                                 "WHERE id = " << id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();

            if (!updateTableFileQuery.exec()) {
                ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
S
starlord 已提交
1441
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error());
1442 1443 1444
            }
        } //Scoped Connection

S
starlord 已提交
1445 1446
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
1447
    }
S
starlord 已提交
1448

1449 1450
    return Status::OK();
}
1451

1452 1453 1454
Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
    try {
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
1455

1456
        if (connectionPtr == nullptr) {
S
starlord 已提交
1457
            return Status(DB_ERROR, "Failed to connect to database server");
1458
        }
Z
update  
zhiru 已提交
1459

1460
        Query updateTableFilesToIndexQuery = connectionPtr->query();
Z
zhiru 已提交
1461

1462
        updateTableFilesToIndexQuery << "UPDATE TableFiles " <<
Z
fix  
zhiru 已提交
1463 1464 1465
                                     "SET file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " " <<
                                     "WHERE table_id = " << quote << table_id << " AND " <<
                                     "file_type = " << std::to_string(TableFileSchema::RAW) << ";";
1466

Z
zhiru 已提交
1467
        ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();
Z
update  
zhiru 已提交
1468

Z
fix  
zhiru 已提交
1469
        if (!updateTableFilesToIndexQuery.exec()) {
S
starlord 已提交
1470
            return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX", updateTableFilesToIndexQuery.error());
Z
fix  
zhiru 已提交
1471 1472
        }

S
starlord 已提交
1473 1474
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
1475
    }
Z
update  
zhiru 已提交
1476

1477 1478
    return Status::OK();
}
Z
zhiru 已提交
1479

1480 1481
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
Y
Yu Kun 已提交
1482
        server::MetricCollector metric;
1483 1484 1485 1486
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
1487
                return Status(DB_ERROR, "Failed to connect to database server");
1488
            }
Z
update  
zhiru 已提交
1489

1490
            Query updateTableFilesQuery = connectionPtr->query();
1491

1492 1493
            std::map<std::string, bool> has_tables;
            for (auto &file_schema : files) {
1494

1495 1496 1497
                if (has_tables.find(file_schema.table_id_) != has_tables.end()) {
                    continue;
                }
1498

1499 1500 1501 1502 1503
                updateTableFilesQuery << "SELECT EXISTS " <<
                                      "(SELECT 1 FROM Tables " <<
                                      "WHERE table_id = " << quote << file_schema.table_id_ << " " <<
                                      "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
                                      "AS " << quote << "check" << ";";
1504

1505
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
1506

1507
                StoreQueryResult res = updateTableFilesQuery.store();
1508

1509 1510 1511
                int check = res[0]["check"];
                has_tables[file_schema.table_id_] = (check == 1);
            }
1512

1513
            for (auto &file_schema : files) {
1514

1515 1516
                if (!has_tables[file_schema.table_id_]) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
1517
                }
1518
                file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1519

1520 1521 1522 1523 1524
                std::string id = std::to_string(file_schema.id_);
                std::string table_id = file_schema.table_id_;
                std::string engine_type = std::to_string(file_schema.engine_type_);
                std::string file_id = file_schema.file_id_;
                std::string file_type = std::to_string(file_schema.file_type_);
1525 1526
                std::string file_size = std::to_string(file_schema.file_size_);
                std::string row_count = std::to_string(file_schema.row_count_);
1527 1528 1529
                std::string updated_time = std::to_string(file_schema.updated_time_);
                std::string created_on = std::to_string(file_schema.created_on_);
                std::string date = std::to_string(file_schema.date_);
Z
fix  
zhiru 已提交
1530

1531 1532 1533 1534 1535
                updateTableFilesQuery << "UPDATE TableFiles " <<
                                      "SET table_id = " << quote << table_id << ", " <<
                                      "engine_type = " << engine_type << ", " <<
                                      "file_id = " << quote << file_id << ", " <<
                                      "file_type = " << file_type << ", " <<
1536 1537
                                      "file_size = " << file_size << ", " <<
                                      "row_count = " << row_count << ", " <<
1538 1539 1540 1541 1542 1543 1544 1545
                                      "updated_time = " << updated_time << ", " <<
                                      "created_on = " << created_on << ", " <<
                                      "date = " << date << " " <<
                                      "WHERE id = " << id << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();

                if (!updateTableFilesQuery.exec()) {
S
starlord 已提交
1546
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error());
1547 1548 1549 1550
                }
            }
        } //Scoped Connection

S
starlord 已提交
1551 1552
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
1553
    }
S
starlord 已提交
1554

1555 1556
    return Status::OK();
}
Z
fix  
zhiru 已提交
1557

1558 1559
Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
S
starlord 已提交
1560 1561 1562
    std::set<std::string> table_ids;

    //remove to_delete files
1563
    try {
Y
Yu Kun 已提交
1564
        server::MetricCollector metric;
1565

1566 1567
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
1568

1569
            if (connectionPtr == nullptr) {
S
starlord 已提交
1570
                return Status(DB_ERROR, "Failed to connect to database server");
1571
            }
Z
zhiru 已提交
1572

1573 1574 1575 1576 1577
            Query cleanUpFilesWithTTLQuery = connectionPtr->query();
            cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " <<
                                     "FROM TableFiles " <<
                                     "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " <<
                                     "updated_time < " << std::to_string(now - seconds * US_PS) << ";";
Z
update  
zhiru 已提交
1578

1579
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
Z
update  
zhiru 已提交
1580

1581
            StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
Z
update  
zhiru 已提交
1582

1583 1584
            TableFileSchema table_file;
            std::vector<std::string> idsToDelete;
1585

1586
            for (auto &resRow : res) {
1587

1588
                table_file.id_ = resRow["id"]; //implicit conversion
1589

1590 1591 1592
                std::string table_id;
                resRow["table_id"].to_string(table_id);
                table_file.table_id_ = table_id;
Z
fix  
zhiru 已提交
1593

1594 1595 1596
                std::string file_id;
                resRow["file_id"].to_string(file_id);
                table_file.file_id_ = file_id;
Z
update  
zhiru 已提交
1597

1598
                table_file.date_ = resRow["date"];
Z
update  
zhiru 已提交
1599

1600 1601
                utils::DeleteTableFilePath(options_, table_file);

S
starlord 已提交
1602
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;
1603 1604

                idsToDelete.emplace_back(std::to_string(table_file.id_));
S
starlord 已提交
1605 1606

                table_ids.insert(table_file.table_id_);
1607 1608 1609 1610 1611 1612 1613
            }

            if (!idsToDelete.empty()) {

                std::stringstream idsToDeleteSS;
                for (auto &id : idsToDelete) {
                    idsToDeleteSS << "id = " << id << " OR ";
1614
                }
1615

1616 1617 1618 1619
                std::string idsToDeleteStr = idsToDeleteSS.str();
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
                cleanUpFilesWithTTLQuery << "DELETE FROM TableFiles WHERE " <<
                                         idsToDeleteStr << ";";
1620

1621 1622 1623
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                if (!cleanUpFilesWithTTLQuery.exec()) {
S
starlord 已提交
1624
                    return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", cleanUpFilesWithTTLQuery.error());
1625 1626 1627 1628
                }
            }
        } //Scoped Connection

S
starlord 已提交
1629 1630
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
Z
update  
zhiru 已提交
1631
    }
1632

S
starlord 已提交
1633
    //remove to_delete tables
1634
    try {
Y
Yu Kun 已提交
1635
        server::MetricCollector metric;
1636 1637

        {
Z
update  
zhiru 已提交
1638
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
zhiru 已提交
1639

Z
update  
zhiru 已提交
1640
            if (connectionPtr == nullptr) {
S
starlord 已提交
1641
                return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1642 1643
            }

1644 1645 1646 1647
            Query cleanUpFilesWithTTLQuery = connectionPtr->query();
            cleanUpFilesWithTTLQuery << "SELECT id, table_id " <<
                                     "FROM Tables " <<
                                     "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
1648

1649 1650 1651
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

            StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
Z
update  
zhiru 已提交
1652

Z
update  
zhiru 已提交
1653 1654
            if (!res.empty()) {

1655 1656 1657 1658 1659
                std::stringstream idsToDeleteSS;
                for (auto &resRow : res) {
                    size_t id = resRow["id"];
                    std::string table_id;
                    resRow["table_id"].to_string(table_id);
Z
update  
zhiru 已提交
1660

S
starlord 已提交
1661
                    utils::DeleteTablePath(options_, table_id, false);//only delete empty folder
1662 1663

                    idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
Z
update  
zhiru 已提交
1664
                }
1665 1666 1667 1668
                std::string idsToDeleteStr = idsToDeleteSS.str();
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
                cleanUpFilesWithTTLQuery << "DELETE FROM Tables WHERE " <<
                                         idsToDeleteStr << ";";
1669

1670
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
Z
update  
zhiru 已提交
1671

1672
                if (!cleanUpFilesWithTTLQuery.exec()) {
S
starlord 已提交
1673
                    return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", cleanUpFilesWithTTLQuery.error());
1674 1675 1676 1677
                }
            }
        } //Scoped Connection

S
starlord 已提交
1678 1679
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
Z
update  
zhiru 已提交
1680 1681
    }

S
starlord 已提交
1682 1683 1684
    //remove deleted table folder
    //don't remove table folder until all its files has been deleted
    try {
Y
Yu Kun 已提交
1685
        server::MetricCollector metric;
S
starlord 已提交
1686 1687 1688 1689 1690

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
1691
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
1692 1693 1694 1695 1696 1697
            }

            for(auto& table_id : table_ids) {
                Query cleanUpFilesWithTTLQuery = connectionPtr->query();
                cleanUpFilesWithTTLQuery << "SELECT file_id " <<
                                         "FROM TableFiles " <<
S
starlord 已提交
1698
                                         "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
1699 1700 1701 1702 1703 1704 1705 1706 1707 1708

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                StoreQueryResult res = cleanUpFilesWithTTLQuery.store();

                if (res.empty()) {
                    utils::DeleteTablePath(options_, table_id);
                }
            }
        }
S
starlord 已提交
1709 1710
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
S
starlord 已提交
1711 1712
    }

1713 1714
    return Status::OK();
}
1715

1716 1717 1718
Status MySQLMetaImpl::CleanUp() {
    try {
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
1719

1720
        if (connectionPtr == nullptr) {
S
starlord 已提交
1721
            return Status(DB_ERROR, "Failed to connect to database server");
1722
        }
1723

1724 1725 1726 1727 1728
        Query cleanUpQuery = connectionPtr->query();
        cleanUpQuery << "SELECT table_name " <<
                     "FROM information_schema.tables " <<
                     "WHERE table_schema = " << quote << mysql_connection_pool_->getDB() << " " <<
                     "AND table_name = " << quote << "TableFiles" << ";";
Z
update  
zhiru 已提交
1729

1730
        ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
Z
update  
zhiru 已提交
1731

1732
        StoreQueryResult res = cleanUpQuery.store();
Z
update  
zhiru 已提交
1733

1734 1735
        if (!res.empty()) {
            ENGINE_LOG_DEBUG << "Remove table file type as NEW";
1736 1737 1738 1739
            cleanUpQuery << "DELETE FROM TableFiles WHERE file_type IN ("
                    << std::to_string(TableFileSchema::NEW) << ","
                    << std::to_string(TableFileSchema::NEW_MERGE) << ","
                    << std::to_string(TableFileSchema::NEW_INDEX) << ");";
1740

1741
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
1742

1743
            if (!cleanUpQuery.exec()) {
S
starlord 已提交
1744
                return HandleException("QUERY ERROR WHEN CLEANING UP FILES", cleanUpQuery.error());
Z
update  
zhiru 已提交
1745
            }
1746
        }
1747

S
starlord 已提交
1748 1749
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
Z
update  
zhiru 已提交
1750 1751
    }

1752 1753 1754 1755 1756
    return Status::OK();
}

Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
Y
Yu Kun 已提交
1757
        server::MetricCollector metric;
1758 1759 1760 1761 1762 1763 1764

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);

        if (!status.ok()) {
            return status;
1765
        }
Z
zhiru 已提交
1766

1767 1768
        StoreQueryResult res;
        {
Z
update  
zhiru 已提交
1769
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
zhiru 已提交
1770

Z
update  
zhiru 已提交
1771
            if (connectionPtr == nullptr) {
S
starlord 已提交
1772
                return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1773 1774
            }

Z
update  
zhiru 已提交
1775

1776
            Query countQuery = connectionPtr->query();
S
starlord 已提交
1777
            countQuery << "SELECT row_count " <<
1778 1779 1780 1781 1782
                       "FROM TableFiles " <<
                       "WHERE table_id = " << quote << table_id << " AND " <<
                       "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
                       "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
                       "file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
Z
update  
zhiru 已提交
1783

1784
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str();
Z
update  
zhiru 已提交
1785

1786 1787 1788 1789 1790
            res = countQuery.store();
        } //Scoped Connection

        result = 0;
        for (auto &resRow : res) {
S
starlord 已提交
1791
            size_t size = resRow["row_count"];
1792
            result += size;
1793
        }
1794

S
starlord 已提交
1795 1796
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
Z
update  
zhiru 已提交
1797
    }
S
starlord 已提交
1798

1799 1800 1801 1802 1803
    return Status::OK();
}

Status MySQLMetaImpl::DropAll() {
    try {
S
starlord 已提交
1804
        ENGINE_LOG_DEBUG << "Drop all mysql meta";
1805 1806 1807
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

        if (connectionPtr == nullptr) {
S
starlord 已提交
1808
            return Status(DB_ERROR, "Failed to connect to database server");
Z
zhiru 已提交
1809
        }
1810 1811 1812 1813 1814 1815 1816 1817 1818

        Query dropTableQuery = connectionPtr->query();
        dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();

        if (dropTableQuery.exec()) {
            return Status::OK();
        } else {
S
starlord 已提交
1819
            return HandleException("QUERY ERROR WHEN DROPPING ALL", dropTableQuery.error());
1820
        }
S
starlord 已提交
1821 1822
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
1823 1824 1825
    }
}

Z
update  
zhiru 已提交
1826 1827 1828 1829
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz