From 89cdeb11810a1b938881ffc7f9363ac6434aa740 Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Thu, 23 May 2019 20:22:39 +0800 Subject: [PATCH] feat(db): add archive files in meta Former-commit-id: 0d284f947dbf65bbc258a037eec1bb5458fa0007 --- cpp/src/db/DBMetaImpl.cpp | 107 ++++++++++++++++++++++++++++++++--- cpp/src/db/DBMetaImpl.h | 3 + cpp/src/db/LocalMetaImpl.cpp | 5 ++ cpp/src/db/LocalMetaImpl.h | 2 + cpp/src/db/Meta.h | 4 ++ cpp/src/db/Options.h | 2 +- 6 files changed, 114 insertions(+), 9 deletions(-) diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 3552bf20..e506f14e 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -151,6 +151,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) { } group_info.files_cnt = 0; group_info.id = -1; + group_info.created_on = GetMicroSecTimeStamp(); { try { @@ -237,7 +238,8 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { group_file.file_id = ss.str(); group_file.dimension = group_info.dimension; group_file.rows = 0; - group_file.updated_time = GetMicroSecTimeStamp(); //ConnectorPtr->select(datetime("now", "localtime +1 hour")).front(); + group_file.created_on = GetMicroSecTimeStamp(); + group_file.updated_time = group_file.created_on; GetGroupFilePath(group_file); { @@ -449,17 +451,106 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_, return Status::OK(); } +// PXU TODO: Support Swap +Status DBMetaImpl::archive_files() { + auto& criterias = _options.archive_conf.GetCriterias(); + if (criterias.size() == 0) { + return Status::OK(); + } + + for (auto kv : criterias) { + auto& criteria = kv.first; + auto& limit = kv.second; + if (criteria == "days") { + auto usecs = 3600*24*limit*1000000; + auto now = GetMicroSecTimeStamp(); + try + { + ConnectorPtr->update_all( + set( + c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE + ), + where( + c(&GroupFileSchema::created_on) < now - usecs and + c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE + )); + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; + } + } + if (criteria == "disk") { + int G = 1024*1024*1024; + long unsigned int sum = 0; + try { + auto sum_c = ConnectorPtr->sum( + &GroupFileSchema::rows, + where( + c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE + )); + sum = *sum_c; + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; + } + // PXU TODO: refactor rows + auto to_delete = sum - limit*G/sizeof(float); + discard_files_of_size(to_delete); + } + } + + return Status::OK(); +} + +Status DBMetaImpl::discard_files_of_size(long to_discard_size) { + if (to_discard_size <= 0) { + return Status::OK(); + } + try { + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows), + where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE), + order_by(&GroupFileSchema::id), + limit(10)); + + /* std::map groups; */ + + /* for (auto& file : selected) { */ + /* GroupFileSchema group_file; */ + /* group_file.id = std::get<0>(file); */ + /* group_file.group_id = std::get<1>(file); */ + /* group_file.file_id = std::get<2>(file); */ + /* group_file.file_type = std::get<3>(file); */ + /* group_file.rows = std::get<4>(file); */ + /* group_file.date = std::get<5>(file); */ + /* GetGroupFilePath(group_file); */ + /* auto groupItr = groups.find(group_file.group_id); */ + /* if (groupItr == groups.end()) { */ + /* GroupSchema group_info; */ + /* group_info.group_id = group_file.group_id; */ + /* auto status = get_group_no_lock(group_info); */ + /* if (!status.ok()) { */ + /* return status; */ + /* } */ + /* groups[group_file.group_id] = group_info; */ + /* } */ + /* group_file.dimension = groups[group_file.group_id].dimension; */ + /* files.push_back(group_file); */ + /* } */ + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; + } + + return Status::OK(); + +} + Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { group_file.updated_time = GetMicroSecTimeStamp(); try { ConnectorPtr->update(group_file); - /* auto commited = ConnectorPtr->transaction([&] () mutable { */ - /* ConnectorPtr->update(group_file); */ - /* return true; */ - /* }); */ - /* if (!commited) { */ - /* return Status::DBTransactionError("Update file Error"); */ - /* } */ } catch (std::exception & e) { LOG(DEBUG) << e.what(); LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id; diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index fab2e156..e2fd051b 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -50,6 +50,8 @@ public: virtual Status files_to_index(GroupFilesSchema&) override; + virtual Status archive_files() override; + virtual Status cleanup() override; virtual Status cleanup_ttl_files(uint16_t seconds) override; @@ -62,6 +64,7 @@ public: private: + Status discard_files_of_size(long to_discard_size); long GetMicroSecTimeStamp(); Status get_group_no_lock(GroupSchema& group_info); std::string GetGroupPath(const std::string& group_id); diff --git a/cpp/src/db/LocalMetaImpl.cpp b/cpp/src/db/LocalMetaImpl.cpp index 60c23158..2ec5c08c 100644 --- a/cpp/src/db/LocalMetaImpl.cpp +++ b/cpp/src/db/LocalMetaImpl.cpp @@ -241,6 +241,11 @@ Status LocalMetaImpl::update_files(GroupFilesSchema& files) { return Status::OK(); } +Status LocalMetaImpl::archive_files() { + //PXU TODO + return Status::OK(); +} + Status LocalMetaImpl::cleanup() { //PXU TODO return Status::OK(); diff --git a/cpp/src/db/LocalMetaImpl.h b/cpp/src/db/LocalMetaImpl.h index 116c9e46..71927e84 100644 --- a/cpp/src/db/LocalMetaImpl.h +++ b/cpp/src/db/LocalMetaImpl.h @@ -47,6 +47,8 @@ public: virtual Status files_to_index(GroupFilesSchema&) override; + virtual Status archive_files() override; + virtual Status cleanup_ttl_files(uint16_t seconds) override; virtual Status count(const std::string& group_id, long& result) override; diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index 6d2abb8d..74e1637f 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -30,6 +30,7 @@ struct GroupSchema { size_t files_cnt = 0; uint16_t dimension; std::string location = ""; + long created_on; }; // GroupSchema @@ -51,6 +52,7 @@ struct GroupFileSchema { uint16_t dimension; std::string location = ""; long updated_time; + long created_on; }; // GroupFileSchema typedef std::vector GroupFilesSchema; @@ -91,6 +93,8 @@ public: virtual Status files_to_merge(const std::string& group_id, DatePartionedGroupFilesSchema& files) = 0; + virtual Status archive_files() = 0; + virtual Status files_to_index(GroupFilesSchema&) = 0; virtual Status cleanup() = 0; diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index ee2d62f7..26608ab9 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -32,9 +32,9 @@ private: }; struct DBMetaOptions { - /* DBMetaOptions(const std::string&, const std::string&); */ std::string path; std::string backend_uri; + ArchiveConf archive_conf = ArchiveConf("delete"); }; // DBMetaOptions -- GitLab