提交 73d91135 编写于 作者: X Xu Peng

feat(db): add archive files in meta


Former-commit-id: 64a1e267815cc9fb9ccd1b41b6ef8a1f082e3b20
上级 73fb498f
......@@ -151,6 +151,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
}
group_info.files_cnt = 0;
group_info.id = -1;
group_info.created_on = GetMicroSecTimeStamp();
{
try {
......@@ -237,7 +238,8 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
group_file.file_id = ss.str();
group_file.dimension = group_info.dimension;
group_file.rows = 0;
group_file.updated_time = GetMicroSecTimeStamp(); //ConnectorPtr->select(datetime("now", "localtime +1 hour")).front();
group_file.created_on = GetMicroSecTimeStamp();
group_file.updated_time = group_file.created_on;
GetGroupFilePath(group_file);
{
......@@ -449,17 +451,106 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_,
return Status::OK();
}
// PXU TODO: Support Swap
Status DBMetaImpl::archive_files() {
auto& criterias = _options.archive_conf.GetCriterias();
if (criterias.size() == 0) {
return Status::OK();
}
for (auto kv : criterias) {
auto& criteria = kv.first;
auto& limit = kv.second;
if (criteria == "days") {
auto usecs = 3600*24*limit*1000000;
auto now = GetMicroSecTimeStamp();
try
{
ConnectorPtr->update_all(
set(
c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
),
where(
c(&GroupFileSchema::created_on) < now - usecs and
c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
));
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
throw e;
}
}
if (criteria == "disk") {
int G = 1024*1024*1024;
long unsigned int sum = 0;
try {
auto sum_c = ConnectorPtr->sum(
&GroupFileSchema::rows,
where(
c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
));
sum = *sum_c;
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
throw e;
}
// PXU TODO: refactor rows
auto to_delete = sum - limit*G/sizeof(float);
discard_files_of_size(to_delete);
}
}
return Status::OK();
}
Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
if (to_discard_size <= 0) {
return Status::OK();
}
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::file_type,
&GroupFileSchema::rows),
where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE),
order_by(&GroupFileSchema::id),
limit(10));
/* std::map<std::string, GroupSchema> groups; */
/* for (auto& file : selected) { */
/* GroupFileSchema group_file; */
/* group_file.id = std::get<0>(file); */
/* group_file.group_id = std::get<1>(file); */
/* group_file.file_id = std::get<2>(file); */
/* group_file.file_type = std::get<3>(file); */
/* group_file.rows = std::get<4>(file); */
/* group_file.date = std::get<5>(file); */
/* GetGroupFilePath(group_file); */
/* auto groupItr = groups.find(group_file.group_id); */
/* if (groupItr == groups.end()) { */
/* GroupSchema group_info; */
/* group_info.group_id = group_file.group_id; */
/* auto status = get_group_no_lock(group_info); */
/* if (!status.ok()) { */
/* return status; */
/* } */
/* groups[group_file.group_id] = group_info; */
/* } */
/* group_file.dimension = groups[group_file.group_id].dimension; */
/* files.push_back(group_file); */
/* } */
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
throw e;
}
return Status::OK();
}
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
group_file.updated_time = GetMicroSecTimeStamp();
try {
ConnectorPtr->update(group_file);
/* auto commited = ConnectorPtr->transaction([&] () mutable { */
/* ConnectorPtr->update(group_file); */
/* return true; */
/* }); */
/* if (!commited) { */
/* return Status::DBTransactionError("Update file Error"); */
/* } */
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
......
......@@ -50,6 +50,8 @@ public:
virtual Status files_to_index(GroupFilesSchema&) override;
virtual Status archive_files() override;
virtual Status cleanup() override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
......@@ -62,6 +64,7 @@ public:
private:
Status discard_files_of_size(long to_discard_size);
long GetMicroSecTimeStamp();
Status get_group_no_lock(GroupSchema& group_info);
std::string GetGroupPath(const std::string& group_id);
......
......@@ -241,6 +241,11 @@ Status LocalMetaImpl::update_files(GroupFilesSchema& files) {
return Status::OK();
}
Status LocalMetaImpl::archive_files() {
//PXU TODO
return Status::OK();
}
Status LocalMetaImpl::cleanup() {
//PXU TODO
return Status::OK();
......
......@@ -47,6 +47,8 @@ public:
virtual Status files_to_index(GroupFilesSchema&) override;
virtual Status archive_files() override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
virtual Status count(const std::string& group_id, long& result) override;
......
......@@ -30,6 +30,7 @@ struct GroupSchema {
size_t files_cnt = 0;
uint16_t dimension;
std::string location = "";
long created_on;
}; // GroupSchema
......@@ -51,6 +52,7 @@ struct GroupFileSchema {
uint16_t dimension;
std::string location = "";
long updated_time;
long created_on;
}; // GroupFileSchema
typedef std::vector<GroupFileSchema> GroupFilesSchema;
......@@ -91,6 +93,8 @@ public:
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) = 0;
virtual Status archive_files() = 0;
virtual Status files_to_index(GroupFilesSchema&) = 0;
virtual Status cleanup() = 0;
......
......@@ -32,9 +32,9 @@ private:
};
struct DBMetaOptions {
/* DBMetaOptions(const std::string&, const std::string&); */
std::string path;
std::string backend_uri;
ArchiveConf archive_conf = ArchiveConf("delete");
}; // DBMetaOptions
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册