diff --git a/src/sql/dtl/ob_dtl_local_channel.cpp b/src/sql/dtl/ob_dtl_local_channel.cpp index f75099d390144c015271361c1418e4fa5c290ed4..c2b80d9040f43b74c6eacefaa3ea5b575adbb2e6 100644 --- a/src/sql/dtl/ob_dtl_local_channel.cpp +++ b/src/sql/dtl/ob_dtl_local_channel.cpp @@ -96,7 +96,6 @@ int ObDtlLocalChannel::send_shared_message(ObDtlLinkedBuffer*& buf) KP(buf)); bool is_eof = buf->is_eof(); if (OB_FAIL(DTL.get_dfc_server().cache(/*buf->tenant_id(), */ peer_id_, buf, true))) { - ret = tmp_ret; LOG_WARN("get DTL channel fail", KP(peer_id_), "peer", get_peer(), K(ret), K(tmp_ret)); } else { // return block after cache first msg diff --git a/src/sql/dtl/ob_dtl_local_first_buffer_manager.cpp b/src/sql/dtl/ob_dtl_local_first_buffer_manager.cpp index e22b72520b1cc67f039170c0918e393cb05f20f8..b8699771a437b55bfb7c5ae014ac769b596ee907 100644 --- a/src/sql/dtl/ob_dtl_local_first_buffer_manager.cpp +++ b/src/sql/dtl/ob_dtl_local_first_buffer_manager.cpp @@ -38,7 +38,9 @@ int ObDtlLocalFirstBufferCache::ObDtlBufferClean::operator()(sql::dtl::ObDtlCach int ret = OB_SUCCESS; ObDtlLinkedBuffer* linked_buffer = buffer_info->buffer(); buffer_info->set_buffer(nullptr); - tenant_mem_mgr_->free(linked_buffer); + if (nullptr != linked_buffer) { + tenant_mem_mgr_->free(linked_buffer); + } buffer_info->reset(); buffer_info_mgr_->free_buffer_info(buffer_info); return ret; @@ -113,23 +115,27 @@ int ObDtlLocalFirstBufferCache::cache_buffer(ObDtlCacheBufferInfo*& buffer) int ret = OB_SUCCESS; if (OB_NOT_NULL(buffer)) { uint64_t chan_id = buffer->chid(); - if (OB_FAIL(buffer_map_.set_refactored(chan_id, buffer))) { +#ifdef ERRSIM + // -17 enable failed set refactored + ret = E(EventTable::EN_DTL_ONE_ROW_ONE_BUFFER) ret; +#endif + int64_t tmp_ret = ret; + if (TP_ENABLE_FAILED_SET_HT == ret) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to set refactor", K(ret), K(tmp_ret)); + } else if (OB_FAIL(buffer_map_.set_refactored(chan_id, buffer))) { LOG_WARN("failed to insert buffer map", K(ret)); - } else if (OB_FAIL(set_first_buffer(chan_id))) { - int tmp_ret = OB_SUCCESS; - ObDtlCacheBufferInfo* tmp_buffer = nullptr; - if (tmp_ret != buffer_map_.erase_refactored(chan_id, tmp_buffer)) { - LOG_WARN("failed to insert buffer map", K(ret), K(tmp_ret)); - } else if (nullptr == tmp_buffer) { + } else { + buffer = nullptr; + if (TP_ENABLE_FAILED_SET_FIRST_BUFFER == tmp_ret) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("buffer is null", K(ret), K(tmp_ret)); + LOG_WARN("failed to set first buffer", K(ret), K(tmp_ret)); + } else if (OB_FAIL(set_first_buffer(chan_id))) { + LOG_WARN("failed to set first buffer", K(ret), K(chan_id), KP(chan_id)); } else { - buffer = tmp_buffer; + inc_first_buffer_cnt(); + LOG_DEBUG("trace cache buffer", KP(chan_id), KP(this), K(dfo_key_)); } - LOG_WARN("failed to set first buffer", K(ret), K(chan_id), KP(chan_id)); - } else { - inc_first_buffer_cnt(); - LOG_DEBUG("trace cache buffer", KP(chan_id), KP(this), K(dfo_key_)); } } return ret; @@ -421,7 +427,18 @@ int ObDtlLocalFirstBufferCacheManager::cache_buffer(int64_t chid, ObDtlLinkedBuf buf_info->set_buffer(data_buffer); data_buffer = nullptr; } else { - ObDtlLinkedBuffer* buffer = tenant_mem_mgr_->alloc(chid, data_buffer->size()); + ObDtlLinkedBuffer *buffer = nullptr; +#ifdef ERRSIM + // -16 enable failed to alloc memory + ret = E(EventTable::EN_DTL_ONE_ROW_ONE_BUFFER) ret; +#endif + if (OB_SUCC(ret) || TP_ENABLE_FAILED_ALLOC_MEM != ret) { + ret = OB_SUCCESS; + buffer = tenant_mem_mgr_->alloc(chid, data_buffer->size()); + } else { + ret = OB_SUCCESS; + buffer = nullptr; + } if (nullptr != buffer) { ObDtlLinkedBuffer::assign(*data_buffer, buffer); buf_info->set_buffer(buffer); @@ -442,13 +459,10 @@ int ObDtlLocalFirstBufferCacheManager::cache_buffer(int64_t chid, ObDtlLinkedBuf if (OB_FAIL(ret)) { int tmp_ret = OB_SUCCESS; if (nullptr != buf_info) { - if (nullptr == data_buffer) { - // if cache buffer failed, should still send it. Can't just free it - data_buffer = buf_info->buffer(); - buf_info->set_buffer(nullptr); - } - if (!attach && OB_NOT_NULL(data_buffer)) { - tenant_mem_mgr_->free(data_buffer); + ObDtlLinkedBuffer *buffer = buf_info->buffer(); + buf_info->set_buffer(nullptr); + if (nullptr != buffer) { + tenant_mem_mgr_->free(buffer); } if (OB_SUCCESS != (tmp_ret = buffer_info_mgr_.free_buffer_info(buf_info))) { LOG_WARN("failed to free buffer info", K(ret), K(tmp_ret)); diff --git a/src/sql/dtl/ob_dtl_local_first_buffer_manager.h b/src/sql/dtl/ob_dtl_local_first_buffer_manager.h index 27b8505a3b8a6cb56536a604d8c90d999a9399f7..7472a1f9a6888c50680e4aabc1e92fe5f7956144 100644 --- a/src/sql/dtl/ob_dtl_local_first_buffer_manager.h +++ b/src/sql/dtl/ob_dtl_local_first_buffer_manager.h @@ -566,6 +566,8 @@ private: static const int64_t CONCURRENT_CNT = 2048; static const int64_t CHANNEL_HASH_BUCKET_NUM = 64 * 1024; static const int64_t MAX_BITSET_CNT = 1024 * 1024; // 1024 * 1024 + static const int64_t TP_ENABLE_FAILED_SET_HT = -17; + static const int64_t TP_ENABLE_FAILED_SET_FIRST_BUFFER = -18; int64_t pins_; ObDtlDfoKey dfo_key_; ObDtlFirstBufferHashTable buffer_map_; @@ -603,6 +605,7 @@ public: private: static const int64_t CONCURRENT_CNT = 1024; static const int64_t BUCKET_NUM = 64 * 1024; + static const int64_t TP_ENABLE_FAILED_ALLOC_MEM = -16; uint64_t tenant_id_; common::ObFIFOAllocator allocator_; ObDtlTenantMemManager* tenant_mem_mgr_; diff --git a/src/sql/dtl/ob_dtl_rpc_channel.cpp b/src/sql/dtl/ob_dtl_rpc_channel.cpp index 94d6025f9ee40893648290224f1f8b2f8523729a..9ef2d7feb61490908bb2c03ae138bd0cf791c7c6 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.cpp +++ b/src/sql/dtl/ob_dtl_rpc_channel.cpp @@ -188,7 +188,7 @@ int ObDtlRpcChannel::feedup(ObDtlLinkedBuffer*& buffer) LOG_WARN("control channel can't drain msg", K(ret)); } } else if (OB_ISNULL(linked_buffer = alloc_buf(buffer->size()))) { - ret = OB_REACH_MEMORY_LIMIT; + ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate buffer", K(ret)); } else { LOG_TRACE("DTL feedup a new msg to msg loop", K(buffer->size()), KP(id_), K(peer_)); diff --git a/src/sql/dtl/ob_dtl_rpc_processor.cpp b/src/sql/dtl/ob_dtl_rpc_processor.cpp index cbda91c218a93f7c62dc1933d4745a77f2258a83..7feae8a5b14501f9ef7e2cb8fe9753077f10cb9a 100644 --- a/src/sql/dtl/ob_dtl_rpc_processor.cpp +++ b/src/sql/dtl/ob_dtl_rpc_processor.cpp @@ -63,7 +63,6 @@ int ObDtlSendMessageP::process_msg(ObDtlRpcDataResponse& response, ObDtlSendArgs } else if (arg.buffer_.is_data_msg() && 1 == arg.buffer_.seq_no()) { ObDtlLinkedBuffer* buf = &arg.buffer_; if (OB_FAIL(DTL.get_dfc_server().cache(arg.buffer_.tenant_id(), arg.chid_, buf))) { - ret = tmp_ret; LOG_WARN("get DTL channel fail", KP(arg.chid_), K(ret), diff --git a/src/sql/dtl/ob_dtl_tenant_mem_manager.cpp b/src/sql/dtl/ob_dtl_tenant_mem_manager.cpp index d52cc2e4b93f209bd9caa675af668422d6a5631a..44af4b04bf6983ee7784aca1f8279f7109d4fa50 100644 --- a/src/sql/dtl/ob_dtl_tenant_mem_manager.cpp +++ b/src/sql/dtl/ob_dtl_tenant_mem_manager.cpp @@ -136,7 +136,7 @@ ObDtlLinkedBuffer* ObDtlTenantMemManager::alloc(int64_t chid, int64_t size) ++n_times; buf = mem_mgr->alloc(chid, size); if (nullptr == buf) { - ret = OB_REACH_MEMORY_LIMIT; + ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate dtl buffer memory", K(ret)); } }