diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 714112fc07622e2f71e82c80b3d63b7580d9b207..ff65f4e91eba68d390136ac9d3e045aec57763c0 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -184,7 +184,8 @@ void DBImpl::background_call() { std::lock_guard lock(_mutex); assert(_bg_compaction_scheduled); - if (!_bg_error.ok()) return; + if (!_bg_error.ok() || _shutting_down.load(std::memory_order_acquire)) + return ; background_compaction(); diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 9e988f61be88f142ad5a2ecec26c38f783278a00..e8ccfaba3c70c9a72aac7f9d6228965a26a50a98 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -129,31 +129,41 @@ Status DBMetaImpl::get_group(GroupSchema& group_info) { } Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) { - auto groups = ConnectorPtr->select(columns(&GroupSchema::id, - &GroupSchema::group_id, - &GroupSchema::files_cnt, - &GroupSchema::dimension), - where(c(&GroupSchema::group_id) == group_info.group_id)); - assert(groups.size() <= 1); - if (groups.size() == 1) { - group_info.id = std::get<0>(groups[0]); - group_info.files_cnt = std::get<2>(groups[0]); - group_info.dimension = std::get<3>(groups[0]); - } else { - return Status::NotFound("Group " + group_info.group_id + " not found"); + try { + auto groups = ConnectorPtr->select(columns(&GroupSchema::id, + &GroupSchema::group_id, + &GroupSchema::files_cnt, + &GroupSchema::dimension), + where(c(&GroupSchema::group_id) == group_info.group_id)); + assert(groups.size() <= 1); + if (groups.size() == 1) { + group_info.id = std::get<0>(groups[0]); + group_info.files_cnt = std::get<2>(groups[0]); + group_info.dimension = std::get<3>(groups[0]); + } else { + return Status::NotFound("Group " + group_info.group_id + " not found"); + } + } catch (std::exception &e) { + LOG(DEBUG) << e.what(); + throw e; } return Status::OK(); } Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) { - auto groups = ConnectorPtr->select(columns(&GroupSchema::id), - where(c(&GroupSchema::group_id) == group_id)); - assert(groups.size() <= 1); - if (groups.size() == 1) { - has_or_not = true; - } else { - has_or_not = false; + try { + auto groups = ConnectorPtr->select(columns(&GroupSchema::id), + where(c(&GroupSchema::group_id) == group_id)); + assert(groups.size() <= 1); + if (groups.size() == 1) { + has_or_not = true; + } else { + has_or_not = false; + } + } catch (std::exception &e) { + LOG(DEBUG) << e.what(); + throw e; } return Status::OK(); } @@ -183,6 +193,7 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { try { auto id = ConnectorPtr->insert(group_file); group_file.id = id; + LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id; } catch (...) { return Status::DBTransactionError("Add file Error"); } @@ -200,37 +211,42 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { files.clear(); - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::rows, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX)); - - std::map groups; - - for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); - group_file.date = std::get<5>(file); - GetGroupFilePath(group_file); - auto groupItr = groups.find(group_file.group_id); - if (groupItr == groups.end()) { - GroupSchema group_info; - group_info.group_id = group_file.group_id; - auto status = get_group_no_lock(group_info); - if (!status.ok()) { - return status; + try { + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows, + &GroupFileSchema::date), + where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX)); + + std::map groups; + + for (auto& file : selected) { + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.group_id = std::get<1>(file); + group_file.file_id = std::get<2>(file); + group_file.file_type = std::get<3>(file); + group_file.rows = std::get<4>(file); + group_file.date = std::get<5>(file); + GetGroupFilePath(group_file); + auto groupItr = groups.find(group_file.group_id); + if (groupItr == groups.end()) { + GroupSchema group_info; + group_info.group_id = group_file.group_id; + auto status = get_group_no_lock(group_info); + if (!status.ok()) { + return status; + } + groups[group_file.group_id] = group_info; } - groups[group_file.group_id] = group_info; + group_file.dimension = groups[group_file.group_id].dimension; + files.push_back(group_file); } - group_file.dimension = groups[group_file.group_id].dimension; - files.push_back(group_file); + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; } return Status::OK(); @@ -241,38 +257,43 @@ Status DBMetaImpl::files_to_search(const std::string &group_id, DatePartionedGroupFilesSchema &files) { // TODO: support data partition files.clear(); - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::rows, - &GroupFileSchema::date), - where(c(&GroupFileSchema::group_id) == group_id and - (c(&GroupFileSchema::file_type) == (int) GroupFileSchema::RAW or - c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX))); - - GroupSchema group_info; - group_info.group_id = group_id; - auto status = get_group_no_lock(group_info); - if (!status.ok()) { - return status; - } + try { + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows, + &GroupFileSchema::date), + where(c(&GroupFileSchema::group_id) == group_id and + (c(&GroupFileSchema::file_type) == (int) GroupFileSchema::RAW or + c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX))); + + GroupSchema group_info; + group_info.group_id = group_id; + auto status = get_group_no_lock(group_info); + if (!status.ok()) { + return status; + } - for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); - group_file.date = std::get<5>(file); - group_file.dimension = group_info.dimension; - GetGroupFilePath(group_file); - auto dateItr = files.find(group_file.date); - if (dateItr == files.end()) { - files[group_file.date] = GroupFilesSchema(); + for (auto& file : selected) { + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.group_id = std::get<1>(file); + group_file.file_id = std::get<2>(file); + group_file.file_type = std::get<3>(file); + group_file.rows = std::get<4>(file); + group_file.date = std::get<5>(file); + group_file.dimension = group_info.dimension; + GetGroupFilePath(group_file); + auto dateItr = files.find(group_file.date); + if (dateItr == files.end()) { + files[group_file.date] = GroupFilesSchema(); + } + files[group_file.date].push_back(group_file); } - files[group_file.date].push_back(group_file); + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; } return Status::OK(); @@ -282,37 +303,42 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id, DatePartionedGroupFilesSchema& files) { files.clear(); - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::rows, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and - c(&GroupFileSchema::group_id) == group_id)); - - GroupSchema group_info; - group_info.group_id = group_id; - auto status = get_group_no_lock(group_info); - if (!status.ok()) { - return status; - } + try { + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows, + &GroupFileSchema::date), + where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and + c(&GroupFileSchema::group_id) == group_id)); + + GroupSchema group_info; + group_info.group_id = group_id; + auto status = get_group_no_lock(group_info); + if (!status.ok()) { + return status; + } - for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); - group_file.date = std::get<5>(file); - group_file.dimension = group_info.dimension; - GetGroupFilePath(group_file); - auto dateItr = files.find(group_file.date); - if (dateItr == files.end()) { - files[group_file.date] = GroupFilesSchema(); + for (auto& file : selected) { + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.group_id = std::get<1>(file); + group_file.file_id = std::get<2>(file); + group_file.file_type = std::get<3>(file); + group_file.rows = std::get<4>(file); + group_file.date = std::get<5>(file); + group_file.dimension = group_info.dimension; + GetGroupFilePath(group_file); + auto dateItr = files.find(group_file.date); + if (dateItr == files.end()) { + files[group_file.date] = GroupFilesSchema(); + } + files[group_file.date].push_back(group_file); } - files[group_file.date].push_back(group_file); + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; } return Status::OK(); @@ -341,88 +367,109 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_, Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { group_file.updated_time = GetMicroSecTimeStamp(); - auto commited = ConnectorPtr->transaction([&] () mutable { - ConnectorPtr->update(group_file); - return true; - }); - if (!commited) { - return Status::DBTransactionError("Update file Error"); + try { + auto commited = ConnectorPtr->transaction([&] () mutable { + ConnectorPtr->update(group_file); + return true; + }); + if (!commited) { + return Status::DBTransactionError("Update file Error"); + } + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id; + throw e; } return Status::OK(); } Status DBMetaImpl::update_files(GroupFilesSchema& files) { - auto commited = ConnectorPtr->transaction([&] () mutable { - for (auto& file : files) { - file.updated_time = GetMicroSecTimeStamp(); - ConnectorPtr->update(file); + try { + auto commited = ConnectorPtr->transaction([&] () mutable { + for (auto& file : files) { + file.updated_time = GetMicroSecTimeStamp(); + ConnectorPtr->update(file); + } + return true; + }); + if (!commited) { + return Status::DBTransactionError("Update files Error"); } - return true; - }); - if (!commited) { - return Status::DBTransactionError("Update files Error"); + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; } return Status::OK(); } Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) { auto now = GetMicroSecTimeStamp(); - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::rows, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and - c(&GroupFileSchema::updated_time) > now - 1000000*seconds)); - - GroupFilesSchema updated; - - for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); - group_file.date = std::get<5>(file); - GetGroupFilePath(group_file); - if (group_file.file_type == GroupFileSchema::TO_DELETE) { - boost::filesystem::remove(group_file.location); + try { + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows, + &GroupFileSchema::date), + where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and + c(&GroupFileSchema::updated_time) > now - 1000000*seconds)); + + GroupFilesSchema updated; + + for (auto& file : selected) { + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.group_id = std::get<1>(file); + group_file.file_id = std::get<2>(file); + group_file.file_type = std::get<3>(file); + group_file.rows = std::get<4>(file); + group_file.date = std::get<5>(file); + GetGroupFilePath(group_file); + if (group_file.file_type == GroupFileSchema::TO_DELETE) { + boost::filesystem::remove(group_file.location); + } + ConnectorPtr->remove(group_file.id); + LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; } - ConnectorPtr->remove(group_file.id); - LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; } return Status::OK(); } Status DBMetaImpl::cleanup() { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::rows, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or - c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW)); - - GroupFilesSchema updated; - - for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); - group_file.date = std::get<5>(file); - GetGroupFilePath(group_file); - if (group_file.file_type == GroupFileSchema::TO_DELETE) { - boost::filesystem::remove(group_file.location); + try { + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows, + &GroupFileSchema::date), + where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or + c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW)); + + GroupFilesSchema updated; + + for (auto& file : selected) { + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.group_id = std::get<1>(file); + group_file.file_id = std::get<2>(file); + group_file.file_type = std::get<3>(file); + group_file.rows = std::get<4>(file); + group_file.date = std::get<5>(file); + GetGroupFilePath(group_file); + if (group_file.file_type == GroupFileSchema::TO_DELETE) { + boost::filesystem::remove(group_file.location); + } + ConnectorPtr->remove(group_file.id); + LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; } - ConnectorPtr->remove(group_file.id); - LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; } return Status::OK(); @@ -430,26 +477,32 @@ Status DBMetaImpl::cleanup() { Status DBMetaImpl::count(const std::string& group_id, long& result) { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::rows, - &GroupFileSchema::date), - where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or - c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or - c(&GroupFileSchema::file_type) == (int)GroupFileSchema::INDEX) and - c(&GroupFileSchema::group_id) == group_id)); + try { + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::rows, + &GroupFileSchema::date), + where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or + c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or + c(&GroupFileSchema::file_type) == (int)GroupFileSchema::INDEX) and + c(&GroupFileSchema::group_id) == group_id)); + + GroupSchema group_info; + group_info.group_id = group_id; + auto status = get_group_no_lock(group_info); + if (!status.ok()) { + return status; + } + + result = 0; + for (auto& file : selected) { + result += std::get<0>(file); + } - GroupSchema group_info; - group_info.group_id = group_id; - auto status = get_group_no_lock(group_info); - if (!status.ok()) { - return status; - } + result /= group_info.dimension; - result = 0; - for (auto& file : selected) { - result += std::get<0>(file); + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; } - - result /= group_info.dimension; return Status::OK(); }