SqliteMetaImpl.cpp 51.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
S
starlord 已提交
6 7 8 9
#include "SqliteMetaImpl.h"
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "db/Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
S
starlord 已提交
11
#include "db/Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23

namespace zilliz {
J
jinhai 已提交
24
namespace milvus {
X
Xu Peng 已提交
25
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34
Status HandleException(const std::string& desc, std::exception &e) {
    ENGINE_LOG_ERROR << desc << ": " << e.what();
    return Status::DBTransactionError(desc, e.what());
G
groot 已提交
35 36 37 38
}

}

39
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
40
    return make_storage(path,
G
groot 已提交
41
                        make_table("Tables",
G
groot 已提交
42 43
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
44
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
45 46 47
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("engine_type", &TableSchema::engine_type_),
48 49 50
                                   make_column("nlist", &TableSchema::nlist_),
                                   make_column("index_file_size", &TableSchema::index_file_size_),
                                   make_column("metric_type", &TableSchema::metric_type_)),
G
groot 已提交
51
                        make_table("TableFiles",
G
groot 已提交
52 53
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
54
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
55 56
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
57 58
                                   make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
                                   make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
G
groot 已提交
59 60 61
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
62
    );
X
Xu Peng 已提交
63 64 65

}

X
Xu Peng 已提交
66
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
67
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
68
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
69

S
starlord 已提交
70
Status SqliteMetaImpl::NextTableId(std::string &table_id) {
71 72
    std::stringstream ss;
    SimpleIDGenerator g;
73
    ss << g.GetNextIDNumber();
74
    table_id = ss.str();
75 76 77
    return Status::OK();
}

S
starlord 已提交
78
Status SqliteMetaImpl::NextFileId(std::string &file_id) {
X
Xu Peng 已提交
79 80
    std::stringstream ss;
    SimpleIDGenerator g;
81
    ss << g.GetNextIDNumber();
X
Xu Peng 已提交
82 83 84 85
    file_id = ss.str();
    return Status::OK();
}

S
starlord 已提交
86
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
X
Xu Peng 已提交
87 88
    : options_(options_) {
    Initialize();
X
Xu Peng 已提交
89 90
}

S
starlord 已提交
91
Status SqliteMetaImpl::Initialize() {
X
Xu Peng 已提交
92 93
    if (!boost::filesystem::is_directory(options_.path)) {
        auto ret = boost::filesystem::create_directory(options_.path);
94
        if (!ret) {
G
groot 已提交
95
            ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
96
            return Status::InvalidDBPath("Failed to create db directory", options_.path);
97
        }
X
Xu Peng 已提交
98
    }
X
Xu Peng 已提交
99

100
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));
X
Xu Peng 已提交
101

X
Xu Peng 已提交
102
    ConnectorPtr->sync_schema();
103
    ConnectorPtr->open_forever(); // thread safe option
104
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
X
Xu Peng 已提交
105

106
    CleanUp();
X
Xu Peng 已提交
107

X
Xu Peng 已提交
108
    return Status::OK();
X
Xu Peng 已提交
109 110
}

X
Xu Peng 已提交
111
// PXU TODO: Temp solution. Will fix later
S
starlord 已提交
112
Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
113
                                         const DatesT &dates) {
X
Xu Peng 已提交
114 115 116 117
    if (dates.size() == 0) {
        return Status::OK();
    }

118
    TableSchema table_schema;
G
groot 已提交
119
    table_schema.table_id_ = table_id;
X
Xu Peng 已提交
120
    auto status = DescribeTable(table_schema);
X
Xu Peng 已提交
121 122 123 124
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
125 126
    try {
        auto yesterday = GetDateWithDelta(-1);
X
Xu Peng 已提交
127

G
groot 已提交
128 129 130 131
        for (auto &date : dates) {
            if (date >= yesterday) {
                return Status::Error("Could not delete partitions with 2 days");
            }
X
Xu Peng 已提交
132 133
        }

134 135 136
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

X
Xu Peng 已提交
137
        ConnectorPtr->update_all(
138
            set(
G
groot 已提交
139
                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
140 141
            ),
            where(
G
groot 已提交
142 143
                c(&TableFileSchema::table_id_) == table_id and
                    in(&TableFileSchema::date_, dates)
144 145
            ));
    } catch (std::exception &e) {
G
groot 已提交
146
        return HandleException("Encounter exception when drop partition", e);
X
Xu Peng 已提交
147
    }
G
groot 已提交
148

X
Xu Peng 已提交
149 150 151
    return Status::OK();
}

S
starlord 已提交
152
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
Z
zhiru 已提交
153

G
groot 已提交
154
    try {
Y
Yu Kun 已提交
155
        server::MetricCollector metric;
G
groot 已提交
156

157 158 159
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
160 161
        if (table_schema.table_id_ == "") {
            NextTableId(table_schema.table_id_);
G
groot 已提交
162 163 164 165
        } else {
            auto table = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (table.size() == 1) {
G
groot 已提交
166 167 168
                if(TableSchema::TO_DELETE == std::get<0>(table[0])) {
                    return Status::Error("Table already exists and it is in delete state, please wait a second");
                } else {
169 170
                    // Change from no error to already exist.
                    return Status::AlreadyExist("Table already exists");
G
groot 已提交
171
                }
G
groot 已提交
172
            }
G
groot 已提交
173
        }
G
groot 已提交
174

G
groot 已提交
175 176 177
        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

X
Xu Peng 已提交
178
        try {
179
            auto id = ConnectorPtr->insert(table_schema);
G
groot 已提交
180
            table_schema.id_ = id;
X
Xu Peng 已提交
181
        } catch (...) {
182
            ENGINE_LOG_ERROR << "sqlite transaction failed";
X
Xu Peng 已提交
183
            return Status::DBTransactionError("Add Table Error");
X
Xu Peng 已提交
184
        }
185

S
starlord 已提交
186
        return utils::CreateTablePath(options_, table_schema.table_id_);
G
groot 已提交
187 188 189

    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
190 191
    }

X
Xu Peng 已提交
192
    return Status::OK();
X
Xu Peng 已提交
193 194
}

S
starlord 已提交
195
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
G
groot 已提交
196
    try {
Y
Yu Kun 已提交
197
        server::MetricCollector metric;
G
groot 已提交
198

199 200 201
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
202
        //soft delete table
S
starlord 已提交
203 204 205 206 207 208 209 210
        ConnectorPtr->update_all(
                set(
                        c(&TableSchema::state_) = (int) TableSchema::TO_DELETE
                ),
                where(
                        c(&TableSchema::table_id_) == table_id and
                        c(&TableSchema::state_) != (int) TableSchema::TO_DELETE
                ));
G
groot 已提交
211

G
groot 已提交
212
    } catch (std::exception &e) {
G
groot 已提交
213
        return HandleException("Encounter exception when delete table", e);
G
groot 已提交
214 215 216 217 218
    }

    return Status::OK();
}

S
starlord 已提交
219
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
G
groot 已提交
220
    try {
Y
Yu Kun 已提交
221
        server::MetricCollector metric;
G
groot 已提交
222

223 224 225
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238
        //soft delete table files
        ConnectorPtr->update_all(
                set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                ),
                where(
                        c(&TableFileSchema::table_id_) == table_id and
                        c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
                ));

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e);
G
groot 已提交
239 240 241 242 243
    }

    return Status::OK();
}

S
starlord 已提交
244
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
245
    try {
Y
Yu Kun 已提交
246
        server::MetricCollector metric;
G
groot 已提交
247

G
groot 已提交
248
        auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
S
starlord 已提交
249
                                                   &TableSchema::state_,
G
groot 已提交
250
                                                   &TableSchema::dimension_,
S
starlord 已提交
251 252 253 254 255
                                                   &TableSchema::created_on_,
                                                   &TableSchema::engine_type_,
                                                   &TableSchema::nlist_,
                                                   &TableSchema::index_file_size_,
                                                   &TableSchema::metric_type_),
G
groot 已提交
256 257 258
                                           where(c(&TableSchema::table_id_) == table_schema.table_id_
                                                 and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

259
        if (groups.size() == 1) {
G
groot 已提交
260
            table_schema.id_ = std::get<0>(groups[0]);
S
starlord 已提交
261 262 263 264 265 266 267
            table_schema.state_ = std::get<1>(groups[0]);
            table_schema.dimension_ = std::get<2>(groups[0]);
            table_schema.created_on_ = std::get<3>(groups[0]);
            table_schema.engine_type_ = std::get<4>(groups[0]);
            table_schema.nlist_ = std::get<5>(groups[0]);
            table_schema.index_file_size_ = std::get<6>(groups[0]);
            table_schema.metric_type_ = std::get<7>(groups[0]);
268
        } else {
G
groot 已提交
269
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
270
        }
G
groot 已提交
271

272
    } catch (std::exception &e) {
G
groot 已提交
273
        return HandleException("Encounter exception when describe table", e);
X
Xu Peng 已提交
274
    }
X
Xu Peng 已提交
275

X
Xu Peng 已提交
276
    return Status::OK();
X
Xu Peng 已提交
277 278
}

S
starlord 已提交
279
Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
P
peng.xu 已提交
280 281
    has = false;
    try {
282 283 284 285 286 287 288
        std::vector<int> file_types = {
                (int) TableFileSchema::RAW,
                (int) TableFileSchema::NEW,
                (int) TableFileSchema::NEW_MERGE,
                (int) TableFileSchema::NEW_INDEX,
                (int) TableFileSchema::TO_INDEX,
        };
289 290
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::file_type_),
291
                                             where(in(&TableFileSchema::file_type_, file_types)
P
peng.xu 已提交
292 293 294 295 296
                                                   and c(&TableFileSchema::table_id_) == table_id
                                             ));

        if (selected.size() >= 1) {
            has = true;
297 298

            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
S
starlord 已提交
299
            std::vector<std::string> file_ids;
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
            for (auto &file : selected) {
                switch (std::get<1>(file)) {
                    case (int) TableFileSchema::RAW:
                        raw_count++;
                        break;
                    case (int) TableFileSchema::NEW:
                        new_count++;
                        break;
                    case (int) TableFileSchema::NEW_MERGE:
                        new_merge_count++;
                        break;
                    case (int) TableFileSchema::NEW_INDEX:
                        new_index_count++;
                        break;
                    case (int) TableFileSchema::TO_INDEX:
                        to_index_count++;
                        break;
                    default:
                        break;
                }
            }

            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
                << " new files:" << new_count << " new_merge files:" << new_merge_count
                << " new_index files:" << new_index_count << " to_index files:" << to_index_count;
P
peng.xu 已提交
325 326 327 328 329 330 331 332
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when check non index files", e);
    }
    return Status::OK();
}

333 334
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
    try {
Y
Yu Kun 已提交
335
        server::MetricCollector metric;
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
                                                   &TableSchema::state_,
                                                   &TableSchema::dimension_,
                                                   &TableSchema::created_on_),
                                           where(c(&TableSchema::table_id_) == table_id
                                                 and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));

        if(tables.size() > 0) {
            meta::TableSchema table_schema;
            table_schema.id_ = std::get<0>(tables[0]);
            table_schema.table_id_ = table_id;
            table_schema.state_ = std::get<1>(tables[0]);
            table_schema.dimension_ = std::get<2>(tables[0]);
            table_schema.created_on_ = std::get<3>(tables[0]);
            table_schema.engine_type_ = index.engine_type_;
S
starlord 已提交
355
            table_schema.nlist_ = index.nlist_;
S
starlord 已提交
356
            table_schema.index_file_size_ = index.index_file_size_*ONE_MB;
S
starlord 已提交
357
            table_schema.metric_type_ = index.metric_type_;
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383

            ConnectorPtr->update(table_schema);
        } else {
            return Status::NotFound("Table " + table_id + " not found");
        }

        //set all backup file to raw
        ConnectorPtr->update_all(
                set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                ),
                where(
                        c(&TableFileSchema::table_id_) == table_id and
                        c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP
                ));

    } catch (std::exception &e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e);
    }
    return Status::OK();
}

Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
    try {
Y
Yu Kun 已提交
384
        server::MetricCollector metric;
385 386 387 388 389 390 391 392 393 394

        auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_,
                                                   &TableSchema::nlist_,
                                                   &TableSchema::index_file_size_,
                                                   &TableSchema::metric_type_),
                                           where(c(&TableSchema::table_id_) == table_id
                                                 and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (groups.size() == 1) {
            index.engine_type_ = std::get<0>(groups[0]);
S
starlord 已提交
395
            index.nlist_ = std::get<1>(groups[0]);
S
starlord 已提交
396
            index.index_file_size_ = std::get<2>(groups[0])/ONE_MB;
S
starlord 已提交
397
            index.metric_type_ = std::get<3>(groups[0]);
398 399 400 401 402 403 404 405 406 407 408 409 410
        } else {
            return Status::NotFound("Table " + table_id + " not found");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe index", e);
    }

    return Status::OK();
}

Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
Y
Yu Kun 已提交
411
        server::MetricCollector metric;
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        //soft delete index files
        ConnectorPtr->update_all(
                set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                ),
                where(
                        c(&TableFileSchema::table_id_) == table_id and
                        c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX
                ));

        //set all backup file to raw
        ConnectorPtr->update_all(
                set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                ),
                where(
                        c(&TableFileSchema::table_id_) == table_id and
                        c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP
                ));

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table index files", e);
    }

    return Status::OK();
}

S
starlord 已提交
445
Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
G
groot 已提交
446
    has_or_not = false;
447

G
groot 已提交
448
    try {
Y
Yu Kun 已提交
449
        server::MetricCollector metric;
G
groot 已提交
450
        auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
G
groot 已提交
451 452
                                           where(c(&TableSchema::table_id_) == table_id
                                           and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
453
        if (tables.size() == 1) {
454 455 456 457
            has_or_not = true;
        } else {
            has_or_not = false;
        }
G
groot 已提交
458

459
    } catch (std::exception &e) {
G
groot 已提交
460
        return HandleException("Encounter exception when lookup table", e);
G
groot 已提交
461
    }
G
groot 已提交
462

G
groot 已提交
463 464 465
    return Status::OK();
}

S
starlord 已提交
466
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
G
groot 已提交
467
    try {
Y
Yu Kun 已提交
468
        server::MetricCollector metric;
G
groot 已提交
469

G
groot 已提交
470
        auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
S
starlord 已提交
471 472 473 474 475 476 477
                                                     &TableSchema::table_id_,
                                                     &TableSchema::dimension_,
                                                     &TableSchema::created_on_,
                                                     &TableSchema::engine_type_,
                                                     &TableSchema::nlist_,
                                                     &TableSchema::index_file_size_,
                                                     &TableSchema::metric_type_),
G
groot 已提交
478
                                             where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
G
groot 已提交
479 480 481 482
        for (auto &table : selected) {
            TableSchema schema;
            schema.id_ = std::get<0>(table);
            schema.table_id_ = std::get<1>(table);
S
starlord 已提交
483 484 485 486 487 488
            schema.created_on_ = std::get<2>(table);
            schema.dimension_ = std::get<3>(table);
            schema.engine_type_ = std::get<4>(table);
            schema.nlist_ = std::get<5>(table);
            schema.index_file_size_ = std::get<6>(table);
            schema.metric_type_ = std::get<7>(table);
G
groot 已提交
489 490 491

            table_schema_array.emplace_back(schema);
        }
G
groot 已提交
492

G
groot 已提交
493
    } catch (std::exception &e) {
G
groot 已提交
494
        return HandleException("Encounter exception when lookup all tables", e);
X
Xu Peng 已提交
495
    }
G
groot 已提交
496

X
Xu Peng 已提交
497
    return Status::OK();
X
Xu Peng 已提交
498 499
}

S
starlord 已提交
500
Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
G
groot 已提交
501 502
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
X
Xu Peng 已提交
503
    }
504
    TableSchema table_schema;
G
groot 已提交
505
    table_schema.table_id_ = file_schema.table_id_;
X
Xu Peng 已提交
506
    auto status = DescribeTable(table_schema);
X
Xu Peng 已提交
507 508 509
    if (!status.ok()) {
        return status;
    }
510

G
groot 已提交
511
    try {
Y
Yu Kun 已提交
512
        server::MetricCollector metric;
G
groot 已提交
513 514 515

        NextFileId(file_schema.file_id_);
        file_schema.dimension_ = table_schema.dimension_;
516 517
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
G
groot 已提交
518 519 520
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.engine_type_ = table_schema.engine_type_;
S
starlord 已提交
521 522
        file_schema.nlist_ = table_schema.nlist_;
        file_schema.metric_type_ = table_schema.metric_type_;
G
groot 已提交
523

524 525 526
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
527 528 529
        auto id = ConnectorPtr->insert(file_schema);
        file_schema.id_ = id;

S
starlord 已提交
530
        return utils::CreateTableFilePath(options_, file_schema);
531

G
groot 已提交
532 533
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when create table file", ex);
534 535
    }

X
Xu Peng 已提交
536
    return Status::OK();
X
Xu Peng 已提交
537 538
}

S
starlord 已提交
539
Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
X
Xu Peng 已提交
540
    files.clear();
X
Xu Peng 已提交
541

542
    try {
Y
Yu Kun 已提交
543
        server::MetricCollector metric;
G
groot 已提交
544

G
groot 已提交
545 546 547 548
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
S
starlord 已提交
549
                                                     &TableFileSchema::file_size_,
550
                                                     &TableFileSchema::row_count_,
G
groot 已提交
551
                                                     &TableFileSchema::date_,
S
starlord 已提交
552 553
                                                     &TableFileSchema::engine_type_,
                                                     &TableFileSchema::created_on_),
G
groot 已提交
554
                                             where(c(&TableFileSchema::file_type_)
555
                                                       == (int) TableFileSchema::TO_INDEX));
556

557
        std::map<std::string, TableSchema> groups;
X
Xu Peng 已提交
558
        TableFileSchema table_file;
559

560
        for (auto &file : selected) {
G
groot 已提交
561 562 563 564
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
565 566 567 568 569
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
            table_file.created_on_ = std::get<8>(file);
G
groot 已提交
570

S
starlord 已提交
571
            utils::GetTableFilePath(options_, table_file);
G
groot 已提交
572
            auto groupItr = groups.find(table_file.table_id_);
573
            if (groupItr == groups.end()) {
574
                TableSchema table_schema;
G
groot 已提交
575
                table_schema.table_id_ = table_file.table_id_;
X
Xu Peng 已提交
576
                auto status = DescribeTable(table_schema);
577 578 579
                if (!status.ok()) {
                    return status;
                }
G
groot 已提交
580
                groups[table_file.table_id_] = table_schema;
X
Xu Peng 已提交
581
            }
S
starlord 已提交
582 583
            table_file.metric_type_ = groups[table_file.table_id_].metric_type_;
            table_file.nlist_ = groups[table_file.table_id_].nlist_;
G
groot 已提交
584
            table_file.dimension_ = groups[table_file.table_id_].dimension_;
X
Xu Peng 已提交
585
            files.push_back(table_file);
X
Xu Peng 已提交
586
        }
G
groot 已提交
587

588
    } catch (std::exception &e) {
G
groot 已提交
589
        return HandleException("Encounter exception when iterate raw files", e);
X
Xu Peng 已提交
590
    }
X
Xu Peng 已提交
591

X
Xu Peng 已提交
592 593 594
    return Status::OK();
}

S
starlord 已提交
595
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
596 597
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
X
xj.lin 已提交
598
    files.clear();
X
Xu Peng 已提交
599

600
    try {
Y
Yu Kun 已提交
601
        server::MetricCollector metric;
G
groot 已提交
602

X
Xu Peng 已提交
603
        if (partition.empty()) {
604
            std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
G
groot 已提交
605 606 607 608
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
S
starlord 已提交
609
                                                         &TableFileSchema::file_size_,
610
                                                         &TableFileSchema::row_count_,
G
groot 已提交
611 612 613
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
                                                 where(c(&TableFileSchema::table_id_) == table_id and
614
                                                       in(&TableFileSchema::file_type_, file_type)));
G
groot 已提交
615

X
Xu Peng 已提交
616
            TableSchema table_schema;
G
groot 已提交
617
            table_schema.table_id_ = table_id;
X
Xu Peng 已提交
618 619 620 621
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }
X
xj.lin 已提交
622

X
Xu Peng 已提交
623 624 625
            TableFileSchema table_file;

            for (auto &file : selected) {
G
groot 已提交
626 627 628 629
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
630 631 632 633
                table_file.file_size_ = std::get<4>(file);
                table_file.row_count_ = std::get<5>(file);
                table_file.date_ = std::get<6>(file);
                table_file.engine_type_ = std::get<7>(file);
S
starlord 已提交
634 635
                table_file.metric_type_ = table_schema.metric_type_;
                table_file.nlist_ = table_schema.nlist_;
G
groot 已提交
636
                table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
637
                utils::GetTableFilePath(options_, table_file);
G
groot 已提交
638
                auto dateItr = files.find(table_file.date_);
X
Xu Peng 已提交
639
                if (dateItr == files.end()) {
G
groot 已提交
640
                    files[table_file.date_] = TableFilesSchema();
X
Xu Peng 已提交
641
                }
G
groot 已提交
642
                files[table_file.date_].push_back(table_file);
X
Xu Peng 已提交
643 644 645
            }
        }
        else {
646
            std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
G
groot 已提交
647 648 649 650
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
S
starlord 已提交
651
                                                         &TableFileSchema::file_size_,
652
                                                         &TableFileSchema::row_count_,
G
groot 已提交
653 654
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
G
groot 已提交
655
                                                 where(c(&TableFileSchema::table_id_) == table_id and
656 657
                                                       in(&TableFileSchema::date_, partition) and
                                                       in(&TableFileSchema::file_type_, file_type)));
G
groot 已提交
658

X
Xu Peng 已提交
659
            TableSchema table_schema;
G
groot 已提交
660
            table_schema.table_id_ = table_id;
X
Xu Peng 已提交
661 662 663 664
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }
X
Xu Peng 已提交
665

X
Xu Peng 已提交
666 667 668
            TableFileSchema table_file;

            for (auto &file : selected) {
G
groot 已提交
669 670 671 672
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
673 674 675 676
                table_file.file_size_ = std::get<4>(file);
                table_file.row_count_ = std::get<5>(file);
                table_file.date_ = std::get<6>(file);
                table_file.engine_type_ = std::get<7>(file);
S
starlord 已提交
677 678
                table_file.metric_type_ = table_schema.metric_type_;
                table_file.nlist_ = table_schema.nlist_;
G
groot 已提交
679
                table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
680
                utils::GetTableFilePath(options_, table_file);
G
groot 已提交
681
                auto dateItr = files.find(table_file.date_);
X
Xu Peng 已提交
682
                if (dateItr == files.end()) {
G
groot 已提交
683
                    files[table_file.date_] = TableFilesSchema();
X
Xu Peng 已提交
684
                }
G
groot 已提交
685
                files[table_file.date_].push_back(table_file);
686
            }
X
Xu Peng 已提交
687

X
xj.lin 已提交
688
        }
689
    } catch (std::exception &e) {
G
groot 已提交
690
        return HandleException("Encounter exception when iterate index files", e);
X
xj.lin 已提交
691 692 693 694 695
    }

    return Status::OK();
}

S
starlord 已提交
696
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
X
xj.lin 已提交
697 698 699 700
                                 const std::vector<size_t> &ids,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();
Y
Yu Kun 已提交
701
    server::MetricCollector metric;
X
xj.lin 已提交
702 703 704 705 706 707

    try {
        auto select_columns = columns(&TableFileSchema::id_,
                                      &TableFileSchema::table_id_,
                                      &TableFileSchema::file_id_,
                                      &TableFileSchema::file_type_,
S
starlord 已提交
708
                                      &TableFileSchema::file_size_,
709
                                      &TableFileSchema::row_count_,
X
xj.lin 已提交
710 711 712 713
                                      &TableFileSchema::date_,
                                      &TableFileSchema::engine_type_);

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;
X
xj.lin 已提交
714 715 716

        std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
        auto match_type = in(&TableFileSchema::file_type_, file_type);
X
xj.lin 已提交
717 718 719 720 721 722 723 724

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) { return status; }

        decltype(ConnectorPtr->select(select_columns)) result;
        if (partition.empty() && ids.empty()) {
X
xj.lin 已提交
725
            auto filter = where(match_tableid and match_type);
X
xj.lin 已提交
726 727 728 729
            result = ConnectorPtr->select(select_columns, filter);
        }
        else if (partition.empty() && !ids.empty()) {
            auto match_fileid = in(&TableFileSchema::id_, ids);
X
xj.lin 已提交
730
            auto filter = where(match_tableid and match_fileid and match_type);
X
xj.lin 已提交
731 732 733 734
            result = ConnectorPtr->select(select_columns, filter);
        }
        else if (!partition.empty() && ids.empty()) {
            auto match_date = in(&TableFileSchema::date_, partition);
X
xj.lin 已提交
735
            auto filter = where(match_tableid and match_date and match_type);
X
xj.lin 已提交
736 737 738 739 740
            result = ConnectorPtr->select(select_columns, filter);
        }
        else if (!partition.empty() && !ids.empty()) {
            auto match_fileid = in(&TableFileSchema::id_, ids);
            auto match_date = in(&TableFileSchema::date_, partition);
X
xj.lin 已提交
741
            auto filter = where(match_tableid and match_fileid and match_date and match_type);
X
xj.lin 已提交
742 743 744 745 746 747 748 749 750
            result = ConnectorPtr->select(select_columns, filter);
        }

        TableFileSchema table_file;
        for (auto &file : result) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
751 752 753 754
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
X
xj.lin 已提交
755
            table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
756 757
            table_file.metric_type_ = table_schema.metric_type_;
            table_file.nlist_ = table_schema.nlist_;
X
xj.lin 已提交
758 759 760 761 762 763 764 765 766 767 768 769 770 771 772
            utils::GetTableFilePath(options_, table_file);
            auto dateItr = files.find(table_file.date_);
            if (dateItr == files.end()) {
                files[table_file.date_] = TableFilesSchema();
            }
            files[table_file.date_].push_back(table_file);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e);
    }

    return Status::OK();
}

S
starlord 已提交
773
Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
774
                                DatePartionedTableFilesSchema &files) {
X
Xu Peng 已提交
775
    files.clear();
X
Xu Peng 已提交
776

777
    try {
Y
Yu Kun 已提交
778
        server::MetricCollector metric;
G
groot 已提交
779

S
starlord 已提交
780 781 782 783 784 785 786 787 788
        //check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        //get files to merge
G
groot 已提交
789 790 791 792
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
793
                                                     &TableFileSchema::file_size_,
S
starlord 已提交
794 795 796
                                                     &TableFileSchema::row_count_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::created_on_),
G
groot 已提交
797
                                             where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
G
groot 已提交
798
                                                 c(&TableFileSchema::table_id_) == table_id),
799
                                             order_by(&TableFileSchema::file_size_).desc());
G
groot 已提交
800

801
        for (auto &file : selected) {
S
starlord 已提交
802 803 804 805 806 807
            TableFileSchema table_file;
            table_file.file_size_ = std::get<4>(file);
            if(table_file.file_size_ >= table_schema.index_file_size_) {
                continue;//skip large file
            }

G
groot 已提交
808 809 810 811
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
S
starlord 已提交
812 813 814
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.created_on_ = std::get<7>(file);
G
groot 已提交
815
            table_file.dimension_ = table_schema.dimension_;
S
starlord 已提交
816 817
            table_file.metric_type_ = table_schema.metric_type_;
            table_file.nlist_ = table_schema.nlist_;
S
starlord 已提交
818
            utils::GetTableFilePath(options_, table_file);
G
groot 已提交
819
            auto dateItr = files.find(table_file.date_);
820
            if (dateItr == files.end()) {
G
groot 已提交
821
                files[table_file.date_] = TableFilesSchema();
822
            }
G
groot 已提交
823
            files[table_file.date_].push_back(table_file);
X
Xu Peng 已提交
824
        }
825
    } catch (std::exception &e) {
G
groot 已提交
826
        return HandleException("Encounter exception when iterate merge files", e);
X
Xu Peng 已提交
827 828 829
    }

    return Status::OK();
X
Xu Peng 已提交
830 831
}

S
starlord 已提交
832
Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
833 834
                                 const std::vector<size_t>& ids,
                                 TableFilesSchema& table_files) {
X
Xu Peng 已提交
835
    try {
836
        table_files.clear();
Y
yu yunfeng 已提交
837 838
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                  &TableFileSchema::file_id_,
G
groot 已提交
839
                                                  &TableFileSchema::file_type_,
840 841
                                                  &TableFileSchema::file_size_,
                                                  &TableFileSchema::row_count_,
842
                                                  &TableFileSchema::date_,
S
starlord 已提交
843 844
                                                  &TableFileSchema::engine_type_,
                                                  &TableFileSchema::created_on_),
845 846
                                          where(c(&TableFileSchema::table_id_) == table_id and
                                                  in(&TableFileSchema::id_, ids)
X
Xu Peng 已提交
847
                                          ));
848 849 850 851 852 853 854 855 856 857

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto &file : files) {
            TableFileSchema file_schema;
G
groot 已提交
858
            file_schema.table_id_ = table_id;
Y
yu yunfeng 已提交
859 860 861
            file_schema.id_ = std::get<0>(file);
            file_schema.file_id_ = std::get<1>(file);
            file_schema.file_type_ = std::get<2>(file);
862 863 864 865
            file_schema.file_size_ = std::get<3>(file);
            file_schema.row_count_ = std::get<4>(file);
            file_schema.date_ = std::get<5>(file);
            file_schema.engine_type_ = std::get<6>(file);
S
starlord 已提交
866 867
            file_schema.metric_type_ = table_schema.metric_type_;
            file_schema.nlist_ = table_schema.nlist_;
S
starlord 已提交
868
            file_schema.created_on_ = std::get<7>(file);
869
            file_schema.dimension_ = table_schema.dimension_;
S
starlord 已提交
870

S
starlord 已提交
871
            utils::GetTableFilePath(options_, file_schema);
872 873

            table_files.emplace_back(file_schema);
X
Xu Peng 已提交
874 875
        }
    } catch (std::exception &e) {
876
        return HandleException("Encounter exception when lookup table files", e);
X
Xu Peng 已提交
877 878
    }

X
Xu Peng 已提交
879
    return Status::OK();
X
Xu Peng 已提交
880 881
}

X
Xu Peng 已提交
882
// PXU TODO: Support Swap
S
starlord 已提交
883
Status SqliteMetaImpl::Archive() {
884
    auto &criterias = options_.archive_conf.GetCriterias();
X
Xu Peng 已提交
885 886 887 888 889
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
890 891
        auto &criteria = kv.first;
        auto &limit = kv.second;
G
groot 已提交
892
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
X
Xu Peng 已提交
893
            long usecs = limit * D_SEC * US_PS;
894
            long now = utils::GetMicroSecTimeStamp();
895
            try {
896 897 898
                //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

X
Xu Peng 已提交
899
                ConnectorPtr->update_all(
900
                    set(
G
groot 已提交
901
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
902 903
                    ),
                    where(
G
groot 已提交
904 905
                        c(&TableFileSchema::created_on_) < (long) (now - usecs) and
                            c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
906 907
                    ));
            } catch (std::exception &e) {
G
groot 已提交
908
                return HandleException("Encounter exception when update table files", e);
X
Xu Peng 已提交
909 910
            }
        }
G
groot 已提交
911
        if (criteria == engine::ARCHIVE_CONF_DISK) {
G
groot 已提交
912
            uint64_t sum = 0;
X
Xu Peng 已提交
913
            Size(sum);
X
Xu Peng 已提交
914

G
groot 已提交
915
            int64_t to_delete = (int64_t)sum - limit * G;
X
Xu Peng 已提交
916
            DiscardFiles(to_delete);
X
Xu Peng 已提交
917 918 919 920 921 922
        }
    }

    return Status::OK();
}

S
starlord 已提交
923
Status SqliteMetaImpl::Size(uint64_t &result) {
X
Xu Peng 已提交
924
    result = 0;
X
Xu Peng 已提交
925
    try {
926
        auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
S
starlord 已提交
927 928 929
                                          where(
                                                  c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
                                          ));
930 931 932
        for (auto &total_size : selected) {
            if (!std::get<0>(total_size)) {
                continue;
X
Xu Peng 已提交
933
            }
934
            result += (uint64_t) (*std::get<0>(total_size));
X
Xu Peng 已提交
935
        }
936

937
    } catch (std::exception &e) {
G
groot 已提交
938
        return HandleException("Encounter exception when calculte db size", e);
X
Xu Peng 已提交
939 940 941 942 943
    }

    return Status::OK();
}

S
starlord 已提交
944
Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
X
Xu Peng 已提交
945 946 947
    if (to_discard_size <= 0) {
        return Status::OK();
    }
G
groot 已提交
948

G
groot 已提交
949
    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
G
groot 已提交
950

X
Xu Peng 已提交
951
    try {
Y
Yu Kun 已提交
952
        server::MetricCollector metric;
G
groot 已提交
953

954 955 956
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
957 958
        auto commited = ConnectorPtr->transaction([&]() mutable {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
959
                                                         &TableFileSchema::file_size_),
G
groot 已提交
960
                                                 where(c(&TableFileSchema::file_type_)
961
                                                       != (int) TableFileSchema::TO_DELETE),
G
groot 已提交
962 963
                                                 order_by(&TableFileSchema::id_),
                                                 limit(10));
X
Xu Peng 已提交
964

G
groot 已提交
965 966
            std::vector<int> ids;
            TableFileSchema table_file;
967

G
groot 已提交
968 969 970
            for (auto &file : selected) {
                if (to_discard_size <= 0) break;
                table_file.id_ = std::get<0>(file);
971
                table_file.file_size_ = std::get<1>(file);
G
groot 已提交
972 973
                ids.push_back(table_file.id_);
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
974 975
                                 << " table_file.size=" << table_file.file_size_;
                to_discard_size -= table_file.file_size_;
G
groot 已提交
976
            }
977

G
groot 已提交
978 979 980
            if (ids.size() == 0) {
                return true;
            }
981

G
groot 已提交
982 983 984 985 986 987 988 989 990 991 992 993 994
            ConnectorPtr->update_all(
                    set(
                            c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                            c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                    ),
                    where(
                            in(&TableFileSchema::id_, ids)
                    ));

            return true;
        });

        if (!commited) {
995
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
996 997
            return Status::DBTransactionError("Update table file error");
        }
X
Xu Peng 已提交
998

999
    } catch (std::exception &e) {
G
groot 已提交
1000
        return HandleException("Encounter exception when discard table file", e);
X
Xu Peng 已提交
1001 1002
    }

X
Xu Peng 已提交
1003
    return DiscardFiles(to_discard_size);
X
Xu Peng 已提交
1004 1005
}

S
starlord 已提交
1006
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
G
groot 已提交
1007
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
1008
    try {
Y
Yu Kun 已提交
1009
        server::MetricCollector metric;
G
groot 已提交
1010

1011 1012 1013
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1014 1015 1016 1017 1018 1019 1020 1021 1022
        auto tables = ConnectorPtr->select(columns(&TableSchema::state_),
                                           where(c(&TableSchema::table_id_) == file_schema.table_id_));

        //if the table has been deleted, just mark the table file as TO_DELETE
        //clean thread will delete the file later
        if(tables.size() < 1 || std::get<0>(tables[0]) == (int)TableSchema::TO_DELETE) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

X
Xu Peng 已提交
1023
        ConnectorPtr->update(file_schema);
G
groot 已提交
1024

1025
    } catch (std::exception &e) {
G
groot 已提交
1026 1027 1028
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
            + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e);
X
Xu Peng 已提交
1029
    }
X
Xu Peng 已提交
1030
    return Status::OK();
X
Xu Peng 已提交
1031 1032
}

S
starlord 已提交
1033
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
P
peng.xu 已提交
1034
    try {
Y
Yu Kun 已提交
1035
        server::MetricCollector metric;
1036 1037 1038 1039

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

P
peng.xu 已提交
1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054
        ConnectorPtr->update_all(
            set(
                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX
            ),
            where(
                c(&TableFileSchema::table_id_) == table_id and
                c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
            ));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files to to_index", e);
    }

    return Status::OK();
}

S
starlord 已提交
1055
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
1056
    try {
Y
Yu Kun 已提交
1057
        server::MetricCollector metric;
G
groot 已提交
1058

1059 1060 1061
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076
        std::map<std::string, bool> has_tables;
        for (auto &file : files) {
            if(has_tables.find(file.table_id_) != has_tables.end()) {
                continue;
            }
            auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
                                               where(c(&TableSchema::table_id_) == file.table_id_
                                                     and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
            if(tables.size() >= 1) {
                has_tables[file.table_id_] = true;
            } else {
                has_tables[file.table_id_] = false;
            }
        }

1077 1078
        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
G
groot 已提交
1079 1080 1081 1082
                if(!has_tables[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

G
groot 已提交
1083
                file.updated_time_ = utils::GetMicroSecTimeStamp();
1084 1085 1086 1087
                ConnectorPtr->update(file);
            }
            return true;
        });
G
groot 已提交
1088

1089
        if (!commited) {
1090
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1091
            return Status::DBTransactionError("Update table files error");
X
Xu Peng 已提交
1092
        }
G
groot 已提交
1093

1094
    } catch (std::exception &e) {
G
groot 已提交
1095
        return HandleException("Encounter exception when update table files", e);
X
Xu Peng 已提交
1096
    }
1097 1098 1099
    return Status::OK();
}

S
starlord 已提交
1100
Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
X
Xu Peng 已提交
1101
    auto now = utils::GetMicroSecTimeStamp();
S
starlord 已提交
1102 1103 1104
    std::set<std::string> table_ids;

    //remove to_delete files
1105
    try {
Y
Yu Kun 已提交
1106
        server::MetricCollector metric;
1107

1108 1109 1110
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1111 1112 1113 1114 1115 1116 1117 1118 1119 1120
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                  &TableFileSchema::table_id_,
                                                  &TableFileSchema::file_id_,
                                                  &TableFileSchema::date_),
                                          where(
                                                  c(&TableFileSchema::file_type_) ==
                                                  (int) TableFileSchema::TO_DELETE
                                                  and
                                                  c(&TableFileSchema::updated_time_)
                                                  < now - seconds * US_PS));
1121

G
groot 已提交
1122 1123 1124 1125 1126 1127 1128 1129
        auto commited = ConnectorPtr->transaction([&]() mutable {
            TableFileSchema table_file;
            for (auto &file : files) {
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.date_ = std::get<3>(file);

S
starlord 已提交
1130
                utils::DeleteTableFilePath(options_, table_file);
1131
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
G
groot 已提交
1132 1133
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);

S
starlord 已提交
1134
                table_ids.insert(table_file.table_id_);
1135
            }
G
groot 已提交
1136 1137 1138 1139
            return true;
        });

        if (!commited) {
1140
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1141 1142 1143 1144 1145 1146 1147
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

S
starlord 已提交
1148
    //remove to_delete tables
G
groot 已提交
1149
    try {
Y
Yu Kun 已提交
1150
        server::MetricCollector metric;
G
groot 已提交
1151

1152 1153 1154
        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

G
groot 已提交
1155 1156 1157 1158 1159 1160
        auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
                                                   &TableSchema::table_id_),
                                           where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &table : tables) {
S
starlord 已提交
1161
                utils::DeleteTablePath(options_, std::get<1>(table), false);//only delete empty folder
G
groot 已提交
1162
                ConnectorPtr->remove<TableSchema>(std::get<0>(table));
1163
            }
G
groot 已提交
1164 1165 1166 1167 1168

            return true;
        });

        if (!commited) {
1169
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1170
            return Status::DBTransactionError("Clean files error");
X
Xu Peng 已提交
1171
        }
G
groot 已提交
1172

1173
    } catch (std::exception &e) {
G
groot 已提交
1174
        return HandleException("Encounter exception when clean table files", e);
X
Xu Peng 已提交
1175 1176
    }

S
starlord 已提交
1177 1178 1179
    //remove deleted table folder
    //don't remove table folder until all its files has been deleted
    try {
Y
Yu Kun 已提交
1180
        server::MetricCollector metric;
S
starlord 已提交
1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193

        for(auto& table_id : table_ids) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                 where(c(&TableFileSchema::table_id_) == table_id));
            if(selected.size() == 0) {
                utils::DeleteTablePath(options_, table_id);
            }
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table folder", e);
    }

X
Xu Peng 已提交
1194 1195 1196
    return Status::OK();
}

S
starlord 已提交
1197
Status SqliteMetaImpl::CleanUp() {
1198
    try {
Y
Yu Kun 已提交
1199
        server::MetricCollector metric;
1200 1201 1202 1203

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

1204 1205
        std::vector<int> file_type = {(int) TableFileSchema::NEW, (int) TableFileSchema::NEW_INDEX, (int) TableFileSchema::NEW_MERGE};
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_type)));
1206

G
groot 已提交
1207 1208 1209 1210
        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
1211
            }
G
groot 已提交
1212 1213 1214 1215
            return true;
        });

        if (!commited) {
1216
            ENGINE_LOG_ERROR << "sqlite transaction failed";
G
groot 已提交
1217
            return Status::DBTransactionError("Clean files error");
X
Xu Peng 已提交
1218
        }
G
groot 已提交
1219

1220
    } catch (std::exception &e) {
G
groot 已提交
1221
        return HandleException("Encounter exception when clean table file", e);
X
Xu Peng 已提交
1222 1223 1224 1225 1226
    }

    return Status::OK();
}

S
starlord 已提交
1227
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
X
Xu Peng 已提交
1228

1229
    try {
Y
Yu Kun 已提交
1230
        server::MetricCollector metric;
1231

1232 1233 1234
        std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
                                             where(in(&TableFileSchema::file_type_, file_type)
G
groot 已提交
1235
                                                   and c(&TableFileSchema::table_id_) == table_id));
1236

1237
        TableSchema table_schema;
G
groot 已提交
1238
        table_schema.table_id_ = table_id;
X
Xu Peng 已提交
1239
        auto status = DescribeTable(table_schema);
1240

1241 1242 1243 1244 1245
        if (!status.ok()) {
            return status;
        }

        result = 0;
1246
        for (auto &file : selected) {
1247 1248
            result += std::get<0>(file);
        }
X
Xu Peng 已提交
1249

1250
    } catch (std::exception &e) {
G
groot 已提交
1251
        return HandleException("Encounter exception when calculate table file size", e);
X
Xu Peng 已提交
1252 1253 1254 1255
    }
    return Status::OK();
}

S
starlord 已提交
1256
Status SqliteMetaImpl::DropAll() {
X
Xu Peng 已提交
1257 1258
    if (boost::filesystem::is_directory(options_.path)) {
        boost::filesystem::remove_all(options_.path);
X
Xu Peng 已提交
1259 1260 1261 1262
    }
    return Status::OK();
}

S
starlord 已提交
1263
SqliteMetaImpl::~SqliteMetaImpl() {
1264
    CleanUp();
X
Xu Peng 已提交
1265 1266
}

1267
} // namespace meta
X
Xu Peng 已提交
1268
} // namespace engine
J
jinhai 已提交
1269
} // namespace milvus
X
Xu Peng 已提交
1270
} // namespace zilliz