SqliteMetaImpl.cpp 51.3 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
S
starlord 已提交
6 7 8 9
#include "SqliteMetaImpl.h"
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "db/Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
S
starlord 已提交
11
#include "db/Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23

namespace zilliz {
J
jinhai 已提交
24
namespace milvus {
X
Xu Peng 已提交
25
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34
// Logs a failed meta operation and converts the caught exception into a Status.
//
// desc: short description of the operation that failed.
// e:    the caught exception; only what() is read, so it is taken by const
//       reference (callers passing a non-const exception still bind to it).
// Returns Status::DBTransactionError built from desc and e.what().
Status HandleException(const std::string& desc, const std::exception &e) {
    ENGINE_LOG_ERROR << desc << ": " << e.what();
    return Status::DBTransactionError(desc, e.what());
}

G
groot 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
// Scoped helper for meta-access metrics.
// Construction bumps the meta-access counter and records the start time;
// destruction reports the elapsed time to the duration histogram.
class MetricCollector {
public:
    MetricCollector() {
        // Count the access first, then start the clock.
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        start_time_ = METRICS_NOW_TIME;
    }

    ~MetricCollector() {
        auto finished_at = METRICS_NOW_TIME;
        auto elapsed = METRICS_MICROSECONDS(start_time_, finished_at);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(elapsed);
    }

private:
    // Moment this collector was constructed.
    std::chrono::system_clock::time_point start_time_;
};

G
groot 已提交
55 56
}

57
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
58
    return make_storage(path,
G
groot 已提交
59
                        make_table("Tables",
G
groot 已提交
60 61
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
62
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
63 64 65
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("engine_type", &TableSchema::engine_type_),
66 67 68
                                   make_column("nlist", &TableSchema::nlist_),
                                   make_column("index_file_size", &TableSchema::index_file_size_),
                                   make_column("metric_type", &TableSchema::metric_type_)),
G
groot 已提交
69
                        make_table("TableFiles",
G
groot 已提交
70 71
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
72
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
73 74
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
75 76
                                   make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
                                   make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
G
groot 已提交
77 78 79
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
80
    );
X
Xu Peng 已提交
81 82 83

}

X
Xu Peng 已提交
84
// Concrete storage type returned by StoragePrototype(); deduced because the
// function's return type is `auto`.
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
85
// File-scope storage connection used by the SqliteMetaImpl methods in this
// file; (re)assigned in SqliteMetaImpl::Initialize().
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
86
// Type of a sqlite_orm condition comparing TableFileSchema::id_ with an
// unsigned long value.
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
87

S
starlord 已提交
88
// Produces a new table id by formatting the next number from a
// SimpleIDGenerator as text.
//
// table_id: receives the generated id.
// Always returns Status::OK().
Status SqliteMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator id_source;
    std::stringstream formatted;
    formatted << id_source.GetNextIDNumber();

    table_id = formatted.str();
    return Status::OK();
}

S
starlord 已提交
96
// Produces a new file id by formatting the next number from a
// SimpleIDGenerator as text.
//
// file_id: receives the generated id.
// Always returns Status::OK().
Status SqliteMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator id_source;
    std::stringstream formatted;
    formatted << id_source.GetNextIDNumber();

    file_id = formatted.str();
    return Status::OK();
}

S
starlord 已提交
104
// Stores the meta options and initializes the sqlite backend.
//
// A constructor cannot hand the Status of Initialize() back to the caller, so
// a failure is at least recorded in the log instead of being silently dropped.
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
    : options_(options_) {
    auto status = Initialize();
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to initialize sqlite meta";
    }
}

S
starlord 已提交
109
// Prepares the meta database under options_.path.
//
// Creates the directory if it is missing, opens "<path>/meta.sqlite" through
// sqlite_orm, syncs the schema, keeps the connection open, switches the
// journal to WAL and finally calls CleanUp().
//
// Returns Status::InvalidDBPath when the directory cannot be created,
// otherwise Status::OK().
Status SqliteMetaImpl::Initialize() {
    const bool dir_exists = boost::filesystem::is_directory(options_.path);
    if (!dir_exists && !boost::filesystem::create_directory(options_.path)) {
        ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
        return Status::InvalidDBPath("Failed to create db directory", options_.path);
    }

    const auto db_file = options_.path + "/meta.sqlite";
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(db_file));

    ConnectorPtr->sync_schema();
    // Keep the connection open for the lifetime of the storage (thread safe option).
    ConnectorPtr->open_forever();
    // WAL = write ahead log.
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);

    CleanUp();

    return Status::OK();
}

X
Xu Peng 已提交
129
// PXU TODO: Temp solution. Will fix later
S
starlord 已提交
130
// Soft-deletes every file of `table_id` whose date is in `dates` by setting
// its file_type to TO_DELETE.
//
// Returns OK immediately for an empty date list, the DescribeTable() status if
// the table lookup fails, and an error if any requested date is not older
// than the value of GetDateWithDelta(-1).
Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                         const DatesT &dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    // Make sure the table exists (and is not being deleted) before touching its files.
    TableSchema target_table;
    target_table.table_id_ = table_id;
    auto describe_status = DescribeTable(target_table);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        const auto cutoff = GetDateWithDelta(-1);
        for (const auto &requested : dates) {
            if (requested >= cutoff) {
                return Status::Error("Could not delete partitions with 2 days");
            }
        }

        // Concurrent sqlite updates from several threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE)),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  in(&TableFileSchema::date_, dates)));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop partition", e);
    }

    return Status::OK();
}

S
starlord 已提交
170
// Inserts a new row into the "Tables" table and creates the table's path.
//
// table_schema: in/out. An empty table_id_ is replaced by a generated id;
//               id_ and created_on_ are overwritten with the inserted row id
//               and the current timestamp.
// Returns:
//   - Status::Error if a table with this id exists and is in TO_DELETE state,
//   - Status::AlreadyExist if a table with this id exists otherwise,
//   - Status::DBTransactionError if the insert fails,
//   - the result of utils::CreateTablePath() on success.
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        // Concurrent sqlite updates from several threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto table = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (table.size() == 1) {
                if (TableSchema::TO_DELETE == std::get<0>(table[0])) {
                    return Status::Error("Table already exists and it is in delete state, please wait a second");
                }
                // Change from no error to already exist.
                return Status::AlreadyExist("Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            auto id = ConnectorPtr->insert(table_schema);
            table_schema.id_ = id;
        } catch (std::exception &e) {
            // Keep the underlying reason in the log; the returned status is unchanged.
            ENGINE_LOG_ERROR << "sqlite transaction failed: " << e.what();
            return Status::DBTransactionError("Add Table Error");
        } catch (...) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Add Table Error");
        }

        return utils::CreateTablePath(options_, table_schema.table_id_);

    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
    }
}

S
starlord 已提交
213
// Soft-deletes a table: rows of "Tables" matching `table_id` that are not
// already in TO_DELETE state get their state set to TO_DELETE.
//
// Returns Status::OK(), or the status produced by HandleException() if the
// update throws.
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        MetricCollector metric;

        // Concurrent sqlite updates from several threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(
            set(c(&TableSchema::state_) = static_cast<int>(TableSchema::TO_DELETE)),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table", e);
    }

    return Status::OK();
}

S
starlord 已提交
237
// Soft-deletes all files of a table: rows of "TableFiles" matching `table_id`
// that are not already TO_DELETE get file_type set to TO_DELETE and
// updated_time set to the current timestamp.
//
// Returns Status::OK(), or the status produced by HandleException() if the
// update throws.
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        MetricCollector metric;

        // Concurrent sqlite updates from several threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e);
    }

    return Status::OK();
}

S
starlord 已提交
262
// Loads the stored attributes of the table named by table_schema.table_id_.
//
// table_schema: in/out. table_id_ selects the row; on success id_, state_,
//               dimension_, created_on_, engine_type_, nlist_,
//               index_file_size_ and metric_type_ are filled in.
// Returns Status::NotFound unless exactly one row that is not in TO_DELETE
// state matches; Status::OK() on success.
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::state_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::index_file_size_,
                    &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
        }

        // Tuple positions follow the column order of the select above.
        const auto &row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.engine_type_ = std::get<4>(row);
        table_schema.nlist_ = std::get<5>(row);
        table_schema.index_file_size_ = std::get<6>(row);
        table_schema.metric_type_ = std::get<7>(row);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe table", e);
    }

    return Status::OK();
}

S
starlord 已提交
297
// Reports whether `table_id` still has files of type RAW, NEW, NEW_MERGE,
// NEW_INDEX or TO_INDEX.
//
// table_id: table whose files are inspected.
// has:      set to true when at least one such file exists, false otherwise.
// When files are found, a per-type count is written to the debug log.
// Returns Status::OK(), or the status produced by HandleException() if the
// query throws.
Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
    has = false;
    try {
        std::vector<int> file_types = {
                (int) TableFileSchema::RAW,
                (int) TableFileSchema::NEW,
                (int) TableFileSchema::NEW_MERGE,
                (int) TableFileSchema::NEW_INDEX,
                (int) TableFileSchema::TO_INDEX,
        };
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::file_type_),
                                             where(in(&TableFileSchema::file_type_, file_types)
                                                   and c(&TableFileSchema::table_id_) == table_id
                                             ));

        if (!selected.empty()) {
            has = true;

            // Tally the files by type purely for the debug log below.
            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
            for (const auto &file : selected) {
                switch (std::get<1>(file)) {
                    case (int) TableFileSchema::RAW:
                        raw_count++;
                        break;
                    case (int) TableFileSchema::NEW:
                        new_count++;
                        break;
                    case (int) TableFileSchema::NEW_MERGE:
                        new_merge_count++;
                        break;
                    case (int) TableFileSchema::NEW_INDEX:
                        new_index_count++;
                        break;
                    case (int) TableFileSchema::TO_INDEX:
                        to_index_count++;
                        break;
                    default:
                        break;
                }
            }

            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
                << " new files:" << new_count << " new_merge files:" << new_merge_count
                << " new_index files:" << new_index_count << " to_index files:" << to_index_count;
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when check non index files", e);
    }
    return Status::OK();
}

351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372
// Stores new index parameters for `table_id` and turns the table's BACKUP
// files back into RAW files.
//
// table_id: table to update; must exist and not be in TO_DELETE state.
// index:    new engine type, nlist, index file size and metric type. The
//           index file size is multiplied by ONE_MB before it is stored.
// Returns Status::NotFound when no matching table row exists, Status::OK()
// on success, or the status produced by HandleException() on an exception.
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
    try {
        MetricCollector metric;

        // Concurrent sqlite updates from several threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::state_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.empty()) {
            return Status::NotFound("Table " + table_id + " not found");
        }

        // Rebuild the full row: untouched columns come from the database,
        // index-related columns from the caller.
        const auto &current = rows[0];
        meta::TableSchema updated;
        updated.table_id_ = table_id;
        updated.id_ = std::get<0>(current);
        updated.state_ = std::get<1>(current);
        updated.dimension_ = std::get<2>(current);
        updated.created_on_ = std::get<3>(current);
        updated.engine_type_ = index.engine_type_;
        updated.nlist_ = index.nlist_;
        updated.index_file_size_ = index.index_file_size_*ONE_MB;
        updated.metric_type_ = index.metric_type_;

        ConnectorPtr->update(updated);

        // Set all backup files to raw.
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

    } catch (std::exception &e) {
        std::string failure = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(failure, e);
    }
    return Status::OK();
}

// Reads the index parameters stored for `table_id`.
//
// table_id: table to look up; rows in TO_DELETE state are ignored.
// index:    receives engine type, nlist, index file size and metric type. The
//           stored index file size is divided by ONE_MB before it is returned.
// Returns Status::NotFound unless exactly one row matches, Status::OK() on
// success, or the status produced by HandleException() on an exception.
Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::index_file_size_,
                    &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status::NotFound("Table " + table_id + " not found");
        }

        const auto &row = rows[0];
        index.engine_type_ = std::get<0>(row);
        index.nlist_ = std::get<1>(row);
        index.index_file_size_ = std::get<2>(row)/ONE_MB;
        index.metric_type_ = std::get<3>(row);

    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe index", e);
    }

    return Status::OK();
}

// Drops the index of `table_id` in two steps: INDEX files are soft-deleted
// (file_type set to TO_DELETE) and BACKUP files are turned back into RAW
// files. Both updates also refresh updated_time.
//
// Returns Status::OK(), or the status produced by HandleException() if an
// update throws.
Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
        MetricCollector metric;

        // Concurrent sqlite updates from several threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Step 1: soft delete index files.
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)));

        // Step 2: set all backup files to raw.
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table index files", e);
    }

    return Status::OK();
}

S
starlord 已提交
463
// Checks whether a table with `table_id` exists and is not in TO_DELETE state.
//
// has_or_not: set to true only when exactly one such row is found; it is
//             false otherwise, including when the query throws.
// Returns Status::OK(), or the status produced by HandleException() on an
// exception.
Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    has_or_not = false;

    try {
        MetricCollector metric;
        auto matches = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        has_or_not = (matches.size() == 1);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table", e);
    }

    return Status::OK();
}

S
starlord 已提交
484
// Appends the schema of every table that is not in TO_DELETE state to
// `table_schema_array`.
//
// table_schema_array: output; existing elements are kept and new ones are
//                     appended. state_ is not selected and therefore keeps
//                     its default value in each appended schema.
// Returns Status::OK(), or the status produced by HandleException() if the
// query throws.
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
                                                     &TableSchema::table_id_,
                                                     &TableSchema::dimension_,
                                                     &TableSchema::created_on_,
                                                     &TableSchema::engine_type_,
                                                     &TableSchema::nlist_,
                                                     &TableSchema::index_file_size_,
                                                     &TableSchema::metric_type_),
                                             where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        for (auto &table : selected) {
            // Tuple positions must follow the column order of the select
            // above: index 2 is dimension_, index 3 is created_on_.
            TableSchema schema;
            schema.id_ = std::get<0>(table);
            schema.table_id_ = std::get<1>(table);
            schema.dimension_ = std::get<2>(table);
            schema.created_on_ = std::get<3>(table);
            schema.engine_type_ = std::get<4>(table);
            schema.nlist_ = std::get<5>(table);
            schema.index_file_size_ = std::get<6>(table);
            schema.metric_type_ = std::get<7>(table);

            table_schema_array.emplace_back(schema);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup all tables", e);
    }

    return Status::OK();
}

S
starlord 已提交
518
// Registers a new file for the table named by file_schema.table_id_ and
// creates its path.
//
// file_schema: in/out. An EmptyDate date_ is replaced by Meta::GetDate();
//              file_id_ is generated; dimension_, engine_type_, nlist_ and
//              metric_type_ are copied from the owning table; sizes are reset
//              to 0; timestamps are set to now; id_ receives the inserted
//              row id.
// Returns the DescribeTable() status if the table lookup fails, the result of
// utils::CreateTableFilePath() on success, or the status produced by
// HandleException() on an exception.
Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto describe_status = DescribeTable(owner);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        MetricCollector metric;

        NextFileId(file_schema.file_id_);

        // Attributes inherited from the owning table.
        file_schema.dimension_ = owner.dimension_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.metric_type_ = owner.metric_type_;

        // A new file starts out empty.
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;

        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        // Concurrent sqlite updates from several threads may throw
        // ('bad logic', etc.), so writes are serialized with a lock.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        return utils::CreateTableFilePath(options_, file_schema);

    } catch (std::exception& ex) {
        return HandleException("Encounter exception when create table file", ex);
    }

    return Status::OK();
}

S
starlord 已提交
557
// Collects every file whose file_type is TO_INDEX.
//
// files: output; cleared first, then filled with one entry per matching row.
//        Each entry also receives its path (utils::GetTableFilePath) and the
//        metric_type_, nlist_ and dimension_ of its owning table.
// Returns the failing DescribeTable() status if an owning table cannot be
// described (entries appended before that point remain in `files`),
// Status::OK() on success, or the status produced by HandleException() on an
// exception.
Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::table_id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_,
                    &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_,
                    &TableFileSchema::date_,
                    &TableFileSchema::engine_type_,
                    &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX)));

        // Table schemas already looked up, keyed by table id, so each table
        // is described only once.
        std::map<std::string, TableSchema> schema_cache;
        TableFileSchema entry;

        for (auto &row : rows) {
            // Tuple positions follow the column order of the select above.
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.file_id_ = std::get<2>(row);
            entry.file_type_ = std::get<3>(row);
            entry.file_size_ = std::get<4>(row);
            entry.row_count_ = std::get<5>(row);
            entry.date_ = std::get<6>(row);
            entry.engine_type_ = std::get<7>(row);
            entry.created_on_ = std::get<8>(row);

            utils::GetTableFilePath(options_, entry);

            auto cached = schema_cache.find(entry.table_id_);
            if (cached == schema_cache.end()) {
                TableSchema owner;
                owner.table_id_ = entry.table_id_;
                auto describe_status = DescribeTable(owner);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                cached = schema_cache.emplace(entry.table_id_, owner).first;
            }

            const TableSchema &owner_schema = cached->second;
            entry.metric_type_ = owner_schema.metric_type_;
            entry.nlist_ = owner_schema.nlist_;
            entry.dimension_ = owner_schema.dimension_;
            files.push_back(entry);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate raw files", e);
    }

    return Status::OK();
}

S
starlord 已提交
613
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
614 615
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
X
xj.lin 已提交
616
    files.clear();
X
Xu Peng 已提交
617

618
    try {
G
groot 已提交
619 620
        MetricCollector metric;

X
Xu Peng 已提交
621
        if (partition.empty()) {
622
            std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
G
groot 已提交
623 624 625 626
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
S
starlord 已提交
627
                                                         &TableFileSchema::file_size_,
628
                                                         &TableFileSchema::row_count_,
G
groot 已提交
629 630 631
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
                                                 where(c(&TableFileSchema::table_id_) == table_id and
632
                                                       in(&TableFileSchema::file_type_, file_type)));
G
groot 已提交
633

X
Xu Peng 已提交
634
            TableSchema table_schema;
G
groot 已提交
635
            table_schema.table_id_ = table_id;
X
Xu Peng 已提交
636 637 638 639
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }
X
xj.lin 已提交
640

X
Xu Peng 已提交
641 642 643
            TableFileSchema table_file;

            for (auto &file : selected) {
G
groot 已提交
644 645 646 647
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
648 649 650 651
                table_file.file_size_ = std::get<4>(file);
                table_file.row_count_ = std::get<5>(file);
                table_file.date_ = std::get<6>(file);
                table_file.engine_type_ = std::get<7>(file);
S
starlord 已提交
652 653
                table_file.metric_type_ = table_schema.metric_type_;
                table_file.nlist_ = table_schema.nlist_;
G
groot 已提交
654
                table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
655
                utils::GetTableFilePath(options_, table_file);
G
groot 已提交
656
                auto dateItr = files.find(table_file.date_);
X
Xu Peng 已提交
657
                if (dateItr == files.end()) {
G
groot 已提交
658
                    files[table_file.date_] = TableFilesSchema();
X
Xu Peng 已提交
659
                }
G
groot 已提交
660
                files[table_file.date_].push_back(table_file);
X
Xu Peng 已提交
661 662 663
            }
        }
        else {
664
            std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
G
groot 已提交
665 666 667 668
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
S
starlord 已提交
669
                                                         &TableFileSchema::file_size_,
670
                                                         &TableFileSchema::row_count_,
G
groot 已提交
671 672
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
G
groot 已提交
673
                                                 where(c(&TableFileSchema::table_id_) == table_id and
674 675
                                                       in(&TableFileSchema::date_, partition) and
                                                       in(&TableFileSchema::file_type_, file_type)));
G
groot 已提交
676

X
Xu Peng 已提交
677
            TableSchema table_schema;
G
groot 已提交
678
            table_schema.table_id_ = table_id;
X
Xu Peng 已提交
679 680 681 682
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }
X
Xu Peng 已提交
683

X
Xu Peng 已提交
684 685 686
            TableFileSchema table_file;

            for (auto &file : selected) {
G
groot 已提交
687 688 689 690
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
691 692 693 694
                table_file.file_size_ = std::get<4>(file);
                table_file.row_count_ = std::get<5>(file);
                table_file.date_ = std::get<6>(file);
                table_file.engine_type_ = std::get<7>(file);
S
starlord 已提交
695 696
                table_file.metric_type_ = table_schema.metric_type_;
                table_file.nlist_ = table_schema.nlist_;
G
groot 已提交
697
                table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
698
                utils::GetTableFilePath(options_, table_file);
G
groot 已提交
699
                auto dateItr = files.find(table_file.date_);
X
Xu Peng 已提交
700
                if (dateItr == files.end()) {
G
groot 已提交
701
                    files[table_file.date_] = TableFilesSchema();
X
Xu Peng 已提交
702
                }
G
groot 已提交
703
                files[table_file.date_].push_back(table_file);
704
            }
X
Xu Peng 已提交
705

X
xj.lin 已提交
706
        }
707
    } catch (std::exception &e) {
G
groot 已提交
708
        return HandleException("Encounter exception when iterate index files", e);
X
xj.lin 已提交
709 710 711 712 713
    }

    return Status::OK();
}

S
starlord 已提交
714
// Collects the RAW / TO_INDEX / INDEX files of table_id and groups them by date.
//   ids       - when non-empty, only files whose id_ is listed are returned
//   partition - when non-empty, only files whose date_ is listed are returned
//   files     - output; cleared first, then filled as date -> list of files
// Returns the DescribeTable status when that call fails, the HandleException
// status when an exception is caught, and Status::OK() otherwise.
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
                                 const std::vector<size_t> &ids,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();
    MetricCollector metric;

    try {
        auto wanted_columns = columns(&TableFileSchema::id_,
                                      &TableFileSchema::table_id_,
                                      &TableFileSchema::file_id_,
                                      &TableFileSchema::file_type_,
                                      &TableFileSchema::file_size_,
                                      &TableFileSchema::row_count_,
                                      &TableFileSchema::date_,
                                      &TableFileSchema::engine_type_);

        auto same_table = c(&TableFileSchema::table_id_) == table_id;

        std::vector<int> searchable_types = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
        auto searchable = in(&TableFileSchema::file_type_, searchable_types);

        // The table description supplies dimension/metric/nlist for every returned file.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        const bool by_date = !partition.empty();
        const bool by_id = !ids.empty();

        // Each filter combination yields a different expression type, hence one select per case.
        decltype(ConnectorPtr->select(wanted_columns)) rows;
        if (by_date && by_id) {
            auto id_listed = in(&TableFileSchema::id_, ids);
            auto date_listed = in(&TableFileSchema::date_, partition);
            rows = ConnectorPtr->select(wanted_columns,
                                        where(same_table and id_listed and date_listed and searchable));
        } else if (by_date) {
            auto date_listed = in(&TableFileSchema::date_, partition);
            rows = ConnectorPtr->select(wanted_columns,
                                        where(same_table and date_listed and searchable));
        } else if (by_id) {
            auto id_listed = in(&TableFileSchema::id_, ids);
            rows = ConnectorPtr->select(wanted_columns,
                                        where(same_table and id_listed and searchable));
        } else {
            rows = ConnectorPtr->select(wanted_columns,
                                        where(same_table and searchable));
        }

        // One schema object is reused for every row and copied into the output.
        TableFileSchema table_file;
        for (auto &row : rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.file_size_ = std::get<4>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.engine_type_ = std::get<7>(row);
            table_file.dimension_ = table_schema.dimension_;
            table_file.metric_type_ = table_schema.metric_type_;
            table_file.nlist_ = table_schema.nlist_;
            utils::GetTableFilePath(options_, table_file);

            // operator[] creates the per-date list on first use.
            files[table_file.date_].push_back(table_file);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e);
    }

    return Status::OK();
}

S
starlord 已提交
791
// Lists the RAW files of table_id that are still smaller than the table's
// index_file_size_, grouped by date. Rows are read in descending file_size_ order.
//   files - output; cleared first, then filled as date -> list of files
// Returns the DescribeTable status when the table lookup fails, the
// HandleException status when an exception is caught, and Status::OK() otherwise.
Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
                                DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        // The table must be describable; its schema also provides the size threshold.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // Candidate files: RAW files of this table, largest first.
        auto candidates = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                       &TableFileSchema::table_id_,
                                                       &TableFileSchema::file_id_,
                                                       &TableFileSchema::file_type_,
                                                       &TableFileSchema::file_size_,
                                                       &TableFileSchema::row_count_,
                                                       &TableFileSchema::date_,
                                                       &TableFileSchema::created_on_),
                                               where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
                                                   c(&TableFileSchema::table_id_) == table_id),
                                               order_by(&TableFileSchema::file_size_).desc());

        for (auto &row : candidates) {
            TableFileSchema merge_file;

            // Files that already reached the index file size are skipped.
            merge_file.file_size_ = std::get<4>(row);
            if (merge_file.file_size_ >= table_schema.index_file_size_) {
                continue;
            }

            merge_file.id_ = std::get<0>(row);
            merge_file.table_id_ = std::get<1>(row);
            merge_file.file_id_ = std::get<2>(row);
            merge_file.file_type_ = std::get<3>(row);
            merge_file.row_count_ = std::get<5>(row);
            merge_file.date_ = std::get<6>(row);
            merge_file.created_on_ = std::get<7>(row);
            merge_file.dimension_ = table_schema.dimension_;
            merge_file.metric_type_ = table_schema.metric_type_;
            merge_file.nlist_ = table_schema.nlist_;
            utils::GetTableFilePath(options_, merge_file);

            // operator[] creates the per-date list on first use.
            files[merge_file.date_].push_back(merge_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate merge files", e);
    }

    return Status::OK();
}

S
starlord 已提交
850
// Looks up the files of table_id whose id_ appears in ids.
//   table_files - output; cleared first, then one entry is appended per matching row
// Returns the DescribeTable status when that call fails, the HandleException
// status when an exception is caught, and Status::OK() otherwise.
Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
                                 const std::vector<size_t>& ids,
                                 TableFilesSchema& table_files) {
    try {
        table_files.clear();

        auto rows = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::file_size_,
                                                 &TableFileSchema::row_count_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_,
                                                 &TableFileSchema::created_on_),
                                         where(c(&TableFileSchema::table_id_) == table_id and
                                                 in(&TableFileSchema::id_, ids)));

        // Dimension, metric type and nlist are taken from the table description.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto &row : rows) {
            TableFileSchema entry;

            // table_id_ is not among the selected columns; it is the queried table.
            entry.table_id_ = table_id;

            entry.id_ = std::get<0>(row);
            entry.file_id_ = std::get<1>(row);
            entry.file_type_ = std::get<2>(row);
            entry.file_size_ = std::get<3>(row);
            entry.row_count_ = std::get<4>(row);
            entry.date_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.created_on_ = std::get<7>(row);

            entry.metric_type_ = table_schema.metric_type_;
            entry.nlist_ = table_schema.nlist_;
            entry.dimension_ = table_schema.dimension_;

            utils::GetTableFilePath(options_, entry);

            table_files.emplace_back(entry);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table files", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
900
// PXU TODO: Support Swap
S
starlord 已提交
901
// Applies every archive criterion configured in options_.archive_conf.
//   ARCHIVE_CONF_DAYS: files whose created_on_ is older than
//       now - limit * D_SEC * US_PS and that are not yet TO_DELETE are marked TO_DELETE.
//   ARCHIVE_CONF_DISK: the total size reported by Size() minus limit * G is
//       handed to DiscardFiles().
// Returns the HandleException status if the DAYS update throws; the statuses of
// Size() and DiscardFiles() are not inspected here.
Status SqliteMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (const auto &entry : criterias) {
        auto &criteria = entry.first;
        auto &limit = entry.second;

        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try {
                // Concurrent sqlite updates may throw ('bad logic', etc), so updates are serialized.
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

                ConnectorPtr->update_all(
                    set(c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE),
                    where(c(&TableFileSchema::created_on_) < (long) (now - usecs) and
                          c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
            } catch (std::exception &e) {
                return HandleException("Encounter exception when update table files", e);
            }
        }

        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t total_size = 0;
            Size(total_size);

            // A non-positive excess is a no-op inside DiscardFiles.
            int64_t excess = (int64_t)total_size - limit * G;
            DiscardFiles(excess);
        }
    }

    return Status::OK();
}

S
starlord 已提交
941
// Computes the summed file_size_ of all files that are not marked TO_DELETE.
//   result - output; reset to 0 first, then the sum is accumulated into it
// Returns the HandleException status when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::Size(uint64_t &result) {
    result = 0;
    try {
        auto rows = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
                                         where(c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
        for (auto &row : rows) {
            auto &total = std::get<0>(row);
            // The aggregate is dereferenced below, so an empty value is skipped.
            if (total) {
                result += (uint64_t) (*total);
            }
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculte db size", e);
    }

    return Status::OK();
}

S
starlord 已提交
962
// Marks files as TO_DELETE, in ascending id_ order and in batches of at most 10,
// until the accumulated file_size_ of the marked files reaches to_discard_size.
//   to_discard_size - amount to free; a value <= 0 is a no-op
// Returns Status::OK() when enough files were marked or when no candidate files
// remain, a DBTransactionError when a transaction does not commit, and the
// HandleException status when an exception is caught.
//
// Changes from the previous version:
//   * The previous tail recursion never terminated when no non-TO_DELETE file was
//     left while to_discard_size was still positive; the loop now stops then.
//   * The debug log printed file_id_, which is never selected here; it now prints
//     the row id that the "table_file.id=" label refers to.
Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        // Set inside the transaction when the select returned no rows.
        bool no_candidates = false;

        try {
            MetricCollector metric;

            //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            //the lock is scoped to one batch and released before the next iteration
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                             &TableFileSchema::file_size_),
                                                     where(c(&TableFileSchema::file_type_)
                                                           != (int) TableFileSchema::TO_DELETE),
                                                     order_by(&TableFileSchema::id_),
                                                     limit(10));

                std::vector<int> ids;
                TableFileSchema table_file;

                for (auto &file : selected) {
                    if (to_discard_size <= 0) break;
                    table_file.id_ = std::get<0>(file);
                    table_file.file_size_ = std::get<1>(file);
                    ids.push_back(table_file.id_);
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                     << " table_file.size=" << table_file.file_size_;
                    to_discard_size -= table_file.file_size_;
                }

                // to_discard_size is positive on entry, so ids is empty only
                // when the select produced no rows at all.
                if (ids.empty()) {
                    no_candidates = true;
                    return true;
                }

                ConnectorPtr->update_all(
                        set(
                                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                        ),
                        where(
                                in(&TableFileSchema::id_, ids)
                        ));

                return true;
            });

            if (!commited) {
                ENGINE_LOG_ERROR << "sqlite transaction failed";
                return Status::DBTransactionError("Update table file error");
            }

        } catch (std::exception &e) {
            return HandleException("Encounter exception when discard table file", e);
        }

        // Nothing left to discard: stop instead of retrying forever.
        if (no_candidates) {
            break;
        }
    }

    return Status::OK();
}

S
starlord 已提交
1024
// Persists file_schema after stamping updated_time_ with the current timestamp.
// When the owning table cannot be found or is in state TO_DELETE, the file is
// stored as TO_DELETE instead of its requested type.
// Returns the HandleException status when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        MetricCollector metric;

        // Concurrent sqlite updates may throw ('bad logic', etc), so updates are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_states = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == file_schema.table_id_));

        // A file of a missing or deleted table is only marked for deletion here.
        const bool table_gone = owner_states.size() < 1
                                || std::get<0>(owner_states[0]) == (int)TableSchema::TO_DELETE;
        if (table_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

    } catch (std::exception &e) {
        std::string msg = "Exception update table file: table_id = ";
        msg += file_schema.table_id_;
        msg += " file_id = ";
        msg += file_schema.file_id_;
        return HandleException(msg, e);
    }

    return Status::OK();
}

S
starlord 已提交
1051
// Switches every RAW file of table_id to TO_INDEX with a single update statement.
// Returns the HandleException status when an exception is caught, Status::OK() otherwise.
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        MetricCollector metric;

        // Concurrent sqlite updates may throw ('bad logic', etc), so updates are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files to to_index", e);
    }

    return Status::OK();
}

S
starlord 已提交
1073
// Persists all entries of files inside one transaction, stamping updated_time_
// on each. Files whose table does not exist, or exists only in state TO_DELETE,
// are stored as TO_DELETE.
// Returns a DBTransactionError when the transaction does not commit, the
// HandleException status when an exception is caught, and Status::OK() otherwise.
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        MetricCollector metric;

        // Concurrent sqlite updates may throw ('bad logic', etc), so updates are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // table_id -> "a row for this table exists and is not TO_DELETE"; one query per distinct table.
        std::map<std::string, bool> table_alive;
        for (auto &file : files) {
            if (table_alive.count(file.table_id_) > 0) {
                continue;
            }
            auto live_rows = ConnectorPtr->select(columns(&TableSchema::id_),
                                                  where(c(&TableSchema::table_id_) == file.table_id_
                                                        and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
            table_alive[file.table_id_] = (live_rows.size() >= 1);
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Update table files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files", e);
    }
    return Status::OK();
}

S
starlord 已提交
1118
// Three-step cleanup of entries marked TO_DELETE:
//   1. files in TO_DELETE whose updated_time_ is older than now - seconds * US_PS
//      are passed to utils::DeleteTableFilePath and their rows are removed;
//   2. tables in state TO_DELETE are passed to utils::DeleteTablePath (third
//      argument false) and their rows are removed;
//   3. for every table touched in step 1 that has no file rows left,
//      utils::DeleteTablePath is called for the table.
// Steps 1 and 2 each run under meta_mutex_ inside a transaction; step 3 only reads.
// Returns a DBTransactionError when a transaction does not commit, the
// HandleException status when an exception is caught, and Status::OK() otherwise.
Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> touched_tables;

    // Step 1: expired TO_DELETE files.
    try {
        MetricCollector metric;

        // Concurrent sqlite updates may throw ('bad logic', etc), so updates are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto expired = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                    &TableFileSchema::table_id_,
                                                    &TableFileSchema::file_id_,
                                                    &TableFileSchema::date_),
                                            where(
                                                    c(&TableFileSchema::file_type_) ==
                                                    (int) TableFileSchema::TO_DELETE
                                                    and
                                                    c(&TableFileSchema::updated_time_)
                                                    < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            TableFileSchema doomed;
            for (auto &row : expired) {
                doomed.id_ = std::get<0>(row);
                doomed.table_id_ = std::get<1>(row);
                doomed.file_id_ = std::get<2>(row);
                doomed.date_ = std::get<3>(row);

                utils::DeleteTableFilePath(options_, doomed);
                ENGINE_LOG_DEBUG << "Removing file id:" << doomed.file_id_ << " location:" << doomed.location_;
                ConnectorPtr->remove<TableFileSchema>(doomed.id_);

                // Remembered so step 3 can check whether the table has files left.
                touched_tables.insert(doomed.table_id_);
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    // Step 2: tables in state TO_DELETE.
    try {
        MetricCollector metric;

        // Concurrent sqlite updates may throw ('bad logic', etc), so updates are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto doomed_tables = ConnectorPtr->select(columns(&TableSchema::id_,
                                                          &TableSchema::table_id_),
                                                  where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : doomed_tables) {
                utils::DeleteTablePath(options_, std::get<1>(row), false);//only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }

            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    // Step 3: a table folder is removed only after all of its file rows are gone.
    try {
        MetricCollector metric;

        for (auto &table_id : touched_tables) {
            auto remaining = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                  where(c(&TableFileSchema::table_id_) == table_id));
            if (remaining.size() == 0) {
                utils::DeleteTablePath(options_, table_id);
            }
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table folder", e);
    }

    return Status::OK();
}

S
starlord 已提交
1215
// Removes every file row whose type is NEW, NEW_INDEX or NEW_MERGE, inside a
// single transaction.
// Returns a DBTransactionError when the transaction does not commit, the
// HandleException status when an exception is caught, and Status::OK() otherwise.
Status SqliteMetaImpl::CleanUp() {
    try {
        MetricCollector metric;

        // Concurrent sqlite updates may throw ('bad logic', etc), so updates are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> unfinished_types = {(int) TableFileSchema::NEW, (int) TableFileSchema::NEW_INDEX, (int) TableFileSchema::NEW_MERGE};
        auto unfinished = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                               where(in(&TableFileSchema::file_type_, unfinished_types)));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : unfinished) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table file", e);
    }

    return Status::OK();
}

S
starlord 已提交
1245
// Sums row_count_ over the RAW / TO_INDEX / INDEX files of table_id.
//   result - output; written only after DescribeTable succeeded
// Returns the DescribeTable status when that call fails, the HandleException
// status when an exception is caught, and Status::OK() otherwise.
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
        MetricCollector metric;

        std::vector<int> counted_types = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
        auto row_counts = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
                                               where(in(&TableFileSchema::file_type_, counted_types)
                                                     and c(&TableFileSchema::table_id_) == table_id));

        // The DescribeTable status decides whether a count is reported at all.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        result = 0;
        for (auto &row : row_counts) {
            result += std::get<0>(row);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculate table file size", e);
    }
    return Status::OK();
}

S
starlord 已提交
1274
// Recursively removes options_.path when it is a directory; otherwise does nothing.
// Always returns Status::OK().
Status SqliteMetaImpl::DropAll() {
    if (!boost::filesystem::is_directory(options_.path)) {
        return Status::OK();
    }

    boost::filesystem::remove_all(options_.path);
    return Status::OK();
}

S
starlord 已提交
1281
// Runs CleanUp() on destruction; the returned status is not inspected.
SqliteMetaImpl::~SqliteMetaImpl() {
    CleanUp();
}

1285
} // namespace meta
X
Xu Peng 已提交
1286
} // namespace engine
J
jinhai 已提交
1287
} // namespace milvus
X
Xu Peng 已提交
1288
} // namespace zilliz