MySQLMetaImpl.cpp 69.5 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

Z
update  
zhiru 已提交
18
#include "MySQLMetaImpl.h"
S
starlord 已提交
19 20
#include "db/IDGenerator.h"
#include "db/Utils.h"
S
starlord 已提交
21
#include "utils/Log.h"
Z
update  
zhiru 已提交
22 23 24 25 26 27 28 29 30 31 32
#include "MetaConsts.h"
#include "metrics/Metrics.h"

#include <unistd.h>
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <chrono>
#include <fstream>
#include <regex>
#include <string>
Z
zhiru 已提交
33
#include <mutex>
Z
zhiru 已提交
34
#include <thread>
Z
update  
zhiru 已提交
35 36 37

#include "mysql++/mysql++.h"

38

Z
update  
zhiru 已提交
39 40 41 42 43
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {

44
using namespace mysqlpp;
Z
update  
zhiru 已提交
45

46
namespace {
Z
update  
zhiru 已提交
47

S
starlord 已提交
48 49 50 51 52 53 54 55 56
Status HandleException(const std::string &desc, const char* what = nullptr) {
    if(what == nullptr) {
        ENGINE_LOG_ERROR << desc;
        return Status(DB_META_TRANSACTION_FAILED, desc);
    } else {
        std::string msg = desc + ":" + what;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_META_TRANSACTION_FAILED, msg);
    }
57
}
Z
update  
zhiru 已提交
58

59 60
}

61 62 63 64 65 66 67
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
    : options_(options_),
      mode_(mode) {
    Initialize();
}

MySQLMetaImpl::~MySQLMetaImpl() {
S
starlord 已提交
68

69 70
}

71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.GetNextIDNumber();
    table_id = ss.str();
    return Status::OK();
}

Status MySQLMetaImpl::NextFileId(std::string &file_id) {
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.GetNextIDNumber();
    file_id = ss.str();
    return Status::OK();
}

Status MySQLMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path)) {
        auto ret = boost::filesystem::create_directory(options_.path);
        if (!ret) {
S
starlord 已提交
91 92 93
            std::string msg = "Failed to create db directory " + options_.path;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_META_TRANSACTION_FAILED, msg);
Z
update  
zhiru 已提交
94 95 96
        }
    }

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
    std::string uri = options_.backend_uri;

    std::string dialectRegex = "(.*)";
    std::string usernameRegex = "(.*)";
    std::string passwordRegex = "(.*)";
    std::string hostRegex = "(.*)";
    std::string portRegex = "(.*)";
    std::string dbNameRegex = "(.*)";
    std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
        usernameRegex + "\\:" +
        passwordRegex + "\\@" +
        hostRegex + "\\:" +
        portRegex + "\\/" +
        dbNameRegex;
    std::regex uriRegex(uriRegexStr);
    std::smatch pieces_match;

    if (std::regex_match(uri, pieces_match, uriRegex)) {
        std::string dialect = pieces_match[1].str();
        std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
        if (dialect.find("mysql") == std::string::npos) {
S
starlord 已提交
118
            return Status(DB_ERROR, "URI's dialect is not MySQL");
119
        }
120 121 122 123 124 125
        std::string username = pieces_match[2].str();
        std::string password = pieces_match[3].str();
        std::string serverAddress = pieces_match[4].str();
        unsigned int port = 0;
        if (!pieces_match[5].str().empty()) {
            port = std::stoi(pieces_match[5].str());
126
        }
127
        std::string dbName = pieces_match[6].str();
128 129


130 131 132 133
        int threadHint = std::thread::hardware_concurrency();
        int maxPoolSize = threadHint == 0 ? 8 : threadHint;
        mysql_connection_pool_ =
            std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
134

135 136
        ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize);
        try {
137

138 139
            if (mode_ != Options::MODE::READ_ONLY) {
                CleanUp();
140 141
            }

142
            {
Z
update  
zhiru 已提交
143
                ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
144

Z
update  
zhiru 已提交
145
                if (connectionPtr == nullptr) {
S
starlord 已提交
146
                    return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
147 148
                }

149

150 151
                if (!connectionPtr->thread_aware()) {
                    ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
S
starlord 已提交
152
                    return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it.");
153
                }
154
                Query InitializeQuery = connectionPtr->query();
Z
update  
zhiru 已提交
155

156 157 158 159 160 161
                InitializeQuery << "CREATE TABLE IF NOT EXISTS Tables (" <<
                                "id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
                                "table_id VARCHAR(255) UNIQUE NOT NULL, " <<
                                "state INT NOT NULL, " <<
                                "dimension SMALLINT NOT NULL, " <<
                                "created_on BIGINT NOT NULL, " <<
S
starlord 已提交
162
                                "flag BIGINT DEFAULT 0 NOT NULL, " <<
S
starlord 已提交
163
                                "index_file_size BIGINT DEFAULT 1024 NOT NULL, " <<
164
                                "engine_type INT DEFAULT 1 NOT NULL, " <<
165 166
                                "nlist INT DEFAULT 16384 NOT NULL, " <<
                                "metric_type INT DEFAULT 1 NOT NULL);";
Z
zhiru 已提交
167

168
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
Z
update  
zhiru 已提交
169

170
                if (!InitializeQuery.exec()) {
S
starlord 已提交
171
                    return HandleException("Initialization Error", InitializeQuery.error());
Z
update  
zhiru 已提交
172 173
                }

174 175 176 177 178 179
                InitializeQuery << "CREATE TABLE IF NOT EXISTS TableFiles (" <<
                                "id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
                                "table_id VARCHAR(255) NOT NULL, " <<
                                "engine_type INT DEFAULT 1 NOT NULL, " <<
                                "file_id VARCHAR(255) NOT NULL, " <<
                                "file_type INT DEFAULT 0 NOT NULL, " <<
180 181
                                "file_size BIGINT DEFAULT 0 NOT NULL, " <<
                                "row_count BIGINT DEFAULT 0 NOT NULL, " <<
182 183 184 185 186 187 188
                                "updated_time BIGINT NOT NULL, " <<
                                "created_on BIGINT NOT NULL, " <<
                                "date INT DEFAULT -1 NOT NULL);";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

                if (!InitializeQuery.exec()) {
S
starlord 已提交
189
                    return HandleException("Initialization Error", InitializeQuery.error());
190
                }
191
            } //Scoped Connection
Z
update  
zhiru 已提交
192

Z
zhiru 已提交
193
        } catch (std::exception &e) {
S
starlord 已提交
194
            return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
195
        }
196 197
    } else {
        ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri;
S
starlord 已提交
198
        return Status(DB_ERROR, "Wrong URI format");
Z
update  
zhiru 已提交
199
    }
S
starlord 已提交
200 201

    return Status::OK();
202 203 204 205 206 207
}

// PXU TODO: Temp solution. Will fix later
Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                            const DatesT &dates) {
    if (dates.empty()) {
P
peng.xu 已提交
208 209 210
        return Status::OK();
    }

211 212 213 214 215 216
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }
Z
zhiru 已提交
217

218 219 220 221 222 223 224
    try {
        std::stringstream dateListSS;
        for (auto &date : dates) {
            dateListSS << std::to_string(date) << ", ";
        }
        std::string dateListStr = dateListSS.str();
        dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", "
Z
update  
zhiru 已提交
225

226 227
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
228

229
            if (connectionPtr == nullptr) {
S
starlord 已提交
230
                return Status(DB_ERROR, "Failed to connect to database server");
231
            }
Z
update  
zhiru 已提交
232

Z
update  
zhiru 已提交
233

234
            Query dropPartitionsByDatesQuery = connectionPtr->query();
235

236
            dropPartitionsByDatesQuery << "UPDATE TableFiles " <<
237 238
                                       "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
                                       "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
239 240
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "date in (" << dateListStr << ");";
Z
update  
zhiru 已提交
241

242
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropPartitionsByDates: " << dropPartitionsByDatesQuery.str();
Z
update  
zhiru 已提交
243

244
            if (!dropPartitionsByDatesQuery.exec()) {
S
starlord 已提交
245
                return HandleException("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", dropPartitionsByDatesQuery.error());
Z
update  
zhiru 已提交
246
            }
247
        } //Scoped Connection
S
starlord 已提交
248 249
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
Z
update  
zhiru 已提交
250
    }
251 252
    return Status::OK();
}
Z
update  
zhiru 已提交
253

254 255
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
Y
Yu Kun 已提交
256
        server::MetricCollector metric;
257 258
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
259

260
            if (connectionPtr == nullptr) {
S
starlord 已提交
261
                return Status(DB_ERROR, "Failed to connect to database server");
262
            }
Z
update  
zhiru 已提交
263

264
            Query createTableQuery = connectionPtr->query();
Z
update  
zhiru 已提交
265

266 267 268 269 270
            if (table_schema.table_id_.empty()) {
                NextTableId(table_schema.table_id_);
            } else {
                createTableQuery << "SELECT state FROM Tables " <<
                                 "WHERE table_id = " << quote << table_schema.table_id_ << ";";
Z
zhiru 已提交
271

272
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
Z
update  
zhiru 已提交
273

274
                StoreQueryResult res = createTableQuery.store();
275

276 277 278
                if (res.num_rows() == 1) {
                    int state = res[0]["state"];
                    if (TableSchema::TO_DELETE == state) {
S
starlord 已提交
279
                        return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
280
                    } else {
S
starlord 已提交
281
                        return Status(DB_ALREADY_EXIST, "Table already exists");
282 283 284
                    }
                }
            }
285

286 287
            table_schema.id_ = -1;
            table_schema.created_on_ = utils::GetMicroSecTimeStamp();
Z
update  
zhiru 已提交
288

289 290 291 292 293
            std::string id = "NULL"; //auto-increment
            std::string table_id = table_schema.table_id_;
            std::string state = std::to_string(table_schema.state_);
            std::string dimension = std::to_string(table_schema.dimension_);
            std::string created_on = std::to_string(table_schema.created_on_);
S
starlord 已提交
294 295
            std::string flag = std::to_string(table_schema.flag_);
            std::string index_file_size = std::to_string(table_schema.index_file_size_);
296
            std::string engine_type = std::to_string(table_schema.engine_type_);
S
starlord 已提交
297 298
            std::string nlist = std::to_string(table_schema.nlist_);
            std::string metric_type = std::to_string(table_schema.metric_type_);
Z
update  
zhiru 已提交
299

300 301
            createTableQuery << "INSERT INTO Tables VALUES" <<
                             "(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " <<
S
starlord 已提交
302 303
                             created_on << ", " << flag << ", " << index_file_size << ", " << engine_type << ", " <<
                             nlist << ", " << metric_type << ");";
Z
update  
zhiru 已提交
304

305
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
306

307 308
            if (SimpleResult res = createTableQuery.execute()) {
                table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
309

310 311
                //Consume all results to avoid "Commands out of sync" error
            } else {
S
starlord 已提交
312
                return HandleException("Add Table Error", createTableQuery.error());
313
            }
314
        } //Scoped Connection
315

316
        return utils::CreateTablePath(options_, table_schema.table_id_);
Z
update  
zhiru 已提交
317

318
    } catch (std::exception &e) {
S
starlord 已提交
319
        return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
320 321
    }
}
322

323 324 325 326
Status MySQLMetaImpl::FilesByType(const std::string &table_id,
                                  const std::vector<int> &file_types,
                                  std::vector<std::string> &file_ids) {
    if(file_types.empty()) {
S
starlord 已提交
327
        return Status(DB_ERROR, "file types array is empty");
328
    }
Z
zhiru 已提交
329 330

    try {
331 332
        file_ids.clear();

Z
zhiru 已提交
333 334 335 336 337
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
338
                return Status(DB_ERROR, "Failed to connect to database server");
Z
zhiru 已提交
339 340
            }

341 342 343 344 345 346 347 348
            std::string types;
            for(auto type : file_types) {
                if(!types.empty()) {
                    types += ",";
                }
                types += std::to_string(type);
            }

Z
zhiru 已提交
349 350
            Query hasNonIndexFilesQuery = connectionPtr->query();
            //since table_id is a unique column we just need to check whether it exists or not
351
            hasNonIndexFilesQuery << "SELECT file_id, file_type FROM TableFiles " <<
Z
fix  
zhiru 已提交
352
                                  "WHERE table_id = " << quote << table_id << " AND " <<
353
                                  "file_type in (" << types << ");";
Z
zhiru 已提交
354

355
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
Z
zhiru 已提交
356 357 358 359

            res = hasNonIndexFilesQuery.store();
        } //Scoped Connection

360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
        if (res.num_rows() > 0) {
            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
            int to_index_count = 0, index_count = 0, backup_count = 0;
            for (auto &resRow : res) {
                std::string file_id;
                resRow["file_id"].to_string(file_id);
                file_ids.push_back(file_id);

                int32_t file_type = resRow["file_type"];
                switch (file_type) {
                    case (int) TableFileSchema::RAW:
                        raw_count++;
                        break;
                    case (int) TableFileSchema::NEW:
                        new_count++;
                        break;
                    case (int) TableFileSchema::NEW_MERGE:
                        new_merge_count++;
                        break;
                    case (int) TableFileSchema::NEW_INDEX:
                        new_index_count++;
                        break;
                    case (int) TableFileSchema::TO_INDEX:
                        to_index_count++;
                        break;
                    case (int) TableFileSchema::INDEX:
                        index_count++;
                        break;
                    case (int) TableFileSchema::BACKUP:
                        backup_count++;
                        break;
                    default:
                        break;
                }
            }

            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
                             << " new files:" << new_count << " new_merge files:" << new_merge_count
                             << " new_index files:" << new_index_count << " to_index files:" << to_index_count
                             << " index files:" << index_count << " backup files:" << backup_count;
        }
Z
zhiru 已提交
401

S
starlord 已提交
402 403
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
Z
zhiru 已提交
404 405
    }

406 407
    return Status::OK();
}
408

S
starlord 已提交
409
Status MySQLMetaImpl::UpdateTableIndex(const std::string &table_id, const TableIndex& index) {
S
starlord 已提交
410
    try {
Y
Yu Kun 已提交
411
        server::MetricCollector metric;
S
starlord 已提交
412 413 414 415 416

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
417
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
418 419 420 421 422 423 424 425
            }

            Query updateTableIndexParamQuery = connectionPtr->query();
            updateTableIndexParamQuery << "SELECT id, state, dimension, created_on " <<
                                       "FROM Tables " <<
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

S
starlord 已提交
426
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
S
starlord 已提交
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442

            StoreQueryResult res = updateTableIndexParamQuery.store();

            if (res.num_rows() == 1) {
                const Row &resRow = res[0];

                size_t id = resRow["id"];
                int32_t state = resRow["state"];
                uint16_t dimension = resRow["dimension"];
                int64_t created_on = resRow["created_on"];

                updateTableIndexParamQuery << "UPDATE Tables " <<
                                           "SET id = " << id << ", " <<
                                           "state = " << state << ", " <<
                                           "dimension = " << dimension << ", " <<
                                           "created_on = " << created_on << ", " <<
S
starlord 已提交
443
                                           "engine_type = " << index.engine_type_ << ", " <<
S
starlord 已提交
444
                                           "nlist = " << index.nlist_ << ", " <<
S
starlord 已提交
445
                                           "metric_type = " << index.metric_type_ << " " <<
S
starlord 已提交
446
                                           "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
447

S
starlord 已提交
448
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
S
starlord 已提交
449 450 451


                if (!updateTableIndexParamQuery.exec()) {
S
starlord 已提交
452
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM", updateTableIndexParamQuery.error());
S
starlord 已提交
453 454
                }
            } else {
S
starlord 已提交
455
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
S
starlord 已提交
456 457 458 459
            }

        } //Scoped Connection

S
starlord 已提交
460 461
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
S
starlord 已提交
462 463
    }

464 465 466
    return Status::OK();
}

S
starlord 已提交
467 468
Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    try {
Y
Yu Kun 已提交
469
        server::MetricCollector metric;
S
starlord 已提交
470 471 472 473 474

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
475
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
476 477 478 479 480
            }

            Query updateTableFlagQuery = connectionPtr->query();
            updateTableFlagQuery << "UPDATE Tables " <<
                                 "SET flag = " << flag << " " <<
S
starlord 已提交
481
                                 "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
482 483 484 485

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();

            if (!updateTableFlagQuery.exec()) {
S
starlord 已提交
486
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLAG", updateTableFlagQuery.error());
S
starlord 已提交
487 488 489 490
            }

        } //Scoped Connection

S
starlord 已提交
491 492
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
S
starlord 已提交
493 494 495 496 497
    }

    return Status::OK();
}

498
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
S
starlord 已提交
499
    try {
Y
Yu Kun 已提交
500
        server::MetricCollector metric;
501

S
starlord 已提交
502 503
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
504

S
starlord 已提交
505
            if (connectionPtr == nullptr) {
S
starlord 已提交
506
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
507 508 509 510 511 512 513
            }

            Query describeTableIndexQuery = connectionPtr->query();
            describeTableIndexQuery << "SELECT engine_type, nlist, index_file_size, metric_type " <<
                                       "FROM Tables " <<
                                       "WHERE table_id = " << quote << table_id << " AND " <<
                                       "state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
514

S
starlord 已提交
515
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTableIndex: " << describeTableIndexQuery.str();
Z
update  
zhiru 已提交
516

S
starlord 已提交
517 518 519 520 521 522 523 524 525
            StoreQueryResult res = describeTableIndexQuery.store();

            if (res.num_rows() == 1) {
                const Row &resRow = res[0];

                index.engine_type_ = resRow["engine_type"];
                index.nlist_ = resRow["nlist"];
                index.metric_type_ = resRow["metric_type"];
            } else {
S
starlord 已提交
526
                return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
S
starlord 已提交
527 528 529
            }

        } //Scoped Connection
Z
update  
zhiru 已提交
530

S
starlord 已提交
531 532
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
S
starlord 已提交
533 534 535 536 537 538 539
    }

    return Status::OK();
}

Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
540
        server::MetricCollector metric;
Z
update  
zhiru 已提交
541

542 543
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
544

545
            if (connectionPtr == nullptr) {
S
starlord 已提交
546
                return Status(DB_ERROR, "Failed to connect to database server");
547
            }
548

S
starlord 已提交
549 550
            Query dropTableIndexQuery = connectionPtr->query();

551
            //soft delete index files
S
starlord 已提交
552 553 554 555 556 557 558 559 560
            dropTableIndexQuery << "UPDATE TableFiles " <<
                                "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
                                "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                "WHERE table_id = " << quote << table_id << " AND " <<
                                "file_type = " << std::to_string(TableFileSchema::INDEX) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
561
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
S
starlord 已提交
562 563
            }

564
            //set all backup file to raw
S
starlord 已提交
565 566 567 568 569 570 571 572 573
            dropTableIndexQuery << "UPDATE TableFiles " <<
                                "SET file_type = " << std::to_string(TableFileSchema::RAW) << "," <<
                                "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                "WHERE table_id = " << quote << table_id << " AND " <<
                                "file_type = " << std::to_string(TableFileSchema::BACKUP) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
574
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
575 576 577 578 579
            }

            //set table index type to raw
            dropTableIndexQuery << "UPDATE Tables " <<
                                "SET engine_type = " << std::to_string(DEFAULT_ENGINE_TYPE) << "," <<
S
starlord 已提交
580
                                "nlist = " << std::to_string(DEFAULT_NLIST) << ", " <<
581 582 583 584 585 586
                                "metric_type = " << std::to_string(DEFAULT_METRIC_TYPE) << " " <<
                                "WHERE table_id = " << quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();

            if (!dropTableIndexQuery.exec()) {
S
starlord 已提交
587
                return HandleException("QUERY ERROR WHEN DROPPING TABLE INDEX", dropTableIndexQuery.error());
S
starlord 已提交
588 589 590 591
            }

        } //Scoped Connection

S
starlord 已提交
592 593
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
S
starlord 已提交
594
    }
595

S
starlord 已提交
596 597
    return Status::OK();
}
Z
update  
zhiru 已提交
598

S
starlord 已提交
599 600
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
601
        server::MetricCollector metric;
S
starlord 已提交
602 603
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
zhiru 已提交
604

S
starlord 已提交
605
            if (connectionPtr == nullptr) {
S
starlord 已提交
606
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
607
            }
Z
zhiru 已提交
608

609 610 611 612 613 614
            //soft delete table
            Query deleteTableQuery = connectionPtr->query();
//
            deleteTableQuery << "UPDATE Tables " <<
                             "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " <<
                             "WHERE table_id = " << quote << table_id << ";";
Z
update  
zhiru 已提交
615

616
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str();
617

618
            if (!deleteTableQuery.exec()) {
S
starlord 已提交
619
                return HandleException("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
620
            }
621

622
        } //Scoped Connection
Z
zhiru 已提交
623

624 625 626
        if (mode_ == Options::MODE::CLUSTER) {
            DeleteTableFiles(table_id);
        }
Z
update  
zhiru 已提交
627

S
starlord 已提交
628 629
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
630
    }
Z
update  
zhiru 已提交
631

632 633
    return Status::OK();
}
Z
update  
zhiru 已提交
634

635 636
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
637
        server::MetricCollector metric;
638 639 640 641
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
642
                return Status(DB_ERROR, "Failed to connect to database server");
643
            }
644

645 646 647 648 649 650 651 652
            //soft delete table files
            Query deleteTableFilesQuery = connectionPtr->query();
            //
            deleteTableFilesQuery << "UPDATE TableFiles " <<
                                  "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
                                  "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
                                  "WHERE table_id = " << quote << table_id << " AND " <<
                                  "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
653

654
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str();
655

656
            if (!deleteTableFilesQuery.exec()) {
S
starlord 已提交
657
                return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", deleteTableFilesQuery.error());
658
            }
659
        } //Scoped Connection
S
starlord 已提交
660 661
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
Z
update  
zhiru 已提交
662 663
    }

664 665
    return Status::OK();
}
Z
zhiru 已提交
666

667 668
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
Y
Yu Kun 已提交
669
        server::MetricCollector metric;
670 671 672
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
673

674
            if (connectionPtr == nullptr) {
S
starlord 已提交
675
                return Status(DB_ERROR, "Failed to connect to database server");
676
            }
Z
zhiru 已提交
677

678
            Query describeTableQuery = connectionPtr->query();
S
starlord 已提交
679 680
            describeTableQuery << "SELECT id, state, dimension, created_on, " <<
                               "flag, index_file_size, engine_type, nlist, metric_type " <<
681 682 683 684 685
                               "FROM Tables " <<
                               "WHERE table_id = " << quote << table_schema.table_id_ << " " <<
                               "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str();
Z
update  
zhiru 已提交
686

687 688
            res = describeTableQuery.store();
        } //Scoped Connection
689

690 691
        if (res.num_rows() == 1) {
            const Row &resRow = res[0];
692

693
            table_schema.id_ = resRow["id"]; //implicit conversion
Z
update  
zhiru 已提交
694

S
starlord 已提交
695 696
            table_schema.state_ = resRow["state"];

697
            table_schema.dimension_ = resRow["dimension"];
698

S
starlord 已提交
699 700 701 702
            table_schema.created_on_ = resRow["created_on"];

            table_schema.flag_ = resRow["flag"];

703 704
            table_schema.index_file_size_ = resRow["index_file_size"];

705
            table_schema.engine_type_ = resRow["engine_type"];
S
starlord 已提交
706 707 708 709

            table_schema.nlist_ = resRow["nlist"];

            table_schema.metric_type_ = resRow["metric_type"];
710
        } else {
S
starlord 已提交
711
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
712
        }
Z
update  
zhiru 已提交
713

S
starlord 已提交
714 715
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
Z
update  
zhiru 已提交
716 717
    }

718 719
    return Status::OK();
}
Z
zhiru 已提交
720

721 722
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    try {
Y
Yu Kun 已提交
723
        server::MetricCollector metric;
724 725 726
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
727

728
            if (connectionPtr == nullptr) {
S
starlord 已提交
729
                return Status(DB_ERROR, "Failed to connect to database server");
730
            }
Z
update  
zhiru 已提交
731

732 733 734 735 736 737 738
            Query hasTableQuery = connectionPtr->query();
            //since table_id is a unique column we just need to check whether it exists or not
            hasTableQuery << "SELECT EXISTS " <<
                          "(SELECT 1 FROM Tables " <<
                          "WHERE table_id = " << quote << table_id << " " <<
                          "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
                          "AS " << quote << "check" << ";";
Z
update  
zhiru 已提交
739

740
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();
741

742 743
            res = hasTableQuery.store();
        } //Scoped Connection
744

745 746
        int check = res[0]["check"];
        has_or_not = (check == 1);
747

S
starlord 已提交
748 749
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
750
    }
751

752 753
    return Status::OK();
}
754

755 756
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
    try {
Y
Yu Kun 已提交
757
        server::MetricCollector metric;
758 759 760
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
761

762
            if (connectionPtr == nullptr) {
S
starlord 已提交
763
                return Status(DB_ERROR, "Failed to connect to database server");
764
            }
Z
update  
zhiru 已提交
765

766
            Query allTablesQuery = connectionPtr->query();
S
starlord 已提交
767
            allTablesQuery << "SELECT id, table_id, dimension, engine_type, nlist, index_file_size, metric_type " <<
768 769
                           "FROM Tables " <<
                           "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
Z
zhiru 已提交
770

771
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str();
Z
zhiru 已提交
772

773 774
            res = allTablesQuery.store();
        } //Scoped Connection
775

776 777
        for (auto &resRow : res) {
            TableSchema table_schema;
Z
update  
zhiru 已提交
778

779
            table_schema.id_ = resRow["id"]; //implicit conversion
780

781 782 783
            std::string table_id;
            resRow["table_id"].to_string(table_id);
            table_schema.table_id_ = table_id;
784

785
            table_schema.dimension_ = resRow["dimension"];
786

787 788
            table_schema.index_file_size_ = resRow["index_file_size"];

789
            table_schema.engine_type_ = resRow["engine_type"];
790

S
starlord 已提交
791 792 793 794
            table_schema.nlist_ = resRow["nlist"];

            table_schema.metric_type_ = resRow["metric_type"];

795 796
            table_schema_array.emplace_back(table_schema);
        }
S
starlord 已提交
797 798
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
799
    }
Z
update  
zhiru 已提交
800

801 802
    return Status::OK();
}
803

804 805
Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
806
        file_schema.date_ = utils::GetDate();
807 808 809 810 811 812 813
    }
    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }
814

815
    try {
Y
Yu Kun 已提交
816
        server::MetricCollector metric;
817 818 819

        NextFileId(file_schema.file_id_);
        file_schema.dimension_ = table_schema.dimension_;
820 821
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
822 823
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
824
        file_schema.index_file_size_ = table_schema.index_file_size_;
825
        file_schema.engine_type_ = table_schema.engine_type_;
S
starlord 已提交
826 827
        file_schema.nlist_ = table_schema.nlist_;
        file_schema.metric_type_ = table_schema.metric_type_;
828 829 830 831 832 833

        std::string id = "NULL"; //auto-increment
        std::string table_id = file_schema.table_id_;
        std::string engine_type = std::to_string(file_schema.engine_type_);
        std::string file_id = file_schema.file_id_;
        std::string file_type = std::to_string(file_schema.file_type_);
S
starlord 已提交
834
        std::string file_size = std::to_string(file_schema.file_size_);
835
        std::string row_count = std::to_string(file_schema.row_count_);
836 837 838 839 840 841
        std::string updated_time = std::to_string(file_schema.updated_time_);
        std::string created_on = std::to_string(file_schema.created_on_);
        std::string date = std::to_string(file_schema.date_);

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
842

843
            if (connectionPtr == nullptr) {
S
starlord 已提交
844
                return Status(DB_ERROR, "Failed to connect to database server");
845
            }
Z
update  
zhiru 已提交
846

847
            Query createTableFileQuery = connectionPtr->query();
848

849 850
            createTableFileQuery << "INSERT INTO TableFiles VALUES" <<
                                 "(" << id << ", " << quote << table_id << ", " << engine_type << ", " <<
S
starlord 已提交
851 852
                                 quote << file_id << ", " << file_type << ", " << file_size << ", " <<
                                 row_count << ", " << updated_time << ", " << created_on << ", " << date << ");";
853 854 855 856 857 858 859 860

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str();

            if (SimpleResult res = createTableFileQuery.execute()) {
                file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?

                //Consume all results to avoid "Commands out of sync" error
            } else {
S
starlord 已提交
861
                return HandleException("QUERY ERROR WHEN CREATING TABLE FILE", createTableFileQuery.error());
862
            }
863 864 865 866
        } // Scoped Connection

        return utils::CreateTableFilePath(options_, file_schema);

S
starlord 已提交
867 868
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
869 870
    }
}
871

872 873
Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();
874

875
    try {
Y
Yu Kun 已提交
876
        server::MetricCollector metric;
877 878 879 880 881
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
882
                return Status(DB_ERROR, "Failed to connect to database server");
883
            }
884

885
            Query filesToIndexQuery = connectionPtr->query();
S
starlord 已提交
886
            filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date, created_on " <<
887 888
                              "FROM TableFiles " <<
                              "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";
889

890
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();
891

892 893 894
            res = filesToIndexQuery.store();
        } //Scoped Connection

S
starlord 已提交
895
        Status ret;
896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;
        for (auto &resRow : res) {

            table_file.id_ = resRow["id"]; //implicit conversion

            std::string table_id;
            resRow["table_id"].to_string(table_id);
            table_file.table_id_ = table_id;

            table_file.engine_type_ = resRow["engine_type"];

            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = resRow["file_type"];

S
starlord 已提交
914 915
            table_file.file_size_ = resRow["file_size"];

916
            table_file.row_count_ = resRow["row_count"];
917 918 919

            table_file.date_ = resRow["date"];

S
starlord 已提交
920 921
            table_file.created_on_ = resRow["created_on"];

922 923 924 925 926 927 928
            auto groupItr = groups.find(table_file.table_id_);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
929
                }
930
                groups[table_file.table_id_] = table_schema;
931 932

            }
933
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
S
starlord 已提交
934
            table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
935
            table_file.nlist_ = groups[table_file.table_id_].nlist_;
S
starlord 已提交
936
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
Z
update  
zhiru 已提交
937

S
starlord 已提交
938 939
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
940
                ret = status;
S
starlord 已提交
941
            }
942 943 944

            files.push_back(table_file);
        }
S
starlord 已提交
945 946 947

        return ret;

S
starlord 已提交
948 949
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
Z
update  
zhiru 已提交
950
    }
951 952
}

X
xj.lin 已提交
953 954 955 956 957 958 959
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
                                    const std::vector<size_t> &ids,
                                    const DatesT &partition,
                                    DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
Y
Yu Kun 已提交
960
        server::MetricCollector metric;
X
xj.lin 已提交
961 962 963 964 965
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
966
                return Status(DB_ERROR, "Failed to connect to database server");
X
xj.lin 已提交
967 968 969
            }

            Query filesToSearchQuery = connectionPtr->query();
S
starlord 已提交
970
            filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
X
xj.lin 已提交
971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
                               "FROM TableFiles " <<
                               "WHERE table_id = " << quote << table_id;

            if (!partition.empty()) {
                std::stringstream partitionListSS;
                for (auto &date : partition) {
                    partitionListSS << std::to_string(date) << ", ";
                }
                std::string partitionListStr = partitionListSS.str();

                partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", "
                filesToSearchQuery << " AND " << "date IN (" << partitionListStr << ")";
            }

            if (!ids.empty()) {
                std::stringstream idSS;
                for (auto &id : ids) {
                    idSS << "id = " << std::to_string(id) << " OR ";
                }
                std::string idStr = idSS.str();
                idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "

                filesToSearchQuery  << " AND " << "(" << idStr << ")";

            }
            // End
            filesToSearchQuery << " AND " <<
                               "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
                               "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
                               "file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();

            res = filesToSearchQuery.store();
        } //Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

S
starlord 已提交
1014
        Status ret;
X
xj.lin 已提交
1015 1016 1017 1018 1019 1020 1021 1022 1023
        TableFileSchema table_file;
        for (auto &resRow : res) {

            table_file.id_ = resRow["id"]; //implicit conversion

            std::string table_id_str;
            resRow["table_id"].to_string(table_id_str);
            table_file.table_id_ = table_id_str;

1024 1025
            table_file.index_file_size_ = table_schema.index_file_size_;

X
xj.lin 已提交
1026 1027
            table_file.engine_type_ = resRow["engine_type"];

S
starlord 已提交
1028 1029
            table_file.nlist_ = table_schema.nlist_;

S
starlord 已提交
1030 1031
            table_file.metric_type_ = table_schema.metric_type_;

X
xj.lin 已提交
1032 1033 1034 1035 1036 1037
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;

            table_file.file_type_ = resRow["file_type"];

S
starlord 已提交
1038 1039
            table_file.file_size_ = resRow["file_size"];

1040
            table_file.row_count_ = resRow["row_count"];
X
xj.lin 已提交
1041 1042 1043 1044 1045

            table_file.date_ = resRow["date"];

            table_file.dimension_ = table_schema.dimension_;

S
starlord 已提交
1046 1047
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
1048
                ret = status;
S
starlord 已提交
1049
            }
1050

1051 1052 1053
            auto dateItr = files.find(table_file.date_);
            if (dateItr == files.end()) {
                files[table_file.date_] = TableFilesSchema();
1054 1055
            }

1056
            files[table_file.date_].push_back(table_file);
1057
        }
S
starlord 已提交
1058 1059

        return ret;
S
starlord 已提交
1060 1061
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
Z
update  
zhiru 已提交
1062
    }
1063
}
Z
update  
zhiru 已提交
1064

1065 1066 1067
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
                                   DatePartionedTableFilesSchema &files) {
    files.clear();
Z
update  
zhiru 已提交
1068

1069
    try {
Y
Yu Kun 已提交
1070
        server::MetricCollector metric;
S
starlord 已提交
1071 1072 1073 1074 1075 1076 1077 1078 1079

        //check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

1080 1081 1082
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
1083

1084
            if (connectionPtr == nullptr) {
S
starlord 已提交
1085
                return Status(DB_ERROR, "Failed to connect to database server");
1086
            }
Z
update  
zhiru 已提交
1087

1088
            Query filesToMergeQuery = connectionPtr->query();
S
starlord 已提交
1089
            filesToMergeQuery << "SELECT id, table_id, file_id, file_type, file_size, row_count, date, engine_type, created_on " <<
1090 1091 1092
                              "FROM TableFiles " <<
                              "WHERE table_id = " << quote << table_id << " AND " <<
                              "file_type = " << std::to_string(TableFileSchema::RAW) << " " <<
1093
                              "ORDER BY row_count DESC" << ";";
Z
update  
zhiru 已提交
1094

1095
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();
1096

1097 1098
            res = filesToMergeQuery.store();
        } //Scoped Connection
1099

S
starlord 已提交
1100
        Status ret;
1101
        for (auto &resRow : res) {
S
starlord 已提交
1102 1103 1104 1105 1106
            TableFileSchema table_file;
            table_file.file_size_ = resRow["file_size"];
            if(table_file.file_size_ >= table_schema.index_file_size_) {
                continue;//skip large file
            }
Z
update  
zhiru 已提交
1107

1108
            table_file.id_ = resRow["id"]; //implicit conversion
1109

1110 1111 1112
            std::string table_id_str;
            resRow["table_id"].to_string(table_id_str);
            table_file.table_id_ = table_id_str;
Z
update  
zhiru 已提交
1113

1114 1115 1116
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            table_file.file_id_ = file_id;
1117

1118
            table_file.file_type_ = resRow["file_type"];
1119

S
starlord 已提交
1120 1121
            table_file.row_count_ = resRow["row_count"];

1122
            table_file.date_ = resRow["date"];
Z
update  
zhiru 已提交
1123

1124 1125
            table_file.index_file_size_ = table_schema.index_file_size_;

S
starlord 已提交
1126 1127
            table_file.engine_type_ = resRow["engine_type"];

S
starlord 已提交
1128 1129
            table_file.nlist_ = table_schema.nlist_;

S
starlord 已提交
1130 1131
            table_file.metric_type_ = table_schema.metric_type_;

S
starlord 已提交
1132 1133
            table_file.created_on_ = resRow["created_on"];

1134
            table_file.dimension_ = table_schema.dimension_;
Z
update  
zhiru 已提交
1135

S
starlord 已提交
1136 1137
            auto status = utils::GetTableFilePath(options_, table_file);
            if(!status.ok()) {
S
starlord 已提交
1138
                ret = status;
S
starlord 已提交
1139
            }
Z
update  
zhiru 已提交
1140

1141 1142 1143
            auto dateItr = files.find(table_file.date_);
            if (dateItr == files.end()) {
                files[table_file.date_] = TableFilesSchema();
1144
            }
1145 1146

            files[table_file.date_].push_back(table_file);
1147
        }
Z
update  
zhiru 已提交
1148

S
starlord 已提交
1149 1150
        return ret;

S
starlord 已提交
1151 1152
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
1153 1154 1155 1156 1157 1158 1159
    }
}

Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
                                    const std::vector<size_t> &ids,
                                    TableFilesSchema &table_files) {
    if (ids.empty()) {
Z
update  
zhiru 已提交
1160 1161 1162
        return Status::OK();
    }

1163 1164 1165 1166 1167 1168
    std::stringstream idSS;
    for (auto &id : ids) {
        idSS << "id = " << std::to_string(id) << " OR ";
    }
    std::string idStr = idSS.str();
    idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "
Z
zhiru 已提交
1169

1170 1171 1172 1173 1174 1175
    try {
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
1176
                return Status(DB_ERROR, "Failed to connect to database server");
1177 1178 1179
            }

            Query getTableFileQuery = connectionPtr->query();
S
starlord 已提交
1180
            getTableFileQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on " <<
1181 1182
                              "FROM TableFiles " <<
                              "WHERE table_id = " << quote << table_id << " AND " <<
S
starlord 已提交
1183 1184
                              "(" << idStr << ") AND " <<
                              "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
1185 1186 1187 1188 1189 1190 1191 1192

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();

            res = getTableFileQuery.store();
        } //Scoped Connection

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
S
starlord 已提交
1193
        DescribeTable(table_schema);
1194

S
starlord 已提交
1195
        Status ret;
1196
        for (auto &resRow : res) {
Z
zhiru 已提交
1197

1198
            TableFileSchema file_schema;
1199

1200
            file_schema.id_ = resRow["id"];
Z
zhiru 已提交
1201

1202
            file_schema.table_id_ = table_id;
Z
update  
zhiru 已提交
1203

1204 1205
            file_schema.index_file_size_ = table_schema.index_file_size_;

1206
            file_schema.engine_type_ = resRow["engine_type"];
Z
update  
zhiru 已提交
1207

S
starlord 已提交
1208 1209
            file_schema.nlist_ = table_schema.nlist_;

S
starlord 已提交
1210 1211
            file_schema.metric_type_ = table_schema.metric_type_;

1212 1213 1214
            std::string file_id;
            resRow["file_id"].to_string(file_id);
            file_schema.file_id_ = file_id;
Z
update  
zhiru 已提交
1215

1216
            file_schema.file_type_ = resRow["file_type"];
Z
update  
zhiru 已提交
1217

1218 1219 1220
            file_schema.file_size_ = resRow["file_size"];

            file_schema.row_count_ = resRow["row_count"];
1221

1222
            file_schema.date_ = resRow["date"];
1223

S
starlord 已提交
1224 1225
            file_schema.created_on_ = resRow["created_on"];

1226
            file_schema.dimension_ = table_schema.dimension_;
Z
update  
zhiru 已提交
1227

S
starlord 已提交
1228
            utils::GetTableFilePath(options_, file_schema);
1229 1230 1231

            table_files.emplace_back(file_schema);
        }
S
starlord 已提交
1232 1233 1234

        return ret;

S
starlord 已提交
1235 1236
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
Z
update  
zhiru 已提交
1237
    }
1238
}
Z
zhiru 已提交
1239

1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254
// PXU TODO: Support Swap
Status MySQLMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.empty()) {
        return Status::OK();
    }

    for (auto &kv : criterias) {
        auto &criteria = kv.first;
        auto &limit = kv.second;
        if (criteria == "days") {
            size_t usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();

            try {
Z
update  
zhiru 已提交
1255
                ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
zhiru 已提交
1256

Z
update  
zhiru 已提交
1257
                if (connectionPtr == nullptr) {
S
starlord 已提交
1258
                    return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1259 1260
                }

1261 1262 1263 1264 1265
                Query archiveQuery = connectionPtr->query();
                archiveQuery << "UPDATE TableFiles " <<
                             "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                             "WHERE created_on < " << std::to_string(now - usecs) << " AND " <<
                             "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1266

1267
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();
Z
update  
zhiru 已提交
1268

1269
                if (!archiveQuery.exec()) {
S
starlord 已提交
1270
                    return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
1271
                }
1272

S
starlord 已提交
1273 1274
            } catch (std::exception &e) {
                return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
Z
zhiru 已提交
1275
            }
1276
        }
1277 1278 1279
        if (criteria == "disk") {
            uint64_t sum = 0;
            Size(sum);
Z
update  
zhiru 已提交
1280

1281 1282 1283
            auto to_delete = (sum - limit * G);
            DiscardFiles(to_delete);
        }
Z
update  
zhiru 已提交
1284 1285
    }

1286 1287
    return Status::OK();
}
Z
zhiru 已提交
1288

1289 1290
Status MySQLMetaImpl::Size(uint64_t &result) {
    result = 0;
1291

S
starlord 已提交
1292
    try {
1293 1294 1295
        StoreQueryResult res;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
1296

1297
            if (connectionPtr == nullptr) {
S
starlord 已提交
1298
                return Status(DB_ERROR, "Failed to connect to database server");
1299
            }
Z
zhiru 已提交
1300

1301
            Query getSizeQuery = connectionPtr->query();
1302
            getSizeQuery << "SELECT IFNULL(SUM(file_size),0) AS sum " <<
1303 1304
                         "FROM TableFiles " <<
                         "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
Z
update  
zhiru 已提交
1305

1306
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str();
Z
update  
zhiru 已提交
1307

1308 1309
            res = getSizeQuery.store();
        } //Scoped Connection
Z
update  
zhiru 已提交
1310

1311 1312 1313 1314 1315
        if (res.empty()) {
            result = 0;
        } else {
            result = res[0]["sum"];
        }
Z
update  
zhiru 已提交
1316

S
starlord 已提交
1317 1318
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
1319
    }
1320

1321 1322
    return Status::OK();
}
1323

1324 1325 1326 1327
Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
    if (to_discard_size <= 0) {

        return Status::OK();
Z
update  
zhiru 已提交
1328
    }
1329
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
Z
update  
zhiru 已提交
1330

1331
    try {
Y
Yu Kun 已提交
1332
        server::MetricCollector metric;
1333 1334 1335
        bool status;
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
1336

1337
            if (connectionPtr == nullptr) {
S
starlord 已提交
1338
                return Status(DB_ERROR, "Failed to connect to database server");
1339
            }
Z
zhiru 已提交
1340

1341
            Query discardFilesQuery = connectionPtr->query();
1342
            discardFilesQuery << "SELECT id, file_size " <<
1343 1344 1345 1346
                              "FROM TableFiles " <<
                              "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
                              "ORDER BY id ASC " <<
                              "LIMIT 10;";
Z
update  
zhiru 已提交
1347

1348
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
1349

1350 1351 1352 1353
            StoreQueryResult res = discardFilesQuery.store();
            if (res.num_rows() == 0) {
                return Status::OK();
            }
1354

1355 1356 1357 1358 1359
            TableFileSchema table_file;
            std::stringstream idsToDiscardSS;
            for (auto &resRow : res) {
                if (to_discard_size <= 0) {
                    break;
Z
update  
zhiru 已提交
1360
                }
1361
                table_file.id_ = resRow["id"];
1362
                table_file.file_size_ = resRow["file_size"];
1363 1364
                idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR ";
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
1365 1366
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
1367
            }
Z
update  
zhiru 已提交
1368

1369 1370
            std::string idsToDiscardStr = idsToDiscardSS.str();
            idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR "
1371

1372 1373 1374 1375
            discardFilesQuery << "UPDATE TableFiles " <<
                              "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " <<
                              "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
                              "WHERE " << idsToDiscardStr << ";";
1376

1377
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
Z
update  
zhiru 已提交
1378

1379 1380
            status = discardFilesQuery.exec();
            if (!status) {
S
starlord 已提交
1381
                return HandleException("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error());
1382 1383 1384 1385 1386
            }
        } //Scoped Connection

        return DiscardFiles(to_discard_size);

S
starlord 已提交
1387 1388
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
P
peng.xu 已提交
1389
    }
1390
}
P
peng.xu 已提交
1391

1392 1393 1394
//ZR: this function assumes all fields in file_schema have value
Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1395

S
starlord 已提交
1396
    try {
Y
Yu Kun 已提交
1397
        server::MetricCollector metric;
1398 1399
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
1400

1401
            if (connectionPtr == nullptr) {
S
starlord 已提交
1402
                return Status(DB_ERROR, "Failed to connect to database server");
1403
            }
Z
update  
zhiru 已提交
1404

1405
            Query updateTableFileQuery = connectionPtr->query();
Z
update  
zhiru 已提交
1406

1407 1408 1409 1410
            //if the table has been deleted, just mark the table file as TO_DELETE
            //clean thread will delete the file later
            updateTableFileQuery << "SELECT state FROM Tables " <<
                                 "WHERE table_id = " << quote << file_schema.table_id_ << ";";
Z
update  
zhiru 已提交
1411

1412
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
Z
update  
zhiru 已提交
1413

1414
            StoreQueryResult res = updateTableFileQuery.store();
1415

1416 1417 1418 1419
            if (res.num_rows() == 1) {
                int state = res[0]["state"];
                if (state == TableSchema::TO_DELETE) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
Z
update  
zhiru 已提交
1420
                }
1421 1422 1423 1424 1425 1426 1427 1428 1429
            } else {
                file_schema.file_type_ = TableFileSchema::TO_DELETE;
            }

            std::string id = std::to_string(file_schema.id_);
            std::string table_id = file_schema.table_id_;
            std::string engine_type = std::to_string(file_schema.engine_type_);
            std::string file_id = file_schema.file_id_;
            std::string file_type = std::to_string(file_schema.file_type_);
1430 1431
            std::string file_size = std::to_string(file_schema.file_size_);
            std::string row_count = std::to_string(file_schema.row_count_);
1432 1433 1434
            std::string updated_time = std::to_string(file_schema.updated_time_);
            std::string created_on = std::to_string(file_schema.created_on_);
            std::string date = std::to_string(file_schema.date_);
Z
update  
zhiru 已提交
1435

1436 1437 1438 1439 1440
            updateTableFileQuery << "UPDATE TableFiles " <<
                                 "SET table_id = " << quote << table_id << ", " <<
                                 "engine_type = " << engine_type << ", " <<
                                 "file_id = " << quote << file_id << ", " <<
                                 "file_type = " << file_type << ", " <<
1441 1442
                                 "file_size = " << file_size << ", " <<
                                 "row_count = " << row_count << ", " <<
1443 1444 1445 1446 1447 1448 1449 1450 1451
                                 "updated_time = " << updated_time << ", " <<
                                 "created_on = " << created_on << ", " <<
                                 "date = " << date << " " <<
                                 "WHERE id = " << id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();

            if (!updateTableFileQuery.exec()) {
                ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
S
starlord 已提交
1452
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error());
1453 1454 1455
            }
        } //Scoped Connection

S
starlord 已提交
1456 1457
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
1458
    }
S
starlord 已提交
1459

1460 1461
    return Status::OK();
}
1462

1463 1464 1465
Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
    try {
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
1466

1467
        if (connectionPtr == nullptr) {
S
starlord 已提交
1468
            return Status(DB_ERROR, "Failed to connect to database server");
1469
        }
Z
update  
zhiru 已提交
1470

1471
        Query updateTableFilesToIndexQuery = connectionPtr->query();
Z
zhiru 已提交
1472

1473
        updateTableFilesToIndexQuery << "UPDATE TableFiles " <<
Z
fix  
zhiru 已提交
1474 1475 1476
                                     "SET file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " " <<
                                     "WHERE table_id = " << quote << table_id << " AND " <<
                                     "file_type = " << std::to_string(TableFileSchema::RAW) << ";";
1477

Z
zhiru 已提交
1478
        ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();
Z
update  
zhiru 已提交
1479

Z
fix  
zhiru 已提交
1480
        if (!updateTableFilesToIndexQuery.exec()) {
S
starlord 已提交
1481
            return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX", updateTableFilesToIndexQuery.error());
Z
fix  
zhiru 已提交
1482 1483
        }

S
starlord 已提交
1484 1485
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
1486
    }
Z
update  
zhiru 已提交
1487

1488 1489
    return Status::OK();
}
Z
zhiru 已提交
1490

1491 1492
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
Y
Yu Kun 已提交
1493
        server::MetricCollector metric;
1494 1495 1496 1497
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
1498
                return Status(DB_ERROR, "Failed to connect to database server");
1499
            }
Z
update  
zhiru 已提交
1500

1501
            Query updateTableFilesQuery = connectionPtr->query();
1502

1503 1504
            std::map<std::string, bool> has_tables;
            for (auto &file_schema : files) {
1505

1506 1507 1508
                if (has_tables.find(file_schema.table_id_) != has_tables.end()) {
                    continue;
                }
1509

1510 1511 1512 1513 1514
                updateTableFilesQuery << "SELECT EXISTS " <<
                                      "(SELECT 1 FROM Tables " <<
                                      "WHERE table_id = " << quote << file_schema.table_id_ << " " <<
                                      "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
                                      "AS " << quote << "check" << ";";
1515

1516
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
1517

1518
                StoreQueryResult res = updateTableFilesQuery.store();
1519

1520 1521 1522
                int check = res[0]["check"];
                has_tables[file_schema.table_id_] = (check == 1);
            }
1523

1524
            for (auto &file_schema : files) {
1525

1526 1527
                if (!has_tables[file_schema.table_id_]) {
                    file_schema.file_type_ = TableFileSchema::TO_DELETE;
1528
                }
1529
                file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1530

1531 1532 1533 1534 1535
                std::string id = std::to_string(file_schema.id_);
                std::string table_id = file_schema.table_id_;
                std::string engine_type = std::to_string(file_schema.engine_type_);
                std::string file_id = file_schema.file_id_;
                std::string file_type = std::to_string(file_schema.file_type_);
1536 1537
                std::string file_size = std::to_string(file_schema.file_size_);
                std::string row_count = std::to_string(file_schema.row_count_);
1538 1539 1540
                std::string updated_time = std::to_string(file_schema.updated_time_);
                std::string created_on = std::to_string(file_schema.created_on_);
                std::string date = std::to_string(file_schema.date_);
Z
fix  
zhiru 已提交
1541

1542 1543 1544 1545 1546
                updateTableFilesQuery << "UPDATE TableFiles " <<
                                      "SET table_id = " << quote << table_id << ", " <<
                                      "engine_type = " << engine_type << ", " <<
                                      "file_id = " << quote << file_id << ", " <<
                                      "file_type = " << file_type << ", " <<
1547 1548
                                      "file_size = " << file_size << ", " <<
                                      "row_count = " << row_count << ", " <<
1549 1550 1551 1552 1553 1554 1555 1556
                                      "updated_time = " << updated_time << ", " <<
                                      "created_on = " << created_on << ", " <<
                                      "date = " << date << " " <<
                                      "WHERE id = " << id << ";";

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();

                if (!updateTableFilesQuery.exec()) {
S
starlord 已提交
1557
                    return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error());
1558 1559 1560 1561
                }
            }
        } //Scoped Connection

S
starlord 已提交
1562 1563
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
1564
    }
S
starlord 已提交
1565

1566 1567
    return Status::OK();
}
Z
fix  
zhiru 已提交
1568

1569 1570
Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
S
starlord 已提交
1571 1572 1573
    std::set<std::string> table_ids;

    //remove to_delete files
1574
    try {
Y
Yu Kun 已提交
1575
        server::MetricCollector metric;
1576

1577 1578
        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
update  
zhiru 已提交
1579

1580
            if (connectionPtr == nullptr) {
S
starlord 已提交
1581
                return Status(DB_ERROR, "Failed to connect to database server");
1582
            }
Z
zhiru 已提交
1583

1584 1585 1586 1587 1588
            Query cleanUpFilesWithTTLQuery = connectionPtr->query();
            cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " <<
                                     "FROM TableFiles " <<
                                     "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " <<
                                     "updated_time < " << std::to_string(now - seconds * US_PS) << ";";
Z
update  
zhiru 已提交
1589

1590
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
Z
update  
zhiru 已提交
1591

1592
            StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
Z
update  
zhiru 已提交
1593

1594 1595
            TableFileSchema table_file;
            std::vector<std::string> idsToDelete;
1596

1597
            for (auto &resRow : res) {
1598

1599
                table_file.id_ = resRow["id"]; //implicit conversion
1600

1601 1602 1603
                std::string table_id;
                resRow["table_id"].to_string(table_id);
                table_file.table_id_ = table_id;
Z
fix  
zhiru 已提交
1604

1605 1606 1607
                std::string file_id;
                resRow["file_id"].to_string(file_id);
                table_file.file_id_ = file_id;
Z
update  
zhiru 已提交
1608

1609
                table_file.date_ = resRow["date"];
Z
update  
zhiru 已提交
1610

1611 1612
                utils::DeleteTableFilePath(options_, table_file);

S
starlord 已提交
1613
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;
1614 1615

                idsToDelete.emplace_back(std::to_string(table_file.id_));
S
starlord 已提交
1616 1617

                table_ids.insert(table_file.table_id_);
1618 1619 1620 1621 1622 1623 1624
            }

            if (!idsToDelete.empty()) {

                std::stringstream idsToDeleteSS;
                for (auto &id : idsToDelete) {
                    idsToDeleteSS << "id = " << id << " OR ";
1625
                }
1626

1627 1628 1629 1630
                std::string idsToDeleteStr = idsToDeleteSS.str();
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
                cleanUpFilesWithTTLQuery << "DELETE FROM TableFiles WHERE " <<
                                         idsToDeleteStr << ";";
1631

1632 1633 1634
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                if (!cleanUpFilesWithTTLQuery.exec()) {
S
starlord 已提交
1635
                    return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", cleanUpFilesWithTTLQuery.error());
1636 1637 1638 1639
                }
            }
        } //Scoped Connection

S
starlord 已提交
1640 1641
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
Z
update  
zhiru 已提交
1642
    }
1643

S
starlord 已提交
1644
    //remove to_delete tables
1645
    try {
Y
Yu Kun 已提交
1646
        server::MetricCollector metric;
1647 1648

        {
Z
update  
zhiru 已提交
1649
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
zhiru 已提交
1650

Z
update  
zhiru 已提交
1651
            if (connectionPtr == nullptr) {
S
starlord 已提交
1652
                return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1653 1654
            }

1655 1656 1657 1658
            Query cleanUpFilesWithTTLQuery = connectionPtr->query();
            cleanUpFilesWithTTLQuery << "SELECT id, table_id " <<
                                     "FROM Tables " <<
                                     "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
1659

1660 1661 1662
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

            StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
Z
update  
zhiru 已提交
1663

Z
update  
zhiru 已提交
1664 1665
            if (!res.empty()) {

1666 1667 1668 1669 1670
                std::stringstream idsToDeleteSS;
                for (auto &resRow : res) {
                    size_t id = resRow["id"];
                    std::string table_id;
                    resRow["table_id"].to_string(table_id);
Z
update  
zhiru 已提交
1671

S
starlord 已提交
1672
                    utils::DeleteTablePath(options_, table_id, false);//only delete empty folder
1673 1674

                    idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
Z
update  
zhiru 已提交
1675
                }
1676 1677 1678 1679
                std::string idsToDeleteStr = idsToDeleteSS.str();
                idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
                cleanUpFilesWithTTLQuery << "DELETE FROM Tables WHERE " <<
                                         idsToDeleteStr << ";";
1680

1681
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
Z
update  
zhiru 已提交
1682

1683
                if (!cleanUpFilesWithTTLQuery.exec()) {
S
starlord 已提交
1684
                    return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", cleanUpFilesWithTTLQuery.error());
1685 1686 1687 1688
                }
            }
        } //Scoped Connection

S
starlord 已提交
1689 1690
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
Z
update  
zhiru 已提交
1691 1692
    }

S
starlord 已提交
1693 1694 1695
    //remove deleted table folder
    //don't remove table folder until all its files has been deleted
    try {
Y
Yu Kun 已提交
1696
        server::MetricCollector metric;
S
starlord 已提交
1697 1698 1699 1700 1701

        {
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

            if (connectionPtr == nullptr) {
S
starlord 已提交
1702
                return Status(DB_ERROR, "Failed to connect to database server");
S
starlord 已提交
1703 1704 1705 1706 1707 1708
            }

            for(auto& table_id : table_ids) {
                Query cleanUpFilesWithTTLQuery = connectionPtr->query();
                cleanUpFilesWithTTLQuery << "SELECT file_id " <<
                                         "FROM TableFiles " <<
S
starlord 已提交
1709
                                         "WHERE table_id = " << quote << table_id << ";";
S
starlord 已提交
1710 1711 1712 1713 1714 1715 1716 1717 1718 1719

                ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();

                StoreQueryResult res = cleanUpFilesWithTTLQuery.store();

                if (res.empty()) {
                    utils::DeleteTablePath(options_, table_id);
                }
            }
        }
S
starlord 已提交
1720 1721
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
S
starlord 已提交
1722 1723
    }

1724 1725
    return Status::OK();
}
1726

1727 1728 1729
Status MySQLMetaImpl::CleanUp() {
    try {
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
1730

1731
        if (connectionPtr == nullptr) {
S
starlord 已提交
1732
            return Status(DB_ERROR, "Failed to connect to database server");
1733
        }
1734

1735 1736 1737 1738 1739
        Query cleanUpQuery = connectionPtr->query();
        cleanUpQuery << "SELECT table_name " <<
                     "FROM information_schema.tables " <<
                     "WHERE table_schema = " << quote << mysql_connection_pool_->getDB() << " " <<
                     "AND table_name = " << quote << "TableFiles" << ";";
Z
update  
zhiru 已提交
1740

1741
        ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
Z
update  
zhiru 已提交
1742

1743
        StoreQueryResult res = cleanUpQuery.store();
Z
update  
zhiru 已提交
1744

1745 1746
        if (!res.empty()) {
            ENGINE_LOG_DEBUG << "Remove table file type as NEW";
1747 1748 1749 1750
            cleanUpQuery << "DELETE FROM TableFiles WHERE file_type IN ("
                    << std::to_string(TableFileSchema::NEW) << ","
                    << std::to_string(TableFileSchema::NEW_MERGE) << ","
                    << std::to_string(TableFileSchema::NEW_INDEX) << ");";
1751

1752
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
1753

1754
            if (!cleanUpQuery.exec()) {
S
starlord 已提交
1755
                return HandleException("QUERY ERROR WHEN CLEANING UP FILES", cleanUpQuery.error());
Z
update  
zhiru 已提交
1756
            }
1757
        }
1758

S
starlord 已提交
1759 1760
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
Z
update  
zhiru 已提交
1761 1762
    }

1763 1764 1765 1766 1767
    return Status::OK();
}

Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
Y
Yu Kun 已提交
1768
        server::MetricCollector metric;
1769 1770 1771 1772 1773 1774 1775

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);

        if (!status.ok()) {
            return status;
1776
        }
Z
zhiru 已提交
1777

1778 1779
        StoreQueryResult res;
        {
Z
update  
zhiru 已提交
1780
            ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
Z
zhiru 已提交
1781

Z
update  
zhiru 已提交
1782
            if (connectionPtr == nullptr) {
S
starlord 已提交
1783
                return Status(DB_ERROR, "Failed to connect to database server");
Z
update  
zhiru 已提交
1784 1785
            }

Z
update  
zhiru 已提交
1786

1787
            Query countQuery = connectionPtr->query();
S
starlord 已提交
1788
            countQuery << "SELECT row_count " <<
1789 1790 1791 1792 1793
                       "FROM TableFiles " <<
                       "WHERE table_id = " << quote << table_id << " AND " <<
                       "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
                       "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
                       "file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
Z
update  
zhiru 已提交
1794

1795
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str();
Z
update  
zhiru 已提交
1796

1797 1798 1799 1800 1801
            res = countQuery.store();
        } //Scoped Connection

        result = 0;
        for (auto &resRow : res) {
S
starlord 已提交
1802
            size_t size = resRow["row_count"];
1803
            result += size;
1804
        }
1805

S
starlord 已提交
1806 1807
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
Z
update  
zhiru 已提交
1808
    }
S
starlord 已提交
1809

1810 1811 1812 1813 1814
    return Status::OK();
}

Status MySQLMetaImpl::DropAll() {
    try {
S
starlord 已提交
1815
        ENGINE_LOG_DEBUG << "Drop all mysql meta";
1816 1817 1818
        ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);

        if (connectionPtr == nullptr) {
S
starlord 已提交
1819
            return Status(DB_ERROR, "Failed to connect to database server");
Z
zhiru 已提交
1820
        }
1821 1822 1823 1824 1825 1826 1827 1828 1829

        Query dropTableQuery = connectionPtr->query();
        dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;";

        ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();

        if (dropTableQuery.exec()) {
            return Status::OK();
        } else {
S
starlord 已提交
1830
            return HandleException("QUERY ERROR WHEN DROPPING ALL", dropTableQuery.error());
1831
        }
S
starlord 已提交
1832 1833
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
1834 1835 1836
    }
}

Z
update  
zhiru 已提交
1837 1838 1839 1840
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz