DBMetaImpl.cpp 36.9 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7 8
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
G
groot 已提交
9
#include "Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
G
groot 已提交
11
#include "Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23

namespace zilliz {
J
jinhai 已提交
24
namespace milvus {
X
Xu Peng 已提交
25
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34
// Logs "<desc>: <exception text>" as an engine error and wraps the same two
// pieces of information in a DBTransactionError status for the caller.
Status HandleException(const std::string& desc, std::exception &e) {
    const char *reason = e.what();
    ENGINE_LOG_ERROR << desc << ": " << reason;
    return Status::DBTransactionError(desc, reason);
}

G
groot 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
// Scoped metrics helper for meta accesses.
// Construction increments the meta-access counter and then records a start
// time point; destruction feeds the value computed by METRICS_MICROSECONDS
// (start .. now) into the meta-access duration histogram.
class MetricCollector {
    using TimePoint = std::chrono::system_clock::time_point;

public:
    MetricCollector() {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        begin_ = METRICS_NOW_TIME;
    }

    ~MetricCollector() {
        auto finish = METRICS_NOW_TIME;
        auto elapsed = METRICS_MICROSECONDS(begin_, finish);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(elapsed);
    }

private:
    TimePoint begin_;  // captured in the constructor
};

G
groot 已提交
55 56
}

57
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
58
    return make_storage(path,
G
groot 已提交
59
                        make_table("Tables",
G
groot 已提交
60 61
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
62
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
63 64 65 66 67
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("files_cnt", &TableSchema::files_cnt_, default_value(0)),
                                   make_column("engine_type", &TableSchema::engine_type_),
                                   make_column("store_raw_data", &TableSchema::store_raw_data_)),
G
groot 已提交
68
                        make_table("TableFiles",
G
groot 已提交
69 70
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
71
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
72 73 74 75 76 77
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
                                   make_column("size", &TableFileSchema::size_, default_value(0)),
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
78
    );
X
Xu Peng 已提交
79 80 81

}

X
Xu Peng 已提交
82
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
83
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
84
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
85

86
std::string DBMetaImpl::GetTablePath(const std::string &table_id) {
X
Xu Peng 已提交
87
    return options_.path + "/tables/" + table_id;
88 89
}

90
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
X
Xu Peng 已提交
91
    std::stringstream ss;
X
Xu Peng 已提交
92
    ss << GetTablePath(table_id) << "/" << date;
X
Xu Peng 已提交
93 94 95
    return ss.str();
}

96
// Fills in `group_file.location_` as "<partition path>/<file_id>".
// If the file has no date yet (EmptyDate) it is first assigned the value
// returned by Meta::GetDate(), so `group_file.date_` may be modified too.
void DBMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
    const bool has_no_date = (group_file.date_ == EmptyDate);
    if (has_no_date) {
        group_file.date_ = Meta::GetDate();
    }

    std::stringstream file_path;
    file_path << GetTableDatePartitionPath(group_file.table_id_, group_file.date_);
    file_path << "/";
    file_path << group_file.file_id_;
    group_file.location_ = file_path.str();
}

106
// Writes a freshly generated id (SimpleIDGenerator::GetNextIDNumber(),
// formatted through a string stream) into `table_id`. Always returns OK.
Status DBMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator generator;
    std::stringstream formatted;
    formatted << generator.GetNextIDNumber();
    table_id = formatted.str();
    return Status::OK();
}

114
// Writes a freshly generated id (SimpleIDGenerator::GetNextIDNumber(),
// formatted through a string stream) into `file_id`. Always returns OK.
Status DBMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator generator;
    std::stringstream formatted;
    formatted << generator.GetNextIDNumber();
    file_id = formatted.str();
    return Status::OK();
}

122
// Copies the options into the member of the same name and runs Initialize().
// The Status returned by Initialize() is not inspected here.
DBMetaImpl::DBMetaImpl(const DBMetaOptions &options_) : options_(options_) {
    // In the initializer above, the left-hand name is the member and the
    // argument is the constructor parameter that shadows it.
    Initialize();
}

X
Xu Peng 已提交
127 128 129
// Prepares the meta store:
//   1. creates the db directory (options_.path) if it is not already a directory,
//   2. opens "<options_.path>/meta.sqlite" through StoragePrototype and stores
//      the handle in ConnectorPtr,
//   3. syncs the schema, keeps the connection open and switches to WAL journaling,
//   4. runs CleanUp().
// Returns DBTransactionError when create_directory reports failure, OK otherwise.
Status DBMetaImpl::Initialize() {
    const bool db_dir_exists = boost::filesystem::is_directory(options_.path);
    if (!db_dir_exists && !boost::filesystem::create_directory(options_.path)) {
        ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
        return Status::DBTransactionError("Failed to create db directory", options_.path);
    }

    const std::string db_file = options_.path + "/meta.sqlite";
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(db_file));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

X
Xu Peng 已提交
147
// PXU TODO: Temp solution. Will fix later
148 149
// Marks every file of `table_id` whose date_ is listed in `dates` as TO_DELETE.
//
// Returns OK immediately for an empty `dates`; returns the DescribeTable()
// status if the table cannot be described; returns Status::Error if any
// requested date is >= GetDateWithDelta(-1). Exceptions from the update are
// converted by HandleException().
Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                         const DatesT &dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    // Make sure the table can be described before touching its files.
    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        auto cutoff = GetDateWithDelta(-1);

        // Refuse the whole request if any single date is at or after the cutoff.
        for (auto &requested : dates) {
            if (requested >= cutoff) {
                return Status::Error("Could not delete partitions with 2 days");
            }
        }

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE)),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  in(&TableFileSchema::date_, dates)));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop partition", e);
    }

    return Status::OK();
}

185
// Inserts a new row into "Tables" and creates the table directory.
//
// If `table_schema.table_id_` is empty a new id is generated; otherwise the
// call fails with Status::Error when a row with that id already exists (the
// message differs when the existing row is in TO_DELETE state).
// On success `id_`, `files_cnt_`, `created_on_` and `location_` of
// `table_schema` are filled in.
// Insert failures yield DBTransactionError("Add Table Error"); other
// exceptions are converted by HandleException().
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto existing = ConnectorPtr->select(columns(&TableSchema::state_),
                                                 where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (existing.size() == 1) {
                std::string msg;
                if (TableSchema::TO_DELETE == std::get<0>(existing[0])) {
                    msg = "Table already exists and it is in delete state, please wait a second";
                } else {
                    msg = "Table already exists";
                }
                return Status::Error(msg);
            }
        }

        // Reset the bookkeeping fields before the row is written.
        table_schema.files_cnt_ = 0;
        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            table_schema.id_ = ConnectorPtr->insert(table_schema);
        } catch (...) {
            return Status::DBTransactionError("Add Table Error");
        }

        const auto table_dir = GetTablePath(table_schema.table_id_);
        table_schema.location_ = table_dir;
        if (!boost::filesystem::is_directory(table_dir) &&
            !boost::filesystem::create_directories(table_dir)) {
            ENGINE_LOG_ERROR << "Create directory " << table_dir << " Error";
            return Status::Error("Failed to create table path");
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
    }

    return Status::OK();
}

G
groot 已提交
229 230
// Soft-deletes a table: every "Tables" row whose table_id matches is written
// back with state_ set to TO_DELETE. Exceptions are converted by
// HandleException(); otherwise OK is returned (also when no row matched).
Status DBMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        MetricCollector metric;

        // Fetch the stored columns so the full-row update below writes them back unchanged.
        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::files_cnt_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::engine_type_,
                                                 &TableSchema::store_raw_data_,
                                                 &TableSchema::created_on_),
                                         where(c(&TableSchema::table_id_) == table_id));

        for (auto &row : rows) {
            TableSchema doomed;
            doomed.id_ = std::get<0>(row);
            doomed.table_id_ = table_id;
            doomed.files_cnt_ = std::get<1>(row);
            doomed.dimension_ = std::get<2>(row);
            doomed.engine_type_ = std::get<3>(row);
            doomed.store_raw_data_ = std::get<4>(row);
            doomed.created_on_ = std::get<5>(row);
            doomed.state_ = static_cast<int>(TableSchema::TO_DELETE);  // the only changed column

            ConnectorPtr->update<TableSchema>(doomed);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table", e);
    }

    return Status::OK();
}

G
groot 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
// Soft-deletes the files of a table: all "TableFiles" rows of `table_id` that
// are not already TO_DELETE get file_type_ = TO_DELETE and a fresh
// updated_time_. Exceptions are converted by HandleException().
Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        MetricCollector metric;

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = static_cast<int>(TableFileSchema::TO_DELETE),
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
            where(c(&TableFileSchema::table_id_) == table_id &&
                  c(&TableFileSchema::file_type_) != static_cast<int>(TableFileSchema::TO_DELETE)));

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e);
    }

    return Status::OK();
}

283
// Looks up the table named by `table_schema.table_id_` (ignoring rows in
// TO_DELETE state) and fills id_, files_cnt_, dimension_, engine_type_,
// store_raw_data_ and location_ of `table_schema`.
// Returns NotFound unless exactly one row matches; exceptions are converted
// by HandleException().
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::table_id_,
                                                 &TableSchema::files_cnt_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::engine_type_,
                                                 &TableSchema::store_raw_data_),
                                         where(c(&TableSchema::table_id_) == table_schema.table_id_ &&
                                               c(&TableSchema::state_) != static_cast<int>(TableSchema::TO_DELETE)));

        if (rows.size() != 1) {
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
        }

        // Column 1 (table_id_) is selected but already known, so it is skipped.
        auto &row = rows[0];
        table_schema.id_ = std::get<0>(row);
        table_schema.files_cnt_ = std::get<2>(row);
        table_schema.dimension_ = std::get<3>(row);
        table_schema.engine_type_ = std::get<4>(row);
        table_schema.store_raw_data_ = std::get<5>(row);

        table_schema.location_ = GetTablePath(table_schema.table_id_);

    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe table", e);
    }

    return Status::OK();
}

316
// Reports through `has_or_not` whether exactly one row with `table_id` exists
// in "Tables" that is not in TO_DELETE state.
//
// Returns OK when the lookup ran (whatever its outcome). If the query throws,
// `has_or_not` stays false and the DBTransactionError produced by
// HandleException() is returned, matching the other methods in this file.
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    has_or_not = false;

    try {
        MetricCollector metric;
        auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
                                           where(c(&TableSchema::table_id_) == table_id
                                           and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        has_or_not = (tables.size() == 1);

    } catch (std::exception &e) {
        // Propagate the failure instead of silently reporting OK.
        return HandleException("Encounter exception when lookup table", e);
    }

    return Status::OK();
}

// Appends one TableSchema per "Tables" row that is not in TO_DELETE state to
// `table_schema_array` (the vector is not cleared first). For each entry
// id_, table_id_, files_cnt_, dimension_, engine_type_ and store_raw_data_
// are filled in.
//
// Returns OK on success. If the query throws, the DBTransactionError produced
// by HandleException() is returned, matching the other methods in this file.
Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
                                                   &TableSchema::table_id_,
                                                   &TableSchema::files_cnt_,
                                                   &TableSchema::dimension_,
                                                   &TableSchema::engine_type_,
                                                   &TableSchema::store_raw_data_),
                                             where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        for (auto &table : selected) {
            TableSchema schema;
            schema.id_ = std::get<0>(table);
            schema.table_id_ = std::get<1>(table);
            schema.files_cnt_ = std::get<2>(table);
            schema.dimension_ = std::get<3>(table);
            schema.engine_type_ = std::get<4>(table);
            schema.store_raw_data_ = std::get<5>(table);

            table_schema_array.emplace_back(schema);
        }

    } catch (std::exception &e) {
        // Propagate the failure instead of silently reporting OK.
        return HandleException("Encounter exception when lookup all tables", e);
    }

    return Status::OK();
}

367
// Registers a new file for the table named by `file_schema.table_id_`.
//
// The file gets a generated file_id_, type NEW, size 0, creation/update
// timestamps, the table's dimension and engine type, and a computed
// location_. The row is inserted into "TableFiles" and the date partition
// directory is created if it is not already a directory.
// Returns the DescribeTable() status if the table cannot be described,
// DBTransactionError if the directory cannot be created, and the
// HandleException() status on exceptions.
Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
    }

    TableSchema owner;
    owner.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(owner);
    if (!status.ok()) {
        return status;
    }

    try {
        MetricCollector metric;

        NextFileId(file_schema.file_id_);
        file_schema.file_type_ = TableFileSchema::NEW;
        file_schema.dimension_ = owner.dimension_;
        file_schema.size_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.engine_type_ = owner.engine_type_;
        GetTableFilePath(file_schema);

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        const auto partition_dir = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
        if (!boost::filesystem::is_directory(partition_dir) &&
            !boost::filesystem::create_directory(partition_dir)) {
            ENGINE_LOG_ERROR << "Create directory " << partition_dir << " Error";
            return Status::DBTransactionError("Failed to create partition directory");
        }

    } catch (std::exception& ex) {
        return HandleException("Encounter exception when create table file", ex);
    }

    return Status::OK();
}

410
// Collects every "TableFiles" row whose file_type_ is TO_INDEX into `files`
// (which is cleared first). Each entry carries its computed location_ and the
// dimension_ of its table; table descriptions are looked up once per table id.
// Returns the failing DescribeTable() status if a table cannot be described,
// and the HandleException() status on exceptions.
Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::table_id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::size_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_),
                                         where(c(&TableFileSchema::file_type_)
                                                   == static_cast<int>(TableFileSchema::TO_INDEX)));

        // table_id -> description, so each table is described only once.
        std::map<std::string, TableSchema> described;
        TableFileSchema table_file;

        for (auto &row : rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.size_ = std::get<4>(row);
            table_file.date_ = std::get<5>(row);
            table_file.engine_type_ = std::get<6>(row);

            GetTableFilePath(table_file);

            if (described.find(table_file.table_id_) == described.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                described[table_file.table_id_] = table_schema;
            }

            table_file.dimension_ = described[table_file.table_id_].dimension_;
            files.push_back(table_file);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate raw files", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
460
// Collects the files of `table_id` with type RAW, TO_INDEX or INDEX into
// `files` (cleared first), grouped by their date_. When `partition` is
// non-empty only files whose date_ is listed in it are considered.
// Each entry carries the table's dimension_ and its computed location_.
// Returns the DescribeTable() status if the table cannot be described, and
// the HandleException() status on exceptions.
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        // Common tail of both query variants: describe the table, then turn
        // each selected row into a TableFileSchema filed under its date.
        auto collect = [&](auto &rows) -> Status {
            TableSchema table_schema;
            table_schema.table_id_ = table_id;
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }

            TableFileSchema table_file;
            for (auto &row : rows) {
                table_file.id_ = std::get<0>(row);
                table_file.table_id_ = std::get<1>(row);
                table_file.file_id_ = std::get<2>(row);
                table_file.file_type_ = std::get<3>(row);
                table_file.size_ = std::get<4>(row);
                table_file.date_ = std::get<5>(row);
                table_file.engine_type_ = std::get<6>(row);
                table_file.dimension_ = table_schema.dimension_;
                GetTableFilePath(table_file);

                // operator[] creates the (empty) bucket for a date on first use.
                files[table_file.date_].push_back(table_file);
            }
            return Status::OK();
        };

        if (partition.empty()) {
            // No date filter: every searchable file of the table.
            auto selected = ConnectorPtr->select(
                columns(&TableFileSchema::id_,
                        &TableFileSchema::table_id_,
                        &TableFileSchema::file_id_,
                        &TableFileSchema::file_type_,
                        &TableFileSchema::size_,
                        &TableFileSchema::date_,
                        &TableFileSchema::engine_type_),
                where(c(&TableFileSchema::table_id_) == table_id &&
                      (c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW) ||
                       c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX) ||
                       c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX))));

            auto status = collect(selected);
            if (!status.ok()) {
                return status;
            }
        } else {
            // Restrict to the requested partition dates.
            auto selected = ConnectorPtr->select(
                columns(&TableFileSchema::id_,
                        &TableFileSchema::table_id_,
                        &TableFileSchema::file_id_,
                        &TableFileSchema::file_type_,
                        &TableFileSchema::size_,
                        &TableFileSchema::date_,
                        &TableFileSchema::engine_type_),
                where(c(&TableFileSchema::table_id_) == table_id &&
                      in(&TableFileSchema::date_, partition) &&
                      (c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW) ||
                       c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::TO_INDEX) ||
                       c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::INDEX))));

            auto status = collect(selected);
            if (!status.ok()) {
                return status;
            }
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e);
    }

    return Status::OK();
}

559 560
// Collects the RAW files of `table_id` into `files` (cleared first), grouped
// by date_. Rows are fetched ordered by size_ descending, so each date bucket
// keeps that order. Each entry carries the table's dimension_ and its
// computed location_.
// Returns the DescribeTable() status if the table cannot be described, and
// the HandleException() status on exceptions.
// NOTE(review): engine_type_ is not selected here, unlike FilesToIndex and
// FilesToSearch, so it is left at whatever TableFileSchema initializes it to
// -- confirm this is intended.
Status DBMetaImpl::FilesToMerge(const std::string &table_id,
                                DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::table_id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::size_,
                                                 &TableFileSchema::date_),
                                         where(c(&TableFileSchema::file_type_) == static_cast<int>(TableFileSchema::RAW) &&
                                               c(&TableFileSchema::table_id_) == table_id),
                                         order_by(&TableFileSchema::size_).desc());

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        TableFileSchema table_file;
        for (auto &row : rows) {
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.size_ = std::get<4>(row);
            table_file.date_ = std::get<5>(row);
            table_file.dimension_ = table_schema.dimension_;
            GetTableFilePath(table_file);

            // operator[] creates the (empty) bucket for a date on first use.
            files[table_file.date_].push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate merge files", e);
    }

    return Status::OK();
}

607 608 609
// Loads the files of `table_id` whose row id is listed in `ids` into
// `table_files` (cleared first).
//
// Each returned entry has id_, table_id_, file_id_, file_type_, size_, date_,
// engine_type_, the table's dimension_ and its computed location_ filled in.
// The row id is selected as well, so callers can match results back to `ids`.
// Returns the DescribeTable() status if the table cannot be described, and
// the HandleException() status on exceptions.
Status DBMetaImpl::GetTableFiles(const std::string& table_id,
                                 const std::vector<size_t>& ids,
                                 TableFilesSchema& table_files) {
    try {
        table_files.clear();
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                  &TableFileSchema::file_id_,
                                                  &TableFileSchema::file_type_,
                                                  &TableFileSchema::size_,
                                                  &TableFileSchema::date_,
                                                  &TableFileSchema::engine_type_),
                                          where(c(&TableFileSchema::table_id_) == table_id and
                                                  in(&TableFileSchema::id_, ids)
                                          ));

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto &file : files) {
            TableFileSchema file_schema;
            file_schema.table_id_ = table_id;
            file_schema.id_ = std::get<0>(file);
            file_schema.file_id_ = std::get<1>(file);
            file_schema.file_type_ = std::get<2>(file);
            file_schema.size_ = std::get<3>(file);
            file_schema.date_ = std::get<4>(file);
            file_schema.engine_type_ = std::get<5>(file);
            file_schema.dimension_ = table_schema.dimension_;
            GetTableFilePath(file_schema);

            table_files.emplace_back(file_schema);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table files", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
648
// PXU TODO: Support Swap
X
Xu Peng 已提交
649
// Apply every configured archive criterion to the table-file metadata.
//
// "days": every file whose created_on_ is older than the configured number of
//         days and that is not already TO_DELETE is marked TO_DELETE.
// "disk": the total size reported by Size() is compared with the configured
//         threshold (threshold * G; G is presumably one gigabyte -- confirm in
//         MetaConsts.h) and the excess, if any, is handed to DiscardFiles().
//
// Returns Status::OK() when all criteria were applied, otherwise the first
// failing status (database exception, Size() failure or DiscardFiles() failure).
Status DBMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    // Iterate by reference: the entries are only read, no need to copy them.
    for (const auto &kv : criterias) {
        const auto &criteria = kv.first;
        // Named "threshold" so it does not shadow sqlite_orm::limit.
        const auto &threshold = kv.second;

        if (criteria == "days") {
            long usecs = threshold * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try {
                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
                    ),
                    where(
                        c(&TableFileSchema::created_on_) < (long) (now - usecs) and
                            c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
                    ));
            } catch (std::exception &e) {
                return HandleException("Encounter exception when update table files", e);
            }
        } else if (criteria == "disk") {
            uint64_t total_size = 0;
            auto size_status = Size(total_size);
            if (!size_status.ok()) {
                // Without a reliable total there is nothing sensible to discard.
                return size_status;
            }

            // Do the subtraction in signed arithmetic: when the database is
            // below the threshold the result must be negative (nothing to
            // discard) instead of a wrapped-around unsigned value.
            const long to_delete = static_cast<long>(total_size)
                                   - static_cast<long>(threshold) * static_cast<long>(G);
            auto discard_status = DiscardFiles(to_delete);
            if (!discard_status.ok()) {
                return discard_status;
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
686
// Compute the summed size_ of all table files that are not marked TO_DELETE.
//
// result - output; reset to 0 first, then incremented by every non-null sum
//          returned by the query.
//
// Returns Status::OK() on success or the HandleException() status when a
// std::exception is thrown.
Status DBMetaImpl::Size(uint64_t &result) {
    result = 0;
    try {
        auto rows = ConnectorPtr->select(
            columns(sum(&TableFileSchema::size_)),
            where(c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));

        for (auto &row : rows) {
            // The aggregate comes back as a nullable handle; skip it when it holds no value.
            auto &total = std::get<0>(row);
            if (total) {
                result += static_cast<uint64_t>(*total);
            }
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculte db size", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
707
// Mark table files as TO_DELETE, lowest row id first, until at least
// to_discard_size (same unit as TableFileSchema::size_) has been discarded.
//
// Files are processed in batches of 10 inside a transaction per batch. The
// function stops as soon as the requested size has been reached or a batch
// finds no more candidate files, so it always terminates.
//
// Returns Status::OK() on success (including to_discard_size <= 0), a
// DBTransactionError when a batch transaction is not committed, or the
// HandleException() status when a std::exception is thrown.
Status DBMetaImpl::DiscardFiles(long to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        // Set when the current batch marked at least one file; guards against
        // spinning forever once no candidate files are left.
        bool progressed = false;

        try {
            MetricCollector metric;

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                             &TableFileSchema::size_),
                                                     where(c(&TableFileSchema::file_type_)
                                                           != (int) TableFileSchema::TO_DELETE),
                                                     order_by(&TableFileSchema::id_),
                                                     limit(10));

                std::vector<int> ids;

                for (auto &file : selected) {
                    if (to_discard_size <= 0) break;
                    const auto id = std::get<0>(file);
                    const auto size = std::get<1>(file);
                    ids.push_back(id);
                    // Log the row id that was actually selected.
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << id
                                     << " table_file.size=" << size;
                    to_discard_size -= size;
                }

                if (ids.size() == 0) {
                    return true;
                }

                ConnectorPtr->update_all(
                        set(
                                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                        ),
                        where(
                                in(&TableFileSchema::id_, ids)
                        ));

                progressed = true;
                return true;
            });

            if (!commited) {
                return Status::DBTransactionError("Update table file error");
            }

        } catch (std::exception &e) {
            return HandleException("Encounter exception when discard table file", e);
        }

        if (!progressed) {
            // Nothing left to discard even though the target was not reached.
            break;
        }
    }

    return Status::OK();
}

765
// Persist one table-file record.
//
// file_schema - record to store; its updated_time_ is refreshed, and its
//               file_type_ is forced to TO_DELETE when the owning table is
//               missing or itself marked TO_DELETE.
//
// Returns Status::OK() on success or the HandleException() status when a
// std::exception is thrown.
Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

    try {
        MetricCollector metric;

        auto owner_rows = ConnectorPtr->select(
            columns(&TableSchema::state_),
            where(c(&TableSchema::table_id_) == file_schema.table_id_));

        // If the table has been deleted, just mark the file as TO_DELETE;
        // the clean-up thread will remove the file later.
        const bool table_gone = owner_rows.empty()
                                || std::get<0>(owner_rows[0]) == (int) TableSchema::TO_DELETE;
        if (table_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);
    } catch (std::exception &e) {
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
            + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e);
    }

    return Status::OK();
}

789
// Persist a batch of table-file records inside one transaction.
//
// files - records to store; each gets a fresh updated_time_, and records whose
//         table is missing or marked TO_DELETE are forced to TO_DELETE.
//
// Returns Status::OK() on success, a DBTransactionError when the transaction
// is not committed, or the HandleException() status when a std::exception is
// thrown.
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        MetricCollector metric;

        // table id -> "a table with that id exists and is not TO_DELETE";
        // each distinct table id is queried only once.
        std::map<std::string, bool> table_alive;
        for (auto &file : files) {
            if (table_alive.count(file.table_id_) > 0) {
                continue;
            }
            auto rows = ConnectorPtr->select(
                columns(&TableSchema::id_),
                where(c(&TableSchema::table_id_) == file.table_id_
                      and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
            table_alive[file.table_id_] = (rows.size() >= 1);
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Update table files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files", e);
    }

    return Status::OK();
}

830
// Physically remove files and tables that were previously marked TO_DELETE.
//
// Phase 1: every table file marked TO_DELETE whose updated_time_ is older than
//          `seconds` is deleted from disk and its row is removed.
// Phase 2: every table marked TO_DELETE has its folder removed recursively and
//          its row removed.
//
// seconds - minimum age of a TO_DELETE file before it is removed.
//
// Returns Status::OK() on success, a DBTransactionError when a transaction is
// not committed, or the HandleException() status when a std::exception is
// thrown.
Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();

    // Phase 1: expired table files.
    try {
        MetricCollector metric;

        auto expired = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::table_id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::date_),
            where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE
                  and
                  c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            // One scratch record is reused for every row.
            TableFileSchema scratch;
            for (auto &row : expired) {
                scratch.id_ = std::get<0>(row);
                scratch.table_id_ = std::get<1>(row);
                scratch.file_id_ = std::get<2>(row);
                scratch.date_ = std::get<3>(row);
                GetTableFilePath(scratch);

                ENGINE_LOG_DEBUG << "Removing deleted id =" << scratch.id_ << " location = " << scratch.location_ << std::endl;
                boost::filesystem::remove(scratch.location_);
                ConnectorPtr->remove<TableFileSchema>(scratch.id_);
            }
            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    // Phase 2: tables marked for deletion.
    try {
        MetricCollector metric;

        auto doomed = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::table_id_),
            where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : doomed) {
                auto folder = GetTablePath(std::get<1>(row));

                ENGINE_LOG_DEBUG << "Remove table folder: " << folder;
                boost::filesystem::remove_all(folder);
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }

            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    return Status::OK();
}

901
// Remove the rows of all table files whose type is still NEW.
//
// Returns Status::OK() on success, a DBTransactionError when the transaction
// is not committed, or the HandleException() status when a std::exception is
// thrown.
Status DBMetaImpl::CleanUp() {
    try {
        auto stale = ConnectorPtr->select(
            columns(&TableFileSchema::id_),
            where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : stale) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table file", e);
    }

    return Status::OK();
}

G
groot 已提交
925
// Compute a row count for one table from the sizes of its files.
//
// The size_ of every RAW, TO_INDEX and INDEX file of the table is summed, then
// divided by the table dimension and by sizeof(float).
//
// table_id - table to count
// result   - output; only written once DescribeTable() has succeeded
//
// Returns Status::OK() on success, the DescribeTable() status when that call
// fails, or the HandleException() status when a std::exception is thrown.
Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
        MetricCollector metric;

        auto sizes = ConnectorPtr->select(
            columns(&TableFileSchema::size_),
            where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                   or
                   c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX
                   or c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)
                  and c(&TableFileSchema::table_id_) == table_id));

        TableSchema owner;
        owner.table_id_ = table_id;
        auto status = DescribeTable(owner);
        if (!status.ok()) {
            return status;
        }

        result = 0;
        for (auto &row : sizes) {
            result += std::get<0>(row);
        }

        // NOTE(review): a dimension_ of 0 would divide by zero here -- confirm
        // that DescribeTable() never succeeds with a zero dimension.
        result /= owner.dimension_;
        result /= sizeof(float);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculate table file size", e);
    }

    return Status::OK();
}

959
// Recursively delete options_.path when it is an existing directory.
// Always returns Status::OK().
Status DBMetaImpl::DropAll() {
    const auto &root = options_.path;
    if (boost::filesystem::is_directory(root)) {
        boost::filesystem::remove_all(root);
    }

    return Status::OK();
}

X
Xu Peng 已提交
966
// Run CleanUp() on destruction; its status is deliberately discarded.
DBMetaImpl::~DBMetaImpl() {
    static_cast<void>(CleanUp());
}

970
} // namespace meta
X
Xu Peng 已提交
971
} // namespace engine
J
jinhai 已提交
972
} // namespace milvus
X
Xu Peng 已提交
973
} // namespace zilliz