SqliteMetaImpl.cpp 60.4 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/meta/SqliteMetaImpl.h"
Y
youny626 已提交
19
#include "MetaConsts.h"
S
starlord 已提交
20 21
#include "db/IDGenerator.h"
#include "db/Utils.h"
22
#include "metrics/Metrics.h"
23
#include "utils/CommonUtil.h"
Y
youny626 已提交
24 25
#include "utils/Exception.h"
#include "utils/Log.h"
26
#include "utils/StringHelpFunctions.h"
X
Xu Peng 已提交
27

Y
youny626 已提交
28
#include <sqlite_orm.h>
X
Xu Peng 已提交
29
#include <unistd.h>
X
Xu Peng 已提交
30
#include <boost/filesystem.hpp>
31
#include <chrono>
X
Xu Peng 已提交
32
#include <fstream>
Y
youny626 已提交
33
#include <iostream>
S
starlord 已提交
34
#include <map>
Y
youny626 已提交
35
#include <memory>
S
starlord 已提交
36
#include <set>
Y
youny626 已提交
37
#include <sstream>
S
starlord 已提交
38

J
jinhai 已提交
39
namespace milvus {
X
Xu Peng 已提交
40
namespace engine {
41
namespace meta {
X
Xu Peng 已提交
42

X
Xu Peng 已提交
43 44
using namespace sqlite_orm;

G
groot 已提交
45 46
namespace {

S
starlord 已提交
47
// Logs a meta-store failure and wraps it in a DB_META_TRANSACTION_FAILED status.
//
// desc: human-readable description of the failed operation.
// what: optional detail (typically std::exception::what()); when present it is
//       appended to `desc` after a ':' separator.
Status
HandleException(const std::string& desc, const char* what = nullptr) {
    const std::string message = (what != nullptr) ? desc + ":" + what : desc;
    ENGINE_LOG_ERROR << message;
    return Status(DB_META_TRANSACTION_FAILED, message);
}

Y
youny626 已提交
59
}  // namespace
G
groot 已提交
60

S
starlord 已提交
61
inline auto
G
groot 已提交
62
StoragePrototype(const std::string& path) {
G
groot 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
    return make_storage(path,
                        make_table(META_TABLES,
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
                                   make_column("state", &TableSchema::state_),
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("flag", &TableSchema::flag_, default_value(0)),
                                   make_column("index_file_size", &TableSchema::index_file_size_),
                                   make_column("engine_type", &TableSchema::engine_type_),
                                   make_column("nlist", &TableSchema::nlist_),
                                   make_column("metric_type", &TableSchema::metric_type_),
                                   make_column("owner_table", &TableSchema::owner_table_, default_value("")),
                                   make_column("partition_tag", &TableSchema::partition_tag_, default_value("")),
                                   make_column("version", &TableSchema::version_, default_value(CURRENT_VERSION))),
                        make_table(META_TABLEFILES,
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
                                   make_column("engine_type", &TableFileSchema::engine_type_),
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
                                   make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
                                   make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_)));
X
Xu Peng 已提交
89 90
}

X
Xu Peng 已提交
91
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
92 93
static std::unique_ptr<ConnectorT> ConnectorPtr;

Y
youny626 已提交
94
// Stores a copy of `options` and runs Initialize() right away.
// The Status returned by Initialize() is not inspected here.
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options)
    : options_(options) {
    this->Initialize();
}

// Nothing to release explicitly.
SqliteMetaImpl::~SqliteMetaImpl() = default;

S
starlord 已提交
101
// Produces a new table id as the decimal text of a SimpleIDGenerator number.
//
// table_id: output; overwritten with the generated id.
// Always returns Status::OK().
Status
SqliteMetaImpl::NextTableId(std::string& table_id) {
    // Serialize generation so concurrent callers cannot obtain duplicated ids.
    std::lock_guard<std::mutex> guard(genid_mutex_);

    std::stringstream id_text;
    SimpleIDGenerator generator;
    id_text << generator.GetNextIDNumber();
    table_id = id_text.str();

    return Status::OK();
}

S
starlord 已提交
111
// Produces a new file id as the decimal text of a SimpleIDGenerator number.
//
// file_id: output; overwritten with the generated id.
// Always returns Status::OK().
Status
SqliteMetaImpl::NextFileId(std::string& file_id) {
    // Serialize generation so concurrent callers cannot obtain duplicated ids.
    std::lock_guard<std::mutex> guard(genid_mutex_);

    std::stringstream id_text;
    SimpleIDGenerator generator;
    id_text << generator.GetNextIDNumber();
    file_id = id_text.str();

    return Status::OK();
}

S
starlord 已提交
121 122 123
void
SqliteMetaImpl::ValidateMetaSchema() {
    if (ConnectorPtr == nullptr) {
124 125 126
        return;
    }

Y
youny626 已提交
127
    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
128
    auto ret = ConnectorPtr->sync_schema_simulate();
Y
youny626 已提交
129 130
    if (ret.find(META_TABLES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
131 132
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
Y
youny626 已提交
133 134
    if (ret.find(META_TABLEFILES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
135 136 137 138
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
139 140
// Prepares the meta store under options_.path_:
//   1. creates the directory when it is not already a directory,
//   2. opens <path>/meta.sqlite through sqlite_orm,
//   3. validates and syncs the schema, keeps the connection open, enables WAL,
//   4. runs CleanUpShadowFiles().
// Returns DB_INVALID_PATH when the directory cannot be created, otherwise OK.
Status
SqliteMetaImpl::Initialize() {
    const bool have_dir = boost::filesystem::is_directory(options_.path_);
    if (!have_dir && !boost::filesystem::create_directory(options_.path_)) {
        std::string msg = "Failed to create db directory " + options_.path_;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_INVALID_PATH, msg);
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path_ + "/meta.sqlite"));

    // May throw when the existing schema is incompatible.
    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    // thread safe option
    ConnectorPtr->open_forever();
    // WAL => write ahead log
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);

    CleanUpShadowFiles();

    return Status::OK();
}

G
groot 已提交
163
// Inserts a new table record and creates its on-disk path.
//
// table_schema: in/out. An empty table_id_ is replaced by a generated id;
//               id_ and created_on_ are overwritten.
// Returns DB_ERROR when a table with the same id exists in TO_DELETE state,
// DB_ALREADY_EXIST when it exists in any other state, a
// DB_META_TRANSACTION_FAILED status when an exception is caught, otherwise the
// result of utils::CreateTablePath().
Status
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                const bool pending_delete = (TableSchema::TO_DELETE == std::get<0>(existing[0]));
                if (pending_delete) {
                    return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                }
                // Reported as an error rather than silently succeeding.
                return Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            table_schema.id_ = ConnectorPtr->insert(table_schema);
        } catch (std::exception& e) {
            return HandleException("Encounter exception when create table", e.what());
        }

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table", e.what());
    }
}

// Loads the stored attributes of the table named by table_schema.table_id_.
//
// table_schema: in/out. table_id_ selects the row; every other selected column
//               is written back into it on success.
// Rows in TO_DELETE state are ignored. Returns DB_NOT_FOUND unless exactly one
// row matches, a DB_META_TRANSACTION_FAILED status on exception, otherwise OK.
Status
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::state_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_,
                    &TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::metric_type_,
                    &TableSchema::owner_table_,
                    &TableSchema::partition_tag_,
                    &TableSchema::version_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        // Tuple positions follow the column order of the select above.
        const auto& row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.engine_type_ = std::get<6>(row);
        table_schema.nlist_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
        table_schema.owner_table_ = std::get<9>(row);
        table_schema.partition_tag_ = std::get<10>(row);
        table_schema.version_ = std::get<11>(row);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when describe table", e.what());
    }

    return Status::OK();
}

// Reports whether exactly one table with `table_id` exists that is not in
// TO_DELETE state.
//
// has_or_not: output; set to false first, then to the lookup result.
// Returns a DB_META_TRANSACTION_FAILED status on exception, otherwise OK.
Status
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    has_or_not = false;

    try {
        server::MetricCollector metric;
        auto matches = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        has_or_not = (matches.size() == 1);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table", e.what());
    }

    return Status::OK();
}

// Appends one TableSchema per table that is not in TO_DELETE state.
//
// table_schema_array: output; entries are appended (the vector is not cleared).
//                     state_ is not selected, so it keeps whatever a
//                     default-constructed TableSchema holds.
// Returns a DB_META_TRANSACTION_FAILED status on exception, otherwise OK.
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::table_id_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_,
                    &TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::metric_type_,
                    &TableSchema::owner_table_,
                    &TableSchema::partition_tag_,
                    &TableSchema::version_),
            where(c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        for (const auto& row : rows) {
            // Tuple positions follow the column order of the select above.
            TableSchema entry;
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.dimension_ = std::get<2>(row);
            entry.created_on_ = std::get<3>(row);
            entry.flag_ = std::get<4>(row);
            entry.index_file_size_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.nlist_ = std::get<7>(row);
            entry.metric_type_ = std::get<8>(row);
            entry.owner_table_ = std::get<9>(row);
            entry.partition_tag_ = std::get<10>(row);
            entry.version_ = std::get<11>(row);

            table_schema_array.emplace_back(entry);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup all tables", e.what());
    }

    return Status::OK();
}

// Soft-deletes a table: its state_ is set to TO_DELETE unless it already is.
// No row is removed here.
// Returns a DB_META_TRANSACTION_FAILED status on exception, otherwise OK.
Status
SqliteMetaImpl::DropTable(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        ConnectorPtr->update_all(set(c(&TableSchema::state_) = to_delete),
                                 where(c(&TableSchema::table_id_) == table_id &&
                                       c(&TableSchema::state_) != to_delete));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table", e.what());
    }

    return Status::OK();
}

// Soft-deletes every file of `table_id`: file_type_ becomes TO_DELETE and
// updated_time_ is refreshed, for files not already marked TO_DELETE.
// Returns a DB_META_TRANSACTION_FAILED status on exception, otherwise OK.
Status
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const int to_delete = static_cast<int>(TableFileSchema::TO_DELETE);
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = to_delete,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(c(&TableFileSchema::table_id_) == table_id &&
                                       c(&TableFileSchema::file_type_) != to_delete));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table files", e.what());
    }

    return Status::OK();
}

// Registers a new file for an existing table and creates its on-disk path.
//
// file_schema: in/out. table_id_ names the owning table; date_ defaults to
//              utils::GetDate() when it equals EmptyDate. file_id_, sizes,
//              timestamps and the table-derived index settings are filled in,
//              and id_ receives the inserted row id.
// Returns the DescribeTable() failure when the table cannot be described, a
// DB_META_TRANSACTION_FAILED status on exception, otherwise the result of
// utils::CreateTableFilePath().
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto describe_status = DescribeTable(owner);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);

        // A new file starts empty; created and updated timestamps coincide.
        file_schema.dimension_ = owner.dimension_;
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        // Index settings are inherited from the owning table.
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.metric_type_ = owner.metric_type_;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table file", e.what());
    }
}

S
starlord 已提交
400
// TODO(myh): Delete single vector by id
S
starlord 已提交
401
// Soft-deletes the files of `table_id` whose date_ is listed in `dates`:
// file_type_ becomes TO_DELETE and updated_time_ is refreshed.
//
// An empty `dates` is a no-op returning OK. Returns the DescribeTable() failure
// when the table cannot be described, a DB_META_TRANSACTION_FAILED status on
// exception, otherwise OK.
Status
SqliteMetaImpl::DropDataByDate(const std::string& table_id,
                               const DatesT& dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto describe_status = DescribeTable(table_schema);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        // sqlite_orm has a bug, 'in' statement cannot handle too many elements
        // so we split one query into multi-queries, this is a work-around!!
        const size_t batch_size = 30;
        std::vector<DatesT> date_batches(1);
        for (DateT date : dates) {
            DatesT& current = date_batches.back();
            current.push_back(date);
            // A fresh batch is opened only once the current one exceeds batch_size.
            if (current.size() > batch_size) {
                date_batches.emplace_back();
            }
        }

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        for (auto& batch : date_batches) {
            // The trailing batch can be empty when the previous one just overflowed.
            if (!batch.empty()) {
                ConnectorPtr->update_all(
                    set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                    where(c(&TableFileSchema::table_id_) == table_id && in(&TableFileSchema::date_, batch)));
            }
        }

        ENGINE_LOG_DEBUG << "Successfully drop data by date, table id = " << table_schema.table_id_;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when drop partition", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
451
// Fetches the files of `table_id` whose row id is listed in `ids`, skipping
// files marked TO_DELETE.
//
// table_files: output; cleared first, then filled with one entry per match.
//              Table-level settings (dimension, index file size, nlist, metric
//              type) are copied from the owning table into every entry.
// Returns the DescribeTable() failure when the table cannot be described, a
// DB_META_TRANSACTION_FAILED status on exception, otherwise a
// default-constructed Status.
Status
SqliteMetaImpl::GetTableFiles(const std::string& table_id,
                              const std::vector<size_t>& ids,
                              TableFilesSchema& table_files) {
    try {
        table_files.clear();

        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_,
                    &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_,
                    &TableFileSchema::date_,
                    &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  in(&TableFileSchema::id_, ids) &&
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        TableSchema owner;
        owner.table_id_ = table_id;
        auto describe_status = DescribeTable(owner);
        if (!describe_status.ok()) {
            return describe_status;
        }

        Status outcome;
        for (const auto& row : rows) {
            // Tuple positions follow the column order of the select above.
            TableFileSchema entry;
            entry.table_id_ = table_id;
            entry.id_ = std::get<0>(row);
            entry.file_id_ = std::get<1>(row);
            entry.file_type_ = std::get<2>(row);
            entry.file_size_ = std::get<3>(row);
            entry.row_count_ = std::get<4>(row);
            entry.date_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.created_on_ = std::get<7>(row);
            entry.dimension_ = owner.dimension_;
            entry.index_file_size_ = owner.index_file_size_;
            entry.nlist_ = owner.nlist_;
            entry.metric_type_ = owner.metric_type_;

            // The returned status is not inspected.
            utils::GetTableFilePath(options_, entry);

            table_files.emplace_back(entry);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return outcome;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table files", e.what());
    }
}

S
starlord 已提交
504
// Overwrites the flag_ column of the table identified by `table_id`.
// Returns a DB_META_TRANSACTION_FAILED status on exception, otherwise OK.
//
// NOTE(review): unlike the other update paths visible in this file, no
// meta_mutex_ lock is taken here - confirm whether that is intentional.
Status
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;

        // Store the new flag on the matching table row.
        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag),
                                 where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (std::exception& e) {
        const std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
524
// Writes `file_schema` back to the meta store.
//
// file_schema: in/out. updated_time_ is refreshed; file_type_ is forced to
//              TO_DELETE when the owning table is missing or itself TO_DELETE.
// Returns a DB_META_TRANSACTION_FAILED status on exception, otherwise OK.
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_rows = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == file_schema.table_id_));

        // if the table has been deleted, just mark the table file as TO_DELETE
        // clean thread will delete the file later
        const bool owner_gone =
            owner_rows.size() < 1 || std::get<0>(owner_rows[0]) == static_cast<int>(TableSchema::TO_DELETE);
        if (owner_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (std::exception& e) {
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
                          + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
553
// Writes a batch of file records back to the meta store inside one transaction.
//
// files: in/out. updated_time_ is refreshed on every entry; file_type_ is
//        forced to TO_DELETE for entries whose owning table has no row outside
//        TO_DELETE state.
// Returns a DB_META_TRANSACTION_FAILED status when the transaction does not
// commit or an exception is caught, otherwise OK.
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Look up each distinct owning table once.
        std::map<std::string, bool> table_alive;
        for (auto& file : files) {
            if (table_alive.count(file.table_id_) != 0) {
                continue;
            }
            auto owner_rows = ConnectorPtr->select(
                columns(&TableSchema::id_),
                where(c(&TableSchema::table_id_) == file.table_id_ &&
                      c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
            table_alive[file.table_id_] = (owner_rows.size() >= 1);
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& file : files) {
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
        }

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
599
// Stores new index settings (engine type, nlist, metric type) on a table and
// then turns that table's BACKUP files back into RAW files.
//
// Returns DB_NOT_FOUND when no row outside TO_DELETE state matches `table_id`,
// a DB_META_TRANSACTION_FAILED status on exception, otherwise OK.
Status
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::state_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_,
                    &TableSchema::owner_table_,
                    &TableSchema::partition_tag_,
                    &TableSchema::version_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        // Rebuild the full record: stored columns from the first matching row,
        // index-related columns from `index`.
        const auto& row = rows[0];
        meta::TableSchema updated;
        updated.id_ = std::get<0>(row);
        updated.table_id_ = table_id;
        updated.state_ = std::get<1>(row);
        updated.dimension_ = std::get<2>(row);
        updated.created_on_ = std::get<3>(row);
        updated.flag_ = std::get<4>(row);
        updated.index_file_size_ = std::get<5>(row);
        updated.owner_table_ = std::get<6>(row);
        updated.partition_tag_ = std::get<7>(row);
        updated.version_ = std::get<8>(row);
        updated.engine_type_ = index.engine_type_;
        updated.nlist_ = index.nlist_;
        updated.metric_type_ = index.metric_type_;

        ConnectorPtr->update(updated);

        // set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
658
// Switches the RAW files of one table to TO_INDEX.
// Only files whose row_count_ is at least meta::BUILD_INDEX_THRESHOLD are affected.
//
// table_id: id of the table whose files are updated.
// Returns Status::OK() on success, or the status built by HandleException() when
// the update throws.
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // concurrent sqlite updates may raise exceptions ('bad logic', etc), so the write is serialized
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto belongs_to_table = c(&TableFileSchema::table_id_) == table_id;
        auto has_enough_rows = c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD;
        auto is_raw = c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW);

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_INDEX)),
            where(belongs_to_table and has_enough_rows and is_raw));

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when update table files to to_index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
682
// Reads the index settings (engine type, nlist, metric type) stored on a table row.
//
// table_id: id of the table to look up; rows in TO_DELETE state are ignored.
// index:    output, filled only when exactly one matching row exists.
// Returns DB_NOT_FOUND when the lookup does not yield exactly one row, the
// HandleException() status when the query throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_, &TableSchema::nlist_, &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        // anything other than a single hit is reported as "not found"
        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        auto& row = rows[0];
        index.engine_type_ = std::get<0>(row);
        index.nlist_ = std::get<1>(row);
        index.metric_type_ = std::get<2>(row);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when describe index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
707
// Drops the index of a table in three metadata updates:
//   1. INDEX files of the table are marked TO_DELETE,
//   2. BACKUP files of the table are turned back into RAW,
//   3. the table row's engine type / nlist / metric type are reset to the defaults.
//
// table_id: id of the table whose index is dropped.
// Returns Status::OK() on success, or the HandleException() status when any
// update throws.
Status
SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // concurrent sqlite updates may raise exceptions ('bad logic', etc), so the writes are serialized
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto file_of_table = c(&TableFileSchema::table_id_) == table_id;

        // step 1: soft delete the index files
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(file_of_table and c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)));

        // step 2: every backup file becomes a raw file again
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(file_of_table and c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        // step 3: reset the index description stored on the table itself
        ConnectorPtr->update_all(
            set(c(&TableSchema::engine_type_) = DEFAULT_ENGINE_TYPE,
                c(&TableSchema::nlist_) = DEFAULT_NLIST,
                c(&TableSchema::metric_type_) = DEFAULT_METRIC_TYPE),
            where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
750
// Creates a partition of an existing table.
//
// table_id:       id of the owner table; it must be describable and must not itself be a partition.
// partition_name: table id to use for the partition; when empty, one is generated with NextTableId().
// tag:            partition tag; leading/trailing blanks are trimmed before it is compared and stored.
//
// Returns the DescribeTable() status when the owner cannot be described, DB_ERROR for a
// nested or duplicate partition, DB_ALREADY_EXIST when CreateTable() reports the id is
// taken, otherwise whatever CreateTable() returned.
Status
SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name,
                                const std::string& tag) {
    server::MetricCollector metric;

    // the owner's schema is loaded first and then reused as the schema of the new partition
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    // a partition cannot own partitions
    if (!table_schema.owner_table_.empty()) {
        return Status(DB_ERROR, "Nested partition is not allowed");
    }

    // only the valid characters of the tag count, e.g. " ab cd " is treated as "ab cd"
    std::string valid_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    // reject a tag that is already in use under this table
    // NOTE(review): the status of GetPartitionName() is not inspected here, only the
    // output name -- confirm that ignoring lookup failures is intended.
    std::string exist_partition;
    GetPartitionName(table_id, valid_tag, exist_partition);
    if (!exist_partition.empty()) {
        return Status(DB_ERROR, "Duplicate partition is not allowed");
    }

    if (!partition_name.empty()) {
        table_schema.table_id_ = partition_name;
    } else {
        // no name supplied: generate a unique partition name
        NextTableId(table_schema.table_id_);
    }

    table_schema.id_ = -1;
    table_schema.flag_ = 0;
    table_schema.created_on_ = utils::GetMicroSecTimeStamp();
    table_schema.owner_table_ = table_id;
    table_schema.partition_tag_ = valid_tag;

    status = CreateTable(table_schema);
    return status.code() == DB_ALREADY_EXIST ? Status(DB_ALREADY_EXIST, "Partition already exists") : status;
}

S
starlord 已提交
801
// Drops a partition by forwarding its name to DropTable().
//
// partition_name: table id of the partition to drop.
// Returns the status produced by DropTable().
Status
SqliteMetaImpl::DropPartition(const std::string& partition_name) {
    Status drop_status = DropTable(partition_name);
    return drop_status;
}
805

G
groot 已提交
806 807
// Collects the schemas of all partitions owned by a table.
//
// table_id:              id of the owner table.
// partiton_schema_array: output; one schema is appended per partition row that is not
//                        in TO_DELETE state (the array is not cleared first).
// Returns Status::OK(), or the HandleException() status when the query throws.
Status
SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) {
    try {
        server::MetricCollector metric;

        auto partition_rows = ConnectorPtr->select(
            columns(&TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        for (auto& row : partition_rows) {
            meta::TableSchema partition_schema;
            partition_schema.table_id_ = std::get<0>(row);
            // NOTE(review): the DescribeTable() status is not checked, so the schema is
            // appended even if the lookup fails -- confirm this is intended.
            DescribeTable(partition_schema);
            partiton_schema_array.push_back(partition_schema);
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when show partitions", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
828
// Resolves the table id of the partition that carries a given tag under a table.
//
// table_id:       id of the owner table.
// tag:            partition tag; leading/trailing blanks are trimmed before the lookup.
// partition_name: output, set to the first matching partition's table id; left untouched
//                 when nothing matches.
// Returns DB_NOT_FOUND when no partition row (outside TO_DELETE state) matches, the
// HandleException() status when the query throws, Status::OK() otherwise.
Status
SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) {
    try {
        server::MetricCollector metric;

        // only the valid characters of the tag count, e.g. " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        auto matches = ConnectorPtr->select(
            columns(&TableSchema::table_id_),
            where(c(&TableSchema::owner_table_) == table_id and
                  c(&TableSchema::partition_tag_) == valid_tag and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (matches.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
        }

        partition_name = std::get<0>(matches[0]);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when get partition name", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
854
// Collects the searchable files (RAW, TO_INDEX, INDEX) of a table, grouped by date.
//
// table_id: id of the table to search; it must be describable via DescribeTable().
// ids:      file ids to restrict the result to; empty means "no id filter".
// dates:    dates to restrict the result to; empty means "no date filter".
// files:    output; cleared first, then filled with one entry per date.
//
// Returns the DescribeTable() status when the table cannot be described, the
// HandleException() status when a query throws, otherwise the status of the last
// failing utils::GetTableFilePath() call (a default-constructed Status if none failed).
Status
SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& dates,
                              DatePartionedTableFilesSchema& files) {
    files.clear();
    server::MetricCollector metric;

    try {
        auto select_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_);

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;

        std::vector<int> file_types = {static_cast<int>(TableFileSchema::RAW),
                                       static_cast<int>(TableFileSchema::TO_INDEX),
                                       static_cast<int>(TableFileSchema::INDEX)};
        auto match_type = in(&TableFileSchema::file_type_, file_types);

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // sqlite_orm has a bug, 'in' statement cannot handle too many elements
        // so we split one query into multi-queries, this is a work-around!!
        // A new batch is opened BEFORE a date would overflow the current one, so no
        // batch ever holds more than batch_size dates.
        // NOTE(review): 'ids' is still passed to a single 'in' clause without batching.
        const size_t batch_size = 30;
        std::vector<DatesT> split_dates;
        split_dates.push_back(DatesT());
        for (DateT date : dates) {
            if (split_dates.back().size() >= batch_size) {
                split_dates.push_back(DatesT());
            }
            split_dates.back().push_back(date);
        }

        // perform query
        decltype(ConnectorPtr->select(select_columns)) selected;
        if (dates.empty()) {
            if (ids.empty()) {
                auto filter = where(match_tableid and match_type);
                selected = ConnectorPtr->select(select_columns, filter);
            } else {
                auto match_fileid = in(&TableFileSchema::id_, ids);
                auto filter = where(match_tableid and match_fileid and match_type);
                selected = ConnectorPtr->select(select_columns, filter);
            }
        } else {
            // one query per date batch; the batch results are concatenated
            for (auto& batch_dates : split_dates) {
                if (batch_dates.empty()) {
                    continue;
                }

                auto match_date = in(&TableFileSchema::date_, batch_dates);
                decltype(selected) batch_selected;
                if (ids.empty()) {
                    auto filter = where(match_tableid and match_date and match_type);
                    batch_selected = ConnectorPtr->select(select_columns, filter);
                } else {
                    auto match_fileid = in(&TableFileSchema::id_, ids);
                    auto filter = where(match_tableid and match_fileid and match_date and match_type);
                    batch_selected = ConnectorPtr->select(select_columns, filter);
                }
                selected.insert(selected.end(), batch_selected.begin(), batch_selected.end());
            }
        }

        Status ret;
        TableFileSchema table_file;
        for (auto& file : selected) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);

            // these attributes come from the owning table, not from the file row
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a path failure does not drop the file; it is only reported through the return value
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            // operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }
        return ret;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when iterate index files", e.what());
    }
}

S
starlord 已提交
971
// Collects the RAW files of a table that are candidates for merging, grouped by date.
// Files whose size has reached the table's index_file_size_ are skipped.
//
// table_id: id of the table; it must be describable via DescribeTable().
// files:    output; cleared first, then filled with one entry per date.
//
// Returns the DescribeTable() status when the table cannot be described, the
// HandleException() status when the query throws, otherwise the status of the last
// failing utils::GetTableFilePath() call (a default-constructed Status if none failed).
Status
SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        // the table must exist before its files are inspected
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // raw files of this table, largest first
        auto raw_rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW) and
                  c(&TableFileSchema::table_id_) == table_id),
            order_by(&TableFileSchema::file_size_).desc());

        Status result;
        int64_t to_merge_files = 0;
        for (auto& row : raw_rows) {
            TableFileSchema table_file;
            table_file.file_size_ = std::get<4>(row);
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;  // skip large file
            }

            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.created_on_ = std::get<7>(row);

            // these attributes come from the owning table, not from the file row
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a path failure does not drop the file; it is only reported through the return value
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                result = path_status;
            }

            // operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
            ++to_merge_files;
        }

        if (to_merge_files > 0) {
            ENGINE_LOG_TRACE << "Collect " << to_merge_files << " to-merge files";
        }
        return result;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

// Collects every file in TO_INDEX state, across all tables.
//
// files: output; cleared first, then one entry is appended per TO_INDEX file, completed
//        with dimension / index_file_size / nlist / metric_type of its owning table.
//
// Returns early with the DescribeTable() status when an owning table cannot be described
// (entries appended so far stay in 'files'), the HandleException() status when the query
// throws, otherwise the status of the last failing utils::GetTableFilePath() call
// (a default-constructed Status if none failed).
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        auto pending_rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX)));

        // table schemas already described in this call, keyed by table id
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;

        Status ret;
        for (auto& row : pending_rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.file_size_ = std::get<4>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.engine_type_ = std::get<7>(row);
            table_file.created_on_ = std::get<8>(row);

            // a path failure does not drop the file; it is only reported through the return value
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            // describe each owning table only once
            auto owner = groups.find(table_file.table_id_);
            if (owner == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                owner = groups.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema& owner_schema = owner->second;
            table_file.dimension_ = owner_schema.dimension_;
            table_file.index_file_size_ = owner_schema.index_file_size_;
            table_file.nlist_ = owner_schema.nlist_;
            table_file.metric_type_ = owner_schema.metric_type_;
            files.push_back(table_file);
        }

        if (pending_rows.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << pending_rows.size() << " to-index files";
        }
        return ret;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
1103
// Lists the files of a table whose file type is one of the requested types.
//
// table_id:    id of the table.
// file_types:  file types to select; must not be empty.
// table_files: output; cleared, then one entry is appended per matching file.
//
// Returns DB_ERROR when file_types is empty (table_files is then left untouched), the
// HandleException() status when the query throws, Status::OK() otherwise. When at least
// one file matched, a per-type breakdown is written to the debug log.
Status
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
                            TableFilesSchema& table_files) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    try {
        table_files.clear();
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::file_id_, &TableFileSchema::file_type_,
                    &TableFileSchema::file_size_, &TableFileSchema::row_count_, &TableFileSchema::date_,
                    &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));

        // nothing matched: nothing to collect and nothing to log
        if (rows.empty()) {
            return Status::OK();
        }

        // per-type counters, used only for the debug summary below
        int raw_count = 0;
        int new_count = 0;
        int new_merge_count = 0;
        int new_index_count = 0;
        int to_index_count = 0;
        int index_count = 0;
        int backup_count = 0;

        for (auto& row : rows) {
            TableFileSchema file_schema;
            file_schema.table_id_ = table_id;
            file_schema.id_ = std::get<0>(row);
            file_schema.file_id_ = std::get<1>(row);
            file_schema.file_type_ = std::get<2>(row);
            file_schema.file_size_ = std::get<3>(row);
            file_schema.row_count_ = std::get<4>(row);
            file_schema.date_ = std::get<5>(row);
            file_schema.engine_type_ = std::get<6>(row);
            file_schema.created_on_ = std::get<7>(row);

            switch (file_schema.file_type_) {
                case static_cast<int>(TableFileSchema::RAW):
                    ++raw_count;
                    break;
                case static_cast<int>(TableFileSchema::NEW):
                    ++new_count;
                    break;
                case static_cast<int>(TableFileSchema::NEW_MERGE):
                    ++new_merge_count;
                    break;
                case static_cast<int>(TableFileSchema::NEW_INDEX):
                    ++new_index_count;
                    break;
                case static_cast<int>(TableFileSchema::TO_INDEX):
                    ++to_index_count;
                    break;
                case static_cast<int>(TableFileSchema::INDEX):
                    ++index_count;
                    break;
                case static_cast<int>(TableFileSchema::BACKUP):
                    ++backup_count;
                    break;
                default:
                    break;  // other types are returned but not counted
            }

            table_files.push_back(file_schema);
        }

        ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
                         << " new files:" << new_count << " new_merge files:" << new_merge_count
                         << " new_index files:" << new_index_count << " to_index files:" << to_index_count
                         << " index files:" << index_count << " backup files:" << backup_count;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1171
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
1172 1173
Status
SqliteMetaImpl::Archive() {
Y
youny626 已提交
1174
    auto& criterias = options_.archive_conf_.GetCriterias();
X
Xu Peng 已提交
1175 1176 1177 1178 1179
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
Y
youny626 已提交
1180 1181
        auto& criteria = kv.first;
        auto& limit = kv.second;
G
groot 已提交
1182
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
S
starlord 已提交
1183 1184
            int64_t usecs = limit * D_SEC * US_PS;
            int64_t now = utils::GetMicroSecTimeStamp();
1185
            try {
Y
youny626 已提交
1186
                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
1187 1188
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1189 1190
                ConnectorPtr->update_all(
                    set(
G
groot 已提交
1191
                        c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
G
groot 已提交
1192
                    where(
G
groot 已提交
1193 1194 1195
                        c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
                        c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
            } catch (std::exception& e) {
S
starlord 已提交
1196
                return HandleException("Encounter exception when update table files", e.what());
X
Xu Peng 已提交
1197
            }
1198 1199

            ENGINE_LOG_DEBUG << "Archive old files";
X
Xu Peng 已提交
1200
        }
G
groot 已提交
1201
        if (criteria == engine::ARCHIVE_CONF_DISK) {
G
groot 已提交
1202
            uint64_t sum = 0;
X
Xu Peng 已提交
1203
            Size(sum);
X
Xu Peng 已提交
1204

Y
youny626 已提交
1205
            int64_t to_delete = (int64_t)sum - limit * G;
X
Xu Peng 已提交
1206
            DiscardFiles(to_delete);
1207 1208

            ENGINE_LOG_DEBUG << "Archive files to free disk";
X
Xu Peng 已提交
1209 1210 1211 1212 1213 1214
        }
    }

    return Status::OK();
}

S
starlord 已提交
1215
// Sum file_size_ over every table file that is not marked TO_DELETE.
// `result` is reset to 0 first, so it stays 0 when the query yields no value or fails.
Status
SqliteMetaImpl::Size(uint64_t& result) {
    result = 0;
    try {
        auto rows = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
                                         where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
        for (auto& row : rows) {
            auto& size_holder = std::get<0>(row);
            // the aggregate comes back as a nullable holder, only add it when a value is present
            if (size_holder) {
                result += (uint64_t)(*size_holder);
            }
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when calculte db size", ex.what());
    }

    return Status::OK();
}

S
starlord 已提交
1234
// Remove the meta records of all files whose type is NEW, NEW_INDEX or NEW_MERGE.
// All removals are executed inside a single sqlite transaction.
Status
SqliteMetaImpl::CleanUpShadowFiles() {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> shadow_types = {(int)TableFileSchema::NEW, (int)TableFileSchema::NEW_INDEX,
                                         (int)TableFileSchema::NEW_MERGE};
        auto shadow_files = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                 where(in(&TableFileSchema::file_type_, shadow_types)));

        auto transaction_ok = ConnectorPtr->transaction([&]() {
            for (auto& row : shadow_files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!transaction_ok) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!shadow_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << shadow_files.size() << " files";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean table file", ex.what());
    }

    return Status::OK();
}

S
starlord 已提交
1272
// Erase from the cache every TO_DELETE/BACKUP file whose updated_time_ is earlier than
// (now - seconds * US_PS). Only the cache is touched here, the meta records are not modified.
Status
SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    // files last updated before this timestamp are considered expired
    auto expire_before = now - seconds * US_PS;

    // erase deleted/backup files from cache
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> stale_types = {(int)TableFileSchema::TO_DELETE, (int)TableFileSchema::BACKUP};

        auto expired = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::date_),
            where(in(&TableFileSchema::file_type_, stale_types) and
                  c(&TableFileSchema::updated_time_) < expire_before));

        for (auto& row : expired) {
            TableFileSchema cached_file;
            cached_file.id_ = std::get<0>(row);
            cached_file.table_id_ = std::get<1>(row);
            cached_file.file_id_ = std::get<2>(row);
            cached_file.date_ = std::get<3>(row);

            // resolve the file location first, the cache entry is keyed by location_
            utils::GetTableFilePath(options_, cached_file);
            server::CommonUtil::EraseFromCache(cached_file.location_);
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean cache", ex.what());
    }

    return Status::OK();
}

// Physically clean up what has been marked TO_DELETE. Runs three independent stages:
//  1. delete TO_DELETE files whose updated_time_ is earlier than (now - seconds * US_PS),
//     together with their meta records
//  2. remove the meta records of TO_DELETE tables
//  3. remove the folder of every table touched in stage 1 that has no file record left
// Each stage returns its own error status when it fails.
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    // files last updated before this timestamp are considered expired
    auto expire_before = now - seconds * US_PS;
    // ids of the tables that lost at least one file in stage 1
    std::set<std::string> table_ids;

    // remove to_delete files
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto expired_files = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::date_),
            where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::TO_DELETE and
                  c(&TableFileSchema::updated_time_) < expire_before));

        auto transaction_ok = ConnectorPtr->transaction([&]() {
            // one schema object is reused for all rows, exactly like the fields are overwritten per row
            TableFileSchema table_file;
            for (auto& row : expired_files) {
                table_file.id_ = std::get<0>(row);
                table_file.table_id_ = std::get<1>(row);
                table_file.file_id_ = std::get<2>(row);
                table_file.date_ = std::get<3>(row);

                utils::DeleteTableFilePath(options_, table_file);
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);

                table_ids.insert(table_file.table_id_);
            }
            return true;
        });

        if (!transaction_ok) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!expired_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << expired_files.size() << " files deleted in " << seconds << " seconds";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean table files", ex.what());
    }

    // remove to_delete tables
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto dropped_tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_),
                                                   where(c(&TableSchema::state_) == (int)TableSchema::TO_DELETE));

        auto transaction_ok = ConnectorPtr->transaction([&]() {
            for (auto& row : dropped_tables) {
                utils::DeleteTablePath(options_, std::get<1>(row), false);  // only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }

            return true;
        });

        if (!transaction_ok) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!dropped_tables.empty()) {
            ENGINE_LOG_DEBUG << "Remove " << dropped_tables.size() << " tables from meta";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when clean table files", ex.what());
    }

    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
    try {
        server::MetricCollector metric;

        int64_t removed_folders = 0;
        for (auto& table_id : table_ids) {
            auto remaining = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                  where(c(&TableFileSchema::table_id_) == table_id));
            if (remaining.empty()) {
                utils::DeleteTablePath(options_, table_id);
                removed_folders++;
            }
        }

        if (removed_folders != 0) {
            ENGINE_LOG_DEBUG << "Remove " << removed_folders << " tables folder";
        }
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when delete table folder", ex.what());
    }

    return Status::OK();
}

S
starlord 已提交
1422
// Sum row_count_ over the RAW, TO_INDEX and INDEX files of table `table_id`.
// `result` is reset to 0 up front (same convention as Size()), so it holds a defined value
// on every return path. When DescribeTable() fails its status is returned as-is.
Status
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    result = 0;

    try {
        server::MetricCollector metric;

        // validate the table first, there is no point in querying files of a table that cannot be described
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        std::vector<int> file_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                       (int)TableFileSchema::INDEX};
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::row_count_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));

        for (auto& file : selected) {
            result += std::get<0>(file);
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when calculate table file size", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1451 1452
// Drop both meta tables (META_TABLES first, then META_TABLEFILES).
// Any exception raised while dropping is converted into an error status.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    try {
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
        return Status::OK();
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when drop all meta", ex.what());
    }
}

G
groot 已提交
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482
// Mark table files as TO_DELETE until their accumulated file_size_ covers `to_discard_size`.
// Files are taken ordered by id_, at most 10 per sqlite transaction, and only files that are
// not already TO_DELETE are considered. A non-positive `to_discard_size` is a no-op.
// The loop also stops when no candidate file is left; the former tail recursion would never
// terminate in that situation.
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        // set by the transaction when the select returned nothing to discard
        bool no_more_files = false;

        try {
            server::MetricCollector metric;

            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            // the lock is released at the end of every batch
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                             &TableFileSchema::file_size_),
                                                     where(c(&TableFileSchema::file_type_)
                                                           != (int)TableFileSchema::TO_DELETE),
                                                     order_by(&TableFileSchema::id_),
                                                     limit(10));

                std::vector<int> ids;
                TableFileSchema table_file;

                for (auto& file : selected) {
                    if (to_discard_size <= 0) break;
                    table_file.id_ = std::get<0>(file);
                    table_file.file_size_ = std::get<1>(file);
                    ids.push_back(table_file.id_);
                    // log id_: file_id_ is not part of the selected columns and would always be empty
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                     << " table_file.size=" << table_file.file_size_;
                    to_discard_size -= table_file.file_size_;
                }

                if (ids.empty()) {
                    no_more_files = true;
                    return true;
                }

                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                    where(
                        in(&TableFileSchema::id_, ids)));

                return true;
            });

            if (!commited) {
                return HandleException("DiscardFiles error: sqlite transaction failed");
            }
        } catch (std::exception& e) {
            return HandleException("Encounter exception when discard table file", e.what());
        }

        if (no_more_files) {
            // nothing left that could be discarded, give up instead of retrying forever
            break;
        }
    }

    return Status::OK();
}

} // namespace meta
} // namespace engine
} // namespace milvus