DBMetaImpl.cpp 37.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7 8
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
G
groot 已提交
9
#include "Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
G
groot 已提交
11
#include "Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23

namespace zilliz {
J
jinhai 已提交
24
namespace milvus {
X
Xu Peng 已提交
25
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34
Status HandleException(const std::string& desc, std::exception &e) {
    ENGINE_LOG_ERROR << desc << ": " << e.what();
    return Status::DBTransactionError(desc, e.what());
G
groot 已提交
35 36
}

G
groot 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
class MetricCollector {
public:
    MetricCollector() {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        start_time_ = METRICS_NOW_TIME;
    }

    ~MetricCollector() {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
    }

private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT start_time_;
};

G
groot 已提交
55 56
}

57
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
58
    return make_storage(path,
G
groot 已提交
59
                        make_table("Tables",
G
groot 已提交
60 61
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
62
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
63 64 65 66 67
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("files_cnt", &TableSchema::files_cnt_, default_value(0)),
                                   make_column("engine_type", &TableSchema::engine_type_),
                                   make_column("store_raw_data", &TableSchema::store_raw_data_)),
G
groot 已提交
68
                        make_table("TableFiles",
G
groot 已提交
69 70
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
71
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
72 73 74 75 76 77
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
                                   make_column("size", &TableFileSchema::size_, default_value(0)),
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
78
    );
X
Xu Peng 已提交
79 80 81

}

X
Xu Peng 已提交
82
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
83
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
84
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
85

86
std::string DBMetaImpl::GetTablePath(const std::string &table_id) {
X
Xu Peng 已提交
87
    return options_.path + "/tables/" + table_id;
88 89
}

90
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
X
Xu Peng 已提交
91
    std::stringstream ss;
X
Xu Peng 已提交
92
    ss << GetTablePath(table_id) << "/" << date;
X
Xu Peng 已提交
93 94 95
    return ss.str();
}

96
/// Stores the full path of a table file in `group_file.location_`:
/// "<table path>/<date>/<file_id>".
/// A file whose date_ is still EmptyDate is first stamped with Meta::GetDate(),
/// so this call may also modify `group_file.date_`.
void DBMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
    if (group_file.date_ == EmptyDate) {
        group_file.date_ = Meta::GetDate();
    }

    const std::string partition_dir =
        GetTableDatePartitionPath(group_file.table_id_, group_file.date_);
    group_file.location_ = partition_dir + "/" + group_file.file_id_;
}

/// Generates a new table id with SimpleIDGenerator and writes its streamed
/// text form into `table_id`. Always returns Status::OK().
Status DBMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator generator;
    std::ostringstream id_text;
    id_text << generator.GetNextIDNumber();
    table_id = id_text.str();
    return Status::OK();
}

/// Generates a new file id with SimpleIDGenerator and writes its streamed
/// text form into `file_id`. Always returns Status::OK().
Status DBMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator generator;
    std::ostringstream id_text;
    id_text << generator.GetNextIDNumber();
    file_id = id_text.str();
    return Status::OK();
}

/// Creates the meta store described by `options` and immediately runs
/// Initialize().
/// A constructor cannot return the resulting Status, so a failing Initialize()
/// is logged here instead of being silently discarded.
DBMetaImpl::DBMetaImpl(const DBMetaOptions &options)
    : options_(options) {
    auto status = Initialize();
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to initialize meta store at " << options_.path;
    }
}

/// Prepares the meta store:
///   1. ensures options_.path exists as a directory;
///   2. opens "<path>/meta.sqlite", syncs the schema, keeps the connection open
///      and enables WAL journaling;
///   3. runs CleanUp(), whose status is not checked here.
/// @return DBTransactionError if the db directory cannot be created, otherwise OK.
///         Exceptions from boost::filesystem / sqlite_orm are not caught here.
Status DBMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path)) {
        // create_directories (as in CreateTable) also creates missing parent
        // directories; create_directory fails when the parent does not exist.
        auto ret = boost::filesystem::create_directories(options_.path);
        if (!ret) {
            ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
            return Status::DBTransactionError("Failed to create db directory", options_.path);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

// PXU TODO: Temp solution. Will fix later
148 149
Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                         const DatesT &dates) {
X
Xu Peng 已提交
150 151 152 153
    if (dates.size() == 0) {
        return Status::OK();
    }

154
    TableSchema table_schema;
G
groot 已提交
155
    table_schema.table_id_ = table_id;
X
Xu Peng 已提交
156
    auto status = DescribeTable(table_schema);
X
Xu Peng 已提交
157 158 159 160
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
161 162
    try {
        auto yesterday = GetDateWithDelta(-1);
X
Xu Peng 已提交
163

G
groot 已提交
164 165 166 167
        for (auto &date : dates) {
            if (date >= yesterday) {
                return Status::Error("Could not delete partitions with 2 days");
            }
X
Xu Peng 已提交
168 169 170
        }

        ConnectorPtr->update_all(
171
            set(
G
groot 已提交
172
                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
173 174
            ),
            where(
G
groot 已提交
175 176
                c(&TableFileSchema::table_id_) == table_id and
                    in(&TableFileSchema::date_, dates)
177 178
            ));
    } catch (std::exception &e) {
G
groot 已提交
179
        return HandleException("Encounter exception when drop partition", e);
X
Xu Peng 已提交
180
    }
G
groot 已提交
181

X
Xu Peng 已提交
182 183 184
    return Status::OK();
}

185
/// Registers a new table in the meta store and creates its directory.
/// An empty table_id_ is replaced by a generated id.
/// If a table with the given id already exists:
///   - it returns Status::Error when that table is in TO_DELETE state;
///   - otherwise it returns OK and leaves `table_schema` untouched.
/// On success `table_schema` gets id_, files_cnt_ (0), created_on_ and location_.
/// @return DBTransactionError if the insert fails, Status::Error if the table
///         directory cannot be created, OK otherwise.
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto table = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (table.size() == 1) {
                if (TableSchema::TO_DELETE == std::get<0>(table[0])) {
                    return Status::Error("Table already exists and it is in delete state, please wait a second");
                } else {
                    return Status::OK();//table already exists, no error
                }
            }
        }

        table_schema.files_cnt_ = 0;
        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            auto id = ConnectorPtr->insert(table_schema);
            table_schema.id_ = id;
        } catch (std::exception &e) {
            // Same status as before, but the cause is logged instead of dropped.
            ENGINE_LOG_ERROR << "Add Table Error: " << e.what();
            return Status::DBTransactionError("Add Table Error");
        } catch (...) {
            ENGINE_LOG_ERROR << "Add Table Error: unknown exception";
            return Status::DBTransactionError("Add Table Error");
        }

        auto table_path = GetTablePath(table_schema.table_id_);
        table_schema.location_ = table_path;
        if (!boost::filesystem::is_directory(table_path)) {
            auto ret = boost::filesystem::create_directories(table_path);
            if (!ret) {
                ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
                return Status::Error("Failed to create table path");
            }
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
    }

    return Status::OK();
}

/// Soft-deletes a table by setting state_ = TO_DELETE on its row(s) in "Tables".
/// The table's files are not modified here (DeleteTableFiles does that).
/// @return DBTransactionError if the update throws, OK otherwise.
Status DBMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        MetricCollector metric;

        //soft delete table
        // Only the state column is updated. The previous code read every column
        // and then wrote the whole row back, so a concurrent change to the other
        // columns between the two statements could be overwritten with stale values.
        ConnectorPtr->update_all(
                set(
                        c(&TableSchema::state_) = (int) TableSchema::TO_DELETE
                ),
                where(
                        c(&TableSchema::table_id_) == table_id
                ));

    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table", e);
    }

    return Status::OK();
}

/// Soft-deletes every file of `table_id` that is not already TO_DELETE.
/// It sets file_type_ to TO_DELETE and stamps updated_time_ with
/// utils::GetMicroSecTimeStamp().
/// @return DBTransactionError if the update throws, OK otherwise.
Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        MetricCollector metric;

        const auto now = utils::GetMicroSecTimeStamp();

        //soft delete table files
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                c(&TableFileSchema::updated_time_) = now),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e);
    }

    return Status::OK();
}

/// Looks up the table named by table_schema.table_id_ whose state_ is not
/// TO_DELETE.
/// It fills id_, files_cnt_, dimension_, engine_type_, store_raw_data_ and
/// location_ (from GetTablePath).
/// @return NotFound unless exactly one such row exists; DBTransactionError if
///         the query throws; OK otherwise.
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::table_id_,
                                                 &TableSchema::files_cnt_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::engine_type_,
                                                 &TableSchema::store_raw_data_),
                                         where(c(&TableSchema::table_id_) == table_schema.table_id_
                                               and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (rows.size() != 1) {
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
        }

        const auto &row = rows.front();
        table_schema.id_ = std::get<0>(row);
        table_schema.files_cnt_ = std::get<2>(row);
        table_schema.dimension_ = std::get<3>(row);
        table_schema.engine_type_ = std::get<4>(row);
        table_schema.store_raw_data_ = std::get<5>(row);

        table_schema.location_ = GetTablePath(table_schema.table_id_);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe table", e);
    }

    return Status::OK();
}

/// Sets `has_or_not` to true iff exactly one row with this table_id and a
/// state_ other than TO_DELETE exists.
/// @return DBTransactionError if the query throws (has_or_not stays false), OK otherwise.
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    has_or_not = false;

    try {
        MetricCollector metric;

        auto live_tables = ConnectorPtr->select(columns(&TableSchema::id_),
                                                where(c(&TableSchema::table_id_) == table_id
                                                      and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        has_or_not = (live_tables.size() == 1);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table", e);
    }

    return Status::OK();
}

/// Replaces the contents of `table_schema_array` with every table whose state_
/// is not TO_DELETE.
/// Each entry gets id_, table_id_, files_cnt_, dimension_, engine_type_ and
/// store_raw_data_.
/// @return DBTransactionError if the query throws, OK otherwise.
Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    // Clear the output first, like FilesToIndex/FilesToSearch/FilesToMerge/
    // GetTableFiles do, so a reused vector is not appended to.
    table_schema_array.clear();

    try {
        MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
                                                     &TableSchema::table_id_,
                                                     &TableSchema::files_cnt_,
                                                     &TableSchema::dimension_,
                                                     &TableSchema::engine_type_,
                                                     &TableSchema::store_raw_data_),
                                             where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        for (auto &table : selected) {
            TableSchema schema;
            schema.id_ = std::get<0>(table);
            schema.table_id_ = std::get<1>(table);
            schema.files_cnt_ = std::get<2>(table);
            schema.dimension_ = std::get<3>(table);
            schema.engine_type_ = std::get<4>(table);
            schema.store_raw_data_ = std::get<5>(table);

            table_schema_array.emplace_back(schema);
        }

    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup all tables", e);
    }

    return Status::OK();
}

/// Registers a new file of type NEW for file_schema.table_id_.
/// An EmptyDate date_ is replaced with Meta::GetDate().
/// It fills file_id_, file_type_, dimension_, size_ (0), created_on_,
/// updated_time_, engine_type_, location_ and id_ (from the insert).
/// It also ensures the date-partition directory exists.
/// @return the DescribeTable status if the table cannot be described;
///         DBTransactionError if the directory cannot be created or an
///         exception is thrown; OK otherwise.
Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
    }
    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        MetricCollector metric;

        NextFileId(file_schema.file_id_);
        file_schema.file_type_ = TableFileSchema::NEW;
        file_schema.dimension_ = table_schema.dimension_;
        file_schema.size_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;
        file_schema.engine_type_ = table_schema.engine_type_;
        GetTableFilePath(file_schema);

        // Create the partition directory before writing the metadata row, so a
        // filesystem failure does not leave a NEW file record without a directory.
        // create_directories also recreates a missing table directory.
        auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
        if (!boost::filesystem::is_directory(partition_path)) {
            auto ret = boost::filesystem::create_directories(partition_path);
            if (!ret) {
                ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
                return Status::DBTransactionError("Failed to create partition directory");
            }
        }

        auto id = ConnectorPtr->insert(file_schema);
        file_schema.id_ = id;

    } catch (std::exception& ex) {
        return HandleException("Encounter exception when create table file", ex);
    }

    return Status::OK();
}

/// Replaces `files` with every file whose file_type_ is TO_INDEX, across all tables.
/// Each file gets location_ from GetTableFilePath() and dimension_ from its
/// table's schema. Each schema is described once per table id.
/// @return the DescribeTable status as soon as one table cannot be described;
///         DBTransactionError if an exception is thrown; OK otherwise.
Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        auto to_index = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::size_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::engine_type_),
                                             where(c(&TableFileSchema::file_type_)
                                                       == (int) TableFileSchema::TO_INDEX));

        // Table schemas already described, keyed by table id.
        std::map<std::string, TableSchema> described_tables;

        for (auto &row : to_index) {
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.size_ = std::get<4>(row);
            table_file.date_ = std::get<5>(row);
            table_file.engine_type_ = std::get<6>(row);

            GetTableFilePath(table_file);

            auto cached = described_tables.find(table_file.table_id_);
            if (cached == described_tables.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                cached = described_tables.emplace(table_file.table_id_, table_schema).first;
            }

            table_file.dimension_ = cached->second.dimension_;
            files.push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate raw files", e);
    }

    return Status::OK();
}

/// Replaces `files` with the searchable files (file_type_ RAW, TO_INDEX or
/// INDEX) of `table_id`, grouped by date_.
/// A non-empty `partition` restricts the result to files whose date_ is in it.
/// Each file gets dimension_ from the table schema and location_ from
/// GetTableFilePath().
/// @return the DescribeTable status if the table cannot be described;
///         DBTransactionError if an exception is thrown; OK otherwise.
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // Both queries below select the same columns, so one generic lambda
        // turns their rows into TableFileSchema entries grouped by date.
        auto collect = [&](const auto &selected) {
            for (const auto &file : selected) {
                TableFileSchema table_file;
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
                table_file.size_ = std::get<4>(file);
                table_file.date_ = std::get<5>(file);
                table_file.engine_type_ = std::get<6>(file);
                table_file.dimension_ = table_schema.dimension_;
                GetTableFilePath(table_file);
                // operator[] default-creates the per-date list on first use.
                files[table_file.date_].push_back(table_file);
            }
        };

        if (partition.empty()) {
            collect(ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::table_id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::size_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_),
                                         where(c(&TableFileSchema::table_id_) == table_id and
                                               (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
                                                c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX or
                                                c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX))));
        } else {
            collect(ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::table_id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::size_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_),
                                         where(c(&TableFileSchema::table_id_) == table_id and
                                               in(&TableFileSchema::date_, partition) and
                                               (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
                                                c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX or
                                                c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX))));
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e);
    }

    return Status::OK();
}

/// Replaces `files` with the RAW files of `table_id`, grouped by date_.
/// The query orders rows by size_ descending, so within each date the files
/// appear largest first.
/// Each file gets dimension_ from the table schema and location_ from
/// GetTableFilePath().
/// @return the DescribeTable status if the table cannot be described;
///         DBTransactionError if an exception is thrown; OK otherwise.
Status DBMetaImpl::FilesToMerge(const std::string &table_id,
                                DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        auto raw_files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                      &TableFileSchema::table_id_,
                                                      &TableFileSchema::file_id_,
                                                      &TableFileSchema::file_type_,
                                                      &TableFileSchema::size_,
                                                      &TableFileSchema::date_),
                                              where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
                                                    c(&TableFileSchema::table_id_) == table_id),
                                              order_by(&TableFileSchema::size_).desc());

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (const auto &row : raw_files) {
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(row);
            table_file.table_id_ = std::get<1>(row);
            table_file.file_id_ = std::get<2>(row);
            table_file.file_type_ = std::get<3>(row);
            table_file.size_ = std::get<4>(row);
            table_file.date_ = std::get<5>(row);
            table_file.dimension_ = table_schema.dimension_;
            GetTableFilePath(table_file);
            files[table_file.date_].push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate merge files", e);
    }

    return Status::OK();
}

/// Replaces `table_files` with the files of `table_id` whose id_ is in `ids`.
/// Each entry gets id_, table_id_, file_id_, file_type_, size_, date_,
/// engine_type_, dimension_ (from the table schema) and location_.
/// @return the DescribeTable status if the table cannot be described;
///         DBTransactionError if an exception is thrown; OK otherwise.
Status DBMetaImpl::GetTableFiles(const std::string& table_id,
                                 const std::vector<size_t>& ids,
                                 TableFilesSchema& table_files) {
    try {
        // Record the access like every other meta query in this file.
        MetricCollector metric;

        table_files.clear();
        // id_ is selected too: the files are requested by id, and before this
        // change the returned schemas had no id_ set.
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                  &TableFileSchema::file_id_,
                                                  &TableFileSchema::file_type_,
                                                  &TableFileSchema::size_,
                                                  &TableFileSchema::date_,
                                                  &TableFileSchema::engine_type_),
                                          where(c(&TableFileSchema::table_id_) == table_id and
                                                  in(&TableFileSchema::id_, ids)
                                          ));

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto &file : files) {
            TableFileSchema file_schema;
            file_schema.table_id_ = table_id;
            file_schema.id_ = std::get<0>(file);
            file_schema.file_id_ = std::get<1>(file);
            file_schema.file_type_ = std::get<2>(file);
            file_schema.size_ = std::get<3>(file);
            file_schema.date_ = std::get<4>(file);
            file_schema.engine_type_ = std::get<5>(file);
            file_schema.dimension_ = table_schema.dimension_;
            GetTableFilePath(file_schema);

            table_files.emplace_back(file_schema);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table files", e);
    }

    return Status::OK();
}

// PXU TODO: Support Swap
X
Xu Peng 已提交
652
/**
 * Apply every configured archive criterion to the file meta table.
 *
 * - engine::ARCHIVE_CONF_DAYS: files created more than `limit` days ago are marked
 *   TO_DELETE.
 * - engine::ARCHIVE_CONF_DISK: if the total size of non-deleted files (Size())
 *   exceeds `limit * G` bytes, DiscardFiles() marks files TO_DELETE, lowest id
 *   first, until the excess is covered.
 *
 * Files are only marked here; CleanUpFilesWithTTL() removes TO_DELETE files from
 * disk once their updated_time_ is older than its TTL.
 *
 * @return Status::OK() if all criteria were applied (or none are configured),
 *         otherwise the first failing status.
 */
Status DBMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.empty()) {
        return Status::OK();
    }

    // Bind by reference: iterating by value copied each (criteria, limit) entry.
    for (const auto &kv : criterias) {
        const auto &criteria = kv.first;
        const auto &limit = kv.second;

        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try {
                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                        // Stamp the mark time, as DiscardFiles() does, so the TTL grace
                        // period in CleanUpFilesWithTTL() starts now rather than at the
                        // file's last (possibly days-old) update.
                        c(&TableFileSchema::updated_time_) = now
                    ),
                    where(
                        c(&TableFileSchema::created_on_) < (long) (now - usecs) and
                            c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
                    ));
            } catch (std::exception &e) {
                return HandleException("Encounter exception when update table files", e);
            }
        }

        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            auto status = Size(sum);
            if (!status.ok()) {
                // Previously ignored: a failed Size() left sum == 0 and silently skipped archiving.
                return status;
            }

            // Do the subtraction in signed arithmetic; mixing int64_t with the unsigned
            // `limit * G` product converted the whole expression to unsigned.
            int64_t to_delete = static_cast<int64_t>(sum) - static_cast<int64_t>(limit * G);
            status = DiscardFiles(to_delete);
            if (!status.ok()) {
                return status;
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
689
/**
 * Total size_ of every file whose type is not TO_DELETE.
 *
 * @param result output; reset to 0, then set to the summed size
 * @return Status::OK(), or a DBTransactionError if the meta query throws
 */
Status DBMetaImpl::Size(uint64_t &result) {
    result = 0;
    try {
        auto rows = ConnectorPtr->select(columns(sum(&TableFileSchema::size_)),
                                         where(c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));

        for (auto &row : rows) {
            auto &total = std::get<0>(row);
            // The aggregate comes back as a nullable pointer; presumably empty when
            // SUM() has no rows to add (SQL NULL) — such rows contribute nothing.
            if (total) {
                result += (uint64_t) (*total);
            }
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculte db size", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
710
/**
 * Mark non-deleted files TO_DELETE, lowest id first, until at least
 * `to_discard_size` bytes (sum of size_) have been marked.
 *
 * Work is done in passes of at most 10 files, each in its own transaction.
 * Iterating instead of recursing avoids unbounded stack growth. It also
 * guarantees termination when no live files remain: the previous recursive
 * form retried forever with the same size in that case.
 *
 * @param to_discard_size number of bytes to discard; <= 0 is a no-op
 * @return Status::OK() when enough was marked or no candidates remain,
 *         DBTransactionError if a pass fails or throws
 */
Status DBMetaImpl::DiscardFiles(long to_discard_size) {
    while (to_discard_size > 0) {
        ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

        bool no_candidates = false;
        try {
            MetricCollector metric;

            auto commited = ConnectorPtr->transaction([&]() mutable {
                auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                             &TableFileSchema::size_),
                                                     where(c(&TableFileSchema::file_type_)
                                                           != (int) TableFileSchema::TO_DELETE),
                                                     order_by(&TableFileSchema::id_),
                                                     limit(10));

                // Keep the column's own type; std::vector<int> narrowed the ids.
                std::vector<decltype(TableFileSchema::id_)> ids;
                for (auto &file : selected) {
                    if (to_discard_size <= 0) break;
                    auto file_id = std::get<0>(file);
                    auto file_size = std::get<1>(file);
                    ids.push_back(file_id);
                    // Log the id actually read (file_id_ was never selected, so it logged empty).
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << file_id
                                     << " table_file.size=" << file_size;
                    to_discard_size -= file_size;
                }

                if (ids.empty()) {
                    no_candidates = true;
                    return true;
                }

                ConnectorPtr->update_all(
                        set(
                                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                        ),
                        where(
                                in(&TableFileSchema::id_, ids)
                        ));

                return true;
            });

            if (!commited) {
                return Status::DBTransactionError("Update table file error");
            }
        } catch (std::exception &e) {
            return HandleException("Encounter exception when discard table file", e);
        }

        if (no_candidates) {
            ENGINE_LOG_DEBUG << "No more files to discard, remaining size=" << to_discard_size;
            break;
        }
    }

    return Status::OK();
}

768
/**
 * Write `file_schema` back to the meta store after stamping updated_time_.
 *
 * If the owning table has no meta row, or its state is TO_DELETE, the file is
 * written as TO_DELETE instead so the clean-up pass deletes it later.
 *
 * @param file_schema in/out; updated_time_ and possibly file_type_ are modified
 * @return Status::OK(), or a DBTransactionError naming the table/file on exception
 */
Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
    try {
        MetricCollector metric;

        auto owner = ConnectorPtr->select(columns(&TableSchema::state_),
                                          where(c(&TableSchema::table_id_) == file_schema.table_id_));

        const bool table_gone = owner.empty()
                                || std::get<0>(owner[0]) == (int) TableSchema::TO_DELETE;
        if (table_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);
    } catch (std::exception &e) {
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
            + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e);
    }

    return Status::OK();
}

792
/**
 * Write every schema in `files` back to the meta store in one transaction.
 *
 * Each file gets a fresh updated_time_. Files whose table is missing or marked
 * TO_DELETE are written as TO_DELETE.
 *
 * @param files in/out; updated_time_ and possibly file_type_ are modified
 * @return Status::OK(), or a DBTransactionError if the transaction fails or throws
 */
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        MetricCollector metric;

        // table_id -> table still alive (has a row not in TO_DELETE state).
        // Each distinct table is queried once, before the write transaction starts.
        std::map<std::string, bool> table_alive;
        for (auto &file : files) {
            if (table_alive.count(file.table_id_) > 0) {
                continue;
            }
            auto rows = ConnectorPtr->select(columns(&TableSchema::id_),
                                             where(c(&TableSchema::table_id_) == file.table_id_
                                                   and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
            table_alive[file.table_id_] = !rows.empty();
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }
                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Update table files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files", e);
    }

    return Status::OK();
}

833
/**
 * Physically remove deleted data in two phases, each in its own transaction:
 *  1. files marked TO_DELETE whose updated_time_ is more than `seconds` old:
 *     remove the file on disk, then its meta row;
 *  2. tables marked TO_DELETE: remove the table folder, then its meta row.
 * A failure in phase 1 returns immediately, so phase 2 is skipped.
 *
 * @param seconds grace period (TTL) in seconds, measured against updated_time_
 * @return Status::OK(), or a DBTransactionError if a phase fails or throws
 */
Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();

    // Phase 1: expired TO_DELETE files.
    try {
        MetricCollector metric;

        auto expired = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                    &TableFileSchema::table_id_,
                                                    &TableFileSchema::file_id_,
                                                    &TableFileSchema::date_),
                                            where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE
                                                  and c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        auto done = ConnectorPtr->transaction([&]() mutable {
            // One schema object reused across rows; GetTableFilePath fills location_.
            TableFileSchema victim;
            for (auto &row : expired) {
                victim.id_ = std::get<0>(row);
                victim.table_id_ = std::get<1>(row);
                victim.file_id_ = std::get<2>(row);
                victim.date_ = std::get<3>(row);
                GetTableFilePath(victim);

                ENGINE_LOG_DEBUG << "Removing deleted id =" << victim.id_ << " location = " << victim.location_ << std::endl;
                boost::filesystem::remove(victim.location_);
                ConnectorPtr->remove<TableFileSchema>(victim.id_);
            }
            return true;
        });

        if (!done) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    // Phase 2: tables marked TO_DELETE.
    try {
        MetricCollector metric;

        auto doomed = ConnectorPtr->select(columns(&TableSchema::id_,
                                                   &TableSchema::table_id_),
                                           where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));

        auto done = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : doomed) {
                auto folder = GetTablePath(std::get<1>(row));

                ENGINE_LOG_DEBUG << "Remove table folder: " << folder;
                boost::filesystem::remove_all(folder);
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!done) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    return Status::OK();
}

904
/**
 * Delete, in one transaction, the meta rows of all files still in NEW state.
 *
 * Only meta rows are removed; nothing on disk is touched here. Presumably such
 * rows are leftovers of writes that never completed (TODO confirm).
 *
 * @return Status::OK(), or a DBTransactionError if the transaction fails or throws
 */
Status DBMetaImpl::CleanUp() {
    try {
        auto fresh = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                          where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));

        auto done = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : fresh) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!done) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table file", e);
    }

    return Status::OK();
}

G
groot 已提交
928
/**
 * Count the vectors stored in the RAW, TO_INDEX and INDEX files of `table_id`.
 *
 * The count is derived from file sizes: total bytes / (dimension * sizeof(float)).
 *
 * @param table_id table to count
 * @param result   output; set to 0 up front, so it stays 0 on any error path
 * @return Status::OK(); the DescribeTable() status if the table cannot be described;
 *         an error if the table's dimension is 0; a DBTransactionError if the
 *         meta query throws
 */
Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    result = 0;

    try {
        MetricCollector metric;

        // Resolve the table first: its dimension is needed to turn bytes into rows,
        // and there is no point scanning file rows for an unknown table.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // Guard the division below; a zero dimension was an integer division by zero (UB).
        const uint64_t bytes_per_vector = (uint64_t) table_schema.dimension_ * sizeof(float);
        if (bytes_per_vector == 0) {
            return Status::Error("Invalid dimension of table " + table_id);
        }

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_),
                                             where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                                                    or
                                                    c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX
                                                    or c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)
                                                   and c(&TableFileSchema::table_id_) == table_id));

        uint64_t total_bytes = 0;
        for (auto &file : selected) {
            total_bytes += std::get<0>(file);
        }

        // For positive integers floor(floor(x / d) / f) == floor(x / (d * f)), so a single
        // division matches the former "divide by dimension, then by sizeof(float)".
        result = total_bytes / bytes_per_vector;
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculate table file size", e);
    }

    return Status::OK();
}

962
/**
 * Recursively delete the database root directory `options_.path`, if it is a directory.
 *
 * Always returns Status::OK(). There is no try/catch here, so filesystem errors
 * surface as whatever boost::filesystem throws.
 */
Status DBMetaImpl::DropAll() {
    const auto &db_root = options_.path;
    if (!boost::filesystem::is_directory(db_root)) {
        return Status::OK();
    }

    boost::filesystem::remove_all(db_root);
    return Status::OK();
}

X
Xu Peng 已提交
969
/**
 * On destruction, run CleanUp() to drop the meta rows of files still in NEW state.
 * The returned Status is not checked.
 */
DBMetaImpl::~DBMetaImpl() {
    auto status = CleanUp();
    (void) status;
}

973
} // namespace meta
X
Xu Peng 已提交
974
} // namespace engine
J
jinhai 已提交
975
} // namespace milvus
X
Xu Peng 已提交
976
} // namespace zilliz