/*******************************************************************************
 * long rows = 3*1024*1024*1024;
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

X
Xu Peng 已提交
8
#include <sys/stat.h>
#include <unistd.h>

#include <cassert>
#include <chrono>
#include <fstream>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include <boost/filesystem.hpp>
#include <sqlite_orm.h>
#include <easylogging++.h>

#include "DBMetaImpl.h"
#include "IDGenerator.h"
X
Xu Peng 已提交
18 19 20 21

namespace zilliz {
namespace vecwise {
namespace engine {
22
namespace meta {
X
Xu Peng 已提交
23

X
Xu Peng 已提交
24 25
using namespace sqlite_orm;

X
Xu Peng 已提交
26 27
inline auto StoragePrototype(const std::string& path) {
    return make_storage(path,
X
Xu Peng 已提交
28
            make_table("Group",
X
Xu Peng 已提交
29 30 31
                      make_column("id", &GroupSchema::id, primary_key()),
                      make_column("group_id", &GroupSchema::group_id, unique()),
                      make_column("dimension", &GroupSchema::dimension),
X
Xu Peng 已提交
32 33 34 35 36 37 38
                      make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))),
            make_table("GroupFile",
                      make_column("id", &GroupFileSchema::id, primary_key()),
                      make_column("group_id", &GroupFileSchema::group_id),
                      make_column("file_id", &GroupFileSchema::file_id),
                      make_column("file_type", &GroupFileSchema::file_type),
                      make_column("rows", &GroupFileSchema::rows, default_value(0)),
39
                      make_column("updated_time", &GroupFileSchema::updated_time),
X
Xu Peng 已提交
40 41
                      make_column("date", &GroupFileSchema::date))
            );
X
Xu Peng 已提交
42 43 44

}

X
Xu Peng 已提交
45
// Concrete storage type returned by StoragePrototype(). The path literal only
// appears inside decltype, so it is never evaluated.
using ConnectorT = decltype(StoragePrototype("/tmp/dummy.sqlite3"));
X
Xu Peng 已提交
46 47
// File-local storage handle. It is (re)assigned in DBMetaImpl::initialize()
// and used by every database operation in this file.
static std::unique_ptr<ConnectorT> ConnectorPtr;

X
Xu Peng 已提交
48 49 50 51 52 53 54
// Returns the size in bytes that stat() reports for `filename`, or -1 when
// the stat() call fails.
long GetFileSize(const std::string& filename)
{
    struct stat file_info;
    if (stat(filename.c_str(), &file_info) != 0) {
        return -1;
    }
    return file_info.st_size;
}

X
Xu Peng 已提交
55 56 57 58
// Returns "<_options.path>/<group_id>", the directory used for a group.
std::string DBMetaImpl::GetGroupPath(const std::string& group_id) {
    std::string group_dir(_options.path);
    group_dir += "/";
    group_dir += group_id;
    return group_dir;
}

59 60 61 62 63 64 65 66
// Returns the std::chrono::system_clock time since its epoch, expressed in
// microseconds.
long DBMetaImpl::GetMicroSecTimeStamp() {
    using std::chrono::microseconds;

    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<microseconds>(since_epoch).count();
}

X
Xu Peng 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
// Returns "<group path>/<date>", the directory of one date partition of a
// group. `date` is streamed as-is into the path.
std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
    std::ostringstream partition_path;
    partition_path << GetGroupPath(group_id) << "/" << date;
    return partition_path.str();
}

// Fills `group_file.location` with "<partition path>/<file_id>".
// When `group_file.date` equals EmptyDate it is first replaced by
// Meta::GetDate(), so this call may modify the date as well.
void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) {
    const bool date_missing = (group_file.date == EmptyDate);
    if (date_missing) {
        group_file.date = Meta::GetDate();
    }

    std::ostringstream location;
    location << GetGroupDatePartitionPath(group_file.group_id, group_file.date);
    location << "/" << group_file.file_id;
    group_file.location = location.str();
}

X
Xu Peng 已提交
83 84
// Stores a copy of the options and immediately runs initialize().
// The Status returned by initialize() is discarded here.
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
    : _options(options_)
{
    initialize();
}

// Prepares the on-disk state and the database connection:
//   1. creates `_options.path` when it is not already a directory,
//   2. opens "<_options.path>/meta.sqlite" through StoragePrototype(),
//   3. syncs the schema, keeps the connection open and switches to WAL,
//   4. runs cleanup().
// Always returns Status::OK(); a failed directory creation is logged and
// asserted on.
Status DBMetaImpl::initialize() {
    const bool root_exists = boost::filesystem::is_directory(_options.path);
    if (!root_exists) {
        auto created = boost::filesystem::create_directory(_options.path);
        if (!created) {
            LOG(ERROR) << "Create directory " << _options.path << " Error";
        }
        assert(created);
    }

    const std::string db_file = _options.path+"/meta.sqlite";
    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(db_file));

    ConnectorPtr->sync_schema();
    ConnectorPtr->open_forever(); // thread safe option
    ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log

    cleanup();

    return Status::OK();
}

X
Xu Peng 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121
// PXU TODO: Temp solution. Will fix later
//
// Marks every file of `group_id` whose date is in `dates` as TO_DELETE.
//
// Returns:
//   - Status::OK() when `dates` is empty (nothing to do) or on success,
//   - the status of get_group() when the group lookup fails,
//   - Status::Error when any requested date is on or after
//     GetDateWithDelta(-1).
// Throws: rethrows any std::exception raised by the database update after
// logging it.
Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
            const meta::DatesT& dates) {
    if (dates.empty()) {
        return Status::OK();
    }

    GroupSchema group_info;
    group_info.group_id = group_id;
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

    auto yesterday = GetDateWithDelta(-1);

    for (const auto& date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions with 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
                    set(
                        c(&GroupFileSchema::file_type) = static_cast<int>(GroupFileSchema::TO_DELETE)
                    ),
                    where(
                        c(&GroupFileSchema::group_id) == group_id and
                        in(&GroupFileSchema::date, dates)
                    ));
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        // Rethrow the original exception object; `throw e;` would copy it as
        // a plain std::exception and lose the dynamic type.
        throw;
    }
    return Status::OK();
}

146 147 148 149 150 151 152 153 154
// Inserts a new group row and creates the group's directory.
//
// When `group_info.group_id` is empty, an id from SimpleIDGenerator is used.
// On return `group_info.id` holds the inserted row id; `files_cnt` is reset
// to 0 and `created_on` is set to the current timestamp.
//
// Returns Status::DBTransactionError when the insert throws, otherwise
// Status::OK(). A failed directory creation is logged and asserted on.
Status DBMetaImpl::add_group(GroupSchema& group_info) {
    if (group_info.group_id.empty()) {
        SimpleIDGenerator id_generator;
        std::stringstream id_stream;
        id_stream << id_generator.getNextIDNumber();
        group_info.group_id = id_stream.str();
    }
    group_info.files_cnt = 0;
    group_info.id = -1;
    group_info.created_on = GetMicroSecTimeStamp();

    try {
        auto row_id = ConnectorPtr->insert(group_info);
        group_info.id = row_id;
        /* LOG(DEBUG) << "Add group " << id; */
    } catch (...) {
        return Status::DBTransactionError("Add Group Error");
    }

    auto group_path = GetGroupPath(group_info.group_id);

    const bool dir_exists = boost::filesystem::is_directory(group_path);
    if (!dir_exists) {
        auto created = boost::filesystem::create_directory(group_path);
        if (!created) {
            LOG(ERROR) << "Create directory " << group_path << " Error";
        }
        assert(created);
    }

    return Status::OK();
}

X
Xu Peng 已提交
180
// Looks up the group named by `group_info.group_id`; simply forwards to
// get_group_no_lock() and returns its status.
Status DBMetaImpl::get_group(GroupSchema& group_info) {
    auto status = get_group_no_lock(group_info);
    return status;
}

// Loads id, files_cnt and dimension of the group identified by
// `group_info.group_id` into `group_info`.
//
// Returns Status::NotFound when the query does not yield exactly one row,
// Status::OK() otherwise.
// Throws: rethrows any std::exception raised by the query after logging it.
Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
    try {
        auto groups = ConnectorPtr->select(columns(&GroupSchema::id,
                                                  &GroupSchema::group_id,
                                                  &GroupSchema::files_cnt,
                                                  &GroupSchema::dimension),
                                          where(c(&GroupSchema::group_id) == group_info.group_id));
        assert(groups.size() <= 1);
        if (groups.size() != 1) {
            return Status::NotFound("Group " + group_info.group_id + " not found");
        }

        const auto& row = groups[0];
        group_info.id = std::get<0>(row);
        group_info.files_cnt = std::get<2>(row);
        group_info.dimension = std::get<3>(row);
    } catch (std::exception &e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
207
// Sets `has_or_not` to true when exactly one group row has `group_id`,
// false otherwise. Always returns Status::OK().
// Throws: rethrows any std::exception raised by the query after logging it.
Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
    try {
        auto groups = ConnectorPtr->select(columns(&GroupSchema::id),
                                          where(c(&GroupSchema::group_id) == group_id));
        assert(groups.size() <= 1);
        has_or_not = (groups.size() == 1);
    } catch (std::exception &e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
224 225 226 227 228 229 230 231 232 233
// Registers a new file for the group `group_file.group_id`.
//
// The function fills in the remaining fields of `group_file`: date (when it
// is EmptyDate), a generated file_id, file_type NEW, the group's dimension,
// rows = 0, created_on/updated_time, location and finally the row id.
// The date partition directory is created when missing.
//
// Returns the get_group() status when the group lookup fails,
// Status::DBTransactionError when the insert throws, otherwise Status::OK().
Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
    if (group_file.date == EmptyDate) {
        group_file.date = Meta::GetDate();
    }

    GroupSchema owner;
    owner.group_id = group_file.group_id;
    auto status = get_group(owner);
    if (!status.ok()) {
        return status;
    }

    SimpleIDGenerator id_generator;
    std::stringstream id_stream;
    id_stream << id_generator.getNextIDNumber();

    group_file.file_type = GroupFileSchema::NEW;
    group_file.file_id = id_stream.str();
    group_file.dimension = owner.dimension;
    group_file.rows = 0;
    const auto timestamp = GetMicroSecTimeStamp();
    group_file.created_on = timestamp;
    group_file.updated_time = group_file.created_on;
    GetGroupFilePath(group_file);

    try {
        auto row_id = ConnectorPtr->insert(group_file);
        group_file.id = row_id;
        /* LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id; */
    } catch (...) {
        return Status::DBTransactionError("Add file Error");
    }

    auto partition_path = GetGroupDatePartitionPath(group_file.group_id, group_file.date);

    const bool dir_exists = boost::filesystem::is_directory(partition_path);
    if (!dir_exists) {
        auto created = boost::filesystem::create_directory(partition_path);
        if (!created) {
            LOG(ERROR) << "Create directory " << partition_path << " Error";
        }
        assert(created);
    }

    return Status::OK();
}

X
Xu Peng 已提交
269 270
// Collects every file whose type is TO_INDEX into `files` (cleared first).
//
// Each returned entry has id, group_id, file_id, file_type, rows, date,
// location and dimension filled in. Group information is looked up once per
// distinct group_id and cached for the duration of the call.
//
// Returns the get_group_no_lock() status when a group lookup fails (entries
// appended before the failure stay in `files`), otherwise Status::OK().
// Throws: rethrows any std::exception raised by the query after logging it.
Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::rows,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::TO_INDEX)));

        std::map<std::string, GroupSchema> groups;

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.rows = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);

            auto groupItr = groups.find(group_file.group_id);
            if (groupItr == groups.end()) {
                GroupSchema group_info;
                group_info.group_id = group_file.group_id;
                auto status = get_group_no_lock(group_info);
                if (!status.ok()) {
                    return status;
                }
                groupItr = groups.emplace(group_file.group_id, group_info).first;
            }
            // Reuse the iterator instead of looking the key up again.
            group_file.dimension = groupItr->second.dimension;
            files.push_back(group_file);
        }
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }

    return Status::OK();
}

X
xj.lin 已提交
313
// Collects the searchable files (types RAW, TO_INDEX and INDEX) of
// `group_id` into `files`, keyed by date. `files` is cleared first.
//
// `partition` selects the dates to look at; when it is empty, only
// Meta::GetDate() is used.
//
// Returns the get_group_no_lock() status when the group lookup fails,
// otherwise Status::OK().
// Throws: rethrows any std::exception raised by the query after logging it.
Status DBMetaImpl::files_to_search(const std::string &group_id,
                                   const DatesT& partition,
                                   DatePartionedGroupFilesSchema &files) {
    files.clear();
    DatesT today = {Meta::GetDate()};
    const DatesT& dates = partition.empty() ? today : partition;

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                     &GroupFileSchema::group_id,
                                                     &GroupFileSchema::file_id,
                                                     &GroupFileSchema::file_type,
                                                     &GroupFileSchema::rows,
                                                     &GroupFileSchema::date),
                                             where(c(&GroupFileSchema::group_id) == group_id and
                                                 in(&GroupFileSchema::date, dates) and
                                                 (c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::RAW) or
                                                     c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::TO_INDEX) or
                                                     c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::INDEX))));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.rows = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = group_info.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the per-date list on first use.
            files[group_file.date].push_back(group_file);
        }
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
364 365 366
// Collects every RAW file of `group_id` into `files`, keyed by date.
// `files` is cleared first.
//
// Returns the get_group_no_lock() status when the group lookup fails,
// otherwise Status::OK().
// Throws: rethrows any std::exception raised by the query after logging it.
Status DBMetaImpl::files_to_merge(const std::string& group_id,
        DatePartionedGroupFilesSchema& files) {
    files.clear();

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::rows,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::RAW) and
                                                c(&GroupFileSchema::group_id) == group_id));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.rows = std::get<4>(file);
            group_file.date = std::get<5>(file);
            group_file.dimension = group_info.dimension;
            GetGroupFilePath(group_file);
            // operator[] creates the per-date list on first use.
            files[group_file.date].push_back(group_file);
        }
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }

    return Status::OK();
}

409 410 411
// Not implemented yet: no lookup is performed and `has_or_not_` is left
// untouched. Always returns Status::OK().
Status DBMetaImpl::has_group_file(const std::string& group_id_,
                                  const std::string& file_id_,
                                  bool& has_or_not_) {
    //PXU TODO
    return Status::OK();
}

416 417 418
// Loads the file identified by (`group_id_`, `file_id_`) into
// `group_file_info_`. Only id, group_id, file_id, file_type, rows and date
// are filled in.
//
// Returns Status::NotFound when the query does not yield exactly one row,
// Status::OK() otherwise.
// Throws: rethrows any std::exception raised by the query after logging it.
Status DBMetaImpl::get_group_file(const std::string& group_id_,
                              const std::string& file_id_,
                              GroupFileSchema& group_file_info_) {
    try {
        auto files = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::rows,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_id) == file_id_ and
                                                c(&GroupFileSchema::group_id) == group_id_
                                          ));
        assert(files.size() <= 1);
        if (files.size() != 1) {
            return Status::NotFound("GroupFile " + file_id_ + " not found");
        }

        const auto& row = files[0];
        group_file_info_.id = std::get<0>(row);
        group_file_info_.group_id = std::get<1>(row);
        group_file_info_.file_id = std::get<2>(row);
        group_file_info_.file_type = std::get<3>(row);
        group_file_info_.rows = std::get<4>(row);
        group_file_info_.date = std::get<5>(row);
    } catch (std::exception &e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }

    return Status::OK();
}

448
// Not implemented yet: `group_files_info_` is left untouched.
// Always returns Status::OK().
Status DBMetaImpl::get_group_files(const std::string& group_id_,
                                   const int date_delta_,
                                   GroupFilesSchema& group_files_info_) {
    // PXU TODO
    return Status::OK();
}

X
Xu Peng 已提交
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
// PXU TODO: Support Swap
//
// Applies the archive criteria from `_options.archive_conf`:
//   "days" - marks files created more than <limit> days ago as TO_DELETE;
//   "disk" - sums `rows` over all files not yet TO_DELETE and discards the
//            part above <limit> * 2^30 / sizeof(float).
//
// NOTE(review): GroupFileSchema::created_on is not among the columns declared
// in StoragePrototype(); confirm that sqlite_orm can resolve it in the "days"
// query.
//
// Returns Status::OK(), or the status of discard_files_of_size() when that
// is not ok.
// Throws: rethrows any std::exception raised by the database after logging.
Status DBMetaImpl::archive_files() {
    auto& criterias = _options.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (const auto& kv : criterias) {
        const auto& criteria = kv.first;
        const auto& threshold = kv.second;
        if (criteria == "days") {
            // Widen before multiplying: 3600*24*1000000 already exceeds the
            // range of a 32-bit int.
            const long usecs = static_cast<long>(threshold) * 24L * 3600L * 1000000L;
            const long now = GetMicroSecTimeStamp();
            try
            {
                ConnectorPtr->update_all(
                        set(
                            c(&GroupFileSchema::file_type) = static_cast<int>(GroupFileSchema::TO_DELETE)
                           ),
                        where(
                            c(&GroupFileSchema::created_on) < now - usecs and
                            c(&GroupFileSchema::file_type) != static_cast<int>(GroupFileSchema::TO_DELETE)
                            ));
            } catch (std::exception & e) {
                LOG(DEBUG) << e.what();
                // Rethrow without copying so the exception keeps its type.
                throw;
            }
        }
        if (criteria == "disk") {
            const long G = 1024L*1024L*1024L;
            long sum = 0;
            try {
                auto sum_c = ConnectorPtr->sum(
                        &GroupFileSchema::rows,
                        where(
                            c(&GroupFileSchema::file_type) != static_cast<int>(GroupFileSchema::TO_DELETE)
                        ));
                // The aggregate may come back empty when no row matches.
                if (sum_c) {
                    sum = static_cast<long>(*sum_c);
                }
            } catch (std::exception & e) {
                LOG(DEBUG) << e.what();
                throw;
            }
            // PXU TODO: refactor rows
            // Signed arithmetic: a negative result means "under the limit"
            // and is ignored by discard_files_of_size().
            const long budget = static_cast<long>(threshold) * G / static_cast<long>(sizeof(float));
            auto status = discard_files_of_size(sum - budget);
            if (!status.ok()) {
                return status;
            }
        }
    }

    return Status::OK();
}

// Marks files as TO_DELETE, lowest id first, until their combined `rows`
// reaches `to_discard_size` or no candidate file is left.
//
// Files are processed in batches of 10. A non-positive `to_discard_size`
// is a no-op. Always returns Status::OK().
// Throws: rethrows any std::exception raised by the database after logging.
Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
    // Iterative form of the original tail recursion, so the stack depth does
    // not grow with the number of batches.
    while (to_discard_size > 0) {
        try {
            auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                       &GroupFileSchema::rows),
                                              where(c(&GroupFileSchema::file_type) != static_cast<int>(GroupFileSchema::TO_DELETE)),
                                              order_by(&GroupFileSchema::id),
                                              limit(10));
            std::vector<int> ids;

            for (auto& file : selected) {
                if (to_discard_size <= 0) break;
                ids.push_back(std::get<0>(file));
                to_discard_size -= std::get<1>(file);
            }

            // Nothing left to discard.
            if (ids.empty()) {
                return Status::OK();
            }

            ConnectorPtr->update_all(
                        set(
                            c(&GroupFileSchema::file_type) = static_cast<int>(GroupFileSchema::TO_DELETE)
                        ),
                        where(
                            in(&GroupFileSchema::id, ids)
                        ));

        } catch (std::exception & e) {
            LOG(DEBUG) << e.what();
            // Rethrow without copying so the exception keeps its type.
            throw;
        }
    }

    return Status::OK();
}

548
// Stamps `group_file.updated_time` with the current time and writes the
// record back to the database. Returns Status::OK() on success.
// Throws: rethrows any std::exception raised by the update after logging it
// together with the record's id and file_id.
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
    group_file.updated_time = GetMicroSecTimeStamp();
    try {
        ConnectorPtr->update(group_file);
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }
    return Status::OK();
}

560
// Updates every record in `files` inside a single database transaction,
// stamping each `updated_time` with the current time first.
//
// Returns Status::DBTransactionError when the transaction is not committed,
// Status::OK() otherwise.
// Throws: rethrows any std::exception raised by the database after logging.
Status DBMetaImpl::update_files(GroupFilesSchema& files) {
    try {
        auto commited = ConnectorPtr->transaction([&] () mutable {
            for (auto& file : files) {
                file.updated_time = GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            // Returning true asks the storage to commit.
            return true;
        });
        if (!commited) {
            return Status::DBTransactionError("Update files Error");
        }
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
579 580
// Physically removes TO_DELETE files whose last update is older than
// `seconds`: the file at the record's location is deleted from disk and the
// row is removed from the database.
//
// Returns Status::OK().
// Throws: rethrows any std::exception raised while querying or removing,
// after logging it.
Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
    const long now = GetMicroSecTimeStamp();
    // Widen before multiplying: 1000000 * seconds overflows a 32-bit int
    // once seconds exceeds 2147.
    const long ttl_usecs = static_cast<long>(seconds) * 1000000L;
    try {
        // Select entries whose TTL has expired, i.e. those last updated
        // before (now - ttl). The previous `>` comparison picked the files
        // updated within the TTL window instead.
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::rows,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::TO_DELETE) and
                                                c(&GroupFileSchema::updated_time) < now - ttl_usecs));

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.rows = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == GroupFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<GroupFileSchema>(group_file.id);
            /* LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; */
        }
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
616
// Removes the database rows of every file whose type is TO_DELETE or NEW.
// For TO_DELETE entries the file at the record's location is also deleted
// from disk; NEW entries only lose their row.
//
// Returns Status::OK().
// Throws: rethrows any std::exception raised while querying or removing,
// after logging it.
Status DBMetaImpl::cleanup() {
    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                   &GroupFileSchema::group_id,
                                                   &GroupFileSchema::file_id,
                                                   &GroupFileSchema::file_type,
                                                   &GroupFileSchema::rows,
                                                   &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::TO_DELETE) or
                                                c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::NEW)));

        for (auto& file : selected) {
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.group_id = std::get<1>(file);
            group_file.file_id = std::get<2>(file);
            group_file.file_type = std::get<3>(file);
            group_file.rows = std::get<4>(file);
            group_file.date = std::get<5>(file);
            GetGroupFilePath(group_file);
            if (group_file.file_type == GroupFileSchema::TO_DELETE) {
                boost::filesystem::remove(group_file.location);
            }
            ConnectorPtr->remove<GroupFileSchema>(group_file.id);
            /* LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; */
        }
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }

    return Status::OK();
}

X
Xu Peng 已提交
652 653
// Computes into `result` the sum of `rows` over the RAW, TO_INDEX and INDEX
// files of `group_id`, divided by the group's dimension.
//
// Returns the get_group_no_lock() status when the group lookup fails (in
// which case `result` is left untouched), Status::Error when the group's
// dimension is 0 (`result` is then 0), otherwise Status::OK().
// Throws: rethrows any std::exception raised by the query after logging it.
Status DBMetaImpl::count(const std::string& group_id, long& result) {

    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::rows,
                                                   &GroupFileSchema::date),
                                          where((c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::RAW) or
                                                 c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::TO_INDEX) or
                                                 c(&GroupFileSchema::file_type) == static_cast<int>(GroupFileSchema::INDEX)) and
                                                c(&GroupFileSchema::group_id) == group_id));

        GroupSchema group_info;
        group_info.group_id = group_id;
        auto status = get_group_no_lock(group_info);
        if (!status.ok()) {
            return status;
        }

        result = 0;
        // Guard the division below against a zero dimension.
        if (group_info.dimension == 0) {
            return Status::Error("Group dimension is 0");
        }

        for (auto& file : selected) {
            result += std::get<0>(file);
        }

        result /= group_info.dimension;

    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        // Rethrow without copying so the exception keeps its dynamic type.
        throw;
    }
    return Status::OK();
}

X
Xu Peng 已提交
683 684 685 686 687 688 689
// Recursively deletes `_options.path` when it is a directory.
// Always returns Status::OK().
Status DBMetaImpl::drop_all() {
    const bool root_exists = boost::filesystem::is_directory(_options.path);
    if (!root_exists) {
        return Status::OK();
    }
    boost::filesystem::remove_all(_options.path);
    return Status::OK();
}

X
Xu Peng 已提交
690 691 692 693
// Runs a final cleanup() pass when the object is destroyed.
// cleanup() rethrows database errors; an exception escaping a destructor
// terminates the program, so failures are logged and swallowed here.
DBMetaImpl::~DBMetaImpl() {
    try {
        cleanup();
    } catch (const std::exception& e) {
        LOG(ERROR) << "cleanup failed in destructor: " << e.what();
    } catch (...) {
        LOG(ERROR) << "cleanup failed in destructor: unknown error";
    }
}

694
} // namespace meta
X
Xu Peng 已提交
695 696 697
} // namespace engine
} // namespace vecwise
} // namespace zilliz