DBMetaImpl.cpp 25.1 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6

X
Xu Peng 已提交
7
#include <unistd.h>
X
Xu Peng 已提交
8 9
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
10
#include <boost/filesystem.hpp>
11
#include <chrono>
X
Xu Peng 已提交
12
#include <fstream>
13
#include <sqlite_orm.h>
14
#include <easylogging++.h>
X
Xu Peng 已提交
15

X
Xu Peng 已提交
16 17
#include "DBMetaImpl.h"
#include "IDGenerator.h"
X
Xu Peng 已提交
18
#include "Utils.h"
X
Xu Peng 已提交
19
#include "MetaConsts.h"
X
Xu Peng 已提交
20 21 22 23

namespace zilliz {
namespace vecwise {
namespace engine {
24
namespace meta {
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26 27
using namespace sqlite_orm;

X
Xu Peng 已提交
28 29
inline auto StoragePrototype(const std::string& path) {
    return make_storage(path,
X
Xu Peng 已提交
30
            make_table("Group",
31
                      make_column("id", &TableSchema::id, primary_key()),
32
                      make_column("table_id", &TableSchema::table_id, unique()),
33 34 35
                      make_column("dimension", &TableSchema::dimension),
                      make_column("created_on", &TableSchema::created_on),
                      make_column("files_cnt", &TableSchema::files_cnt, default_value(0))),
X
Xu Peng 已提交
36
            make_table("GroupFile",
37
                      make_column("id", &TableFileSchema::id, primary_key()),
38
                      make_column("table_id", &TableFileSchema::table_id),
39 40 41 42 43 44
                      make_column("file_id", &TableFileSchema::file_id),
                      make_column("file_type", &TableFileSchema::file_type),
                      make_column("size", &TableFileSchema::size, default_value(0)),
                      make_column("updated_time", &TableFileSchema::updated_time),
                      make_column("created_on", &TableFileSchema::created_on),
                      make_column("date", &TableFileSchema::date))
X
Xu Peng 已提交
45
            );
X
Xu Peng 已提交
46 47 48

}

X
Xu Peng 已提交
49
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
50 51
static std::unique_ptr<ConnectorT> ConnectorPtr;

52 53
// Returns the directory that holds all data of `table_id`:
// "<options.path>/tables/<table_id>". Nothing is created on disk.
std::string DBMetaImpl::GetGroupPath(const std::string& table_id) {
    std::string group_path(_options.path);
    group_path += "/tables/";
    group_path += table_id;
    return group_path;
}

56
std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& table_id, DateT& date) {
X
Xu Peng 已提交
57
    std::stringstream ss;
58
    ss << GetGroupPath(table_id) << "/" << date;
X
Xu Peng 已提交
59 60 61
    return ss.str();
}

62
// Computes the on-disk location of `group_file` and stores it in
// group_file.location: "<partition path>/<file_id>".
//
// Side effect: when group_file.date is EmptyDate it is first replaced by
// Meta::GetDate(), so the file lands in that partition.
void DBMetaImpl::GetGroupFilePath(TableFileSchema& group_file) {
    const bool has_no_date = (group_file.date == EmptyDate);
    if (has_no_date) {
        group_file.date = Meta::GetDate();
    }

    std::ostringstream location;
    location << GetGroupDatePartitionPath(group_file.table_id, group_file.date);
    location << "/" << group_file.file_id;
    group_file.location = location.str();
}

72
// Generates a fresh table id: the next number from a SimpleIDGenerator,
// formatted as text, is written to `table_id`. Always returns Status::OK().
Status DBMetaImpl::NextGroupId(std::string& table_id) {
    SimpleIDGenerator id_generator;
    std::ostringstream formatted;
    formatted << id_generator.getNextIDNumber();
    table_id = formatted.str();
    return Status::OK();
}

X
Xu Peng 已提交
80 81 82 83 84 85 86 87
// Generates a fresh file id: the next number from a SimpleIDGenerator,
// formatted as text, is written to `file_id`. Always returns Status::OK().
Status DBMetaImpl::NextFileId(std::string& file_id) {
    SimpleIDGenerator id_generator;
    std::ostringstream formatted;
    formatted << id_generator.getNextIDNumber();
    file_id = formatted.str();
    return Status::OK();
}

X
Xu Peng 已提交
88 89
// Stores a copy of the options and opens the metadata store via initialize().
//
// A constructor cannot return the Status, so a failure reported by
// initialize() is logged here instead of being dropped silently.
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
    : _options(options_) {
    auto status = initialize();
    if (!status.ok()) {
        LOG(ERROR) << "Failed to initialize meta store under " << _options.path;
    }
}

// Prepares the metadata store rooted at _options.path:
//   1. makes sure the root directory exists,
//   2. opens "<path>/meta.sqlite" and syncs the schema from StoragePrototype(),
//   3. keeps the connection open and switches the journal to WAL,
//   4. runs cleanup() to purge rows left over from a previous run.
//
// Returns Status::OK() on success and Status::Error when the root directory
// is still missing after the creation attempt. Boost.Filesystem and
// sqlite_orm exceptions propagate to the caller.
Status DBMetaImpl::initialize() {
    if (!boost::filesystem::is_directory(_options.path)) {
        // create_directories() also creates missing parents. Its return value
        // is deliberately not asserted on: it is false when the directory
        // already exists (e.g. created concurrently), which is not an error.
        boost::filesystem::create_directories(_options.path);
        if (!boost::filesystem::is_directory(_options.path)) {
            LOG(ERROR) << "Create directory " << _options.path << " Error";
            return Status::Error("Create directory " + _options.path + " Error");
        }
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(_options.path + "/meta.sqlite"));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    cleanup();

    return Status::OK();
}

X
Xu Peng 已提交
113
// PXU TODO: Temp solution. Will fix later
114
Status DBMetaImpl::delete_group_partitions(const std::string& table_id,
X
Xu Peng 已提交
115 116 117 118 119
            const meta::DatesT& dates) {
    if (dates.size() == 0) {
        return Status::OK();
    }

120
    TableSchema group_info;
121
    group_info.table_id = table_id;
X
Xu Peng 已提交
122 123 124 125 126
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

X
Xu Peng 已提交
127
    auto yesterday = GetDateWithDelta(-1);
X
Xu Peng 已提交
128 129 130 131 132 133 134 135 136 137

    for (auto& date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions with 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
                    set(
138
                        c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE
X
Xu Peng 已提交
139 140
                    ),
                    where(
141
                        c(&TableFileSchema::table_id) == table_id and
142
                        in(&TableFileSchema::date, dates)
X
Xu Peng 已提交
143 144 145 146 147 148 149 150
                    ));
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        throw e;
    }
    return Status::OK();
}

151
// Registers a new table.
//
// Fills in `group_info`: a generated table_id when it is empty, files_cnt = 0,
// created_on = current timestamp, and id = the row id returned by the insert.
// Then makes sure the table's directory (GetGroupPath) exists.
//
// Returns Status::DBTransactionError when the insert fails,
// Status::Error when the directory is still missing after the creation
// attempt, Status::OK() otherwise. Boost.Filesystem exceptions propagate.
Status DBMetaImpl::add_group(TableSchema& group_info) {
    if (group_info.table_id.empty()) {
        NextGroupId(group_info.table_id);
    }
    group_info.files_cnt = 0;
    group_info.id = -1;
    group_info.created_on = utils::GetMicroSecTimeStamp();

    try {
        group_info.id = ConnectorPtr->insert(group_info);
    } catch (const std::exception& e) {
        // Keep the reason in the log; the Status carries the original message.
        LOG(DEBUG) << e.what();
        return Status::DBTransactionError("Add Group Error");
    } catch (...) {
        return Status::DBTransactionError("Add Group Error");
    }

    const auto group_path = GetGroupPath(group_info.table_id);
    if (!boost::filesystem::is_directory(group_path)) {
        // The return value is not asserted on: create_directories() returns
        // false when the directory already exists, which is not a failure.
        boost::filesystem::create_directories(group_path);
        if (!boost::filesystem::is_directory(group_path)) {
            LOG(ERROR) << "Create directory " << group_path << " Error";
            return Status::Error("Create directory " + group_path + " Error");
        }
    }

    return Status::OK();
}

181
// Looks up the table named by group_info.table_id and fills in the rest of
// `group_info`. Thin forwarder to get_group_no_lock(); see there for the
// fields written and the statuses returned.
Status DBMetaImpl::get_group(TableSchema& group_info) {
    auto lookup_status = get_group_no_lock(group_info);
    return lookup_status;
}

185
// Loads the "Group" row whose table_id equals group_info.table_id and copies
// its id, files_cnt and dimension into `group_info` (created_on is not read).
//
// Returns Status::NotFound when no such row exists, Status::OK() otherwise.
// sqlite_orm exceptions are logged and re-thrown unchanged.
Status DBMetaImpl::get_group_no_lock(TableSchema& group_info) {
    try {
        auto groups = ConnectorPtr->select(columns(&TableSchema::id,
                                                  &TableSchema::table_id,
                                                  &TableSchema::files_cnt,
                                                  &TableSchema::dimension),
                                          where(c(&TableSchema::table_id) == group_info.table_id));
        // table_id is declared unique() in the schema.
        assert(groups.size() <= 1);
        if (groups.size() != 1) {
            return Status::NotFound("Group " + group_info.table_id + " not found");
        }

        const auto& row = groups[0];
        group_info.id = std::get<0>(row);
        group_info.files_cnt = std::get<2>(row);
        group_info.dimension = std::get<3>(row);
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

208
// Sets `has_or_not` to whether a "Group" row with the given table_id exists.
// Always returns Status::OK(); sqlite_orm exceptions are logged and re-thrown
// unchanged.
Status DBMetaImpl::has_group(const std::string& table_id, bool& has_or_not) {
    try {
        auto groups = ConnectorPtr->select(columns(&TableSchema::id),
                                          where(c(&TableSchema::table_id) == table_id));
        // table_id is declared unique() in the schema.
        assert(groups.size() <= 1);
        has_or_not = (groups.size() == 1);
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

225
// Registers a new file for table group_file.table_id.
//
// Fills in `group_file`: date (Meta::GetDate() when empty), a generated
// file_id, file_type = NEW, the table's dimension, size = 0, created_on and
// updated_time = current timestamp, location, and id = the inserted row id.
// Then makes sure the date-partition directory exists.
//
// Returns the status of get_group() when the table cannot be looked up,
// Status::DBTransactionError when the insert fails, Status::Error when the
// partition directory is still missing after the creation attempt, and
// Status::OK() otherwise. Boost.Filesystem exceptions propagate.
Status DBMetaImpl::add_group_file(TableFileSchema& group_file) {
    if (group_file.date == EmptyDate) {
        group_file.date = Meta::GetDate();
    }

    TableSchema group_info;
    group_info.table_id = group_file.table_id;
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

    NextFileId(group_file.file_id);
    group_file.file_type = TableFileSchema::NEW;
    group_file.dimension = group_info.dimension;
    group_file.size = 0;
    group_file.created_on = utils::GetMicroSecTimeStamp();
    group_file.updated_time = group_file.created_on;
    GetGroupFilePath(group_file);

    try {
        group_file.id = ConnectorPtr->insert(group_file);
    } catch (const std::exception& e) {
        // Keep the reason in the log; the Status carries the original message.
        LOG(DEBUG) << e.what();
        return Status::DBTransactionError("Add file Error");
    } catch (...) {
        return Status::DBTransactionError("Add file Error");
    }

    const auto partition_path = GetGroupDatePartitionPath(group_file.table_id, group_file.date);
    if (!boost::filesystem::is_directory(partition_path)) {
        // create_directories() also covers a missing table directory. Its
        // return value is not asserted on: false only means "already there".
        boost::filesystem::create_directories(partition_path);
        if (!boost::filesystem::is_directory(partition_path)) {
            LOG(ERROR) << "Create directory " << partition_path << " Error";
            return Status::Error("Create directory " + partition_path + " Error");
        }
    }

    return Status::OK();
}

266
// Collects every file whose file_type is TO_INDEX, across all tables.
//
// `files` is cleared first. Each returned entry carries id, table_id,
// file_id, file_type, size, date, location (computed) and the dimension of
// its table. Table rows are looked up once per table and cached locally.
//
// Returns the failing status of get_group_no_lock() when a file refers to a
// table that cannot be loaded (entries appended so far stay in `files`),
// Status::OK() otherwise. sqlite_orm exceptions are logged and re-thrown
// unchanged.
Status DBMetaImpl::files_to_index(TableFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                   &TableFileSchema::table_id,
                                                   &TableFileSchema::file_id,
                                                   &TableFileSchema::file_type,
                                                   &TableFileSchema::size,
                                                   &TableFileSchema::date),
                                          where(c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_INDEX)));

        std::map<std::string, TableSchema> groups;

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);

            auto group_itr = groups.find(group_file.table_id);
            if (group_itr == groups.end()) {
                TableSchema group_info;
                group_info.table_id = group_file.table_id;
                auto status = get_group_no_lock(group_info);
                if (!status.ok()) {
                    return status;
                }
                group_itr = groups.emplace(group_file.table_id, group_info).first;
            }
            group_file.dimension = group_itr->second.dimension;
            files.push_back(group_file);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

310
// Collects the searchable files (RAW, TO_INDEX or INDEX) of table `table_id`
// for the dates in `partition`, grouped by date into `files`.
//
// `files` is cleared first. An empty `partition` means "today only"
// (Meta::GetDate()). Each entry carries id, table_id, file_id, file_type,
// size, date, the table's dimension and the computed location.
//
// Returns the status of get_group_no_lock() when the table cannot be loaded,
// Status::OK() otherwise. sqlite_orm exceptions are logged and re-thrown
// unchanged.
Status DBMetaImpl::files_to_search(const std::string &table_id,
                                   const DatesT& partition,
                                   DatePartionedTableFilesSchema &files) {
    files.clear();
    DatesT today = {Meta::GetDate()};
    const DatesT& dates = partition.empty() ? today : partition;

    try {
        // Resolve the table first so a missing table costs no file query.
        TableSchema group_info;
        group_info.table_id = table_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(c(&TableFileSchema::table_id) == table_id and
                                                 in(&TableFileSchema::date, dates) and
                                                 (c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::RAW) or
                                                     c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_INDEX) or
                                                     c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::INDEX))));

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = group_info.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the per-date list on first use.
            files[group_file.date].push_back(group_file);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

361
// Collects the RAW files of table `table_id`, grouped by date into `files`.
//
// `files` is cleared first. Each entry carries id, table_id, file_id,
// file_type, size, date, the table's dimension and the computed location.
//
// Returns the status of get_group_no_lock() when the table cannot be loaded,
// Status::OK() otherwise. sqlite_orm exceptions are logged and re-thrown
// unchanged.
Status DBMetaImpl::files_to_merge(const std::string& table_id,
        DatePartionedTableFilesSchema& files) {
    files.clear();

    try {
        // Resolve the table first so a missing table costs no file query.
        TableSchema group_info;
        group_info.table_id = table_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                   &TableFileSchema::table_id,
                                                   &TableFileSchema::file_id,
                                                   &TableFileSchema::file_type,
                                                   &TableFileSchema::size,
                                                   &TableFileSchema::date),
                                          where(c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::RAW) and
                                                c(&TableFileSchema::table_id) == table_id));

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = group_info.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the per-date list on first use.
            files[group_file.date].push_back(group_file);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

406
// Sets `has_or_not_` to whether a "GroupFile" row with the given table_id and
// file_id exists, using the same match as get_group_file() (file_type is not
// considered).
//
// Always returns Status::OK(); sqlite_orm exceptions are logged and re-thrown
// unchanged, as in has_group().
Status DBMetaImpl::has_group_file(const std::string& table_id_,
                              const std::string& file_id_,
                              bool& has_or_not_) {
    // Give the out-parameter a defined value on every path.
    has_or_not_ = false;
    try {
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id),
                                          where(c(&TableFileSchema::file_id) == file_id_ and
                                                c(&TableFileSchema::table_id) == table_id_));
        has_or_not_ = !files.empty();
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        throw;
    }
    return Status::OK();
}

413
// Loads the "GroupFile" row matching (table_id_, file_id_) and copies its id,
// table_id, file_id, file_type, size and date into `group_file_info_`.
// dimension and location are not filled in here.
//
// Returns Status::NotFound when no such row exists, Status::OK() otherwise.
// sqlite_orm exceptions are logged and re-thrown unchanged.
Status DBMetaImpl::get_group_file(const std::string& table_id_,
                              const std::string& file_id_,
                              TableFileSchema& group_file_info_) {
    try {
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                   &TableFileSchema::table_id,
                                                   &TableFileSchema::file_id,
                                                   &TableFileSchema::file_type,
                                                   &TableFileSchema::size,
                                                   &TableFileSchema::date),
                                          where(c(&TableFileSchema::file_id) == file_id_ and
                                                c(&TableFileSchema::table_id) == table_id_
                                          ));
        assert(files.size() <= 1);
        if (files.size() != 1) {
            return Status::NotFound("GroupFile " + file_id_ + " not found");
        }

        const auto& row = files[0];
        group_file_info_.id = std::get<0>(row);
        group_file_info_.table_id = std::get<1>(row);
        group_file_info_.file_id = std::get<2>(row);
        group_file_info_.file_type = std::get<3>(row);
        group_file_info_.size = std::get<4>(row);
        group_file_info_.date = std::get<5>(row);
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

445
// Not implemented yet: all three arguments are ignored, `group_files_info_`
// is left untouched and Status::OK() is returned unconditionally.
Status DBMetaImpl::get_group_files(const std::string& table_id_,
                               const int date_delta_,
                               TableFilesSchema& group_files_info_) {
    // PXU TODO
    return Status::OK();
}

X
Xu Peng 已提交
452 453 454 455 456 457 458 459 460 461 462
// PXU TODO: Support Swap
//
// Applies the archive criteria from _options.archive_conf by marking files
// TO_DELETE (metadata only):
//   "days" - files whose created_on is older than now - limit * D_SEC * US_PS
//            (presumably `limit` days expressed in microseconds - confirm
//            against MetaConsts.h);
//   "disk" - the oldest files by id until the total size of live files is at
//            most limit * G (presumably `limit` gigabytes - confirm).
//
// Returns the first non-OK status of size() / discard_files_of_size(),
// Status::OK() otherwise. sqlite_orm exceptions are logged and re-thrown
// unchanged.
Status DBMetaImpl::archive_files() {
    auto& criterias = _options.archive_conf.GetCriterias();
    if (criterias.empty()) {
        return Status::OK();
    }

    for (const auto& kv : criterias) {
        const auto& criteria = kv.first;
        const auto& limit = kv.second;
        if (criteria == "days") {
            // Widen before multiplying so the product cannot overflow a
            // narrower integer type.
            const long usecs = static_cast<long>(limit) * D_SEC * US_PS;
            const long now = utils::GetMicroSecTimeStamp();
            try {
                ConnectorPtr->update_all(
                        set(
                            c(&TableFileSchema::file_type) = static_cast<int>(TableFileSchema::TO_DELETE)
                           ),
                        where(
                            c(&TableFileSchema::created_on) < static_cast<long>(now - usecs) and
                            c(&TableFileSchema::file_type) != static_cast<int>(TableFileSchema::TO_DELETE)
                            ));
            } catch (std::exception& e) {
                LOG(DEBUG) << e.what();
                // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
                throw;
            }
        }
        if (criteria == "disk") {
            long sum = 0;
            auto status = size(sum);
            if (!status.ok()) {
                return status;
            }

            // PXU TODO: refactor size
            const long to_delete = sum - static_cast<long>(limit) * G;
            status = discard_files_of_size(to_delete);
            if (!status.ok()) {
                return status;
            }
        }
    }

    return Status::OK();
}

X
Xu Peng 已提交
493
// Writes to `result` the sum of the `size` column over all files that are not
// marked TO_DELETE (0 when there are none).
//
// Always returns Status::OK(); sqlite_orm exceptions are logged and re-thrown
// unchanged.
Status DBMetaImpl::size(long& result) {
    result = 0;
    try {
        auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size)),
                where(
                    c(&TableFileSchema::file_type) != static_cast<int>(TableFileSchema::TO_DELETE)
                    ));

        for (auto& sub_query : selected) {
            // The aggregate is nullable: it is empty when no row matched.
            const auto& total = std::get<0>(sub_query);
            if (!total) {
                continue;
            }
            result += static_cast<long>(*total);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
515
// Marks files TO_DELETE, oldest id first, until at least `to_discard_size`
// worth of `size` has been marked or no live file is left. Only metadata is
// changed.
//
// Works in batches of 10 rows. This is a loop rather than recursion so the
// stack depth does not grow with the number of files discarded.
//
// Always returns Status::OK(); sqlite_orm exceptions are logged and re-thrown
// unchanged.
Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
    LOG(DEBUG) << "About to discard size=" << to_discard_size;

    try {
        while (to_discard_size > 0) {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                       &TableFileSchema::size),
                                              where(c(&TableFileSchema::file_type) != static_cast<int>(TableFileSchema::TO_DELETE)),
                                              order_by(&TableFileSchema::id),
                                              limit(10));

            // Use the column's own type so ids are never narrowed.
            std::vector<decltype(TableFileSchema::id)> ids;
            for (auto& file : selected) {
                if (to_discard_size <= 0) break;
                const auto file_id = std::get<0>(file);
                const auto file_size = std::get<1>(file);
                ids.push_back(file_id);
                LOG(DEBUG) << "Discard group_file.id=" << file_id << " group_file.size=" << file_size;
                to_discard_size -= file_size;
            }

            if (ids.empty()) {
                // No live file left to discard.
                break;
            }

            // Marking the batch removes it from the next round's selection,
            // which guarantees progress.
            ConnectorPtr->update_all(
                        set(
                            c(&TableFileSchema::file_type) = static_cast<int>(TableFileSchema::TO_DELETE)
                        ),
                        where(
                            in(&TableFileSchema::id, ids)
                        ));
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

559
// Persists `group_file` to its "GroupFile" row after stamping
// group_file.updated_time with the current timestamp.
//
// Always returns Status::OK(); sqlite_orm exceptions are logged (with the id
// and file_id of the offending row) and re-thrown unchanged.
Status DBMetaImpl::update_group_file(TableFileSchema& group_file) {
    group_file.updated_time = utils::GetMicroSecTimeStamp();
    try {
        ConnectorPtr->update(group_file);
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

571
// Persists every entry of `files` inside one transaction, stamping each
// entry's updated_time with the current timestamp first.
//
// Returns Status::DBTransactionError when the transaction does not commit,
// Status::OK() otherwise. sqlite_orm exceptions are logged and re-thrown
// unchanged.
Status DBMetaImpl::update_files(TableFilesSchema& files) {
    try {
        const bool committed = ConnectorPtr->transaction([&files] {
            for (auto& file : files) {
                file.updated_time = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;  // commit
        });
        if (!committed) {
            return Status::DBTransactionError("Update files Error");
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
590
// Physically removes files that were marked TO_DELETE more than `seconds`
// ago: the file at the computed location is deleted from disk and its
// "GroupFile" row is removed.
//
// The comparison is `updated_time < now - seconds * US_PS`, so a file is only
// purged once its grace period has elapsed. (The previous `>` selected files
// marked within the last `seconds` and never the older ones.)
// NOTE(review): assumes US_PS converts seconds to the unit returned by
// utils::GetMicroSecTimeStamp() - confirm against MetaConsts.h.
//
// Always returns Status::OK(); sqlite_orm and Boost.Filesystem exceptions
// derived from std::exception are logged and re-thrown unchanged.
Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
    const long now = utils::GetMicroSecTimeStamp();
    // Widen before multiplying so the product cannot overflow `int`.
    const long expired_before = now - static_cast<long>(seconds) * US_PS;
    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                   &TableFileSchema::table_id,
                                                   &TableFileSchema::file_id,
                                                   &TableFileSchema::file_type,
                                                   &TableFileSchema::size,
                                                   &TableFileSchema::date),
                                          where(c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_DELETE) and
                                                c(&TableFileSchema::updated_time) < expired_before));

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            // Every selected row is TO_DELETE, so the file is always removed.
            boost::filesystem::remove(group_file.location);
            ConnectorPtr->remove<TableFileSchema>(group_file.id);
            /* LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; */
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
627
// Purges leftover rows: every "GroupFile" row whose file_type is TO_DELETE or
// NEW is removed from the database. For TO_DELETE rows the file at the
// computed location is also deleted from disk; for NEW rows only the row is
// removed.
//
// Called from initialize() and from the destructor.
//
// Always returns Status::OK(); sqlite_orm and Boost.Filesystem exceptions
// derived from std::exception are logged and re-thrown unchanged.
Status DBMetaImpl::cleanup() {
    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                   &TableFileSchema::table_id,
                                                   &TableFileSchema::file_id,
                                                   &TableFileSchema::file_type,
                                                   &TableFileSchema::size,
                                                   &TableFileSchema::date),
                                          where(c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_DELETE) or
                                                c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::NEW)));

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == TableFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<TableFileSchema>(group_file.id);
            /* LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; */
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

663
// Writes to `result` the sum of `size` over the RAW, TO_INDEX and INDEX files
// of table `table_id`, divided by the table's dimension.
//
// Returns the status of get_group_no_lock() when the table cannot be loaded
// (`result` is left untouched), Status::Error when the stored dimension is 0
// (`result` is set to 0), Status::OK() otherwise. sqlite_orm exceptions are
// logged and re-thrown unchanged.
Status DBMetaImpl::count(const std::string& table_id, long& result) {
    try {
        // Resolve the table first so a missing table costs no file query.
        TableSchema group_info;
        group_info.table_id = table_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        result = 0;
        if (group_info.dimension == 0) {
            // Guard the division below.
            return Status::Error("Group " + table_id + " has zero dimension");
        }

        auto selected = ConnectorPtr->select(columns(&TableFileSchema::size),
                                          where((c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::RAW) or
                                                 c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_INDEX) or
                                                 c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::INDEX)) and
                                                c(&TableFileSchema::table_id) == table_id));

        for (auto& file : selected) {
            result += std::get<0>(file);
        }

        result /= group_info.dimension;
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Bare `throw;` keeps the dynamic type; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
694 695 696 697 698 699 700
// Recursively deletes the whole metadata root (_options.path) when it is a
// directory; does nothing otherwise. Always returns Status::OK().
Status DBMetaImpl::drop_all() {
    const bool root_exists = boost::filesystem::is_directory(_options.path);
    if (!root_exists) {
        return Status::OK();
    }

    boost::filesystem::remove_all(_options.path);
    return Status::OK();
}

X
Xu Peng 已提交
701 702 703 704
// Runs a final cleanup() pass when the object goes away.
//
// cleanup() re-throws database/filesystem errors, and an exception escaping a
// destructor terminates the process, so failures are logged and swallowed.
DBMetaImpl::~DBMetaImpl() {
    try {
        cleanup();
    } catch (const std::exception& e) {
        LOG(ERROR) << "Cleanup on destruction failed: " << e.what();
    } catch (...) {
        LOG(ERROR) << "Cleanup on destruction failed with an unknown error";
    }
}

705
} // namespace meta
X
Xu Peng 已提交
706 707 708
} // namespace engine
} // namespace vecwise
} // namespace zilliz