SqliteMetaImpl.cpp 52.9 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/meta/SqliteMetaImpl.h"
S
starlord 已提交
19 20
#include "db/IDGenerator.h"
#include "db/Utils.h"
S
starlord 已提交
21
#include "utils/Log.h"
22
#include "utils/Exception.h"
X
Xu Peng 已提交
23
#include "MetaConsts.h"
24
#include "metrics/Metrics.h"
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26
#include <unistd.h>
X
Xu Peng 已提交
27 28
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
29
#include <boost/filesystem.hpp>
30
#include <chrono>
X
Xu Peng 已提交
31
#include <fstream>
S
starlord 已提交
32 33 34
#include <memory>
#include <map>
#include <set>
35
#include <sqlite_orm.h>
X
Xu Peng 已提交
36

S
starlord 已提交
37

J
jinhai 已提交
38
namespace milvus {
X
Xu Peng 已提交
39
namespace engine {
40
namespace meta {
X
Xu Peng 已提交
41

X
Xu Peng 已提交
42 43
using namespace sqlite_orm;

G
groot 已提交
44 45
namespace {

S
starlord 已提交
46 47 48
// Logs a metadata failure and wraps it in a DB_META_TRANSACTION_FAILED status.
//   desc: description of the operation that failed
//   what: optional detail text (e.g. std::exception::what()); when non-null it is
//         appended to desc after a ':' separator
// Returns the Status that carries the same message that was logged.
Status
HandleException(const std::string &desc, const char *what = nullptr) {
    if (what != nullptr) {
        const std::string detailed = desc + ":" + what;
        ENGINE_LOG_ERROR << detailed;
        return Status(DB_META_TRANSACTION_FAILED, detailed);
    }

    ENGINE_LOG_ERROR << desc;
    return Status(DB_META_TRANSACTION_FAILED, desc);
}

S
starlord 已提交
58
} // namespace
G
groot 已提交
59

S
starlord 已提交
60 61
inline auto
StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
62
    return make_storage(path,
63
                        make_table(META_TABLES,
G
groot 已提交
64 65
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
66
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
67 68
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
S
starlord 已提交
69
                                   make_column("flag", &TableSchema::flag_, default_value(0)),
70
                                   make_column("index_file_size", &TableSchema::index_file_size_),
G
groot 已提交
71
                                   make_column("engine_type", &TableSchema::engine_type_),
72 73
                                   make_column("nlist", &TableSchema::nlist_),
                                   make_column("metric_type", &TableSchema::metric_type_)),
74
                        make_table(META_TABLEFILES,
G
groot 已提交
75 76
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
77
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
78 79
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
80 81
                                   make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
                                   make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
G
groot 已提交
82 83
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
S
starlord 已提交
84
                                   make_column("date", &TableFileSchema::date_)));
X
Xu Peng 已提交
85 86
}

X
Xu Peng 已提交
87
// Concrete storage type returned by StoragePrototype(); the empty path is used
// only inside decltype to deduce the type.
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
88 89
// File-scope handle to the storage; assigned in SqliteMetaImpl::Initialize()
// and used by every query in this file.
static std::unique_ptr<ConnectorT> ConnectorPtr;

S
starlord 已提交
90 91
// Copies the meta options and immediately opens the storage via Initialize().
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options)
    : options_(options) {
    // NOTE(review): the Status returned by Initialize() is not inspected here.
    Initialize();
}

// Nothing to release explicitly.
SqliteMetaImpl::~SqliteMetaImpl() = default;

S
starlord 已提交
98 99
// Produces a new table id: the next number from a SimpleIDGenerator rendered as text.
//   table_id: receives the generated id
// Always returns Status::OK().
Status
SqliteMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator id_generator;
    std::ostringstream text;
    text << id_generator.GetNextIDNumber();
    table_id = text.str();
    return Status::OK();
}

S
starlord 已提交
107 108
// Produces a new file id: the next number from a SimpleIDGenerator rendered as text.
//   file_id: receives the generated id
// Always returns Status::OK().
Status
SqliteMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator id_generator;
    std::ostringstream text;
    text << id_generator.GetNextIDNumber();
    file_id = text.str();
    return Status::OK();
}

S
starlord 已提交
116 117 118
void
SqliteMetaImpl::ValidateMetaSchema() {
    if (ConnectorPtr == nullptr) {
119 120 121 122 123
        return;
    }

    //old meta could be recreated since schema changed, throw exception if meta schema is not compatible
    auto ret = ConnectorPtr->sync_schema_simulate();
S
starlord 已提交
124 125
    if (ret.find(META_TABLES) != ret.end()
        && sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
126 127
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
S
starlord 已提交
128 129
    if (ret.find(META_TABLEFILES) != ret.end()
        && sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
130 131 132 133
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
134 135
// Opens (creating if needed) the meta database under options_.path_.
// Steps: make sure the directory exists, build the storage for
// "<path>/meta.sqlite", validate the schema (may throw), sync the schema,
// keep the connection open, switch to WAL journaling, then run CleanUp().
// Returns DB_INVALID_PATH when the directory cannot be created, otherwise OK.
Status
SqliteMetaImpl::Initialize() {
    const auto &db_dir = options_.path_;

    if (!boost::filesystem::is_directory(db_dir)) {
        const bool created = boost::filesystem::create_directory(db_dir);
        if (!created) {
            std::string msg = "Failed to create db directory " + db_dir;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_INVALID_PATH, msg);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(db_dir + "/meta.sqlite"));

    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();  // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

S
starlord 已提交
158
// TODO(myh): Delete single vector by id
S
starlord 已提交
159 160 161
// Soft-deletes every file of a table whose date is in `dates`, by setting its
// file type to TO_DELETE and refreshing updated_time_.
//   table_id: table whose files are affected; it must be describable (not deleted)
//   dates:    partition dates to drop; an empty list is a no-op
// Returns the DescribeTable() error if the table cannot be described,
// DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
Status
SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                      const DatesT &dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        // sqlite_orm has a bug: the 'in' statement cannot handle too many elements,
        // so one query is split into several queries. This is a work-around.
        const size_t batch_size = 30;
        std::vector<DatesT> split_dates;
        split_dates.push_back(DatesT());
        for (DateT date : dates) {
            DatesT &last_batch = split_dates.back();
            last_batch.push_back(date);
            // Start a new batch once the current one holds exactly batch_size dates
            // (the former '>' check let batches grow to batch_size + 1).
            if (last_batch.size() >= batch_size) {
                split_dates.push_back(DatesT());
            }
        }

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        for (auto &batch_dates : split_dates) {
            // The last batch is empty when the date count is a multiple of batch_size.
            if (batch_dates.empty()) {
                continue;
            }

            ConnectorPtr->update_all(
                set(
                    c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                    c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                where(
                    c(&TableFileSchema::table_id_) == table_id and
                    in(&TableFileSchema::date_, batch_dates)));
        }

        ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop partition", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
212 213
// Inserts a new table record and creates its directory.
//   table_schema: in/out. An empty table_id_ is replaced by a generated id;
//                 id_ and created_on_ are filled in by this call.
// Returns DB_ERROR if a table with this id exists and is pending deletion,
// DB_ALREADY_EXIST if it exists otherwise, DB_META_TRANSACTION_FAILED on a
// storage exception, else the result of utils::CreateTablePath().
Status
SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> guard(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                if (TableSchema::TO_DELETE == std::get<0>(existing[0])) {
                    return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                }
                // An existing, live table is reported as "already exists" rather than success.
                return Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            table_schema.id_ = ConnectorPtr->insert(table_schema);
        } catch (std::exception &insert_error) {
            return HandleException("Encounter exception when create table", insert_error.what());
        }

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e.what());
    }
}

S
starlord 已提交
253 254
// Soft-deletes a table: sets state_ to TO_DELETE on the row with this table_id
// unless it is already in that state.
// Returns DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
Status
SqliteMetaImpl::DeleteTable(const std::string &table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> guard(meta_mutex_);

        // Only the state flag changes; the row itself stays in place.
        ConnectorPtr->update_all(
            set(c(&TableSchema::state_) = static_cast<int>(TableSchema::TO_DELETE)),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
277 278
// Soft-deletes all files of a table: every file row for table_id that is not
// already TO_DELETE gets file_type_ = TO_DELETE and a fresh updated_time_.
// Returns DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
Status
SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> guard(meta_mutex_);

        // Only the type flag and timestamp change; the rows stay in place.
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
302 303
// Loads the stored schema of a table that is not marked TO_DELETE.
//   table_schema: in/out. table_id_ selects the table; the remaining fields
//                 (id, state, dimension, created_on, flag, index_file_size,
//                 engine_type, nlist, metric_type) are filled from the row.
// Returns DB_NOT_FOUND unless exactly one matching row exists,
// DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
Status
SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::state_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_,
                    &TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        // Tuple positions follow the column list above.
        const auto &row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.engine_type_ = std::get<6>(row);
        table_schema.nlist_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
339 340 341 342 343
// Collects the ids of a table's files whose type is one of `file_types`.
//   table_id:   table to inspect
//   file_types: accepted file types; must not be empty
//   file_ids:   output, cleared first and then filled with matching file ids
// Also logs a per-type count of the matched files when any are found.
// Returns DB_ERROR for an empty type list, DB_META_TRANSACTION_FAILED on a
// storage exception, otherwise OK.
Status
SqliteMetaImpl::FilesByType(const std::string &table_id,
                            const std::vector<int> &file_types,
                            std::vector<std::string> &file_ids) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    try {
        file_ids.clear();
        auto matches = ConnectorPtr->select(
            columns(&TableFileSchema::file_id_, &TableFileSchema::file_type_),
            where(in(&TableFileSchema::file_type_, file_types) &&
                  c(&TableFileSchema::table_id_) == table_id));

        if (!matches.empty()) {
            int raw_n = 0;
            int new_n = 0;
            int new_merge_n = 0;
            int new_index_n = 0;
            int to_index_n = 0;
            int index_n = 0;
            int backup_n = 0;

            for (auto &match : matches) {
                file_ids.push_back(std::get<0>(match));

                // Tally by type for the debug summary below; other types are ignored.
                switch (std::get<1>(match)) {
                    case static_cast<int>(TableFileSchema::RAW):
                        ++raw_n;
                        break;
                    case static_cast<int>(TableFileSchema::NEW):
                        ++new_n;
                        break;
                    case static_cast<int>(TableFileSchema::NEW_MERGE):
                        ++new_merge_n;
                        break;
                    case static_cast<int>(TableFileSchema::NEW_INDEX):
                        ++new_index_n;
                        break;
                    case static_cast<int>(TableFileSchema::TO_INDEX):
                        ++to_index_n;
                        break;
                    case static_cast<int>(TableFileSchema::INDEX):
                        ++index_n;
                        break;
                    case static_cast<int>(TableFileSchema::BACKUP):
                        ++backup_n;
                        break;
                    default:
                        break;
                }
            }

            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_n
                             << " new files:" << new_n << " new_merge files:" << new_merge_n
                             << " new_index files:" << new_index_n << " to_index files:" << to_index_n
                             << " index files:" << index_n << " backup files:" << backup_n;
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
389 390
// Stores new index settings (engine type, nlist, metric type) for a table and
// turns all of its BACKUP files back into RAW files.
//   table_id: table to update; it must exist and not be marked TO_DELETE
//   index:    index settings to store
// Returns DB_NOT_FOUND if no such table row exists, DB_META_TRANSACTION_FAILED
// on a storage exception, otherwise OK.
Status
SqliteMetaImpl::UpdateTableIndex(const std::string &table_id, const TableIndex &index) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> guard(meta_mutex_);

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::state_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        // Rebuild the full row: stored values for everything except the index
        // settings, which come from the caller.
        const auto &row = rows[0];
        meta::TableSchema updated;
        updated.id_ = std::get<0>(row);
        updated.table_id_ = table_id;
        updated.state_ = std::get<1>(row);
        updated.dimension_ = std::get<2>(row);
        updated.created_on_ = std::get<3>(row);
        updated.flag_ = std::get<4>(row);
        updated.index_file_size_ = std::get<5>(row);
        updated.engine_type_ = index.engine_type_;
        updated.nlist_ = index.nlist_;
        updated.metric_type_ = index.metric_type_;

        ConnectorPtr->update(updated);

        // set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (std::exception &e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
442 443
// Overwrites the flag_ column of the table row identified by table_id.
//   table_id: table to update
//   flag:     new flag value
// Returns DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
// NOTE(review): unlike the other update paths in this file, this one does not
// take meta_mutex_ — confirm whether that is intentional.
Status
SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    try {
        server::MetricCollector metric;

        // store the new flag on the matching table row
        ConnectorPtr->update_all(
            set(c(&TableSchema::flag_) = flag),
            where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (std::exception &e) {
        std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
462 463
// Reads the index settings (engine type, nlist, metric type) of a table that
// is not marked TO_DELETE.
//   table_id: table to look up
//   index:    output, filled from the stored row
// Returns DB_NOT_FOUND unless exactly one matching row exists,
// DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
Status
SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex &index) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        const auto &row = rows[0];
        index.engine_type_ = std::get<0>(row);
        index.nlist_ = std::get<1>(row);
        index.metric_type_ = std::get<2>(row);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
487 488
// Removes a table's index in three updates: INDEX files become TO_DELETE,
// BACKUP files become RAW, and the table's index settings are reset to the
// DEFAULT_* values.
// Returns DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
Status
SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> guard(meta_mutex_);

        // soft delete index files
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)));

        // set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        // reset the table's index settings to the defaults
        ConnectorPtr->update_all(
            set(c(&TableSchema::engine_type_) = DEFAULT_ENGINE_TYPE,
                c(&TableSchema::nlist_) = DEFAULT_NLIST,
                c(&TableSchema::metric_type_) = DEFAULT_METRIC_TYPE),
            where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
530 531
// Reports whether a table that is not marked TO_DELETE exists.
//   table_id:   table to look up
//   has_or_not: output; true only when exactly one matching row is found,
//               false otherwise (including on error)
// Returns DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
Status
SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    has_or_not = false;

    try {
        server::MetricCollector metric;
        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        has_or_not = (rows.size() == 1);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
551 552
// Appends the schema of every table not marked TO_DELETE to table_schema_array.
// The output vector is not cleared first. state_ is not among the selected
// columns, so it is left as default-constructed in each appended schema.
// Returns DB_META_TRANSACTION_FAILED on a storage exception, otherwise OK.
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::table_id_,
                    &TableSchema::dimension_,
                    &TableSchema::created_on_,
                    &TableSchema::flag_,
                    &TableSchema::index_file_size_,
                    &TableSchema::engine_type_,
                    &TableSchema::nlist_,
                    &TableSchema::metric_type_),
            where(c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        // Tuple positions follow the column list above.
        for (auto &row : rows) {
            TableSchema entry;
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.dimension_ = std::get<2>(row);
            entry.created_on_ = std::get<3>(row);
            entry.flag_ = std::get<4>(row);
            entry.index_file_size_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.nlist_ = std::get<7>(row);
            entry.metric_type_ = std::get<8>(row);

            table_schema_array.push_back(entry);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup all tables", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
587 588
// Registers a new file for an existing table and creates its path on disk.
//   file_schema: in/out. table_id_ selects the owning table. An EmptyDate
//                date_ is replaced with utils::GetDate() before the table is
//                looked up. file_id_, sizes, timestamps, id_ and the fields
//                copied from the table (dimension, index_file_size,
//                engine_type, nlist, metric_type) are filled in here.
// Returns the DescribeTable() error if the table cannot be described,
// DB_META_TRANSACTION_FAILED on a storage exception, else the result of
// utils::CreateTableFilePath().
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(owner);
    if (!status.ok()) {
        return status;
    }

    try {
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);

        // A new file starts empty; creation and update times are the same.
        file_schema.dimension_ = owner.dimension_;
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        // Index-related settings come from the owning table.
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.metric_type_ = owner.metric_type_;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> guard(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table file", e.what());
    }
}

S
starlord 已提交
628 629
// Collects every table file whose file_type_ is TO_INDEX into `files`.
//
// `files` is cleared first. Each entry carries the columns read from the file
// row plus dimension_, index_file_size_, nlist_ and metric_type_ copied from the
// owning table, which is described (DescribeTable) once per distinct table id.
//
// Returns:
//   - the DescribeTable status, immediately, when describing a table fails
//     (entries collected so far stay in `files`);
//   - otherwise the status of the last failed GetTableFilePath call, or a
//     default-constructed Status when every call succeeded;
//   - the HandleException status when a std::exception is thrown.
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::file_size_,
                                                     &TableFileSchema::row_count_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::engine_type_,
                                                     &TableFileSchema::created_on_),
                                             where(c(&TableFileSchema::file_type_)
                                                       == static_cast<int>(TableFileSchema::TO_INDEX)));

        // table id -> table description, filled lazily so each table is described only once
        std::map<std::string, TableSchema> groups;

        Status ret;
        for (auto &file : selected) {
            // A fresh object per row: nothing written for a previous row
            // (for instance by GetTableFilePath) can leak into this one.
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
            table_file.created_on_ = std::get<8>(file);

            // A failed path lookup is remembered but does not stop the scan.
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto group_itr = groups.find(table_file.table_id_);
            if (group_itr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                group_itr = groups.emplace(table_file.table_id_, table_schema).first;
            }

            // Single lookup result reused for all table-level fields.
            const TableSchema &owner = group_itr->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.nlist_ = owner.nlist_;
            table_file.metric_type_ = owner.metric_type_;
            files.push_back(table_file);
        }

        if (!selected.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
        }
        return ret;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
692 693 694
// Collects the searchable files (file_type_ RAW, TO_INDEX or INDEX) of table
// `table_id` into `files`, grouped by date.
//
// Filters:
//   ids   - when non-empty, only rows whose id_ is listed are returned;
//   dates - when non-empty, only rows whose date_ is listed are returned.
//
// `files` is cleared first. Every entry also receives dimension_,
// index_file_size_, nlist_ and metric_type_ from the table description.
//
// Returns the DescribeTable status if the table cannot be described, otherwise
// the status of the last failed GetTableFilePath call (default-constructed
// Status when none failed), or the HandleException status on std::exception.
Status
SqliteMetaImpl::FilesToSearch(const std::string &table_id,
                              const std::vector<size_t> &ids,
                              const DatesT &dates,
                              DatePartionedTableFilesSchema &files) {
    files.clear();
    server::MetricCollector metric;

    try {
        auto select_columns = columns(&TableFileSchema::id_,
                                      &TableFileSchema::table_id_,
                                      &TableFileSchema::file_id_,
                                      &TableFileSchema::file_type_,
                                      &TableFileSchema::file_size_,
                                      &TableFileSchema::row_count_,
                                      &TableFileSchema::date_,
                                      &TableFileSchema::engine_type_);

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;

        std::vector<int> file_types = {
            static_cast<int>(TableFileSchema::RAW),
            static_cast<int>(TableFileSchema::TO_INDEX),
            static_cast<int>(TableFileSchema::INDEX)
        };
        auto match_type = in(&TableFileSchema::file_type_, file_types);

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        //sqlite_orm has a bug, 'in' statement cannot handle too many elements
        //so we split one query into multi-queries, this is a work-around!!
        //Each batch holds at most batch_size dates and is never empty.
        const size_t batch_size = 30;
        std::vector<DatesT> split_dates;
        for (DateT date : dates) {
            if (split_dates.empty() || split_dates.back().size() >= batch_size) {
                split_dates.push_back(DatesT());
            }
            split_dates.back().push_back(date);
        }

        //perform query
        decltype(ConnectorPtr->select(select_columns)) selected;
        if (dates.empty() && ids.empty()) {
            auto filter = where(match_tableid and match_type);
            selected = ConnectorPtr->select(select_columns, filter);
        } else if (dates.empty()) {
            auto match_fileid = in(&TableFileSchema::id_, ids);
            auto filter = where(match_tableid and match_fileid and match_type);
            selected = ConnectorPtr->select(select_columns, filter);
        } else if (ids.empty()) {
            for (auto &batch_dates : split_dates) {
                auto match_date = in(&TableFileSchema::date_, batch_dates);
                auto filter = where(match_tableid and match_date and match_type);
                auto batch_selected = ConnectorPtr->select(select_columns, filter);
                for (auto &file : batch_selected) {
                    selected.push_back(file);
                }
            }
        } else {
            // The id filter does not depend on the batch, so build it once.
            auto match_fileid = in(&TableFileSchema::id_, ids);
            for (auto &batch_dates : split_dates) {
                auto match_date = in(&TableFileSchema::date_, batch_dates);
                auto filter = where(match_tableid and match_fileid and match_date and match_type);
                auto batch_selected = ConnectorPtr->select(select_columns, filter);
                for (auto &file : batch_selected) {
                    selected.push_back(file);
                }
            }
        }

        Status ret;
        for (auto &file : selected) {
            // A fresh object per row: nothing written for a previous row
            // (for instance by GetTableFilePath) can leak into this one.
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // A failed path lookup is remembered but does not stop the scan.
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            // operator[] creates the per-date bucket on first use.
            files[table_file.date_].push_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
        }

        if (!selected.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }
        return ret;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e.what());
    }
}

S
starlord 已提交
814 815 816
// Collects the RAW files of table `table_id` that are candidates for merging
// into `files`, grouped by date.
//
// Rows are read in descending file_size_ order; a file whose file_size_ is
// greater than or equal to the table's index_file_size_ is skipped.
// `files` is cleared first.
//
// Returns the DescribeTable status if the table cannot be described, otherwise
// the status of the last failed GetTableFilePath call (default-constructed
// Status when none failed), or the HandleException status on std::exception.
Status
SqliteMetaImpl::FilesToMerge(const std::string &table_id,
                             DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        server::MetricCollector metric;

        //check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        //get files to merge
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::file_size_,
                                                     &TableFileSchema::row_count_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::created_on_),
                                             where(c(&TableFileSchema::file_type_)
                                                       == static_cast<int>(TableFileSchema::RAW) and
                                                 c(&TableFileSchema::table_id_) == table_id),
                                             order_by(&TableFileSchema::file_size_).desc());

        Status result;
        size_t to_merge_files = 0;  // files actually handed back, i.e. excluding skipped ones
        for (auto &file : selected) {
            TableFileSchema table_file;
            table_file.file_size_ = std::get<4>(file);
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;//skip large file
            }

            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.created_on_ = std::get<7>(file);
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // A failed path lookup is remembered but does not stop the scan.
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                result = path_status;
            }

            // operator[] creates the per-date bucket on first use.
            files[table_file.date_].push_back(table_file);
            ++to_merge_files;
        }

        if (to_merge_files > 0) {
            ENGINE_LOG_DEBUG << "Collect " << to_merge_files << " to-merge files";
        }
        return result;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

S
starlord 已提交
884 885 886 887
// Fetches the files of table `table_id` whose row id is listed in `ids`,
// leaving out rows already marked TO_DELETE.
//
// `table_files` is cleared and then filled with one entry per matching row;
// each entry also receives dimension_, index_file_size_, nlist_ and
// metric_type_ from the table description.
//
// Returns the DescribeTable status if the table cannot be described, a
// default-constructed Status otherwise (the GetTableFilePath result is not
// reported), or the HandleException status on std::exception.
Status
SqliteMetaImpl::GetTableFiles(const std::string &table_id,
                              const std::vector<size_t> &ids,
                              TableFilesSchema &table_files) {
    try {
        table_files.clear();

        auto rows = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::file_size_,
                                                 &TableFileSchema::row_count_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_,
                                                 &TableFileSchema::created_on_),
                                         where(c(&TableFileSchema::table_id_) == table_id and
                                             in(&TableFileSchema::id_, ids) and
                                             c(&TableFileSchema::file_type_)
                                                 != static_cast<int>(TableFileSchema::TO_DELETE)));

        // Table-level attributes are shared by every returned file.
        TableSchema owner;
        owner.table_id_ = table_id;
        auto describe_status = DescribeTable(owner);
        if (!describe_status.ok()) {
            return describe_status;
        }

        for (auto &row : rows) {
            TableFileSchema entry;
            entry.table_id_ = table_id;
            entry.id_ = std::get<0>(row);
            entry.file_id_ = std::get<1>(row);
            entry.file_type_ = std::get<2>(row);
            entry.file_size_ = std::get<3>(row);
            entry.row_count_ = std::get<4>(row);
            entry.date_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.created_on_ = std::get<7>(row);

            entry.dimension_ = owner.dimension_;
            entry.index_file_size_ = owner.index_file_size_;
            entry.nlist_ = owner.nlist_;
            entry.metric_type_ = owner.metric_type_;

            // The returned status is intentionally not inspected here.
            utils::GetTableFilePath(options_, entry);

            table_files.emplace_back(entry);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return Status();
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table files", e.what());
    }
}

S
starlord 已提交
938
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
939 940
// Applies the archive criteria configured in options_.archive_conf_.
//
//   ARCHIVE_CONF_DAYS - marks as TO_DELETE every file created more than
//                       `limit` days ago that is not already TO_DELETE;
//   ARCHIVE_CONF_DISK - asks DiscardFiles to release whatever the current
//                       Size() exceeds `limit` * G by.
//
// Returns Status::OK() unless the day-based update throws, in which case the
// HandleException status is returned. The results of Size and DiscardFiles
// are not reported.
Status
SqliteMetaImpl::Archive() {
    auto &criterias = options_.archive_conf_.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (const auto &entry : criterias) {
        const auto &criteria = entry.first;
        const auto &limit = entry.second;

        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            int64_t usecs = limit * D_SEC * US_PS;
            int64_t now = utils::GetMicroSecTimeStamp();
            try {
                //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE)),
                    where(
                        c(&TableFileSchema::created_on_) < static_cast<int64_t>(now - usecs) and
                            c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));
            } catch (std::exception &e) {
                return HandleException("Encounter exception when update table files", e.what());
            }

            ENGINE_LOG_DEBUG << "Archive old files";
        }

        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t total_size = 0;
            Size(total_size);

            // Non-positive when the store is within the limit; DiscardFiles then does nothing.
            int64_t to_delete = static_cast<int64_t>(total_size) - limit * G;
            DiscardFiles(to_delete);

            ENGINE_LOG_DEBUG << "Archive files to free disk";
        }
    }

    return Status::OK();
}

S
starlord 已提交
982 983
// Writes into `result` the sum of file_size_ over all table files that are
// not marked TO_DELETE. `result` is reset to 0 first, and stays 0 when the
// aggregate has no value.
//
// Returns Status::OK(), or the HandleException status on std::exception.
Status
SqliteMetaImpl::Size(uint64_t &result) {
    result = 0;
    try {
        auto rows = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
                                         where(
                                             c(&TableFileSchema::file_type_)
                                                 != static_cast<int>(TableFileSchema::TO_DELETE)));
        for (auto &row : rows) {
            auto &total = std::get<0>(row);
            // The aggregate is nullable; only accumulate when a value is present.
            if (total) {
                result += static_cast<uint64_t>(*total);
            }
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculte db size", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1002 1003
// Marks table files as TO_DELETE, lowest row id first, until at least
// `to_discard_size` worth of file_size_ has been marked.
//
// Work is done in rounds: each round takes meta_mutex_, selects up to 10 files
// that are not yet TO_DELETE (ordered by id_), and flags as many of them as
// needed in one transaction, also stamping updated_time_. The loop ends when
// the requested size is covered or when a round finds nothing left to flag,
// so it terminates even if the eligible files run out first.
//
// Returns Status::OK() (also for to_discard_size <= 0), or the HandleException
// status when a transaction fails to commit or a std::exception is thrown.
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        bool discarded_any = false;
        try {
            server::MetricCollector metric;

            //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            //the lock is scoped to one round, so it is released between rounds
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                             &TableFileSchema::file_size_),
                                                     where(c(&TableFileSchema::file_type_)
                                                               != static_cast<int>(TableFileSchema::TO_DELETE)),
                                                     order_by(&TableFileSchema::id_),
                                                     limit(10));

                std::vector<int> ids;
                TableFileSchema table_file;

                for (auto &file : selected) {
                    if (to_discard_size <= 0) break;
                    table_file.id_ = std::get<0>(file);
                    table_file.file_size_ = std::get<1>(file);
                    ids.push_back(table_file.id_);
                    // Log the row id that was selected; file_id_ is not part of this query.
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                     << " table_file.size=" << table_file.file_size_;
                    to_discard_size -= table_file.file_size_;
                }

                if (ids.size() == 0) {
                    return true;
                }

                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                        c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                    where(
                        in(&TableFileSchema::id_, ids)));

                discarded_any = true;
                return true;
            });

            if (!commited) {
                return HandleException("DiscardFiles error: sqlite transaction failed");
            }
        } catch (std::exception &e) {
            return HandleException("Encounter exception when discard table file", e.what());
        }

        // Nothing eligible is left: stop instead of retrying forever.
        if (!discarded_any) {
            break;
        }
    }

    return Status::OK();
}

S
starlord 已提交
1061 1062
// Persists `file_schema` after stamping its updated_time_ with the current
// time. When the owning table has no row, or its state_ is TO_DELETE, the
// file's file_type_ is switched to TO_DELETE before the update.
//
// Returns Status::OK(), or the HandleException status on std::exception.
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_states = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == file_schema.table_id_));

        //if the table has been deleted, just mark the table file as TO_DELETE
        //clean thread will delete the file later
        const bool table_gone = owner_states.empty() ||
            std::get<0>(owner_states.front()) == static_cast<int>(TableSchema::TO_DELETE);
        if (table_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (std::exception &e) {
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
            + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1090 1091
// Switches every RAW file of table `table_id` to TO_INDEX in a single update.
//
// Returns Status::OK(), or the HandleException status on std::exception.
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_INDEX)),
            where(c(&TableFileSchema::table_id_) == table_id and
                c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW)));

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files to to_index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1113 1114
// Persists every entry of `files` inside one transaction, stamping each
// with a fresh updated_time_. An entry whose table has no row with a state_
// other than TO_DELETE gets its file_type_ switched to TO_DELETE first.
//
// Returns Status::OK(), or the HandleException status when the transaction
// does not commit or a std::exception is thrown.
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // table id -> "a live table row exists"; each table is queried once
        std::map<std::string, bool> has_tables;
        for (auto &file : files) {
            if (has_tables.count(file.table_id_) != 0) {
                continue;
            }
            auto live_rows = ConnectorPtr->select(columns(&TableSchema::id_),
                                                  where(c(&TableSchema::table_id_) == file.table_id_
                                                            and c(&TableSchema::state_)
                                                                != static_cast<int>(TableSchema::TO_DELETE)));
            has_tables[file.table_id_] = !live_rows.empty();
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                const bool table_alive = has_tables[file.table_id_];
                if (!table_alive) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
        }

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1159 1160
// Three-step cleanup of entries that were marked TO_DELETE.
//
//   1. Files marked TO_DELETE whose updated_time_ is older than `seconds`
//      are removed: DeleteTableFilePath is called for each and the row is
//      deleted; the owning table ids are remembered.
//   2. Tables whose state_ is TO_DELETE are removed: DeleteTablePath is called
//      with `false` (only delete empty folder) and the row is deleted.
//   3. For every table id remembered in step 1 that has no file row left,
//      DeleteTablePath is called for the table.
//
// Steps 1 and 2 each run under meta_mutex_ and inside a transaction.
// Returns Status::OK(), or the HandleException status of the first step that
// fails to commit or throws a std::exception (later steps are then skipped).
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> table_ids;

    //remove to_delete files
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto expired_files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                          &TableFileSchema::table_id_,
                                                          &TableFileSchema::file_id_,
                                                          &TableFileSchema::date_),
                                                  where(
                                                      c(&TableFileSchema::file_type_) ==
                                                          static_cast<int>(TableFileSchema::TO_DELETE)
                                                          and
                                                              c(&TableFileSchema::updated_time_)
                                                                  < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            TableFileSchema table_file;
            for (auto &row : expired_files) {
                table_file.id_ = std::get<0>(row);
                table_file.table_id_ = std::get<1>(row);
                table_file.file_id_ = std::get<2>(row);
                table_file.date_ = std::get<3>(row);

                utils::DeleteTableFilePath(options_, table_file);
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);

                table_ids.insert(table_file.table_id_);
            }
            return true;
        });

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!expired_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << expired_files.size() << " files deleted in " << seconds << " seconds";
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    //remove to_delete tables
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto doomed_tables = ConnectorPtr->select(columns(&TableSchema::id_,
                                                          &TableSchema::table_id_),
                                                  where(c(&TableSchema::state_)
                                                            == static_cast<int>(TableSchema::TO_DELETE)));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : doomed_tables) {
                utils::DeleteTablePath(options_, std::get<1>(row), false);//only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }

            return true;
        });

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (!doomed_tables.empty()) {
            ENGINE_LOG_DEBUG << "Remove " << doomed_tables.size() << " tables from meta";
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    //remove deleted table folder
    //don't remove table folder until all its files has been deleted
    try {
        server::MetricCollector metric;

        for (auto &table_id : table_ids) {
            auto remaining = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                  where(c(&TableFileSchema::table_id_) == table_id));
            if (remaining.empty()) {
                utils::DeleteTablePath(options_, table_id);
            }
        }

        if (!table_ids.empty()) {
            ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table folder", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1264 1265
// Deletes, in one transaction, the rows of all table files whose file_type_
// is NEW, NEW_INDEX or NEW_MERGE.
//
// Returns Status::OK(), or the HandleException status when the transaction
// does not commit or a std::exception is thrown.
Status
SqliteMetaImpl::CleanUp() {
    try {
        server::MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> file_types = {
            static_cast<int>(TableFileSchema::NEW),
            static_cast<int>(TableFileSchema::NEW_INDEX),
            static_cast<int>(TableFileSchema::NEW_MERGE)
        };
        auto stale_rows =
            ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_types)));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : stale_rows) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!stale_rows.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << stale_rows.size() << " files";
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table file", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1302 1303
// Sums row_count_ over the table files of `table_id` whose file type is RAW,
// TO_INDEX or INDEX, and stores the total in `result`.
//
// table_id: id of the table whose rows are counted.
// result:   output; written only when the call succeeds.
//
// Returns Status::OK() on success. When DescribeTable reports a non-ok status
// that status is returned unchanged. Any std::exception is converted to a
// status through HandleException.
Status
SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
        server::MetricCollector metric;

        //check the table first, so no file query is issued when DescribeTable fails
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        //only these file types contribute to the row count
        std::vector<int> file_types = {
            static_cast<int>(TableFileSchema::RAW),
            static_cast<int>(TableFileSchema::TO_INDEX),
            static_cast<int>(TableFileSchema::INDEX)
        };
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
                                             where(in(&TableFileSchema::file_type_, file_types)
                                                       and c(&TableFileSchema::table_id_) == table_id));

        result = 0;
        for (auto &file : selected) {
            result += std::get<0>(file);
        }
    } catch (const std::exception &e) {
        return HandleException("Encounter exception when count table rows", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1334 1335
// Drops both sqlite meta tables: META_TABLES first, then META_TABLEFILES.
// Returns Status::OK() when both drops complete; a std::exception thrown by
// either drop is converted to a status through HandleException.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    try {
        //if the first drop throws, the second one is not attempted
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
    } catch (const std::exception &ex) {
        return HandleException("Encounter exception when drop all meta", ex.what());
    }

    return Status::OK();
}

1348
} // namespace meta
X
Xu Peng 已提交
1349
} // namespace engine
J
jinhai 已提交
1350
} // namespace milvus
S
starlord 已提交
1351