DBMetaImpl.cpp 30.9 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6 7 8 9
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
#include "MetaConsts.h"
10
#include "metrics/Metrics.h"
X
Xu Peng 已提交
11

X
Xu Peng 已提交
12
#include <unistd.h>
X
Xu Peng 已提交
13 14
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
15
#include <boost/filesystem.hpp>
16
#include <chrono>
X
Xu Peng 已提交
17
#include <fstream>
18
#include <sqlite_orm.h>
19
#include <easylogging++.h>
X
Xu Peng 已提交
20

X
Xu Peng 已提交
21 22 23 24

namespace zilliz {
namespace vecwise {
namespace engine {
25
namespace meta {
X
Xu Peng 已提交
26

X
Xu Peng 已提交
27 28
using namespace sqlite_orm;

29
inline auto StoragePrototype(const std::string &path) {
X
Xu Peng 已提交
30
    return make_storage(path,
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
                        make_table("Table",
                                   make_column("id", &TableSchema::id, primary_key()),
                                   make_column("table_id", &TableSchema::table_id, unique()),
                                   make_column("dimension", &TableSchema::dimension),
                                   make_column("created_on", &TableSchema::created_on),
                                   make_column("files_cnt", &TableSchema::files_cnt, default_value(0))),
                        make_table("TableFile",
                                   make_column("id", &TableFileSchema::id, primary_key()),
                                   make_column("table_id", &TableFileSchema::table_id),
                                   make_column("file_id", &TableFileSchema::file_id),
                                   make_column("file_type", &TableFileSchema::file_type),
                                   make_column("size", &TableFileSchema::size, default_value(0)),
                                   make_column("updated_time", &TableFileSchema::updated_time),
                                   make_column("created_on", &TableFileSchema::created_on),
                                   make_column("date", &TableFileSchema::date))
    );
X
Xu Peng 已提交
47 48 49

}

X
Xu Peng 已提交
50
// Concrete storage type produced by StoragePrototype(); deduced with decltype
// because the sqlite_orm storage type encodes every table/column declared there.
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
51
// File-local handle to the meta storage. It is (re)assigned by DBMetaImpl::Initialize()
// and used by every query in this file.
// NOTE(review): this is one object for the whole translation unit, so every DBMetaImpl
// instance shares the storage opened by the most recent Initialize() call.
static std::unique_ptr<ConnectorT> ConnectorPtr;
X
Xu Peng 已提交
52
// Type of a sqlite_orm "TableFileSchema::id == <unsigned long>" condition expression.
// NOTE(review): not referenced in the portion of the file reviewed here - confirm it is still used.
using ConditionT = decltype(c(&TableFileSchema::id) == 1UL);
X
Xu Peng 已提交
53

54
std::string DBMetaImpl::GetTablePath(const std::string &table_id) {
X
Xu Peng 已提交
55
    return options_.path + "/tables/" + table_id;
56 57
}

58
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
X
Xu Peng 已提交
59
    std::stringstream ss;
X
Xu Peng 已提交
60
    ss << GetTablePath(table_id) << "/" << date;
X
Xu Peng 已提交
61 62 63
    return ss.str();
}

64
// Fills group_file.location with "<date partition path>/<file_id>".
// If group_file.date is still EmptyDate it is first replaced by Meta::GetDate().
void DBMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
    const bool date_missing = (group_file.date == EmptyDate);
    if (date_missing) {
        group_file.date = Meta::GetDate();
    }

    std::ostringstream location;
    location << GetTableDatePartitionPath(group_file.table_id, group_file.date);
    location << "/" << group_file.file_id;
    group_file.location = location.str();
}

74
// Writes a freshly generated id, rendered as text, into `table_id`. Always returns OK.
Status DBMetaImpl::NextTableId(std::string &table_id) {
    SimpleIDGenerator generator;
    std::ostringstream id_text;
    id_text << generator.GetNextIDNumber();
    table_id = id_text.str();
    return Status::OK();
}

82
// Writes a freshly generated id, rendered as text, into `file_id`. Always returns OK.
Status DBMetaImpl::NextFileId(std::string &file_id) {
    SimpleIDGenerator generator;
    std::ostringstream id_text;
    id_text << generator.GetNextIDNumber();
    file_id = id_text.str();
    return Status::OK();
}

90
// Copies the options and immediately opens the meta storage via Initialize().
// The Status returned by Initialize() is not inspected here.
DBMetaImpl::DBMetaImpl(const DBMetaOptions &options_)
    : options_(options_)
{
    this->Initialize();
}

X
Xu Peng 已提交
95 96 97
// Prepares the on-disk state and opens the meta database.
//  1. Makes sure options_.path exists as a directory (parents are created as needed).
//  2. Opens "<options_.path>/meta.sqlite" through StoragePrototype() and stores it in ConnectorPtr.
//  3. Syncs the schema, keeps the connection open and switches the journal to WAL.
//  4. Runs CleanUp().
// Returns OK; errors from boost::filesystem or sqlite_orm propagate as exceptions.
Status DBMetaImpl::Initialize() {
    if (!boost::filesystem::is_directory(options_.path)) {
        // create_directories (rather than create_directory) so that a root path whose
        // parent directories do not exist yet can still be used.
        auto ret = boost::filesystem::create_directories(options_.path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << options_.path << " Error";
        }
        assert(ret);
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    CleanUp();

    return Status::OK();
}

X
Xu Peng 已提交
115
// PXU TODO: Temp solution. Will fix later
116 117
// Marks every file of table `table_id` whose date is in `dates` as TO_DELETE.
// Returns OK immediately when `dates` is empty, the DescribeTable() status when the
// table lookup fails, and an error when any requested date is not older than
// GetDateWithDelta(-1). Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
                                         const DatesT &dates) {
    if (dates.size() == 0) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    auto yesterday = GetDateWithDelta(-1);

    for (const auto &date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions with 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
            set(
                c(&TableFileSchema::file_type) = (int) TableFileSchema::TO_DELETE
            ),
            where(
                c(&TableFileSchema::table_id) == table_id and
                    in(&TableFileSchema::date, dates)
            ));
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow: `throw e;` would copy-construct a plain std::exception and
        // drop the derived type and its message.
        throw;
    }
    return Status::OK();
}

153
// Inserts `table_schema` into the "Table" table and creates the table's directory.
// An empty table_id is replaced by a generated one; files_cnt, id and created_on are
// overwritten before the insert, and id is set to the inserted row id afterwards.
// Returns DBTransactionError when the insert throws, OK otherwise.
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
    server::Metrics::GetInstance().MetaAccessTotalIncrement();
    if (table_schema.table_id.empty()) {
        NextTableId(table_schema.table_id);
    }
    table_schema.files_cnt = 0;
    table_schema.id = -1;
    table_schema.created_on = utils::GetMicroSecTimeStamp();
    auto start_time = METRICS_NOW_TIME;
    try {
        auto id = ConnectorPtr->insert(table_schema);
        table_schema.id = id;
    } catch (const std::exception &e) {
        // Keep the original contract (error status, no throw) but record the cause.
        LOG(DEBUG) << "Add Table Error: " << e.what();
        return Status::DBTransactionError("Add Table Error");
    } catch (...) {
        return Status::DBTransactionError("Add Table Error");
    }
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

    auto group_path = GetTablePath(table_schema.table_id);

    if (!boost::filesystem::is_directory(group_path)) {
        auto ret = boost::filesystem::create_directories(group_path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << group_path << " Error";
        }
        assert(ret);
    }

    return Status::OK();
}

187
// Looks up the row whose table_id equals table_schema.table_id and copies its
// id, files_cnt and dimension into `table_schema` (other fields are left untouched).
// Returns NotFound when no such row exists. Storage exceptions are logged and rethrown.
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto groups = ConnectorPtr->select(columns(&TableSchema::id,
                                                   &TableSchema::table_id,
                                                   &TableSchema::files_cnt,
                                                   &TableSchema::dimension),
                                           where(c(&TableSchema::table_id) == table_schema.table_id));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        // table_id is declared unique(), so at most one row can match.
        assert(groups.size() <= 1);
        if (groups.size() != 1) {
            return Status::NotFound("Table " + table_schema.table_id + " not found");
        }

        const auto &row = groups[0];
        table_schema.id = std::get<0>(row);
        table_schema.files_cnt = std::get<2>(row);
        table_schema.dimension = std::get<3>(row);
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

215
// Sets `has_or_not` to true when exactly one "Table" row has the given table_id,
// false otherwise. Returns OK; storage exceptions are logged and rethrown.
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;

        auto tables = ConnectorPtr->select(columns(&TableSchema::id),
                                           where(c(&TableSchema::table_id) == table_id));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        // table_id is declared unique(), so at most one row can match.
        assert(tables.size() <= 1);
        has_or_not = (tables.size() == 1);
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

238
// Registers a new file for table file_schema.table_id.
// The date defaults to Meta::GetDate() when it is EmptyDate. A new file_id is generated,
// the type is set to NEW, size to 0, dimension is taken from the owning table, both
// timestamps are set to "now" and the location is computed. After the row is inserted
// (file_schema.id receives the row id) the date partition directory is created.
// Returns the DescribeTable() status when the table lookup fails, DBTransactionError
// when the insert throws, OK otherwise.
Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
    if (file_schema.date == EmptyDate) {
        file_schema.date = Meta::GetDate();
    }
    TableSchema table_schema;
    table_schema.table_id = file_schema.table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    NextFileId(file_schema.file_id);
    file_schema.file_type = TableFileSchema::NEW;
    file_schema.dimension = table_schema.dimension;
    file_schema.size = 0;
    file_schema.created_on = utils::GetMicroSecTimeStamp();
    file_schema.updated_time = file_schema.created_on;
    GetTableFilePath(file_schema);

    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto id = ConnectorPtr->insert(file_schema);
        file_schema.id = id;
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
    } catch (const std::exception &e) {
        // Keep the original contract (error status, no throw) but record the cause.
        LOG(DEBUG) << "Add file Error: " << e.what();
        return Status::DBTransactionError("Add file Error");
    } catch (...) {
        return Status::DBTransactionError("Add file Error");
    }

    auto partition_path = GetTableDatePartitionPath(file_schema.table_id, file_schema.date);

    if (!boost::filesystem::is_directory(partition_path)) {
        // create_directories so the partition can be created even when the table
        // directory itself is missing on disk.
        auto ret = boost::filesystem::create_directories(partition_path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << partition_path << " Error";
        }
        assert(ret);
    }

    return Status::OK();
}

284
// Replaces the content of `files` with every file whose type is TO_INDEX.
// Each entry has id, table_id, file_id, file_type, size, date, location and the
// dimension of its owning table filled in. Returns the DescribeTable() status if a
// table lookup fails (files collected so far stay in `files`), OK otherwise.
// Storage exceptions are logged and rethrown.
Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
    files.clear();

    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(c(&TableFileSchema::file_type)
                                                       == (int) TableFileSchema::TO_INDEX));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        // Cache of table descriptions so each table is described only once.
        std::map<std::string, TableSchema> groups;
        TableFileSchema table_file;

        for (auto &file : selected) {
            table_file.id = std::get<0>(file);
            table_file.table_id = std::get<1>(file);
            table_file.file_id = std::get<2>(file);
            table_file.file_type = std::get<3>(file);
            table_file.size = std::get<4>(file);
            table_file.date = std::get<5>(file);
            GetTableFilePath(table_file);

            auto groupItr = groups.find(table_file.table_id);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id = table_file.table_id;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                groupItr = groups.emplace(table_file.table_id, table_schema).first;
            }
            table_file.dimension = groupItr->second.dimension;
            files.push_back(table_file);
        }
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
334
// Replaces the content of `files` with the RAW, TO_INDEX and INDEX files of table
// `table_id`, grouped by date. When `partition` is non-empty only files whose date is
// listed in it are returned. Each entry has id, table_id, file_id, file_type, size,
// date, dimension and location filled in.
// Returns the DescribeTable() status when the table lookup fails, OK otherwise.
// Storage exceptions are logged and rethrown.
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
                                 const DatesT &partition,
                                 DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;

        // Shared tail of both query variants: record the query duration, describe the
        // table and group the selected rows by date. Generic because the two select()
        // calls are written separately and only share the row layout.
        auto collect = [&](const auto &selected) -> Status {
            auto end_time = METRICS_NOW_TIME;
            auto total_time = METRICS_MICROSECONDS(start_time, end_time);
            server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

            TableSchema table_schema;
            table_schema.table_id = table_id;
            auto status = DescribeTable(table_schema);
            if (!status.ok()) {
                return status;
            }

            TableFileSchema table_file;
            for (const auto &file : selected) {
                table_file.id = std::get<0>(file);
                table_file.table_id = std::get<1>(file);
                table_file.file_id = std::get<2>(file);
                table_file.file_type = std::get<3>(file);
                table_file.size = std::get<4>(file);
                table_file.date = std::get<5>(file);
                table_file.dimension = table_schema.dimension;
                GetTableFilePath(table_file);
                // operator[] creates the per-date list on first use.
                files[table_file.date].push_back(table_file);
            }
            return Status::OK();
        };

        // File types that take part in a search.
        auto searchable =
            (c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or
                c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_INDEX or
                c(&TableFileSchema::file_type) == (int) TableFileSchema::INDEX);

        if (partition.empty()) {
            return collect(ConnectorPtr->select(columns(&TableFileSchema::id,
                                                        &TableFileSchema::table_id,
                                                        &TableFileSchema::file_id,
                                                        &TableFileSchema::file_type,
                                                        &TableFileSchema::size,
                                                        &TableFileSchema::date),
                                                where(c(&TableFileSchema::table_id) == table_id and
                                                    searchable)));
        }

        return collect(ConnectorPtr->select(columns(&TableFileSchema::id,
                                                    &TableFileSchema::table_id,
                                                    &TableFileSchema::file_id,
                                                    &TableFileSchema::file_type,
                                                    &TableFileSchema::size,
                                                    &TableFileSchema::date),
                                            where(c(&TableFileSchema::table_id) == table_id and
                                                in(&TableFileSchema::date, partition) and
                                                searchable)));
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }
}

434 435
// Replaces the content of `files` with the RAW files of table `table_id`, grouped by
// date. Each entry has id, table_id, file_id, file_type, size, date, dimension and
// location filled in.
// Returns the DescribeTable() status when the table lookup fails, OK otherwise.
// Storage exceptions are logged and rethrown.
Status DBMetaImpl::FilesToMerge(const std::string &table_id,
                                DatePartionedTableFilesSchema &files) {
    files.clear();

    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW and
                                                 c(&TableFileSchema::table_id) == table_id));
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        TableSchema table_schema;
        table_schema.table_id = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        TableFileSchema table_file;
        for (auto &file : selected) {
            table_file.id = std::get<0>(file);
            table_file.table_id = std::get<1>(file);
            table_file.file_id = std::get<2>(file);
            table_file.file_type = std::get<3>(file);
            table_file.size = std::get<4>(file);
            table_file.date = std::get<5>(file);
            table_file.dimension = table_schema.dimension;
            GetTableFilePath(table_file);
            // operator[] creates the per-date list on first use.
            files[table_file.date].push_back(table_file);
        }
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

484
// Loads the row identified by (file_schema.table_id, file_schema.file_id) and copies its
// id, table_id, file_id, file_type, size and date into `file_schema`.
// Returns NotFound when no row matches, OK otherwise. Storage exceptions are logged and
// rethrown.
Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) {
    try {
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                  &TableFileSchema::table_id,
                                                  &TableFileSchema::file_id,
                                                  &TableFileSchema::file_type,
                                                  &TableFileSchema::size,
                                                  &TableFileSchema::date),
                                          where(c(&TableFileSchema::file_id) == file_schema.file_id and
                                              c(&TableFileSchema::table_id) == file_schema.table_id
                                          ));
        assert(files.size() <= 1);
        if (files.size() != 1) {
            return Status::NotFound("Table:" + file_schema.table_id +
                " File:" + file_schema.file_id + " not found");
        }

        const auto &row = files[0];
        file_schema.id = std::get<0>(row);
        file_schema.table_id = std::get<1>(row);
        file_schema.file_id = std::get<2>(row);
        file_schema.file_type = std::get<3>(row);
        file_schema.size = std::get<4>(row);
        file_schema.date = std::get<5>(row);
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
516
// PXU TODO: Support Swap
X
Xu Peng 已提交
517
// Applies the archive criteria from options_.archive_conf:
//   "days" - files created more than `limit` days ago are marked TO_DELETE;
//   "disk" - when the total size of live files exceeds `limit` * G, the excess is
//            handed to DiscardFiles().
// Returns OK; storage exceptions are logged and rethrown.
Status DBMetaImpl::Archive() {
    auto &criterias = options_.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    // Iterate by const reference: the entries are only read.
    for (const auto &kv : criterias) {
        auto &criteria = kv.first;
        auto &limit = kv.second;
        if (criteria == "days") {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try {
                ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type) = (int) TableFileSchema::TO_DELETE
                    ),
                    where(
                        c(&TableFileSchema::created_on) < (long) (now - usecs) and
                            c(&TableFileSchema::file_type) != (int) TableFileSchema::TO_DELETE
                    ));
            } catch (const std::exception &e) {
                LOG(DEBUG) << e.what();
                // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
                throw;
            }
        }
        if (criteria == "disk") {
            long sum = 0;
            Size(sum);

            // Non-positive values are ignored by DiscardFiles().
            auto to_delete = (sum - limit * G);
            DiscardFiles(to_delete);
        }
    }

    return Status::OK();
}

555
// Stores in `result` the sum of TableFileSchema::size over all files that are not
// marked TO_DELETE (0 when the query yields no value).
// Returns OK; storage exceptions are logged and rethrown.
Status DBMetaImpl::Size(long &result) {
    result = 0;
    try {
        auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size)),
                                             where(
                                                 c(&TableFileSchema::file_type) != (int) TableFileSchema::TO_DELETE
                                             ));

        for (auto &sub_query : selected) {
            // The aggregate is nullable; skip rows that carry no value.
            if (!std::get<0>(sub_query)) {
                continue;
            }
            result += (long) (*std::get<0>(sub_query));
        }
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
577
// Marks files as TO_DELETE, lowest id first, until at least `to_discard_size` bytes
// (by their recorded size) have been discarded or no live file is left.
// Works in batches of up to 10 files and recurses with the remaining amount.
// Returns OK; storage exceptions are logged and rethrown.
Status DBMetaImpl::DiscardFiles(long to_discard_size) {
    LOG(DEBUG) << "About to discard size=" << to_discard_size;
    if (to_discard_size <= 0) {
        return Status::OK();
    }
    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::size),
                                             where(c(&TableFileSchema::file_type)
                                                       != (int) TableFileSchema::TO_DELETE),
                                             order_by(&TableFileSchema::id),
                                             limit(10));

        std::vector<int> ids;
        TableFileSchema table_file;

        for (auto &file : selected) {
            if (to_discard_size <= 0) break;
            table_file.id = std::get<0>(file);
            table_file.size = std::get<1>(file);
            ids.push_back(table_file.id);
            // Log the row id: file_id is not part of this query and would print empty.
            LOG(DEBUG) << "Discard table_file.id=" << table_file.id << " table_file.size=" << table_file.size;
            to_discard_size -= table_file.size;
        }

        // Nothing left to discard: stop instead of recursing forever.
        if (ids.size() == 0) {
            return Status::OK();
        }

        ConnectorPtr->update_all(
            set(
                c(&TableFileSchema::file_type) = (int) TableFileSchema::TO_DELETE
            ),
            where(
                in(&TableFileSchema::id, ids)
            ));

    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }

    return DiscardFiles(to_discard_size);
}

623
// Stamps `file_schema` with the current time as updated_time and writes it back to the
// "TableFile" table. Returns OK; storage exceptions are logged (together with the
// table and file ids) and rethrown.
Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
    file_schema.updated_time = utils::GetMicroSecTimeStamp();
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        ConnectorPtr->update(file_schema);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        LOG(DEBUG) << "table_id= " << file_schema.table_id << " file_id=" << file_schema.file_id;
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

640
// Writes every entry of `files` back to the "TableFile" table inside one transaction,
// stamping each with the current time as updated_time first.
// Returns DBTransactionError when the transaction does not commit, OK otherwise.
// Storage exceptions are logged and rethrown.
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
    try {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto commited = ConnectorPtr->transaction([&]() mutable {
            for (auto &file : files) {
                file.updated_time = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });
        // Observe the duration after transaction() returns so that the commit itself
        // is part of the measured time.
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
        if (!commited) {
            return Status::DBTransactionError("Update files Error");
        }
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

664
// Physically removes files that were marked TO_DELETE and whose updated_time is older
// than `seconds` seconds: the file at the computed location is deleted from disk and
// its row is removed from the "TableFile" table.
// Returns OK; storage and filesystem exceptions are logged and rethrown.
Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    try {
        // `<` selects files whose time-to-live has expired. The previous `>` selected
        // the opposite set: files marked within the last `seconds`, so freshly marked
        // files were removed at once and older ones were never picked up.
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(
                                                 c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_DELETE
                                                     and
                                                         c(&TableFileSchema::updated_time)
                                                             < now - seconds * US_PS));

        TableFileSchema table_file;

        for (auto &file : selected) {
            table_file.id = std::get<0>(file);
            table_file.table_id = std::get<1>(file);
            table_file.file_id = std::get<2>(file);
            table_file.file_type = std::get<3>(file);
            table_file.size = std::get<4>(file);
            table_file.date = std::get<5>(file);
            GetTableFilePath(table_file);
            // The query only returns TO_DELETE rows, so the file can be removed directly.
            boost::filesystem::remove(table_file.location);
            ConnectorPtr->remove<TableFileSchema>(table_file.id);
        }
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare rethrow keeps the dynamic exception type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

704
// Removes every TO_DELETE and NEW file record from the meta store.
//
// For TO_DELETE records the file at `table_file.location` is removed as well;
// NEW records only have their row deleted.
//
// Returns Status::OK(); any std::exception raised on the way is logged and
// rethrown unchanged.
Status DBMetaImpl::CleanUp() {
    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(
                                                 c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_DELETE
                                                     or
                                                         c(&TableFileSchema::file_type)
                                                             == (int) TableFileSchema::NEW));

        TableFileSchema table_file;

        for (auto &file : selected) {
            table_file.id = std::get<0>(file);
            table_file.table_id = std::get<1>(file);
            table_file.file_id = std::get<2>(file);
            table_file.file_type = std::get<3>(file);
            table_file.size = std::get<4>(file);
            table_file.date = std::get<5>(file);
            // Presumably fills in table_file.location from the fields above -- confirm.
            GetTableFilePath(table_file);
            // Only files already marked for deletion are removed from disk.
            if (table_file.file_type == TableFileSchema::TO_DELETE) {
                boost::filesystem::remove(table_file.location);
            }
            ConnectorPtr->remove<TableFileSchema>(table_file.id);
            /* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */
        }
    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare `throw` keeps the original exception type; `throw e` would slice it.
        throw;
    }

    return Status::OK();
}

743
// Computes a count for `table_id` from its file sizes.
//
// Sums the `size` column of all RAW, TO_INDEX and INDEX files belonging to the
// table and divides the total by the table's `dimension` as reported by
// DescribeTable.
//
// `result` receives the computed count; it is left untouched when
// DescribeTable fails. Returns the DescribeTable status on failure, otherwise
// Status::OK(). Any std::exception raised on the way is logged and rethrown
// unchanged.
Status DBMetaImpl::Count(const std::string &table_id, long &result) {

    try {

        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where((c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or
                                                 c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_INDEX
                                                 or
                                                     c(&TableFileSchema::file_type) == (int) TableFileSchema::INDEX)
                                                       and
                                                           c(&TableFileSchema::table_id) == table_id));
        // Only the select above is timed; DescribeTable below is not included.
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

        TableSchema table_schema;
        table_schema.table_id = table_id;
        auto status = DescribeTable(table_schema);

        if (!status.ok()) {
            return status;
        }

        result = 0;
        for (auto &file : selected) {
            result += std::get<0>(file);
        }

        // NOTE(review): a dimension of 0 would divide by zero here -- confirm
        // that DescribeTable never reports success with a zero dimension.
        result /= table_schema.dimension;

    } catch (const std::exception &e) {
        LOG(DEBUG) << e.what();
        // Bare `throw` keeps the original exception type; `throw e` would slice it.
        throw;
    }
    return Status::OK();
}

782
// Recursively deletes the directory at `options_.path` when it exists as a
// directory. Always returns Status::OK().
Status DBMetaImpl::DropAll() {
    const auto &meta_root = options_.path;
    // Nothing to drop when the path is absent or is not a directory.
    if (!boost::filesystem::is_directory(meta_root)) {
        return Status::OK();
    }
    boost::filesystem::remove_all(meta_root);
    return Status::OK();
}

X
Xu Peng 已提交
789
// Runs CleanUp() on destruction.
//
// CleanUp() rethrows storage/filesystem exceptions. Destructors are implicitly
// noexcept, so an escaping exception would call std::terminate; failures are
// therefore logged and swallowed here.
DBMetaImpl::~DBMetaImpl() {
    try {
        CleanUp();
    } catch (const std::exception &e) {
        LOG(DEBUG) << "CleanUp failed in ~DBMetaImpl: " << e.what();
    } catch (...) {
        LOG(DEBUG) << "CleanUp failed in ~DBMetaImpl: unknown exception";
    }
}

793
} // namespace meta
X
Xu Peng 已提交
794 795 796
} // namespace engine
} // namespace vecwise
} // namespace zilliz