SqliteMetaImpl.cpp 52.5 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
S
starlord 已提交
6 7 8 9
#include "SqliteMetaImpl.h"
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "db/Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
S
starlord 已提交
11
#include "db/Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23

namespace zilliz {
J
jinhai 已提交
24
namespace milvus {
X
Xu Peng 已提交
25
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34
Status HandleException(const std::string& desc, std::exception &e) {
    ENGINE_LOG_ERROR << desc << ": " << e.what();
    return Status::DBTransactionError(desc, e.what());
G
groot 已提交
35 36 37 38
}

}

39
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
40
    return make_storage(path,
G
groot 已提交
41
                        make_table("Tables",
G
groot 已提交
42 43
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
44
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
45 46
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
S
starlord 已提交
47
                                   make_column("flag", &TableSchema::flag_, default_value(0)),
G
groot 已提交
48
                                   make_column("engine_type", &TableSchema::engine_type_),
49 50 51
                                   make_column("nlist", &TableSchema::nlist_),
                                   make_column("index_file_size", &TableSchema::index_file_size_),
                                   make_column("metric_type", &TableSchema::metric_type_)),
G
groot 已提交
52
                        make_table("TableFiles",
G
groot 已提交
53 54
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
55
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
56 57
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
58 59
                                   make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
                                   make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
G
groot 已提交
60 61 62
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
63
    );
X
Xu Peng 已提交
64 65 66

}

X
Xu Peng 已提交
67
// Concrete storage type produced by StoragePrototype(); spelled with decltype
// because that function's return type is deduced (auto).
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
68
// File-local handle to the sqlite_orm storage. It is assigned in
// SqliteMetaImpl::Initialize() and used by the query/update methods in this file.
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
69
// Type of the sqlite_orm expression `c(&TableFileSchema::id_) == 1UL`.
// NOTE(review): presumably used to hold id-comparison conditions elsewhere in this file - confirm.
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
70

S
starlord 已提交
71
/**
 * Produces a fresh table identifier.
 *
 * @param table_id  [out] Receives the next number from a SimpleIDGenerator,
 *                  formatted as text.
 * @return Status::OK().
 */
Status SqliteMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator id_source;
    std::ostringstream formatted;
    formatted << id_source.GetNextIDNumber();
    table_id = formatted.str();
    return Status::OK();
}

S
starlord 已提交
79
/**
 * Produces a fresh file identifier.
 *
 * @param file_id  [out] Receives the next number from a SimpleIDGenerator,
 *                 formatted as text.
 * @return Status::OK().
 */
Status SqliteMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator id_source;
    std::ostringstream formatted;
    formatted << id_source.GetNextIDNumber();
    file_id = formatted.str();
    return Status::OK();
}

S
starlord 已提交
87
/**
 * Stores the meta options and initializes the backing storage.
 *
 * A constructor cannot return the Status produced by Initialize(), so a
 * failure is logged here instead of being dropped silently.
 *
 * @param options_  Meta options copied into the member of the same name.
 */
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
    : options_(options_) {
    auto status = Initialize();
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to initialize sqlite meta";
    }
}

S
starlord 已提交
92
/**
 * Prepares the metadata store: makes sure the db directory exists, opens
 * "<options_.path>/meta.sqlite", syncs the schema, and runs CleanUp().
 *
 * @return Status::InvalidDBPath if the directory is missing and
 *         create_directory reports that nothing was created;
 *         otherwise Status::OK().
 */
Status SqliteMetaImpl::Initialize() {
    const bool dir_present = boost::filesystem::is_directory(options_.path);
    if (!dir_present && !boost::filesystem::create_directory(options_.path)) {
        ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
        return Status::InvalidDBPath("Failed to create db directory", options_.path);
    }

    const auto db_file = options_.path + "/meta.sqlite";
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(db_file));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();  // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

X
Xu Peng 已提交
112
// PXU TODO: Temp solution. Will fix later
S
starlord 已提交
113
/**
 * Soft-deletes the files of `table_id` whose date is in `dates` by setting
 * their file_type_ to TO_DELETE.
 *
 * @param table_id  Table whose partitions are dropped; must be describable.
 * @param dates     Partition dates to drop. An empty set is a no-op.
 * @return Status::OK() on success or when `dates` is empty; the status from
 *         DescribeTable() if the lookup fails; Status::Error if any date is
 *         not older than GetDateWithDelta(-1); a DBTransactionError status
 *         if an exception is thrown.
 */
Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                         const DatesT &dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    // Only used to verify that the table exists and is not being deleted.
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        auto yesterday = GetDateWithDelta(-1);

        // Refuse to drop partitions dated yesterday or later.
        for (const auto &date : dates) {
            if (date >= yesterday) {
                return Status::Error("Could not delete partitions within 2 days");
            }
        }

        // Concurrent sqlite updates from multiple threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(
            set(
                c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE)
            ),
            where(
                c(&TableFileSchema::table_id_) == table_id and
                    in(&TableFileSchema::date_, dates)
            ));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop partition", e);
    }

    return Status::OK();
}

S
starlord 已提交
153
/**
 * Inserts a new table record and creates its on-disk path.
 *
 * If table_schema.table_id_ is empty a new id is generated; otherwise the
 * id must not already be present in the Tables table.
 *
 * @param table_schema  [in,out] Table description. On success id_,
 *                      created_on_ and (when generated) table_id_ are filled in.
 * @return Status::Error if the id exists in TO_DELETE state;
 *         Status::AlreadyExist if it exists otherwise;
 *         Status::DBTransactionError if the insert fails;
 *         the status of utils::CreateTablePath() on success;
 *         a DBTransactionError status for any other std::exception.
 */
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
        server::MetricCollector metric;

        // Concurrent sqlite updates from multiple threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto table = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (table.size() == 1) {
                if (TableSchema::TO_DELETE == std::get<0>(table[0])) {
                    return Status::Error("Table already exists and it is in delete state, please wait a second");
                } else {
                    // Change from no error to already exist.
                    return Status::AlreadyExist("Table already exists");
                }
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            auto id = ConnectorPtr->insert(table_schema);
            table_schema.id_ = id;
        } catch (std::exception &e) {
            // Keep the underlying reason instead of discarding it.
            ENGINE_LOG_ERROR << "sqlite transaction failed: " << e.what();
            return Status::DBTransactionError("Add Table Error", e.what());
        } catch (...) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Add Table Error");
        }

        return utils::CreateTablePath(options_, table_schema.table_id_);

    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
    }
}

S
starlord 已提交
196
/**
 * Soft-deletes a table: sets state_ to TO_DELETE on the row matching
 * `table_id` unless it is already in that state.
 *
 * @param table_id  Identifier of the table to mark.
 * @return Status::OK(), or a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // Writes are serialized: concurrent sqlite updates from multiple
        // threads may throw ('bad logic', etc.).
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Soft delete: only the state column changes.
        ConnectorPtr->update_all(
            set(c(&TableSchema::state_) = static_cast<int>(TableSchema::TO_DELETE)),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table", e);
    }

    return Status::OK();
}

S
starlord 已提交
220
/**
 * Soft-deletes every file of a table: sets file_type_ to TO_DELETE and
 * refreshes updated_time_ on rows of `table_id` not already marked.
 *
 * @param table_id  Identifier of the table whose files are marked.
 * @return Status::OK(), or a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // Writes are serialized: concurrent sqlite updates from multiple
        // threads may throw ('bad logic', etc.).
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Soft delete: the rows stay, only type and timestamp change.
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e);
    }

    return Status::OK();
}

S
starlord 已提交
245
/**
 * Loads the stored attributes of the table named by table_schema.table_id_.
 *
 * @param table_schema  [in,out] table_id_ selects the row; on success the
 *                      remaining fields read by the query are filled in.
 * @return Status::NotFound unless exactly one non-TO_DELETE row matches;
 *         Status::OK() on success; a DBTransactionError status if an
 *         exception is thrown.
 */
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::state_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::created_on_,
                                                 &TableSchema::flag_,
                                                 &TableSchema::engine_type_,
                                                 &TableSchema::nlist_,
                                                 &TableSchema::index_file_size_,
                                                 &TableSchema::metric_type_),
                                         where(c(&TableSchema::table_id_) == table_schema.table_id_
                                               && c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
        }

        // Tuple positions follow the column order of the select above.
        auto &row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.engine_type_ = std::get<5>(row);
        table_schema.nlist_ = std::get<6>(row);
        table_schema.index_file_size_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe table", e);
    }

    return Status::OK();
}

S
starlord 已提交
282
/**
 * Reports whether a table still has files of type RAW, NEW, NEW_MERGE,
 * NEW_INDEX or TO_INDEX, and logs a per-type count when it does.
 *
 * @param table_id  Table to inspect.
 * @param has       [out] Set to false on entry; true if at least one such file exists.
 * @return Status::OK(), or a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
    has = false;
    try {
        std::vector<int> file_types = {
                (int) TableFileSchema::RAW,
                (int) TableFileSchema::NEW,
                (int) TableFileSchema::NEW_MERGE,
                (int) TableFileSchema::NEW_INDEX,
                (int) TableFileSchema::TO_INDEX,
        };
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::file_type_),
                                             where(in(&TableFileSchema::file_type_, file_types)
                                                   and c(&TableFileSchema::table_id_) == table_id
                                             ));

        if (selected.size() >= 1) {
            has = true;

            // The counters below are only used for the debug log line.
            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
            for (auto &file : selected) {
                switch (std::get<1>(file)) {
                    case (int) TableFileSchema::RAW:
                        raw_count++;
                        break;
                    case (int) TableFileSchema::NEW:
                        new_count++;
                        break;
                    case (int) TableFileSchema::NEW_MERGE:
                        new_merge_count++;
                        break;
                    case (int) TableFileSchema::NEW_INDEX:
                        new_index_count++;
                        break;
                    case (int) TableFileSchema::TO_INDEX:
                        to_index_count++;
                        break;
                    default:
                        break;
                }
            }

            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
                << " new files:" << new_count << " new_merge files:" << new_merge_count
                << " new_index files:" << new_index_count << " to_index files:" << to_index_count;
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when check non index files", e);
    }
    return Status::OK();
}

336 337
/**
 * Stores new index parameters on a table and turns its BACKUP files back
 * into RAW files.
 *
 * @param table_id  Table to update; must exist and not be in TO_DELETE state.
 * @param index     New engine type, nlist, index file size (multiplied by
 *                  ONE_MB before being stored) and metric type.
 * @return Status::NotFound if no matching table row exists; Status::OK()
 *         on success; a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;

        // Writes are serialized: concurrent sqlite updates from multiple
        // threads may throw ('bad logic', etc.).
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::state_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::created_on_,
                                                 &TableSchema::flag_),
                                         where(c(&TableSchema::table_id_) == table_id
                                               && c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.empty()) {
            return Status::NotFound("Table " + table_id + " not found");
        }

        // Rebuild the full record: stored fields from the row, index fields from the caller.
        auto &row = rows[0];
        meta::TableSchema updated;
        updated.id_ = std::get<0>(row);
        updated.table_id_ = table_id;
        updated.state_ = std::get<1>(row);
        updated.dimension_ = std::get<2>(row);
        updated.created_on_ = std::get<3>(row);
        updated.flag_ = std::get<4>(row);
        updated.engine_type_ = index.engine_type_;
        updated.nlist_ = index.nlist_;
        updated.index_file_size_ = index.index_file_size_*ONE_MB;
        updated.metric_type_ = index.metric_type_;

        ConnectorPtr->update(updated);

        // Set all backup files of this table to raw.
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));
    } catch (std::exception &e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e);
    }

    return Status::OK();
}

/**
 * Overwrites the flag_ column of the table row matching `table_id`.
 *
 * @param table_id  Table whose flag is written.
 * @param flag      New flag value.
 * @return Status::OK(), or a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    try {
        server::MetricCollector metric;

        // NOTE(review): unlike the other update_all callers in this file, no
        // meta_mutex_ lock is taken here - confirm whether that is intentional.
        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag),
                                 where(c(&TableSchema::table_id_) == table_id));
    } catch (std::exception &e) {
        std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e);
    }

    return Status::OK();
}

/**
 * Reads the index parameters stored for a table.
 *
 * @param table_id  Table to look up.
 * @param index     [out] Receives engine type, nlist, index file size
 *                  (stored value divided by ONE_MB) and metric type.
 * @return Status::NotFound unless exactly one non-TO_DELETE row matches;
 *         Status::OK() on success; a DBTransactionError status if an
 *         exception is thrown.
 */
Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableSchema::engine_type_,
                                                 &TableSchema::nlist_,
                                                 &TableSchema::index_file_size_,
                                                 &TableSchema::metric_type_),
                                         where(c(&TableSchema::table_id_) == table_id
                                               && c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status::NotFound("Table " + table_id + " not found");
        }

        auto &row = rows[0];
        index.engine_type_ = std::get<0>(row);
        index.nlist_ = std::get<1>(row);
        index.index_file_size_ = std::get<2>(row)/ONE_MB;
        index.metric_type_ = std::get<3>(row);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe index", e);
    }

    return Status::OK();
}

/**
 * Drops a table's index at the metadata level: INDEX files are marked
 * TO_DELETE, then BACKUP files are turned back into RAW files.
 *
 * @param table_id  Table whose index files are dropped.
 * @return Status::OK(), or a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
        server::MetricCollector metric;

        // Writes are serialized: concurrent sqlite updates from multiple
        // threads may throw ('bad logic', etc.).
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Step 1: soft delete index files.
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)));

        // Step 2: set all backup files to raw.
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table index files", e);
    }

    return Status::OK();
}

S
starlord 已提交
472
/**
 * Checks whether a table exists and is not in TO_DELETE state.
 *
 * @param table_id    Table to look for.
 * @param has_or_not  [out] true when exactly one matching row is found,
 *                    false otherwise (including when an exception is thrown
 *                    before the query result is examined).
 * @return Status::OK(), or a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    has_or_not = false;

    try {
        server::MetricCollector metric;
        auto matches = ConnectorPtr->select(columns(&TableSchema::id_),
                                            where(c(&TableSchema::table_id_) == table_id
                                                  && c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        has_or_not = (matches.size() == 1);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table", e);
    }

    return Status::OK();
}

S
starlord 已提交
493
/**
 * Appends one TableSchema per table that is not in TO_DELETE state.
 *
 * The output vector is appended to, not cleared. state_ is not among the
 * selected columns, so it keeps whatever a default-constructed TableSchema holds.
 *
 * @param table_schema_array  [in,out] Receives the table descriptions.
 * @return Status::OK(), or a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::table_id_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::created_on_,
                                                 &TableSchema::flag_,
                                                 &TableSchema::engine_type_,
                                                 &TableSchema::nlist_,
                                                 &TableSchema::index_file_size_,
                                                 &TableSchema::metric_type_),
                                         where(c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        // Tuple positions follow the column order of the select above.
        for (auto &row : rows) {
            TableSchema entry;
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.dimension_ = std::get<2>(row);
            entry.created_on_ = std::get<3>(row);
            entry.flag_ = std::get<4>(row);
            entry.engine_type_ = std::get<5>(row);
            entry.nlist_ = std::get<6>(row);
            entry.index_file_size_ = std::get<7>(row);
            entry.metric_type_ = std::get<8>(row);

            table_schema_array.push_back(entry);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup all tables", e);
    }

    return Status::OK();
}

S
starlord 已提交
529
/**
 * Registers a new file for a table and creates its on-disk path.
 *
 * @param file_schema  [in,out] table_id_ (and optionally date_) are inputs.
 *                     file_id_, sizes, timestamps, the fields copied from
 *                     the owning table, and id_ are filled in here.
 * @return The status from DescribeTable() if the owning table cannot be
 *         described; the status of utils::CreateTableFilePath() after a
 *         successful insert; a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    // Default the partition date when the caller left it unset.
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
    }

    // Looked up before taking the lock; DescribeTable() does its own query.
    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto describe_status = DescribeTable(owner);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);

        // A new file starts empty.
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        // Copy the table-level settings onto the file.
        file_schema.dimension_ = owner.dimension_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.metric_type_ = owner.metric_type_;

        // Writes are serialized: concurrent sqlite updates from multiple
        // threads may throw ('bad logic', etc.).
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when create table file", ex);
    }

    return Status::OK();
}

S
starlord 已提交
569
/**
 * Collects every file whose file_type_ is TO_INDEX, filling in the
 * table-level fields (nlist, index file size, metric type, dimension) from
 * the owning table.
 *
 * @param files  [out] Cleared on entry, then filled with the matching files.
 * @return Status::OK() on success; the failing status if DescribeTable()
 *         fails for an owning table (`files` may then be partially filled);
 *         a DBTransactionError status if an exception is thrown.
 */
Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::table_id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::file_size_,
                                                 &TableFileSchema::row_count_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_,
                                                 &TableFileSchema::created_on_),
                                         where(c(&TableFileSchema::file_type_)
                                                   == static_cast<int>(TableFileSchema::TO_INDEX)));

        // Owning tables already described, keyed by table id.
        std::map<std::string, TableSchema> groups;
        // Declared outside the loop and reused for every row.
        TableFileSchema table_file;

        for (auto &row : rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.file_size_ = std::get<4>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.engine_type_ = std::get<7>(row);
            table_file.created_on_ = std::get<8>(row);

            utils::GetTableFilePath(options_, table_file);

            // Describe each owning table once, then reuse the cached entry.
            auto owner_it = groups.find(table_file.table_id_);
            if (owner_it == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                owner_it = groups.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema &owner = owner_it->second;
            table_file.nlist_ = owner.nlist_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.metric_type_ = owner.metric_type_;
            table_file.dimension_ = owner.dimension_;
            files.push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate raw files", e);
    }

    return Status::OK();
}

S
starlord 已提交
626
//Collect the searchable files (RAW, TO_INDEX and INDEX) of a table, grouped by date.
//
//  table_id  : table whose files are looked up
//  partition : dates to restrict the lookup to; an empty list means "all dates"
//  files     : output, cleared first, then filled as date -> files of that date
//
//Returns the failing status of DescribeTable() when the table cannot be described,
//a status built by HandleException() when an exception is caught, Status::OK() otherwise.
//
//This overload used to carry two copy-pasted branches that had drifted apart: the
//"no partition" branch never filled index_file_size_. It now forwards to the id-aware
//overload, which applies the same table/date/type filters when the id list is empty
//and fills every schema-derived field in one place.
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    //an empty id list disables the file id filter in the id-aware overload
    const std::vector<size_t> no_id_filter;
    return FilesToSearch(table_id, no_id_filter, partition, files);
}

S
starlord 已提交
729
//Collect the searchable files (RAW, TO_INDEX and INDEX) of a table, grouped by date.
//
//  table_id  : table whose files are looked up
//  ids       : file row ids to restrict the lookup to; ignored when empty
//  partition : dates to restrict the lookup to; ignored when empty
//  files     : output, cleared first, then filled as date -> files of that date
//
//Returns the failing status of DescribeTable() when the table cannot be described,
//a status built by HandleException() when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
                                 const std::vector<size_t> &ids,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();
    server::MetricCollector metric;

    try {
        auto wanted_columns = columns(&TableFileSchema::id_,
                                      &TableFileSchema::table_id_,
                                      &TableFileSchema::file_id_,
                                      &TableFileSchema::file_type_,
                                      &TableFileSchema::file_size_,
                                      &TableFileSchema::row_count_,
                                      &TableFileSchema::date_,
                                      &TableFileSchema::engine_type_);

        auto same_table = c(&TableFileSchema::table_id_) == table_id;

        std::vector<int> searchable_types = {(int) TableFileSchema::RAW,
                                             (int) TableFileSchema::TO_INDEX,
                                             (int) TableFileSchema::INDEX};
        auto searchable = in(&TableFileSchema::file_type_, searchable_types);

        //the table description supplies the per-table fields copied into every file below
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        const bool filter_by_date = !partition.empty();
        const bool filter_by_id = !ids.empty();

        //each filter combination yields a different expression type, hence one select per case
        decltype(ConnectorPtr->select(wanted_columns)) rows;
        if (filter_by_date && filter_by_id) {
            auto listed_id = in(&TableFileSchema::id_, ids);
            auto listed_date = in(&TableFileSchema::date_, partition);
            rows = ConnectorPtr->select(wanted_columns,
                                        where(same_table and listed_id and listed_date and searchable));
        } else if (filter_by_date) {
            auto listed_date = in(&TableFileSchema::date_, partition);
            rows = ConnectorPtr->select(wanted_columns,
                                        where(same_table and listed_date and searchable));
        } else if (filter_by_id) {
            auto listed_id = in(&TableFileSchema::id_, ids);
            rows = ConnectorPtr->select(wanted_columns,
                                        where(same_table and listed_id and searchable));
        } else {
            rows = ConnectorPtr->select(wanted_columns,
                                        where(same_table and searchable));
        }

        //one schema object is reused for all rows and copied into the result
        TableFileSchema table_file;
        for (auto &row : rows) {
            //columns come back in the order they were requested above
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.file_size_ = std::get<4>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.engine_type_ = std::get<7>(row);

            //fields taken from the owning table
            table_file.dimension_ = table_schema.dimension_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.metric_type_ = table_schema.metric_type_;

            utils::GetTableFilePath(options_, table_file);

            //operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e);
    }

    return Status::OK();
}

S
starlord 已提交
808
//Collect the RAW files of a table that are candidates for merging, grouped by date.
//
//  table_id : table whose files are looked up
//  files    : output, cleared first, then filled as date -> files of that date
//
//Files whose size is not below the table's index_file_size_ are left out.
//Returns the failing status of DescribeTable() when the table cannot be described,
//a status built by HandleException() when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
                                DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        server::MetricCollector metric;

        //the table must be describable; its settings are also copied into each file below
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        //raw files of this table, ordered by file size descending
        auto candidates = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::table_id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_,
                    &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_,
                    &TableFileSchema::date_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
                  c(&TableFileSchema::table_id_) == table_id),
            order_by(&TableFileSchema::file_size_).desc());

        for (auto &row : candidates) {
            TableFileSchema table_file;

            //size is read first so that large files are skipped before anything else is copied
            table_file.file_size_ = std::get<4>(row);
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;
            }

            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.created_on_ = std::get<7>(row);

            //fields taken from the owning table
            table_file.dimension_ = table_schema.dimension_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.metric_type_ = table_schema.metric_type_;

            utils::GetTableFilePath(options_, table_file);

            //operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate merge files", e);
    }

    return Status::OK();
}

S
starlord 已提交
869
//Look up the files of a table by their row ids.
//
//  table_id    : table the files must belong to
//  ids         : row ids of the wanted files
//  table_files : output, cleared first, then filled with one entry per matching row
//
//Returns the failing status of DescribeTable() when the table cannot be described,
//a status built by HandleException() when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
                                 const std::vector<size_t>& ids,
                                 TableFilesSchema& table_files) {
    try {
        table_files.clear();

        auto of_table = c(&TableFileSchema::table_id_) == table_id;
        auto listed_id = in(&TableFileSchema::id_, ids);
        auto rows = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::file_size_,
                                                 &TableFileSchema::row_count_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_,
                                                 &TableFileSchema::created_on_),
                                         where(of_table and listed_id));

        //the table description supplies the per-table fields copied into every entry
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto &row : rows) {
            TableFileSchema entry;
            //table_id_ is not selected; every row matched the requested table
            entry.table_id_ = table_id;

            //columns come back in the order they were requested above
            entry.id_ = std::get<0>(row);
            entry.file_id_ = std::get<1>(row);
            entry.file_type_ = std::get<2>(row);
            entry.file_size_ = std::get<3>(row);
            entry.row_count_ = std::get<4>(row);
            entry.date_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.created_on_ = std::get<7>(row);

            //fields taken from the owning table
            entry.nlist_ = table_schema.nlist_;
            entry.index_file_size_ = table_schema.index_file_size_;
            entry.metric_type_ = table_schema.metric_type_;
            entry.dimension_ = table_schema.dimension_;

            utils::GetTableFilePath(options_, entry);

            table_files.emplace_back(entry);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table files", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
920
// PXU TODO: Support Swap
S
starlord 已提交
921
//Apply the configured archive criteria to the table files.
//
//  ARCHIVE_CONF_DAYS : every file created more than `limit` days ago that is not yet
//                      TO_DELETE is marked TO_DELETE.
//  ARCHIVE_CONF_DISK : when the total size reported by Size() exceeds `limit` * G,
//                      DiscardFiles() is asked to mark the excess as TO_DELETE.
//
//Returns Status::OK() when no criteria are configured or all of them were applied.
//A failure of any step is returned to the caller: the disk branch used to drop the
//statuses of Size() and DiscardFiles(), unlike the days branch which already reported
//its errors.
Status SqliteMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    //iterate by reference, the entries are only read
    for (const auto &kv : criterias) {
        const auto &criteria = kv.first;
        const auto &limit = kv.second;

        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try {
                //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
                    ),
                    where(
                        c(&TableFileSchema::created_on_) < (long) (now - usecs) and
                            c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
                    ));
            } catch (std::exception &e) {
                return HandleException("Encounter exception when update table files", e);
            }
        }

        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            auto size_status = Size(sum);
            if (!size_status.ok()) {
                //without a reliable total there is nothing meaningful to discard
                return size_status;
            }

            //a non-positive value means the limit is not exceeded; DiscardFiles() then does nothing
            int64_t to_delete = (int64_t)sum - limit * G;
            auto discard_status = DiscardFiles(to_delete);
            if (!discard_status.ok()) {
                return discard_status;
            }
        }
    }

    return Status::OK();
}

S
starlord 已提交
961
//Compute the summed file_size_ of all table files that are not marked TO_DELETE.
//
//  result : output, reset to 0 first, then increased by the sum reported by the database
//
//Returns a status built by HandleException() when an exception is caught,
//Status::OK() otherwise.
Status SqliteMetaImpl::Size(uint64_t &result) {
    result = 0;

    try {
        auto not_deleted = c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE;
        auto rows = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
                                         where(not_deleted));

        for (auto &row : rows) {
            auto &total = std::get<0>(row);
            //the aggregate may hold no value; only a present value is added
            if (total) {
                result += (uint64_t) (*total);
            }
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculte db size", e);
    }

    return Status::OK();
}

S
starlord 已提交
982
//Mark table files as TO_DELETE until at least `to_discard_size` bytes have been marked.
//
//  to_discard_size : number of bytes to get rid of; a value <= 0 is a no-op
//
//Files are taken in ascending row id order, at most 10 per transaction. Each batch runs
//under meta_mutex_, which is released between batches.
//
//Returns Status::OK() when the requested size was reached or no candidate file is left,
//a DBTransactionError when a transaction is not committed, and a status built by
//HandleException() when an exception is caught.
//
//Differences to the former recursive version:
//  - batches are processed in a loop that stops when a batch finds no candidate file;
//    the recursion called itself again with an unchanged size in that case
//  - the debug log prints the row id that is actually selected instead of file_id_,
//    which was never filled in and therefore always empty
Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        //set by the transaction when there is nothing left that could be discarded
        bool no_candidates = false;

        try {
            server::MetricCollector metric;

            //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                             &TableFileSchema::file_size_),
                                                     where(c(&TableFileSchema::file_type_)
                                                           != (int) TableFileSchema::TO_DELETE),
                                                     order_by(&TableFileSchema::id_),
                                                     limit(10));

                std::vector<int> ids;
                for (auto &file : selected) {
                    if (to_discard_size <= 0) break;
                    const auto file_row_id = std::get<0>(file);
                    const auto file_size = std::get<1>(file);
                    ids.push_back(file_row_id);
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << file_row_id
                                     << " table_file.size=" << file_size;
                    to_discard_size -= file_size;
                }

                if (ids.empty()) {
                    no_candidates = true;
                    return true;
                }

                //updated_time_ is refreshed together with the state change
                ConnectorPtr->update_all(
                        set(
                                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                        ),
                        where(
                                in(&TableFileSchema::id_, ids)
                        ));

                return true;
            });

            if (!commited) {
                ENGINE_LOG_ERROR << "sqlite transaction failed";
                return Status::DBTransactionError("Update table file error");
            }

        } catch (std::exception &e) {
            return HandleException("Encounter exception when discard table file", e);
        }

        if (no_candidates) {
            break;
        }
    }

    return Status::OK();
}

S
starlord 已提交
1044
//Write one table file record back to the database.
//
//  file_schema : record to store; its updated_time_ is refreshed, and its file_type_ is
//                forced to TO_DELETE when the owning table is missing or marked TO_DELETE
//
//Returns a status built by HandleException() when an exception is caught,
//Status::OK() otherwise.
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_states = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == file_schema.table_id_));

        //if the table has been deleted, just mark the table file as TO_DELETE
        //clean thread will delete the file later
        const bool table_gone = owner_states.empty()
                                || std::get<0>(owner_states.front()) == (int) TableSchema::TO_DELETE;
        if (table_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

    } catch (std::exception &e) {
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
            + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e);
    }

    return Status::OK();
}

S
starlord 已提交
1071
//Switch every RAW file of a table to TO_INDEX.
//
//  table_id : table whose RAW files are updated
//
//Returns a status built by HandleException() when an exception is caught,
//Status::OK() otherwise.
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto of_table = c(&TableFileSchema::table_id_) == table_id;
        auto still_raw = c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW;

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX),
            where(of_table and still_raw));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files to to_index", e);
    }

    return Status::OK();
}

S
starlord 已提交
1093
//Write a batch of table file records back to the database in one transaction.
//
//  files : records to store; each gets a fresh updated_time_, and records whose table
//          is missing or marked TO_DELETE get their file_type_ forced to TO_DELETE
//
//Returns a DBTransactionError when the transaction is not committed, a status built by
//HandleException() when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        //table_id -> whether a table with that id exists and is not marked TO_DELETE;
        //each distinct table is queried once
        std::map<std::string, bool> table_alive;
        for (auto &file : files) {
            if (table_alive.count(file.table_id_) > 0) {
                continue;
            }
            auto matches = ConnectorPtr->select(columns(&TableSchema::id_),
                                                where(c(&TableSchema::table_id_) == file.table_id_
                                                      and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
            table_alive[file.table_id_] = (matches.size() >= 1);
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                const bool alive = table_alive[file.table_id_];
                if (!alive) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Update table files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files", e);
    }

    return Status::OK();
}

S
starlord 已提交
1138
//Remove records (and the matching paths via the utils helpers) that are marked TO_DELETE.
//
//  seconds : files are only removed when their updated_time_ is older than
//            now - seconds * US_PS
//
//Three steps, each in its own try block:
//  1. remove TO_DELETE files older than the cutoff and remember their table ids
//  2. remove tables marked TO_DELETE
//  3. for every table id remembered in step 1 that has no file record left,
//     call utils::DeleteTablePath()
//
//Returns a DBTransactionError when a transaction is not committed, a status built by
//HandleException() when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> touched_tables;

    //remove to_delete files
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto expired = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                    &TableFileSchema::table_id_,
                                                    &TableFileSchema::file_id_,
                                                    &TableFileSchema::date_),
                                            where(
                                                    c(&TableFileSchema::file_type_) ==
                                                    (int) TableFileSchema::TO_DELETE
                                                    and
                                                    c(&TableFileSchema::updated_time_)
                                                    < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            //one schema object is reused for all rows
            TableFileSchema table_file;
            for (auto &row : expired) {
                table_file.id_ = std::get<0>(row);
                table_file.table_id_ = std::get<1>(row);
                table_file.file_id_ = std::get<2>(row);
                table_file.date_ = std::get<3>(row);

                utils::DeleteTableFilePath(options_, table_file);
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);

                touched_tables.insert(table_file.table_id_);
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    //remove to_delete tables
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto doomed_tables = ConnectorPtr->select(columns(&TableSchema::id_,
                                                          &TableSchema::table_id_),
                                                  where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : doomed_tables) {
                utils::DeleteTablePath(options_, std::get<1>(row), false);//only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }

            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    //remove deleted table folder
    //don't remove table folder until all its files has been deleted
    try {
        server::MetricCollector metric;

        for (auto &table_id : touched_tables) {
            auto remaining = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                  where(c(&TableFileSchema::table_id_) == table_id));
            if (remaining.empty()) {
                utils::DeleteTablePath(options_, table_id);
            }
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table folder", e);
    }

    return Status::OK();
}

S
starlord 已提交
1235
//Remove the records of all table files whose type is NEW, NEW_INDEX or NEW_MERGE.
//
//Returns a DBTransactionError when the transaction is not committed, a status built by
//HandleException() when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::CleanUp() {
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> unfinished_types = {(int) TableFileSchema::NEW,
                                             (int) TableFileSchema::NEW_INDEX,
                                             (int) TableFileSchema::NEW_MERGE};
        auto unfinished = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                               where(in(&TableFileSchema::file_type_, unfinished_types)));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : unfinished) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table file", e);
    }

    return Status::OK();
}

S
starlord 已提交
1265
//Sum the row_count_ of all RAW, TO_INDEX and INDEX files of a table.
//
//  table_id : table whose rows are counted
//  result   : output; set to the summed row count, left untouched when the function fails
//
//Returns the failing status of DescribeTable() when the table cannot be described,
//a status built by HandleException() when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
        server::MetricCollector metric;

        std::vector<int> counted_types = {(int) TableFileSchema::RAW,
                                          (int) TableFileSchema::TO_INDEX,
                                          (int) TableFileSchema::INDEX};
        auto row_counts = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
                                               where(in(&TableFileSchema::file_type_, counted_types)
                                                     and c(&TableFileSchema::table_id_) == table_id));

        //the description itself is not used here; only its status is checked
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        result = 0;
        for (auto &row : row_counts) {
            result += std::get<0>(row);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculate table file size", e);
    }

    return Status::OK();
}

S
starlord 已提交
1294
//Delete the directory options_.path with everything in it, if it is a directory.
//
//Returns Status::OK() when the path is not a directory or was removed, and a status
//built by HandleException() when the filesystem calls throw. The throwing overloads of
//boost::filesystem are used here, so without the try/catch an error would leave this
//function as an exception, unlike the other operations in this file which report
//failures through their Status.
Status SqliteMetaImpl::DropAll() {
    try {
        if (boost::filesystem::is_directory(options_.path)) {
            boost::filesystem::remove_all(options_.path);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop all meta files", e);
    }

    return Status::OK();
}

S
starlord 已提交
1301
//Runs CleanUp() when the object is destroyed; the returned status is not inspected.
SqliteMetaImpl::~SqliteMetaImpl() {
    this->CleanUp();
}

1305
} // namespace meta
X
Xu Peng 已提交
1306
} // namespace engine
J
jinhai 已提交
1307
} // namespace milvus
X
Xu Peng 已提交
1308
} // namespace zilliz