DBMetaImpl.cpp 40.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7 8
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
G
groot 已提交
9
#include "Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
G
groot 已提交
11
#include "Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23

namespace zilliz {
J
jinhai 已提交
24
namespace milvus {
X
Xu Peng 已提交
25
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34
Status HandleException(const std::string& desc, std::exception &e) {
    ENGINE_LOG_ERROR << desc << ": " << e.what();
    return Status::DBTransactionError(desc, e.what());
G
groot 已提交
35 36
}

G
groot 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
class MetricCollector {
public:
    MetricCollector() {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        start_time_ = METRICS_NOW_TIME;
    }

    ~MetricCollector() {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
    }

private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT start_time_;
};

G
groot 已提交
55 56
}

57
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
58
    return make_storage(path,
G
groot 已提交
59
                        make_table("Tables",
G
groot 已提交
60 61
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
62
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
63 64 65 66 67
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("files_cnt", &TableSchema::files_cnt_, default_value(0)),
                                   make_column("engine_type", &TableSchema::engine_type_),
                                   make_column("store_raw_data", &TableSchema::store_raw_data_)),
G
groot 已提交
68
                        make_table("TableFiles",
G
groot 已提交
69 70
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
71
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
72 73 74 75 76 77
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
                                   make_column("size", &TableFileSchema::size_, default_value(0)),
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
78
    );
X
Xu Peng 已提交
79 80 81

}

X
Xu Peng 已提交
82
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
83
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
84
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
85

86
/// Produces a new table id: the streamed text of SimpleIDGenerator's next number.
///
/// @param[out] table_id  Overwritten with the generated id.
/// @return Always Status::OK().
Status DBMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator generator;
    std::ostringstream out;
    out << generator.GetNextIDNumber();
    table_id = out.str();
    return Status::OK();
}

94
/// Produces a new file id: the streamed text of SimpleIDGenerator's next number.
///
/// @param[out] file_id  Overwritten with the generated id.
/// @return Always Status::OK().
Status DBMetaImpl::NextFileId(std::string &file_id) {
    std::ostringstream id_text;
    id_text << SimpleIDGenerator().GetNextIDNumber();
    file_id = id_text.str();
    return Status::OK();
}

102
/// Stores a copy of `options` and immediately opens the metadata store via Initialize().
///
/// NOTE: Initialize()'s Status is discarded here. Its only non-OK path (directory creation
/// failure) logs the error itself. Exceptions raised while opening the store are not caught
/// in Initialize() and therefore propagate out of this constructor.
DBMetaImpl::DBMetaImpl(const DBMetaOptions &options)
    : options_(options) {
    static_cast<void>(Initialize());
}

X
Xu Peng 已提交
107 108 109
/// Opens the sqlite metadata store under options_.path, creating the directory if needed.
///
/// Steps:
///  1. Ensure options_.path is a directory, creating it (and any missing parents).
///  2. (Re)create the shared ConnectorPtr for "<path>/meta.sqlite".
///  3. Sync the schema, keep the connection open, and switch journaling to WAL.
///  4. Run CleanUp().
///
/// @return Status::InvalidDBPath if the directory cannot be created; Status::OK() otherwise.
///         Exceptions from the sqlite layer are not caught here.
Status DBMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path)) {
        // Use the non-throwing overload, and also create missing parents. The throwing
        // create_directory() reported failures (e.g. a missing parent directory) as an
        // exception, so the InvalidDBPath branch below was never reached for them.
        boost::system::error_code ec;
        boost::filesystem::create_directories(options_.path, ec);
        if (ec || !boost::filesystem::is_directory(options_.path)) {
            ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path
                             << (ec ? ": " + ec.message() : std::string());
            return Status::InvalidDBPath("Failed to create db directory", options_.path);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

X
Xu Peng 已提交
127
// PXU TODO: Temp solution. Will fix later
128 129
/// Soft-deletes the files of `table_id` whose date is in `dates` by marking them TO_DELETE.
///
/// @param table_id  Table whose partitions are dropped; must exist (checked with DescribeTable()).
/// @param dates     Partition dates to drop. An empty list is a no-op.
/// @return OK on success (including the empty case); the DescribeTable() status if the
///         table is unknown; Status::Error if any date is yesterday or later;
///         DBTransactionError if the sqlite update throws.
Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                         const DatesT &dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        // Partitions dated yesterday or later may not be dropped.
        auto yesterday = GetDateWithDelta(-1);
        for (auto &date : dates) {
            if (date >= yesterday) {
                return Status::Error("Could not delete partitions with 2 days");
            }
        }

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Same soft-delete as DeleteTableFiles(): stamp updated_time_ with the deletion time
        // and leave rows that are already TO_DELETE untouched. Otherwise repeated drops would
        // keep moving their timestamp forward.
        ConnectorPtr->update_all(
            set(
                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
            ),
            where(
                c(&TableFileSchema::table_id_) == table_id and
                    in(&TableFileSchema::date_, dates) and
                    c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
            ));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop partition", e);
    }

    return Status::OK();
}

168
/// Registers a new table in the meta store and creates its directory on disk.
///
/// If `table_schema.table_id_` is empty, a fresh id is generated. Otherwise the given id must
/// not already be registered. On success `id_`, `files_cnt_` (0) and `created_on_` are filled in.
///
/// @return Error if the id belongs to a table still marked TO_DELETE; AlreadyExist if it
///         belongs to any other existing table; DBTransactionError if the insert or another
///         step throws; otherwise the result of utils::CreateTablePath().
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        // sqlite updates issued from several threads may throw ('bad logic', etc.), so serialize them.
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                const bool being_deleted = (TableSchema::TO_DELETE == std::get<0>(existing[0]));
                if (being_deleted) {
                    return Status::Error("Table already exists and it is in delete state, please wait a second");
                }
                // Change from no error to already exist.
                return Status::AlreadyExist("Table already exists");
            }
        }

        table_schema.id_ = -1;
        table_schema.files_cnt_ = 0;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            const auto new_row_id = ConnectorPtr->insert(table_schema);
            table_schema.id_ = new_row_id;
        } catch (...) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Add Table Error");
        }

        return utils::CreateTablePath(options_, table_schema.table_id_);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
    }

    return Status::OK();
}

G
groot 已提交
212 213
/// Soft-deletes a table by setting its state to TO_DELETE.
///
/// Only the "Tables" row is changed; the table's files are handled by DeleteTableFiles().
/// An unknown table id is not an error (no row matches).
///
/// @return OK, or DBTransactionError if the sqlite update throws.
Status DBMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // Update the state column in place. The previous select + update<TableSchema>()
        // rewrote the whole row from a hand-copied column list, so any column not copied
        // there (e.g. one added to the schema later) would have been reset on delete.
        ConnectorPtr->update_all(
                set(
                        c(&TableSchema::state_) = (int) TableSchema::TO_DELETE
                ),
                where(
                        c(&TableSchema::table_id_) == table_id
                ));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table", e);
    }

    return Status::OK();
}

G
groot 已提交
247 248 249 250
/// Soft-deletes every file of `table_id`.
///
/// Each file not already TO_DELETE is marked TO_DELETE, and its updated_time_ is set to the
/// current utils::GetMicroSecTimeStamp().
///
/// @return OK, or DBTransactionError if the sqlite update throws.
Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        MetricCollector metric;

        // Serialize sqlite writes: concurrent updates from several threads may throw ('bad logic', etc).
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const auto now = utils::GetMicroSecTimeStamp();
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                c(&TableFileSchema::updated_time_) = now),
            where(c(&TableFileSchema::table_id_) == table_id
                  and c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e);
    }

    return Status::OK();
}

272
/// Looks up a table that is not TO_DELETE by `table_schema.table_id_`.
///
/// On success fills in id_, files_cnt_, dimension_, engine_type_ and store_raw_data_.
///
/// @return NotFound unless exactly one matching row exists; DBTransactionError if the
///         query throws; OK otherwise.
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::table_id_,
                                                 &TableSchema::files_cnt_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::engine_type_,
                                                 &TableSchema::store_raw_data_),
                                         where(c(&TableSchema::table_id_) == table_schema.table_id_
                                               and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (rows.size() != 1) {
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
        }

        const auto &row = rows.front();
        table_schema.id_ = std::get<0>(row);
        table_schema.files_cnt_ = std::get<2>(row);
        table_schema.dimension_ = std::get<3>(row);
        table_schema.engine_type_ = std::get<4>(row);
        table_schema.store_raw_data_ = std::get<5>(row);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe table", e);
    }

    return Status::OK();
}

P
peng.xu 已提交
302 303 304 305 306
/// Reports whether `table_id` has any file of type RAW, NEW or TO_INDEX.
///
/// @param[out] has  True if at least one such file exists, false otherwise. It is also
///                  left false when the query throws.
/// @return OK, or DBTransactionError if the query throws.
Status DBMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
    has = false;
    try {
        auto pending = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                            where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                                                   or
                                                   c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
                                                   or
                                                   c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX)
                                                  and c(&TableFileSchema::table_id_) == table_id));
        has = !pending.empty();
    } catch (std::exception &e) {
        return HandleException("Encounter exception when check non index files", e);
    }
    return Status::OK();
}

326
/// Reports whether a table that is not TO_DELETE is registered under `table_id`.
///
/// @param[out] has_or_not  True iff exactly one such row exists. It is also left false
///                         when the query throws.
/// @return OK, or DBTransactionError if the query throws.
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    has_or_not = false;

    try {
        MetricCollector metric;
        auto matches = ConnectorPtr->select(columns(&TableSchema::id_),
                                            where(c(&TableSchema::table_id_) == table_id
                                                  and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        has_or_not = (matches.size() == 1);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table", e);
    }

    return Status::OK();
}

/// Lists every table that is not TO_DELETE.
///
/// @param[out] table_schema_array  Cleared, then filled with one entry per table
///        (id_, table_id_, files_cnt_, dimension_, engine_type_ and store_raw_data_ set).
/// @return OK, or DBTransactionError if the query throws.
Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    // Clear the output like the other list-returning meta calls (FilesToIndex, FilesToSearch,
    // FilesToMerge, GetTableFiles). Previously results were appended, so reusing a vector
    // produced duplicate entries.
    table_schema_array.clear();

    try {
        MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
                                                     &TableSchema::table_id_,
                                                     &TableSchema::files_cnt_,
                                                     &TableSchema::dimension_,
                                                     &TableSchema::engine_type_,
                                                     &TableSchema::store_raw_data_),
                                             where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        table_schema_array.reserve(selected.size());
        for (auto &table : selected) {
            TableSchema schema;
            schema.id_ = std::get<0>(table);
            schema.table_id_ = std::get<1>(table);
            schema.files_cnt_ = std::get<2>(table);
            schema.dimension_ = std::get<3>(table);
            schema.engine_type_ = std::get<4>(table);
            schema.store_raw_data_ = std::get<5>(table);

            table_schema_array.emplace_back(schema);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup all tables", e);
    }

    return Status::OK();
}

377
/// Registers a new file (type NEW) for an existing table and creates its on-disk path.
///
/// A date equal to EmptyDate is replaced with Meta::GetDate(). The file takes dimension_ and
/// engine_type_ from its table. file_id_, size_ (0), created_on_ and updated_time_ are set
/// here, and id_ is set to the inserted row id.
///
/// @return The DescribeTable() status if the table is unknown; DBTransactionError if the
///         insert throws; otherwise the result of utils::CreateTableFilePath().
Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(owner);
    if (!status.ok()) {
        return status;
    }

    try {
        MetricCollector metric;

        NextFileId(file_schema.file_id_);
        file_schema.file_type_ = TableFileSchema::NEW;
        file_schema.dimension_ = owner.dimension_;
        file_schema.size_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.engine_type_ = owner.engine_type_;

        // Serialize sqlite writes: concurrent updates from several threads may throw ('bad logic', etc).
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        const auto new_row_id = ConnectorPtr->insert(file_schema);
        file_schema.id_ = new_row_id;

        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when create table file", ex);
    }

    return Status::OK();
}

414
/// Collects every TO_INDEX file across all tables.
///
/// Each file gets its path from utils::GetTableFilePath() and its table's dimension_.
/// DescribeTable() is called at most once per table id.
///
/// @param[out] files  Cleared, then filled with the files to index.
/// @return The first non-OK DescribeTable() status (files may then be partially filled);
///         DBTransactionError if the query throws; OK otherwise.
Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::size_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::engine_type_),
                                             where(c(&TableFileSchema::file_type_)
                                                       == (int) TableFileSchema::TO_INDEX));

        // Cache of DescribeTable() results keyed by table id.
        std::map<std::string, TableSchema> tables_by_id;
        TableFileSchema table_file;

        for (auto &row : selected) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.size_ = std::get<4>(row);
            table_file.date_ = std::get<5>(row);
            table_file.engine_type_ = std::get<6>(row);

            utils::GetTableFilePath(options_, table_file);

            auto cached = tables_by_id.find(table_file.table_id_);
            if (cached == tables_by_id.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                cached = tables_by_id.emplace(table_file.table_id_, table_schema).first;
            }
            table_file.dimension_ = cached->second.dimension_;
            files.push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate raw files", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
464
/// Collects the searchable files (types RAW, TO_INDEX or INDEX) of `table_id`, grouped by date.
///
/// @param table_id   Table to search; must exist (checked with DescribeTable()).
/// @param partition  Dates to restrict the search to; an empty list means all dates.
/// @param[out] files Cleared, then filled as date -> files of that date. Each file carries
///                   the table's dimension_ and its path from utils::GetTableFilePath().
/// @return The DescribeTable() status if the table is unknown; DBTransactionError if a
///         query throws; OK otherwise.
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // Converts selected rows (both queries below use the same column order) into
        // TableFileSchema entries and appends each under its date. operator[] creates the
        // date bucket on first use.
        auto collect = [&](const auto &rows) {
            for (const auto &row : rows) {
                TableFileSchema table_file;
                table_file.id_ = std::get<0>(row);
                table_file.table_id_ = std::get<1>(row);
                table_file.file_id_ = std::get<2>(row);
                table_file.file_type_ = std::get<3>(row);
                table_file.size_ = std::get<4>(row);
                table_file.date_ = std::get<5>(row);
                table_file.engine_type_ = std::get<6>(row);
                table_file.dimension_ = table_schema.dimension_;
                utils::GetTableFilePath(options_, table_file);
                files[table_file.date_].push_back(table_file);
            }
        };

        if (partition.empty()) {
            collect(ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::table_id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::size_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_),
                                         where(c(&TableFileSchema::table_id_) == table_id and
                                             (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
                                                 c(&TableFileSchema::file_type_)
                                                     == (int) TableFileSchema::TO_INDEX or
                                                 c(&TableFileSchema::file_type_)
                                                     == (int) TableFileSchema::INDEX))));
        } else {
            collect(ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::table_id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::size_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_),
                                         where(c(&TableFileSchema::table_id_) == table_id and
                                             in(&TableFileSchema::date_, partition) and
                                             (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
                                                 c(&TableFileSchema::file_type_)
                                                     == (int) TableFileSchema::TO_INDEX or
                                                 c(&TableFileSchema::file_type_)
                                                     == (int) TableFileSchema::INDEX))));
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e);
    }

    return Status::OK();
}

563 564
/// Collects the RAW files of `table_id` (merge candidates), grouped by date.
///
/// Rows are read ordered by size_ descending, so each date's list is largest first.
///
/// @param table_id   Table whose files are collected; must exist (checked with DescribeTable()).
/// @param[out] files Cleared, then filled as date -> RAW files of that date. Each file carries
///                   its engine_type_, the table's dimension_ and its path.
/// @return The DescribeTable() status if the table is unknown; DBTransactionError if the
///         query throws; OK otherwise.
Status DBMetaImpl::FilesToMerge(const std::string &table_id,
                                DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        // engine_type_ is selected too, as FilesToIndex/FilesToSearch/GetTableFiles do.
        // Previously merge candidates were left with TableFileSchema's default engine type.
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::size_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::engine_type_),
                                             where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
                                                 c(&TableFileSchema::table_id_) == table_id),
                                             order_by(&TableFileSchema::size_).desc());

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        TableFileSchema table_file;
        for (auto &file : selected) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.size_ = std::get<4>(file);
            table_file.date_ = std::get<5>(file);
            table_file.engine_type_ = std::get<6>(file);
            table_file.dimension_ = table_schema.dimension_;
            utils::GetTableFilePath(options_, table_file);
            // operator[] creates the date bucket on first use.
            files[table_file.date_].push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate merge files", e);
    }

    return Status::OK();
}

611 612 613
/// Fetches the files of `table_id` whose row ids appear in `ids`.
///
/// Ids that do not belong to `table_id` produce no entry.
///
/// @param[out] table_files  Cleared, then filled with one entry per matching row: id_,
///        file_id_, file_type_, size_, date_, engine_type_, plus table_id_, the table's
///        dimension_ and the file path from utils::GetTableFilePath().
/// @return The DescribeTable() status if the table is unknown; DBTransactionError if the
///         query throws; OK otherwise.
Status DBMetaImpl::GetTableFiles(const std::string& table_id,
                                 const std::vector<size_t>& ids,
                                 TableFilesSchema& table_files) {
    try {
        table_files.clear();
        auto rows = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::size_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_),
                                         where(c(&TableFileSchema::table_id_) == table_id and
                                               in(&TableFileSchema::id_, ids)));

        TableSchema owner;
        owner.table_id_ = table_id;
        auto status = DescribeTable(owner);
        if (!status.ok()) {
            return status;
        }

        for (const auto &row : rows) {
            TableFileSchema entry;
            entry.table_id_ = table_id;
            entry.id_ = std::get<0>(row);
            entry.file_id_ = std::get<1>(row);
            entry.file_type_ = std::get<2>(row);
            entry.size_ = std::get<3>(row);
            entry.date_ = std::get<4>(row);
            entry.engine_type_ = std::get<5>(row);
            entry.dimension_ = owner.dimension_;
            utils::GetTableFilePath(options_, entry);

            table_files.emplace_back(entry);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table files", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
654
// PXU TODO: Support Swap
X
Xu Peng 已提交
655
Status DBMetaImpl::Archive() {
656
    auto &criterias = options_.archive_conf.GetCriterias();
X
Xu Peng 已提交
657 658 659 660 661
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
662 663
        auto &criteria = kv.first;
        auto &limit = kv.second;
G
groot 已提交
664
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
X
Xu Peng 已提交
665
            long usecs = limit * D_SEC * US_PS;
666
            long now = utils::GetMicroSecTimeStamp();
667
            try {
G
groot 已提交
668 669 670
                //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);

X
Xu Peng 已提交
671
                ConnectorPtr->update_all(
672
                    set(
G
groot 已提交
673
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
674 675
                    ),
                    where(
G
groot 已提交
676 677
                        c(&TableFileSchema::created_on_) < (long) (now - usecs) and
                            c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
678 679
                    ));
            } catch (std::exception &e) {
G
groot 已提交
680
                return HandleException("Encounter exception when update table files", e);
X
Xu Peng 已提交
681 682
            }
        }
G
groot 已提交
683
        if (criteria == engine::ARCHIVE_CONF_DISK) {
G
groot 已提交
684
            uint64_t sum = 0;
X
Xu Peng 已提交
685
            Size(sum);
X
Xu Peng 已提交
686

G
groot 已提交
687
            int64_t to_delete = (int64_t)sum - limit * G;
X
Xu Peng 已提交
688
            DiscardFiles(to_delete);
X
Xu Peng 已提交
689 690 691 692 693 694
        }
    }

    return Status::OK();
}

G
groot 已提交
695
/**
 * Sum the recorded size of every file that is not marked TO_DELETE.
 *
 * FAISS_IVFSQ8 index files are counted at a quarter of their recorded size
 * (a hard-coded SQ8 adjustment); all other files count their full size_.
 *
 * @param result  receives the total in bytes; reset to 0 first.
 */
Status DBMetaImpl::Size(uint64_t &result) {
    result = 0;
    try {
        auto rows = ConnectorPtr->select(columns(&TableFileSchema::size_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::engine_type_),
                                         where(c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));

        for (auto &row : rows) {
            const bool sq8_index = std::get<1>(row) == (int) TableFileSchema::INDEX
                                   && std::get<2>(row) == (int) EngineType::FAISS_IVFSQ8;
            const uint64_t bytes = (uint64_t) std::get<0>(row);
            result += sq8_index ? bytes / 4 : bytes; // hardcode for sq8
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculte db size", e);
    }

    return Status::OK();
}

/**
 * Mark files as TO_DELETE until at least `to_discard_size` bytes have been
 * discarded or no candidate file remains.
 *
 * Candidates are files not already TO_DELETE, taken in ascending id order in
 * batches of up to 10. Each batch is marked (file_type_ = TO_DELETE,
 * updated_time_ = now) inside its own sqlite transaction.
 *
 * @param to_discard_size  number of bytes to discard; <= 0 is a no-op.
 * @return OK, or a DBTransactionError if a batch fails.
 */
Status DBMetaImpl::DiscardFiles(long to_discard_size) {
    if (to_discard_size <= 0) {
        return Status::OK();
    }

    ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

    // Loop instead of recursing: the recursive form never terminated when every
    // file was already TO_DELETE while to_discard_size was still positive.
    while (to_discard_size > 0) {
        bool no_candidates = false;
        try {
            MetricCollector metric;

            //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
            std::lock_guard<std::mutex> meta_lock(meta_mutex_);

            auto commited = ConnectorPtr->transaction([&]() {
                auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                             &TableFileSchema::size_),
                                                     where(c(&TableFileSchema::file_type_)
                                                           != (int) TableFileSchema::TO_DELETE),
                                                     order_by(&TableFileSchema::id_),
                                                     limit(10));

                // keep the column's own id type instead of narrowing it to int
                std::vector<decltype(TableFileSchema::id_)> ids;
                for (auto &file : selected) {
                    if (to_discard_size <= 0) break;
                    auto file_id = std::get<0>(file);
                    auto file_size = std::get<1>(file);
                    ids.push_back(file_id);
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << file_id
                                     << " table_file.size=" << file_size;
                    to_discard_size -= file_size;
                }

                if (ids.empty()) {
                    // nothing left that can be discarded; stop the outer loop
                    no_candidates = true;
                    return true;
                }

                ConnectorPtr->update_all(
                        set(
                                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                        ),
                        where(
                                in(&TableFileSchema::id_, ids)
                        ));

                return true;
            });

            if (!commited) {
                ENGINE_LOG_ERROR << "sqlite transaction failed";
                return Status::DBTransactionError("Update table file error");
            }
        } catch (std::exception &e) {
            return HandleException("Encounter exception when discard table file", e);
        }

        if (no_candidates) {
            break;
        }
    }

    return Status::OK();
}

/**
 * Persist `file_schema` after refreshing its updated_time_.
 *
 * If the owning table is missing, or its state is TO_DELETE, the file is forced
 * to TO_DELETE before saving; the clean-up pass removes it later.
 */
Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto owner_states = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == file_schema.table_id_));

        const bool owner_gone = owner_states.empty()
                                || std::get<0>(owner_states[0]) == (int) TableSchema::TO_DELETE;
        if (owner_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);
    } catch (std::exception &e) {
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
                          + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e);
    }
    return Status::OK();
}

/**
 * Switch every RAW file of `table_id` to TO_INDEX in a single update statement.
 *
 * @param table_id  table whose RAW files should be queued for indexing.
 */
Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    try {
        MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX),
            where(c(&TableFileSchema::table_id_) == table_id
                  and c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files to to_index", e);
    }

    return Status::OK();
}

/**
 * Persist a batch of file records in one sqlite transaction.
 *
 * For each distinct table id in `files`, it first checks whether the table
 * still exists in a state other than TO_DELETE. Files of tables that fail this
 * check are forced to TO_DELETE. Every file gets a fresh updated_time_.
 */
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        // table_id -> "table exists and is not TO_DELETE", queried once per id
        std::map<std::string, bool> table_alive;
        for (auto &file : files) {
            if (table_alive.count(file.table_id_) != 0) {
                continue;
            }
            auto matches = ConnectorPtr->select(columns(&TableSchema::id_),
                                                where(c(&TableSchema::table_id_) == file.table_id_
                                                      and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
            table_alive[file.table_id_] = !matches.empty();
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }
                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Update table files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files", e);
    }
    return Status::OK();
}

/**
 * Physically purge data that has been marked for deletion.
 *
 * Step 1: every TO_DELETE file record whose updated_time_ is more than `seconds`
 *         old is removed from disk (utils::DeleteTableFilePath) and from the meta store.
 * Step 2: every TO_DELETE table has its directory removed (utils::DeleteTablePath)
 *         and its row deleted, unless it still owns TO_DELETE file records.
 *         Such records survived step 1, so they are still inside their TTL window.
 *         Removing the directory would delete them early, so the table is
 *         postponed to a later call.
 *
 * @param seconds  grace period in seconds, measured from a file's updated_time_.
 */
Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    try {
        MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                  &TableFileSchema::table_id_,
                                                  &TableFileSchema::file_id_,
                                                  &TableFileSchema::date_),
                                          where(
                                                  c(&TableFileSchema::file_type_) ==
                                                  (int) TableFileSchema::TO_DELETE
                                                  and
                                                  c(&TableFileSchema::updated_time_)
                                                  < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            TableFileSchema table_file;
            for (auto &file : files) {
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.date_ = std::get<3>(file);

                utils::DeleteTableFilePath(options_, table_file);
                ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;
                ConnectorPtr->remove<TableFileSchema>(table_file.id_);
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    try {
        MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
                                                   &TableSchema::table_id_),
                                           where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &table : tables) {
                const auto &table_id = std::get<1>(table);

                // Any TO_DELETE file still recorded here survived step 1, i.e. it is
                // inside its TTL window; deleting the table folder would remove it early.
                auto pending = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                    where(c(&TableFileSchema::table_id_) == table_id
                                                          and c(&TableFileSchema::file_type_)
                                                              == (int) TableFileSchema::TO_DELETE),
                                                    limit(1));
                if (!pending.empty()) {
                    ENGINE_LOG_DEBUG << "Postpone removing table " << table_id
                                     << ": it still has files waiting for TTL";
                    continue;
                }

                utils::DeleteTablePath(options_, table_id);
                ConnectorPtr->remove<TableSchema>(std::get<0>(table));
            }

            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    return Status::OK();
}

/**
 * Delete every file record whose file_type_ is NEW, inside one sqlite transaction.
 *
 * Only meta rows are removed; this function does not touch anything on disk.
 */
Status DBMetaImpl::CleanUp() {
    try {
        MetricCollector metric;

        //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto new_file_ids = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                                 where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (const auto &row : new_file_ids) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            ENGINE_LOG_ERROR << "sqlite transaction failed";
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table file", e);
    }

    return Status::OK();
}

/**
 * Estimate the number of vectors stored in `table_id`.
 *
 * The size_ of every RAW, TO_INDEX and INDEX file of the table is summed, then
 * divided by the table dimension and by sizeof(float).
 *
 * @param result  receives the estimate; left untouched if the table lookup fails.
 */
Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
        MetricCollector metric;

        auto sizes = ConnectorPtr->select(columns(&TableFileSchema::size_),
                                          where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                                                 or c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX
                                                 or c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)
                                                and c(&TableFileSchema::table_id_) == table_id));

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        uint64_t total_bytes = 0;
        for (auto &row : sizes) {
            total_bytes += std::get<0>(row);
        }

        total_bytes /= table_schema.dimension_;
        total_bytes /= sizeof(float);
        result = total_bytes;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculate table file size", e);
    }
    return Status::OK();
}

/**
 * Recursively remove the directory at options_.path, if it exists.
 *
 * boost::filesystem signals failures (permissions, I/O errors, ...) by throwing
 * filesystem_error. Those are converted to a Status via HandleException, like
 * every other meta operation, instead of escaping to the caller.
 */
Status DBMetaImpl::DropAll() {
    try {
        if (boost::filesystem::is_directory(options_.path)) {
            boost::filesystem::remove_all(options_.path);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop all meta", e);
    }
    return Status::OK();
}

/**
 * Run CleanUp() once on destruction to drop file records left in NEW state.
 * The returned Status is deliberately discarded: a destructor cannot report it.
 */
DBMetaImpl::~DBMetaImpl() {
    static_cast<void>(CleanUp());
}

} // namespace meta
X
Xu Peng 已提交
1029
} // namespace engine
J
jinhai 已提交
1030
} // namespace milvus
X
Xu Peng 已提交
1031
} // namespace zilliz