提交 5857cf88 编写于 作者: S SanmuWangZJU 提交者: ob-robot

[OBCDC] Improve OBCDC exit process

上级 6e527565
...@@ -1251,7 +1251,7 @@ int ObLogCommitter::commit_binlog_record_list_(TransCtx &trans_ctx, ...@@ -1251,7 +1251,7 @@ int ObLogCommitter::commit_binlog_record_list_(TransCtx &trans_ctx,
if (OB_EMPTY_RESULT == ret) { if (OB_EMPTY_RESULT == ret) {
if (0 < trans_ctx.get_total_br_count()) { if (0 < trans_ctx.get_total_br_count()) {
// unexpected // unexpected
LOG_ERROR("trans has no valid br to output, skip this trans", K(trans_ctx)); LOG_ERROR("unexpected skiping trans with valid br", KR(ret), K(trans_ctx));
} else { } else {
LOG_INFO("trans has no valid br to output, skip this trans", KR(ret), K(trans_ctx)); LOG_INFO("trans has no valid br to output, skip this trans", KR(ret), K(trans_ctx));
ret = OB_SUCCESS; ret = OB_SUCCESS;
......
...@@ -1396,6 +1396,7 @@ void ObLogInstance::mark_stop_flag(const char *stop_reason) ...@@ -1396,6 +1396,7 @@ void ObLogInstance::mark_stop_flag(const char *stop_reason)
lob_data_merger_->mark_stop_flag(); lob_data_merger_->mark_stop_flag();
storager_->mark_stop_flag(); storager_->mark_stop_flag();
reader_->mark_stop_flag(); reader_->mark_stop_flag();
store_service_->mark_stop_flag();
trans_msg_sorter_->mark_stop_flag(); trans_msg_sorter_->mark_stop_flag();
committer_->mark_stop_flag(); committer_->mark_stop_flag();
resource_collector_->mark_stop_flag(); resource_collector_->mark_stop_flag();
......
...@@ -85,6 +85,7 @@ int ObLogReader::init(const int64_t thread_num, ...@@ -85,6 +85,7 @@ int ObLogReader::init(const int64_t thread_num,
void ObLogReader::destroy() void ObLogReader::destroy()
{ {
if (inited_) { if (inited_) {
LOG_INFO("ObLogReader destroy begin");
ReaderThread::destroy(); ReaderThread::destroy();
inited_ = false; inited_ = false;
...@@ -96,6 +97,7 @@ void ObLogReader::destroy() ...@@ -96,6 +97,7 @@ void ObLogReader::destroy()
store_service_stat_.reset(); store_service_stat_.reset();
store_service_ = NULL; store_service_ = NULL;
err_handler_ = NULL; err_handler_ = NULL;
LOG_INFO("ObLogReader destroy end");
} }
} }
...@@ -222,8 +224,12 @@ int ObLogReader::handle_task_(ObLogEntryTask &log_entry_task, ...@@ -222,8 +224,12 @@ int ObLogReader::handle_task_(ObLogEntryTask &log_entry_task,
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (OB_FAIL(log_entry_task.get_storage_key(key))) { } else if (OB_FAIL(log_entry_task.get_storage_key(key))) {
LOG_ERROR("get_storage_key fail", KR(ret), "key", key.c_str(), K(log_entry_task)); LOG_ERROR("get_storage_key fail", KR(ret), "key", key.c_str(), K(log_entry_task));
} else if (stop_flag) {
ret = OB_IN_STOP_STATE;
} else if (OB_FAIL(read_store_service_(log_entry_task, column_family_handle, key, value))) { } else if (OB_FAIL(read_store_service_(log_entry_task, column_family_handle, key, value))) {
LOG_ERROR("read_store_service_ fail", KR(ret), K(log_entry_task)); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("read_store_service_ fail", KR(ret), K(log_entry_task));
}
} else { } else {
store_service_stat_.do_data_stat(value.length()); store_service_stat_.do_data_stat(value.length());
} }
...@@ -258,6 +264,8 @@ int ObLogReader::read_store_service_(ObLogEntryTask &log_entry_task, ...@@ -258,6 +264,8 @@ int ObLogReader::read_store_service_(ObLogEntryTask &log_entry_task,
} else if (OB_ISNULL(store_service_)) { } else if (OB_ISNULL(store_service_)) {
LOG_ERROR("store_service_ is NULL"); LOG_ERROR("store_service_ is NULL");
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
} else if (ReaderThread::is_stoped()) {
ret = OB_IN_STOP_STATE;
} else if (OB_FAIL(store_service_->get(column_family_handle, key, value))) { } else if (OB_FAIL(store_service_->get(column_family_handle, key, value))) {
LOG_ERROR("StoreService get fail", KR(ret), K(key.c_str()), "value_len", value.length(), K(log_entry_task)); LOG_ERROR("StoreService get fail", KR(ret), K(key.c_str()), "value_len", value.length(), K(log_entry_task));
} else { } else {
......
...@@ -112,6 +112,7 @@ int ObLogResourceCollector::init(const int64_t thread_num, ...@@ -112,6 +112,7 @@ int ObLogResourceCollector::init(const int64_t thread_num,
void ObLogResourceCollector::destroy() void ObLogResourceCollector::destroy()
{ {
LOG_INFO("resource_collector destroy begin");
RCThread::destroy(); RCThread::destroy();
inited_ = false; inited_ = false;
br_pool_ = NULL; br_pool_ = NULL;
...@@ -126,6 +127,7 @@ void ObLogResourceCollector::destroy() ...@@ -126,6 +127,7 @@ void ObLogResourceCollector::destroy()
dml_part_trans_task_count_ = 0; dml_part_trans_task_count_ = 0;
hb_part_trans_task_count_ = 0; hb_part_trans_task_count_ = 0;
other_part_trans_task_count_ = 0; other_part_trans_task_count_ = 0;
LOG_INFO("resource_collector destroy end");
} }
int ObLogResourceCollector::start() int ObLogResourceCollector::start()
...@@ -154,6 +156,7 @@ void ObLogResourceCollector::mark_stop_flag() ...@@ -154,6 +156,7 @@ void ObLogResourceCollector::mark_stop_flag()
{ {
if (inited_) { if (inited_) {
RCThread::mark_stop_flag(); RCThread::mark_stop_flag();
LOG_INFO("resource_collector mark_stop_flag");
} }
} }
...@@ -225,7 +228,9 @@ int ObLogResourceCollector::revert_log_entry_task(ObLogEntryTask *log_entry_task ...@@ -225,7 +228,9 @@ int ObLogResourceCollector::revert_log_entry_task(ObLogEntryTask *log_entry_task
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_ERROR("host of log_entry_task is invalid, failed cast to PartTransTask" ,KR(ret), KPC(log_entry_task)); LOG_ERROR("host of log_entry_task is invalid, failed cast to PartTransTask" ,KR(ret), KPC(log_entry_task));
} else if (OB_FAIL(revert_log_entry_task_(log_entry_task))) { } else if (OB_FAIL(revert_log_entry_task_(log_entry_task))) {
LOG_ERROR("revert_log_entry_task_ fail", KR(ret), KPC(log_entry_task)); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("revert_log_entry_task_ fail", KR(ret), KPC(log_entry_task));
}
} else if (OB_FAIL(dec_ref_cnt_and_try_to_revert_task_(part_trans_task))) { } else if (OB_FAIL(dec_ref_cnt_and_try_to_revert_task_(part_trans_task))) {
LOG_ERROR("dec_ref_cnt_and_try_to_revert_task_ fail", KR(ret), KPC(part_trans_task)); LOG_ERROR("dec_ref_cnt_and_try_to_revert_task_ fail", KR(ret), KPC(part_trans_task));
} else { } else {
...@@ -304,8 +309,10 @@ int ObLogResourceCollector::revert_log_entry_task_(ObLogEntryTask *log_entry_tas ...@@ -304,8 +309,10 @@ int ObLogResourceCollector::revert_log_entry_task_(ObLogEntryTask *log_entry_tas
if (OB_FAIL(log_entry_task->get_storage_key(key))) { if (OB_FAIL(log_entry_task->get_storage_key(key))) {
LOG_ERROR("get_storage_key fail", KR(ret), "key", key.c_str()); LOG_ERROR("get_storage_key fail", KR(ret), "key", key.c_str());
} else if (OB_FAIL(del_store_service_data_(tenant_id, key))) { } else if (OB_FAIL(del_store_service_data_(tenant_id, key))) {
LOG_ERROR("del_store_service_data_ fail", KR(ret), KPC(log_entry_task)); if (OB_IN_STOP_STATE != ret) {
} else {} LOG_ERROR("del_store_service_data_ fail", KR(ret), KPC(log_entry_task));
}
}
} }
} }
...@@ -330,9 +337,11 @@ int ObLogResourceCollector::del_store_service_data_(const uint64_t tenant_id, ...@@ -330,9 +337,11 @@ int ObLogResourceCollector::del_store_service_data_(const uint64_t tenant_id,
column_family_handle = tenant->get_cf(); column_family_handle = tenant->get_cf();
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret) && ! RCThread::is_stoped()) {
if (OB_FAIL(store_service_->del(column_family_handle, key))) { if (OB_FAIL(store_service_->del(column_family_handle, key))) {
LOG_ERROR("store_service_ del fail", KR(ret), K(tenant_id), K(key.c_str())); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("store_service_ del fail", KR(ret), K(tenant_id), K(key.c_str()));
}
} else { } else {
LOG_DEBUG("store_service_ del succ", K(tenant_id), K(key.c_str())); LOG_DEBUG("store_service_ del succ", K(tenant_id), K(key.c_str()));
} }
...@@ -355,7 +364,7 @@ int ObLogResourceCollector::revert_participants_(const int64_t thread_index, ...@@ -355,7 +364,7 @@ int ObLogResourceCollector::revert_participants_(const int64_t thread_index,
} else { } else {
PartTransTask *task = participants; PartTransTask *task = participants;
while (OB_SUCCESS == ret && NULL != task) { while (OB_SUCC(ret) && OB_NOT_NULL(task) && ! RCThread::is_stoped()) {
PartTransTask *next = task->next_task(); PartTransTask *next = task->next_task();
task->set_next_task(NULL); task->set_next_task(NULL);
...@@ -368,6 +377,10 @@ int ObLogResourceCollector::revert_participants_(const int64_t thread_index, ...@@ -368,6 +377,10 @@ int ObLogResourceCollector::revert_participants_(const int64_t thread_index,
} }
} }
if (RCThread::is_stoped()) {
ret = OB_IN_STOP_STATE;
}
task = NULL; task = NULL;
} }
...@@ -434,8 +447,10 @@ int ObLogResourceCollector::recycle_part_trans_task_(const int64_t thread_index, ...@@ -434,8 +447,10 @@ int ObLogResourceCollector::recycle_part_trans_task_(const int64_t thread_index,
} else { } else {
if (task->is_ddl_trans()) { if (task->is_ddl_trans()) {
if (OB_FAIL(revert_dll_all_binlog_records_(task))) { if (OB_FAIL(revert_dll_all_binlog_records_(task))) {
// Reclaim all Binlog Records within a DDL partitioned transaction if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("revert_dll_all_binlog_records_ fail", KR(ret), K(*task)); // Reclaim all Binlog Records within a DDL partitioned transaction
LOG_ERROR("revert_dll_all_binlog_records_ fail", KR(ret), K(*task));
}
} }
} }
LOG_DEBUG("[ResourceCollector] recycle part trans task", K(thread_index), K(*task)); LOG_DEBUG("[ResourceCollector] recycle part trans task", K(thread_index), K(*task));
...@@ -476,7 +491,9 @@ int ObLogResourceCollector::handle(void *data, ...@@ -476,7 +491,9 @@ int ObLogResourceCollector::handle(void *data,
if (! task->is_served()) { if (! task->is_served()) {
if (OB_FAIL(revert_unserved_part_trans_task_(thread_index, *task))) { if (OB_FAIL(revert_unserved_part_trans_task_(thread_index, *task))) {
LOG_ERROR("revert_unserved_part_trans_task_ fail", KR(ret), K(thread_index), KPC(task)); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("revert_unserved_part_trans_task_ fail", KR(ret), K(thread_index), KPC(task));
}
} }
// DML/DDL // DML/DDL
} else if (task->is_ddl_trans() || task->is_dml_trans()) { } else if (task->is_ddl_trans() || task->is_dml_trans()) {
...@@ -509,7 +526,7 @@ int ObLogResourceCollector::handle(void *data, ...@@ -509,7 +526,7 @@ int ObLogResourceCollector::handle(void *data,
LOG_ERROR("remove trans_ctx fail", KR(ret), K(tenant_id), K(trans_id), K(trans_ctx)); LOG_ERROR("remove trans_ctx fail", KR(ret), K(tenant_id), K(trans_id), K(trans_ctx));
} }
// recycle all participants // recycle all participants
else if (NULL != participants && OB_FAIL(revert_participants_(thread_index, participants))) { else if (OB_NOT_NULL(participants) && OB_FAIL(revert_participants_(thread_index, participants))) {
if (OB_IN_STOP_STATE != ret) { if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("revert_participants_ fail", KR(ret), K(thread_index), K(participants), K(trans_id)); LOG_ERROR("revert_participants_ fail", KR(ret), K(thread_index), K(participants), K(trans_id));
} }
...@@ -657,8 +674,10 @@ int ObLogResourceCollector::del_trans_(const uint64_t tenant_id, ...@@ -657,8 +674,10 @@ int ObLogResourceCollector::del_trans_(const uint64_t tenant_id,
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (OB_FAIL(store_service_->del_range(column_family_handle, begin_key, end_key))) { if (OB_FAIL(store_service_->del_range(column_family_handle, begin_key, end_key))) {
LOG_ERROR("store_service_ del fail", KR(ret), "begin_key", begin_key.c_str(), if (OB_IN_STOP_STATE != ret) {
"end_key", end_key.c_str()); LOG_ERROR("store_service_ del fail", KR(ret), "begin_key", begin_key.c_str(),
"end_key", end_key.c_str());
}
} else { } else {
LOG_INFO("store_service_ del succ", KR(ret), "begin_key", begin_key.c_str(), LOG_INFO("store_service_ del succ", KR(ret), "begin_key", begin_key.c_str(),
"end_key", end_key.c_str()); "end_key", end_key.c_str());
...@@ -687,7 +706,9 @@ int ObLogResourceCollector::dec_ref_cnt_and_try_to_recycle_log_entry_task_(ObLog ...@@ -687,7 +706,9 @@ int ObLogResourceCollector::dec_ref_cnt_and_try_to_recycle_log_entry_task_(ObLog
if (need_revert_log_entry_task) { if (need_revert_log_entry_task) {
if (OB_FAIL(revert_log_entry_task_(log_entry_task))) { if (OB_FAIL(revert_log_entry_task_(log_entry_task))) {
LOG_ERROR("revert_log_entry_task_ fail", KR(ret), KPC(log_entry_task)); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("revert_log_entry_task_ fail", KR(ret), KPC(log_entry_task));
}
} else { } else {
log_entry_task = NULL; log_entry_task = NULL;
} }
...@@ -726,7 +747,7 @@ int ObLogResourceCollector::revert_dll_all_binlog_records_(PartTransTask *task) ...@@ -726,7 +747,7 @@ int ObLogResourceCollector::revert_dll_all_binlog_records_(PartTransTask *task)
// FIXME: the Binlog Record contains references to memory allocated by the PartTransTask. // FIXME: the Binlog Record contains references to memory allocated by the PartTransTask.
// They should be actively freed here, but as PartTransTask will release the memory uniformly when it is reclaimed // They should be actively freed here, but as PartTransTask will release the memory uniformly when it is reclaimed
// memory in the Binlog Record is not actively freed here // memory in the Binlog Record is not actively freed here
while (OB_SUCC(ret) && NULL != stmt_task) { while (OB_SUCC(ret) && OB_NOT_NULL(stmt_task) && ! RCThread::is_stoped()) {
DdlStmtTask *next = static_cast<DdlStmtTask *>(stmt_task->get_next()); DdlStmtTask *next = static_cast<DdlStmtTask *>(stmt_task->get_next());
ObLogBR *br = stmt_task->get_binlog_record(); ObLogBR *br = stmt_task->get_binlog_record();
stmt_task->set_binlog_record(NULL); stmt_task->set_binlog_record(NULL);
...@@ -737,6 +758,9 @@ int ObLogResourceCollector::revert_dll_all_binlog_records_(PartTransTask *task) ...@@ -737,6 +758,9 @@ int ObLogResourceCollector::revert_dll_all_binlog_records_(PartTransTask *task)
stmt_task = next; stmt_task = next;
} }
if (RCThread::is_stoped()) {
ret = OB_IN_STOP_STATE;
}
} }
return ret; return ret;
...@@ -798,7 +822,7 @@ int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t threa ...@@ -798,7 +822,7 @@ int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t threa
SortedRedoLogList &sorted_redo_list = task.get_sorted_redo_list(); SortedRedoLogList &sorted_redo_list = task.get_sorted_redo_list();
DmlRedoLogNode *dml_redo_node = static_cast<DmlRedoLogNode *>(sorted_redo_list.head_); DmlRedoLogNode *dml_redo_node = static_cast<DmlRedoLogNode *>(sorted_redo_list.head_);
while (OB_SUCC(ret) && NULL != dml_redo_node) { while (OB_SUCC(ret) && OB_NOT_NULL(dml_redo_node) && ! RCThread::is_stoped()) {
if (dml_redo_node->is_stored()) { if (dml_redo_node->is_stored()) {
const palf::LSN &store_log_lsn = dml_redo_node->get_start_log_lsn(); const palf::LSN &store_log_lsn = dml_redo_node->get_start_log_lsn();
ObLogStoreKey store_key; ObLogStoreKey store_key;
...@@ -813,6 +837,10 @@ int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t threa ...@@ -813,6 +837,10 @@ int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t threa
} else {} } else {}
} }
if (RCThread::is_stoped()) {
ret = OB_IN_STOP_STATE;
}
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
dml_redo_node = static_cast<DmlRedoLogNode *>(dml_redo_node->get_next()); dml_redo_node = static_cast<DmlRedoLogNode *>(dml_redo_node->get_next());
} }
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* OBCDC Storage based on rocksdb * OBCDC Storage based on rocksdb
*/ */
#define USING_LOG_PREFIX OBLOG #define USING_LOG_PREFIX OBLOG_STORAGER
#include "ob_log_rocksdb_store_service.h" #include "ob_log_rocksdb_store_service.h"
#include "ob_log_utils.h" #include "ob_log_utils.h"
...@@ -30,6 +30,7 @@ namespace libobcdc ...@@ -30,6 +30,7 @@ namespace libobcdc
RocksDbStoreService::RocksDbStoreService() RocksDbStoreService::RocksDbStoreService()
{ {
is_inited_ = false; is_inited_ = false;
is_stopped_ = true;
m_db_ = NULL; m_db_ = NULL;
m_db_path_.clear(); m_db_path_.clear();
} }
...@@ -73,6 +74,7 @@ int RocksDbStoreService::init(const std::string &path) ...@@ -73,6 +74,7 @@ int RocksDbStoreService::init(const std::string &path)
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
} else { } else {
_LOG_INFO("RocksDbStoreService init success, path:%s, total_threads=%d", m_db_path_.c_str(), total_threads); _LOG_INFO("RocksDbStoreService init success, path:%s, total_threads=%d", m_db_path_.c_str(), total_threads);
is_stopped_ = false;
is_inited_ = true; is_inited_ = true;
} }
...@@ -86,6 +88,10 @@ int RocksDbStoreService::close() ...@@ -86,6 +88,10 @@ int RocksDbStoreService::close()
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (NULL != m_db_) { if (NULL != m_db_) {
LOG_INFO("closing rocksdb ...");
mark_stop_flag();
usleep(100 * _MSEC_);
rocksdb::Status status = m_db_->Close(); rocksdb::Status status = m_db_->Close();
if (! status.ok()) { if (! status.ok()) {
...@@ -127,6 +133,7 @@ int RocksDbStoreService::init_dir_(const char *dir_path) ...@@ -127,6 +133,7 @@ int RocksDbStoreService::init_dir_(const char *dir_path)
void RocksDbStoreService::destroy() void RocksDbStoreService::destroy()
{ {
if (is_inited_) { if (is_inited_) {
LOG_INFO("rocksdb service destroy begin");
close(); close();
if (OB_NOT_NULL(m_db_)) { if (OB_NOT_NULL(m_db_)) {
...@@ -134,6 +141,7 @@ void RocksDbStoreService::destroy() ...@@ -134,6 +141,7 @@ void RocksDbStoreService::destroy()
m_db_ = NULL; m_db_ = NULL;
} }
is_inited_ = false; is_inited_ = false;
LOG_INFO("rocksdb service destroy end");
} }
} }
...@@ -143,13 +151,19 @@ int RocksDbStoreService::put(const std::string &key, const ObSlice &value) ...@@ -143,13 +151,19 @@ int RocksDbStoreService::put(const std::string &key, const ObSlice &value)
rocksdb::WriteOptions writer_options; rocksdb::WriteOptions writer_options;
writer_options.disableWAL = true; writer_options.disableWAL = true;
// find column family handle for cf if (is_stopped()) {
rocksdb::Status s = m_db_->Put(writer_options, rocksdb::Slice(key.c_str(), key.size()), ret = OB_IN_STOP_STATE;
rocksdb::Slice(value.buf_, value.buf_len_)); } else {
// find column family handle for cf
rocksdb::Status s = m_db_->Put(
writer_options,
rocksdb::Slice(key.c_str(), key.size()),
rocksdb::Slice(value.buf_, value.buf_len_));
if (!s.ok()) { if (!s.ok()) {
_LOG_ERROR("RocksDbStoreService put value into rocksdb failed, error %s", s.ToString().c_str()); _LOG_ERROR("RocksDbStoreService put value into rocksdb failed, error %s", s.ToString().c_str());
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
}
} }
return ret; return ret;
...@@ -165,6 +179,8 @@ int RocksDbStoreService::put(void *cf_handle, const std::string &key, const ObSl ...@@ -165,6 +179,8 @@ int RocksDbStoreService::put(void *cf_handle, const std::string &key, const ObSl
if (OB_ISNULL(column_family_handle)) { if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL"); LOG_ERROR("column_family_handle is NULL");
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else { } else {
rocksdb::Status s = m_db_->Put(writer_options, column_family_handle, rocksdb::Slice(key), rocksdb::Status s = m_db_->Put(writer_options, column_family_handle, rocksdb::Slice(key),
rocksdb::Slice(value.buf_, value.buf_len_)); rocksdb::Slice(value.buf_, value.buf_len_));
...@@ -193,15 +209,28 @@ int RocksDbStoreService::batch_write(void *cf_handle, ...@@ -193,15 +209,28 @@ int RocksDbStoreService::batch_write(void *cf_handle,
} else { } else {
rocksdb::WriteBatch batch; rocksdb::WriteBatch batch;
for (int64_t idx = 0; OB_SUCC(ret) && idx < keys.size(); ++idx) { for (int64_t idx = 0; OB_SUCC(ret) && !is_stopped() && idx < keys.size(); ++idx) {
batch.Put(column_family_handle, rocksdb::Slice(keys[idx]), rocksdb::Slice(values[idx].buf_, values[idx].buf_len_)); rocksdb::Status s = batch.Put(
column_family_handle,
rocksdb::Slice(keys[idx]),
rocksdb::Slice(values[idx].buf_, values[idx].buf_len_));
if (!s.ok()) {
ret = OB_IO_ERROR;
_LOG_ERROR("RocksDbStoreService build batch failed, error %s", s.ToString().c_str());
}
} }
rocksdb::Status s = m_db_->Write(writer_options, &batch); if (OB_SUCC(ret) && !is_stopped()) {
rocksdb::Status s = m_db_->Write(writer_options, &batch);
if (!s.ok()) { if (!s.ok()) {
_LOG_ERROR("RocksDbStoreService WriteBatch put value into rocksdb failed, error %s", s.ToString().c_str()); _LOG_ERROR("RocksDbStoreService WriteBatch put value into rocksdb failed, error %s", s.ToString().c_str());
ret = OB_IO_ERROR; ret = OB_IO_ERROR;
}
}
if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} }
} }
...@@ -211,13 +240,18 @@ int RocksDbStoreService::batch_write(void *cf_handle, ...@@ -211,13 +240,18 @@ int RocksDbStoreService::batch_write(void *cf_handle,
int RocksDbStoreService::get(const std::string &key, std::string &value) int RocksDbStoreService::get(const std::string &key, std::string &value)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
//rocksdb::PinnableSlice slice(&value);
rocksdb::Status s = m_db_->Get(rocksdb::ReadOptions(), key, &value);
if (!s.ok()) { if (is_stopped()) {
_LOG_ERROR("RocksDbStoreService get value from rocksdb failed, error %s, key:%s", ret = OB_IN_STOP_STATE;
s.ToString().c_str(), key.c_str()); } else {
ret = OB_ERR_UNEXPECTED; //rocksdb::PinnableSlice slice(&value);
rocksdb::Status s = m_db_->Get(rocksdb::ReadOptions(), key, &value);
if (!s.ok()) {
_LOG_ERROR("RocksDbStoreService get value from rocksdb failed, error %s, key:%s",
s.ToString().c_str(), key.c_str());
ret = OB_ERR_UNEXPECTED;
}
} }
return ret; return ret;
...@@ -231,6 +265,8 @@ int RocksDbStoreService::get(void *cf_handle, const std::string &key, std::strin ...@@ -231,6 +265,8 @@ int RocksDbStoreService::get(void *cf_handle, const std::string &key, std::strin
if (OB_ISNULL(column_family_handle)) { if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL"); LOG_ERROR("column_family_handle is NULL");
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else { } else {
rocksdb::Status s = m_db_->Get(rocksdb::ReadOptions(), column_family_handle, key, &value); rocksdb::Status s = m_db_->Get(rocksdb::ReadOptions(), column_family_handle, key, &value);
...@@ -247,11 +283,16 @@ int RocksDbStoreService::get(void *cf_handle, const std::string &key, std::strin ...@@ -247,11 +283,16 @@ int RocksDbStoreService::get(void *cf_handle, const std::string &key, std::strin
int RocksDbStoreService::del(const std::string &key) int RocksDbStoreService::del(const std::string &key)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
// find column family handle for cf
rocksdb::Status s = m_db_->Delete(rocksdb::WriteOptions(), key); if (is_stopped()) {
if (!s.ok()) { ret = OB_IN_STOP_STATE;
LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str()); } else {
ret = OB_ERR_UNEXPECTED; // find column family handle for cf
rocksdb::Status s = m_db_->Delete(rocksdb::WriteOptions(), key);
if (!s.ok()) {
ret = OB_IO_ERROR;
_LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str());
}
} }
return ret; return ret;
...@@ -265,8 +306,10 @@ int RocksDbStoreService::del(void *cf_handle, const std::string &key) ...@@ -265,8 +306,10 @@ int RocksDbStoreService::del(void *cf_handle, const std::string &key)
writer_options.disableWAL = true; writer_options.disableWAL = true;
if (OB_ISNULL(column_family_handle)) { if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL");
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_ERROR("column_family_handle is NULL", KR(ret));
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else { } else {
rocksdb::Status s = m_db_->Delete(writer_options, column_family_handle, key); rocksdb::Status s = m_db_->Delete(writer_options, column_family_handle, key);
...@@ -287,6 +330,8 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key ...@@ -287,6 +330,8 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key
if (OB_ISNULL(column_family_handle)) { if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL"); LOG_ERROR("column_family_handle is NULL");
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else { } else {
rocksdb::Status s = m_db_->DeleteRange(rocksdb::WriteOptions(), column_family_handle, rocksdb::Status s = m_db_->DeleteRange(rocksdb::WriteOptions(), column_family_handle,
begin_key, end_key); begin_key, end_key);
...@@ -323,16 +368,20 @@ int RocksDbStoreService::create_column_family(const std::string& column_family_n ...@@ -323,16 +368,20 @@ int RocksDbStoreService::create_column_family(const std::string& column_family_n
// Column Family's default memtable size is 64M, when the maximum limit is exceeded, memtable -> immutable memtable, increase write_buffer_size, can reduce write amplification // Column Family's default memtable size is 64M, when the maximum limit is exceeded, memtable -> immutable memtable, increase write_buffer_size, can reduce write amplification
cf_options.write_buffer_size = rocksdb_write_buffer_size << 20; cf_options.write_buffer_size = rocksdb_write_buffer_size << 20;
rocksdb::Status status = m_db_->CreateColumnFamily(cf_options, column_family_name, &column_family_handle); if (is_stopped()) {
ret = OB_IN_STOP_STATE;
if (! status.ok()) {
_LOG_ERROR("rocksdb CreateColumnFamily [%s] failed, error %s", column_family_name.c_str(), status.ToString().c_str());
ret = OB_ERR_UNEXPECTED;
} else { } else {
cf_handle = reinterpret_cast<void *>(column_family_handle); rocksdb::Status status = m_db_->CreateColumnFamily(cf_options, column_family_name, &column_family_handle);
if (! status.ok()) {
_LOG_ERROR("rocksdb CreateColumnFamily [%s] failed, error %s", column_family_name.c_str(), status.ToString().c_str());
ret = OB_ERR_UNEXPECTED;
} else {
cf_handle = reinterpret_cast<void *>(column_family_handle);
LOG_INFO("rocksdb CreateColumnFamily succ", "column_family_name", column_family_name.c_str(), LOG_INFO("rocksdb CreateColumnFamily succ", "column_family_name", column_family_name.c_str(),
K(column_family_handle), K(cf_handle), K(rocksdb_write_buffer_size)); K(column_family_handle), K(cf_handle), K(rocksdb_write_buffer_size));
}
} }
return ret; return ret;
...@@ -346,6 +395,8 @@ int RocksDbStoreService::drop_column_family(void *cf_handle) ...@@ -346,6 +395,8 @@ int RocksDbStoreService::drop_column_family(void *cf_handle)
if (OB_ISNULL(column_family_handle)) { if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL"); LOG_ERROR("column_family_handle is NULL");
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else { } else {
rocksdb::Status status = m_db_->DropColumnFamily(column_family_handle); rocksdb::Status status = m_db_->DropColumnFamily(column_family_handle);
...@@ -368,6 +419,8 @@ int RocksDbStoreService::destory_column_family(void *cf_handle) ...@@ -368,6 +419,8 @@ int RocksDbStoreService::destory_column_family(void *cf_handle)
if (OB_ISNULL(column_family_handle)) { if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL"); LOG_ERROR("column_family_handle is NULL");
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else { } else {
rocksdb::Status status = m_db_->DestroyColumnFamilyHandle(column_family_handle); rocksdb::Status status = m_db_->DestroyColumnFamilyHandle(column_family_handle);
...@@ -391,7 +444,7 @@ void RocksDbStoreService::get_mem_usage(const std::vector<uint64_t> ids, ...@@ -391,7 +444,7 @@ void RocksDbStoreService::get_mem_usage(const std::vector<uint64_t> ids,
int64_t total_table_readers_usage = 0; int64_t total_table_readers_usage = 0;
int64_t total_block_cache_pinned_usage = 0; int64_t total_block_cache_pinned_usage = 0;
for (int64_t idx = 0; OB_SUCC(ret) && idx < cf_handles.size(); ++idx) { for (int64_t idx = 0; OB_SUCC(ret) && !is_stopped() && idx < cf_handles.size(); ++idx) {
rocksdb::ColumnFamilyHandle *column_family_handle = static_cast<rocksdb::ColumnFamilyHandle *>(cf_handles[idx]); rocksdb::ColumnFamilyHandle *column_family_handle = static_cast<rocksdb::ColumnFamilyHandle *>(cf_handles[idx]);
if (OB_ISNULL(column_family_handle)) { if (OB_ISNULL(column_family_handle)) {
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define OCEANBASE_LIBOBCDC_OB_LOG_ROCKSDB_IMPL_H_ #define OCEANBASE_LIBOBCDC_OB_LOG_ROCKSDB_IMPL_H_
#include "ob_log_store_service.h" #include "ob_log_store_service.h"
#include "lib/atomic/ob_atomic.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
...@@ -54,15 +55,18 @@ public: ...@@ -54,15 +55,18 @@ public:
virtual int drop_column_family(void *cf_handle); virtual int drop_column_family(void *cf_handle);
virtual int destory_column_family(void *cf_handle); virtual int destory_column_family(void *cf_handle);
virtual int close(); virtual void mark_stop_flag() override { ATOMIC_SET(&is_stopped_, true); }
virtual int close() override;
virtual void get_mem_usage(const std::vector<uint64_t> ids, virtual void get_mem_usage(const std::vector<uint64_t> ids,
const std::vector<void *> cf_handles); const std::vector<void *> cf_handles);
OB_INLINE bool is_stopped() const { return ATOMIC_LOAD(&is_stopped_); }
private: private:
int init_dir_(const char *dir_path); int init_dir_(const char *dir_path);
private: private:
bool is_inited_; bool is_inited_;
bool is_stopped_;
rocksdb::DB *m_db_; rocksdb::DB *m_db_;
rocksdb::Options m_options_; rocksdb::Options m_options_;
std::string m_db_path_; std::string m_db_path_;
......
...@@ -83,6 +83,7 @@ int ObLogStorager::init(const int64_t thread_num, ...@@ -83,6 +83,7 @@ int ObLogStorager::init(const int64_t thread_num,
void ObLogStorager::destroy() void ObLogStorager::destroy()
{ {
if (inited_) { if (inited_) {
LOG_INFO("store_service destroy begin");
StoragerThread::destroy(); StoragerThread::destroy();
inited_ = false; inited_ = false;
...@@ -94,6 +95,7 @@ void ObLogStorager::destroy() ...@@ -94,6 +95,7 @@ void ObLogStorager::destroy()
store_service_stat_.reset(); store_service_stat_.reset();
store_service_ = NULL; store_service_ = NULL;
err_handler_ = NULL; err_handler_ = NULL;
LOG_INFO("store_service destroy end");
} }
} }
...@@ -272,7 +274,9 @@ int ObLogStorager::handle_task_(IObLogBatchBufTask &batch_task, ...@@ -272,7 +274,9 @@ int ObLogStorager::handle_task_(IObLogBatchBufTask &batch_task,
if (OB_FAIL(write_store_service_(key.c_str(), batch_buf + start_pos, data_len, if (OB_FAIL(write_store_service_(key.c_str(), batch_buf + start_pos, data_len,
column_family_handle, thread_index))) { column_family_handle, thread_index))) {
LOG_ERROR("write_store_service_ fail", KR(ret), K(store_task)); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("write_store_service_ fail", KR(ret), K(store_task));
}
} else if (OB_FAIL(store_task->st_after_consume(OB_SUCCESS))) { } else if (OB_FAIL(store_task->st_after_consume(OB_SUCCESS))) {
LOG_ERROR("st_after_consume fail", KR(ret)); LOG_ERROR("st_after_consume fail", KR(ret));
} else { } else {
...@@ -335,7 +339,9 @@ int ObLogStorager::write_store_service_(const char *key, ...@@ -335,7 +339,9 @@ int ObLogStorager::write_store_service_(const char *key,
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
} else { } else {
if (OB_FAIL(store_service_->put(column_family_handle, key, ObSlice(log_str, log_str_len)))) { if (OB_FAIL(store_service_->put(column_family_handle, key, ObSlice(log_str, log_str_len)))) {
LOG_ERROR("store_service_ put fail", KR(ret), K(thread_index), K(key), K(log_str_len)); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("store_service_ put fail", KR(ret), K(thread_index), K(key), K(log_str_len));
}
} else { } else {
// Statistics rps // Statistics rps
rps_stat_.do_rps_stat(1); rps_stat_.do_rps_stat(1);
...@@ -393,7 +399,9 @@ int ObLogStorager::read_store_service_(const std::string &key) ...@@ -393,7 +399,9 @@ int ObLogStorager::read_store_service_(const std::string &key)
std::string br_string_res; std::string br_string_res;
if (OB_FAIL(store_service_->get(key, br_string_res))) { if (OB_FAIL(store_service_->get(key, br_string_res))) {
LOG_ERROR("store_service_ get fail", KR(ret), K(key.c_str()), K(br_string_res.length())); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("store_service_ get fail", KR(ret), K(key.c_str()), K(br_string_res.length()));
}
} else { } else {
LOG_DEBUG("store_service_ get succ", KR(ret), K(key.c_str()), K(br_string_res.length()), K(br_string_res.c_str())); LOG_DEBUG("store_service_ get succ", KR(ret), K(key.c_str()), K(br_string_res.length()), K(br_string_res.c_str()));
} }
......
...@@ -47,7 +47,8 @@ class IObStoreService ...@@ -47,7 +47,8 @@ class IObStoreService
public: public:
virtual ~IObStoreService() {} virtual ~IObStoreService() {}
virtual int init(const std::string &path) = 0; virtual int init(const std::string &path) = 0;
virtual int close() = 0; virtual void mark_stop_flag() = 0; // stop store service: won't handle more store task.
virtual int close() = 0; // close store service: including actual storager
public: public:
virtual int put(const std::string &key, const ObSlice &value) = 0; virtual int put(const std::string &key, const ObSlice &value) = 0;
......
...@@ -330,10 +330,12 @@ int ObLogTenantMgr::do_add_tenant_(const uint64_t tenant_id, ...@@ -330,10 +330,12 @@ int ObLogTenantMgr::do_add_tenant_(const uint64_t tenant_id,
} else if (OB_ISNULL(tenant)) { } else if (OB_ISNULL(tenant)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant is NULL", KR(ret), K(tenant_id), K(tenant)); LOG_ERROR("tenant is NULL", KR(ret), K(tenant_id), K(tenant));
} } else if (OB_FAIL(store_service->create_column_family(
else if (OB_FAIL(store_service->create_column_family(std::to_string(tenant_id) + ":" + std::string(tenant_name), std::to_string(tenant_id) + ":" + std::string(tenant_name),
column_family_handle))) { column_family_handle))) {
LOG_ERROR("create_column_family fail", KR(ret), K(tenant_id), K(tenant_name), K(column_family_handle)); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("create_column_family fail", KR(ret), K(tenant_id), K(tenant_name), K(column_family_handle));
}
} }
// init tenant // init tenant
else if (OB_FAIL(tenant->init(tenant_id, tenant_name, tenant_start_serve_ts_ns, start_seq, else if (OB_FAIL(tenant->init(tenant_id, tenant_name, tenant_start_serve_ts_ns, start_seq,
...@@ -992,7 +994,9 @@ int ObLogTenantMgr::remove_tenant_(const uint64_t tenant_id, ObLogTenant *tenant ...@@ -992,7 +994,9 @@ int ObLogTenantMgr::remove_tenant_(const uint64_t tenant_id, ObLogTenant *tenant
ret= OB_ERR_UNEXPECTED; ret= OB_ERR_UNEXPECTED;
LOG_ERROR("cf is NULL", KR(ret), K(tid), KPC(tenant)); LOG_ERROR("cf is NULL", KR(ret), K(tid), KPC(tenant));
} else if (OB_FAIL(store_service->drop_column_family(cf))) { } else if (OB_FAIL(store_service->drop_column_family(cf))) {
LOG_ERROR("store_service drop_column_family fail", KR(ret), K(tid), KPC(tenant)); if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("store_service drop_column_family fail", KR(ret), K(tid), KPC(tenant));
}
} else if (OB_FAIL(tenant_hash_map_.del(tid))) { } else if (OB_FAIL(tenant_hash_map_.del(tid))) {
LOG_ERROR("tenant_hash_map_ del failed", KR(ret), K(tenant_id)); LOG_ERROR("tenant_hash_map_ del failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(tenant_server_provider->del_tenant(tenant_id))) { } else if (OB_FAIL(tenant_server_provider->del_tenant(tenant_id))) {
......
...@@ -125,7 +125,7 @@ int ObLogTransMsgSorter::start() ...@@ -125,7 +125,7 @@ int ObLogTransMsgSorter::start()
LOG_INFO("begin start TransMsgSorter"); LOG_INFO("begin start TransMsgSorter");
if (OB_FAIL(TransMsgSorterThread::init(thread_num_, task_limit_, "oblog-br-sorter"))) { if (OB_FAIL(TransMsgSorterThread::init(thread_num_, task_limit_, "obcdc-br-sorter"))) {
LOG_ERROR("failed to init sorter thread pool", KR(ret), K_(thread_num), K_(task_limit)); LOG_ERROR("failed to init sorter thread pool", KR(ret), K_(thread_num), K_(task_limit));
} else { } else {
inited_ = true; inited_ = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册