DBMetaImpl.cpp 25.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6

X
Xu Peng 已提交
7
#include <unistd.h>
X
Xu Peng 已提交
8 9
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
10
#include <boost/filesystem.hpp>
11
#include <chrono>
X
Xu Peng 已提交
12
#include <fstream>
13
#include <sqlite_orm.h>
14
#include <easylogging++.h>
X
Xu Peng 已提交
15

X
Xu Peng 已提交
16 17
#include "DBMetaImpl.h"
#include "IDGenerator.h"
X
Xu Peng 已提交
18
#include "Utils.h"
X
Xu Peng 已提交
19
#include "MetaConsts.h"
X
Xu Peng 已提交
20 21 22 23

namespace zilliz {
namespace vecwise {
namespace engine {
24
namespace meta {
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26 27
using namespace sqlite_orm;

X
Xu Peng 已提交
28 29
inline auto StoragePrototype(const std::string& path) {
    return make_storage(path,
X
Xu Peng 已提交
30
            make_table("Group",
31
                      make_column("id", &TableSchema::id, primary_key()),
32
                      make_column("table_id", &TableSchema::table_id, unique()),
33 34 35
                      make_column("dimension", &TableSchema::dimension),
                      make_column("created_on", &TableSchema::created_on),
                      make_column("files_cnt", &TableSchema::files_cnt, default_value(0))),
X
Xu Peng 已提交
36
            make_table("GroupFile",
37
                      make_column("id", &TableFileSchema::id, primary_key()),
38
                      make_column("table_id", &TableFileSchema::table_id),
39 40 41 42 43 44
                      make_column("file_id", &TableFileSchema::file_id),
                      make_column("file_type", &TableFileSchema::file_type),
                      make_column("size", &TableFileSchema::size, default_value(0)),
                      make_column("updated_time", &TableFileSchema::updated_time),
                      make_column("created_on", &TableFileSchema::created_on),
                      make_column("date", &TableFileSchema::date))
X
Xu Peng 已提交
45
            );
X
Xu Peng 已提交
46 47 48

}

X
Xu Peng 已提交
49
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
50 51
static std::unique_ptr<ConnectorT> ConnectorPtr;

52 53
// Returns the directory used for table `table_id`:
// "<_options.path>/tables/<table_id>".
std::string DBMetaImpl::GetGroupPath(const std::string& table_id) {
    std::string table_dir = _options.path + "/tables/";
    table_dir += table_id;
    return table_dir;
}

56
std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& table_id, DateT& date) {
X
Xu Peng 已提交
57
    std::stringstream ss;
58
    ss << GetGroupPath(table_id) << "/" << date;
X
Xu Peng 已提交
59 60 61
    return ss.str();
}

62
// Fills in group_file.location as
// "<partition path of (table_id, date)>/<file_id>".
// If group_file.date is still EmptyDate it is first replaced by
// Meta::GetDate(), so the call may also modify group_file.date.
void DBMetaImpl::GetGroupFilePath(TableFileSchema& group_file) {
    const bool date_missing = (group_file.date == EmptyDate);
    if (date_missing) {
        group_file.date = Meta::GetDate();
    }

    std::ostringstream location;
    location << GetGroupDatePartitionPath(group_file.table_id, group_file.date);
    location << "/";
    location << group_file.file_id;
    group_file.location = location.str();
}

72
// Writes a freshly generated id, formatted as text, into `table_id`.
// The id comes from SimpleIDGenerator::getNextIDNumber(). Always returns OK.
Status DBMetaImpl::NextGroupId(std::string& table_id) {
    SimpleIDGenerator id_source;
    std::ostringstream formatted;
    formatted << id_source.getNextIDNumber();
    table_id = formatted.str();
    return Status::OK();
}

X
Xu Peng 已提交
80 81 82 83 84 85 86 87
// Writes a freshly generated id, formatted as text, into `file_id`.
// The id comes from SimpleIDGenerator::getNextIDNumber(). Always returns OK.
Status DBMetaImpl::NextFileId(std::string& file_id) {
    SimpleIDGenerator id_source;
    std::ostringstream formatted;
    formatted << id_source.getNextIDNumber();
    file_id = formatted.str();
    return Status::OK();
}

X
Xu Peng 已提交
88 89
// Stores a copy of the options and immediately runs initialize().
// NOTE(review): the Status returned by initialize() is not inspected here.
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
    : _options(options_)
{
    initialize();
}

// Prepares the metadata store rooted at _options.path:
//   1. creates the root directory if it is not already a directory,
//   2. binds ConnectorPtr to "<_options.path>/meta.sqlite",
//   3. syncs the schema, keeps the connection open and switches the
//      journal mode to WAL,
//   4. runs cleanup() once.
// Always returns OK; a failed directory creation is logged and asserted.
Status DBMetaImpl::initialize() {
    const bool root_exists = boost::filesystem::is_directory(_options.path);
    if (!root_exists) {
        const bool created = boost::filesystem::create_directory(_options.path);
        if (!created) {
            LOG(ERROR) << "Create directory " << _options.path << " Error";
        }
        assert(created);
    }

    const std::string db_file = _options.path + "/meta.sqlite";
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(db_file));

    // The order of the three calls below is kept exactly as it was.
    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever();  // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log

    cleanup();

    return Status::OK();
}

X
Xu Peng 已提交
113
// PXU TODO: Temp solution. Will fix later
114
// Marks every file of table `table_id` whose date is in `dates` as TO_DELETE.
//
// Returns:
//   * OK immediately when `dates` is empty,
//   * the DescribeTable() status when the table cannot be described,
//   * an Error when any requested date is >= GetDateWithDelta(-1),
//   * OK after the update otherwise.
// Exceptions raised by the storage layer are logged and rethrown unchanged.
Status DBMetaImpl::delete_group_partitions(const std::string& table_id,
            const meta::DatesT& dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    TableSchema table_schema;
    table_schema.table_id = table_id;
    auto status = DescribeTable(table_schema);
    if (!status.ok()) {
        return status;
    }

    // Refuse the whole request if any single date is too recent.
    const auto yesterday = GetDateWithDelta(-1);
    for (const auto& date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions with 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
                    set(
                        c(&TableFileSchema::file_type) = static_cast<int>(TableFileSchema::TO_DELETE)
                    ),
                    where(
                        c(&TableFileSchema::table_id) == table_id and
                        in(&TableFileSchema::date, dates)
                    ));
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` keeps the dynamic type of the exception; `throw e;` would
        // copy it as a plain std::exception and lose the derived part.
        throw;
    }
    return Status::OK();
}

151 152 153
// Registers a new table.
//
// An id is generated through NextGroupId() when table_schema.table_id is
// empty. files_cnt is reset to 0, created_on is set to the current
// microsecond timestamp, and the row is inserted; table_schema.id receives
// the value returned by the insert. Finally the table directory
// (GetGroupPath) is created if it does not exist yet.
//
// Returns DBTransactionError("Add Group Error") if the insert throws,
// OK otherwise.
Status DBMetaImpl::CreateTable(TableSchema& table_schema) {
    if (table_schema.table_id.empty()) {
        NextGroupId(table_schema.table_id);
    }
    table_schema.files_cnt = 0;
    table_schema.id = -1;
    table_schema.created_on = utils::GetMicroSecTimeStamp();

    try {
        auto inserted_id = ConnectorPtr->insert(table_schema);
        table_schema.id = inserted_id;
    } catch (...) {
        return Status::DBTransactionError("Add Group Error");
    }

    const auto table_dir = GetGroupPath(table_schema.table_id);
    if (boost::filesystem::is_directory(table_dir)) {
        return Status::OK();
    }

    const bool created = boost::filesystem::create_directories(table_dir);
    if (!created) {
        LOG(ERROR) << "Create directory " << table_dir << " Error";
    }
    assert(created);

    return Status::OK();
}

X
Xu Peng 已提交
181
// Looks up the table whose table_id equals table_schema.table_id and, when
// exactly one row matches, copies its id, files_cnt and dimension into
// `table_schema`.
//
// Returns NotFound("Group <table_id> not found") when no single row matches,
// OK otherwise. Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::DescribeTable(TableSchema& table_schema) {
    try {
        auto groups = ConnectorPtr->select(columns(&TableSchema::id,
                                                   &TableSchema::table_id,
                                                   &TableSchema::files_cnt,
                                                   &TableSchema::dimension),
                                           where(c(&TableSchema::table_id) == table_schema.table_id));
        // table_id is declared unique() in the schema, so more than one row
        // would indicate a corrupted store.
        assert(groups.size() <= 1);
        if (groups.size() != 1) {
            return Status::NotFound("Group " + table_schema.table_id + " not found");
        }

        const auto& row = groups[0];
        table_schema.id = std::get<0>(row);
        table_schema.files_cnt = std::get<2>(row);
        table_schema.dimension = std::get<3>(row);
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

204
// Sets `has_or_not` to true when exactly one "Group" row has the given
// table_id, false otherwise. Returns OK; storage exceptions are logged and
// rethrown unchanged.
Status DBMetaImpl::has_group(const std::string& table_id, bool& has_or_not) {
    try {
        auto groups = ConnectorPtr->select(columns(&TableSchema::id),
                                           where(c(&TableSchema::table_id) == table_id));
        assert(groups.size() <= 1);
        has_or_not = (groups.size() == 1);
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

221
// Registers a new file for the table named by group_file.table_id.
//
// The function fills in the record before inserting it: date (defaults to
// Meta::GetDate() when EmptyDate), a generated file_id, file_type = NEW,
// the table's dimension, size = 0, created_on / updated_time = now, and
// location. group_file.id receives the value returned by the insert.
// Afterwards the date partition directory is created if it is missing.
//
// Returns the DescribeTable() status when the table cannot be described,
// DBTransactionError("Add file Error") if the insert throws, OK otherwise.
Status DBMetaImpl::add_group_file(TableFileSchema& group_file) {
    if (group_file.date == EmptyDate) {
        group_file.date = Meta::GetDate();
    }

    TableSchema owner;
    owner.table_id = group_file.table_id;
    auto describe_status = DescribeTable(owner);
    if (!describe_status.ok()) {
        return describe_status;
    }

    NextFileId(group_file.file_id);
    group_file.file_type = TableFileSchema::NEW;
    group_file.dimension = owner.dimension;
    group_file.size = 0;
    group_file.created_on = utils::GetMicroSecTimeStamp();
    group_file.updated_time = group_file.created_on;
    GetGroupFilePath(group_file);

    try {
        auto inserted_id = ConnectorPtr->insert(group_file);
        group_file.id = inserted_id;
    } catch (...) {
        return Status::DBTransactionError("Add file Error");
    }

    const auto partition_dir = GetGroupDatePartitionPath(group_file.table_id, group_file.date);
    if (boost::filesystem::is_directory(partition_dir)) {
        return Status::OK();
    }

    const bool created = boost::filesystem::create_directory(partition_dir);
    if (!created) {
        LOG(ERROR) << "Create directory " << partition_dir << " Error";
    }
    assert(created);

    return Status::OK();
}

262
// Collects every file whose file_type is TO_INDEX into `files`.
//
// `files` is cleared first. Each returned entry carries id, table_id,
// file_id, file_type, size, date, location and the dimension of its table.
// Table descriptions are cached per table_id for the duration of the call.
//
// Returns the DescribeTable() status if a table cannot be described (in
// which case `files` holds the entries gathered so far), OK otherwise.
// Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::files_to_index(TableFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_INDEX)));

        std::map<std::string, TableSchema> groups;

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);

            // One lookup per file; the iterator is reused instead of
            // indexing the map again.
            auto groupItr = groups.find(group_file.table_id);
            if (groupItr == groups.end()) {
                TableSchema table_schema;
                table_schema.table_id = group_file.table_id;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                groupItr = groups.emplace(group_file.table_id, table_schema).first;
            }
            group_file.dimension = groupItr->second.dimension;
            files.push_back(group_file);
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

306
// Collects the searchable files (file_type RAW, TO_INDEX or INDEX) of table
// `table_id`, grouped by date, into `files`.
//
// `partition` restricts the dates; when it is empty only Meta::GetDate() is
// used. `files` is cleared first.
//
// Returns the DescribeTable() status if the table cannot be described, OK
// otherwise. Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::files_to_search(const std::string &table_id,
                                   const DatesT& partition,
                                   DatePartionedTableFilesSchema &files) {
    files.clear();
    DatesT today = {Meta::GetDate()};
    const DatesT& dates = partition.empty() ? today : partition;

    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(c(&TableFileSchema::table_id) == table_id and
                                                 in(&TableFileSchema::date, dates) and
                                                 (c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::RAW) or
                                                     c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_INDEX) or
                                                     c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::INDEX))));

        TableSchema table_schema;
        table_schema.table_id = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = table_schema.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the empty bucket on first use, so no
            // separate find/insert step is needed.
            files[group_file.date].push_back(group_file);
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

357
// Collects the RAW files of table `table_id`, grouped by date, into `files`.
// `files` is cleared first.
//
// Returns the DescribeTable() status if the table cannot be described, OK
// otherwise. Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::files_to_merge(const std::string& table_id,
        DatePartionedTableFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::RAW) and
                                                   c(&TableFileSchema::table_id) == table_id));

        TableSchema table_schema;
        table_schema.table_id = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = table_schema.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the empty bucket on first use, so no
            // separate find/insert step is needed.
            files[group_file.date].push_back(group_file);
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

402
// Placeholder: intended to report whether file `file_id_` exists in table
// `table_id_`.
// NOTE(review): not implemented yet - no lookup is performed and
// `has_or_not_` is left untouched; the call always returns OK.
Status DBMetaImpl::has_group_file(const std::string& table_id_,
                                  const std::string& file_id_,
                                  bool& has_or_not_) {
    // PXU TODO
    return Status::OK();
}

409
// Loads the file identified by (table_id_, file_id_) into `group_file_info_`.
// The fields id, table_id, file_id, file_type, size and date are written;
// other fields of `group_file_info_` are left as they were.
//
// Returns NotFound("GroupFile <file_id_> not found") when no single row
// matches, OK otherwise. Storage exceptions are logged and rethrown
// unchanged.
Status DBMetaImpl::get_group_file(const std::string& table_id_,
                                  const std::string& file_id_,
                                  TableFileSchema& group_file_info_) {
    try {
        auto files = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                  &TableFileSchema::table_id,
                                                  &TableFileSchema::file_id,
                                                  &TableFileSchema::file_type,
                                                  &TableFileSchema::size,
                                                  &TableFileSchema::date),
                                          where(c(&TableFileSchema::file_id) == file_id_ and
                                                c(&TableFileSchema::table_id) == table_id_
                                          ));
        assert(files.size() <= 1);
        if (files.size() != 1) {
            return Status::NotFound("GroupFile " + file_id_ + " not found");
        }

        const auto& row = files[0];
        group_file_info_.id = std::get<0>(row);
        group_file_info_.table_id = std::get<1>(row);
        group_file_info_.file_id = std::get<2>(row);
        group_file_info_.file_type = std::get<3>(row);
        group_file_info_.size = std::get<4>(row);
        group_file_info_.date = std::get<5>(row);
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

441
// Placeholder: intended to list files of table `table_id_` for a date
// selected through `date_delta_`.
// NOTE(review): not implemented yet - `group_files_info_` is left untouched
// and the call always returns OK.
Status DBMetaImpl::get_group_files(const std::string& table_id_,
                                   const int date_delta_,
                                   TableFilesSchema& group_files_info_) {
    // PXU TODO
    return Status::OK();
}

X
Xu Peng 已提交
448 449 450 451 452 453 454 455 456 457 458
// PXU TODO: Support Swap
// Applies the archive criteria from _options.archive_conf:
//   * "days": files created more than `limit` days ago (and not already
//             TO_DELETE) are marked TO_DELETE.
//   * "disk": when the total size of live files exceeds limit*G, the excess
//             is handed to discard_files_of_size().
// Returns OK. Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::archive_files() {
    auto& criterias = _options.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    // Iterate by reference: copying each entry is unnecessary.
    for (const auto& kv : criterias) {
        auto& criteria = kv.first;
        auto& limit = kv.second;
        if (criteria == "days") {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try
            {
                ConnectorPtr->update_all(
                        set(
                            c(&TableFileSchema::file_type) = static_cast<int>(TableFileSchema::TO_DELETE)
                           ),
                        where(
                            c(&TableFileSchema::created_on) < static_cast<long>(now - usecs) and
                            c(&TableFileSchema::file_type) != static_cast<int>(TableFileSchema::TO_DELETE)
                            ));
            } catch (const std::exception& e) {
                LOG(DEBUG) << e.what();
                // Rethrow the original object; `throw e;` would slice it.
                throw;
            }
        }
        if (criteria == "disk") {
            long sum = 0;
            size(sum);

            // PXU TODO: refactor size
            auto to_delete = (sum - limit*G);
            discard_files_of_size(to_delete);
        }
    }

    return Status::OK();
}

X
Xu Peng 已提交
489
// Stores in `result` the summed `size` of all files whose file_type is not
// TO_DELETE. `result` is 0 when the query yields no sum.
// Returns OK. Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::size(long& result) {
    result = 0;
    try {
        auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size)),
                where(
                    c(&TableFileSchema::file_type) != static_cast<int>(TableFileSchema::TO_DELETE)
                    ));

        for (auto& sub_query : selected) {
            // The aggregate comes back as a nullable handle; skip empty ones.
            if (!std::get<0>(sub_query)) {
                continue;
            }
            result += static_cast<long>(*std::get<0>(sub_query));
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
511
// Marks live files as TO_DELETE, lowest id first, until at least
// `to_discard_size` worth of `size` has been discarded or no live files
// remain. Files are processed in batches of 10.
//
// Implemented as a loop rather than self-recursion so the call depth does
// not grow with the number of files to discard.
// Returns OK. Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
    for (;;) {
        LOG(DEBUG) << "About to discard size=" << to_discard_size;
        if (to_discard_size <= 0) {
            return Status::OK();
        }
        try {
            auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                         &TableFileSchema::size),
                                                 where(c(&TableFileSchema::file_type) != static_cast<int>(TableFileSchema::TO_DELETE)),
                                                 order_by(&TableFileSchema::id),
                                                 limit(10));
            std::vector<int> ids;

            for (auto& file : selected) {
                if (to_discard_size <= 0) break;
                TableFileSchema group_file;
                group_file.id = std::get<0>(file);
                group_file.size = std::get<1>(file);
                ids.push_back(group_file.id);
                LOG(DEBUG) << "Discard group_file.id=" << group_file.id << " group_file.size=" << group_file.size;
                to_discard_size -= group_file.size;
            }

            // Nothing left to mark: stop even if the target was not reached.
            if (ids.empty()) {
                return Status::OK();
            }

            ConnectorPtr->update_all(
                        set(
                            c(&TableFileSchema::file_type) = static_cast<int>(TableFileSchema::TO_DELETE)
                        ),
                        where(
                            in(&TableFileSchema::id, ids)
                        ));

        } catch (const std::exception& e) {
            LOG(DEBUG) << e.what();
            // Rethrow the original object; `throw e;` would slice it.
            throw;
        }
    }
}

555
// Stamps group_file.updated_time with the current microsecond timestamp and
// writes the record back to the store.
// Returns OK. Storage exceptions are logged (with the record's id and
// file_id) and rethrown unchanged.
Status DBMetaImpl::update_group_file(TableFileSchema& group_file) {
    group_file.updated_time = utils::GetMicroSecTimeStamp();
    try {
        ConnectorPtr->update(group_file);
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

567
// Updates all records in `files` inside a single storage transaction; each
// record's updated_time is set to the current microsecond timestamp first.
//
// Returns DBTransactionError("Update files Error") when the transaction is
// reported as not committed, OK otherwise. Storage exceptions are logged and
// rethrown unchanged.
Status DBMetaImpl::update_files(TableFilesSchema& files) {
    try {
        auto commited = ConnectorPtr->transaction([&] () mutable {
            for (auto& file : files) {
                file.updated_time = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });
        if (!commited) {
            return Status::DBTransactionError("Update files Error");
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
586
// Physically removes files that were marked TO_DELETE at least `seconds`
// ago: the file at the record's location is deleted from disk and the
// record is removed from the store.
//
// The age test is `updated_time < now - seconds`. The previous `>` selected
// files marked within the last `seconds` instead, i.e. exactly the ones the
// time-to-live is meant to protect.
// Returns OK. Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    // Widen before multiplying so the product cannot overflow a narrower type.
    const long expire_before = static_cast<long>(now - static_cast<long>(seconds) * US_PS);
    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_DELETE) and
                                                   c(&TableFileSchema::updated_time) < expire_before));

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == TableFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<TableFileSchema>(group_file.id);
            /* LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; */
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
623
// Removes the records of all files whose file_type is TO_DELETE or NEW.
// For TO_DELETE files the file at the record's location is also removed
// from disk; NEW files only lose their record.
// Returns OK. Storage exceptions are logged and rethrown unchanged.
Status DBMetaImpl::cleanup() {
    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
                                                     &TableFileSchema::table_id,
                                                     &TableFileSchema::file_id,
                                                     &TableFileSchema::file_type,
                                                     &TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where(c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_DELETE) or
                                                   c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::NEW)));

        for (auto& file : selected) {
            TableFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.table_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == TableFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<TableFileSchema>(group_file.id);
            /* LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; */
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }

    return Status::OK();
}

659
// Computes, for table `table_id`, the summed `size` of its RAW, TO_INDEX and
// INDEX files divided by the table's dimension, and stores it in `result`.
//
// `result` is reset to 0 up front so it is well defined on every return
// path. Returns the DescribeTable() status if the table cannot be
// described, an Error if the stored dimension is 0 (the division would be
// undefined), OK otherwise. Storage exceptions are logged and rethrown
// unchanged.
Status DBMetaImpl::count(const std::string& table_id, long& result) {
    result = 0;

    try {
        auto selected = ConnectorPtr->select(columns(&TableFileSchema::size,
                                                     &TableFileSchema::date),
                                             where((c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::RAW) or
                                                    c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::TO_INDEX) or
                                                    c(&TableFileSchema::file_type) == static_cast<int>(TableFileSchema::INDEX)) and
                                                   c(&TableFileSchema::table_id) == table_id));

        TableSchema table_schema;
        table_schema.table_id = table_id;
        auto status = DescribeTable(table_schema);
        if (!status.ok()) {
            return status;
        }

        if (table_schema.dimension == 0) {
            return Status::Error("Table dimension is zero");
        }

        for (auto& file : selected) {
            result += std::get<0>(file);
        }

        result /= table_schema.dimension;

    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
690 691 692 693 694 695 696
// Recursively deletes the whole metadata root (_options.path) when it is a
// directory; does nothing otherwise. Always returns OK.
Status DBMetaImpl::drop_all() {
    const bool root_exists = boost::filesystem::is_directory(_options.path);
    if (!root_exists) {
        return Status::OK();
    }
    boost::filesystem::remove_all(_options.path);
    return Status::OK();
}

X
Xu Peng 已提交
697 698 699 700
// Runs a final cleanup() pass.
// cleanup() rethrows storage exceptions, and an exception escaping a
// destructor terminates the program, so failures are logged and swallowed.
DBMetaImpl::~DBMetaImpl() {
    try {
        cleanup();
    } catch (const std::exception& e) {
        LOG(ERROR) << "DBMetaImpl cleanup on destruction failed: " << e.what();
    } catch (...) {
        LOG(ERROR) << "DBMetaImpl cleanup on destruction failed: unknown error";
    }
}

701
} // namespace meta
X
Xu Peng 已提交
702 703 704
} // namespace engine
} // namespace vecwise
} // namespace zilliz