提交 2780b1a6 编写于 作者: X Xu Peng

feat(db): fix meta lock issue


Former-commit-id: 9c276ca661d234e3e1268836fc6efddef408be3e
上级 58c0a6f8
......@@ -90,15 +90,16 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
group_info.files_cnt = 0;
group_info.id = -1;
auto commited = ConnectorPtr->transaction([&] () mutable {
auto id = ConnectorPtr->insert(group_info);
group_info.id = id;
std::cout << __func__ << " id=" << id << std::endl;
return true;
});
if (!commited) {
return Status::DBTransactionError("Add Group Error");
{
std::unique_lock<std::mutex> lk(mutex_);
try {
auto id = ConnectorPtr->insert(group_info);
group_info.id = id;
std::cout << __func__ << " id=" << id << std::endl;
} catch (...) {
return Status::DBTransactionError("Add Group Error");
}
}
auto group_path = GetGroupPath(group_info.group_id);
......@@ -111,6 +112,11 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
}
Status DBMetaImpl::get_group(GroupSchema& group_info) {
std::unique_lock<std::mutex> lk(mutex_);
return get_group_no_lock(group_info);
}
Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
auto groups = ConnectorPtr->select(columns(&GroupSchema::id,
&GroupSchema::group_id,
&GroupSchema::files_cnt,
......@@ -131,6 +137,7 @@ Status DBMetaImpl::get_group(GroupSchema& group_info) {
}
Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
std::unique_lock<std::mutex> lk(mutex_);
auto groups = ConnectorPtr->select(columns(&GroupSchema::id),
where(c(&GroupSchema::group_id) == group_id));
assert(groups.size() <= 1);
......@@ -161,15 +168,16 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
group_file.dimension = group_info.dimension;
GetGroupFilePath(group_file);
auto commited = ConnectorPtr->transaction([&] () mutable {
auto id = ConnectorPtr->insert(group_file);
group_file.id = id;
std::cout << __func__ << " id=" << id << std::endl;
return true;
});
{
std::unique_lock<std::mutex> lk(mutex_);
if (!commited) {
return Status::DBTransactionError("Add file Error");
try {
auto id = ConnectorPtr->insert(group_file);
group_file.id = id;
std::cout << __func__ << " id=" << id << std::endl;
} catch (...) {
return Status::DBTransactionError("Add file Error");
}
}
auto partition_path = GetGroupDatePartitionPath(group_file.group_id, group_file.date);
......@@ -184,6 +192,7 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
files.clear();
std::unique_lock<std::mutex> lk(mutex_);
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
......@@ -206,7 +215,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
if (groupItr == groups.end()) {
GroupSchema group_info;
group_info.group_id = group_file.group_id;
auto status = get_group(group_info);
auto status = get_group_no_lock(group_info);
if (!status.ok()) {
return status;
}
......@@ -223,6 +232,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) {
files.clear();
std::unique_lock<std::mutex> lk(mutex_);
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
......@@ -234,7 +244,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group(group_info);
auto status = get_group_no_lock(group_info);
if (!status.ok()) {
return status;
}
......@@ -279,12 +289,20 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_,
return Status::OK();
}
Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file_) {
//PXU TODO
Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file) {
std::unique_lock<std::mutex> lk(mutex_);
auto commited = ConnectorPtr->transaction([&] () mutable {
ConnectorPtr->update(group_file);
return true;
});
if (!commited) {
return Status::DBTransactionError("Update file Error");
}
return Status::OK();
}
Status DBMetaImpl::update_files(const GroupFilesSchema& files) {
std::unique_lock<std::mutex> lk(mutex_);
auto commited = ConnectorPtr->transaction([&] () mutable {
for (auto& file : files) {
ConnectorPtr->update(file);
......
#ifndef VECENGINE_DB_META_IMPL_H_
#define VECENGINE_DB_META_IMPL_H_
#include <thread>
#include <mutex>
#include "Meta.h"
#include "Options.h"
......@@ -42,12 +44,14 @@ public:
private:
Status get_group_no_lock(GroupSchema& group_info);
std::string GetGroupPath(const std::string& group_id);
std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
void GetGroupFilePath(GroupFileSchema& group_file);
Status initialize();
const DBMetaOptions _options;
std::mutex mutex_;
}; // DBMetaImpl
......
......@@ -51,8 +51,9 @@ Status MemVectors::serialize(std::string& group_id) {
schema_.rows = rows;
schema_.file_type = (rows >= options_.index_trigger_size) ?
meta::GroupFileSchema::TO_INDEX : meta::GroupFileSchema::RAW;
pMeta_->update_group_file(schema_);
return Status::OK();
auto status = pMeta_->update_group_file(schema_);
return status;
}
MemVectors::~MemVectors() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册