diff --git a/src/logservice/libobcdc/src/ob_log_committer.cpp b/src/logservice/libobcdc/src/ob_log_committer.cpp index d514eaa21ffae2f3e9bad4fd10520b5ccc602ed2..c6e89b607fbffc8c9f2bb1fa1e1f3073f84c0170 100644 --- a/src/logservice/libobcdc/src/ob_log_committer.cpp +++ b/src/logservice/libobcdc/src/ob_log_committer.cpp @@ -1251,7 +1251,7 @@ int ObLogCommitter::commit_binlog_record_list_(TransCtx &trans_ctx, if (OB_EMPTY_RESULT == ret) { if (0 < trans_ctx.get_total_br_count()) { // unexpected - LOG_ERROR("trans has no valid br to output, skip this trans", K(trans_ctx)); + LOG_ERROR("unexpected skiping trans with valid br", KR(ret), K(trans_ctx)); } else { LOG_INFO("trans has no valid br to output, skip this trans", KR(ret), K(trans_ctx)); ret = OB_SUCCESS; diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index 202db06624bf4b456341ca32b641b056f97a7b5d..d13788912a3a940b88fe214091415b35b31f65e1 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -1396,6 +1396,7 @@ void ObLogInstance::mark_stop_flag(const char *stop_reason) lob_data_merger_->mark_stop_flag(); storager_->mark_stop_flag(); reader_->mark_stop_flag(); + store_service_->mark_stop_flag(); trans_msg_sorter_->mark_stop_flag(); committer_->mark_stop_flag(); resource_collector_->mark_stop_flag(); diff --git a/src/logservice/libobcdc/src/ob_log_reader.cpp b/src/logservice/libobcdc/src/ob_log_reader.cpp index 13b37d853f1d655586b3330369a6fad5f0dc00cf..cc0b8f24f5a94cbe5032d0b4919e6401261e5c63 100644 --- a/src/logservice/libobcdc/src/ob_log_reader.cpp +++ b/src/logservice/libobcdc/src/ob_log_reader.cpp @@ -85,6 +85,7 @@ int ObLogReader::init(const int64_t thread_num, void ObLogReader::destroy() { if (inited_) { + LOG_INFO("ObLogReader destroy begin"); ReaderThread::destroy(); inited_ = false; @@ -96,6 +97,7 @@ void ObLogReader::destroy() store_service_stat_.reset(); store_service_ = NULL; err_handler_ = NULL; + LOG_INFO("ObLogReader destroy end"); } } @@ -222,8 +224,12 @@ int ObLogReader::handle_task_(ObLogEntryTask &log_entry_task, if (OB_FAIL(ret)) { } else if (OB_FAIL(log_entry_task.get_storage_key(key))) { LOG_ERROR("get_storage_key fail", KR(ret), "key", key.c_str(), K(log_entry_task)); + } else if (stop_flag) { + ret = OB_IN_STOP_STATE; } else if (OB_FAIL(read_store_service_(log_entry_task, column_family_handle, key, value))) { - LOG_ERROR("read_store_service_ fail", KR(ret), K(log_entry_task)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("read_store_service_ fail", KR(ret), K(log_entry_task)); + } } else { store_service_stat_.do_data_stat(value.length()); } @@ -258,6 +264,8 @@ int ObLogReader::read_store_service_(ObLogEntryTask &log_entry_task, } else if (OB_ISNULL(store_service_)) { LOG_ERROR("store_service_ is NULL"); ret = OB_ERR_UNEXPECTED; + } else if (ReaderThread::is_stoped()) { + ret = OB_IN_STOP_STATE; } else if (OB_FAIL(store_service_->get(column_family_handle, key, value))) { LOG_ERROR("StoreService get fail", KR(ret), K(key.c_str()), "value_len", value.length(), K(log_entry_task)); } else { diff --git a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp index 10dfe1b6840b069b4f2f093c46e35049cf655c5f..2d5b17ccf3a591dc312b44b0a6de38e4f4124198 100644 --- a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp +++ b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp @@ -112,6 +112,7 @@ int ObLogResourceCollector::init(const int64_t thread_num, void ObLogResourceCollector::destroy() { + LOG_INFO("resource_collector destroy begin"); RCThread::destroy(); inited_ = false; br_pool_ = NULL; @@ -126,6 +127,7 @@ void ObLogResourceCollector::destroy() dml_part_trans_task_count_ = 0; hb_part_trans_task_count_ = 0; other_part_trans_task_count_ = 0; + LOG_INFO("resource_collector destroy end"); } int ObLogResourceCollector::start() @@ -154,6 +156,7 @@ void ObLogResourceCollector::mark_stop_flag() { if (inited_) { RCThread::mark_stop_flag(); + LOG_INFO("resource_collector mark_stop_flag"); } } @@ -225,7 +228,9 @@ int ObLogResourceCollector::revert_log_entry_task(ObLogEntryTask *log_entry_task ret = OB_ERR_UNEXPECTED; LOG_ERROR("host of log_entry_task is invalid, failed cast to PartTransTask" ,KR(ret), KPC(log_entry_task)); } else if (OB_FAIL(revert_log_entry_task_(log_entry_task))) { - LOG_ERROR("revert_log_entry_task_ fail", KR(ret), KPC(log_entry_task)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("revert_log_entry_task_ fail", KR(ret), KPC(log_entry_task)); + } } else if (OB_FAIL(dec_ref_cnt_and_try_to_revert_task_(part_trans_task))) { LOG_ERROR("dec_ref_cnt_and_try_to_revert_task_ fail", KR(ret), KPC(part_trans_task)); } else { @@ -304,8 +309,10 @@ int ObLogResourceCollector::revert_log_entry_task_(ObLogEntryTask *log_entry_tas if (OB_FAIL(log_entry_task->get_storage_key(key))) { LOG_ERROR("get_storage_key fail", KR(ret), "key", key.c_str()); } else if (OB_FAIL(del_store_service_data_(tenant_id, key))) { - LOG_ERROR("del_store_service_data_ fail", KR(ret), KPC(log_entry_task)); - } else {} + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("del_store_service_data_ fail", KR(ret), KPC(log_entry_task)); + } + } } } @@ -330,9 +337,11 @@ int ObLogResourceCollector::del_store_service_data_(const uint64_t tenant_id, column_family_handle = tenant->get_cf(); } - if (OB_SUCC(ret)) { + if (OB_SUCC(ret) && ! RCThread::is_stoped()) { if (OB_FAIL(store_service_->del(column_family_handle, key))) { - LOG_ERROR("store_service_ del fail", KR(ret), K(tenant_id), K(key.c_str())); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("store_service_ del fail", KR(ret), K(tenant_id), K(key.c_str())); + } } else { LOG_DEBUG("store_service_ del succ", K(tenant_id), K(key.c_str())); } @@ -355,7 +364,7 @@ int ObLogResourceCollector::revert_participants_(const int64_t thread_index, } else { PartTransTask *task = participants; - while (OB_SUCCESS == ret && NULL != task) { + while (OB_SUCC(ret) && OB_NOT_NULL(task) && ! RCThread::is_stoped()) { PartTransTask *next = task->next_task(); task->set_next_task(NULL); @@ -368,6 +377,10 @@ int ObLogResourceCollector::revert_participants_(const int64_t thread_index, } } + if (RCThread::is_stoped()) { + ret = OB_IN_STOP_STATE; + } + task = NULL; } @@ -434,8 +447,10 @@ int ObLogResourceCollector::recycle_part_trans_task_(const int64_t thread_index, } else { if (task->is_ddl_trans()) { if (OB_FAIL(revert_dll_all_binlog_records_(task))) { - // Reclaim all Binlog Records within a DDL partitioned transaction - LOG_ERROR("revert_dll_all_binlog_records_ fail", KR(ret), K(*task)); + if (OB_IN_STOP_STATE != ret) { + // Reclaim all Binlog Records within a DDL partitioned transaction + LOG_ERROR("revert_dll_all_binlog_records_ fail", KR(ret), K(*task)); + } } } LOG_DEBUG("[ResourceCollector] recycle part trans task", K(thread_index), K(*task)); @@ -476,7 +491,9 @@ int ObLogResourceCollector::handle(void *data, if (! task->is_served()) { if (OB_FAIL(revert_unserved_part_trans_task_(thread_index, *task))) { - LOG_ERROR("revert_unserved_part_trans_task_ fail", KR(ret), K(thread_index), KPC(task)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("revert_unserved_part_trans_task_ fail", KR(ret), K(thread_index), KPC(task)); + } } // DML/DDL } else if (task->is_ddl_trans() || task->is_dml_trans()) { @@ -509,7 +526,7 @@ int ObLogResourceCollector::handle(void *data, LOG_ERROR("remove trans_ctx fail", KR(ret), K(tenant_id), K(trans_id), K(trans_ctx)); } // recycle all participants - else if (NULL != participants && OB_FAIL(revert_participants_(thread_index, participants))) { + else if (OB_NOT_NULL(participants) && OB_FAIL(revert_participants_(thread_index, participants))) { if (OB_IN_STOP_STATE != ret) { LOG_ERROR("revert_participants_ fail", KR(ret), K(thread_index), K(participants), K(trans_id)); } @@ -657,8 +674,10 @@ int ObLogResourceCollector::del_trans_(const uint64_t tenant_id, if (OB_SUCC(ret)) { if (OB_FAIL(store_service_->del_range(column_family_handle, begin_key, end_key))) { - LOG_ERROR("store_service_ del fail", KR(ret), "begin_key", begin_key.c_str(), - "end_key", end_key.c_str()); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("store_service_ del fail", KR(ret), "begin_key", begin_key.c_str(), + "end_key", end_key.c_str()); + } } else { LOG_INFO("store_service_ del succ", KR(ret), "begin_key", begin_key.c_str(), "end_key", end_key.c_str()); @@ -687,7 +706,9 @@ int ObLogResourceCollector::dec_ref_cnt_and_try_to_recycle_log_entry_task_(ObLog if (need_revert_log_entry_task) { if (OB_FAIL(revert_log_entry_task_(log_entry_task))) { - LOG_ERROR("revert_log_entry_task_ fail", KR(ret), KPC(log_entry_task)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("revert_log_entry_task_ fail", KR(ret), KPC(log_entry_task)); + } } else { log_entry_task = NULL; } @@ -726,7 +747,7 @@ int ObLogResourceCollector::revert_dll_all_binlog_records_(PartTransTask *task) // FIXME: the Binlog Record contains references to memory allocated by the PartTransTask. // They should be actively freed here, but as PartTransTask will release the memory uniformly when it is reclaimed // memory in the Binlog Record is not actively freed here - while (OB_SUCC(ret) && NULL != stmt_task) { + while (OB_SUCC(ret) && OB_NOT_NULL(stmt_task) && ! RCThread::is_stoped()) { DdlStmtTask *next = static_cast(stmt_task->get_next()); ObLogBR *br = stmt_task->get_binlog_record(); stmt_task->set_binlog_record(NULL); @@ -737,6 +758,9 @@ int ObLogResourceCollector::revert_dll_all_binlog_records_(PartTransTask *task) stmt_task = next; } + if (RCThread::is_stoped()) { + ret = OB_IN_STOP_STATE; + } } return ret; @@ -798,7 +822,7 @@ int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t threa SortedRedoLogList &sorted_redo_list = task.get_sorted_redo_list(); DmlRedoLogNode *dml_redo_node = static_cast(sorted_redo_list.head_); - while (OB_SUCC(ret) && NULL != dml_redo_node) { + while (OB_SUCC(ret) && OB_NOT_NULL(dml_redo_node) && ! RCThread::is_stoped()) { if (dml_redo_node->is_stored()) { const palf::LSN &store_log_lsn = dml_redo_node->get_start_log_lsn(); ObLogStoreKey store_key; @@ -813,6 +837,10 @@ int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t threa } else {} } + if (RCThread::is_stoped()) { + ret = OB_IN_STOP_STATE; + } + if (OB_SUCC(ret)) { dml_redo_node = static_cast(dml_redo_node->get_next()); } diff --git a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp index 60a159b0f408d1afb68ccb6f62b8213e98f2ccf1..e5604df701a51cf726d125cf74d74b39e6f7e782 100644 --- a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp +++ b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp @@ -12,7 +12,7 @@ * OBCDC Storage based on rocksdb */ -#define USING_LOG_PREFIX OBLOG +#define USING_LOG_PREFIX OBLOG_STORAGER #include "ob_log_rocksdb_store_service.h" #include "ob_log_utils.h" @@ -30,6 +30,7 @@ namespace libobcdc RocksDbStoreService::RocksDbStoreService() { is_inited_ = false; + is_stopped_ = true; m_db_ = NULL; m_db_path_.clear(); } @@ -73,6 +74,7 @@ int RocksDbStoreService::init(const std::string &path) ret = OB_ERR_UNEXPECTED; } else { _LOG_INFO("RocksDbStoreService init success, path:%s, total_threads=%d", m_db_path_.c_str(), total_threads); + is_stopped_ = false; is_inited_ = true; } @@ -86,6 +88,10 @@ int RocksDbStoreService::close() int ret = OB_SUCCESS; if (NULL != m_db_) { + LOG_INFO("closing rocksdb ..."); + mark_stop_flag(); + usleep(100 * _MSEC_); + rocksdb::Status status = m_db_->Close(); if (! status.ok()) { @@ -127,6 +133,7 @@ int RocksDbStoreService::init_dir_(const char *dir_path) void RocksDbStoreService::destroy() { if (is_inited_) { + LOG_INFO("rocksdb service destroy begin"); close(); if (OB_NOT_NULL(m_db_)) { @@ -134,6 +141,7 @@ void RocksDbStoreService::destroy() m_db_ = NULL; } is_inited_ = false; + LOG_INFO("rocksdb service destroy end"); } } @@ -143,13 +151,19 @@ int RocksDbStoreService::put(const std::string &key, const ObSlice &value) rocksdb::WriteOptions writer_options; writer_options.disableWAL = true; - // find column family handle for cf - rocksdb::Status s = m_db_->Put(writer_options, rocksdb::Slice(key.c_str(), key.size()), - rocksdb::Slice(value.buf_, value.buf_len_)); + if (is_stopped()) { + ret = OB_IN_STOP_STATE; + } else { + // find column family handle for cf + rocksdb::Status s = m_db_->Put( + writer_options, + rocksdb::Slice(key.c_str(), key.size()), + rocksdb::Slice(value.buf_, value.buf_len_)); - if (!s.ok()) { - _LOG_ERROR("RocksDbStoreService put value into rocksdb failed, error %s", s.ToString().c_str()); - ret = OB_ERR_UNEXPECTED; + if (!s.ok()) { + _LOG_ERROR("RocksDbStoreService put value into rocksdb failed, error %s", s.ToString().c_str()); + ret = OB_ERR_UNEXPECTED; + } } return ret; @@ -165,6 +179,8 @@ int RocksDbStoreService::put(void *cf_handle, const std::string &key, const ObSl if (OB_ISNULL(column_family_handle)) { LOG_ERROR("column_family_handle is NULL"); ret = OB_ERR_UNEXPECTED; + } else if (is_stopped()) { + ret = OB_IN_STOP_STATE; } else { rocksdb::Status s = m_db_->Put(writer_options, column_family_handle, rocksdb::Slice(key), rocksdb::Slice(value.buf_, value.buf_len_)); @@ -193,15 +209,28 @@ int RocksDbStoreService::batch_write(void *cf_handle, } else { rocksdb::WriteBatch batch; - for (int64_t idx = 0; OB_SUCC(ret) && idx < keys.size(); ++idx) { - batch.Put(column_family_handle, rocksdb::Slice(keys[idx]), rocksdb::Slice(values[idx].buf_, values[idx].buf_len_)); + for (int64_t idx = 0; OB_SUCC(ret) && !is_stopped() && idx < keys.size(); ++idx) { + rocksdb::Status s = batch.Put( + column_family_handle, + rocksdb::Slice(keys[idx]), + rocksdb::Slice(values[idx].buf_, values[idx].buf_len_)); + if (!s.ok()) { + ret = OB_IO_ERROR; + _LOG_ERROR("RocksDbStoreService build batch failed, error %s", s.ToString().c_str()); + } } - rocksdb::Status s = m_db_->Write(writer_options, &batch); + if (OB_SUCC(ret) && !is_stopped()) { + rocksdb::Status s = m_db_->Write(writer_options, &batch); - if (!s.ok()) { - _LOG_ERROR("RocksDbStoreService WriteBatch put value into rocksdb failed, error %s", s.ToString().c_str()); - ret = OB_IO_ERROR; + if (!s.ok()) { + _LOG_ERROR("RocksDbStoreService WriteBatch put value into rocksdb failed, error %s", s.ToString().c_str()); + ret = OB_IO_ERROR; + } + } + + if (is_stopped()) { + ret = OB_IN_STOP_STATE; } } @@ -211,13 +240,18 @@ int RocksDbStoreService::batch_write(void *cf_handle, int RocksDbStoreService::get(const std::string &key, std::string &value) { int ret = OB_SUCCESS; - //rocksdb::PinnableSlice slice(&value); - rocksdb::Status s = m_db_->Get(rocksdb::ReadOptions(), key, &value); - if (!s.ok()) { - _LOG_ERROR("RocksDbStoreService get value from rocksdb failed, error %s, key:%s", - s.ToString().c_str(), key.c_str()); - ret = OB_ERR_UNEXPECTED; + if (is_stopped()) { + ret = OB_IN_STOP_STATE; + } else { + //rocksdb::PinnableSlice slice(&value); + rocksdb::Status s = m_db_->Get(rocksdb::ReadOptions(), key, &value); + + if (!s.ok()) { + _LOG_ERROR("RocksDbStoreService get value from rocksdb failed, error %s, key:%s", + s.ToString().c_str(), key.c_str()); + ret = OB_ERR_UNEXPECTED; + } } return ret; @@ -231,6 +265,8 @@ int RocksDbStoreService::get(void *cf_handle, const std::string &key, std::strin if (OB_ISNULL(column_family_handle)) { LOG_ERROR("column_family_handle is NULL"); ret = OB_ERR_UNEXPECTED; + } else if (is_stopped()) { + ret = OB_IN_STOP_STATE; } else { rocksdb::Status s = m_db_->Get(rocksdb::ReadOptions(), column_family_handle, key, &value); @@ -247,11 +283,16 @@ int RocksDbStoreService::get(void *cf_handle, const std::string &key, std::strin int RocksDbStoreService::del(const std::string &key) { int ret = OB_SUCCESS; - // find column family handle for cf - rocksdb::Status s = m_db_->Delete(rocksdb::WriteOptions(), key); - if (!s.ok()) { - LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str()); - ret = OB_ERR_UNEXPECTED; + + if (is_stopped()) { + ret = OB_IN_STOP_STATE; + } else { + // find column family handle for cf + rocksdb::Status s = m_db_->Delete(rocksdb::WriteOptions(), key); + if (!s.ok()) { + ret = OB_IO_ERROR; + _LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str()); + } } return ret; @@ -265,8 +306,10 @@ int RocksDbStoreService::del(void *cf_handle, const std::string &key) writer_options.disableWAL = true; if (OB_ISNULL(column_family_handle)) { - LOG_ERROR("column_family_handle is NULL"); ret = OB_ERR_UNEXPECTED; + LOG_ERROR("column_family_handle is NULL", KR(ret)); + } else if (is_stopped()) { + ret = OB_IN_STOP_STATE; } else { rocksdb::Status s = m_db_->Delete(writer_options, column_family_handle, key); @@ -287,6 +330,8 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key if (OB_ISNULL(column_family_handle)) { LOG_ERROR("column_family_handle is NULL"); ret = OB_ERR_UNEXPECTED; + } else if (is_stopped()) { + ret = OB_IN_STOP_STATE; } else { rocksdb::Status s = m_db_->DeleteRange(rocksdb::WriteOptions(), column_family_handle, begin_key, end_key); @@ -323,16 +368,20 @@ int RocksDbStoreService::create_column_family(const std::string& column_family_n // Column Family's default memtable size is 64M, when the maximum limit is exceeded, memtable -> immutable memtable, increase write_buffer_size, can reduce write amplification cf_options.write_buffer_size = rocksdb_write_buffer_size << 20; - rocksdb::Status status = m_db_->CreateColumnFamily(cf_options, column_family_name, &column_family_handle); - - if (! status.ok()) { - _LOG_ERROR("rocksdb CreateColumnFamily [%s] failed, error %s", column_family_name.c_str(), status.ToString().c_str()); - ret = OB_ERR_UNEXPECTED; + if (is_stopped()) { + ret = OB_IN_STOP_STATE; } else { - cf_handle = reinterpret_cast(column_family_handle); + rocksdb::Status status = m_db_->CreateColumnFamily(cf_options, column_family_name, &column_family_handle); + + if (! status.ok()) { + _LOG_ERROR("rocksdb CreateColumnFamily [%s] failed, error %s", column_family_name.c_str(), status.ToString().c_str()); + ret = OB_ERR_UNEXPECTED; + } else { + cf_handle = reinterpret_cast(column_family_handle); - LOG_INFO("rocksdb CreateColumnFamily succ", "column_family_name", column_family_name.c_str(), - K(column_family_handle), K(cf_handle), K(rocksdb_write_buffer_size)); + LOG_INFO("rocksdb CreateColumnFamily succ", "column_family_name", column_family_name.c_str(), + K(column_family_handle), K(cf_handle), K(rocksdb_write_buffer_size)); + } } return ret; @@ -346,6 +395,8 @@ int RocksDbStoreService::drop_column_family(void *cf_handle) if (OB_ISNULL(column_family_handle)) { LOG_ERROR("column_family_handle is NULL"); ret = OB_INVALID_ARGUMENT; + } else if (is_stopped()) { + ret = OB_IN_STOP_STATE; } else { rocksdb::Status status = m_db_->DropColumnFamily(column_family_handle); @@ -368,6 +419,8 @@ int RocksDbStoreService::destory_column_family(void *cf_handle) if (OB_ISNULL(column_family_handle)) { LOG_ERROR("column_family_handle is NULL"); ret = OB_INVALID_ARGUMENT; + } else if (is_stopped()) { + ret = OB_IN_STOP_STATE; } else { rocksdb::Status status = m_db_->DestroyColumnFamilyHandle(column_family_handle); @@ -391,7 +444,7 @@ void RocksDbStoreService::get_mem_usage(const std::vector ids, int64_t total_table_readers_usage = 0; int64_t total_block_cache_pinned_usage = 0; - for (int64_t idx = 0; OB_SUCC(ret) && idx < cf_handles.size(); ++idx) { + for (int64_t idx = 0; OB_SUCC(ret) && !is_stopped() && idx < cf_handles.size(); ++idx) { rocksdb::ColumnFamilyHandle *column_family_handle = static_cast(cf_handles[idx]); if (OB_ISNULL(column_family_handle)) { diff --git a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.h b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.h index c6623dabf42f3990f3109e5475a1e0d25d7f9f96..24cea25790fc2474cd6b8e48e18a47d056b4f847 100644 --- a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.h +++ b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.h @@ -16,6 +16,7 @@ #define OCEANBASE_LIBOBCDC_OB_LOG_ROCKSDB_IMPL_H_ #include "ob_log_store_service.h" +#include "lib/atomic/ob_atomic.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" @@ -54,15 +55,18 @@ public: virtual int drop_column_family(void *cf_handle); virtual int destory_column_family(void *cf_handle); - virtual int close(); + virtual void mark_stop_flag() override { ATOMIC_SET(&is_stopped_, true); } + virtual int close() override; virtual void get_mem_usage(const std::vector ids, const std::vector cf_handles); + OB_INLINE bool is_stopped() const { return ATOMIC_LOAD(&is_stopped_); } private: int init_dir_(const char *dir_path); private: bool is_inited_; + bool is_stopped_; rocksdb::DB *m_db_; rocksdb::Options m_options_; std::string m_db_path_; diff --git a/src/logservice/libobcdc/src/ob_log_storager.cpp b/src/logservice/libobcdc/src/ob_log_storager.cpp index 685eae4364977e4890ad16e0f714bc5a85a8fefb..b0d402aee7ef1816e04555db9018276324c2cfca 100644 --- a/src/logservice/libobcdc/src/ob_log_storager.cpp +++ b/src/logservice/libobcdc/src/ob_log_storager.cpp @@ -83,6 +83,7 @@ int ObLogStorager::init(const int64_t thread_num, void ObLogStorager::destroy() { if (inited_) { + LOG_INFO("store_service destroy begin"); StoragerThread::destroy(); inited_ = false; @@ -94,6 +95,7 @@ void ObLogStorager::destroy() store_service_stat_.reset(); store_service_ = NULL; err_handler_ = NULL; + LOG_INFO("store_service destroy end"); } } @@ -272,7 +274,9 @@ int ObLogStorager::handle_task_(IObLogBatchBufTask &batch_task, if (OB_FAIL(write_store_service_(key.c_str(), batch_buf + start_pos, data_len, column_family_handle, thread_index))) { - LOG_ERROR("write_store_service_ fail", KR(ret), K(store_task)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("write_store_service_ fail", KR(ret), K(store_task)); + } } else if (OB_FAIL(store_task->st_after_consume(OB_SUCCESS))) { LOG_ERROR("st_after_consume fail", KR(ret)); } else { @@ -335,7 +339,9 @@ int ObLogStorager::write_store_service_(const char *key, ret = OB_INVALID_ARGUMENT; } else { if (OB_FAIL(store_service_->put(column_family_handle, key, ObSlice(log_str, log_str_len)))) { - LOG_ERROR("store_service_ put fail", KR(ret), K(thread_index), K(key), K(log_str_len)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("store_service_ put fail", KR(ret), K(thread_index), K(key), K(log_str_len)); + } } else { // Statistics rps rps_stat_.do_rps_stat(1); @@ -393,7 +399,9 @@ int ObLogStorager::read_store_service_(const std::string &key) std::string br_string_res; if (OB_FAIL(store_service_->get(key, br_string_res))) { - LOG_ERROR("store_service_ get fail", KR(ret), K(key.c_str()), K(br_string_res.length())); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("store_service_ get fail", KR(ret), K(key.c_str()), K(br_string_res.length())); + } } else { LOG_DEBUG("store_service_ get succ", KR(ret), K(key.c_str()), K(br_string_res.length()), K(br_string_res.c_str())); } diff --git a/src/logservice/libobcdc/src/ob_log_store_service.h b/src/logservice/libobcdc/src/ob_log_store_service.h index 403521f03faaec529176ece5977a1e51928cac5e..6eb3ef040f689876153ceb7aef57448a078c14a1 100644 --- a/src/logservice/libobcdc/src/ob_log_store_service.h +++ b/src/logservice/libobcdc/src/ob_log_store_service.h @@ -47,7 +47,8 @@ class IObStoreService public: virtual ~IObStoreService() {} virtual int init(const std::string &path) = 0; - virtual int close() = 0; + virtual void mark_stop_flag() = 0; // stop store service: won't handle more store task. + virtual int close() = 0; // close store service: including actual storager public: virtual int put(const std::string &key, const ObSlice &value) = 0; diff --git a/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp b/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp index a5743918c7c4ecbcaadba6fc5257fb29376ca387..e093a3d966f3f2a15ccfb01478b54f6e0c248556 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp +++ b/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp @@ -330,10 +330,12 @@ int ObLogTenantMgr::do_add_tenant_(const uint64_t tenant_id, } else if (OB_ISNULL(tenant)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("tenant is NULL", KR(ret), K(tenant_id), K(tenant)); - } - else if (OB_FAIL(store_service->create_column_family(std::to_string(tenant_id) + ":" + std::string(tenant_name), - column_family_handle))) { - LOG_ERROR("create_column_family fail", KR(ret), K(tenant_id), K(tenant_name), K(column_family_handle)); + } else if (OB_FAIL(store_service->create_column_family( + std::to_string(tenant_id) + ":" + std::string(tenant_name), + column_family_handle))) { + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("create_column_family fail", KR(ret), K(tenant_id), K(tenant_name), K(column_family_handle)); + } } // init tenant else if (OB_FAIL(tenant->init(tenant_id, tenant_name, tenant_start_serve_ts_ns, start_seq, @@ -992,7 +994,9 @@ int ObLogTenantMgr::remove_tenant_(const uint64_t tenant_id, ObLogTenant *tenant ret= OB_ERR_UNEXPECTED; LOG_ERROR("cf is NULL", KR(ret), K(tid), KPC(tenant)); } else if (OB_FAIL(store_service->drop_column_family(cf))) { - LOG_ERROR("store_service drop_column_family fail", KR(ret), K(tid), KPC(tenant)); + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("store_service drop_column_family fail", KR(ret), K(tid), KPC(tenant)); + } } else if (OB_FAIL(tenant_hash_map_.del(tid))) { LOG_ERROR("tenant_hash_map_ del failed", KR(ret), K(tenant_id)); } else if (OB_FAIL(tenant_server_provider->del_tenant(tenant_id))) { diff --git a/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp b/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp index e02f0394f8660c3575813405ccf76f9ac4e67ba1..f2eaa4079e0d50bc09c043941511284f07280c38 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_msg_sorter.cpp @@ -125,7 +125,7 @@ int ObLogTransMsgSorter::start() LOG_INFO("begin start TransMsgSorter"); - if (OB_FAIL(TransMsgSorterThread::init(thread_num_, task_limit_, "oblog-br-sorter"))) { + if (OB_FAIL(TransMsgSorterThread::init(thread_num_, task_limit_, "obcdc-br-sorter"))) { LOG_ERROR("failed to init sorter thread pool", KR(ret), K_(thread_num), K_(task_limit)); } else { inited_ = true;