SqliteMetaImpl.cpp 61.4 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/meta/SqliteMetaImpl.h"
Y
youny626 已提交
19
#include "MetaConsts.h"
S
starlord 已提交
20 21
#include "db/IDGenerator.h"
#include "db/Utils.h"
22
#include "metrics/Metrics.h"
23
#include "utils/CommonUtil.h"
Y
youny626 已提交
24 25
#include "utils/Exception.h"
#include "utils/Log.h"
26
#include "utils/StringHelpFunctions.h"
X
Xu Peng 已提交
27

Y
youny626 已提交
28
#include <sqlite_orm.h>
X
Xu Peng 已提交
29
#include <unistd.h>
X
Xu Peng 已提交
30
#include <boost/filesystem.hpp>
31
#include <chrono>
X
Xu Peng 已提交
32
#include <fstream>
Y
youny626 已提交
33
#include <iostream>
S
starlord 已提交
34
#include <map>
Y
youny626 已提交
35
#include <memory>
S
starlord 已提交
36
#include <set>
Y
youny626 已提交
37
#include <sstream>
S
starlord 已提交
38

J
jinhai 已提交
39
namespace milvus {
X
Xu Peng 已提交
40
namespace engine {
41
namespace meta {
X
Xu Peng 已提交
42

X
Xu Peng 已提交
43 44
using namespace sqlite_orm;

G
groot 已提交
45 46
namespace {

S
starlord 已提交
47
// Logs a meta-layer failure and converts it into a Status.
//
// desc: description of the operation that failed.
// what: optional detail text (typically exception::what()); when it is null
//       only `desc` is logged and returned, otherwise "<desc>:<what>".
// Returns a Status carrying DB_META_TRANSACTION_FAILED and the logged message.
Status
HandleException(const std::string& desc, const char* what = nullptr) {
    const std::string message = (what != nullptr) ? desc + ":" + what : desc;
    ENGINE_LOG_ERROR << message;
    return Status(DB_META_TRANSACTION_FAILED, message);
}

Y
youny626 已提交
59
}  // namespace
G
groot 已提交
60

S
starlord 已提交
61
inline auto
G
groot 已提交
62
StoragePrototype(const std::string& path) {
G
groot 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
    return make_storage(path,
                        make_table(META_TABLES,
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
                                   make_column("state", &TableSchema::state_),
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("flag", &TableSchema::flag_, default_value(0)),
                                   make_column("index_file_size", &TableSchema::index_file_size_),
                                   make_column("engine_type", &TableSchema::engine_type_),
                                   make_column("nlist", &TableSchema::nlist_),
                                   make_column("metric_type", &TableSchema::metric_type_),
                                   make_column("owner_table", &TableSchema::owner_table_, default_value("")),
                                   make_column("partition_tag", &TableSchema::partition_tag_, default_value("")),
                                   make_column("version", &TableSchema::version_, default_value(CURRENT_VERSION))),
                        make_table(META_TABLEFILES,
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
                                   make_column("engine_type", &TableFileSchema::engine_type_),
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
                                   make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
                                   make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_)));
X
Xu Peng 已提交
89 90
}

X
Xu Peng 已提交
91
// Concrete storage type produced by StoragePrototype(); deduced because the
// type returned by make_storage() depends on the full table/column list.
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
92 93
// File-local handle to the sqlite storage; null until Initialize() assigns it.
// Being a file-scope static, it is shared by every SqliteMetaImpl instance.
static std::unique_ptr<ConnectorT> ConnectorPtr;

Y
youny626 已提交
94
// Stores a copy of the meta options and immediately opens the meta database.
// The Status returned by Initialize() is not inspected here.
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options)
    : options_(options) {
    Initialize();
}

// Destructor: performs no explicit work in its body.
SqliteMetaImpl::~SqliteMetaImpl() {
}

S
starlord 已提交
101
// Generates a new table id and writes its decimal text form into `table_id`.
// Generation is serialized through genid_mutex_ to avoid duplicated ids.
// Always returns Status::OK().
Status
SqliteMetaImpl::NextTableId(std::string& table_id) {
    std::lock_guard<std::mutex> id_guard(genid_mutex_);  // avoid duplicated id

    SimpleIDGenerator generator;
    std::ostringstream formatter;
    formatter << generator.GetNextIDNumber();

    table_id = formatter.str();
    return Status::OK();
}

S
starlord 已提交
111
// Generates a new file id and writes its decimal text form into `file_id`.
// Generation is serialized through genid_mutex_ to avoid duplicated ids.
// Always returns Status::OK().
Status
SqliteMetaImpl::NextFileId(std::string& file_id) {
    std::lock_guard<std::mutex> id_guard(genid_mutex_);  // avoid duplicated id

    SimpleIDGenerator generator;
    std::ostringstream formatter;
    formatter << generator.GetNextIDNumber();

    file_id = formatter.str();
    return Status::OK();
}

S
starlord 已提交
121 122 123
void
SqliteMetaImpl::ValidateMetaSchema() {
    if (ConnectorPtr == nullptr) {
124 125 126
        return;
    }

Y
youny626 已提交
127
    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
128
    auto ret = ConnectorPtr->sync_schema_simulate();
Y
youny626 已提交
129 130
    if (ret.find(META_TABLES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
131 132
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
Y
youny626 已提交
133 134
    if (ret.find(META_TABLEFILES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
135 136 137 138
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
139 140
// Prepares the meta database under options_.path_.
//
// Steps: make sure the db directory exists, create the sqlite connector on
// "<path>/meta.sqlite", verify schema compatibility, sync the schema, keep the
// connection open, switch the journal to WAL, then clean up shadow files.
//
// Returns DB_INVALID_PATH when the directory cannot be created, otherwise
// Status::OK(). ValidateMetaSchema() may throw Exception(DB_INCOMPATIB_META).
Status
SqliteMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path_)) {
        // create_directories + error_code: creates missing parent directories,
        // never throws, and does not treat "already exists" (e.g. another
        // process created it concurrently) as a failure.
        boost::system::error_code ec;
        boost::filesystem::create_directories(options_.path_, ec);
        if (ec) {
            std::string msg = "Failed to create db directory " + options_.path_;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_INVALID_PATH, msg);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path_ + "/meta.sqlite"));

    // Must run before sync_schema(): an incompatible schema throws here
    // instead of being synced.
    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();                          // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUpShadowFiles();

    return Status::OK();
}

G
groot 已提交
163
// Inserts a new row into the tables meta table and creates the table path.
//
// table_schema: in/out. An empty table_id_ gets a freshly generated id;
//               id_ and created_on_ are overwritten with the inserted row id
//               and the current timestamp.
// Returns:
//   - DB_ERROR          if the table id exists and is in TO_DELETE state,
//   - DB_ALREADY_EXIST  if the table id exists in any other state,
//   - the result of utils::CreateTablePath() on success,
//   - DB_META_TRANSACTION_FAILED (via HandleException) on any std::exception.
Status
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                const bool pending_delete = (TableSchema::TO_DELETE == std::get<0>(existing[0]));
                if (pending_delete) {
                    return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                }
                // Change from no error to already exist.
                return Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        // A failing insert is reported by the handler below.
        table_schema.id_ = ConnectorPtr->insert(table_schema);

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table", e.what());
    }
}

// Loads the meta row of the table named by table_schema.table_id_.
//
// table_schema: in/out. table_id_ selects the row; every other field listed
//               in the query is overwritten from the database.
// Rows in TO_DELETE state are ignored. Returns DB_NOT_FOUND unless exactly
// one matching row exists, DB_META_TRANSACTION_FAILED on exception.
Status
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::state_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_,
                    &TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::metric_type_,
                    &TableSchema::owner_table_,
                    &TableSchema::partition_tag_,
                    &TableSchema::version_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        // Tuple positions follow the column order of the select above.
        const auto& row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.engine_type_ = std::get<6>(row);
        table_schema.nlist_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
        table_schema.owner_table_ = std::get<9>(row);
        table_schema.partition_tag_ = std::get<10>(row);
        table_schema.version_ = std::get<11>(row);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when describe table", e.what());
    }

    return Status::OK();
}

// Reports whether a table with `table_id` exists and is not in TO_DELETE state.
//
// has_or_not: out. Set to true only when exactly one such row exists; it is
//             false on every other path, including when an exception occurs.
// Returns Status::OK(), or DB_META_TRANSACTION_FAILED on exception.
Status
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    has_or_not = false;

    try {
        server::MetricCollector metric;

        auto matches = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        has_or_not = (matches.size() == 1);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table", e.what());
    }

    return Status::OK();
}

// Appends the meta information of every table not in TO_DELETE state to
// `table_schema_array` (the vector is not cleared first).
// The state column is not selected, so state_ of each appended schema keeps
// whatever value a default-constructed TableSchema has.
// Returns Status::OK(), or DB_META_TRANSACTION_FAILED on exception.
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::table_id_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_,
                    &TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::metric_type_,
                    &TableSchema::owner_table_,
                    &TableSchema::partition_tag_,
                    &TableSchema::version_),
            where(c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        for (auto& row : rows) {
            // Tuple positions follow the column order of the select above.
            TableSchema entry;
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.dimension_ = std::get<2>(row);
            entry.created_on_ = std::get<3>(row);
            entry.flag_ = std::get<4>(row);
            entry.index_file_size_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.nlist_ = std::get<7>(row);
            entry.metric_type_ = std::get<8>(row);
            entry.owner_table_ = std::get<9>(row);
            entry.partition_tag_ = std::get<10>(row);
            entry.version_ = std::get<11>(row);

            table_schema_array.emplace_back(entry);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup all tables", e.what());
    }

    return Status::OK();
}

// Soft-deletes a table: marks its row as TO_DELETE instead of removing it.
// Rows already in TO_DELETE state are left untouched.
// Returns Status::OK() (also when no row matched), or
// DB_META_TRANSACTION_FAILED on exception.
Status
SqliteMetaImpl::DropTable(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const int deleted_state = static_cast<int>(TableSchema::TO_DELETE);

        // soft delete table
        ConnectorPtr->update_all(set(c(&TableSchema::state_) = deleted_state),
                                 where(c(&TableSchema::table_id_) == table_id &&
                                       c(&TableSchema::state_) != deleted_state));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table", e.what());
    }

    return Status::OK();
}

// Soft-deletes all files of a table: every file row of `table_id` that is not
// already TO_DELETE gets file_type_ = TO_DELETE and a fresh updated_time_.
// Returns Status::OK(), or DB_META_TRANSACTION_FAILED on exception.
Status
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const int deleted_type = static_cast<int>(TableFileSchema::TO_DELETE);

        // soft delete table files
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = deleted_type,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(c(&TableFileSchema::table_id_) == table_id &&
                                       c(&TableFileSchema::file_type_) != deleted_type));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table files", e.what());
    }

    return Status::OK();
}

// Registers a new file for an existing table and creates its path on disk.
//
// file_schema: in/out. table_id_ must name an existing table. date_ defaults
//              to the current date when it equals EmptyDate. file_id_, sizes,
//              timestamps, id_ and the index parameters copied from the owning
//              table are filled in here.
// Returns the DescribeTable() error if the table lookup fails, the result of
// utils::CreateTableFilePath() on success, or DB_META_TRANSACTION_FAILED on
// exception.
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(owner);
    if (!status.ok()) {
        return status;
    }

    try {
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);

        // A new file starts empty; created and updated timestamps coincide.
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        // Parameters inherited from the owning table.
        file_schema.dimension_ = owner.dimension_;
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.metric_type_ = owner.metric_type_;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table file", e.what());
    }
}

S
starlord 已提交
400
// TODO(myh): Delete single vector by id
S
starlord 已提交
401
// Soft-deletes the files of `table_id` whose date_ is one of `dates`: matching
// file rows get file_type_ = TO_DELETE and a fresh updated_time_.
//
// Returns Status::OK() immediately when `dates` is empty, the DescribeTable()
// error when the table cannot be found, DB_META_TRANSACTION_FAILED on
// exception, and Status::OK() otherwise.
Status
SqliteMetaImpl::DropDataByDate(const std::string& table_id,
                               const DatesT& dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        // sqlite_orm has a bug, 'in' statement cannot handle too many elements
        // so we split one query into multi-queries, this is a work-around!!
        const size_t batch_size = 30;
        std::vector<DatesT> split_dates;
        for (DateT date : dates) {
            // Open a new batch once the current one holds batch_size dates, so
            // that no batch ever exceeds the limit and no empty batch is made.
            if (split_dates.empty() || split_dates.back().size() >= batch_size) {
                split_dates.emplace_back();
            }
            split_dates.back().push_back(date);
        }

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        for (auto& batch_dates : split_dates) {
            ConnectorPtr->update_all(
                set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                    c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::date_, batch_dates)));
        }

        ENGINE_LOG_DEBUG << "Successfully drop data by date, table id = " << table_schema.table_id_;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when drop partition", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
451
// Fetches the file records of `table_id` whose row id is listed in `ids`,
// skipping files in TO_DELETE state.
//
// table_files: out. Cleared first, then filled with one entry per matching
//              file; table-level parameters (dimension, index_file_size,
//              nlist, metric_type) are copied from the owning table and
//              utils::GetTableFilePath() is called on each entry.
// Returns the DescribeTable() error if the table lookup fails, a
// default-constructed Status on success, or DB_META_TRANSACTION_FAILED on
// exception.
Status
SqliteMetaImpl::GetTableFiles(const std::string& table_id,
                              const std::vector<size_t>& ids,
                              TableFilesSchema& table_files) {
    try {
        table_files.clear();

        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_,
                    &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_,
                    &TableFileSchema::date_,
                    &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  in(&TableFileSchema::id_, ids) &&
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        TableSchema owner;
        owner.table_id_ = table_id;
        auto status = DescribeTable(owner);
        if (!status.ok()) {
            return status;
        }

        for (auto& row : rows) {
            // Tuple positions follow the column order of the select above.
            TableFileSchema entry;
            entry.table_id_ = table_id;
            entry.id_ = std::get<0>(row);
            entry.file_id_ = std::get<1>(row);
            entry.file_type_ = std::get<2>(row);
            entry.file_size_ = std::get<3>(row);
            entry.row_count_ = std::get<4>(row);
            entry.date_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.created_on_ = std::get<7>(row);

            // Parameters inherited from the owning table.
            entry.dimension_ = owner.dimension_;
            entry.index_file_size_ = owner.index_file_size_;
            entry.nlist_ = owner.nlist_;
            entry.metric_type_ = owner.metric_type_;

            utils::GetTableFilePath(options_, entry);

            table_files.emplace_back(entry);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return Status();
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table files", e.what());
    }
}

S
starlord 已提交
504
// Sets the flag column of the table row identified by `table_id`.
// Returns Status::OK() (also when no row matched), or
// DB_META_TRANSACTION_FAILED on exception.
// NOTE(review): unlike the other writers in this file, this update does not
// take meta_mutex_ - confirm whether that is intentional.
Status
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;

        // overwrite the flag of the matching table row
        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag),
                                 where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (std::exception& e) {
        const std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
524
// Writes `file_schema` back to its row in the table-files meta table.
//
// file_schema: in/out. updated_time_ is refreshed; file_type_ is forced to
//              TO_DELETE when the owning table is missing or itself marked
//              TO_DELETE.
// Returns Status::OK(), or DB_META_TRANSACTION_FAILED on exception.
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_rows = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == file_schema.table_id_));

        // if the table has been deleted, just mark the table file as TO_DELETE
        // clean thread will delete the file later
        const bool owner_gone =
            owner_rows.empty() || std::get<0>(owner_rows[0]) == static_cast<int>(TableSchema::TO_DELETE);
        if (owner_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (std::exception& e) {
        const std::string msg = "Exception update table file: table_id = " + file_schema.table_id_ +
                                " file_id = " + file_schema.file_id_;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
553
// Writes every entry of `files` back to the table-files meta table inside a
// single sqlite transaction.
//
// files: in/out. Each entry gets a fresh updated_time_; entries whose owning
//        table has no row outside TO_DELETE state are marked TO_DELETE.
// Returns Status::OK(), or DB_META_TRANSACTION_FAILED when the transaction
// is not committed or an exception is thrown.
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // table id -> whether the table still exists; each table is queried once.
        std::map<std::string, bool> table_alive;
        for (auto& file : files) {
            if (table_alive.count(file.table_id_) != 0) {
                continue;
            }

            auto rows = ConnectorPtr->select(
                columns(&TableSchema::id_),
                where(c(&TableSchema::table_id_) == file.table_id_ &&
                      c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

            table_alive[file.table_id_] = !rows.empty();
        }

        auto committed = ConnectorPtr->transaction([&]() mutable {
            for (auto& file : files) {
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;  // true asks sqlite_orm to commit
        });

        if (!committed) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
        }

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
599
// Stores new index parameters (engine_type, nlist, metric_type) on the table
// row of `table_id`, then turns that table's BACKUP files back into RAW files
// with a fresh updated_time_.
//
// Returns DB_NOT_FOUND when no row outside TO_DELETE state matches,
// DB_META_TRANSACTION_FAILED on exception, Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::state_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_,
                    &TableSchema::owner_table_,
                    &TableSchema::partition_tag_,
                    &TableSchema::version_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        // Rebuild the full row: stored values for the unchanged columns,
        // values from `index` for the three index columns.
        const auto& row = rows[0];
        meta::TableSchema updated;
        updated.id_ = std::get<0>(row);
        updated.table_id_ = table_id;
        updated.state_ = std::get<1>(row);
        updated.dimension_ = std::get<2>(row);
        updated.created_on_ = std::get<3>(row);
        updated.flag_ = std::get<4>(row);
        updated.index_file_size_ = std::get<5>(row);
        updated.owner_table_ = std::get<6>(row);
        updated.partition_tag_ = std::get<7>(row);
        updated.version_ = std::get<8>(row);
        updated.engine_type_ = index.engine_type_;
        updated.nlist_ = index.nlist_;
        updated.metric_type_ = index.metric_type_;

        ConnectorPtr->update(updated);

        // set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (std::exception& e) {
        const std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
658
// Marks the RAW files of a table as TO_INDEX.
// Only files whose row_count_ is at least meta::BUILD_INDEX_THRESHOLD are touched.
//   table_id: id of the table whose files are updated
// Returns Status::OK() when the update statement ran, otherwise the status built by
// HandleException() from the caught exception.
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // concurrent sqlite updates from several threads may throw ('bad logic', etc),
        // so the write is serialized through meta_mutex_
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto of_this_table = c(&TableFileSchema::table_id_) == table_id;
        auto big_enough = c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD;
        auto still_raw = c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW);

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_INDEX)),
            where(of_this_table && big_enough && still_raw));

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when update table files to to_index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
682
// Reads the index settings stored in a table's meta row.
//   table_id: id of the table to look up (rows in state TO_DELETE are ignored)
//   index:    receives engine_type_, nlist_ and metric_type_ when the table is found
// Returns DB_NOT_FOUND unless exactly one matching row exists, the HandleException()
// status if the query throws, and Status::OK() otherwise.
Status
SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;

        auto is_requested_table = c(&TableSchema::table_id_) == table_id;
        auto not_being_deleted = c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE);

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_, &TableSchema::nlist_, &TableSchema::metric_type_),
            where(is_requested_table && not_being_deleted));

        // zero rows and more than one row are both reported as "not found"
        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        const auto& row = rows.front();
        index.engine_type_ = std::get<0>(row);
        index.nlist_ = std::get<1>(row);
        index.metric_type_ = std::get<2>(row);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when describe index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
707
// Drops the index of a table by rewriting its meta rows, in three steps:
//   1. INDEX files of the table are soft-deleted (file_type_ = TO_DELETE)
//   2. BACKUP files of the table become RAW again
//   3. the table's engine type, nlist and metric type are reset to their DEFAULT_* values
// Steps 1 and 2 also refresh updated_time_ of the affected files.
// Returns Status::OK() on success, or the HandleException() status if any statement throws.
Status
SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // concurrent sqlite updates from several threads may throw ('bad logic', etc),
        // so the writes are serialized through meta_mutex_
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto files_of_table = c(&TableFileSchema::table_id_) == table_id;

        // step 1: soft delete index files
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(files_of_table && c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)));

        // step 2: set all backup files back to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(files_of_table && c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        // step 3: reset the index description kept on the table row itself
        ConnectorPtr->update_all(
            set(c(&TableSchema::engine_type_) = DEFAULT_ENGINE_TYPE,
                c(&TableSchema::nlist_) = DEFAULT_NLIST,
                c(&TableSchema::metric_type_) = DEFAULT_METRIC_TYPE),
            where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
750
// Creates a partition of an existing table. The partition is written through CreateTable()
// as a table row whose owner_table_ is the parent table id and whose partition_tag_ is the
// trimmed tag.
//   table_id:       id of the parent table; it must exist and must not itself be a partition
//   partition_name: table id to use for the partition; when empty one is generated
//                   with NextTableId()
//   tag:            partition tag; leading/trailing blanks are trimmed before it is compared
//                   and stored
// Returns:
//   - the DescribeTable() status when the parent table cannot be described
//   - DB_ERROR when the parent is a partition or the tag is already used under this table
//   - the GetPartitionName() status when the duplicate check itself fails
//   - DB_ALREADY_EXIST when CreateTable() reports that the partition already exists
//   - otherwise whatever CreateTable() returned
Status
SqliteMetaImpl::CreatePartition(const std::string& table_id,
                                const std::string& partition_name,
                                const std::string& tag) {
    server::MetricCollector metric;

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    // not allow create partition under partition
    if (!table_schema.owner_table_.empty()) {
        return Status(DB_ERROR, "Nested partition is not allowed");
    }

    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string valid_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    // not allow duplicated partition
    std::string exist_partition;
    status = GetPartitionName(table_id, valid_tag, exist_partition);
    if (!status.ok() && status.code() != DB_NOT_FOUND) {
        // DB_NOT_FOUND is the expected "tag is free" answer; any other failure means the
        // lookup did not run, so creating the partition now could duplicate the tag
        return status;
    }
    if (!exist_partition.empty()) {
        return Status(DB_ERROR, "Duplicate partition is not allowed");
    }

    if (partition_name.empty()) {
        // generate unique partition name
        NextTableId(table_schema.table_id_);
    } else {
        table_schema.table_id_ = partition_name;
    }

    // the remaining fields keep the values DescribeTable() loaded from the parent table
    table_schema.id_ = -1;
    table_schema.flag_ = 0;
    table_schema.created_on_ = utils::GetMicroSecTimeStamp();
    table_schema.owner_table_ = table_id;
    table_schema.partition_tag_ = valid_tag;

    status = CreateTable(table_schema);
    if (status.code() == DB_ALREADY_EXIST) {
        return Status(DB_ALREADY_EXIST, "Partition already exists");
    }

    return status;
}

S
starlord 已提交
801
// Drops a partition. CreatePartition() writes partitions through CreateTable(), so removing
// one is delegated to DropTable() with the partition's own table id.
//   partition_name: table id of the partition to drop
// Returns the status produced by DropTable().
Status
SqliteMetaImpl::DropPartition(const std::string& partition_name) {
    Status drop_status = DropTable(partition_name);
    return drop_status;
}
805

G
groot 已提交
806 807
// Collects the schema of every partition owned by a table.
//   table_id:              id of the owning table
//   partiton_schema_array: one schema is appended per partition row that is not in state
//                          TO_DELETE; the vector is not cleared first
// Returns Status::OK(), or the HandleException() status if something throws.
Status
SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) {
    try {
        server::MetricCollector metric;

        auto owned_by_table = c(&TableSchema::owner_table_) == table_id;
        auto not_being_deleted = c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE);
        auto partition_rows =
            ConnectorPtr->select(columns(&TableSchema::table_id_), where(owned_by_table && not_being_deleted));

        for (const auto& row : partition_rows) {
            meta::TableSchema partition_schema;
            partition_schema.table_id_ = std::get<0>(row);

            // NOTE(review): the status of DescribeTable() is not checked here, so a partition
            // that fails to load is still appended with whatever fields were filled in
            DescribeTable(partition_schema);
            partiton_schema_array.emplace_back(partition_schema);
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when show partitions", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
828
// Resolves a partition tag of a table to the partition's table id.
//   table_id:       id of the owning table
//   tag:            partition tag; leading/trailing blanks are trimmed before the comparison
//   partition_name: receives the table id of the first matching partition; left untouched
//                   when nothing matches
// Returns DB_NOT_FOUND when no partition that is not in state TO_DELETE carries the tag,
// the HandleException() status if the query throws, and Status::OK() otherwise.
Status
SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
    try {
        server::MetricCollector metric;

        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        auto owned_by_table = c(&TableSchema::owner_table_) == table_id;
        auto has_tag = c(&TableSchema::partition_tag_) == valid_tag;
        auto not_being_deleted = c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE);

        auto matches = ConnectorPtr->select(columns(&TableSchema::table_id_),
                                            where(owned_by_table && has_tag && not_being_deleted));
        if (matches.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
        }

        partition_name = std::get<0>(matches.front());
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when get partition name", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
854
// Collects the files of a table that can be searched (file types RAW, TO_INDEX and INDEX),
// grouped by date.
//   table_id: id of the table; it must be describable through DescribeTable()
//   ids:      when not empty, only files whose id_ is listed are returned
//   dates:    when not empty, only files whose date_ is listed are returned
//   files:    cleared first, then filled with date -> files of that date
// Every returned file also carries the table's dimension_, index_file_size_, nlist_ and
// metric_type_, and is passed through utils::GetTableFilePath().
// Returns:
//   - the DescribeTable() status when the table cannot be described
//   - the last failing utils::GetTableFilePath() status, if any (the file is still returned)
//   - the HandleException() status if a query throws
//   - a default-constructed Status otherwise
Status
SqliteMetaImpl::FilesToSearch(const std::string& table_id,
                              const std::vector<size_t>& ids,
                              const DatesT& dates,
                              DatePartionedTableFilesSchema& files) {
    files.clear();
    server::MetricCollector metric;

    try {
        auto select_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_);

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;

        std::vector<int> file_types = {static_cast<int>(TableFileSchema::RAW),
                                       static_cast<int>(TableFileSchema::TO_INDEX),
                                       static_cast<int>(TableFileSchema::INDEX)};
        auto match_type = in(&TableFileSchema::file_type_, file_types);

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // sqlite_orm has a bug, 'in' statement cannot handle too many elements
        // so we split one query into multi-queries, this is a work-around!!
        // A new batch is opened before a date is added, so no batch exceeds batch_size
        // and no empty batch is ever created.
        // NOTE(review): only dates are batched; ids are still passed to 'in' in one piece.
        const size_t batch_size = 30;
        std::vector<DatesT> split_dates;
        for (DateT date : dates) {
            if (split_dates.empty() || split_dates.back().size() >= batch_size) {
                split_dates.emplace_back();
            }
            split_dates.back().push_back(date);
        }

        // perform query
        decltype(ConnectorPtr->select(select_columns)) selected;
        if (dates.empty() && ids.empty()) {
            auto filter = where(match_tableid and match_type);
            selected = ConnectorPtr->select(select_columns, filter);
        } else if (dates.empty()) {
            // only ids given
            auto match_fileid = in(&TableFileSchema::id_, ids);
            auto filter = where(match_tableid and match_fileid and match_type);
            selected = ConnectorPtr->select(select_columns, filter);
        } else if (ids.empty()) {
            // only dates given: one query per date batch
            for (auto& batch_dates : split_dates) {
                auto match_date = in(&TableFileSchema::date_, batch_dates);
                auto filter = where(match_tableid and match_date and match_type);
                auto batch_selected = ConnectorPtr->select(select_columns, filter);
                selected.insert(selected.end(), batch_selected.begin(), batch_selected.end());
            }
        } else {
            // both given: the id filter is the same for every date batch, build it once
            auto match_fileid = in(&TableFileSchema::id_, ids);
            for (auto& batch_dates : split_dates) {
                auto match_date = in(&TableFileSchema::date_, batch_dates);
                auto filter = where(match_tableid and match_fileid and match_date and match_type);
                auto batch_selected = ConnectorPtr->select(select_columns, filter);
                selected.insert(selected.end(), batch_selected.begin(), batch_selected.end());
            }
        }

        Status ret;
        TableFileSchema table_file;
        for (auto& file : selected) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a path failure is remembered and returned, but the file is still handed out
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            // operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }
        return ret;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when iterate index files", e.what());
    }
}

S
starlord 已提交
971
// Collects the RAW files of a table that are candidates for merging, grouped by date.
// Files are queried in descending file_size_ order, and files whose size has already
// reached the table's index_file_size_ are left out.
//   table_id: id of the table; it must be describable through DescribeTable()
//   files:    cleared first, then filled with date -> files of that date
// Returns:
//   - the DescribeTable() status when the table cannot be described
//   - the last failing utils::GetTableFilePath() status, if any (the file is still returned)
//   - the HandleException() status if the query throws
//   - a default-constructed Status otherwise
Status
SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        // check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto describe_status = DescribeTable(table_schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        // get files to merge
        auto is_raw = c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW);
        auto of_this_table = c(&TableFileSchema::table_id_) == table_id;
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::created_on_),
            where(is_raw && of_this_table),
            order_by(&TableFileSchema::file_size_).desc());

        Status result;
        int64_t merge_candidates = 0;
        for (auto& row : selected) {
            TableFileSchema table_file;
            table_file.file_size_ = std::get<4>(row);
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;  // skip large file
            }

            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.created_on_ = std::get<7>(row);
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a path failure is remembered and returned, but the file is still handed out
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                result = path_status;
            }

            // operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
            ++merge_candidates;
        }

        if (merge_candidates > 0) {
            ENGINE_LOG_TRACE << "Collect " << merge_candidates << " to-merge files";
        }
        return result;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

// Collects every file, across all tables, whose file_type_ is TO_INDEX.
// Each file also receives the dimension_, index_file_size_, nlist_ and metric_type_ of the
// table it belongs to; table schemas are described once per table and cached for the call.
//   files: cleared first, then filled with the matching files
// Returns:
//   - the DescribeTable() status as soon as an owning table cannot be described
//     (files collected up to that point stay in the output)
//   - the last failing utils::GetTableFilePath() status, if any (the file is still returned)
//   - the HandleException() status if the query throws
//   - a default-constructed Status otherwise
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        auto waiting_for_index = c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX);
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(waiting_for_index));

        // table id -> schema of that table, filled lazily while walking the files
        std::map<std::string, TableSchema> schema_cache;
        TableFileSchema table_file;

        Status ret;
        for (auto& row : selected) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.file_size_ = std::get<4>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.engine_type_ = std::get<7>(row);
            table_file.created_on_ = std::get<8>(row);

            // a path failure is remembered and returned, but the file is still handed out
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto cached = schema_cache.find(table_file.table_id_);
            if (cached == schema_cache.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                cached = schema_cache.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema& owner = cached->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.nlist_ = owner.nlist_;
            table_file.metric_type_ = owner.metric_type_;
            files.push_back(table_file);
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
        }
        return ret;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
1103
// Lists the files of a table whose file_type_ is one of the requested types and writes a
// per-type count summary to the debug log.
//   table_id:    id of the table whose files are listed
//   file_types:  file types to select; must not be empty
//   table_files: cleared, then filled with one entry per matching file
// Returns DB_ERROR when file_types is empty, the HandleException() status if the query
// throws, and Status::OK() otherwise.
Status
SqliteMetaImpl::FilesByType(const std::string& table_id,
                            const std::vector<int>& file_types,
                            TableFilesSchema& table_files) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    // log prefix of each file type that appears in the summary; nullptr for any other type
    auto summary_prefix = [](int file_type) -> const char* {
        switch (file_type) {
            case static_cast<int>(TableFileSchema::RAW):
                return " raw files:";
            case static_cast<int>(TableFileSchema::NEW):
                return " new files:";
            case static_cast<int>(TableFileSchema::NEW_MERGE):
                return " new_merge files:";
            case static_cast<int>(TableFileSchema::NEW_INDEX):
                return " new_index files:";
            case static_cast<int>(TableFileSchema::TO_INDEX):
                return " to_index files:";
            case static_cast<int>(TableFileSchema::INDEX):
                return " index files:";
            case static_cast<int>(TableFileSchema::BACKUP):
                return " backup files:";
            default:
                return nullptr;
        }
    };

    try {
        table_files.clear();

        auto wanted_type = in(&TableFileSchema::file_type_, file_types);
        auto of_this_table = c(&TableFileSchema::table_id_) == table_id;
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::file_id_, &TableFileSchema::file_type_,
                    &TableFileSchema::file_size_, &TableFileSchema::row_count_, &TableFileSchema::date_,
                    &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(wanted_type && of_this_table));

        // nothing matched: no files to return and no summary to log
        if (selected.empty()) {
            return Status::OK();
        }

        // file type -> number of selected files of that type
        std::map<int, int> tally;
        for (auto& row : selected) {
            TableFileSchema file_schema;
            file_schema.table_id_ = table_id;
            file_schema.id_ = std::get<0>(row);
            file_schema.file_id_ = std::get<1>(row);
            file_schema.file_type_ = std::get<2>(row);
            file_schema.file_size_ = std::get<3>(row);
            file_schema.row_count_ = std::get<4>(row);
            file_schema.date_ = std::get<5>(row);
            file_schema.engine_type_ = std::get<6>(row);
            file_schema.created_on_ = std::get<7>(row);

            ++tally[file_schema.file_type_];
            table_files.emplace_back(file_schema);
        }

        // the summary follows the order of the requested types; a requested type with no
        // file is reported with count 0
        std::string msg = "Get table files by type.";
        for (int file_type : file_types) {
            const char* prefix = summary_prefix(file_type);
            if (prefix == nullptr) {
                continue;
            }
            auto counted = tally.find(file_type);
            int count = (counted == tally.end()) ? 0 : counted->second;
            msg += prefix + std::to_string(count);
        }
        ENGINE_LOG_DEBUG << msg;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1195
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1196 1197
// Apply every configured archive criterion to the table-file meta.
//
//  - ARCHIVE_CONF_DAYS: every file whose created_on_ is older than the configured limit
//    (limit * D_SEC * US_PS, presumably days converted to microseconds - confirm against
//    MetaConsts.h) and which is not already TO_DELETE gets flagged TO_DELETE.
//  - ARCHIVE_CONF_DISK: the total size of non-TO_DELETE files is compared against limit * G;
//    the excess (if any) is handed to DiscardFiles().
//
// Returns Status::OK() when all criteria were applied, otherwise the first failing status
// (from the update, from Size() or from DiscardFiles()).
Status
SqliteMetaImpl::Archive() {
    auto& criterias = options_.archive_conf_.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    // iterate by reference: no need to copy each (criteria, limit) pair
    for (const auto& kv : criterias) {
        const auto& criteria = kv.first;
        const auto& limit = kv.second;

        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            int64_t usecs = limit * D_SEC * US_PS;
            int64_t now = utils::GetMicroSecTimeStamp();
            try {
                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

                ConnectorPtr->update_all(
                    set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE)),
                    where(c(&TableFileSchema::created_on_) < static_cast<int64_t>(now - usecs) and
                          c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));
            } catch (const std::exception& e) {
                return HandleException("Encounter exception when update table files", e.what());
            }

            ENGINE_LOG_DEBUG << "Archive old files";
        }

        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            // do not silently drop a failure while computing the current disk usage
            auto size_status = Size(sum);
            if (!size_status.ok()) {
                return size_status;
            }

            // a non-positive value means we are under the limit; DiscardFiles() treats it as a no-op
            int64_t to_delete = (int64_t)sum - limit * G;
            auto discard_status = DiscardFiles(to_delete);
            if (!discard_status.ok()) {
                return discard_status;
            }

            ENGINE_LOG_DEBUG << "Archive files to free disk";
        }
    }

    return Status::OK();
}

S
starlord 已提交
1239
// Compute the summed file_size_ of all table files that are not flagged TO_DELETE.
//
// result: out-parameter; reset to 0 on entry and then increased by every non-null
//         aggregate value returned by the query.
// Returns Status::OK(), or the status built by HandleException() if the query throws.
Status
SqliteMetaImpl::Size(uint64_t& result) {
    result = 0;

    try {
        auto rows = ConnectorPtr->select(
            columns(sum(&TableFileSchema::file_size_)),
            where(c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        for (auto& row : rows) {
            auto& total = std::get<0>(row);
            // the aggregate is nullable; a null value contributes nothing
            if (total) {
                result += static_cast<uint64_t>(*total);
            }
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when calculte db size", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1258
// Remove the meta rows of all table files whose type is NEW, NEW_INDEX or NEW_MERGE.
//
// The ids are collected first, then every row is removed inside a single sqlite transaction.
// Returns Status::OK() on success, or an error status if the transaction is not committed
// or an exception is thrown.
Status
SqliteMetaImpl::CleanUpShadowFiles() {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> shadow_types = {
            static_cast<int>(TableFileSchema::NEW),
            static_cast<int>(TableFileSchema::NEW_INDEX),
            static_cast<int>(TableFileSchema::NEW_MERGE),
        };
        auto shadow_files = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                 where(in(&TableFileSchema::file_type_, shadow_types)));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& shadow_file : shadow_files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(shadow_file));
            }
            return true;
        });

        if (!commited) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!shadow_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << shadow_files.size() << " files";
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when clean table file", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1296
// Erase from the cache every TO_DELETE / BACKUP table file whose updated_time_ is older
// than `seconds` (scaled by US_PS) relative to the current timestamp.
//
// Only the cache is touched here; no meta row is modified by this function.
// Returns Status::OK(), or the status built by HandleException() if anything throws.
Status
SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();

    // erase deleted/backup files from cache
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> stale_types = {
            static_cast<int>(TableFileSchema::TO_DELETE),
            static_cast<int>(TableFileSchema::BACKUP),
        };

        auto stale_files = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::date_),
            where(in(&TableFileSchema::file_type_, stale_types) and
                  c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        for (auto& row : stale_files) {
            TableFileSchema stale_file;
            stale_file.id_ = std::get<0>(row);
            stale_file.table_id_ = std::get<1>(row);
            stale_file.file_id_ = std::get<2>(row);
            stale_file.date_ = std::get<3>(row);

            // presumably resolves stale_file.location_, which is what gets evicted below
            utils::GetTableFilePath(options_, stale_file);
            server::CommonUtil::EraseFromCache(stale_file.location_);
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when clean cache", e.what());
    }

    return Status::OK();
}

// Physically clean up what has been flagged TO_DELETE. Runs three phases in order:
//
//  1. table files flagged TO_DELETE whose updated_time_ is older than `seconds`
//     (scaled by US_PS): their path is deleted and their meta row removed;
//  2. tables flagged TO_DELETE: their (empty) folder is deleted and their meta row removed;
//  3. for every table touched in phase 1 that has no file rows left, the table folder is deleted.
//
// Each phase returns early with an error status if it throws or (phases 1 and 2) if its
// sqlite transaction is not committed.
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> touched_table_ids;

    // remove to_delete files
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto expired_files = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::date_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_DELETE) and
                  c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            // one schema object is reused for all rows, exactly one field set per selected column
            TableFileSchema expired_file;
            for (auto& row : expired_files) {
                expired_file.id_ = std::get<0>(row);
                expired_file.table_id_ = std::get<1>(row);
                expired_file.file_id_ = std::get<2>(row);
                expired_file.date_ = std::get<3>(row);

                utils::DeleteTableFilePath(options_, expired_file);
                ENGINE_LOG_DEBUG << "Removing file id:" << expired_file.file_id_
                                 << " location:" << expired_file.location_;
                ConnectorPtr->remove<TableFileSchema>(expired_file.id_);

                // remember the owning table for the folder clean-up phase
                touched_table_ids.insert(expired_file.table_id_);
            }
            return true;
        });

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!expired_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << expired_files.size() << " files deleted in " << seconds << " seconds";
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove to_delete tables
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto doomed_tables =
            ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_),
                                 where(c(&TableSchema::state_) == static_cast<int>(TableSchema::TO_DELETE)));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& row : doomed_tables) {
                utils::DeleteTablePath(options_, std::get<1>(row), false);  // only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }

            return true;
        });

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!doomed_tables.empty()) {
            ENGINE_LOG_DEBUG << "Remove " << doomed_tables.size() << " tables from meta";
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
    try {
        server::MetricCollector metric;

        int64_t removed_folders = 0;
        for (auto& table_id : touched_table_ids) {
            auto remaining = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                  where(c(&TableFileSchema::table_id_) == table_id));
            if (remaining.empty()) {
                utils::DeleteTablePath(options_, table_id);
                ++removed_folders;
            }
        }

        if (removed_folders) {
            ENGINE_LOG_DEBUG << "Remove " << removed_folders << " tables folder";
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table folder", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1446
// Sum row_count_ over the RAW, TO_INDEX and INDEX files of the given table.
//
// table_id: id of the table to count.
// result:   out-parameter; set to the summed row count only when DescribeTable() succeeds.
// Returns the status of DescribeTable() if it fails, the status built by HandleException()
// if anything throws, otherwise Status::OK().
Status
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    try {
        server::MetricCollector metric;

        std::vector<int> counted_types = {
            static_cast<int>(TableFileSchema::RAW),
            static_cast<int>(TableFileSchema::TO_INDEX),
            static_cast<int>(TableFileSchema::INDEX),
        };
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::row_count_),
            where(in(&TableFileSchema::file_type_, counted_types) and c(&TableFileSchema::table_id_) == table_id));

        // the table itself must be describable before a count is reported
        TableSchema schema;
        schema.table_id_ = table_id;
        auto describe_status = DescribeTable(schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        result = 0;
        for (auto& row : rows) {
            result += std::get<0>(row);
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when calculate table file size", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1475 1476
// Drop both sqlite meta tables (META_TABLES, then META_TABLEFILES).
//
// Returns Status::OK(), or the status built by HandleException() if either drop throws.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    try {
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when drop all meta", e.what());
    }

    return Status::OK();
}

G
groot 已提交
1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506
// Flag table files as TO_DELETE, lowest id first, until at least `to_discard_size`
// (same unit as file_size_) has been discarded.
//
// Files are processed in batches of up to 10 rows; each batch runs in its own sqlite
// transaction and takes meta_mutex_ only for the duration of that batch.
//
// to_discard_size: amount still to discard; a value <= 0 makes the call a no-op.
// Returns Status::OK() when enough has been discarded or when no candidate file is left,
// otherwise an error status (transaction not committed, or exception thrown).
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    // Iterative on purpose: the previous tail-recursive form never terminated when the
    // remaining size was still positive but no candidate file was left to discard.
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        bool nothing_left = false;

        try {
            server::MetricCollector metric;

            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected = ConnectorPtr->select(
                    columns(&TableFileSchema::id_, &TableFileSchema::file_size_),
                    where(c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)),
                    order_by(&TableFileSchema::id_), limit(10));

                std::vector<int> ids;

                for (auto& file : selected) {
                    if (to_discard_size <= 0) {
                        break;
                    }

                    const auto row_id = std::get<0>(file);
                    const auto file_size = std::get<1>(file);

                    ids.push_back(row_id);
                    // log the row id actually selected (file_id_ is not part of this query)
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << row_id << " table_file.size=" << file_size;
                    to_discard_size -= file_size;
                }

                if (ids.empty()) {
                    // no candidate left: tell the outer loop to stop instead of spinning
                    nothing_left = true;
                    return true;
                }

                ConnectorPtr->update_all(
                    set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                    where(in(&TableFileSchema::id_, ids)));

                return true;
            });

            if (!commited) {
                return HandleException("DiscardFiles error: sqlite transaction failed");
            }
        } catch (const std::exception& e) {
            return HandleException("Encounter exception when discard table file", e.what());
        }

        if (nothing_left) {
            break;
        }
    }

    return Status::OK();
}

} // namespace meta
} // namespace engine
} // namespace milvus