From e44cddde95b13f52ff4a24763e2df74e54d77400 Mon Sep 17 00:00:00 2001
From: obdev
Date: Fri, 24 Jun 2022 10:32:22 +0800
Subject: [PATCH] [CP] fix hash join hang leads mem leak

---
 src/sql/engine/basic/ob_chunk_datum_store.cpp | 4 ++--
 src/sql/engine/basic/ob_chunk_datum_store.h   | 4 ++--
 src/sql/engine/join/ob_hash_join.cpp          | 3 +++
 src/sql/engine/join/ob_hash_join_op.cpp       | 4 +++-
 src/sql/engine/join/ob_hash_join_op.h         | 3 ++-
 5 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/src/sql/engine/basic/ob_chunk_datum_store.cpp b/src/sql/engine/basic/ob_chunk_datum_store.cpp
index 71d799479c..55fc66e591 100644
--- a/src/sql/engine/basic/ob_chunk_datum_store.cpp
+++ b/src/sql/engine/basic/ob_chunk_datum_store.cpp
@@ -831,7 +831,7 @@ int ObChunkDatumStore::add_row(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx
   if (NULL == cur_blk_) {
     int64_t min_buf_size = 0;
     Block* new_blk = nullptr;
-    if (OB_FAIL(Block::min_buf_size(exprs, *ctx, min_buf_size))) {
+    if (OB_FAIL(Block::min_buf_size(exprs, row_extend_size_, *ctx, min_buf_size))) {
     } else if (OB_FAIL(alloc_block_buffer(new_blk, min_buf_size, false))) {
       LOG_WARN("alloc block failed", K(ret));
     } else {
@@ -844,7 +844,7 @@ int ObChunkDatumStore::add_row(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx
     if (OB_FAIL(cur_blk_->append_row(exprs, ctx, cur_blk_buffer_, row_extend_size_, stored_row))) {
       if (OB_BUF_NOT_ENOUGH == ret) {
         int64_t min_buf_size = 0;
-        if (OB_FAIL(Block::min_buf_size(exprs, *ctx, min_buf_size))) {
+        if (OB_FAIL(Block::min_buf_size(exprs, row_extend_size_, *ctx, min_buf_size))) {
         } else if (OB_FAIL(switch_block(min_buf_size))) {
           if (OB_EXCEED_MEM_LIMIT != ret) {
             LOG_WARN("switch block failed", K(ret));
diff --git a/src/sql/engine/basic/ob_chunk_datum_store.h b/src/sql/engine/basic/ob_chunk_datum_store.h
index 764b9c94e1..7e2e83b4be 100644
--- a/src/sql/engine/basic/ob_chunk_datum_store.h
+++ b/src/sql/engine/basic/ob_chunk_datum_store.h
@@ -307,14 +307,14 @@
 public:
   Block() : magic_(0), blk_size_(0), rows_(0)
   {}
-  static int inline min_buf_size(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx, int64_t& size)
+  static int inline min_buf_size(const common::ObIArray<ObExpr*>& exprs, int64_t row_extend_size, ObEvalCtx& ctx, int64_t& size)
   {
     int ret = OB_SUCCESS;
     size = 0;
     if (OB_FAIL(row_store_size(exprs, ctx, size))) {
       SQL_ENG_LOG(WARN, "failed to calc store row size", K(ret));
     } else {
-      size += BlockBuffer::HEAD_SIZE + sizeof(BlockBuffer);
+      size += BlockBuffer::HEAD_SIZE + sizeof(BlockBuffer) + row_extend_size;
     }
     return ret;
   }
diff --git a/src/sql/engine/join/ob_hash_join.cpp b/src/sql/engine/join/ob_hash_join.cpp
index 55d66ee748..91f683a0db 100644
--- a/src/sql/engine/join/ob_hash_join.cpp
+++ b/src/sql/engine/join/ob_hash_join.cpp
@@ -861,6 +861,7 @@ int64_t ObHashJoin::auto_calc_partition_count(int64_t input_size, int64_t min_ne
   if (input_size > min_need_size) {
     // one pass
     int64_t need_part_cnt = min_need_size / ObChunkRowStore::BLOCK_SIZE;
+    need_part_cnt = max(0, need_part_cnt);
     while (partition_cnt < need_part_cnt) {
       partition_cnt <<= 1;
     }
@@ -868,6 +869,7 @@ int64_t ObHashJoin::auto_calc_partition_count(int64_t input_size, int64_t min_ne
     // all in memory, use the least memory
     int64_t max_chunk_size = input_size / ObChunkRowStore::BLOCK_SIZE;
     int64_t square_partition_cnt = partition_cnt * partition_cnt;
+    max_chunk_size = max(0, max_chunk_size);
     while (square_partition_cnt < max_chunk_size) {
       partition_cnt <<= 1;
       square_partition_cnt = partition_cnt * partition_cnt;
@@ -884,6 +886,7 @@ int64_t ObHashJoin::calc_partition_count(int64_t input_size, int64_t part_size,
 {
   int64_t estimate_part_count = input_size / part_size + 1;
   int64_t partition_cnt = 8;
+  estimate_part_count = max(0, estimate_part_count);
   while (partition_cnt < estimate_part_count) {
     partition_cnt <<= 1;
   }
diff --git a/src/sql/engine/join/ob_hash_join_op.cpp b/src/sql/engine/join/ob_hash_join_op.cpp
index 7f2e6386a3..7d498f0037 100644
--- a/src/sql/engine/join/ob_hash_join_op.cpp
+++ b/src/sql/engine/join/ob_hash_join_op.cpp
@@ -807,6 +807,7 @@ int64_t ObHashJoinOp::calc_partition_count(int64_t input_size, int64_t part_size
 {
   int64_t estimate_part_count = input_size / part_size + 1;
   int64_t partition_cnt = 8;
+  estimate_part_count = max(0, estimate_part_count);
   while (partition_cnt < estimate_part_count) {
     partition_cnt <<= 1;
   }
@@ -825,6 +826,7 @@ int64_t ObHashJoinOp::calc_partition_count_by_cache_aware(
   if (max_part_count < partition_cnt) {
     partition_cnt = max_part_count;
   }
+  global_mem_bound_size = max(0, global_mem_bound_size);
   while (partition_cnt * PAGE_SIZE > global_mem_bound_size) {
     partition_cnt >>= 1;
   }
@@ -854,7 +856,7 @@ int ObHashJoinOp::get_max_memory_size(int64_t input_size)
   const int64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
   // default data memory size: 80%
   int64_t extra_memory_size = get_extra_memory_size();
-  int64_t memory_size = extra_memory_size + input_size;
+  int64_t memory_size = (extra_memory_size + input_size) < 0 ? input_size : (extra_memory_size + input_size);
   if (OB_FAIL(ObSqlWorkareaUtil::get_workarea_size(ObSqlWorkAreaType::HASH_WORK_AREA, tenant_id, hash_area_size))) {
     LOG_WARN("failed to get workarea size", K(ret), K(tenant_id));
   } else if (FALSE_IT(remain_data_memory_size_ = hash_area_size * 80 / 100)) {
diff --git a/src/sql/engine/join/ob_hash_join_op.h b/src/sql/engine/join/ob_hash_join_op.h
index 6161062fdb..a62fe83a29 100644
--- a/src/sql/engine/join/ob_hash_join_op.h
+++ b/src/sql/engine/join/ob_hash_join_op.h
@@ -554,9 +554,10 @@ private:
   {
     int64_t row_count = profile_.get_row_count();
     int64_t bucket_cnt = profile_.get_bucket_size();
+    const int64_t DEFAULT_EXTRA_SIZE = 2 * 1024 * 1024;
     int64_t extra_memory_size = bucket_cnt * (sizeof(HashTableCell*) + sizeof(uint8_t));
     extra_memory_size += (row_count * sizeof(HashTableCell));
-    return extra_memory_size;
+    return extra_memory_size < 0 ? DEFAULT_EXTRA_SIZE : extra_memory_size;
   }
   void clean_nest_loop_chunk()
   {
-- 
GitLab