SqliteMetaImpl.cpp 48.8 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/meta/SqliteMetaImpl.h"
Y
youny626 已提交
19
#include "MetaConsts.h"
S
starlord 已提交
20 21
#include "db/IDGenerator.h"
#include "db/Utils.h"
22
#include "metrics/Metrics.h"
Y
youny626 已提交
23 24
#include "utils/Exception.h"
#include "utils/Log.h"
X
Xu Peng 已提交
25

Y
youny626 已提交
26
#include <sqlite_orm.h>
X
Xu Peng 已提交
27
#include <unistd.h>
X
Xu Peng 已提交
28
#include <boost/filesystem.hpp>
29
#include <chrono>
X
Xu Peng 已提交
30
#include <fstream>
Y
youny626 已提交
31
#include <iostream>
S
starlord 已提交
32
#include <map>
Y
youny626 已提交
33
#include <memory>
S
starlord 已提交
34
#include <set>
Y
youny626 已提交
35
#include <sstream>
S
starlord 已提交
36

J
jinhai 已提交
37
namespace milvus {
X
Xu Peng 已提交
38
namespace engine {
39
namespace meta {
X
Xu Peng 已提交
40

X
Xu Peng 已提交
41 42
using namespace sqlite_orm;

G
groot 已提交
43 44
namespace {

S
starlord 已提交
45
Status
Y
youny626 已提交
46
HandleException(const std::string& desc, const char* what = nullptr) {
S
starlord 已提交
47
    if (what == nullptr) {
S
starlord 已提交
48 49 50 51 52 53 54
        ENGINE_LOG_ERROR << desc;
        return Status(DB_META_TRANSACTION_FAILED, desc);
    } else {
        std::string msg = desc + ":" + what;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_META_TRANSACTION_FAILED, msg);
    }
G
groot 已提交
55 56
}

Y
youny626 已提交
57
}  // namespace
G
groot 已提交
58

S
starlord 已提交
59
inline auto
Y
youny626 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
StoragePrototype(const std::string& path) {
    return make_storage(
        path,
        make_table(META_TABLES, make_column("id", &TableSchema::id_, primary_key()),
                   make_column("table_id", &TableSchema::table_id_, unique()),
                   make_column("state", &TableSchema::state_), make_column("dimension", &TableSchema::dimension_),
                   make_column("created_on", &TableSchema::created_on_),
                   make_column("flag", &TableSchema::flag_, default_value(0)),
                   make_column("index_file_size", &TableSchema::index_file_size_),
                   make_column("engine_type", &TableSchema::engine_type_), make_column("nlist", &TableSchema::nlist_),
                   make_column("metric_type", &TableSchema::metric_type_)),
        make_table(
            META_TABLEFILES, make_column("id", &TableFileSchema::id_, primary_key()),
            make_column("table_id", &TableFileSchema::table_id_),
            make_column("engine_type", &TableFileSchema::engine_type_),
            make_column("file_id", &TableFileSchema::file_id_), make_column("file_type", &TableFileSchema::file_type_),
            make_column("file_size", &TableFileSchema::file_size_, default_value(0)),
            make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
            make_column("updated_time", &TableFileSchema::updated_time_),
            make_column("created_on", &TableFileSchema::created_on_), make_column("date", &TableFileSchema::date_)));
X
Xu Peng 已提交
80 81
}

X
Xu Peng 已提交
82
// Concrete sqlite_orm storage type produced by StoragePrototype(); the path argument does not affect the type.
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
83 84
// File-local storage handle, (re)created by SqliteMetaImpl::Initialize(). Being a namespace-scope static it is
// shared by every SqliteMetaImpl instance in the process; a later Initialize() replaces it for all of them.
static std::unique_ptr<ConnectorT> ConnectorPtr;

Y
youny626 已提交
85
/// Open (creating if necessary) the sqlite meta store under options.path_.
///
/// @throws Exception if the db directory cannot be prepared, or (from ValidateMetaSchema) if the
///         existing meta schema is incompatible.
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options) : options_(options) {
    // Initialize() reports an unusable db path through its returned Status rather than throwing. Ignoring it
    // would leave this object without a meta connection for its path, so surface the failure the same way
    // an incompatible schema already surfaces from here: as an Exception.
    auto status = Initialize();
    if (!status.ok()) {
        throw Exception(status.code(), status.message());
    }
}

// Nothing to release per instance: the storage handle (ConnectorPtr) is a file-level static, not a member.
SqliteMetaImpl::~SqliteMetaImpl() = default;

S
starlord 已提交
92
/// Produce a new table id: the decimal text of the next SimpleIDGenerator number.
/// @param[out] table_id receives the generated id
/// @return always Status::OK()
Status
SqliteMetaImpl::NextTableId(std::string& table_id) {
    SimpleIDGenerator generator;
    const auto next_id = generator.GetNextIDNumber();

    std::ostringstream out;
    out << next_id;
    table_id = out.str();
    return Status::OK();
}

S
starlord 已提交
101
/// Produce a new file id: the decimal text of the next SimpleIDGenerator number.
/// @param[out] file_id receives the generated id
/// @return always Status::OK()
Status
SqliteMetaImpl::NextFileId(std::string& file_id) {
    SimpleIDGenerator generator;
    const auto next_id = generator.GetNextIDNumber();

    std::ostringstream out;
    out << next_id;
    file_id = out.str();
    return Status::OK();
}

S
starlord 已提交
110 111 112
void
SqliteMetaImpl::ValidateMetaSchema() {
    if (ConnectorPtr == nullptr) {
113 114 115
        return;
    }

Y
youny626 已提交
116
    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
117
    auto ret = ConnectorPtr->sync_schema_simulate();
Y
youny626 已提交
118 119
    if (ret.find(META_TABLES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
120 121
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }
Y
youny626 已提交
122 123
    if (ret.find(META_TABLEFILES) != ret.end() &&
        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
124 125 126 127
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}

S
starlord 已提交
128 129
/// Prepare the db directory, open <path>/meta.sqlite, check and sync its schema, then run CleanUp().
///
/// @return DB_INVALID_PATH if the db directory does not exist afterwards, otherwise Status::OK()
/// @throws boost::filesystem::filesystem_error if directory creation is refused by the file system;
///         Exception(DB_INCOMPATIB_META) from ValidateMetaSchema() for an incompatible existing schema
Status
SqliteMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path_)) {
        // create_directories() also creates missing parent directories, so a nested db path works on a
        // fresh machine (create_directory() fails when the parent is missing).
        boost::filesystem::create_directories(options_.path_);
        // Re-check rather than trusting the return value: false is also returned when the directory
        // already exists (e.g. created concurrently), which is not an error.
        if (!boost::filesystem::is_directory(options_.path_)) {
            std::string msg = "Failed to create db directory " + options_.path_;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_INVALID_PATH, msg);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path_ + "/meta.sqlite"));

    // throws if syncing would drop and recreate an existing meta table
    ValidateMetaSchema();

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();                          // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

S
starlord 已提交
152
// TODO(myh): Delete single vecotor by id
S
starlord 已提交
153
Status
Y
youny626 已提交
154
SqliteMetaImpl::DropPartitionsByDates(const std::string& table_id, const DatesT& dates) {
155
    if (dates.empty()) {
X
Xu Peng 已提交
156 157 158
        return Status::OK();
    }

159
    TableSchema table_schema;
G
groot 已提交
160
    table_schema.table_id_ = table_id;
X
Xu Peng 已提交
161
    auto status = DescribeTable(table_schema);
X
Xu Peng 已提交
162 163 164 165
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
166
    try {
Y
youny626 已提交
167 168
        // sqlite_orm has a bug, 'in' statement cannot handle too many elements
        // so we split one query into multi-queries, this is a work-around!!
169 170 171
        std::vector<DatesT> split_dates;
        split_dates.push_back(DatesT());
        const size_t batch_size = 30;
Y
youny626 已提交
172
        for (DateT date : dates) {
173 174
            DatesT& last_batch = *split_dates.rbegin();
            last_batch.push_back(date);
Y
youny626 已提交
175
            if (last_batch.size() > batch_size) {
176 177 178 179
                split_dates.push_back(DatesT());
            }
        }

Y
youny626 已提交
180
        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
181 182
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

Y
youny626 已提交
183 184
        for (auto& batch_dates : split_dates) {
            if (batch_dates.empty()) {
185 186 187 188
                continue;
            }

            ConnectorPtr->update_all(
Y
youny626 已提交
189
                set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
190
                    c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
Y
youny626 已提交
191
                where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::date_, batch_dates)));
192
        }
193 194

        ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;
Y
youny626 已提交
195
    } catch (std::exception& e) {
S
starlord 已提交
196
        return HandleException("Encounter exception when drop partition", e.what());
X
Xu Peng 已提交
197
    }
G
groot 已提交
198

X
Xu Peng 已提交
199 200 201
    return Status::OK();
}

S
starlord 已提交
202
/// Insert a new table row and create its directory.
///
/// When table_schema.table_id_ is empty a new id is generated into it. On success id_ and created_on_
/// are filled in.
/// @return DB_ALREADY_EXIST if the id is taken; DB_ERROR if the existing row is still being deleted;
///         the result of utils::CreateTablePath on success; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            // Soft-deleted rows are included on purpose: table_id is a unique column, so a TO_DELETE row
            // with the same id would still block the insert below.
            auto table = ConnectorPtr->select(columns(&TableSchema::state_),
                                              where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (table.size() == 1) {
                if (TableSchema::TO_DELETE == std::get<0>(table[0])) {
                    return Status(DB_ERROR, "Table already exists and it is in delete state, please wait a second");
                }
                // Change from no error to already exist.
                return Status(DB_ALREADY_EXIST, "Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        // a failing insert throws and is reported by the catch below (same message as before)
        auto id = ConnectorPtr->insert(table_schema);
        table_schema.id_ = id;

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (std::exception& e) {
        return HandleException("Encounter exception when create table", e.what());
    }
}

S
starlord 已提交
243
/// Soft delete a table: its row stays, only state_ becomes TO_DELETE.
/// @return OK (also when no live row matched); DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> lock(meta_mutex_);

        // rows already in TO_DELETE are excluded by the where clause
        const int to_delete = static_cast<int>(TableSchema::TO_DELETE);
        auto is_table = c(&TableSchema::table_id_) == table_id;
        auto still_alive = c(&TableSchema::state_) != to_delete;
        ConnectorPtr->update_all(set(c(&TableSchema::state_) = to_delete), where(is_table && still_alive));

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
264
/// Soft delete every file of a table: file_type_ becomes TO_DELETE and updated_time_ is stamped.
/// Files already marked TO_DELETE are skipped, so their timestamp is kept.
/// @return OK; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> lock(meta_mutex_);

        const int to_delete = static_cast<int>(TableFileSchema::TO_DELETE);
        auto belongs_to_table = c(&TableFileSchema::table_id_) == table_id;
        auto not_marked_yet = c(&TableFileSchema::file_type_) != to_delete;
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = to_delete,
                                     c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                 where(belongs_to_table && not_marked_yet));

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
286
/// Load the attributes of the live (not TO_DELETE) table named by table_schema.table_id_ into table_schema.
/// state_ through metric_type_ and id_ are overwritten; table_id_ is left as passed in.
/// @return DB_NOT_FOUND unless exactly one live row matches; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::nlist_, &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_ &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
        }

        const auto& row = rows.front();
        table_schema.id_ = std::get<0>(row);
        table_schema.state_ = std::get<1>(row);
        table_schema.dimension_ = std::get<2>(row);
        table_schema.created_on_ = std::get<3>(row);
        table_schema.flag_ = std::get<4>(row);
        table_schema.index_file_size_ = std::get<5>(row);
        table_schema.engine_type_ = std::get<6>(row);
        table_schema.nlist_ = std::get<7>(row);
        table_schema.metric_type_ = std::get<8>(row);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when describe table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
318
/// Collect the file_id_ of every file of `table_id` whose type is in `file_types`.
///
/// @param[out] file_ids cleared, then filled with the matching file ids
/// @return DB_ERROR for an empty `file_types`; DB_META_TRANSACTION_FAILED on exceptions; otherwise OK
Status
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
                            std::vector<std::string>& file_ids) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    try {
        file_ids.clear();
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::file_id_, &TableFileSchema::file_type_),
            where(in(&TableFileSchema::file_type_, file_types) && c(&TableFileSchema::table_id_) == table_id));

        if (selected.empty()) {
            return Status::OK();
        }

        // number of selected files per file type; only used by the debug log below
        std::map<int, int> count_by_type;
        for (const auto& row : selected) {
            file_ids.push_back(std::get<0>(row));
            ++count_by_type[std::get<1>(row)];
        }

        auto count_of = [&count_by_type](int type) {
            auto found = count_by_type.find(type);
            return found == count_by_type.end() ? 0 : found->second;
        };

        ENGINE_LOG_DEBUG << "Table " << table_id
                         << " currently has raw files:" << count_of(static_cast<int>(TableFileSchema::RAW))
                         << " new files:" << count_of(static_cast<int>(TableFileSchema::NEW))
                         << " new_merge files:" << count_of(static_cast<int>(TableFileSchema::NEW_MERGE))
                         << " new_index files:" << count_of(static_cast<int>(TableFileSchema::NEW_INDEX))
                         << " to_index files:" << count_of(static_cast<int>(TableFileSchema::TO_INDEX))
                         << " index files:" << count_of(static_cast<int>(TableFileSchema::INDEX))
                         << " backup files:" << count_of(static_cast<int>(TableFileSchema::BACKUP));
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
374
/// Store new index settings (engine_type_, nlist_, metric_type_) on a live table.
/// Afterwards, every BACKUP file of the table is turned back into RAW.
/// @return DB_NOT_FOUND if no live table matches; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> lock(meta_mutex_);

        auto tables = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        if (tables.empty()) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        // Rewrite the whole row. Non-index attributes are carried over from the first match;
        // the index attributes come from `index`.
        const auto& current = tables.front();
        meta::TableSchema updated;
        updated.id_ = std::get<0>(current);
        updated.table_id_ = table_id;
        updated.state_ = std::get<1>(current);
        updated.dimension_ = std::get<2>(current);
        updated.created_on_ = std::get<3>(current);
        updated.flag_ = std::get<4>(current);
        updated.index_file_size_ = std::get<5>(current);
        updated.engine_type_ = index.engine_type_;
        updated.nlist_ = index.nlist_;
        updated.metric_type_ = index.metric_type_;
        ConnectorPtr->update(updated);

        // set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
    } catch (const std::exception& e) {
        std::string msg = "Encounter exception when update table index: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
420
/// Set the flag_ column of the table row(s) whose table_id matches.
/// @return OK; DB_META_TRANSACTION_FAILED if the update throws
Status
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here.
        // This was the only update path in this file not taking meta_mutex_.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // write the new flag value on the table row
        ConnectorPtr->update_all(set(c(&TableSchema::flag_) = flag), where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
        return HandleException(msg, e.what());
    }

    return Status::OK();
}

S
starlord 已提交
436
/// Read the index settings (engine type, nlist, metric type) of a live table into `index`.
/// @return DB_NOT_FOUND unless exactly one live row matches; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
    try {
        server::MetricCollector metric;

        auto rows = ConnectorPtr->select(
            columns(&TableSchema::engine_type_, &TableSchema::nlist_, &TableSchema::metric_type_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
        }

        std::tie(index.engine_type_, index.nlist_, index.metric_type_) = rows.front();
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when describe index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
459
/// Drop a table's index. Three updates are applied in order:
///   1. INDEX files are soft deleted (TO_DELETE);
///   2. BACKUP files become RAW again;
///   3. the table's index settings are reset to the defaults.
/// @return OK; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> lock(meta_mutex_);

        auto of_table = c(&TableFileSchema::table_id_) == table_id;

        // 1. soft delete index files
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(of_table && c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)));

        // 2. set all backup file to raw
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::RAW),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(of_table && c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::BACKUP)));

        // 3. set table index type to raw
        ConnectorPtr->update_all(set(c(&TableSchema::engine_type_) = DEFAULT_ENGINE_TYPE,
                                     c(&TableSchema::nlist_) = DEFAULT_NLIST,
                                     c(&TableSchema::metric_type_) = DEFAULT_METRIC_TYPE),
                                 where(c(&TableSchema::table_id_) == table_id));

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when delete table index files", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
493
/// Report whether a live (not TO_DELETE) table with this id exists.
/// @param[out] has_or_not true iff exactly one live row matches; false when an exception is caught
/// @return OK; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    has_or_not = false;

    try {
        server::MetricCollector metric;
        auto tables = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id &&
                  c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));
        has_or_not = (tables.size() == 1);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when lookup table", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
514
/// Append one TableSchema per live (not TO_DELETE) table to table_schema_array.
/// The vector is not cleared first, and state_ is not read from the database.
/// @return OK; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::table_id_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::engine_type_,
                    &TableSchema::nlist_, &TableSchema::metric_type_),
            where(c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        for (const auto& row : selected) {
            TableSchema schema;
            schema.id_ = std::get<0>(row);
            schema.table_id_ = std::get<1>(row);
            schema.dimension_ = std::get<2>(row);
            schema.created_on_ = std::get<3>(row);
            schema.flag_ = std::get<4>(row);
            schema.index_file_size_ = std::get<5>(row);
            schema.engine_type_ = std::get<6>(row);
            schema.nlist_ = std::get<7>(row);
            schema.metric_type_ = std::get<8>(row);

            table_schema_array.push_back(std::move(schema));
        }
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when lookup all tables", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
545
/// Register a new file for an existing live table and create its path.
///
/// A new file_id_ is generated. Table level settings are copied from the table. Size and row count
/// start at 0, and both timestamps are set to now. date_ defaults to utils::GetDate() when EmptyDate.
/// @return DescribeTable's error if the table is missing; the result of utils::CreateTableFilePath on
///         success; DB_META_TRANSACTION_FAILED on exceptions
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = utils::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(owner);
    if (!status.ok()) {
        return status;
    }

    try {
        server::MetricCollector metric;

        NextFileId(file_schema.file_id_);

        // settings inherited from the owning table
        file_schema.dimension_ = owner.dimension_;
        file_schema.index_file_size_ = owner.index_file_size_;
        file_schema.engine_type_ = owner.engine_type_;
        file_schema.nlist_ = owner.nlist_;
        file_schema.metric_type_ = owner.metric_type_;

        // a brand new file is empty
        file_schema.file_size_ = 0;
        file_schema.row_count_ = 0;

        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> lock(meta_mutex_);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when create table file", e.what());
    }
}

S
starlord 已提交
586
/// Collect every TO_INDEX file, enriched with its table's dimension, index_file_size, nlist and metric type.
///
/// @param[out] files cleared, then filled in the order rows are returned
/// @return the last utils::GetTableFilePath failure, if any (collection continues past it);
///         DescribeTable's error, immediately, if a referenced table cannot be described;
///         DB_META_TRANSACTION_FAILED on exceptions; otherwise OK
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX)));

        // table schemas described so far in this call, keyed by table id, so each table is described once
        std::map<std::string, TableSchema> groups;
        // reused across rows; the fields assigned below are overwritten for every row
        TableFileSchema table_file;

        Status ret;
        for (auto& file : selected) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
            table_file.created_on_ = std::get<8>(file);

            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            auto group_itr = groups.find(table_file.table_id_);
            if (group_itr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto describe_status = DescribeTable(table_schema);
                if (!describe_status.ok()) {
                    return describe_status;
                }
                group_itr = groups.emplace(table_file.table_id_, std::move(table_schema)).first;
            }

            const TableSchema& owner_schema = group_itr->second;
            table_file.dimension_ = owner_schema.dimension_;
            table_file.index_file_size_ = owner_schema.index_file_size_;
            table_file.nlist_ = owner_schema.nlist_;
            table_file.metric_type_ = owner_schema.metric_type_;
            files.push_back(table_file);
        }

        if (!selected.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate raw files", e.what());
    }
}

S
starlord 已提交
644
// Collect the searchable (RAW / TO_INDEX / INDEX) files of `table_id`, grouped by date.
//   ids   - optional filter on TableFileSchema::id_ (empty means "all files")
//   dates - optional filter on TableFileSchema::date_ (empty means "all dates")
//   files - output, cleared first; one entry per date that has at least one matching file
// Returns the DescribeTable error if the table cannot be described, otherwise the last
// GetTableFilePath error seen (files are still collected when their path cannot be resolved).
Status
SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& dates,
                              DatePartionedTableFilesSchema& files) {
    files.clear();
    server::MetricCollector metric;

    try {
        auto select_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::engine_type_);

        auto match_tableid = c(&TableFileSchema::table_id_) == table_id;

        std::vector<int> file_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                       (int)TableFileSchema::INDEX};
        auto match_type = in(&TableFileSchema::file_type_, file_types);

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // sqlite_orm has a bug, 'in' statement cannot handle too many elements
        // so we split the dates into batches of at most batch_size and run one query per batch (work-around).
        // A new batch is opened only when the current one is full, so every batch is non-empty and
        // never exceeds batch_size (the former `> batch_size` check let batches grow to batch_size + 1).
        const size_t batch_size = 30;
        std::vector<DatesT> split_dates;
        split_dates.emplace_back();
        for (const auto& date : dates) {
            if (split_dates.back().size() >= batch_size) {
                split_dates.emplace_back();
            }
            split_dates.back().push_back(date);
        }

        // perform query
        decltype(ConnectorPtr->select(select_columns)) selected;
        if (dates.empty() && ids.empty()) {
            auto filter = where(match_tableid and match_type);
            selected = ConnectorPtr->select(select_columns, filter);
        } else if (dates.empty() && !ids.empty()) {
            auto match_fileid = in(&TableFileSchema::id_, ids);
            auto filter = where(match_tableid and match_fileid and match_type);
            selected = ConnectorPtr->select(select_columns, filter);
        } else if (!dates.empty() && ids.empty()) {
            for (auto& batch_dates : split_dates) {
                auto match_date = in(&TableFileSchema::date_, batch_dates);
                auto filter = where(match_tableid and match_date and match_type);
                auto batch_selected = ConnectorPtr->select(select_columns, filter);
                selected.insert(selected.end(), batch_selected.begin(), batch_selected.end());
            }
        } else {
            // the id filter does not depend on the batch, build it once
            auto match_fileid = in(&TableFileSchema::id_, ids);
            for (auto& batch_dates : split_dates) {
                auto match_date = in(&TableFileSchema::date_, batch_dates);
                auto filter = where(match_tableid and match_fileid and match_date and match_type);
                auto batch_selected = ConnectorPtr->select(select_columns, filter);
                selected.insert(selected.end(), batch_selected.begin(), batch_selected.end());
            }
        }

        Status ret;
        for (auto& file : selected) {
            // fresh object per row so no field (e.g. location_) leaks from the previous file
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.file_size_ = std::get<4>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.engine_type_ = std::get<7>(file);
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                ret = path_status;
            }

            // operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
        }

        if (!selected.empty()) {
            ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
        }
        return ret;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate index files", e.what());
    }
}

S
starlord 已提交
759
// Collect the RAW files of `table_id` that are smaller than the table's index_file_size_, grouped by date.
// `files` is cleared first. Returns the DescribeTable error if the table cannot be described, otherwise
// the last GetTableFilePath error seen (files are still collected when their path cannot be resolved).
Status
SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFilesSchema& files) {
    files.clear();

    try {
        server::MetricCollector metric;

        // check table existence
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // get files to merge, ordered by size descending
        auto selected = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
                    &TableFileSchema::date_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW and
                  c(&TableFileSchema::table_id_) == table_id),
            order_by(&TableFileSchema::file_size_).desc());

        Status result;
        // number of files actually handed out; `selected` also contains the skipped large files
        size_t to_merge_count = 0;
        for (auto& file : selected) {
            TableFileSchema table_file;
            table_file.file_size_ = std::get<4>(file);
            if (table_file.file_size_ >= table_schema.index_file_size_) {
                continue;  // skip large file
            }

            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.row_count_ = std::get<5>(file);
            table_file.date_ = std::get<6>(file);
            table_file.created_on_ = std::get<7>(file);
            table_file.dimension_ = table_schema.dimension_;
            table_file.index_file_size_ = table_schema.index_file_size_;
            table_file.nlist_ = table_schema.nlist_;
            table_file.metric_type_ = table_schema.metric_type_;

            auto path_status = utils::GetTableFilePath(options_, table_file);
            if (!path_status.ok()) {
                result = path_status;
            }

            // operator[] creates the per-date list on first use
            files[table_file.date_].push_back(table_file);
            ++to_merge_count;
        }

        if (to_merge_count > 0) {
            ENGINE_LOG_DEBUG << "Collect " << to_merge_count << " to-merge files";
        }
        return result;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when iterate merge files", e.what());
    }
}

S
starlord 已提交
824
// Fetch the files of `table_id` whose id_ is in `ids` and whose type is not TO_DELETE, filling in
// table-level attributes (dimension, index_file_size, nlist, metric_type) and the on-disk location.
// `table_files` is cleared first. Returns the DescribeTable error if the table cannot be described,
// otherwise the last GetTableFilePath error seen (the file is still returned in that case), matching
// FilesToSearch / FilesToMerge.
Status
SqliteMetaImpl::GetTableFiles(const std::string& table_id, const std::vector<size_t>& ids,
                              TableFilesSchema& table_files) {
    try {
        table_files.clear();

        // resolve the table first: no point querying files of a table that cannot be described
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        auto files = ConnectorPtr->select(
            columns(&TableFileSchema::id_, &TableFileSchema::file_id_, &TableFileSchema::file_type_,
                    &TableFileSchema::file_size_, &TableFileSchema::row_count_, &TableFileSchema::date_,
                    &TableFileSchema::engine_type_, &TableFileSchema::created_on_),
            where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::id_, ids) and
                  c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));

        Status result;
        for (auto& file : files) {
            TableFileSchema file_schema;
            file_schema.table_id_ = table_id;
            file_schema.id_ = std::get<0>(file);
            file_schema.file_id_ = std::get<1>(file);
            file_schema.file_type_ = std::get<2>(file);
            file_schema.file_size_ = std::get<3>(file);
            file_schema.row_count_ = std::get<4>(file);
            file_schema.date_ = std::get<5>(file);
            file_schema.engine_type_ = std::get<6>(file);
            file_schema.created_on_ = std::get<7>(file);
            file_schema.dimension_ = table_schema.dimension_;
            file_schema.index_file_size_ = table_schema.index_file_size_;
            file_schema.nlist_ = table_schema.nlist_;
            file_schema.metric_type_ = table_schema.metric_type_;

            // previously this status was dropped, so `result` could never report a path failure
            auto path_status = utils::GetTableFilePath(options_, file_schema);
            if (!path_status.ok()) {
                result = path_status;
            }

            table_files.emplace_back(file_schema);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return result;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when lookup table files", e.what());
    }
}

S
starlord 已提交
872
// TODO(myh): Support swap to cloud storage
S
starlord 已提交
873 874
Status
SqliteMetaImpl::Archive() {
Y
youny626 已提交
875
    auto& criterias = options_.archive_conf_.GetCriterias();
X
Xu Peng 已提交
876 877 878 879 880
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
Y
youny626 已提交
881 882
        auto& criteria = kv.first;
        auto& limit = kv.second;
G
groot 已提交
883
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
S
starlord 已提交
884 885
            int64_t usecs = limit * D_SEC * US_PS;
            int64_t now = utils::GetMicroSecTimeStamp();
886
            try {
Y
youny626 已提交
887
                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
888 889
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

Y
youny626 已提交
890 891 892 893
                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
                                         where(c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
                                               c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
            } catch (std::exception& e) {
S
starlord 已提交
894
                return HandleException("Encounter exception when update table files", e.what());
X
Xu Peng 已提交
895
            }
896 897

            ENGINE_LOG_DEBUG << "Archive old files";
X
Xu Peng 已提交
898
        }
G
groot 已提交
899
        if (criteria == engine::ARCHIVE_CONF_DISK) {
G
groot 已提交
900
            uint64_t sum = 0;
X
Xu Peng 已提交
901
            Size(sum);
X
Xu Peng 已提交
902

Y
youny626 已提交
903
            int64_t to_delete = (int64_t)sum - limit * G;
X
Xu Peng 已提交
904
            DiscardFiles(to_delete);
905 906

            ENGINE_LOG_DEBUG << "Archive files to free disk";
X
Xu Peng 已提交
907 908 909 910 911 912
        }
    }

    return Status::OK();
}

S
starlord 已提交
913
// Total file_size_ of every file whose type is not TO_DELETE, written to `result`.
// `result` is reset to 0 up front, so it stays 0 when the query throws.
Status
SqliteMetaImpl::Size(uint64_t& result) {
    result = 0;
    try {
        auto totals = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
                                           where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
        for (auto& row : totals) {
            auto& partial = std::get<0>(row);
            // the aggregate value may be empty (e.g. no matching rows); only add it when present
            if (partial) {
                result += (uint64_t)(*partial);
            }
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when calculte db size", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
932 933
// Mark live (non-TO_DELETE) files as TO_DELETE, lowest id first, until at least `to_discard_size`
// bytes (by file_size_) have been discarded or no live file remains. Non-positive sizes are a no-op.
// Works in rounds of at most 10 files; each round holds meta_mutex_ and runs in one sqlite transaction.
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
    if (to_discard_size <= 0) {
        return Status::OK();
    }

    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

    try {
        server::MetricCollector metric;

        // Iterative instead of the former tail recursion: when no candidate file is left the loop
        // stops, whereas recursion with an unchanged positive size never terminated.
        while (to_discard_size > 0) {
            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            bool exhausted = false;
            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected =
                    ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::file_size_),
                                         where(c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE),
                                         order_by(&TableFileSchema::id_), limit(10));

                // size_t matches the id container used with `in(&TableFileSchema::id_, ...)` elsewhere
                // in this file; the former std::vector<int> could truncate large ids
                std::vector<size_t> ids;
                for (auto& file : selected) {
                    if (to_discard_size <= 0) {
                        break;
                    }
                    auto file_id = std::get<0>(file);
                    auto file_size = std::get<1>(file);
                    ids.push_back(file_id);
                    // log the row id; the previous code logged file_id_, which was never populated here
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << file_id << " table_file.size=" << file_size;
                    to_discard_size -= static_cast<int64_t>(file_size);
                }

                if (ids.empty()) {
                    exhausted = true;
                    return true;
                }

                ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
                                             c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
                                         where(in(&TableFileSchema::id_, ids)));

                return true;
            });

            if (!commited) {
                return HandleException("DiscardFiles error: sqlite transaction failed");
            }

            if (exhausted) {
                ENGINE_LOG_DEBUG << "No more files to discard, remaining size=" << to_discard_size;
                break;
            }
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when discard table file", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
987
// Persist `file_schema`, refreshing its updated_time_ first. If the owning table is missing or in
// TO_DELETE state, the file is written back as TO_DELETE instead (the clean thread removes it later).
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_states = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == file_schema.table_id_));

        const bool table_alive =
            !owner_states.empty() && std::get<0>(owner_states[0]) != (int)TableSchema::TO_DELETE;
        if (!table_alive) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
    } catch (std::exception& e) {
        std::string msg =
            "Exception update table file: table_id = " + file_schema.table_id_ + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1016
// Promote every RAW file of `table_id` to TO_INDEX with a single UPDATE statement.
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto belongs_to_table = c(&TableFileSchema::table_id_) == table_id;
        auto is_raw = c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW;
        ConnectorPtr->update_all(set(c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_INDEX),
                                 where(belongs_to_table and is_raw));

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files to to_index", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1036
// Persist every entry of `files` in one sqlite transaction, refreshing updated_time_ on each.
// Files whose owning table is missing or in TO_DELETE state are written back as TO_DELETE.
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // table_id -> "table exists and is not TO_DELETE", looked up once per distinct id
        std::map<std::string, bool> has_tables;
        for (auto& file : files) {
            if (has_tables.count(file.table_id_) != 0) {
                continue;
            }
            auto live_tables = ConnectorPtr->select(columns(&TableSchema::id_),
                                                    where(c(&TableSchema::table_id_) == file.table_id_ and
                                                          c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
            has_tables[file.table_id_] = !live_tables.empty();
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& file : files) {
                const bool owner_alive = has_tables[file.table_id_];
                if (!owner_alive) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            return HandleException("UpdateTableFiles error: sqlite transaction failed");
        }

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
    } catch (std::exception& e) {
        return HandleException("Encounter exception when update table files", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1082 1083
// Garbage-collect deleted meta entries in three phases:
//   1. delete (on disk and in meta) files that have been TO_DELETE for at least `seconds` seconds,
//      measured from updated_time_;
//   2. remove TO_DELETE tables from meta (only their empty folders are deleted on disk);
//   3. delete the folder of each table touched in phase 1 that no longer has any file rows.
// Returns the first failing phase's status, else OK.
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    std::set<std::string> table_ids;

    // remove to_delete files
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_,
                                                  &TableFileSchema::file_id_, &TableFileSchema::date_),
                                          where(c(&TableFileSchema::file_type_) == (int)TableFileSchema::TO_DELETE and
                                                c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            TableFileSchema table_file;
            for (auto& file : files) {
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.date_ = std::get<3>(file);

                utils::DeleteTableFilePath(options_, table_file);
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);

                // remember the table so phase 3 can check whether its folder became empty
                table_ids.insert(table_file.table_id_);
            }
            return true;
        });

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (files.size() > 0) {
            ENGINE_LOG_DEBUG << "Clean " << files.size() << " files deleted in " << seconds << " seconds";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove to_delete tables
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_),
                                           where(c(&TableSchema::state_) == (int)TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& table : tables) {
                utils::DeleteTablePath(options_, std::get<1>(table), false);  // only delete empty folder
                ConnectorPtr->remove<TableSchema>(std::get<0>(table));
            }

            return true;
        });

        if (!commited) {
            return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
        }

        if (tables.size() > 0) {
            ENGINE_LOG_DEBUG << "Remove " << tables.size() << " tables from meta";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table files", e.what());
    }

    // remove deleted table folder
    // don't remove table folder until all its files has been deleted
    try {
        server::MetricCollector metric;

        // count folders actually removed; tables that still own files are skipped and must not be reported
        size_t removed_folders = 0;
        for (auto& table_id : table_ids) {
            // an existence probe is enough, no need to fetch every remaining file id
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
                                                 where(c(&TableFileSchema::table_id_) == table_id), limit(1));
            if (selected.empty()) {
                utils::DeleteTablePath(options_, table_id);
                ++removed_folders;
            }
        }

        if (removed_folders > 0) {
            ENGINE_LOG_DEBUG << "Remove " << removed_folders << " tables folder";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when delete table folder", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1180 1181
// Remove from meta, in one sqlite transaction, every file row still in NEW, NEW_INDEX or NEW_MERGE state.
// NOTE(review): these are presumably left over from writes that never completed; confirm against callers.
Status
SqliteMetaImpl::CleanUp() {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        std::vector<int> new_types = {(int)TableFileSchema::NEW, (int)TableFileSchema::NEW_INDEX,
                                      (int)TableFileSchema::NEW_MERGE};
        auto stale_files =
            ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, new_types)));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto& row : stale_files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            return HandleException("CleanUp error: sqlite transaction failed");
        }

        if (!stale_files.empty()) {
            ENGINE_LOG_DEBUG << "Clean " << stale_files.size() << " files";
        }
    } catch (std::exception& e) {
        return HandleException("Encounter exception when clean table file", e.what());
    }

    return Status::OK();
}

S
starlord 已提交
1215
// Sum row_count_ over the RAW, TO_INDEX and INDEX files of `table_id` into `result`.
// Returns the DescribeTable error (leaving `result` untouched) if the table cannot be described.
Status
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
    try {
        server::MetricCollector metric;

        std::vector<int> counted_types = {(int)TableFileSchema::RAW, (int)TableFileSchema::TO_INDEX,
                                          (int)TableFileSchema::INDEX};
        auto row_counts = ConnectorPtr->select(
            columns(&TableFileSchema::row_count_),
            where(in(&TableFileSchema::file_type_, counted_types) and c(&TableFileSchema::table_id_) == table_id));

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        uint64_t total = 0;
        for (auto& row : row_counts) {
            total += std::get<0>(row);
        }
        result = total;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when calculate table file size", e.what());
    }
    return Status::OK();
}

S
starlord 已提交
1244 1245
// Drop both sqlite meta tables (META_TABLES, then META_TABLEFILES).
// Any exception is converted into an error Status via HandleException.
Status
SqliteMetaImpl::DropAll() {
    ENGINE_LOG_DEBUG << "Drop all sqlite meta";

    Status outcome = Status::OK();
    try {
        ConnectorPtr->drop_table(META_TABLES);
        ConnectorPtr->drop_table(META_TABLEFILES);
    } catch (std::exception& e) {
        outcome = HandleException("Encounter exception when drop all meta", e.what());
    }

    return outcome;
}

Y
youny626 已提交
1258 1259 1260
}  // namespace meta
}  // namespace engine
}  // namespace milvus