DBMetaImpl.cpp 25.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6

X
Xu Peng 已提交
7
#include <unistd.h>
X
Xu Peng 已提交
8 9
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
10
#include <boost/filesystem.hpp>
11
#include <chrono>
X
Xu Peng 已提交
12
#include <fstream>
13
#include <sqlite_orm.h>
14
#include <easylogging++.h>
X
Xu Peng 已提交
15

X
Xu Peng 已提交
16 17
#include "DBMetaImpl.h"
#include "IDGenerator.h"
X
Xu Peng 已提交
18
#include "Utils.h"
X
Xu Peng 已提交
19
#include "MetaConsts.h"
X
Xu Peng 已提交
20 21 22 23

namespace zilliz {
namespace vecwise {
namespace engine {
24
namespace meta {
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26 27
using namespace sqlite_orm;

X
Xu Peng 已提交
28 29
inline auto StoragePrototype(const std::string& path) {
    return make_storage(path,
X
Xu Peng 已提交
30
            make_table("Group",
X
Xu Peng 已提交
31 32 33
                      make_column("id", &GroupSchema::id, primary_key()),
                      make_column("group_id", &GroupSchema::group_id, unique()),
                      make_column("dimension", &GroupSchema::dimension),
34
                      make_column("created_on", &GroupSchema::created_on),
X
Xu Peng 已提交
35 36 37 38 39 40
                      make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))),
            make_table("GroupFile",
                      make_column("id", &GroupFileSchema::id, primary_key()),
                      make_column("group_id", &GroupFileSchema::group_id),
                      make_column("file_id", &GroupFileSchema::file_id),
                      make_column("file_type", &GroupFileSchema::file_type),
41
                      make_column("size", &GroupFileSchema::size, default_value(0)),
42
                      make_column("updated_time", &GroupFileSchema::updated_time),
43
                      make_column("created_on", &GroupFileSchema::created_on),
X
Xu Peng 已提交
44 45
                      make_column("date", &GroupFileSchema::date))
            );
X
Xu Peng 已提交
46 47 48

}

X
Xu Peng 已提交
49
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
50 51
static std::unique_ptr<ConnectorT> ConnectorPtr;

X
Xu Peng 已提交
52
std::string DBMetaImpl::GetGroupPath(const std::string& group_id) {
53
    return _options.path + "/tables/" + group_id;
X
Xu Peng 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
}

std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
    // Date partitions live directly under the group directory: <group path>/<date>.
    std::ostringstream partition_path;
    partition_path << GetGroupPath(group_id) << '/' << date;
    return partition_path.str();
}

/**
 * Fill `group_file.location` with <group path>/<date>/<file_id>.
 * A file whose date is still EmptyDate is first assigned Meta::GetDate().
 */
void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) {
    const bool undated = (group_file.date == EmptyDate);
    if (undated) {
        group_file.date = Meta::GetDate();
    }
    const std::string partition = GetGroupDatePartitionPath(group_file.group_id, group_file.date);
    std::ostringstream location;
    location << partition << '/' << group_file.file_id;
    group_file.location = location.str();
}

X
Xu Peng 已提交
72 73
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
    : _options(options_) {
    // Open (creating if needed) the meta store immediately.
    // NOTE(review): the Status returned by initialize() is not inspected here.
    this->initialize();
}

/**
 * Prepare the meta store.
 *
 * Ensures the root directory _options.path exists, opens <path>/meta.sqlite,
 * syncs the schema, keeps the connection open and enables WAL journaling.
 * Finally purges leftover TO_DELETE / NEW file records via cleanup().
 *
 * @return the status of cleanup(); database errors propagate as exceptions.
 */
Status DBMetaImpl::initialize() {
    if (!boost::filesystem::is_directory(_options.path)) {
        // create_directories also creates missing parent directories, so a
        // nested root path works where create_directory would fail.
        auto ret = boost::filesystem::create_directories(_options.path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << _options.path << " Error";
        }
        assert(ret);
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(_options.path + "/meta.sqlite"));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // keep a single connection open (original note: "thread safe option")
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    return cleanup();
}

X
Xu Peng 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109 110
// PXU TODO: Temp solution. Will fix later
/**
 * Mark every file of `group_id` whose date is listed in `dates` as TO_DELETE.
 *
 * Returns OK for an empty `dates`, the get_group() status if the group is
 * unknown, and an error if any date is yesterday or later (such partitions
 * are refused). Database errors are logged and rethrown unchanged.
 */
Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
            const meta::DatesT& dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    GroupSchema group_info;
    group_info.group_id = group_id;
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

    // Partitions dated yesterday or later are protected from deletion.
    auto yesterday = GetDateWithDelta(-1);
    for (auto& date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions with 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
                    set(
                        c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                    ),
                    where(
                        c(&GroupFileSchema::group_id) == group_id and
                        in(&GroupFileSchema::date, dates)
                    ));
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }
    return Status::OK();
}

135 136 137 138 139 140 141 142 143
/**
 * Insert a new group row and create its directory.
 *
 * An empty group_id is replaced by a freshly generated numeric id. files_cnt
 * is reset to 0 and created_on is stamped. On success group_info.id holds the
 * inserted row id.
 *
 * @return DBTransactionError if the insert fails, OK otherwise.
 */
Status DBMetaImpl::add_group(GroupSchema& group_info) {
    if (group_info.group_id.empty()) {
        std::stringstream ss;
        SimpleIDGenerator g;
        ss << g.getNextIDNumber();
        group_info.group_id = ss.str();
    }
    group_info.files_cnt = 0;
    group_info.id = -1;
    group_info.created_on = utils::GetMicroSecTimeStamp();

    try {
        auto id = ConnectorPtr->insert(group_info);
        group_info.id = id;
    } catch (std::exception& e) {
        // Keep the underlying reason instead of silently discarding it.
        LOG(DEBUG) << e.what();
        return Status::DBTransactionError("Add Group Error");
    } catch (...) {
        return Status::DBTransactionError("Add Group Error");
    }

    auto group_path = GetGroupPath(group_info.group_id);

    if (!boost::filesystem::is_directory(group_path)) {
        auto ret = boost::filesystem::create_directories(group_path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << group_path << " Error";
        }
        assert(ret);
    }

    return Status::OK();
}

X
Xu Peng 已提交
169
/**
 * Look up group_info.group_id and fill id, files_cnt and dimension.
 * Currently takes no lock itself and simply delegates to get_group_no_lock().
 */
Status DBMetaImpl::get_group(GroupSchema& group_info) {
    Status lookup_status = get_group_no_lock(group_info);
    return lookup_status;
}

/**
 * Fetch the group whose group_id equals group_info.group_id and copy its id,
 * files_cnt and dimension into group_info.
 *
 * @return NotFound if no such group exists; database errors are logged and
 *         rethrown unchanged.
 */
Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
    try {
        auto groups = ConnectorPtr->select(columns(&GroupSchema::id,
                                                  &GroupSchema::group_id,
                                                  &GroupSchema::files_cnt,
                                                  &GroupSchema::dimension),
                                          where(c(&GroupSchema::group_id) == group_info.group_id));
        // group_id is declared unique() in the schema, so at most one row matches.
        assert(groups.size() <= 1);
        if (groups.size() != 1) {
            return Status::NotFound("Group " + group_info.group_id + " not found");
        }
        const auto& row = groups[0];
        group_info.id = std::get<0>(row);
        group_info.files_cnt = std::get<2>(row);
        group_info.dimension = std::get<3>(row);
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
196
/**
 * Set `has_or_not` to whether a group with `group_id` exists.
 * Database errors are logged and rethrown unchanged.
 */
Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
    try {
        auto groups = ConnectorPtr->select(columns(&GroupSchema::id),
                                          where(c(&GroupSchema::group_id) == group_id));
        assert(groups.size() <= 1);
        has_or_not = (groups.size() == 1);
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
213 214 215 216 217 218 219 220 221 222
/**
 * Register a new file for group_file.group_id and create its partition directory.
 *
 * The file gets a generated file_id, type NEW, size 0, the group's dimension,
 * created_on/updated_time set to now, and its location computed. An EmptyDate
 * is replaced with today's date.
 *
 * @return the get_group() status if the group is unknown, DBTransactionError
 *         if the insert fails, OK otherwise.
 */
Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
    if (group_file.date == EmptyDate) {
        group_file.date = Meta::GetDate();
    }
    GroupSchema group_info;
    group_info.group_id = group_file.group_id;
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

    SimpleIDGenerator g;
    std::stringstream ss;
    ss << g.getNextIDNumber();
    group_file.file_type = GroupFileSchema::NEW;
    group_file.file_id = ss.str();
    group_file.dimension = group_info.dimension;
    group_file.size = 0;
    group_file.created_on = utils::GetMicroSecTimeStamp();
    group_file.updated_time = group_file.created_on;
    GetGroupFilePath(group_file);

    try {
        auto id = ConnectorPtr->insert(group_file);
        group_file.id = id;
    } catch (std::exception& e) {
        // Keep the underlying reason instead of silently discarding it.
        LOG(DEBUG) << e.what();
        return Status::DBTransactionError("Add file Error");
    } catch (...) {
        return Status::DBTransactionError("Add file Error");
    }

    auto partition_path = GetGroupDatePartitionPath(group_file.group_id, group_file.date);

    if (!boost::filesystem::is_directory(partition_path)) {
        // create_directories also recreates a missing group directory, where
        // create_directory would fail.
        auto ret = boost::filesystem::create_directories(partition_path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << partition_path << " Error";
        }
        assert(ret);
    }

    return Status::OK();
}

X
Xu Peng 已提交
258 259
/**
 * Replace `files` with every file currently marked TO_INDEX, across all groups.
 * Each entry gets its location and its group's dimension filled in.
 *
 * @return the get_group_no_lock() status if a referenced group is missing;
 *         database errors are logged and rethrown unchanged.
 */
Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX));

        // Per-call cache so each group's metadata is fetched only once.
        std::map<std::string, GroupSchema> groups;

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);

            auto groupItr = groups.find(group_file.group_id);
            if (groupItr == groups.end()) {
                GroupSchema group_info;
                group_info.group_id = group_file.group_id;
                auto status = get_group_no_lock(group_info);
                if (!status.ok()) {
                    return status;
                }
                groupItr = groups.emplace(group_file.group_id, group_info).first;
            }
            group_file.dimension = groupItr->second.dimension;
            files.push_back(group_file);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }

    return Status::OK();
}

X
xj.lin 已提交
302
/**
 * Collect the searchable files (RAW, TO_INDEX or INDEX) of `group_id`, grouped by date.
 *
 * @param partition dates to search; an empty list means today's date only.
 * @param files     cleared, then filled as date -> files of that date.
 * @return NotFound if the group does not exist; database errors are logged
 *         and rethrown unchanged.
 */
Status DBMetaImpl::files_to_search(const std::string &group_id,
                                   const DatesT& partition,
                                   DatePartionedGroupFilesSchema &files) {
    files.clear();

    DatesT today = {Meta::GetDate()};
    const DatesT& dates = partition.empty() ? today : partition;

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                     &GroupFileSchema::group_id,
                                                     &GroupFileSchema::file_id,
                                                     &GroupFileSchema::file_type,
                                                     &GroupFileSchema::size,
                                                     &GroupFileSchema::date),
                                             where(c(&GroupFileSchema::group_id) == group_id and
                                                   in(&GroupFileSchema::date, dates) and
                                                   (c(&GroupFileSchema::file_type) == (int) GroupFileSchema::RAW or
                                                    c(&GroupFileSchema::file_type) == (int) GroupFileSchema::TO_INDEX or
                                                    c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX)));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = group_info.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the per-date bucket on first use.
            files[group_file.date].push_back(group_file);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
353 354 355
/**
 * Collect the RAW (not yet merged) files of `group_id`, grouped by date.
 *
 * @param files cleared, then filled as date -> RAW files of that date.
 * @return NotFound if the group does not exist; database errors are logged
 *         and rethrown unchanged.
 */
Status DBMetaImpl::files_to_merge(const std::string& group_id,
        DatePartionedGroupFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and
                                                c(&GroupFileSchema::group_id) == group_id));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = group_info.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the per-date bucket on first use.
            files[group_file.date].push_back(group_file);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }

    return Status::OK();
}

398 399 400
/**
 * Set `has_or_not_` to whether a file record with `file_id_` exists in group `group_id_`.
 *
 * Previously a stub that never wrote the out-parameter, leaving callers with
 * an indeterminate bool. Like get_group_file(), it matches records of any
 * file_type. Database errors are logged and rethrown unchanged.
 */
Status DBMetaImpl::has_group_file(const std::string& group_id_,
                              const std::string& file_id_,
                              bool& has_or_not_) {
    try {
        auto files = ConnectorPtr->select(columns(&GroupFileSchema::id),
                                          where(c(&GroupFileSchema::file_id) == file_id_ and
                                                c(&GroupFileSchema::group_id) == group_id_));
        has_or_not_ = !files.empty();
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        throw;
    }
    return Status::OK();
}

405 406 407
/**
 * Load the file record (`group_id_`, `file_id_`) into `group_file_info_`.
 *
 * Fills id, group_id, file_id, file_type, size and date. Also fills location
 * through GetGroupFilePath(), consistent with the other file queries in this file.
 *
 * @return NotFound if no such file exists; database errors are logged and
 *         rethrown unchanged.
 */
Status DBMetaImpl::get_group_file(const std::string& group_id_,
                              const std::string& file_id_,
                              GroupFileSchema& group_file_info_) {
    try {
        auto files = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_id) == file_id_ and
                                                c(&GroupFileSchema::group_id) == group_id_
                                          ));
        assert(files.size() <= 1);
        if (files.size() != 1) {
            return Status::NotFound("GroupFile " + file_id_ + " not found");
        }
        const auto& row = files[0];
        group_file_info_.id = std::get<0>(row);
        group_file_info_.group_id = std::get<1>(row);
        group_file_info_.file_id = std::get<2>(row);
        group_file_info_.file_type = std::get<3>(row);
        group_file_info_.size = std::get<4>(row);
        group_file_info_.date = std::get<5>(row);
        GetGroupFilePath(group_file_info_);
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }

    return Status::OK();
}

437
/**
 * Not implemented yet (PXU TODO). Presumably meant to list the files of
 * `group_id_` for a window given by `date_delta_` — confirm before relying on it.
 * Currently leaves `group_files_info_` untouched and always returns OK.
 */
Status DBMetaImpl::get_group_files(const std::string& group_id_,
                               const int date_delta_,
                               GroupFilesSchema& group_files_info_) {
    (void)group_id_;
    (void)date_delta_;
    (void)group_files_info_;
    return Status::OK();
}

X
Xu Peng 已提交
444 445 446 447 448 449 450 451 452 453 454
// PXU TODO: Support Swap
/**
 * Apply the archive criteria configured in _options.archive_conf.
 *
 *   "days": mark files created more than `limit` days ago as TO_DELETE.
 *   "disk": if the live (non TO_DELETE) size exceeds `limit` * G, mark files
 *           as TO_DELETE via discard_files_of_size() to shed the excess.
 *
 * Database errors are logged and rethrown unchanged; a failing size or
 * discard step returns its Status.
 */
Status DBMetaImpl::archive_files() {
    auto& criterias = _options.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (const auto& kv : criterias) {
        const auto& criteria = kv.first;
        const auto& limit = kv.second;
        if (criteria == "days") {
            // Widen before multiplying so large day counts cannot overflow narrower arithmetic.
            long usecs = static_cast<long>(limit) * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try {
                ConnectorPtr->update_all(
                        set(
                            c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                           ),
                        where(
                            c(&GroupFileSchema::created_on) < (long)(now - usecs) and
                            c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
                            ));
            } catch (std::exception& e) {
                LOG(DEBUG) << e.what();
                throw;
            }
        }
        if (criteria == "disk") {
            long sum = 0;
            auto status = size(sum);
            if (!status.ok()) {
                return status;
            }

            // PXU TODO: refactor size
            auto to_delete = (sum - static_cast<long>(limit) * G);
            status = discard_files_of_size(to_delete);
            if (!status.ok()) {
                return status;
            }
        }
    }

    return Status::OK();
}

X
Xu Peng 已提交
485
/**
 * Set `result` to the total size of all files not marked TO_DELETE.
 * Database errors are logged and rethrown unchanged.
 */
Status DBMetaImpl::size(long& result) {
    result = 0;
    try {
        auto selected = ConnectorPtr->select(columns(sum(&GroupFileSchema::size)),
                where(
                    c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
                    ));

        for (auto& sub_query : selected) {
            // The aggregate comes back as a nullable pointer; SQL SUM() is NULL when no rows match.
            if (!std::get<0>(sub_query)) {
                continue;
            }
            result += static_cast<long>(*std::get<0>(sub_query));
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
507
/**
 * Mark live files as TO_DELETE, lowest id first, until at least
 * `to_discard_size` worth of size has been marked or no live files remain.
 *
 * Works in batches of up to 10 rows. This loop replaces the original tail
 * recursion, whose depth grew with the number of batches. Database errors are
 * logged and rethrown unchanged.
 */
Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
    while (true) {
        LOG(DEBUG) << "Abort to discard size=" << to_discard_size;
        if (to_discard_size <= 0) {
            return Status::OK();
        }
        try {
            auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                       &GroupFileSchema::size),
                                              where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE),
                                              order_by(&GroupFileSchema::id),
                                              limit(10));
            // Use the schema's own id type so large ids are not truncated to int.
            std::vector<decltype(GroupFileSchema::id)> ids;

            for (auto& file : selected) {
                if (to_discard_size <= 0) break;
                GroupFileSchema group_file;
                group_file.id = std::get<0>(file);
                group_file.size = std::get<1>(file);
                ids.push_back(group_file.id);
                LOG(DEBUG) << "Discard group_file.id=" << group_file.id << " group_file.size=" << group_file.size;
                to_discard_size -= group_file.size;
            }

            if (ids.empty()) {
                return Status::OK();
            }

            ConnectorPtr->update_all(
                        set(
                            c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                        ),
                        where(
                            in(&GroupFileSchema::id, ids)
                        ));
        } catch (std::exception& e) {
            LOG(DEBUG) << e.what();
            throw;
        }
    }
}

551
/**
 * Persist `group_file` (matched by primary key), stamping updated_time with now.
 * Database errors are logged with the file's identifiers and rethrown unchanged.
 */
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
    group_file.updated_time = utils::GetMicroSecTimeStamp();
    try {
        ConnectorPtr->update(group_file);
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }
    return Status::OK();
}

563
/**
 * Persist every entry of `files` inside one transaction, stamping each
 * updated_time with now.
 *
 * @return DBTransactionError if the transaction is not committed; database
 *         errors are logged and rethrown unchanged.
 */
Status DBMetaImpl::update_files(GroupFilesSchema& files) {
    try {
        auto committed = ConnectorPtr->transaction([&] () {
            for (auto& file : files) {
                file.updated_time = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            return true;
        });
        if (!committed) {
            return Status::DBTransactionError("Update files Error");
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
582
/**
 * Physically remove files that were marked TO_DELETE more than `seconds` ago.
 *
 * Each matching file is removed from disk and its row deleted. The original
 * used `updated_time > now - ttl`, which removed files still inside their TTL
 * and never aged out older ones; the comparison is now `<`. Database errors
 * are logged and rethrown unchanged.
 */
Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    // Widen before multiplying: uint16_t promoted to int times US_PS could overflow.
    auto expire_before = now - static_cast<long>(seconds) * US_PS;
    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and
                                                c(&GroupFileSchema::updated_time) < expire_before));

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == GroupFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<GroupFileSchema>(group_file.id);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
619
/**
 * Drop every file record that is TO_DELETE or NEW.
 *
 * TO_DELETE files are also removed from disk. NEW records only lose their
 * database row; their on-disk data, if any, is not touched here. Database
 * errors are logged and rethrown unchanged.
 */
Status DBMetaImpl::cleanup() {
    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or
                                                c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW));

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == GroupFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<GroupFileSchema>(group_file.id);
        }
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
655 656
/**
 * Set `result` to the summed size of the searchable (RAW, TO_INDEX, INDEX)
 * files of `group_id`, divided by the group's dimension.
 *
 * @return NotFound if the group does not exist, an error if its dimension is 0
 *         (which previously caused a division by zero); database errors are
 *         logged and rethrown unchanged.
 */
Status DBMetaImpl::count(const std::string& group_id, long& result) {
    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or
                                                 c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or
                                                 c(&GroupFileSchema::file_type) == (int)GroupFileSchema::INDEX) and
                                                c(&GroupFileSchema::group_id) == group_id));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }
        if (group_info.dimension == 0) {
            return Status::Error("Group dimension is zero, cannot count vectors");
        }

        result = 0;
        for (auto& file : selected) {
            result += std::get<0>(file);
        }

        result /= group_info.dimension;
    } catch (std::exception& e) {
        LOG(DEBUG) << e.what();
        // `throw;` preserves the dynamic exception type (`throw e;` sliced it).
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
686 687 688 689 690 691 692
/**
 * Recursively delete the whole root directory _options.path, which holds
 * meta.sqlite and the tables/ tree. Always returns OK.
 */
Status DBMetaImpl::drop_all() {
    const bool root_exists = boost::filesystem::is_directory(_options.path);
    if (root_exists) {
        boost::filesystem::remove_all(_options.path);
    }
    return Status::OK();
}

X
Xu Peng 已提交
693 694 695 696
DBMetaImpl::~DBMetaImpl() {
    // cleanup() rethrows database errors. A destructor is implicitly noexcept,
    // so an escaping exception would call std::terminate; log and swallow instead.
    try {
        cleanup();
    } catch (std::exception& e) {
        LOG(ERROR) << "Meta cleanup on destruction failed: " << e.what();
    } catch (...) {
        LOG(ERROR) << "Meta cleanup on destruction failed";
    }
}

697
} // namespace meta
X
Xu Peng 已提交
698 699 700
} // namespace engine
} // namespace vecwise
} // namespace zilliz