提交 5bb53151 编写于 作者: Hanse1's avatar Hanse1 提交者: ob-robot

cancel co dag net when create_sstable failed

上级 fc818332
......@@ -14,6 +14,7 @@
#include "ob_index_block_builder.h"
#include "storage/blocksstable/ob_shared_macro_block_manager.h"
#include "observer/ob_server_event_history_table_operator.h"
#ifdef OB_BUILD_SHARED_STORAGE
#include "share/compaction/ob_shared_storage_compaction_util.h"
......@@ -24,6 +25,7 @@ namespace oceanbase
ERRSIM_POINT_DEF(EN_COMPACTION_DISABLE_SHARED_MACRO);
ERRSIM_POINT_DEF(EN_SSTABLE_SINGLE_ROOT_TREE);
ERRSIM_POINT_DEF(EN_SSTABLE_META_IN_TAIL);
ERRSIM_POINT_DEF(EN_REWRITE_SMALL_SSTABLE_FAILED);
using namespace common;
using namespace storage;
using namespace compaction;
......@@ -1844,6 +1846,9 @@ int ObSSTableIndexBuilder::rewrite_small_sstable(ObSSTableMergeRes &res)
} else if (FALSE_IT(read_info.macro_block_id_ = macro_meta.val_.macro_id_)) {
} else if (OB_FAIL(ObObjectManager::async_read_object(read_info, read_handle))) {
STORAGE_LOG(WARN, "fail to async read macro block", K(ret), K(read_info), K(macro_meta), K(roots_[0]->last_macro_size_));
#ifdef ERRSIM
SERVER_EVENT_SYNC_ADD("merge_errsim", "async_read_macro_block_failed", "ret_code", ret);
#endif
} else if (OB_FAIL(read_handle.wait())) {
STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(read_info));
} else {
......@@ -1855,7 +1860,20 @@ int ObSSTableIndexBuilder::rewrite_small_sstable(ObSSTableMergeRes &res)
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "successfully rewrite small sstable, but block info is invalid", K(ret), K(block_info));
} else if (FALSE_IT(macro_meta.val_.macro_id_ = block_info.macro_id_)) {
} else if (OB_FAIL(change_single_macro_meta_for_small_sstable(macro_meta))){
#ifdef ERRSIM
// failed after dec old block ref cnt and before change macro meta
} else if (EN_REWRITE_SMALL_SSTABLE_FAILED) {
ret = EN_REWRITE_SMALL_SSTABLE_FAILED;
const int64_t cg_idx = data_store_desc_.get_desc().get_table_cg_idx();
if (cg_idx == -ret) {
LOG_INFO("ERRSIM EN_REWRITE_SMALL_SSTABLE_FAILED", K(ret), K(cg_idx));
SERVER_EVENT_SYNC_ADD("merge_errsim", "rewrite_small_sst_failed", "ret_code", ret);
} else {
ret = OB_SUCCESS;
}
#endif
}
if (FAILEDx(change_single_macro_meta_for_small_sstable(macro_meta))){
STORAGE_LOG(WARN, "fail to change index tree root ctx macro id for small sst", K(ret),
K(block_info.macro_id_), KPC(roots_[0]));
} else {
......
......@@ -30,6 +30,7 @@ using namespace compaction;
namespace blocksstable
{
ERRSIM_POINT_DEF(EN_NO_NEED_MERGE_MICRO_BLK);
ERRSIM_POINT_DEF(EN_NO_SHARED_MACRO);
class ObIntConstUniform
{
......@@ -2111,6 +2112,13 @@ bool ObMacroBlockWriter::check_can_flush_small_sstable(const bool is_flush_for_l
const int64_t macro_size = macro_block.get_data_size();
const int64_t align_macro_size = upper_align(macro_size, DIO_READ_ALIGN_SIZE);
ObSSTableIndexBuilder *sstable_index_builder = OB_ISNULL(data_store_desc_) ? nullptr : data_store_desc_->sstable_index_builder_;
#ifdef ERRSIM
// disable shared macro block in merge progress
if (EN_NO_SHARED_MACRO) {
FLOG_INFO("ERRSIM EN_NO_SHARED_MACRO");
return false;
}
#endif
return is_flush_for_last_block
&& OB_NOT_NULL(sstable_index_builder)
&& is_alloc_block_needed()
......
......@@ -672,6 +672,8 @@ int ObCOTabletMergeCtx::create_cg_sstable(const int64_t cg_idx)
LOG_WARN("merge info or table array is null", K(ret), K(cg_idx), K(merged_sstable_array_));
} else if (OB_FAIL(cg_merge_info_array_[cg_idx]->create_sstable(*this, table_handle, skip_to_create_empty_cg, cg_schema_ptr, cg_idx))) {
LOG_WARN("fail to create sstable", K(ret), K(cg_idx), KPC(this));
// index builder is non-reentrant, so cancel dag net to retry
(void) static_cast<ObCOMergeDagNet&>(dag_net_).cancel_dag_net(ret);
#ifdef ERRSIM
} else if (OB_FAIL(ret = OB_E(EventTable::EN_COMPACTION_CO_PUSH_TABLES_FAILED) OB_SUCCESS)) {
if (cg_idx == -ret) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册