DBMetaImpl.cpp 25.1 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
Xu Peng 已提交
6

X
Xu Peng 已提交
7
#include <unistd.h>
X
Xu Peng 已提交
8 9
#include <sstream>
#include <iostream>
X
Xu Peng 已提交
10
#include <boost/filesystem.hpp>
11
#include <chrono>
X
Xu Peng 已提交
12
#include <fstream>
13
#include <sqlite_orm.h>
14
#include <easylogging++.h>
X
Xu Peng 已提交
15

X
Xu Peng 已提交
16 17
#include "DBMetaImpl.h"
#include "IDGenerator.h"
X
Xu Peng 已提交
18
#include "Utils.h"
X
Xu Peng 已提交
19
#include "MetaConsts.h"
X
Xu Peng 已提交
20 21 22 23

namespace zilliz {
namespace vecwise {
namespace engine {
24
namespace meta {
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26 27
using namespace sqlite_orm;

X
Xu Peng 已提交
28 29
inline auto StoragePrototype(const std::string& path) {
    return make_storage(path,
X
Xu Peng 已提交
30
            make_table("Group",
X
Xu Peng 已提交
31 32 33
                      make_column("id", &GroupSchema::id, primary_key()),
                      make_column("group_id", &GroupSchema::group_id, unique()),
                      make_column("dimension", &GroupSchema::dimension),
34
                      make_column("created_on", &GroupSchema::created_on),
X
Xu Peng 已提交
35 36 37 38 39 40
                      make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))),
            make_table("GroupFile",
                      make_column("id", &GroupFileSchema::id, primary_key()),
                      make_column("group_id", &GroupFileSchema::group_id),
                      make_column("file_id", &GroupFileSchema::file_id),
                      make_column("file_type", &GroupFileSchema::file_type),
41
                      make_column("size", &GroupFileSchema::size, default_value(0)),
42
                      make_column("updated_time", &GroupFileSchema::updated_time),
43
                      make_column("created_on", &GroupFileSchema::created_on),
X
Xu Peng 已提交
44 45
                      make_column("date", &GroupFileSchema::date))
            );
X
Xu Peng 已提交
46 47 48

}

X
Xu Peng 已提交
49
// Concrete storage type produced by StoragePrototype(); the path argument is
// only there so decltype has a well-formed call to inspect.
using ConnectorT = decltype(StoragePrototype(""));
X
Xu Peng 已提交
50 51
// File-scope handle to the metadata storage. It is (re)assigned by
// DBMetaImpl::initialize(), so every DBMetaImpl in this translation unit uses
// the same connection.
// NOTE(review): constructing a second DBMetaImpl replaces the connection used
// by the first one - confirm only one instance is expected to exist.
static std::unique_ptr<ConnectorT> ConnectorPtr;

X
Xu Peng 已提交
52
std::string DBMetaImpl::GetGroupPath(const std::string& group_id) {
53
    return _options.path + "/tables/" + group_id;
X
Xu Peng 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
}

// Returns the directory of one date partition of a group:
// "<group path>/<date>", where <date> is whatever DateT streams as.
std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
    std::stringstream partition_dir;
    partition_dir << GetGroupPath(group_id);
    partition_dir << "/";
    partition_dir << date;
    return partition_dir.str();
}

// Fills in group_file.location as "<date partition path>/<file_id>".
// When group_file.date is still EmptyDate it is first set to Meta::GetDate(),
// so this call may modify `date` as well as `location`.
void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) {
    const bool date_missing = (group_file.date == EmptyDate);
    if (date_missing) {
        group_file.date = Meta::GetDate();
    }

    std::stringstream location;
    location << GetGroupDatePartitionPath(group_file.group_id, group_file.date);
    location << "/";
    location << group_file.file_id;
    group_file.location = location.str();
}

72 73 74 75 76 77 78 79
// Writes a freshly generated id, formatted as text, into `group_id`.
// The id comes from a locally constructed SimpleIDGenerator. Always returns OK.
Status DBMetaImpl::NextGroupId(std::string& group_id) {
    SimpleIDGenerator id_source;
    std::stringstream formatted;
    formatted << id_source.getNextIDNumber();
    group_id = formatted.str();
    return Status::OK();
}

X
Xu Peng 已提交
80 81 82 83 84 85 86 87
// Writes a freshly generated id, formatted as text, into `file_id`.
// The id comes from a locally constructed SimpleIDGenerator. Always returns OK.
Status DBMetaImpl::NextFileId(std::string& file_id) {
    SimpleIDGenerator id_source;
    std::stringstream formatted;
    formatted << id_source.getNextIDNumber();
    file_id = formatted.str();
    return Status::OK();
}

X
Xu Peng 已提交
88 89
// Stores a copy of the options and immediately opens the metadata store.
// The Status returned by initialize() is not inspected; failures surface only
// through exceptions or assertions raised inside initialize().
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_) : _options(options_) {
    initialize();
}

// Prepares the on-disk layout and opens the metadata database.
//
// Steps:
//   1. make sure the root directory `_options.path` exists;
//   2. open "<path>/meta.sqlite" through sqlite_orm and sync the schema;
//   3. keep the connection open and switch the journal to WAL;
//   4. run cleanup() to drop leftovers of a previous run.
//
// Returns OK; exceptions from Boost.Filesystem, sqlite_orm or cleanup()
// propagate to the caller.
Status DBMetaImpl::initialize() {
    if (!boost::filesystem::is_directory(_options.path)) {
        // create_directories (not create_directory) so that a root path whose
        // parent directories do not exist yet can still be created, matching
        // what add_group() does for group directories.
        auto ret = boost::filesystem::create_directories(_options.path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << _options.path << " Error";
        }
        assert(ret);
    }

    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(_options.path+"/meta.sqlite"));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    cleanup();

    return Status::OK();
}

X
Xu Peng 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126
// PXU TODO: Temp solution. Will fix later
// Marks every file of `group_id` whose date is in `dates` as TO_DELETE.
//
// Returns:
//   OK            - nothing to do (empty `dates`) or the rows were updated;
//   get_group()'s status - when the group lookup fails;
//   Error         - when any requested date is >= GetDateWithDelta(-1), i.e.
//                   too recent to be dropped.
// Database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
            const meta::DatesT& dates) {
    if (dates.size() == 0) {
        return Status::OK();
    }

    GroupSchema group_info;
    group_info.group_id = group_id;
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

    auto yesterday = GetDateWithDelta(-1);

    // Reject the whole request if even one partition is too recent.
    for (auto& date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions within 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
                    set(
                        c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                    ),
                    where(
                        c(&GroupFileSchema::group_id) == group_id and
                        in(&GroupFileSchema::date, dates)
                    ));
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Plain `throw;` rethrows the original exception object; `throw e;`
        // would copy it as a bare std::exception and lose the derived type.
        throw;
    }
    return Status::OK();
}

151 152
// Registers a new group and creates its directory.
//
// On entry an empty group_info.group_id is replaced by a generated one.
// files_cnt, id and created_on are overwritten; on success `id` holds the row
// id returned by the insert.
//
// Returns DBTransactionError when the insert throws anything, OK otherwise.
Status DBMetaImpl::add_group(GroupSchema& group_info) {
    if (group_info.group_id == "") {
        NextGroupId(group_info.group_id);
    }

    group_info.files_cnt = 0;
    group_info.id = -1;
    group_info.created_on = utils::GetMicroSecTimeStamp();

    try {
        auto row_id = ConnectorPtr->insert(group_info);
        group_info.id = row_id;
    } catch (...) {
        return Status::DBTransactionError("Add Group Error");
    }

    auto group_dir = GetGroupPath(group_info.group_id);
    if (boost::filesystem::is_directory(group_dir)) {
        return Status::OK();
    }

    auto created = boost::filesystem::create_directories(group_dir);
    if (!created) {
        LOG(ERROR) << "Create directory " << group_dir << " Error";
    }
    assert(created);

    return Status::OK();
}

X
Xu Peng 已提交
181
// Looks up the group named by group_info.group_id and fills in the remaining
// fields. Simply forwards to get_group_no_lock() and returns its status.
Status DBMetaImpl::get_group(GroupSchema& group_info) {
    Status lookup_status = get_group_no_lock(group_info);
    return lookup_status;
}

// Loads id, files_cnt and dimension of the group whose group_id equals
// group_info.group_id.
//
// Returns NotFound when no such row exists, OK otherwise.
// Database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
    try {
        auto groups = ConnectorPtr->select(columns(&GroupSchema::id,
                                                  &GroupSchema::group_id,
                                                  &GroupSchema::files_cnt,
                                                  &GroupSchema::dimension),
                                          where(c(&GroupSchema::group_id) == group_info.group_id));
        // group_id is declared unique() in the schema.
        assert(groups.size() <= 1);
        if (groups.size() != 1) {
            return Status::NotFound("Group " + group_info.group_id + " not found");
        }

        group_info.id = std::get<0>(groups[0]);
        group_info.files_cnt = std::get<2>(groups[0]);
        group_info.dimension = std::get<3>(groups[0]);
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
208
// Sets `has_or_not` to true when a group with `group_id` exists, false
// otherwise. Returns OK; database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
    try {
        auto groups = ConnectorPtr->select(columns(&GroupSchema::id),
                                          where(c(&GroupSchema::group_id) == group_id));
        // group_id is declared unique() in the schema.
        assert(groups.size() <= 1);
        has_or_not = (groups.size() == 1);
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
225 226 227 228 229 230 231 232 233 234
// Registers a new file for the group named by group_file.group_id.
//
// The caller supplies group_id (and optionally date; EmptyDate means
// Meta::GetDate()). Everything else is filled in here: a generated file_id,
// file_type NEW, the group's dimension, size 0, creation/update timestamps,
// the on-disk location and, after the insert, the row id.
//
// Returns:
//   get_group()'s status - when the group lookup fails;
//   DBTransactionError   - when the insert throws anything;
//   OK                   - otherwise, after making sure the date partition
//                          directory exists.
Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
    if (group_file.date == EmptyDate) {
        group_file.date = Meta::GetDate();
    }
    GroupSchema group_info;
    group_info.group_id = group_file.group_id;
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

    NextFileId(group_file.file_id);
    group_file.file_type = GroupFileSchema::NEW;
    group_file.dimension = group_info.dimension;
    group_file.size = 0;
    group_file.created_on = utils::GetMicroSecTimeStamp();
    group_file.updated_time = group_file.created_on;
    GetGroupFilePath(group_file);

    try {
        auto id = ConnectorPtr->insert(group_file);
        group_file.id = id;
    } catch (...) {
        return Status::DBTransactionError("Add file Error");
    }

    auto partition_path = GetGroupDatePartitionPath(group_file.group_id, group_file.date);

    if (!boost::filesystem::is_directory(partition_path)) {
        // create_directories, as in add_group(): the partition can still be
        // created when the enclosing group directory is missing on disk.
        auto ret = boost::filesystem::create_directories(partition_path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << partition_path << " Error";
        }
        assert(ret);
    }

    return Status::OK();
}

X
Xu Peng 已提交
266 267
// Collects every file whose file_type is TO_INDEX, across all groups.
//
// `files` is cleared first. Each returned entry carries id, group_id, file_id,
// file_type, size, date, its location and the dimension of its group.
//
// Returns the failing status of a group lookup (with `files` possibly
// partially filled), OK otherwise. Database exceptions are logged and
// rethrown unchanged.
Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX));

        // Cache of group records so each group is looked up only once.
        std::map<std::string, GroupSchema> groups;

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);

            auto groupItr = groups.find(group_file.group_id);
            if (groupItr == groups.end()) {
                GroupSchema group_info;
                group_info.group_id = group_file.group_id;
                auto status = get_group_no_lock(group_info);
                if (!status.ok()) {
                    return status;
                }
                groupItr = groups.emplace(group_file.group_id, group_info).first;
            }
            group_file.dimension = groupItr->second.dimension;
            files.push_back(group_file);
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

X
xj.lin 已提交
310
// Collects the files of `group_id` to be searched, grouped by date.
//
// Only files of type RAW, TO_INDEX or INDEX whose date is in `partition` are
// returned; an empty `partition` means "today only" (Meta::GetDate()).
// `files` is cleared first.
//
// Returns the status of the group lookup when it fails, OK otherwise.
// Database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::files_to_search(const std::string &group_id,
                                   const DatesT& partition,
                                   DatePartionedGroupFilesSchema &files) {
    files.clear();
    DatesT today = {Meta::GetDate()};
    const DatesT& dates = (partition.empty() == true) ? today : partition;

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                     &GroupFileSchema::group_id,
                                                     &GroupFileSchema::file_id,
                                                     &GroupFileSchema::file_type,
                                                     &GroupFileSchema::size,
                                                     &GroupFileSchema::date),
                                             where(c(&GroupFileSchema::group_id) == group_id and
                                                 in(&GroupFileSchema::date, dates) and
                                                 (c(&GroupFileSchema::file_type) == (int) GroupFileSchema::RAW or
                                                     c(&GroupFileSchema::file_type) == (int) GroupFileSchema::TO_INDEX or
                                                     c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX)));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = group_info.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the per-date list on first use.
            files[group_file.date].push_back(group_file);
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
361 362 363
// Collects the RAW files of `group_id`, grouped by date.
//
// `files` is cleared first. Each entry carries id, group_id, file_id,
// file_type, size, date, its location and the group's dimension.
//
// Returns the status of the group lookup when it fails, OK otherwise.
// Database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::files_to_merge(const std::string& group_id,
        DatePartionedGroupFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and
                                                c(&GroupFileSchema::group_id) == group_id));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = group_info.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the per-date list on first use.
            files[group_file.date].push_back(group_file);
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

406 407 408
// Sets `has_or_not_` to true when a GroupFile row with the given group_id_
// and file_id_ exists, false otherwise. The row's file_type is not considered.
//
// Returns OK; database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::has_group_file(const std::string& group_id_,
                              const std::string& file_id_,
                              bool& has_or_not_) {
    // Give the out-parameter a defined value on every path.
    has_or_not_ = false;
    try {
        auto files = ConnectorPtr->select(columns(&GroupFileSchema::id),
                                          where(c(&GroupFileSchema::file_id) == file_id_ and
                                                c(&GroupFileSchema::group_id) == group_id_
                                          ));
        has_or_not_ = !files.empty();
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        throw;
    }
    return Status::OK();
}

413 414 415
// Loads the GroupFile row identified by (group_id_, file_id_) into
// `group_file_info_`: id, group_id, file_id, file_type, size and date are set;
// dimension and location are left untouched.
//
// Returns NotFound when no such row exists, OK otherwise.
// Database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::get_group_file(const std::string& group_id_,
                              const std::string& file_id_,
                              GroupFileSchema& group_file_info_) {
    try {
        auto files = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_id) == file_id_ and
                                                c(&GroupFileSchema::group_id) == group_id_
                                          ));
        assert(files.size() <= 1);
        if (files.size() != 1) {
            return Status::NotFound("GroupFile " + file_id_ + " not found");
        }

        group_file_info_.id = std::get<0>(files[0]);
        group_file_info_.group_id = std::get<1>(files[0]);
        group_file_info_.file_id = std::get<2>(files[0]);
        group_file_info_.file_type = std::get<3>(files[0]);
        group_file_info_.size = std::get<4>(files[0]);
        group_file_info_.date = std::get<5>(files[0]);
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

445
// Not implemented yet: returns OK without reading any argument and without
// touching `group_files_info_`.
Status DBMetaImpl::get_group_files(const std::string& group_id_,
                               const int date_delta_,
                               GroupFilesSchema& group_files_info_) {
    // PXU TODO
    return Status::OK();
}

X
Xu Peng 已提交
452 453 454 455 456 457 458 459 460 461 462
// PXU TODO: Support Swap
// Applies the archive criteria from _options.archive_conf.
//
//   "days": every file created more than `limit` days ago that is not already
//           TO_DELETE is marked TO_DELETE.
//   "disk": when the total size reported by size() exceeds limit*G, the excess
//           is handed to discard_files_of_size().
//
// Returns OK, or the failing status of size(). Database exceptions are logged
// and rethrown unchanged.
Status DBMetaImpl::archive_files() {
    auto& criterias = _options.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    // Iterate by reference: the original copied every key/value pair.
    for (const auto& kv : criterias) {
        auto& criteria = kv.first;
        auto& limit = kv.second;
        if (criteria == "days") {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try
            {
                ConnectorPtr->update_all(
                        set(
                            c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                           ),
                        where(
                            c(&GroupFileSchema::created_on) < (long)(now - usecs) and
                            c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
                            ));
            } catch (const std::exception& e) {
                LOG(DEBUG) << e.what();
                // Rethrow the original object; `throw e;` would slice it.
                throw;
            }
        }
        if (criteria == "disk") {
            long sum = 0;
            auto status = size(sum);
            if (!status.ok()) {
                return status;
            }

            // PXU TODO: refactor size
            auto to_delete = (sum - limit*G);
            discard_files_of_size(to_delete);
        }
    }

    return Status::OK();
}

X
Xu Peng 已提交
493
// Stores in `result` the sum of the `size` column over all files that are not
// marked TO_DELETE (0 when there are none).
//
// Returns OK; database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::size(long& result) {
    result = 0;
    try {
        auto selected = ConnectorPtr->select(columns(sum(&GroupFileSchema::size)),
                where(
                    c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
                    ));

        for (auto& sub_query : selected) {
            // The aggregate comes back as a nullable value; skip it when empty.
            if(!std::get<0>(sub_query)) {
                continue;
            }
            result += (long)(*std::get<0>(sub_query));
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
515
// Marks files as TO_DELETE, lowest id first, until at least `to_discard_size`
// worth of `size` has been marked or no candidate files remain.
//
// Files are processed in batches of 10. A non-positive `to_discard_size` is a
// no-op. Returns OK; database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
    try {
        // Loop instead of recursing once per batch, so the stack depth does
        // not grow with the number of files to discard.
        while (true) {
            LOG(DEBUG) << "About to discard size=" << to_discard_size;
            if (to_discard_size <= 0) {
                break;
            }

            auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                       &GroupFileSchema::size),
                                              where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE),
                                              order_by(&GroupFileSchema::id),
                                              limit(10));
            std::vector<int> ids;

            for (auto& file : selected) {
                if (to_discard_size <= 0) break;
                GroupFileSchema group_file;
                group_file.id = std::get<0>(file);
                group_file.size = std::get<1>(file);
                ids.push_back(group_file.id);
                LOG(DEBUG) << "Discard group_file.id=" << group_file.id << " group_file.size=" << group_file.size;
                to_discard_size -= group_file.size;
            }

            // Nothing left that could be discarded.
            if (ids.size() == 0) {
                break;
            }

            ConnectorPtr->update_all(
                        set(
                            c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                        ),
                        where(
                            in(&GroupFileSchema::id, ids)
                        ));
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

559
// Persists `group_file` to its row, after stamping updated_time with the
// current microsecond timestamp.
//
// Returns OK; database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
    group_file.updated_time = utils::GetMicroSecTimeStamp();
    try {
        ConnectorPtr->update(group_file);
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }
    return Status::OK();
}

571
// Persists every entry of `files` inside one transaction, stamping each
// entry's updated_time with the current microsecond timestamp first.
//
// Returns DBTransactionError when the transaction reports it did not commit,
// OK otherwise. Database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::update_files(GroupFilesSchema& files) {
    try {
        auto commited = ConnectorPtr->transaction([&] () {
            for (auto& file : files) {
                file.updated_time = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            // Returning true asks sqlite_orm to commit.
            return true;
        });
        if (!commited) {
            return Status::DBTransactionError("Update files Error");
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
590
// Physically removes files that have been marked TO_DELETE for longer than
// `seconds`: the file at its location is deleted from disk and its row is
// removed from the GroupFile table.
//
// Returns OK; database and filesystem exceptions are logged and rethrown
// unchanged.
Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
    auto now = utils::GetMicroSecTimeStamp();
    try {
        // A file is expired once its last update is older than the TTL, hence
        // `<`. The previous `>` picked the files marked within the last
        // `seconds`, i.e. exactly the ones the TTL is meant to keep.
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and
                                                c(&GroupFileSchema::updated_time) < now - seconds*US_PS));

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == GroupFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<GroupFileSchema>(group_file.id);
            /* LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; */
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
627
// Drops every GroupFile row whose file_type is TO_DELETE or NEW.
// For TO_DELETE rows the file at its location is also removed from disk; for
// NEW rows only the row is removed.
//
// Called from initialize() and from the destructor. Returns OK; database and
// filesystem exceptions are logged and rethrown unchanged.
Status DBMetaImpl::cleanup() {
    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or
                                                c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW));

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.size = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == GroupFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<GroupFileSchema>(group_file.id);
            /* LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; */
        }
    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
663 664
// Stores in `result` the summed `size` of the RAW, TO_INDEX and INDEX files of
// `group_id`, divided by the group's dimension.
// NOTE(review): presumably this is the number of vectors in the group - confirm
// the unit of `size` against the code that writes it.
//
// `result` is 0 whenever a non-OK status is returned.
// Returns:
//   the status of the group lookup when it fails;
//   Error when the group's dimension is 0 (the division would be undefined);
//   OK otherwise.
// Database exceptions are logged and rethrown unchanged.
Status DBMetaImpl::count(const std::string& group_id, long& result) {
    // Give the out-parameter a defined value on every path.
    result = 0;

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size,
                                                   &GroupFileSchema::date),
                                          where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or
                                                 c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or
                                                 c(&GroupFileSchema::file_type) == (int)GroupFileSchema::INDEX) and
                                                c(&GroupFileSchema::group_id) == group_id));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        if (group_info.dimension == 0) {
            return Status::Error("Group " + group_id + " has dimension 0");
        }

        for (auto& file : selected) {
            result += std::get<0>(file);
        }

        result /= group_info.dimension;

    } catch (const std::exception& e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original object; `throw e;` would slice it to std::exception.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
694 695 696 697 698 699 700
// Deletes the whole `_options.path` directory tree if it exists.
// Always returns OK; Boost.Filesystem exceptions propagate.
Status DBMetaImpl::drop_all() {
    const bool root_exists = boost::filesystem::is_directory(_options.path);
    if (root_exists) {
        boost::filesystem::remove_all(_options.path);
    }
    return Status::OK();
}

X
Xu Peng 已提交
701 702 703 704
// Runs a final cleanup() pass when the object is destroyed.
// cleanup() rethrows database/filesystem exceptions; letting one escape a
// destructor would call std::terminate, so failures are logged and swallowed.
DBMetaImpl::~DBMetaImpl() {
    try {
        cleanup();
    } catch (const std::exception& e) {
        LOG(ERROR) << "Cleanup on destruction failed: " << e.what();
    } catch (...) {
        LOG(ERROR) << "Cleanup on destruction failed with an unknown error";
    }
}

705
} // namespace meta
X
Xu Peng 已提交
706 707 708
} // namespace engine
} // namespace vecwise
} // namespace zilliz