DBMetaImpl.cpp 37.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7 8
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
G
groot 已提交
9
#include "Log.h"
X
Xu Peng 已提交
10
#include "MetaConsts.h"
G
groot 已提交
11
#include "Factories.h"
12
#include "metrics/Metrics.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <unistd.h>
X
Xu Peng 已提交
15 16
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
17
#include <boost/filesystem.hpp>
18
#include <chrono>
X
Xu Peng 已提交
19
#include <fstream>
20
#include <sqlite_orm.h>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23

namespace zilliz {
J
jinhai 已提交
24
namespace milvus {
X
Xu Peng 已提交
25
namespace engine {
26
namespace meta {
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28 29
using namespace sqlite_orm;

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34
/**
 * Log a meta-store exception and convert it into a transaction-error status.
 *
 * @param desc  human-readable description of the operation that failed
 * @param e     the caught exception; only its what() text is read, so it is
 *              taken by const reference (non-const callers still bind to it)
 * @return      Status::DBTransactionError(desc, e.what())
 */
Status HandleException(const std::string& desc, const std::exception &e) {
    ENGINE_LOG_ERROR << desc << ": " << e.what();
    return Status::DBTransactionError(desc, e.what());
}

G
groot 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
/**
 * Scope guard that records one meta-store access in the server metrics.
 *
 * The constructor increments the meta-access counter and stamps a start time.
 * The destructor computes METRICS_MICROSECONDS(start, end) and feeds it to the
 * meta-access duration histogram.
 *
 * Copying is disabled (which also suppresses the implicit moves). Because the
 * destructor reports to the histogram, a copy would report the same access twice.
 */
class MetricCollector {
public:
    MetricCollector() {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        start_time_ = METRICS_NOW_TIME;
    }

    ~MetricCollector() {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
    }

    MetricCollector(const MetricCollector&) = delete;
    MetricCollector& operator=(const MetricCollector&) = delete;

private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT start_time_;
};

G
groot 已提交
55 56
}

57
/**
 * Describe the sqlite_orm storage for the meta database file at `path`.
 *
 * Two tables are declared:
 *  - "Tables": one row per TableSchema; "id" is the primary key and
 *    "table_id" is declared unique().
 *  - "TableFiles": one row per TableFileSchema; "id" is the primary key.
 * The "files_cnt" and "size" columns default to 0 in the schema.
 *
 * ConnectorT is defined as decltype of this function's result, so the column
 * list here fixes the connector type used throughout this file.
 * Initialize() builds the connector from this description and calls
 * sync_schema() on it.
 */
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
58
    return make_storage(path,
G
groot 已提交
59
                        // Table-level metadata (TableSchema).
                        make_table("Tables",
G
groot 已提交
60 61
                                   make_column("id", &TableSchema::id_, primary_key()),
                                   make_column("table_id", &TableSchema::table_id_, unique()),
G
groot 已提交
62
                                   make_column("state", &TableSchema::state_),
G
groot 已提交
63 64 65 66 67
                                   make_column("dimension", &TableSchema::dimension_),
                                   make_column("created_on", &TableSchema::created_on_),
                                   make_column("files_cnt", &TableSchema::files_cnt_, default_value(0)),
                                   make_column("engine_type", &TableSchema::engine_type_),
                                   make_column("store_raw_data", &TableSchema::store_raw_data_)),
G
groot 已提交
68
                        // Per-file metadata (TableFileSchema); rows reference a table via "table_id".
                        make_table("TableFiles",
G
groot 已提交
69 70
                                   make_column("id", &TableFileSchema::id_, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id_),
G
groot 已提交
71
                                   make_column("engine_type", &TableFileSchema::engine_type_),
G
groot 已提交
72 73 74 75 76 77
                                   make_column("file_id", &TableFileSchema::file_id_),
                                   make_column("file_type", &TableFileSchema::file_type_),
                                   make_column("size", &TableFileSchema::size_, default_value(0)),
                                   make_column("updated_time", &TableFileSchema::updated_time_),
                                   make_column("created_on", &TableFileSchema::created_on_),
                                   make_column("date", &TableFileSchema::date_))
78
    );
X
Xu Peng 已提交
79 80 81

}

X
Xu Peng 已提交
82
// Concrete sqlite_orm storage type produced by StoragePrototype().
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
83
// File-scope connection to the meta database. It is (re)assigned by every
// DBMetaImpl::Initialize() call and used by all DBMetaImpl methods in this file.
static std::unique_ptr<ConnectorT> ConnectorPtr;
G
groot 已提交
84
// Type of a "TableFileSchema::id_ == value" condition expression.
// NOTE(review): not referenced in the visible part of this file — confirm usage before removing.
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
X
Xu Peng 已提交
85

86
/**
 * Generate a new table id: the decimal text of SimpleIDGenerator's next number.
 *
 * @param[out] table_id  receives the generated id
 * @return always Status::OK()
 */
Status DBMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator generator;
    std::ostringstream text;
    text << generator.GetNextIDNumber();
    table_id = text.str();
    return Status::OK();
}

94
/**
 * Generate a new file id: the decimal text of SimpleIDGenerator's next number.
 *
 * @param[out] file_id  receives the generated id
 * @return always Status::OK()
 */
Status DBMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator generator;
    std::ostringstream text;
    text << generator.GetNextIDNumber();
    file_id = text.str();
    return Status::OK();
}

102
/**
 * Construct the meta store from `options` and immediately run Initialize().
 *
 * The parameter is named `options` rather than `options_`, so it no longer
 * shadows the member of the same name.
 *
 * NOTE(review): the Status returned by Initialize() is discarded here.
 */
DBMetaImpl::DBMetaImpl(const DBMetaOptions &options)
    : options_(options) {
    Initialize();
}

X
Xu Peng 已提交
107 108 109
/**
 * Prepare the meta store.
 *
 * Steps:
 *  1. Ensure the db directory options_.path exists.
 *  2. Create the connector for <path>/meta.sqlite and sync its schema.
 *  3. Keep the connection open and switch the journal to WAL.
 *  4. Run CleanUp().
 * Exceptions thrown by boost::filesystem or sqlite_orm are not caught here.
 *
 * @return Status::OK(), or DBTransactionError when the db directory could not
 *         be created.
 */
Status DBMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path)) {
        // create_directories also creates missing parent directories;
        // create_directory fails when the parent of options_.path does not exist.
        auto ret = boost::filesystem::create_directories(options_.path);
        if (!ret) {
            ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
            return Status::DBTransactionError("Failed to create db directory", options_.path);
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    // NOTE(review): CleanUp()'s status is ignored here.
    CleanUp();

    return Status::OK();
}

X
Xu Peng 已提交
127
// PXU TODO: Temp solution. Will fix later
128 129
/**
 * Soft-delete every file of `table_id` whose date is in `dates`.
 *
 * Matching files are marked TO_DELETE and their updated_time_ is refreshed,
 * the same way DeleteTableFiles does it. Files already marked TO_DELETE are
 * skipped, so their timestamp is not reset.
 *
 * @param table_id  table whose partitions are dropped; it must exist
 * @param dates     partition dates to drop; an empty list is a no-op
 * @return one of:
 *         - Status::OK() on success;
 *         - DescribeTable's status when the table is unknown;
 *         - Status::Error when any date is >= GetDateWithDelta(-1);
 *         - DBTransactionError when the meta update throws.
 */
Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                         const DatesT &dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        MetricCollector metric;

        // Recent partitions (dates from GetDateWithDelta(-1) onwards) are protected.
        auto yesterday = GetDateWithDelta(-1);
        for (auto &date : dates) {
            if (date >= yesterday) {
                return Status::Error("Could not delete partitions with 2 days");
            }
        }

        ConnectorPtr->update_all(
            set(
                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
            ),
            where(
                c(&TableFileSchema::table_id_) == table_id and
                in(&TableFileSchema::date_, dates) and
                c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
            ));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop partition", e);
    }

    return Status::OK();
}

165
/**
 * Register a new table in the meta store and create its storage directory.
 *
 * If table_schema.table_id_ is empty, a fresh id is generated. files_cnt_,
 * id_ and created_on_ are reset here, and id_ then receives the inserted row id.
 *
 * @param[in,out] table_schema  description of the table to create
 * @return one of:
 *         - Status::AlreadyExist when a table with this id already exists;
 *         - Status::Error when it exists but is still in TO_DELETE state;
 *         - DBTransactionError on meta failures;
 *         - otherwise, the result of utils::CreateTablePath().
 */
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        if (table_schema.table_id_.empty()) {
            NextTableId(table_schema.table_id_);
        } else {
            auto table = ConnectorPtr->select(columns(&TableSchema::state_),
                                               where(c(&TableSchema::table_id_) == table_schema.table_id_));
            if (table.size() == 1) {
                if (TableSchema::TO_DELETE == std::get<0>(table[0])) {
                    return Status::Error("Table already exists and it is in delete state, please wait a second");
                } else {
                    // Change from no error to already exist.
                    return Status::AlreadyExist("Table already exists");
                }
            }
        }

        table_schema.files_cnt_ = 0;
        table_schema.id_ = -1;
        table_schema.created_on_ = utils::GetMicroSecTimeStamp();

        try {
            auto id = ConnectorPtr->insert(table_schema);
            table_schema.id_ = id;
        } catch (std::exception &e) {
            // Same description as before, but the cause is now logged and
            // carried in the returned status instead of being discarded.
            return HandleException("Add Table Error", e);
        } catch (...) {
            return Status::DBTransactionError("Add Table Error");
        }

        return utils::CreateTablePath(options_, table_schema.table_id_);

    } catch (std::exception &e) {
        return HandleException("Encounter exception when create table", e);
    }

    return Status::OK();
}

G
groot 已提交
205 206
/**
 * Soft-delete a table by switching its state_ to TO_DELETE.
 *
 * The row is kept. DescribeTable, HasTable and AllTables skip rows in
 * TO_DELETE state. Marking an unknown table is not an error.
 *
 * @param table_id  id of the table to mark
 * @return Status::OK(), or DBTransactionError when the meta update throws.
 */
Status DBMetaImpl::DeleteTable(const std::string& table_id) {
    try {
        MetricCollector metric;

        // A single UPDATE of state_ replaces the old select + full-row update.
        // The old approach could overwrite columns (e.g. files_cnt_) that
        // another writer changed between the read and the write.
        ConnectorPtr->update_all(
                set(
                        c(&TableSchema::state_) = (int) TableSchema::TO_DELETE
                ),
                where(
                        c(&TableSchema::table_id_) == table_id
                ));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table", e);
    }

    return Status::OK();
}

G
groot 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
/**
 * Soft-delete all files of `table_id`: mark them TO_DELETE and stamp
 * updated_time_ with the current microsecond timestamp.
 * Files already in TO_DELETE state are not touched.
 *
 * @param table_id  table whose files are marked
 * @return Status::OK(), or DBTransactionError when the update throws.
 */
Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) {
    try {
        MetricCollector metric;

        const auto now = utils::GetMicroSecTimeStamp();
        const int to_delete = (int) TableFileSchema::TO_DELETE;

        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = to_delete,
                c(&TableFileSchema::updated_time_) = now),
            where(c(&TableFileSchema::table_id_) == table_id
                  and c(&TableFileSchema::file_type_) != to_delete));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when delete table files", e);
    }

    return Status::OK();
}

259
/**
 * Look up a table that is not in TO_DELETE state, keyed by table_schema.table_id_.
 *
 * On success it fills id_, files_cnt_, dimension_, engine_type_ and
 * store_raw_data_ in table_schema.
 *
 * @param[in,out] table_schema  table_id_ is the lookup key; the fields above are outputs
 * @return Status::OK(), Status::NotFound unless exactly one matching row exists,
 *         or DBTransactionError on exception.
 */
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::table_id_,
                                                 &TableSchema::files_cnt_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::engine_type_,
                                                 &TableSchema::store_raw_data_),
                                         where(c(&TableSchema::table_id_) == table_schema.table_id_
                                               and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (rows.size() != 1) {
            return Status::NotFound("Table " + table_schema.table_id_ + " not found");
        }

        const auto &row = rows.front();
        table_schema.id_ = std::get<0>(row);
        table_schema.files_cnt_ = std::get<2>(row);
        table_schema.dimension_ = std::get<3>(row);
        table_schema.engine_type_ = std::get<4>(row);
        table_schema.store_raw_data_ = std::get<5>(row);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when describe table", e);
    }

    return Status::OK();
}

P
peng.xu 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
/**
 * Report whether `table_id` still has files in RAW or TO_INDEX state.
 *
 * The access is recorded through MetricCollector, like the other meta accessors.
 *
 * @param table_id  table to inspect
 * @param[out] has  true when at least one such file exists; false on exception
 * @return Status::OK(), or DBTransactionError on exception.
 */
Status DBMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
    has = false;
    try {
        MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_),
                                             where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                                                    or
                                                    c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX)
                                                   and c(&TableFileSchema::table_id_) == table_id
                                             ));

        has = !selected.empty();
    } catch (std::exception &e) {
        return HandleException("Encounter exception when check non index files", e);
    }
    return Status::OK();
}

311
/**
 * Check whether a table named `table_id` exists and is not in TO_DELETE state.
 *
 * @param table_id         table to look for
 * @param[out] has_or_not  true iff exactly one such row exists; false on exception
 * @return Status::OK(), or DBTransactionError on exception.
 */
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    has_or_not = false;

    try {
        MetricCollector metric;

        auto matches = ConnectorPtr->select(columns(&TableSchema::id_),
                                            where(c(&TableSchema::table_id_) == table_id
                                                  and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
        has_or_not = (matches.size() == 1);
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table", e);
    }

    return Status::OK();
}

/**
 * Append a TableSchema for every table not in TO_DELETE state.
 *
 * Each appended entry has id_, table_id_, files_cnt_, dimension_, engine_type_
 * and store_raw_data_ filled. Existing elements of table_schema_array are kept;
 * the vector is not cleared first.
 *
 * @param[in,out] table_schema_array  destination vector
 * @return Status::OK(), or DBTransactionError on exception.
 */
Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
    try {
        MetricCollector metric;

        auto rows = ConnectorPtr->select(columns(&TableSchema::id_,
                                                 &TableSchema::table_id_,
                                                 &TableSchema::files_cnt_,
                                                 &TableSchema::dimension_,
                                                 &TableSchema::engine_type_,
                                                 &TableSchema::store_raw_data_),
                                         where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        for (const auto &row : rows) {
            TableSchema schema;
            std::tie(schema.id_,
                     schema.table_id_,
                     schema.files_cnt_,
                     schema.dimension_,
                     schema.engine_type_,
                     schema.store_raw_data_) = row;
            table_schema_array.push_back(std::move(schema));
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup all tables", e);
    }

    return Status::OK();
}

362
/**
 * Register a new, empty file for an existing table and create its path.
 *
 * A missing date_ (EmptyDate) is replaced with Meta::GetDate(). The new file:
 *  - gets a generated file_id_ and NEW file type;
 *  - copies dimension_ and engine_type_ from its table;
 *  - starts with size_ 0 and created_on_ == updated_time_ == now.
 * id_ receives the inserted row id.
 *
 * @param[in,out] file_schema  table_id_ (and optionally date_) are inputs
 * @return DescribeTable's status if the table is unknown, DBTransactionError on
 *         exception, otherwise the result of utils::CreateTableFilePath().
 */
Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date_ == EmptyDate) {
        file_schema.date_ = Meta::GetDate();
    }

    TableSchema table_schema;
    table_schema.table_id_ = file_schema.table_id_;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    try {
        MetricCollector metric;

        NextFileId(file_schema.file_id_);
        file_schema.file_type_ = TableFileSchema::NEW;
        file_schema.dimension_ = table_schema.dimension_;
        file_schema.engine_type_ = table_schema.engine_type_;
        file_schema.size_ = 0;
        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
        file_schema.updated_time_ = file_schema.created_on_;

        file_schema.id_ = ConnectorPtr->insert(file_schema);

        return utils::CreateTableFilePath(options_, file_schema);
    } catch (std::exception& ex) {
        return HandleException("Encounter exception when create table file", ex);
    }

    return Status::OK();
}

396
/**
 * Collect every file in TO_INDEX state, across all tables.
 *
 * Each file gets its path resolved via utils::GetTableFilePath and its
 * dimension_ copied from its table. Each table's schema is looked up once.
 *
 * @param[out] files  cleared, then filled with the TO_INDEX files
 * @return Status::OK(), DescribeTable's status for the first unknown table
 *         (files may already hold earlier entries), or DBTransactionError on exception.
 */
Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::size_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::engine_type_),
                                             where(c(&TableFileSchema::file_type_)
                                                       == (int) TableFileSchema::TO_INDEX));

        // Table schemas already fetched, keyed by table id.
        std::map<std::string, TableSchema> schema_cache;
        TableFileSchema table_file;

        for (auto &file : selected) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.size_ = std::get<4>(file);
            table_file.date_ = std::get<5>(file);
            table_file.engine_type_ = std::get<6>(file);

            utils::GetTableFilePath(options_, table_file);

            auto cached = schema_cache.find(table_file.table_id_);
            if (cached == schema_cache.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                cached = schema_cache.emplace(table_file.table_id_, table_schema).first;
            }

            table_file.dimension_ = cached->second.dimension_;
            files.push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate raw files", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
446
/**
 * Collect the searchable files (RAW, TO_INDEX, INDEX) of `table_id`, grouped by date.
 *
 * The table schema is resolved once, before querying. The row-to-TableFileSchema
 * mapping and date grouping are shared by both query shapes instead of being
 * duplicated per branch.
 *
 * @param table_id    table to search
 * @param partition   dates to restrict to; an empty list means all dates
 * @param[out] files  cleared, then filled as date -> files of that date, with
 *                    dimension_ and path set
 * @return Status::OK(), DescribeTable's status when the table is unknown,
 *         or DBTransactionError on exception.
 */
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // Convert selected rows (column order as in the selects below) into
        // TableFileSchema entries appended to files[date].
        auto group_by_date = [&](const auto &rows) {
            TableFileSchema table_file;
            for (auto &file : rows) {
                table_file.id_ = std::get<0>(file);
                table_file.table_id_ = std::get<1>(file);
                table_file.file_id_ = std::get<2>(file);
                table_file.file_type_ = std::get<3>(file);
                table_file.size_ = std::get<4>(file);
                table_file.date_ = std::get<5>(file);
                table_file.engine_type_ = std::get<6>(file);
                table_file.dimension_ = table_schema.dimension_;
                utils::GetTableFilePath(options_, table_file);
                files[table_file.date_].push_back(table_file);
            }
        };

        if (partition.empty()) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
                                                         &TableFileSchema::size_,
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
                                                 where(c(&TableFileSchema::table_id_) == table_id and
                                                     (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
                                                         c(&TableFileSchema::file_type_)
                                                             == (int) TableFileSchema::TO_INDEX or
                                                         c(&TableFileSchema::file_type_)
                                                             == (int) TableFileSchema::INDEX)));
            group_by_date(selected);
        } else {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                         &TableFileSchema::table_id_,
                                                         &TableFileSchema::file_id_,
                                                         &TableFileSchema::file_type_,
                                                         &TableFileSchema::size_,
                                                         &TableFileSchema::date_,
                                                         &TableFileSchema::engine_type_),
                                                 where(c(&TableFileSchema::table_id_) == table_id and
                                                     in(&TableFileSchema::date_, partition) and
                                                     (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
                                                         c(&TableFileSchema::file_type_)
                                                             == (int) TableFileSchema::TO_INDEX or
                                                         c(&TableFileSchema::file_type_)
                                                             == (int) TableFileSchema::INDEX)));
            group_by_date(selected);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate index files", e);
    }

    return Status::OK();
}

545 546
/**
 * Collect the RAW files of `table_id` as merge candidates, grouped by date.
 *
 * Files are queried in descending size order, so each date's list is in that
 * order. engine_type_ is now read from the meta store, like FilesToSearch,
 * FilesToIndex and GetTableFiles already do; before, it was left unset.
 *
 * @param table_id    table whose RAW files are collected
 * @param[out] files  cleared, then filled as date -> files of that date, with
 *                    dimension_ and path set
 * @return Status::OK(), DescribeTable's status when the table is unknown,
 *         or DBTransactionError on exception.
 */
Status DBMetaImpl::FilesToMerge(const std::string &table_id,
                                DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        MetricCollector metric;

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                     &TableFileSchema::table_id_,
                                                     &TableFileSchema::file_id_,
                                                     &TableFileSchema::file_type_,
                                                     &TableFileSchema::size_,
                                                     &TableFileSchema::date_,
                                                     &TableFileSchema::engine_type_),
                                             where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
                                                 c(&TableFileSchema::table_id_) == table_id),
                                             order_by(&TableFileSchema::size_).desc());

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        TableFileSchema table_file;
        for (auto &file : selected) {
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.file_id_ = std::get<2>(file);
            table_file.file_type_ = std::get<3>(file);
            table_file.size_ = std::get<4>(file);
            table_file.date_ = std::get<5>(file);
            table_file.engine_type_ = std::get<6>(file);
            table_file.dimension_ = table_schema.dimension_;
            utils::GetTableFilePath(options_, table_file);
            // operator[] creates an empty list the first time a date is seen.
            files[table_file.date_].push_back(table_file);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when iterate merge files", e);
    }

    return Status::OK();
}

593 594 595
/**
 * Fetch the files of `table_id` whose ids are listed in `ids`.
 *
 * Each result has id_, file_id_, file_type_, size_, date_ and engine_type_
 * read from the meta store. dimension_ comes from the table, and the path is
 * resolved via utils::GetTableFilePath.
 *
 * @param table_id           owning table
 * @param ids                file row ids to fetch
 * @param[out] table_files   cleared, then filled with the matching files
 * @return Status::OK(), DescribeTable's status when the table is unknown,
 *         or DBTransactionError on exception.
 */
Status DBMetaImpl::GetTableFiles(const std::string& table_id,
                                 const std::vector<size_t>& ids,
                                 TableFilesSchema& table_files) {
    try {
        table_files.clear();

        auto rows = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                 &TableFileSchema::file_id_,
                                                 &TableFileSchema::file_type_,
                                                 &TableFileSchema::size_,
                                                 &TableFileSchema::date_,
                                                 &TableFileSchema::engine_type_),
                                         where(c(&TableFileSchema::table_id_) == table_id and
                                                 in(&TableFileSchema::id_, ids)
                                         ));

        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto &row : rows) {
            TableFileSchema file_schema;
            file_schema.table_id_ = table_id;
            std::tie(file_schema.id_,
                     file_schema.file_id_,
                     file_schema.file_type_,
                     file_schema.size_,
                     file_schema.date_,
                     file_schema.engine_type_) = row;
            file_schema.dimension_ = table_schema.dimension_;
            utils::GetTableFilePath(options_, file_schema);

            table_files.emplace_back(file_schema);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when lookup table files", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
636
// PXU TODO: Support Swap
X
Xu Peng 已提交
637
// Apply every configured archive criterion to the table-file meta.
//
// Neither criterion touches disk directly: files are only marked TO_DELETE
// (CleanUpFilesWithTTL later removes TO_DELETE files whose updated_time_ is
// older than its TTL).
//   - ARCHIVE_CONF_DAYS: every live file created more than `limit` days ago is
//     marked TO_DELETE (its updated_time_ is left unchanged).
//   - ARCHIVE_CONF_DISK: if the live size exceeds `limit` * G bytes, the excess
//     is handed to DiscardFiles.
// Returns the first error encountered; errors from Size()/DiscardFiles() are
// propagated instead of being silently ignored.
Status DBMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (const auto &kv : criterias) {
        auto &criteria = kv.first;
        auto &limit = kv.second;
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try {
                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
                    ),
                    where(
                        c(&TableFileSchema::created_on_) < (long) (now - usecs) and
                            c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
                    ));
            } catch (std::exception &e) {
                return HandleException("Encounter exception when update table files", e);
            }
        }
        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            // If the size query fails, `sum` would stay 0 and the criterion would
            // silently do nothing -- report the failure instead.
            auto size_status = Size(sum);
            if (!size_status.ok()) {
                return size_status;
            }

            int64_t to_delete = (int64_t)sum - limit * G;
            auto discard_status = DiscardFiles(to_delete);
            if (!discard_status.ok()) {
                return discard_status;
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
674
// Compute the total size_ of all table files that are not marked TO_DELETE.
// `result` is reset to 0 first and receives the sum on success.
// Returns OK, or the DBTransactionError produced by HandleException if the
// query throws.
Status DBMetaImpl::Size(uint64_t &result) {
    result = 0;

    try {
        auto rows = ConnectorPtr->select(
            columns(sum(&TableFileSchema::size_)),
            where(c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));

        for (auto &row : rows) {
            // The aggregate comes back as a nullable pointer (SQL SUM over no rows
            // is NULL); only dereference it when it is present.
            auto &total = std::get<0>(row);
            if (total) {
                result += static_cast<uint64_t>(*total);
            }
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculte db size", e);
    }

    return Status::OK();
}

X
Xu Peng 已提交
695
// Mark live table files as TO_DELETE, in ascending id order, until at least
// `to_discard_size` bytes (by size_) have been marked or no live file is left.
//
// Work is done in batches of 10 files, one transaction per batch. A loop is
// used instead of recursing once per batch, so that running out of live files
// terminates cleanly. The previous recursion never stopped when a batch came
// back empty while bytes were still owed.
// Marked files get updated_time_ = now.
// Returns OK (also when to_discard_size <= 0), a DBTransactionError if a
// batch transaction is not committed, or the error from HandleException on an
// exception.
Status DBMetaImpl::DiscardFiles(long to_discard_size) {
    try {
        while (to_discard_size > 0) {
            ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;

            MetricCollector metric;

            bool no_live_files = false;
            auto commited = ConnectorPtr->transaction([&]() {
                auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                             &TableFileSchema::size_),
                                                     where(c(&TableFileSchema::file_type_)
                                                           != (int) TableFileSchema::TO_DELETE),
                                                     order_by(&TableFileSchema::id_),
                                                     limit(10));

                // Keep the id column's own type so large ids are not narrowed to int.
                std::vector<decltype(TableFileSchema::id_)> ids;

                for (auto &file : selected) {
                    if (to_discard_size <= 0) break;
                    auto file_id = std::get<0>(file);
                    auto file_size = std::get<1>(file);
                    ids.push_back(file_id);
                    ENGINE_LOG_DEBUG << "Discard table_file.id=" << file_id
                                     << " table_file.size=" << file_size;
                    to_discard_size -= file_size;
                }

                // Nothing left to mark: stop the outer loop instead of spinning forever.
                if (ids.empty()) {
                    no_live_files = true;
                    return true;
                }

                ConnectorPtr->update_all(
                        set(
                                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
                                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
                        ),
                        where(
                                in(&TableFileSchema::id_, ids)
                        ));

                return true;
            });

            if (!commited) {
                return Status::DBTransactionError("Update table file error");
            }

            if (no_live_files) {
                break;
            }
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when discard table file", e);
    }

    return Status::OK();
}

753
// Persist `file_schema` (matched by its primary key) with a fresh updated_time_.
//
// If the owning table no longer exists, or is itself in TO_DELETE state, the
// file is demoted to TO_DELETE before being written. The clean-up path then
// removes it later instead of keeping an orphan.
// Returns OK, or the error from HandleException (message names table_id and
// file_id) if any meta access throws.
Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time_ = utils::GetMicroSecTimeStamp();

    try {
        MetricCollector metric;

        auto owner_states = ConnectorPtr->select(
            columns(&TableSchema::state_),
            where(c(&TableSchema::table_id_) == file_schema.table_id_));

        const bool owner_gone = owner_states.empty()
                                || std::get<0>(owner_states.front()) == (int) TableSchema::TO_DELETE;
        if (owner_gone) {
            file_schema.file_type_ = TableFileSchema::TO_DELETE;
        }

        ConnectorPtr->update(file_schema);
    } catch (std::exception &e) {
        std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
            + " file_id = " + file_schema.file_id_;
        return HandleException(msg, e);
    }

    return Status::OK();
}

P
peng.xu 已提交
777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793
// Promote every RAW file of `table_id` to TO_INDEX with a single UPDATE.
// Files in any other state are left untouched.
// Returns OK, or the error from HandleException if the update throws.
Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
    const int raw_type = static_cast<int>(TableFileSchema::RAW);
    const int to_index_type = static_cast<int>(TableFileSchema::TO_INDEX);

    try {
        ConnectorPtr->update_all(
            set(c(&TableFileSchema::file_type_) = to_index_type),
            where(c(&TableFileSchema::table_id_) == table_id and
                  c(&TableFileSchema::file_type_) == raw_type));
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files to to_index", e);
    }

    return Status::OK();
}

794
// Persist every file in `files` inside one transaction, stamping each with a
// fresh updated_time_.
//
// Table liveness (exists and is not TO_DELETE) is looked up once per distinct
// table_id before the transaction starts. Files whose table is not live are
// demoted to TO_DELETE before being written.
// Returns OK, a DBTransactionError if the transaction is not committed, or the
// error from HandleException on an exception.
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        MetricCollector metric;

        // table_id -> "owning table exists and is not marked TO_DELETE"
        std::map<std::string, bool> table_alive;
        for (auto &file : files) {
            if (table_alive.count(file.table_id_) > 0) {
                continue;
            }
            auto live_rows = ConnectorPtr->select(columns(&TableSchema::id_),
                                                  where(c(&TableSchema::table_id_) == file.table_id_
                                                        and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
            table_alive[file.table_id_] = !live_rows.empty();
        }

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                if (!table_alive[file.table_id_]) {
                    file.file_type_ = TableFileSchema::TO_DELETE;
                }

                file.updated_time_ = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Update table files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when update table files", e);
    }

    return Status::OK();
}

835
// Physically purge entries previously marked TO_DELETE. This runs in two
// independent phases, each in its own transaction:
//   1. Table files in TO_DELETE state whose updated_time_ is more than
//      `seconds` old: the storage path is removed via
//      utils::DeleteTableFilePath, then the meta row is deleted.
//   2. Tables in TO_DELETE state: the table path is removed via
//      utils::DeleteTablePath, then the meta row is deleted.
// Returns the first failure (non-committed transaction or exception).
// Phase 2 does not run if phase 1 fails.
Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();

    // Phase 1: expired TO_DELETE table files.
    try {
        MetricCollector metric;

        auto expired_files = ConnectorPtr->select(
            columns(&TableFileSchema::id_,
                    &TableFileSchema::table_id_,
                    &TableFileSchema::file_id_,
                    &TableFileSchema::date_),
            where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE
                  and c(&TableFileSchema::updated_time_) < now - seconds * US_PS));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            // One schema object is reused for every row, as before.
            TableFileSchema victim;
            for (auto &row : expired_files) {
                victim.id_ = std::get<0>(row);
                victim.table_id_ = std::get<1>(row);
                victim.file_id_ = std::get<2>(row);
                victim.date_ = std::get<3>(row);

                utils::DeleteTableFilePath(options_, victim);
                ENGINE_LOG_DEBUG << "Removing deleted id =" << victim.id_ << " location = " << victim.location_ << std::endl;
                ConnectorPtr->remove<TableFileSchema>(victim.id_);
            }
            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    // Phase 2: tables marked TO_DELETE.
    try {
        MetricCollector metric;

        auto doomed_tables = ConnectorPtr->select(
            columns(&TableSchema::id_,
                    &TableSchema::table_id_),
            where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : doomed_tables) {
                utils::DeleteTablePath(options_, std::get<1>(row));
                ConnectorPtr->remove<TableSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table files", e);
    }

    return Status::OK();
}

902
// Delete the meta rows of all table files still in NEW state, in one
// transaction. Only the meta rows are removed here; no filesystem call is made.
// Returns OK, a DBTransactionError if the transaction is not committed, or the
// error from HandleException on an exception (exceptions do not escape).
Status DBMetaImpl::CleanUp() {
    try {
        auto new_files = ConnectorPtr->select(
            columns(&TableFileSchema::id_),
            where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));

        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &row : new_files) {
                ENGINE_LOG_DEBUG << "Remove table file type as NEW";
                ConnectorPtr->remove<TableFileSchema>(std::get<0>(row));
            }
            return true;
        });

        if (!commited) {
            return Status::DBTransactionError("Clean files error");
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when clean table file", e);
    }

    return Status::OK();
}

G
groot 已提交
926
// Estimate the number of vectors stored for `table_id`.
//
// Sums size_ over the table's files in RAW, TO_INDEX or INDEX state. Files in
// other states (e.g. NEW, TO_DELETE) are excluded. The byte total is then
// converted to a row count as bytes / (dimension_ * sizeof(float)).
// Returns the DescribeTable error if the table cannot be described, a
// DBTransactionError if the table's dimension is not positive (this guards
// the integer division), or the error from HandleException on an exception.
Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
    try {
        MetricCollector metric;

        // Describe the table first: its dimension is required for the
        // conversion, and a missing table makes the file query pointless.
        TableSchema table_schema;
        table_schema.table_id_ = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        // Dividing by a zero dimension below would be undefined behaviour.
        if (table_schema.dimension_ <= 0) {
            return Status::DBTransactionError("Invalid dimension of table: " + table_id);
        }

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_),
                                             where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                                                    or
                                                    c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX
                                                    or c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)
                                                   and c(&TableFileSchema::table_id_) == table_id));

        result = 0;
        for (auto &file : selected) {
            result += std::get<0>(file);
        }

        result /= table_schema.dimension_;
        result /= sizeof(float);

    } catch (std::exception &e) {
        return HandleException("Encounter exception when calculate table file size", e);
    }
    return Status::OK();
}

960
// Recursively remove the meta storage directory `options_.path`, if it is a
// directory. The throwing boost::filesystem overloads raise filesystem_error
// (a std::exception) on I/O or permission failures. That error is converted
// into a Status, like every other DBMetaImpl operation, rather than escaping
// this Status-returning API.
Status DBMetaImpl::DropAll() {
    try {
        if (boost::filesystem::is_directory(options_.path)) {
            boost::filesystem::remove_all(options_.path);
        }
    } catch (std::exception &e) {
        return HandleException("Encounter exception when drop all meta", e);
    }
    return Status::OK();
}

X
Xu Peng 已提交
967
// On destruction, drop the meta rows of table files still in NEW state.
// CleanUp() catches std::exception itself and reports failures through its
// returned Status. A destructor has no way to surface that Status, so it is
// deliberately discarded.
DBMetaImpl::~DBMetaImpl() {
    static_cast<void>(CleanUp());
}

971
} // namespace meta
X
Xu Peng 已提交
972
} // namespace engine
J
jinhai 已提交
973
} // namespace milvus
X
Xu Peng 已提交
974
} // namespace zilliz