SqliteMetaImpl.cpp 52.5 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
S
starlord 已提交
6 7 8 9
#include "SqliteMetaImpl.h"
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "db/Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
S
starlord 已提交
11
#include "db/Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23

namespace zilliz {
J
jinhai 已提交
24
namespace milvus {
X
Xu Peng 已提交
25
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34
Status HandleException(const std::string& desc, std::exception &e) {
    ENGINE_LOG_ERROR << desc << ": " << e.what();
    return Status::DBTransactionError(desc, e.what());
G
groot 已提交
35 36 37 38
}

}

39
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
40
    return make_storage(path,
G
groot 已提交
41
                        make_table("Tables",
G
groot 已提交
42 43
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
44
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
45 46
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
S
starlord 已提交
47
                                   make_column("flag", &TableSchema::flag_, default_value(0)),
48
                                   make_column("index_file_size", &TableSchema::index_file_size_),
G
groot 已提交
49
                                   make_column("engine_type", &TableSchema::engine_type_),
50 51
                                   make_column("nlist", &TableSchema::nlist_),
                                   make_column("metric_type", &TableSchema::metric_type_)),
G
groot 已提交
52
                        make_table("TableFiles",
G
groot 已提交
53 54
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
55
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
56 57
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
58 59
                                   make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
                                   make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
G
groot 已提交
60 61 62
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
63
    );
X
Xu Peng 已提交
64 65 66

}

X
Xu Peng 已提交
67
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
68
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
69
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
70

S
starlord 已提交
71
Status SqliteMetaImpl::NextTableId(std::string &table_id) {
72 73
    std::stringstream ss;
    SimpleIDGenerator g;
74
    ss << g.GetNextIDNumber();
75
    table_id = ss.str();
76 77 78
    return Status::OK();
}

S
starlord 已提交
79
Status SqliteMetaImpl::NextFileId(std::string &file_id) {
X
Xu Peng 已提交
80 81
    std::stringstream ss;
    SimpleIDGenerator g;
82
    ss << g.GetNextIDNumber();
X
Xu Peng 已提交
83 84 85 86
    file_id = ss.str();
    return Status::OK();
}

S
starlord 已提交
87
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
X
Xu Peng 已提交
88 89
    : options_(options_) {
    Initialize();
X
Xu Peng 已提交
90 91
}

S
starlord 已提交
92
Status SqliteMetaImpl::Initialize() {
X
Xu Peng 已提交
93 94
    if (!boost::filesystem::is_directory(options_.path)) {
        auto ret = boost::filesystem::create_directory(options_.path);
95
        if (!ret) {
G
groot 已提交
96
            ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
97
            return Status::InvalidDBPath("Failed to create db directory", options_.path);
98
        }
X
Xu Peng 已提交
99
    }
X
Xu Peng 已提交
100

101
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));
X
Xu Peng 已提交
102

X
Xu Peng 已提交
103
    ConnectorPtr->sync_schema();
104
    ConnectorPtr->open_forever(); // thread safe option
105
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
X
Xu Peng 已提交
106

107
    CleanUp();
X
Xu Peng 已提交
108

X
Xu Peng 已提交
109
    return Status::OK();
X
Xu Peng 已提交
110 111
}

X
Xu Peng 已提交
112
// PXU TODO: Temp solution. Will fix later
S
starlord 已提交
113
Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
114
                                         const DatesT &dates) {
X
Xu Peng 已提交
115 116 117 118
    if (dates.size() == 0) {
        return Status::OK();
    }

119
    TableSchema table_schema;
G
groot 已提交
120
    table_schema.table_id_ = table_id;
X
Xu Peng 已提交
121
    auto status = DescribeTable(table_schema);
X
Xu Peng 已提交
122 123 124 125
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
126 127
    try {
        auto yesterday = GetDateWithDelta(-1);
X
Xu Peng 已提交
128

G
groot 已提交
129 130 131 132
        for (auto &date : dates) {
            if (date >= yesterday) {
                return Status::Error("Could not delete partitions with 2 days");
            }
X
Xu Peng 已提交
133 134
        }

135 136 137
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

X
Xu Peng 已提交
138
        ConnectorPtr->update_all(
139
            set(
G
groot 已提交
140
                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
141 142
            ),
            where(
G
groot 已提交
143 144
                c(&TableFileSchema::table_id_) == table_id and
                    in(&TableFileSchema::date_, dates)
145 146
            ));
    } catch (std::exception &e) {
G
groot 已提交
147
        return HandleException("Encounter exception when drop partition", e);
X
Xu Peng 已提交
148
    }
G
groot 已提交
149

X
Xu Peng 已提交
150 151 152
    return Status::OK();
}

S
starlord 已提交
153
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
Z
zhiru 已提交
154

G
groot 已提交
155
    try {
Y
Yu Kun 已提交
156
        server::MetricCollector metric;
G
groot 已提交
157

158 159 160
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
161 162
        if (table_schema.table_id_ == "") {
            NextTableId(table_schema.table_id_);
G
groot 已提交
163 164 165 166
        } else {
            auto table = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (table.size() == 1) {
G
groot 已提交
167 168 169
                if(TableSchema::TO_DELETE == std::get<0>(table[0])) {
                    return Status::Error("Table already exists and it is in delete state, please wait a second");
                } else {
170 171
                    // Change from no error to already exist.
                    return Status::AlreadyExist("Table already exists");
G
groot 已提交
172
                }
G
groot 已提交
173
            }
G
groot 已提交
174
        }
G
groot 已提交
175

G
groot 已提交
176 177 178
        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

X
Xu Peng 已提交
179
        try {
180
            auto id = ConnectorPtr->insert(table_schema);
G
groot 已提交
181
            table_schema.id_ = id;
X
Xu Peng 已提交
182
        } catch (...) {
183
            ENGINE_LOG_ERROR << "sqlite transaction failed";
X
Xu Peng 已提交
184
            return Status::DBTransactionError("Add Table Error");
X
Xu Peng 已提交
185
        }
186

S
starlord 已提交
187
        return utils::CreateTablePath(options_, table_schema.table_id_);
G
groot 已提交
188 189 190

    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
191 192
    }

X
Xu Peng 已提交
193
    return Status::OK();
X
Xu Peng 已提交
194 195
}

S
starlord 已提交
196
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
G
groot 已提交
197
    try {
Y
Yu Kun 已提交
198
        server::MetricCollector metric;
G
groot 已提交
199

200 201 202
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
203
        //soft delete table
S
starlord 已提交
204 205 206 207 208 209 210 211
        ConnectorPtr->update_all(
                set(
                        c(&TableSchema::state_) = (int) TableSchema::TO_DELETE
                ),
                where(
                        c(&TableSchema::table_id_) == table_id and
                        c(&TableSchema::state_) != (int) TableSchema::TO_DELETE
                ));
G
groot 已提交
212

G
groot 已提交
213
    } catch (std::exception &e) {
G
groot 已提交
214
        return HandleException("Encounter exception when delete table", e);
G
groot 已提交
215 216 217 218 219
    }

    return Status::OK();
}

S
starlord 已提交
220
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
G
groot 已提交
221
    try {
Y
Yu Kun 已提交
222
        server::MetricCollector metric;
G
groot 已提交
223

224 225 226
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
227 228 229 230 231 232 233 234 235 236 237 238 239
        //soft delete table files
        ConnectorPtr->update_all(
                set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                ),
                where(
                        c(&TableFileSchema::table_id_) == table_id and
                        c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
                ));

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e);
G
groot 已提交
240 241 242 243 244
    }

    return Status::OK();
}

S
starlord 已提交
245
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
246
    try {
Y
Yu Kun 已提交
247
        server::MetricCollector metric;
G
groot 已提交
248

G
groot 已提交
249
        auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
S
starlord 已提交
250
                                                   &TableSchema::state_,
G
groot 已提交
251
                                                   &TableSchema::dimension_,
S
starlord 已提交
252
                                                   &TableSchema::created_on_,
S
starlord 已提交
253
                                                   &TableSchema::flag_,
254
                                                   &TableSchema::index_file_size_,
S
starlord 已提交
255 256 257
                                                   &TableSchema::engine_type_,
                                                   &TableSchema::nlist_,
                                                   &TableSchema::metric_type_),
G
groot 已提交
258 259 260
                                           where(c(&TableSchema::table_id_) == table_schema.table_id_
                                                 and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

261
        if (groups.size() == 1) {
G
groot 已提交
262
            table_schema.id_ = std::get<0>(groups[0]);
S
starlord 已提交
263 264 265
            table_schema.state_ = std::get<1>(groups[0]);
            table_schema.dimension_ = std::get<2>(groups[0]);
            table_schema.created_on_ = std::get<3>(groups[0]);
S
starlord 已提交
266
            table_schema.flag_ = std::get<4>(groups[0]);
267 268 269
            table_schema.index_file_size_ = std::get<5>(groups[0]);
            table_schema.engine_type_ = std::get<6>(groups[0]);
            table_schema.nlist_ = std::get<7>(groups[0]);
S
starlord 已提交
270
            table_schema.metric_type_ = std::get<8>(groups[0]);
271
        } else {
G
groot 已提交
272
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
273
        }
G
groot 已提交
274

275
    } catch (std::exception &e) {
G
groot 已提交
276
        return HandleException("Encounter exception when describe table", e);
X
Xu Peng 已提交
277
    }
X
Xu Peng 已提交
278

X
Xu Peng 已提交
279
    return Status::OK();
X
Xu Peng 已提交
280 281
}

S
starlord 已提交
282
Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
P
peng.xu 已提交
283 284
    has = false;
    try {
285 286 287 288 289 290 291
        std::vector<int> file_types = {
                (int) TableFileSchema::RAW,
                (int) TableFileSchema::NEW,
                (int) TableFileSchema::NEW_MERGE,
                (int) TableFileSchema::NEW_INDEX,
                (int) TableFileSchema::TO_INDEX,
        };
292 293
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::file_type_),
294
                                             where(in(&TableFileSchema::file_type_, file_types)
P
peng.xu 已提交
295 296 297 298 299
                                                   and c(&TableFileSchema::table_id_) == table_id
                                             ));

        if (selected.size() >= 1) {
            has = true;
300 301

            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
S
starlord 已提交
302
            std::vector<std::string> file_ids;
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
            for (auto &file : selected) {
                switch (std::get<1>(file)) {
                    case (int) TableFileSchema::RAW:
                        raw_count++;
                        break;
                    case (int) TableFileSchema::NEW:
                        new_count++;
                        break;
                    case (int) TableFileSchema::NEW_MERGE:
                        new_merge_count++;
                        break;
                    case (int) TableFileSchema::NEW_INDEX:
                        new_index_count++;
                        break;
                    case (int) TableFileSchema::TO_INDEX:
                        to_index_count++;
                        break;
                    default:
                        break;
                }
            }

            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
                << " new files:" << new_count << " new_merge files:" << new_merge_count
                << " new_index files:" << new_index_count << " to_index files:" << to_index_count;
P
peng.xu 已提交
328 329 330 331 332 333 334 335
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when check non index files", e);
    }
    return Status::OK();
}

336 337
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
    try {
Y
Yu Kun 已提交
338
        server::MetricCollector metric;
339 340 341 342 343 344 345

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
                                                   &TableSchema::state_,
                                                   &TableSchema::dimension_,
S
starlord 已提交
346
                                                   &TableSchema::created_on_,
347 348
                                                   &TableSchema::flag_,
                                                   &TableSchema::index_file_size_),
349 350 351 352 353 354 355 356 357 358
                                           where(c(&TableSchema::table_id_) == table_id
                                                 and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));

        if(tables.size() > 0) {
            meta::TableSchema table_schema;
            table_schema.id_ = std::get<0>(tables[0]);
            table_schema.table_id_ = table_id;
            table_schema.state_ = std::get<1>(tables[0]);
            table_schema.dimension_ = std::get<2>(tables[0]);
            table_schema.created_on_ = std::get<3>(tables[0]);
S
starlord 已提交
359
            table_schema.flag_ = std::get<4>(tables[0]);
360
            table_schema.index_file_size_ = std::get<5>(tables[0]);
361
            table_schema.engine_type_ = index.engine_type_;
S
starlord 已提交
362 363
            table_schema.nlist_ = index.nlist_;
            table_schema.metric_type_ = index.metric_type_;
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384

            ConnectorPtr->update(table_schema);
        } else {
            return Status::NotFound("Table " + table_id + " not found");
        }

        //set all backup file to raw
        ConnectorPtr->update_all(
                set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                ),
                where(
                        c(&TableFileSchema::table_id_) == table_id and
                        c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP
                ));

    } catch (std::exception &e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e);
    }
S
starlord 已提交
385 386 387 388 389 390

    return Status::OK();
}

Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    try {
Y
Yu Kun 已提交
391
        server::MetricCollector metric;
S
starlord 已提交
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406

        //set all backup file to raw
        ConnectorPtr->update_all(
                set(
                        c(&TableSchema::flag_) = flag
                ),
                where(
                        c(&TableSchema::table_id_) == table_id
                ));

    } catch (std::exception &e) {
        std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e);
    }

407 408 409 410 411
    return Status::OK();
}

Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
    try {
Y
Yu Kun 已提交
412
        server::MetricCollector metric;
413 414 415 416 417 418 419 420 421

        auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_,
                                                   &TableSchema::nlist_,
                                                   &TableSchema::metric_type_),
                                           where(c(&TableSchema::table_id_) == table_id
                                                 and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (groups.size() == 1) {
            index.engine_type_ = std::get<0>(groups[0]);
S
starlord 已提交
422
            index.nlist_ = std::get<1>(groups[0]);
Y
Yu Kun 已提交
423
            index.metric_type_ = std::get<3>(groups[0]);
424 425 426 427 428 429 430 431 432 433 434 435 436
        } else {
            return Status::NotFound("Table " + table_id + " not found");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe index", e);
    }

    return Status::OK();
}

Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
437
        server::MetricCollector metric;
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        //soft delete index files
        ConnectorPtr->update_all(
                set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                ),
                where(
                        c(&TableFileSchema::table_id_) == table_id and
                        c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX
                ));

        //set all backup file to raw
        ConnectorPtr->update_all(
                set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                ),
                where(
                        c(&TableFileSchema::table_id_) == table_id and
                        c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP
                ));

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table index files", e);
    }

    return Status::OK();
}

S
starlord 已提交
471
Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
G
groot 已提交
472
    has_or_not = false;
473

G
groot 已提交
474
    try {
Y
Yu Kun 已提交
475
        server::MetricCollector metric;
G
groot 已提交
476
        auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
G
groot 已提交
477 478
                                           where(c(&TableSchema::table_id_) == table_id
                                           and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
479
        if (tables.size() == 1) {
480 481 482 483
            has_or_not = true;
        } else {
            has_or_not = false;
        }
G
groot 已提交
484

485
    } catch (std::exception &e) {
G
groot 已提交
486
        return HandleException("Encounter exception when lookup table", e);
G
groot 已提交
487
    }
G
groot 已提交
488

G
groot 已提交
489 490 491
    return Status::OK();
}

S
starlord 已提交
492
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
G
groot 已提交
493
    try {
Y
Yu Kun 已提交
494
        server::MetricCollector metric;
G
groot 已提交
495

G
groot 已提交
496
        auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
S
starlord 已提交
497 498 499
                                                     &TableSchema::table_id_,
                                                     &TableSchema::dimension_,
                                                     &TableSchema::created_on_,
S
starlord 已提交
500
                                                     &TableSchema::flag_,
501
                                                     &TableSchema::index_file_size_,
S
starlord 已提交
502 503 504
                                                     &TableSchema::engine_type_,
                                                     &TableSchema::nlist_,
                                                     &TableSchema::metric_type_),
G
groot 已提交
505
                                             where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
G
groot 已提交
506 507 508 509
        for (auto &table : selected) {
            TableSchema schema;
            schema.id_ = std::get<0>(table);
            schema.table_id_ = std::get<1>(table);
S
starlord 已提交
510 511 512
            schema.dimension_ = std::get<2>(table);
            schema.created_on_ = std::get<3>(table);
            schema.flag_ = std::get<4>(table);
513 514 515
            schema.index_file_size_ = std::get<5>(table);
            schema.engine_type_ = std::get<6>(table);
            schema.nlist_ = std::get<7>(table);
S
starlord 已提交
516
            schema.metric_type_ = std::get<8>(table);
G
groot 已提交
517 518 519

            table_schema_array.emplace_back(schema);
        }
G
groot 已提交
520

G
groot 已提交
521
    } catch (std::exception &e) {
G
groot 已提交
522
        return HandleException("Encounter exception when lookup all tables", e);
X
Xu Peng 已提交
523
    }
G
groot 已提交
524

X
Xu Peng 已提交
525
    return Status::OK();
X
Xu Peng 已提交
526 527
}

S
starlord 已提交
528
Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
G
groot 已提交
529 530
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
X
Xu Peng 已提交
531
    }
532
    TableSchema table_schema;
G
groot 已提交
533
    table_schema.table_id_ = file_schema.table_id_;
X
Xu Peng 已提交
534
    auto status = DescribeTable(table_schema);
X
Xu Peng 已提交
535 536 537
    if (!status.ok()) {
        return status;
    }
538

G
groot 已提交
539
    try {
Y
Yu Kun 已提交
540
        server::MetricCollector metric;
G
groot 已提交
541 542 543

        NextFileId(file_schema.file_id_);
        file_schema.dimension_ = table_schema.dimension_;
544 545
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
G
groot 已提交
546 547
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
548
        file_schema.index_file_size_ = table_schema.index_file_size_;
G
groot 已提交
549
        file_schema.engine_type_ = table_schema.engine_type_;
S
starlord 已提交
550 551
        file_schema.nlist_ = table_schema.nlist_;
        file_schema.metric_type_ = table_schema.metric_type_;
G
groot 已提交
552

553 554 555
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
556 557 558
        auto id = ConnectorPtr->insert(file_schema);
        file_schema.id_ = id;

S
starlord 已提交
559
        return utils::CreateTableFilePath(options_, file_schema);
560

G
groot 已提交
561 562
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when create table file", ex);
563 564
    }

X
Xu Peng 已提交
565
    return Status::OK();
X
Xu Peng 已提交
566 567
}

S
starlord 已提交
568
Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
X
Xu Peng 已提交
569
    files.clear();
X
Xu Peng 已提交
570

571
    try {
Y
Yu Kun 已提交
572
        server::MetricCollector metric;
G
groot 已提交
573

G
groot 已提交
574 575 576 577
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
S
starlord 已提交
578
                                                     &TableFileSchema::file_size_,
579
                                                     &TableFileSchema::row_count_,
G
groot 已提交
580
                                                     &TableFileSchema::date_,
S
starlord 已提交
581 582
                                                     &TableFileSchema::engine_type_,
                                                     &TableFileSchema::created_on_),
G
groot 已提交
583
                                             where(c(&TableFileSchema::file_type_)
584
                                                       == (int) TableFileSchema::TO_INDEX));
585

586
        std::map<std::string, TableSchema> groups;
X
Xu Peng 已提交
587
        TableFileSchema table_file;
588

589
        for (auto &file : selected) {
G
groot 已提交
590 591 592 593
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
594 595 596 597 598
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
            table_file.created_on_ = std::get<8>(file);
G
groot 已提交
599

S
starlord 已提交
600
            utils::GetTableFilePath(options_, table_file);
G
groot 已提交
601
            auto groupItr = groups.find(table_file.table_id_);
602
            if (groupItr == groups.end()) {
603
                TableSchema table_schema;
G
groot 已提交
604
                table_schema.table_id_ = table_file.table_id_;
X
Xu Peng 已提交
605
                auto status = DescribeTable(table_schema);
606 607 608
                if (!status.ok()) {
                    return status;
                }
G
groot 已提交
609
                groups[table_file.table_id_] = table_schema;
X
Xu Peng 已提交
610
            }
611
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
S
starlord 已提交
612
            table_file.index_file_size_ = groups[table_file.table_id_].index_file_size_;
613
            table_file.nlist_ = groups[table_file.table_id_].nlist_;
S
starlord 已提交
614
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
X
Xu Peng 已提交
615
            files.push_back(table_file);
X
Xu Peng 已提交
616
        }
G
groot 已提交
617

618
    } catch (std::exception &e) {
G
groot 已提交
619
        return HandleException("Encounter exception when iterate raw files", e);
X
Xu Peng 已提交
620
    }
X
Xu Peng 已提交
621

X
Xu Peng 已提交
622 623 624
    return Status::OK();
}

S
starlord 已提交
625
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
626 627
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
X
xj.lin 已提交
628
    files.clear();
X
Xu Peng 已提交
629

630
    try {
Y
Yu Kun 已提交
631
        server::MetricCollector metric;
G
groot 已提交
632

X
Xu Peng 已提交
633
        if (partition.empty()) {
634
            std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
G
groot 已提交
635 636 637 638
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
S
starlord 已提交
639
                                                         &TableFileSchema::file_size_,
640
                                                         &TableFileSchema::row_count_,
G
groot 已提交
641 642 643
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
                                                 where(c(&TableFileSchema::table_id_) == table_id and
644
                                                       in(&TableFileSchema::file_type_, file_type)));
G
groot 已提交
645

X
Xu Peng 已提交
646
            TableSchema table_schema;
G
groot 已提交
647
            table_schema.table_id_ = table_id;
X
Xu Peng 已提交
648 649 650 651
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }
X
xj.lin 已提交
652

X
Xu Peng 已提交
653 654 655
            TableFileSchema table_file;

            for (auto &file : selected) {
G
groot 已提交
656 657 658 659
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
660 661 662 663
                table_file.file_size_ = std::get<4>(file);
                table_file.row_count_ = std::get<5>(file);
                table_file.date_ = std::get<6>(file);
                table_file.engine_type_ = std::get<7>(file);
G
groot 已提交
664
                table_file.dimension_ = table_schema.dimension_;
665 666 667 668
                table_file.index_file_size_ = table_schema.index_file_size_;
                table_file.nlist_ = table_schema.nlist_;
                table_file.metric_type_ = table_schema.metric_type_;

S
starlord 已提交
669
                utils::GetTableFilePath(options_, table_file);
G
groot 已提交
670
                auto dateItr = files.find(table_file.date_);
X
Xu Peng 已提交
671
                if (dateItr == files.end()) {
G
groot 已提交
672
                    files[table_file.date_] = TableFilesSchema();
X
Xu Peng 已提交
673
                }
G
groot 已提交
674
                files[table_file.date_].push_back(table_file);
X
Xu Peng 已提交
675 676 677
            }
        }
        else {
678
            std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
G
groot 已提交
679 680 681 682
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
S
starlord 已提交
683
                                                         &TableFileSchema::file_size_,
684
                                                         &TableFileSchema::row_count_,
G
groot 已提交
685 686
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
G
groot 已提交
687
                                                 where(c(&TableFileSchema::table_id_) == table_id and
688 689
                                                       in(&TableFileSchema::date_, partition) and
                                                       in(&TableFileSchema::file_type_, file_type)));
G
groot 已提交
690

X
Xu Peng 已提交
691
            TableSchema table_schema;
G
groot 已提交
692
            table_schema.table_id_ = table_id;
X
Xu Peng 已提交
693 694 695 696
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }
X
Xu Peng 已提交
697

X
Xu Peng 已提交
698 699 700
            TableFileSchema table_file;

            for (auto &file : selected) {
G
groot 已提交
701 702 703 704
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
705 706 707 708
                table_file.file_size_ = std::get<4>(file);
                table_file.row_count_ = std::get<5>(file);
                table_file.date_ = std::get<6>(file);
                table_file.engine_type_ = std::get<7>(file);
709
                table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
710
                table_file.index_file_size_ = table_schema.index_file_size_;
711
                table_file.nlist_ = table_schema.nlist_;
S
starlord 已提交
712 713
                table_file.metric_type_ = table_schema.metric_type_;

S
starlord 已提交
714
                utils::GetTableFilePath(options_, table_file);
G
groot 已提交
715
                auto dateItr = files.find(table_file.date_);
X
Xu Peng 已提交
716
                if (dateItr == files.end()) {
G
groot 已提交
717
                    files[table_file.date_] = TableFilesSchema();
X
Xu Peng 已提交
718
                }
G
groot 已提交
719
                files[table_file.date_].push_back(table_file);
720
            }
X
Xu Peng 已提交
721

X
xj.lin 已提交
722
        }
723
    } catch (std::exception &e) {
G
groot 已提交
724
        return HandleException("Encounter exception when iterate index files", e);
X
xj.lin 已提交
725 726 727 728 729
    }

    return Status::OK();
}

S
starlord 已提交
730
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
X
xj.lin 已提交
731 732 733 734
                                 const std::vector<size_t> &ids,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();
Y
Yu Kun 已提交
735
    server::MetricCollector metric;
X
xj.lin 已提交
736 737 738 739 740 741

    try {
        auto select_columns = columns(&TableFileSchema::id_,
                                      &TableFileSchema::table_id_,
                                      &TableFileSchema::file_id_,
                                      &TableFileSchema::file_type_,
S
starlord 已提交
742
                                      &TableFileSchema::file_size_,
743
                                      &TableFileSchema::row_count_,
X
xj.lin 已提交
744 745 746 747
                                      &TableFileSchema::date_,
                                      &TableFileSchema::engine_type_);

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;
X
xj.lin 已提交
748 749 750

        std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
        auto match_type = in(&TableFileSchema::file_type_, file_type);
X
xj.lin 已提交
751 752 753 754 755 756 757 758

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) { return status; }

        decltype(ConnectorPtr->select(select_columns)) result;
        if (partition.empty() && ids.empty()) {
X
xj.lin 已提交
759
            auto filter = where(match_tableid and match_type);
X
xj.lin 已提交
760 761 762 763
            result = ConnectorPtr->select(select_columns, filter);
        }
        else if (partition.empty() && !ids.empty()) {
            auto match_fileid = in(&TableFileSchema::id_, ids);
X
xj.lin 已提交
764
            auto filter = where(match_tableid and match_fileid and match_type);
X
xj.lin 已提交
765 766 767 768
            result = ConnectorPtr->select(select_columns, filter);
        }
        else if (!partition.empty() && ids.empty()) {
            auto match_date = in(&TableFileSchema::date_, partition);
X
xj.lin 已提交
769
            auto filter = where(match_tableid and match_date and match_type);
X
xj.lin 已提交
770 771 772 773 774
            result = ConnectorPtr->select(select_columns, filter);
        }
        else if (!partition.empty() && !ids.empty()) {
            auto match_fileid = in(&TableFileSchema::id_, ids);
            auto match_date = in(&TableFileSchema::date_, partition);
X
xj.lin 已提交
775
            auto filter = where(match_tableid and match_fileid and match_date and match_type);
X
xj.lin 已提交
776 777 778 779 780 781 782 783 784
            result = ConnectorPtr->select(select_columns, filter);
        }

        TableFileSchema table_file;
        for (auto &file : result) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
785 786 787 788
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
X
xj.lin 已提交
789
            table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
790
            table_file.index_file_size_ = table_schema.index_file_size_;
791
            table_file.nlist_ = table_schema.nlist_;
S
starlord 已提交
792 793
            table_file.metric_type_ = table_schema.metric_type_;

X
xj.lin 已提交
794 795 796 797 798 799 800 801 802 803 804 805 806 807 808
            utils::GetTableFilePath(options_, table_file);
            auto dateItr = files.find(table_file.date_);
            if (dateItr == files.end()) {
                files[table_file.date_] = TableFilesSchema();
            }
            files[table_file.date_].push_back(table_file);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e);
    }

    return Status::OK();
}

S
starlord 已提交
809
Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
810
                                DatePartionedTableFilesSchema &files) {
X
Xu Peng 已提交
811
    files.clear();
X
Xu Peng 已提交
812

813
    try {
Y
Yu Kun 已提交
814
        server::MetricCollector metric;
G
groot 已提交
815

S
starlord 已提交
816 817 818 819 820 821 822 823 824
        //check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        //get files to merge
G
groot 已提交
825 826 827 828
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
829
                                                     &TableFileSchema::file_size_,
S
starlord 已提交
830 831 832
                                                     &TableFileSchema::row_count_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::created_on_),
G
groot 已提交
833
                                             where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
G
groot 已提交
834
                                                 c(&TableFileSchema::table_id_) == table_id),
835
                                             order_by(&TableFileSchema::file_size_).desc());
G
groot 已提交
836

837
        for (auto &file : selected) {
S
starlord 已提交
838 839 840 841 842 843
            TableFileSchema table_file;
            table_file.file_size_ = std::get<4>(file);
            if(table_file.file_size_ >= table_schema.index_file_size_) {
                continue;//skip large file
            }

G
groot 已提交
844 845 846 847
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
848 849 850
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.created_on_ = std::get<7>(file);
G
groot 已提交
851
            table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
852
            table_file.index_file_size_ = table_schema.index_file_size_;
853
            table_file.nlist_ = table_schema.nlist_;
S
starlord 已提交
854 855
            table_file.metric_type_ = table_schema.metric_type_;

S
starlord 已提交
856
            utils::GetTableFilePath(options_, table_file);
G
groot 已提交
857
            auto dateItr = files.find(table_file.date_);
858
            if (dateItr == files.end()) {
G
groot 已提交
859
                files[table_file.date_] = TableFilesSchema();
860
            }
G
groot 已提交
861
            files[table_file.date_].push_back(table_file);
X
Xu Peng 已提交
862
        }
863
    } catch (std::exception &e) {
G
groot 已提交
864
        return HandleException("Encounter exception when iterate merge files", e);
X
Xu Peng 已提交
865 866 867
    }

    return Status::OK();
X
Xu Peng 已提交
868 869
}

S
starlord 已提交
870
Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
871 872
                                 const std::vector<size_t>& ids,
                                 TableFilesSchema& table_files) {
X
Xu Peng 已提交
873
    try {
874
        table_files.clear();
Y
yu yunfeng 已提交
875 876
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                  &TableFileSchema::file_id_,
G
groot 已提交
877
                                                  &TableFileSchema::file_type_,
878 879
                                                  &TableFileSchema::file_size_,
                                                  &TableFileSchema::row_count_,
880
                                                  &TableFileSchema::date_,
S
starlord 已提交
881 882
                                                  &TableFileSchema::engine_type_,
                                                  &TableFileSchema::created_on_),
883 884
                                          where(c(&TableFileSchema::table_id_) == table_id and
                                                  in(&TableFileSchema::id_, ids)
X
Xu Peng 已提交
885
                                          ));
886 887 888 889 890 891 892 893 894 895

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto &file : files) {
            TableFileSchema file_schema;
G
groot 已提交
896
            file_schema.table_id_ = table_id;
Y
yu yunfeng 已提交
897 898 899
            file_schema.id_ = std::get<0>(file);
            file_schema.file_id_ = std::get<1>(file);
            file_schema.file_type_ = std::get<2>(file);
900 901 902 903
            file_schema.file_size_ = std::get<3>(file);
            file_schema.row_count_ = std::get<4>(file);
            file_schema.date_ = std::get<5>(file);
            file_schema.engine_type_ = std::get<6>(file);
S
starlord 已提交
904
            file_schema.created_on_ = std::get<7>(file);
905
            file_schema.dimension_ = table_schema.dimension_;
906 907 908
            file_schema.index_file_size_ = table_schema.index_file_size_;
            file_schema.nlist_ = table_schema.nlist_;
            file_schema.metric_type_ = table_schema.metric_type_;
S
starlord 已提交
909

S
starlord 已提交
910
            utils::GetTableFilePath(options_, file_schema);
911 912

            table_files.emplace_back(file_schema);
X
Xu Peng 已提交
913 914
        }
    } catch (std::exception &e) {
915
        return HandleException("Encounter exception when lookup table files", e);
X
Xu Peng 已提交
916 917
    }

X
Xu Peng 已提交
918
    return Status::OK();
X
Xu Peng 已提交
919 920
}

X
Xu Peng 已提交
921
// PXU TODO: Support Swap
S
starlord 已提交
922
Status SqliteMetaImpl::Archive() {
923
    auto &criterias = options_.archive_conf.GetCriterias();
X
Xu Peng 已提交
924 925 926 927 928
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
929 930
        auto &criteria = kv.first;
        auto &limit = kv.second;
G
groot 已提交
931
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
X
Xu Peng 已提交
932
            long usecs = limit * D_SEC * US_PS;
933
            long now = utils::GetMicroSecTimeStamp();
934
            try {
935 936 937
                //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

X
Xu Peng 已提交
938
                ConnectorPtr->update_all(
939
                    set(
G
groot 已提交
940
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
941 942
                    ),
                    where(
G
groot 已提交
943 944
                        c(&TableFileSchema::created_on_) < (long) (now - usecs) and
                            c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
945 946
                    ));
            } catch (std::exception &e) {
G
groot 已提交
947
                return HandleException("Encounter exception when update table files", e);
X
Xu Peng 已提交
948 949
            }
        }
G
groot 已提交
950
        if (criteria == engine::ARCHIVE_CONF_DISK) {
G
groot 已提交
951
            uint64_t sum = 0;
X
Xu Peng 已提交
952
            Size(sum);
X
Xu Peng 已提交
953

G
groot 已提交
954
            int64_t to_delete = (int64_t)sum - limit * G;
X
Xu Peng 已提交
955
            DiscardFiles(to_delete);
X
Xu Peng 已提交
956 957 958 959 960 961
        }
    }

    return Status::OK();
}

S
starlord 已提交
962
Status SqliteMetaImpl::Size(uint64_t &result) {
X
Xu Peng 已提交
963
    result = 0;
X
Xu Peng 已提交
964
    try {
965
        auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
S
starlord 已提交
966 967 968
                                          where(
                                                  c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
                                          ));
969 970 971
        for (auto &total_size : selected) {
            if (!std::get<0>(total_size)) {
                continue;
X
Xu Peng 已提交
972
            }
973
            result += (uint64_t) (*std::get<0>(total_size));
X
Xu Peng 已提交
974
        }
975

976
    } catch (std::exception &e) {
G
groot 已提交
977
        return HandleException("Encounter exception when calculte db size", e);
X
Xu Peng 已提交
978 979 980 981 982
    }

    return Status::OK();
}

S
starlord 已提交
983
Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
X
Xu Peng 已提交
984 985 986
    if (to_discard_size <= 0) {
        return Status::OK();
    }
G
groot 已提交
987

G
groot 已提交
988
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
G
groot 已提交
989

X
Xu Peng 已提交
990
    try {
Y
Yu Kun 已提交
991
        server::MetricCollector metric;
G
groot 已提交
992

993 994 995
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
996 997
        auto commited = ConnectorPtr->transaction([&]() mutable {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
998
                                                         &TableFileSchema::file_size_),
G
groot 已提交
999
                                                 where(c(&TableFileSchema::file_type_)
1000
                                                       != (int) TableFileSchema::TO_DELETE),
G
groot 已提交
1001 1002
                                                 order_by(&TableFileSchema::id_),
                                                 limit(10));
X
Xu Peng 已提交
1003

G
groot 已提交
1004 1005
            std::vector<int> ids;
            TableFileSchema table_file;
1006

G
groot 已提交
1007 1008 1009
            for (auto &file : selected) {
                if (to_discard_size <= 0) break;
                table_file.id_ = std::get<0>(file);
1010
                table_file.file_size_ = std::get<1>(file);
G
groot 已提交
1011 1012
                ids.push_back(table_file.id_);
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
1013 1014
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
G
groot 已提交
1015
            }
1016

G
groot 已提交
1017 1018 1019
            if (ids.size() == 0) {
                return true;
            }
1020

G
groot 已提交
1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
            ConnectorPtr->update_all(
                    set(
                            c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                            c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                    ),
                    where(
                            in(&TableFileSchema::id_, ids)
                    ));

            return true;
        });

        if (!commited) {
1034
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1035 1036
            return Status::DBTransactionError("Update table file error");
        }
X
Xu Peng 已提交
1037

1038
    } catch (std::exception &e) {
G
groot 已提交
1039
        return HandleException("Encounter exception when discard table file", e);
X
Xu Peng 已提交
1040 1041
    }

X
Xu Peng 已提交
1042
    return DiscardFiles(to_discard_size);
X
Xu Peng 已提交
1043 1044
}

S
starlord 已提交
1045
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
G
groot 已提交
1046
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1047
    try {
Y
Yu Kun 已提交
1048
        server::MetricCollector metric;
G
groot 已提交
1049

1050 1051 1052
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1053 1054 1055 1056 1057 1058 1059 1060 1061
        auto tables = ConnectorPtr->select(columns(&TableSchema::state_),
                                           where(c(&TableSchema::table_id_) == file_schema.table_id_));

        //if the table has been deleted, just mark the table file as TO_DELETE
        //clean thread will delete the file later
        if(tables.size() < 1 || std::get<0>(tables[0]) == (int)TableSchema::TO_DELETE) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

X
Xu Peng 已提交
1062
        ConnectorPtr->update(file_schema);
G
groot 已提交
1063

1064
    } catch (std::exception &e) {
G
groot 已提交
1065 1066 1067
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
            + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e);
X
Xu Peng 已提交
1068
    }
X
Xu Peng 已提交
1069
    return Status::OK();
X
Xu Peng 已提交
1070 1071
}

S
starlord 已提交
1072
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
P
peng.xu 已提交
1073
    try {
Y
Yu Kun 已提交
1074
        server::MetricCollector metric;
1075 1076 1077 1078

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

P
peng.xu 已提交
1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093
        ConnectorPtr->update_all(
            set(
                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX
            ),
            where(
                c(&TableFileSchema::table_id_) == table_id and
                c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
            ));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files to to_index", e);
    }

    return Status::OK();
}

S
starlord 已提交
1094
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
1095
    try {
Y
Yu Kun 已提交
1096
        server::MetricCollector metric;
G
groot 已提交
1097

1098 1099 1100
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
        std::map<std::string, bool> has_tables;
        for (auto &file : files) {
            if(has_tables.find(file.table_id_) != has_tables.end()) {
                continue;
            }
            auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
                                               where(c(&TableSchema::table_id_) == file.table_id_
                                                     and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
            if(tables.size() >= 1) {
                has_tables[file.table_id_] = true;
            } else {
                has_tables[file.table_id_] = false;
            }
        }

1116 1117
        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
G
groot 已提交
1118 1119 1120 1121
                if(!has_tables[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

G
groot 已提交
1122
                file.updated_time_ = utils::GetMicroSecTimeStamp();
1123 1124 1125 1126
                ConnectorPtr->update(file);
            }
            return true;
        });
G
groot 已提交
1127

1128
        if (!commited) {
1129
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1130
            return Status::DBTransactionError("Update table files error");
X
Xu Peng 已提交
1131
        }
G
groot 已提交
1132

1133
    } catch (std::exception &e) {
G
groot 已提交
1134
        return HandleException("Encounter exception when update table files", e);
X
Xu Peng 已提交
1135
    }
1136 1137 1138
    return Status::OK();
}

S
starlord 已提交
1139
Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
X
Xu Peng 已提交
1140
    auto now = utils::GetMicroSecTimeStamp();
S
starlord 已提交
1141 1142 1143
    std::set<std::string> table_ids;

    //remove to_delete files
1144
    try {
Y
Yu Kun 已提交
1145
        server::MetricCollector metric;
1146

1147 1148 1149
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1150 1151 1152 1153 1154 1155 1156 1157 1158 1159
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                  &TableFileSchema::table_id_,
                                                  &TableFileSchema::file_id_,
                                                  &TableFileSchema::date_),
                                          where(
                                                  c(&TableFileSchema::file_type_) ==
                                                  (int) TableFileSchema::TO_DELETE
                                                  and
                                                  c(&TableFileSchema::updated_time_)
                                                  < now - seconds * US_PS));
1160

G
groot 已提交
1161 1162 1163 1164 1165 1166 1167 1168
        auto commited = ConnectorPtr->transaction([&]() mutable {
            TableFileSchema table_file;
            for (auto &file : files) {
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.date_ = std::get<3>(file);

S
starlord 已提交
1169
                utils::DeleteTableFilePath(options_, table_file);
1170
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
G
groot 已提交
1171 1172
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);

S
starlord 已提交
1173
                table_ids.insert(table_file.table_id_);
1174
            }
G
groot 已提交
1175 1176 1177 1178
            return true;
        });

        if (!commited) {
1179
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1180 1181 1182 1183 1184 1185 1186
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

S
starlord 已提交
1187
    //remove to_delete tables
G
groot 已提交
1188
    try {
Y
Yu Kun 已提交
1189
        server::MetricCollector metric;
G
groot 已提交
1190

1191 1192 1193
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1194 1195 1196 1197 1198 1199
        auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
                                                   &TableSchema::table_id_),
                                           where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &table : tables) {
S
starlord 已提交
1200
                utils::DeleteTablePath(options_, std::get<1>(table), false);//only delete empty folder
G
groot 已提交
1201
                ConnectorPtr->remove<TableSchema>(std::get<0>(table));
1202
            }
G
groot 已提交
1203 1204 1205 1206 1207

            return true;
        });

        if (!commited) {
1208
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1209
            return Status::DBTransactionError("Clean files error");
X
Xu Peng 已提交
1210
        }
G
groot 已提交
1211

1212
    } catch (std::exception &e) {
G
groot 已提交
1213
        return HandleException("Encounter exception when clean table files", e);
X
Xu Peng 已提交
1214 1215
    }

S
starlord 已提交
1216 1217 1218
    //remove deleted table folder
    //don't remove table folder until all its files has been deleted
    try {
Y
Yu Kun 已提交
1219
        server::MetricCollector metric;
S
starlord 已提交
1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232

        for(auto& table_id : table_ids) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                 where(c(&TableFileSchema::table_id_) == table_id));
            if(selected.size() == 0) {
                utils::DeleteTablePath(options_, table_id);
            }
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table folder", e);
    }

X
Xu Peng 已提交
1233 1234 1235
    return Status::OK();
}

S
starlord 已提交
1236
Status SqliteMetaImpl::CleanUp() {
1237
    try {
Y
Yu Kun 已提交
1238
        server::MetricCollector metric;
1239 1240 1241 1242

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

1243 1244
        std::vector<int> file_type = {(int) TableFileSchema::NEW, (int) TableFileSchema::NEW_INDEX, (int) TableFileSchema::NEW_MERGE};
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_type)));
1245

G
groot 已提交
1246 1247 1248 1249
        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
1250
            }
G
groot 已提交
1251 1252 1253 1254
            return true;
        });

        if (!commited) {
1255
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1256
            return Status::DBTransactionError("Clean files error");
X
Xu Peng 已提交
1257
        }
G
groot 已提交
1258

1259
    } catch (std::exception &e) {
G
groot 已提交
1260
        return HandleException("Encounter exception when clean table file", e);
X
Xu Peng 已提交
1261 1262 1263 1264 1265
    }

    return Status::OK();
}

S
starlord 已提交
1266
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
X
Xu Peng 已提交
1267

1268
    try {
Y
Yu Kun 已提交
1269
        server::MetricCollector metric;
1270

1271 1272 1273
        std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
                                             where(in(&TableFileSchema::file_type_, file_type)
G
groot 已提交
1274
                                                   and c(&TableFileSchema::table_id_) == table_id));
1275

1276
        TableSchema table_schema;
G
groot 已提交
1277
        table_schema.table_id_ = table_id;
X
Xu Peng 已提交
1278
        auto status = DescribeTable(table_schema);
1279

1280 1281 1282 1283 1284
        if (!status.ok()) {
            return status;
        }

        result = 0;
1285
        for (auto &file : selected) {
1286 1287
            result += std::get<0>(file);
        }
X
Xu Peng 已提交
1288

1289
    } catch (std::exception &e) {
G
groot 已提交
1290
        return HandleException("Encounter exception when calculate table file size", e);
X
Xu Peng 已提交
1291 1292 1293 1294
    }
    return Status::OK();
}

S
starlord 已提交
1295
Status SqliteMetaImpl::DropAll() {
X
Xu Peng 已提交
1296 1297
    if (boost::filesystem::is_directory(options_.path)) {
        boost::filesystem::remove_all(options_.path);
X
Xu Peng 已提交
1298 1299 1300 1301
    }
    return Status::OK();
}

S
starlord 已提交
1302
SqliteMetaImpl::~SqliteMetaImpl() {
1303
    CleanUp();
X
Xu Peng 已提交
1304 1305
}

1306
} // namespace meta
X
Xu Peng 已提交
1307
} // namespace engine
J
jinhai 已提交
1308
} // namespace milvus
X
Xu Peng 已提交
1309
} // namespace zilliz