DBMetaImpl.cpp 37.4 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7 8
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
G
groot 已提交
9
#include "Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
G
groot 已提交
11
#include "Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23 24 25

namespace zilliz {
namespace vecwise {
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31 32 33 34 35 36 37 38
namespace {

// Logs an engine meta exception and re-raises it to the caller.
//
// Must only be called from inside a catch handler: the bare `throw;`
// re-raises the exception currently being handled, so its dynamic type and
// message are preserved. (`throw e;` would copy it into a plain
// std::exception, slicing off the derived type.)
//
// @param e the exception that was caught; used only for the log message.
void HandleException(std::exception &e) {
    ENGINE_LOG_DEBUG << "Engine meta exception: " << e.what();
    throw;
}

}

39
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
40
    return make_storage(path,
41
                        make_table("Table",
G
groot 已提交
42 43 44 45 46 47 48
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("files_cnt", &TableSchema::files_cnt_, default_value(0)),
                                   make_column("engine_type", &TableSchema::engine_type_),
                                   make_column("store_raw_data", &TableSchema::store_raw_data_)),
49
                        make_table("TableFile",
G
groot 已提交
50 51
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
52
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
53 54 55 56 57 58
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
                                   make_column("size", &TableFileSchema::size_, default_value(0)),
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
59
    );
X
Xu Peng 已提交
60 61 62

}

X
Xu Peng 已提交
63
// Concrete storage type returned by StoragePrototype(); the call appears only
// inside decltype, so it is never evaluated.
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
64
// File-local storage handle. It is (re)assigned by DBMetaImpl::Initialize()
// and dereferenced by the meta queries below; being a file-scope static, it
// is one object for the whole translation unit rather than one per instance.
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
65
// Type of the sqlite_orm expression comparing TableFileSchema::id_ with an
// unsigned long.
// NOTE(review): not referenced in the visible part of this file — presumably
// used further down; confirm.
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
66

67
std::string DBMetaImpl::GetTablePath(const std::string &table_id) {
X
Xu Peng 已提交
68
    return options_.path + "/tables/" + table_id;
69 70
}

71
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
X
Xu Peng 已提交
72
    std::stringstream ss;
X
Xu Peng 已提交
73
    ss << GetTablePath(table_id) << "/" << date;
X
Xu Peng 已提交
74 75 76
    return ss.str();
}

77
// Fills in `group_file.location_` as "<date partition path>/<file_id_>".
// If the file has no date yet (date_ == EmptyDate), date_ is first set to
// Meta::GetDate(), so this call may also modify group_file.date_.
void DBMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
    const bool date_missing = (group_file.date_ == EmptyDate);
    if (date_missing) {
        group_file.date_ = Meta::GetDate();
    }

    std::stringstream location;
    location << GetTableDatePartitionPath(group_file.table_id_, group_file.date_);
    location << "/" << group_file.file_id_;
    group_file.location_ = location.str();
}

87
// Writes a freshly generated id, formatted as text, into `table_id`.
// Always returns Status::OK().
Status DBMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator id_source;
    std::stringstream formatted;
    formatted << id_source.GetNextIDNumber();
    table_id = formatted.str();
    return Status::OK();
}

95
// Writes a freshly generated id, formatted as text, into `file_id`.
// Always returns Status::OK().
Status DBMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator id_source;
    std::stringstream formatted;
    formatted << id_source.GetNextIDNumber();
    file_id = formatted.str();
    return Status::OK();
}

103
// Copies the given options into the member of the same name, then runs
// Initialize(). The Status returned by Initialize() is not inspected here.
DBMetaImpl::DBMetaImpl(const DBMetaOptions &options_)
    : options_(options_)
{
    this->Initialize();
}

X
Xu Peng 已提交
108 109 110
// Prepares the meta store rooted at options_.path:
//   1. makes sure the root directory exists,
//   2. (re)creates the file-local ConnectorPtr on "<path>/meta.sqlite",
//   3. syncs the schema, keeps the connection open and switches to WAL,
//   4. runs CleanUp().
//
// @return Status::OK() on completion.
// @throws boost::filesystem::filesystem_error if the directory cannot be
//         created; storage exceptions are not caught here either.
Status DBMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path)) {
        // The throwing overload reports real failures via filesystem_error.
        // A `false` return only means the directory already existed (e.g. it
        // was created between the check above and this call), which is not
        // an error, so it must not be asserted on.
        if (!boost::filesystem::create_directory(options_.path)) {
            ENGINE_LOG_DEBUG << "Directory " << options_.path << " already exists";
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

X
Xu Peng 已提交
128
// PXU TODO: Temp solution. Will fix later
129 130
// Sets file_type_ to TO_DELETE for every file of `table_id` whose date is in
// `dates`.
//
// @param table_id table whose partitions are dropped; must exist.
// @param dates    partition dates to drop; an empty list is a no-op.
// @return Status::OK() on success or when `dates` is empty; the failing
//         status from DescribeTable() if the table lookup fails; an error
//         status if any date is not older than GetDateWithDelta(-1).
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                         const DatesT &dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    // Refuse the whole request if any requested date is yesterday or later.
    auto yesterday = GetDateWithDelta(-1);
    for (auto &date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions within 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
            set(
                c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE)
            ),
            where(
                c(&TableFileSchema::table_id_) == table_id and
                    in(&TableFileSchema::date_, dates)
            ));
    } catch (std::exception &e) {
        HandleException(e);
    }
    return Status::OK();
}

165
// Inserts a new row for `table_schema` into the "Table" meta table and makes
// sure the table's directory exists.
//
// On entry, an empty table_id_ is replaced by a generated id. files_cnt_,
// id_ and created_on_ are overwritten; after a successful insert id_ holds
// the row id returned by the storage and location_ the table directory.
//
// @return Status::OK() on success, Status::DBTransactionError if the insert
//         throws anything.
// @throws boost::filesystem::filesystem_error if the directory cannot be
//         created.
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
    server::Metrics::GetInstance().MetaAccessTotalIncrement();
    if (table_schema.table_id_.empty()) {
        NextTableId(table_schema.table_id_);
    }
    table_schema.files_cnt_ = 0;
    table_schema.id_ = -1;
    table_schema.created_on_ = utils::GetMicroSecTimeStamp();
    auto start_time = METRICS_NOW_TIME;
    try {
        auto id = ConnectorPtr->insert(table_schema);
        table_schema.id_ = id;
    } catch (...) {
        return Status::DBTransactionError("Add Table Error");
    }
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

    auto table_path = GetTablePath(table_schema.table_id_);
    table_schema.location_ = table_path;
    if (!boost::filesystem::is_directory(table_path)) {
        // Real failures are reported by exception. A `false` return only
        // means the directory already existed (e.g. created concurrently
        // after the check above), so it is neither logged as an error nor
        // asserted on.
        boost::filesystem::create_directories(table_path);
    }

    return Status::OK();
}

G
groot 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
// Removes every "Table" row whose table_id_ equals `table_id`.
// Only the meta rows are removed here; no directory is touched.
//
// @return Status::OK(), including when no row matched.
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        // Look up the primary keys first, then remove row by row.
        auto matching_rows = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id));

        for (auto &row : matching_rows) {
            const auto &row_id = std::get<0>(row);
            ConnectorPtr->remove<TableSchema>(row_id);
        }
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

214
// Looks up the table named by table_schema.table_id_ and, when found, fills
// in id_, files_cnt_, dimension_, engine_type_, store_raw_data_ and
// location_ on `table_schema`.
//
// @return Status::OK() when exactly one row matched, Status::NotFound
//         otherwise.
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto groups = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::table_id_,
                    &TableSchema::files_cnt_,
                    &TableSchema::dimension_,
                    &TableSchema::engine_type_,
                    &TableSchema::store_raw_data_),
            where(c(&TableSchema::table_id_) == table_schema.table_id_));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        assert(groups.size() <= 1);
        if (groups.size() != 1) {
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
        }

        // Column 1 (table_id_) is the lookup key and is already set.
        auto &row = groups[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.files_cnt_ = std::get<2>(row);
        table_schema.dimension_ = std::get<3>(row);
        table_schema.engine_type_ = std::get<4>(row);
        table_schema.store_raw_data_ = std::get<5>(row);

        table_schema.location_ = GetTablePath(table_schema.table_id_);
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

249
// Reports through `has_or_not` whether exactly one "Table" row has the given
// table_id.
//
// @return Status::OK().
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;

        auto tables = ConnectorPtr->select(
            columns(&TableSchema::id_),
            where(c(&TableSchema::table_id_) == table_id));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        assert(tables.size() <= 1);
        has_or_not = (tables.size() == 1);
    } catch (std::exception &e) {
        HandleException(e);
    }
    return Status::OK();
}

// Appends one TableSchema per row of the "Table" meta table to
// `table_schema_array`. The array is not cleared first, and location_ is not
// filled in on the appended entries.
//
// @return Status::OK().
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto all_rows = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::table_id_,
                    &TableSchema::files_cnt_,
                    &TableSchema::dimension_,
                    &TableSchema::engine_type_,
                    &TableSchema::store_raw_data_));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        for (auto &row : all_rows) {
            TableSchema entry;
            entry.id_ = std::get<0>(row);
            entry.table_id_ = std::get<1>(row);
            entry.files_cnt_ = std::get<2>(row);
            entry.dimension_ = std::get<3>(row);
            entry.engine_type_ = std::get<4>(row);
            entry.store_raw_data_ = std::get<5>(row);

            table_schema_array.emplace_back(entry);
        }
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

302
// Registers a new file for the table named by file_schema.table_id_ and makes
// sure its date-partition directory exists.
//
// On success `file_schema` has: a generated file_id_, file_type_ NEW,
// size_ 0, created_on_/updated_time_ set to the current timestamp,
// dimension_ and engine_type_ copied from the table, location_ computed, and
// id_ set to the row id returned by the storage. An EmptyDate date_ is
// replaced by Meta::GetDate().
//
// @return Status::OK() on success; the failing status from DescribeTable()
//         if the table lookup fails; Status::DBTransactionError if the
//         insert throws anything.
// @throws boost::filesystem::filesystem_error if the partition directory
//         cannot be created.
Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
    }
    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    NextFileId(file_schema.file_id_);
    file_schema.file_type_ = TableFileSchema::NEW;
    file_schema.dimension_ = table_schema.dimension_;
    file_schema.size_ = 0;
    file_schema.created_on_ = utils::GetMicroSecTimeStamp();
    file_schema.updated_time_ = file_schema.created_on_;
    file_schema.engine_type_ = table_schema.engine_type_;
    GetTableFilePath(file_schema);

    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto id = ConnectorPtr->insert(file_schema);
        file_schema.id_ = id;
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
    } catch (...) {
        return Status::DBTransactionError("Add file Error");
    }

    auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);

    if (!boost::filesystem::is_directory(partition_path)) {
        // Real failures are reported by exception. A `false` return only
        // means the directory already existed — which happens when two files
        // for the same date are created concurrently — so it is neither
        // logged as an error nor asserted on.
        boost::filesystem::create_directory(partition_path);
    }

    return Status::OK();
}

349
// Replaces the contents of `files` with every table file whose file_type_ is
// TO_INDEX, across all tables. Each entry gets its location_ computed and its
// dimension_ copied from the owning table.
//
// @return Status::OK() on success, or the failing status from
//         DescribeTable() for the first file whose table lookup fails
//         (`files` then holds the entries collected so far).
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto pending_rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::table_id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_,
                    &TableFileSchema::size_,
                    &TableFileSchema::date_,
                    &TableFileSchema::engine_type_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX)));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        // Table descriptions already looked up, keyed by table id, so each
        // table is described at most once per call.
        std::map<std::string, TableSchema> schema_cache;
        TableFileSchema table_file;

        for (auto &row : pending_rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.size_ = std::get<4>(row);
            table_file.date_ = std::get<5>(row);
            table_file.engine_type_ = std::get<6>(row);

            GetTableFilePath(table_file);

            auto cached = schema_cache.find(table_file.table_id_);
            if (cached == schema_cache.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                cached = schema_cache.emplace(table_file.table_id_, table_schema).first;
            }
            table_file.dimension_ = cached->second.dimension_;
            files.push_back(table_file);
        }
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
401
// Replaces the contents of `files` with the files of `table_id` whose
// file_type_ is RAW, TO_INDEX or INDEX, grouped by date.
//
// @param table_id  table to query.
// @param partition dates to restrict the result to; empty means all dates.
// @param files     output map from date to the files of that date.
// @return Status::OK() on success, or the failing status from
//         DescribeTable() if the table lookup fails.
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;

        // Shared post-processing for both query shapes: record the query
        // duration, look up the table, then convert every row and file it
        // under its date. Must be called right after the select so the
        // measured time covers only the query.
        auto collect = [&](auto &selected) -> Status {
            auto end_time = METRICS_NOW_TIME;
            auto total_time = METRICS_MICROSECONDS(start_time, end_time);
            server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

            TableSchema table_schema;
            table_schema.table_id_ = table_id;
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }

            TableFileSchema table_file;
            for (auto &file : selected) {
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
                table_file.size_ = std::get<4>(file);
                table_file.date_ = std::get<5>(file);
                table_file.engine_type_ = std::get<6>(file);
                table_file.dimension_ = table_schema.dimension_;
                GetTableFilePath(table_file);
                // operator[] creates the per-date list on first use.
                files[table_file.date_].push_back(table_file);
            }
            return Status::OK();
        };

        if (partition.empty()) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
                                                         &TableFileSchema::size_,
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
                                                 where(c(&TableFileSchema::table_id_) == table_id and
                                                     (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
                                                         c(&TableFileSchema::file_type_)
                                                             == (int) TableFileSchema::TO_INDEX or
                                                         c(&TableFileSchema::file_type_)
                                                             == (int) TableFileSchema::INDEX)));
            return collect(selected);
        }

        // Same columns as above (engine_type_ included, so date-restricted
        // results carry the stored engine type too), additionally filtered
        // by the requested dates.
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::size_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::engine_type_),
                                             where(c(&TableFileSchema::table_id_) == table_id and
                                                 in(&TableFileSchema::date_, partition) and
                                                 (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
                                                     c(&TableFileSchema::file_type_)
                                                         == (int) TableFileSchema::TO_INDEX or
                                                     c(&TableFileSchema::file_type_)
                                                         == (int) TableFileSchema::INDEX)));
        return collect(selected);
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

502 503
// Replaces the contents of `files` with the RAW files of `table_id`, grouped
// by date. Each entry gets its location_ computed and its dimension_ copied
// from the table; engine_type_ is not queried here.
//
// @return Status::OK() on success, or the failing status from
//         DescribeTable() if the table lookup fails.
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::FilesToMerge(const std::string &table_id,
                                DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto raw_rows = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::table_id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_,
                    &TableFileSchema::size_,
                    &TableFileSchema::date_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW) and
                  c(&TableFileSchema::table_id_) == table_id));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        TableFileSchema table_file;
        for (auto &row : raw_rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.size_ = std::get<4>(row);
            table_file.date_ = std::get<5>(row);
            table_file.dimension_ = table_schema.dimension_;
            GetTableFilePath(table_file);
            // operator[] creates the per-date list on first use.
            files[table_file.date_].push_back(table_file);
        }
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

// Collects the files of `table_id` whose file_type_ is not TO_DELETE into
// `files` (grouped by date) and removes their rows from the "TableFile" meta
// table.
//
// @param table_id  table to query.
// @param partition dates to restrict the operation to; empty means all
//                  dates.
// @param files     output map from date to the removed files; entries are
//                  added to whatever the map already holds.
// @return Status::OK().
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::FilesToDelete(const std::string& table_id,
        const DatesT& partition,
        DatePartionedTableFilesSchema& files) {
    try {
        // step 2 (shared by both branches): record each selected file in
        // `files`, then erase its row from meta.
        auto erase_selected = [&](auto &selected) {
            for (auto &file : selected) {
                TableFileSchema table_file;
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.size_ = std::get<3>(file);
                table_file.date_ = std::get<4>(file);
                GetTableFilePath(table_file);
                // operator[] creates the per-date list on first use.
                files[table_file.date_].push_back(table_file);

                ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
            }
        };

        if (partition.empty()) {
            //step 1: get all table files
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::size_,
                                                         &TableFileSchema::date_),
                                                 where(c(&TableFileSchema::file_type_) !=
                                                       (int) TableFileSchema::TO_DELETE
                                                       and c(&TableFileSchema::table_id_) == table_id));
            erase_selected(selected);
        } else {
            //step 1: get table files by dates
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::size_,
                                                         &TableFileSchema::date_),
                                                 where(c(&TableFileSchema::file_type_) !=
                                                       (int) TableFileSchema::TO_DELETE
                                                       and in(&TableFileSchema::date_, partition)
                                                       and c(&TableFileSchema::table_id_) == table_id));
            erase_selected(selected);
        }

    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

623
// Looks up the file identified by file_schema.file_id_ within
// file_schema.table_id_ and, when found, fills in id_, table_id_, file_id_,
// file_type_, size_ and date_ from the stored row.
//
// @return Status::OK() when exactly one row matched, Status::NotFound
//         otherwise.
// @throws re-raises storage exceptions through HandleException().
Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) {
    try {
        auto files = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::table_id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_,
                    &TableFileSchema::size_,
                    &TableFileSchema::date_),
            where(c(&TableFileSchema::file_id_) == file_schema.file_id_ and
                  c(&TableFileSchema::table_id_) == file_schema.table_id_));

        assert(files.size() <= 1);
        if (files.size() != 1) {
            return Status::NotFound("Table:" + file_schema.table_id_ +
                " File:" + file_schema.file_id_ + " not found");
        }

        auto &row = files[0];
        file_schema.id_ = std::get<0>(row);
        file_schema.table_id_ = std::get<1>(row);
        file_schema.file_id_ = std::get<2>(row);
        file_schema.file_type_ = std::get<3>(row);
        file_schema.size_ = std::get<4>(row);
        file_schema.date_ = std::get<5>(row);
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
654
// PXU TODO: Support Swap
X
Xu Peng 已提交
655
// Applies every archive criterion configured in options_.archive_conf.
//
//   "days": files whose created_on_ is older than `limit` days are flagged
//           TO_DELETE (files already flagged are left alone).
//   "disk": the total size of non-deleted files is measured with Size(); the
//           amount exceeding `limit` * G is handed to DiscardFiles().
//
// Returns Status::OK() when all criteria were processed, or the first non-OK
// status reported by Size()/DiscardFiles(). Storage exceptions are passed to
// HandleException, which logs and rethrows them.
Status DBMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    // Iterate by reference: copying each (name, limit) pair is unnecessary.
    for (const auto &kv : criterias) {
        const auto &criteria = kv.first;
        const auto &limit = kv.second;

        if (criteria == "days") {
            const long usecs = limit * D_SEC * US_PS;
            const long now = utils::GetMicroSecTimeStamp();
            try {
                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE)
                    ),
                    where(
                        c(&TableFileSchema::created_on_) < static_cast<long>(now - usecs) and
                            c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)
                    ));
            } catch (std::exception &e) {
                HandleException(e);
            }
        } else if (criteria == "disk") {
            uint64_t sum = 0;
            auto size_status = Size(sum);
            if (!size_status.ok()) {
                return size_status;
            }

            // Do the subtraction in signed arithmetic so "under the limit" is an
            // explicit negative value instead of an unsigned wrap-around that only
            // becomes negative through the implicit conversion at the call site.
            const long to_delete = static_cast<long>(sum) - static_cast<long>(limit * G);
            auto discard_status = DiscardFiles(to_delete);
            if (!discard_status.ok()) {
                return discard_status;
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
692
// Computes the summed size_ of every table file that is not flagged TO_DELETE.
//
// result: out. Reset to 0 first, then incremented by each non-empty aggregate
//         value returned by the query.
// Returns Status::OK(). Storage exceptions are passed to HandleException,
// which logs and rethrows them.
Status DBMetaImpl::Size(uint64_t &result) {
    result = 0;

    try {
        auto rows = ConnectorPtr->select(
            columns(sum(&TableFileSchema::size_)),
            where(c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

        for (auto &row : rows) {
            auto &total = std::get<0>(row);
            // The aggregate may be empty; only dereference it when it holds a value.
            if (total) {
                result += (uint64_t) (*total);
            }
        }
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
713
// Flags table files as TO_DELETE, lowest id_ first, until at least
// to_discard_size worth of size_ has been flagged or no candidates remain.
//
// to_discard_size: amount to free, in the same unit as TableFileSchema::size_.
//                  Values <= 0 mean there is nothing to do.
// Returns Status::OK(). Storage exceptions are passed to HandleException,
// which logs and rethrows them.
//
// Files are processed in batches of 10. Batches are handled in a loop rather
// than by self-recursion so the stack depth stays constant no matter how many
// files have to be discarded.
Status DBMetaImpl::DiscardFiles(long to_discard_size) {
    while (true) {
        LOG(DEBUG) << "About to discard size=" << to_discard_size;
        if (to_discard_size <= 0) {
            return Status::OK();
        }

        try {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::size_),
                                                 where(c(&TableFileSchema::file_type_)
                                                           != static_cast<int>(TableFileSchema::TO_DELETE)),
                                                 order_by(&TableFileSchema::id_),
                                                 limit(10));

            std::vector<int> ids;
            for (auto &file : selected) {
                if (to_discard_size <= 0) break;

                const auto row_id = std::get<0>(file);
                const auto row_size = std::get<1>(file);
                ids.push_back(row_id);
                // Log the record id that is actually selected; file_id_ is not
                // part of this query and would always print empty.
                ENGINE_LOG_DEBUG << "Discard table_file.id=" << row_id
                    << " table_file.size=" << row_size;
                to_discard_size -= row_size;
            }

            // No candidates left: stop even if the target was not reached.
            if (ids.empty()) {
                return Status::OK();
            }

            ConnectorPtr->update_all(
                set(
                    c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE)
                ),
                where(
                    in(&TableFileSchema::id_, ids)
                ));
        } catch (std::exception &e) {
            HandleException(e);
        }
    }
}

758
// Persists file_schema to the meta store.
//
// file_schema: in/out. updated_time_ is stamped with the current microsecond
//              timestamp before the record is written.
// Returns Status::OK(). The access counter is incremented and the duration of
// the update call is reported to the metrics histogram. On failure the table
// and file ids are logged and the exception is passed to HandleException,
// which logs and rethrows it.
Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

    try {
        auto &&metrics = server::Metrics::GetInstance();
        metrics.MetaAccessTotalIncrement();

        // Time only the store update itself.
        auto begin = METRICS_NOW_TIME;
        ConnectorPtr->update(file_schema);
        auto finish = METRICS_NOW_TIME;

        auto elapsed = METRICS_MICROSECONDS(begin, finish);
        metrics.MetaAccessDurationSecondsHistogramObserve(elapsed);
    } catch (std::exception &e) {
        ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
        HandleException(e);
    }

    return Status::OK();
}

774
// Persists every entry of `files` inside a single store transaction.
//
// files: in/out. Each entry gets updated_time_ stamped with the current
//        microsecond timestamp right before it is written.
// Returns Status::OK() when the transaction reports success, otherwise
// Status::DBTransactionError. Exceptions are passed to HandleException,
// which logs and rethrows them.
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto begin = METRICS_NOW_TIME;

        auto succeeded = ConnectorPtr->transaction([&]() mutable {
            for (auto &entry : files) {
                entry.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(entry);
            }

            // The duration is observed from inside the transaction body,
            // i.e. once all updates have been issued.
            auto finish = METRICS_NOW_TIME;
            auto elapsed = METRICS_MICROSECONDS(begin, finish);
            server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(elapsed);
            return true;
        });

        if (!succeeded) {
            return Status::DBTransactionError("Update files Error");
        }
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

797
// Physically removes table files that were flagged TO_DELETE at least
// `seconds` ago: the file at the resolved location is deleted from disk and
// its record is removed from the meta store.
//
// seconds: grace period between flagging a file TO_DELETE (tracked through
//          updated_time_) and actually removing it.
// Returns Status::OK(). Exceptions are passed to HandleException, which logs
// and rethrows them.
Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    try {
        // Only records whose last update is OLDER than the TTL cutoff have
        // expired. Comparing with '>' would pick the files that were flagged
        // most recently and never reclaim the ones that have actually expired.
        const auto cutoff = now - seconds * US_PS;

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::size_,
                                                     &TableFileSchema::date_),
                                             where(
                                                 c(&TableFileSchema::file_type_)
                                                     == static_cast<int>(TableFileSchema::TO_DELETE)
                                                     and
                                                         c(&TableFileSchema::updated_time_) < cutoff));

        TableFileSchema table_file;

        for (auto &file : selected) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.size_ = std::get<4>(file);
            table_file.date_ = std::get<5>(file);

            // Presumably fills table_file.location_ from the fields above -- TODO confirm.
            GetTableFilePath(table_file);

            // Every selected record is TO_DELETE (enforced by the query), so the
            // file can be removed from disk unconditionally.
            boost::filesystem::remove(table_file.location_);
            ConnectorPtr->remove<TableFileSchema>(table_file.id_);
        }
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

836
// Removes the meta records of all table files whose type is TO_DELETE or NEW.
// For TO_DELETE entries the file at the resolved location is also deleted
// from disk; NEW entries only lose their record.
//
// Returns Status::OK(). Exceptions are passed to HandleException, which logs
// and rethrows them.
Status DBMetaImpl::CleanUp() {
    try {
        auto stale = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::table_id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::file_type_,
                    &TableFileSchema::size_,
                    &TableFileSchema::date_),
            where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_DELETE) or
                  c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::NEW)));

        // One scratch record is reused for every row.
        TableFileSchema record;

        for (auto &row : stale) {
            record.id_ = std::get<0>(row);
            record.table_id_ = std::get<1>(row);
            record.file_id_ = std::get<2>(row);
            record.file_type_ = std::get<3>(row);
            record.size_ = std::get<4>(row);
            record.date_ = std::get<5>(row);

            // Presumably fills record.location_ from the fields above -- TODO confirm.
            GetTableFilePath(record);

            const bool marked_for_delete = (record.file_type_ == TableFileSchema::TO_DELETE);
            if (marked_for_delete) {
                boost::filesystem::remove(record.location_);
            }

            ConnectorPtr->remove<TableFileSchema>(record.id_);
        }
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

G
groot 已提交
874
// Derives a count for `table_id` from the stored file sizes:
//   result = (sum of size_ over the table's RAW, TO_INDEX and INDEX files)
//            / table dimension_ / sizeof(float)
//
// table_id: table whose files are counted.
// result:   out. Written only after DescribeTable() succeeded.
// Returns the DescribeTable() status when it is not OK, otherwise
// Status::OK(). The access counter is incremented and the duration of the
// file query is reported to the metrics histogram. Exceptions are passed to
// HandleException, which logs and rethrows them.
Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();

        auto begin = METRICS_NOW_TIME;
        auto rows = ConnectorPtr->select(
            columns(&TableFileSchema::size_,
                    &TableFileSchema::date_),
            where((c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW) or
                   c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX) or
                   c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX)) and
                  c(&TableFileSchema::table_id_) == table_id));
        auto finish = METRICS_NOW_TIME;
        auto elapsed = METRICS_MICROSECONDS(begin, finish);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(elapsed);

        // The table description supplies the dimension used below.
        TableSchema schema;
        schema.table_id_ = table_id;
        auto describe_status = DescribeTable(schema);
        if (!describe_status.ok()) {
            return describe_status;
        }

        result = 0;
        for (auto &row : rows) {
            result += std::get<0>(row);
        }

        // Two separate integer divisions, in this order.
        result /= schema.dimension_;
        result /= sizeof(float);
    } catch (std::exception &e) {
        HandleException(e);
    }

    return Status::OK();
}

913
// Recursively deletes the directory at options_.path, if it is a directory.
// Always returns Status::OK().
Status DBMetaImpl::DropAll() {
    namespace fs = boost::filesystem;

    const bool has_root_dir = fs::is_directory(options_.path);
    if (has_root_dir) {
        fs::remove_all(options_.path);
    }

    return Status::OK();
}

X
Xu Peng 已提交
920
// Runs a final CleanUp() pass when the meta object is destroyed.
//
// CleanUp() reports failures by throwing (HandleException rethrows whatever
// it is given). An exception escaping a destructor terminates the process
// unless the destructor is declared noexcept(false), so failures here are
// logged and then dropped: cleanup on destruction is best effort.
DBMetaImpl::~DBMetaImpl() {
    try {
        CleanUp();
    } catch (const std::exception &e) {
        ENGINE_LOG_DEBUG << "Engine meta cleanup on destruction failed: " << e.what();
    } catch (...) {
        ENGINE_LOG_DEBUG << "Engine meta cleanup on destruction failed with an unknown exception";
    }
}

924
} // namespace meta
X
Xu Peng 已提交
925 926 927
} // namespace engine
} // namespace vecwise
} // namespace zilliz