SqliteMetaImpl.cpp 49.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/meta/SqliteMetaImpl.h"
Y
youny626 已提交
19
#include "MetaConsts.h"
S
starlord 已提交
20 21
#include "db/IDGenerator.h"
#include "db/Utils.h"
22
#include "metrics/Metrics.h"
Y
youny626 已提交
23 24
#include "utils/Exception.h"
#include "utils/Log.h"
X
Xu Peng 已提交
25

Y
youny626 已提交
26
#include <sqlite_orm.h>
X
Xu Peng 已提交
27
#include <unistd.h>
X
Xu Peng 已提交
28
#include <boost/filesystem.hpp>
29
#include <chrono>
X
Xu Peng 已提交
30
#include <fstream>
Y
youny626 已提交
31
#include <iostream>
S
starlord 已提交
32
#include <map>
Y
youny626 已提交
33
#include <memory>
S
starlord 已提交
34
#include <set>
Y
youny626 已提交
35
#include <sstream>
S
starlord 已提交
36

J
jinhai 已提交
37
namespace milvus {
X
Xu Peng 已提交
38
namespace engine {
39
namespace meta {
X
Xu Peng 已提交
40

X
Xu Peng 已提交
41 42
using namespace sqlite_orm;

G
groot 已提交
43 44
namespace {

S
starlord 已提交
45
// Logs a meta-store failure and converts it into a DB_META_TRANSACTION_FAILED status.
//   desc: description of the operation that failed.
//   what: optional detail text (typically exception::what()); when non-null the
//         logged/returned message is "<desc>:<what>", otherwise just <desc>.
Status
HandleException(const std::string& desc, const char* what = nullptr) {
    const std::string msg = (what != nullptr) ? desc + ":" + what : desc;
    ENGINE_LOG_ERROR << msg;
    return Status(DB_META_TRANSACTION_FAILED, msg);
}

Y
youny626 已提交
57
}  // namespace
G
groot 已提交
58

S
starlord 已提交
59
inline auto
Y
youny626 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
StoragePrototype(const std::string& path) {
    return make_storage(
        path,
        make_table(META_TABLES, make_column("id", &TableSchema::id_, primary_key()),
                   make_column("table_id", &TableSchema::table_id_, unique()),
                   make_column("state", &TableSchema::state_), make_column("dimension", &TableSchema::dimension_),
                   make_column("created_on", &TableSchema::created_on_),
                   make_column("flag", &TableSchema::flag_, default_value(0)),
                   make_column("index_file_size", &TableSchema::index_file_size_),
                   make_column("engine_type", &TableSchema::engine_type_), make_column("nlist", &TableSchema::nlist_),
                   make_column("metric_type", &TableSchema::metric_type_)),
        make_table(
            META_TABLEFILES, make_column("id", &TableFileSchema::id_, primary_key()),
            make_column("table_id", &TableFileSchema::table_id_),
            make_column("engine_type", &TableFileSchema::engine_type_),
            make_column("file_id", &TableFileSchema::file_id_), make_column("file_type", &TableFileSchema::file_type_),
            make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
            make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
            make_column("updated_time", &TableFileSchema::updated_time_),
            make_column("created_on", &TableFileSchema::created_on_), make_column("date", &TableFileSchema::date_)));
X
Xu Peng 已提交
80 81
}

X
Xu Peng 已提交
82
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
83 84
static std::unique_ptr<ConnectorT> ConnectorPtr;

Y
youny626 已提交
85
// Stores a copy of the meta options and immediately opens the meta database.
// NOTE(review): the Status returned by Initialize() is not checked here.
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options)
    : options_(options) {
    Initialize();
}

// Nothing to release explicitly; members clean up after themselves.
SqliteMetaImpl::~SqliteMetaImpl() = default;

S
starlord 已提交
92
// Produces a fresh table id as the decimal text of the next SimpleIDGenerator number.
//   table_id: receives the generated id.
// Always returns Status::OK().
Status
SqliteMetaImpl::NextTableId(std::string& table_id) {
    // id generation is serialized to avoid handing out duplicated ids
    std::lock_guard<std::mutex> guard(genid_mutex_);

    SimpleIDGenerator id_generator;
    std::stringstream buffer;
    buffer << id_generator.GetNextIDNumber();
    table_id = buffer.str();

    return Status::OK();
}

S
starlord 已提交
102
// Produces a fresh file id as the decimal text of the next SimpleIDGenerator number.
//   file_id: receives the generated id.
// Always returns Status::OK().
Status
SqliteMetaImpl::NextFileId(std::string& file_id) {
    // id generation is serialized to avoid handing out duplicated ids
    std::lock_guard<std::mutex> guard(genid_mutex_);

    SimpleIDGenerator id_generator;
    std::stringstream buffer;
    buffer << id_generator.GetNextIDNumber();
    file_id = buffer.str();

    return Status::OK();
}

S
starlord 已提交
112 113 114
void
SqliteMetaImpl::ValidateMetaSchema() {
    if (ConnectorPtr == nullptr) {
115 116 117
        return;
    }

Y
youny626 已提交
118
    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
119
    auto ret = ConnectorPtr->sync_schema_simulate();
Y
youny626 已提交
120 121
    if (ret.find(META_TABLES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
122 123
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
Y
youny626 已提交
124 125
    if (ret.find(META_TABLEFILES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
126 127 128 129
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
130 131
// Opens (creating if necessary) the meta database under options_.path_.
//
// Steps: make sure the directory exists, build the sqlite_orm connector for
// "<path>/meta.sqlite", reject incompatible schemas, sync the schema, keep the
// connection open, switch to WAL journaling and finally run CleanUp().
//
// Returns DB_INVALID_PATH if the directory does not exist after the creation attempt,
// Status::OK() otherwise. ValidateMetaSchema() may throw Exception(DB_INCOMPATIB_META).
Status
SqliteMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path_)) {
        // create_directories (unlike create_directory) also creates missing parent
        // directories, so a nested, not-yet-existing path is accepted.
        boost::filesystem::create_directories(options_.path_);

        // Judge success by the resulting state rather than by the return value, which
        // only tells whether a new directory was created by this particular call.
        if (!boost::filesystem::is_directory(options_.path_)) {
            std::string msg = "Failed to create db directory " + options_.path_;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_INVALID_PATH, msg);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path_ + "/meta.sqlite"));

    // must run before sync_schema(), which would otherwise recreate incompatible tables
    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();                          // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

S
starlord 已提交
154
// TODO(myh): Delete single vector by id
S
starlord 已提交
155
// Soft-deletes every file of `table_id` whose date is listed in `dates` by
// marking it TO_DELETE and refreshing its updated_time.
//
//   table_id: table whose partitions are dropped; must exist and not be in delete state.
//   dates:    partition dates to drop; an empty list is a no-op.
//
// Returns the DescribeTable() error if the table cannot be found,
// DB_META_TRANSACTION_FAILED if the update throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DropPartitionsByDates(const std::string& table_id, const DatesT& dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        // sqlite_orm has a bug, 'in' statement cannot handle too many elements
        // so we split one query into multi-queries, this is a work-around!!
        // Every batch holds at most batch_size dates and is never empty.
        const size_t batch_size = 30;
        std::vector<DatesT> split_dates;
        for (DateT date : dates) {
            if (split_dates.empty() || split_dates.back().size() >= batch_size) {
                split_dates.emplace_back();
            }
            split_dates.back().push_back(date);
        }

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        for (auto& batch_dates : split_dates) {
            ConnectorPtr->update_all(
                set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                    c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::date_, batch_dates)));
        }

        ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when drop partition", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
204
// Inserts a new table row and creates its on-disk directory.
//
//   table_schema: in/out. If table_id_ is empty a new id is generated; on success
//                 id_ holds the inserted row id and created_on_ the creation timestamp.
//
// Returns DB_ERROR if a table with this id exists but is pending deletion,
// DB_ALREADY_EXIST if it exists otherwise, DB_META_TRANSACTION_FAILED if an
// exception is thrown, or the result of utils::CreateTablePath() on success.
Status
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        // Concurrent sqlite writes were seen to raise exceptions ('bad logic', etc), so writers are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                if (TableSchema::TO_DELETE == std::get<0>(existing[0])) {
                    return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                }
                // An existing live table is reported as an error rather than silently accepted.
                return Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        // A failing insert throws and is reported by the handler below.
        table_schema.id_ = ConnectorPtr->insert(table_schema);

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when create table", e.what());
    }
}

S
starlord 已提交
245
// Soft-deletes a table: its state is set to TO_DELETE unless it already is.
//   table_id: id of the table to mark.
// Returns DB_META_TRANSACTION_FAILED if the update throws, Status::OK() otherwise
// (including when no row matches).
Status
SqliteMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // Concurrent sqlite writes were seen to raise exceptions ('bad logic', etc), so writers are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // mark only; the row itself is not removed here
        ConnectorPtr->update_all(
            set(c(&TableSchema::state_) = static_cast<int>(TableSchema::TO_DELETE)),
            where(c(&TableSchema::table_id_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
266
// Soft-deletes all files of a table: every file not already TO_DELETE is marked
// TO_DELETE and gets a fresh updated_time.
//   table_id: id of the table whose files are marked.
// Returns DB_META_TRANSACTION_FAILED if the update throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // Concurrent sqlite writes were seen to raise exceptions ('bad logic', etc), so writers are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // mark only; the rows themselves are not removed here
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
288
// Loads the stored attributes of a table that is not in TO_DELETE state.
//
//   table_schema: in/out. table_id_ selects the table; on success id_, state_,
//                 dimension_, created_on_, flag_, index_file_size_, engine_type_,
//                 nlist_ and metric_type_ are filled from the database.
//
// Returns DB_NOT_FOUND unless exactly one matching row exists,
// DB_META_TRANSACTION_FAILED if the query throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        auto rows =
            ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_,
                                         &TableSchema::created_on_, &TableSchema::flag_, &TableSchema::index_file_size_,
                                         &TableSchema::engine_type_, &TableSchema::nlist_, &TableSchema::metric_type_),
                                 where(c(&TableSchema::table_id_) == table_schema.table_id_ and
                                       c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        // tuple positions follow the column order of the select above
        const auto& row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.engine_type_ = std::get<6>(row);
        table_schema.nlist_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when describe table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
320
// Collects the ids of all files of a table whose type is one of `file_types`,
// and logs a per-type tally when at least one file matched.
//
//   table_id:   table to inspect.
//   file_types: accepted file types; must not be empty.
//   file_ids:   cleared, then filled with the matching file ids.
//
// Returns DB_ERROR for an empty `file_types`, DB_META_TRANSACTION_FAILED if the
// query throws, Status::OK() otherwise.
Status
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
                            std::vector<std::string>& file_ids) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    try {
        file_ids.clear();

        auto matches = ConnectorPtr->select(
            columns(&TableFileSchema::file_id_, &TableFileSchema::file_type_),
            where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id));

        if (matches.empty()) {
            return Status::OK();
        }

        // tallies are only used for the debug line below
        int raw_total = 0;
        int new_total = 0;
        int new_merge_total = 0;
        int new_index_total = 0;
        int to_index_total = 0;
        int index_total = 0;
        int backup_total = 0;

        for (const auto& match : matches) {
            file_ids.push_back(std::get<0>(match));

            switch (std::get<1>(match)) {
                case static_cast<int>(TableFileSchema::RAW):
                    ++raw_total;
                    break;
                case static_cast<int>(TableFileSchema::NEW):
                    ++new_total;
                    break;
                case static_cast<int>(TableFileSchema::NEW_MERGE):
                    ++new_merge_total;
                    break;
                case static_cast<int>(TableFileSchema::NEW_INDEX):
                    ++new_index_total;
                    break;
                case static_cast<int>(TableFileSchema::TO_INDEX):
                    ++to_index_total;
                    break;
                case static_cast<int>(TableFileSchema::INDEX):
                    ++index_total;
                    break;
                case static_cast<int>(TableFileSchema::BACKUP):
                    ++backup_total;
                    break;
                default:
                    // other types are returned in file_ids but not tallied
                    break;
            }
        }

        ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_total
                         << " new files:" << new_total << " new_merge files:" << new_merge_total
                         << " new_index files:" << new_index_total << " to_index files:" << to_index_total
                         << " index files:" << index_total << " backup files:" << backup_total;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
376
// Stores new index parameters (engine type, nlist, metric type) for a table and
// turns all of its BACKUP files back into RAW files.
//
//   table_id: table to update; must exist and not be in TO_DELETE state.
//   index:    index parameters to store.
//
// Returns DB_NOT_FOUND if no such table exists, DB_META_TRANSACTION_FAILED if a
// database call throws, Status::OK() otherwise.
Status
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;

        // Concurrent sqlite writes were seen to raise exceptions ('bad logic', etc), so writers are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_),
            where(c(&TableSchema::table_id_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        // update() rewrites the whole row, so the unchanged columns are carried over
        // from the current row and only the index-related ones come from `index`.
        const auto& current = rows[0];
        meta::TableSchema updated;
        updated.id_ = std::get<0>(current);
        updated.table_id_ = table_id;
        updated.state_ = std::get<1>(current);
        updated.dimension_ = std::get<2>(current);
        updated.created_on_ = std::get<3>(current);
        updated.flag_ = std::get<4>(current);
        updated.index_file_size_ = std::get<5>(current);
        updated.engine_type_ = index.engine_type_;
        updated.nlist_ = index.nlist_;
        updated.metric_type_ = index.metric_type_;

        ConnectorPtr->update(updated);

        // set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (const std::exception& e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
422
// Overwrites the flag column of the table identified by `table_id`.
//
//   table_id: table to update (rows in any state are matched).
//   flag:     new flag value.
//
// Returns DB_META_TRANSACTION_FAILED if the update throws, Status::OK() otherwise
// (including when no row matches).
//
// NOTE(review): unlike the other writers visible in this file, this update does not
// take meta_mutex_ -- confirm whether that is intentional.
Status
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;

        // store the new flag on the matching table row
        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag),
                                 where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (const std::exception& e) {
        std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
438
// Reads the index parameters stored for a table that is not in TO_DELETE state.
//
//   table_id: table to look up.
//   index:    on success receives engine_type_, nlist_ and metric_type_.
//
// Returns DB_NOT_FOUND unless exactly one matching row exists,
// DB_META_TRANSACTION_FAILED if the query throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_, &TableSchema::nlist_, &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        const auto& row = rows[0];
        index.engine_type_ = std::get<0>(row);
        index.nlist_ = std::get<1>(row);
        index.metric_type_ = std::get<2>(row);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when describe index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
461
// Removes the index of a table at the meta level, in three updates:
//   1. INDEX files of the table are marked TO_DELETE,
//   2. BACKUP files of the table become RAW again,
//   3. the table's engine type, nlist and metric type are reset to their defaults.
//
//   table_id: table whose index is dropped.
//
// Returns DB_META_TRANSACTION_FAILED if any update throws, Status::OK() otherwise.
Status
SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // Concurrent sqlite writes were seen to raise exceptions ('bad logic', etc), so writers are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // 1. soft delete index files
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)));

        // 2. set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        // 3. reset the table's index parameters to the defaults
        ConnectorPtr->update_all(
            set(c(&TableSchema::engine_type_) = DEFAULT_ENGINE_TYPE,
                c(&TableSchema::nlist_) = DEFAULT_NLIST,
                c(&TableSchema::metric_type_) = DEFAULT_METRIC_TYPE),
            where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
495
// Tells whether a table with the given id exists and is not in TO_DELETE state.
//
//   table_id:   table to look for.
//   has_or_not: set to true only when exactly one matching row is found; false
//               otherwise, including when the query fails.
//
// Returns DB_META_TRANSACTION_FAILED if the query throws, Status::OK() otherwise.
Status
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    // default result, also what the caller sees if the query throws
    has_or_not = false;

    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id and
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        has_or_not = (rows.size() == 1);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when lookup table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
516
// Appends the schema of every table that is not in TO_DELETE state.
//
//   table_schema_array: receives one entry per table; existing entries are kept.
//                       The state_ field of the appended entries is not read from
//                       the database.
//
// Returns DB_META_TRANSACTION_FAILED if the query throws, Status::OK() otherwise.
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        server::MetricCollector metric;

        auto rows =
            ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_, &TableSchema::dimension_,
                                         &TableSchema::created_on_, &TableSchema::flag_, &TableSchema::index_file_size_,
                                         &TableSchema::engine_type_, &TableSchema::nlist_, &TableSchema::metric_type_),
                                 where(c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        for (const auto& row : rows) {
            // tuple positions follow the column order of the select above
            TableSchema entry;
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.dimension_ = std::get<2>(row);
            entry.created_on_ = std::get<3>(row);
            entry.flag_ = std::get<4>(row);
            entry.index_file_size_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.nlist_ = std::get<7>(row);
            entry.metric_type_ = std::get<8>(row);

            table_schema_array.push_back(entry);
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when lookup all tables", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
547
// Registers a new file for an existing table and creates its path on disk.
//
//   file_schema: in/out. table_id_ names the owning table and date_ the partition
//                (EmptyDate is replaced by utils::GetDate()). On success it carries a
//                new file_id_, the inserted row id, zeroed size/row count, creation
//                and update timestamps, and the dimension and index parameters
//                copied from the owning table.
//
// Returns the DescribeTable() error if the table cannot be found,
// DB_META_TRANSACTION_FAILED if a database call throws, or the result of
// utils::CreateTableFilePath() on success.
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto describe_status = DescribeTable(owner);
    if (!describe_status.ok()) {
        return describe_status;
    }

    try {
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);

        // a new file starts empty
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;

        const auto now = utils::GetMicroSecTimeStamp();
        file_schema.created_on_ = now;
        file_schema.updated_time_ = file_schema.created_on_;

        // inherited from the owning table
        file_schema.dimension_ = owner.dimension_;
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.metric_type_ = owner.metric_type_;

        // Concurrent sqlite writes were seen to raise exceptions ('bad logic', etc), so writers are serialized.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when create table file", e.what());
    }
}

S
starlord 已提交
588
// Collects every file whose type is TO_INDEX, across all tables.
//
//   files: cleared, then filled with one entry per TO_INDEX file. Each entry also
//          carries the dimension, index_file_size, nlist and metric type of its
//          owning table.
//
// Returns:
//   - the DescribeTable() error as soon as an owning table cannot be described
//     (files collected so far remain in `files`),
//   - otherwise the last non-ok status of utils::GetTableFilePath(), if any,
//   - DB_META_TRANSACTION_FAILED if a database call throws,
//   - Status::OK() when nothing went wrong.
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX)));

        // owning tables already described, keyed by table id, so each is queried once
        std::map<std::string, TableSchema> owners;

        // One schema object is reused for every row; fields not assigned below keep
        // whatever the previous iteration left in them.
        TableFileSchema table_file;

        Status result;
        for (const auto& row : rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.file_size_ = std::get<4>(row);
            table_file.row_count_ = std::get<5>(row);
            table_file.date_ = std::get<6>(row);
            table_file.engine_type_ = std::get<7>(row);
            table_file.created_on_ = std::get<8>(row);

            // a path failure is remembered but does not stop the scan
            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                result = path_status;
            }

            auto owner_it = owners.find(table_file.table_id_);
            if (owner_it == owners.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                owner_it = owners.emplace(table_file.table_id_, table_schema).first;
            }

            const TableSchema& owner = owner_it->second;
            table_file.dimension_ = owner.dimension_;
            table_file.index_file_size_ = owner.index_file_size_;
            table_file.nlist_ = owner.nlist_;
            table_file.metric_type_ = owner.metric_type_;
            files.push_back(table_file);
        }

        if (!rows.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << rows.size() << " to-index files";
        }
        return result;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
646
// Collect the searchable files (RAW, TO_INDEX and INDEX) of one table, grouped by date.
//
// table_id : table whose files are looked up; if DescribeTable() fails for it, that
//            status is returned and `files` stays empty
// ids      : optional file-id filter, an empty vector means "no id filter"
// dates    : optional date filter, an empty container means "no date filter"
// files    : output; cleared first, then filled as date -> list of files
//
// Returns the last non-ok status reported by utils::GetTableFilePath() (the affected
// file is still added to `files`), a default-constructed Status otherwise, or the
// status built by HandleException() when an exception is thrown.
Status
SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& dates,
                              DatePartionedTableFilesSchema& files) {
    files.clear();
    server::MetricCollector metric;

    try {
        auto select_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_);

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;

        std::vector<int> file_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                       (int)TableFileSchema::INDEX};
        auto match_type = in(&TableFileSchema::file_type_, file_types);

        // built once; only used by the branches that actually filter on file ids
        auto match_fileid = in(&TableFileSchema::id_, ids);

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // sqlite_orm has a bug, 'in' statement cannot handle too many elements
        // so we split one query into multi-queries, this is a work-around!!
        // A new batch is opened BEFORE a date is appended, so no batch ever holds more
        // than batch_size dates and no empty batch is produced.
        std::vector<DatesT> split_dates;
        const size_t batch_size = 30;
        for (DateT date : dates) {
            if (split_dates.empty() || split_dates.back().size() >= batch_size) {
                split_dates.push_back(DatesT());
            }
            split_dates.back().push_back(date);
        }

        // perform query
        decltype(ConnectorPtr->select(select_columns)) selected;
        if (dates.empty()) {
            if (ids.empty()) {
                auto filter = where(match_tableid and match_type);
                selected = ConnectorPtr->select(select_columns, filter);
            } else {
                auto filter = where(match_tableid and match_fileid and match_type);
                selected = ConnectorPtr->select(select_columns, filter);
            }
        } else {
            // one query per batch of dates, results are concatenated in batch order
            for (auto& batch_dates : split_dates) {
                auto match_date = in(&TableFileSchema::date_, batch_dates);
                if (ids.empty()) {
                    auto filter = where(match_tableid and match_date and match_type);
                    auto batch_selected = ConnectorPtr->select(select_columns, filter);
                    selected.insert(selected.end(), batch_selected.begin(), batch_selected.end());
                } else {
                    auto filter = where(match_tableid and match_fileid and match_date and match_type);
                    auto batch_selected = ConnectorPtr->select(select_columns, filter);
                    selected.insert(selected.end(), batch_selected.begin(), batch_selected.end());
                }
            }
        }

        Status ret;
        // reused across iterations on purpose: fields not assigned below keep whatever
        // the previous iteration left in them, exactly as before
        TableFileSchema table_file;
        for (auto& file : selected) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);

            // table-level attributes are copied onto every file record
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a path failure is remembered but does not stop the collection
            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                ret = status;
            }

            // operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
        }

        if (selected.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate index files", e.what());
    }
}

S
starlord 已提交
761
// Collect the RAW files of a table that are still small enough to be merged,
// grouped by date.
//
// table_id : table to inspect; if DescribeTable() fails for it, that status is returned
// files    : output; cleared first, then filled as date -> list of files. Rows are
//            queried ordered by file_size_ descending, and files whose size is
//            >= the table's index_file_size_ are skipped.
//
// Returns the last non-ok status reported by utils::GetTableFilePath() (the affected
// file is still added to `files`), a default-constructed Status otherwise, or the
// status built by HandleException() when an exception is thrown.
Status
SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        // check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // get files to merge
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW and
                  c(&TableFileSchema::table_id_) == table_id),
            order_by(&TableFileSchema::file_size_).desc());

        Status result;
        // number of files actually handed back; skipped large files are not counted
        size_t to_merge_files = 0;
        for (auto& file : selected) {
            TableFileSchema table_file;
            table_file.file_size_ = std::get<4>(file);
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;  // skip large file
            }

            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.created_on_ = std::get<7>(file);

            // table-level attributes are copied onto every file record
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            // a path failure is remembered but does not stop the collection
            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                result = status;
            }

            // operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
            ++to_merge_files;
        }

        if (to_merge_files > 0) {
            ENGINE_LOG_DEBUG << "Collect " << to_merge_files << " to-merge files";
        }
        return result;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

S
starlord 已提交
826
// Look up the files of `table_id` whose id is listed in `ids`, ignoring files
// already marked TO_DELETE.
//
// table_files is cleared and then receives one record per matching row, each
// carrying the table-level dimension / index_file_size / nlist / metric_type.
// The status of utils::GetTableFilePath() is not inspected here.
//
// Returns the status of DescribeTable() when it fails, a default-constructed
// Status on success, or the status built by HandleException() on exception.
Status
SqliteMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                              TableFilesSchema& table_files) {
    try {
        table_files.clear();

        auto wanted_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::file_id_, &TableFileSchema::file_type_,
                    &TableFileSchema::file_size_, &TableFileSchema::row_count_, &TableFileSchema::date_,
                    &TableFileSchema::engine_type_, &TableFileSchema::created_on_);
        auto live_files_by_id =
            where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::id_, ids) and
                  c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE);
        auto rows = ConnectorPtr->select(wanted_columns, live_files_by_id);

        // the owning table supplies the attributes shared by all of its files
        TableSchema owner;
        owner.table_id_ = table_id;
        auto describe_status = DescribeTable(owner);
        if (!describe_status.ok()) {
            return describe_status;
        }

        Status outcome;
        for (auto& row : rows) {
            TableFileSchema entry;
            entry.table_id_ = table_id;

            // columns, in the order they were selected
            entry.id_ = std::get<0>(row);
            entry.file_id_ = std::get<1>(row);
            entry.file_type_ = std::get<2>(row);
            entry.file_size_ = std::get<3>(row);
            entry.row_count_ = std::get<4>(row);
            entry.date_ = std::get<5>(row);
            entry.engine_type_ = std::get<6>(row);
            entry.created_on_ = std::get<7>(row);

            // attributes inherited from the table
            entry.dimension_ = owner.dimension_;
            entry.index_file_size_ = owner.index_file_size_;
            entry.nlist_ = owner.nlist_;
            entry.metric_type_ = owner.metric_type_;

            utils::GetTableFilePath(options_, entry);

            table_files.push_back(entry);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return outcome;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table files", e.what());
    }
}

S
starlord 已提交
874
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
875 876
Status
SqliteMetaImpl::Archive() {
Y
youny626 已提交
877
    auto& criterias = options_.archive_conf_.GetCriterias();
X
Xu Peng 已提交
878 879 880 881 882
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
Y
youny626 已提交
883 884
        auto& criteria = kv.first;
        auto& limit = kv.second;
G
groot 已提交
885
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
S
starlord 已提交
886 887
            int64_t usecs = limit * D_SEC * US_PS;
            int64_t now = utils::GetMicroSecTimeStamp();
888
            try {
Y
youny626 已提交
889
                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
890 891
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

Y
youny626 已提交
892 893 894 895
                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
                                         where(c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
                                               c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
            } catch (std::exception& e) {
S
starlord 已提交
896
                return HandleException("Encounter exception when update table files", e.what());
X
Xu Peng 已提交
897
            }
898 899

            ENGINE_LOG_DEBUG << "Archive old files";
X
Xu Peng 已提交
900
        }
G
groot 已提交
901
        if (criteria == engine::ARCHIVE_CONF_DISK) {
G
groot 已提交
902
            uint64_t sum = 0;
X
Xu Peng 已提交
903
            Size(sum);
X
Xu Peng 已提交
904

Y
youny626 已提交
905
            int64_t to_delete = (int64_t)sum - limit * G;
X
Xu Peng 已提交
906
            DiscardFiles(to_delete);
907 908

            ENGINE_LOG_DEBUG << "Archive files to free disk";
X
Xu Peng 已提交
909 910 911 912 913 914
        }
    }

    return Status::OK();
}

S
starlord 已提交
915
// Sum the file_size_ of every file that is not marked TO_DELETE.
//
// result is reset to 0 first and then receives the total. A null aggregate
// (returned by the query when there is nothing to sum) contributes nothing.
//
// Returns OK, or the status built by HandleException() when the query throws.
Status
SqliteMetaImpl::Size(uint64_t& result) {
    result = 0;
    try {
        auto not_deleted = where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE);
        auto rows = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)), not_deleted);

        for (auto& row : rows) {
            auto& total = std::get<0>(row);
            if (total) {
                result += static_cast<uint64_t>(*total);
            }
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when calculte db size", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
934 935
// Mark files as TO_DELETE until at least `to_discard_size` bytes (as recorded in
// file_size_) have been released.
//
// Files are taken in ascending id order, at most 10 per pass; every pass runs in its
// own transaction under meta_mutex_. The loop ends when the requested size has been
// reached or when a pass finds no remaining file to discard. The latter guard is
// needed because otherwise a request larger than what is left would never terminate.
//
// Returns OK when done (also when to_discard_size <= 0 on entry), or the status built
// by HandleException() when a transaction is not committed or an exception is thrown.
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        // set by the transaction body once it has marked at least one file
        bool discarded_any = false;

        try {
            server::MetricCollector metric;

            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected =
                    ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::file_size_),
                                         where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE),
                                         order_by(&TableFileSchema::id_), limit(10));

                std::vector<int> ids;
                TableFileSchema table_file;

                for (auto& file : selected) {
                    if (to_discard_size <= 0)
                        break;
                    table_file.id_ = std::get<0>(file);
                    table_file.file_size_ = std::get<1>(file);
                    ids.push_back(table_file.id_);
                    // only id_ and file_size_ are selected, so log the row id
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.id_
                                     << " table_file.size=" << table_file.file_size_;
                    to_discard_size -= table_file.file_size_;
                }

                if (ids.size() == 0) {
                    return true;
                }

                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                             c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                         where(in(&TableFileSchema::id_, ids)));
                discarded_any = true;

                return true;
            });

            if (!commited) {
                return HandleException("DiscardFiles error: sqlite transaction failed");
            }
        } catch (std::exception& e) {
            return HandleException("Encounter exception when discard table file", e.what());
        }

        if (!discarded_any) {
            // nothing left to discard; stop instead of retrying forever
            break;
        }
    }

    return Status::OK();
}

S
starlord 已提交
989
// Persist a single table file record.
//
// file_schema.updated_time_ is refreshed before the write. When the owning table
// has no row in the meta store, or its state is TO_DELETE, the file itself is
// switched to TO_DELETE before being written (the caller's object is modified).
//
// Returns OK, or the status built by HandleException() when an exception is thrown.
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_states = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == file_schema.table_id_));

        // if the table has been deleted, just mark the table file as TO_DELETE
        // clean thread will delete the file later
        const bool table_gone =
            owner_states.size() < 1 || std::get<0>(owner_states[0]) == (int)TableSchema::TO_DELETE;
        if (table_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (std::exception& e) {
        std::string msg = "Exception update table file: table_id = ";
        msg += file_schema.table_id_;
        msg += " file_id = ";
        msg += file_schema.file_id_;
        return HandleException(msg, e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1018
// Switch every RAW file of `table_id` to TO_INDEX with a single update statement.
//
// Returns OK, or the status built by HandleException() when an exception is thrown.
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto mark_to_index = set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_INDEX);
        auto raw_files_of_table = where(c(&TableFileSchema::table_id_) == table_id and
                                        c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW);
        ConnectorPtr->update_all(mark_to_index, raw_files_of_table);

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files to to_index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1038
// Persist a batch of table file records inside one transaction.
//
// For each distinct table_id in `files` the meta store is asked once whether a table
// row exists whose state is not TO_DELETE. Files of tables without such a row are
// switched to TO_DELETE. Every file gets a fresh updated_time_ before it is written
// (the caller's objects are modified).
//
// Returns OK, or the status built by HandleException() when the transaction is not
// committed or an exception is thrown.
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // table_id -> "a live (not TO_DELETE) table row exists"
        std::map<std::string, bool> has_tables;
        for (auto& entry : files) {
            if (has_tables.count(entry.table_id_) > 0) {
                continue;  // this table was already checked
            }
            auto live_tables = ConnectorPtr->select(columns(&TableSchema::id_),
                                                    where(c(&TableSchema::table_id_) == entry.table_id_ and
                                                          c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
            has_tables[entry.table_id_] = (live_tables.size() >= 1);
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& entry : files) {
                const bool table_alive = has_tables[entry.table_id_];
                if (!table_alive) {
                    entry.file_type_ = TableFileSchema::TO_DELETE;
                }

                entry.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(entry);
            }
            return true;
        });

        if (!commited) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
        }

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1084 1085
// Physically clean up what earlier calls only marked as TO_DELETE. Runs in three steps:
//  1. files in state TO_DELETE whose updated_time_ is older than `seconds` (scaled by
//     US_PS) before now: utils::DeleteTableFilePath() is called and the meta row removed;
//  2. tables in state TO_DELETE: utils::DeleteTablePath(..., false) is called and the
//     meta row removed;
//  3. for every table touched in step 1 that has no file row left,
//     utils::DeleteTablePath() is called for the table.
//
// Steps 1 and 2 each run in a transaction under meta_mutex_.
//
// Returns OK when all steps ran, or the status built by HandleException() for the first
// step whose transaction was not committed or that threw (later steps are then skipped).
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    // tables that lost at least one file in step 1, candidates for step 3
    std::set<std::string> table_ids;

    // remove to_delete files
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_,
                                                  &TableFileSchema::file_id_, &TableFileSchema::date_),
                                          where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::TO_DELETE and
                                                c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            TableFileSchema table_file;
            for (auto& file : files) {
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.date_ = std::get<3>(file);

                utils::DeleteTableFilePath(options_, table_file);
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);

                table_ids.insert(table_file.table_id_);
            }
            return true;
        });

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (files.size() > 0) {
            ENGINE_LOG_DEBUG << "Clean " << files.size() << " files deleted in " << seconds << " seconds";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove to_delete tables
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_),
                                           where(c(&TableSchema::state_) == (int)TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& table : tables) {
                utils::DeleteTablePath(options_, std::get<1>(table), false);  // only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(table));
            }

            return true;
        });

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (tables.size() > 0) {
            ENGINE_LOG_DEBUG << "Remove " << tables.size() << " tables from meta";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
    try {
        server::MetricCollector metric;

        // counts only the tables for which DeleteTablePath() was really called, so the
        // log below does not include tables that still own files
        size_t removed_folders = 0;
        for (auto& table_id : table_ids) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                 where(c(&TableFileSchema::table_id_) == table_id));
            if (selected.size() == 0) {
                utils::DeleteTablePath(options_, table_id);
                ++removed_folders;
            }
        }

        if (removed_folders > 0) {
            ENGINE_LOG_DEBUG << "Remove " << removed_folders << " tables folder";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table folder", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1182 1183
// Remove the meta rows of files that are still in one of the transient states
// NEW, NEW_INDEX or NEW_MERGE. All removals happen in one transaction under
// meta_mutex_.
//
// Returns OK, or the status built by HandleException() when the transaction is not
// committed or an exception is thrown.
Status
SqliteMetaImpl::CleanUp() {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> transient_types = {(int)TableFileSchema::NEW, (int)TableFileSchema::NEW_INDEX,
                                            (int)TableFileSchema::NEW_MERGE};
        auto is_transient = where(in(&TableFileSchema::file_type_, transient_types));
        auto stale_rows = ConnectorPtr->select(columns(&TableFileSchema::id_), is_transient);

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& row : stale_rows) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!stale_rows.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << stale_rows.size() << " files";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table file", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1217
// Add up row_count_ over the RAW, TO_INDEX and INDEX files of `table_id`.
//
// result is overwritten with the total only after DescribeTable() succeeded; when it
// fails, its status is returned and result is left untouched.
//
// Returns OK on success, or the status built by HandleException() on exception.
Status
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    try {
        server::MetricCollector metric;

        std::vector<int> counted_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                          (int)TableFileSchema::INDEX};
        auto filter =
            where(in(&TableFileSchema::file_type_, counted_types) and c(&TableFileSchema::table_id_) == table_id);
        auto rows = ConnectorPtr->select(columns(&TableFileSchema::row_count_), filter);

        // the table itself must be known to the meta store
        TableSchema owner;
        owner.table_id_ = table_id;
        auto describe_status = DescribeTable(owner);
        if (!describe_status.ok()) {
            return describe_status;
        }

        uint64_t total = 0;
        for (auto& row : rows) {
            total += std::get<0>(row);
        }
        result = total;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when calculate table file size", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1246 1247
// Drop both meta tables (META_TABLES first, then META_TABLEFILES).
//
// Returns OK, or the status built by HandleException() when a drop throws; in that
// case the remaining table is not dropped.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    Status outcome = Status::OK();
    try {
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
    } catch (std::exception& e) {
        outcome = HandleException("Encounter exception when drop all meta", e.what());
    }

    return outcome;
}

Y
youny626 已提交
1260 1261 1262
}  // namespace meta
}  // namespace engine
}  // namespace milvus