diff --git a/src/sql/engine/aggregate/ob_aggregate_processor.cpp b/src/sql/engine/aggregate/ob_aggregate_processor.cpp
index c6f5bb6ab3a0627ed7b68497d820bc18fa12c2ba..0fb4804bcdd91b157ad6d55b2feb2b1db0c5de99 100644
--- a/src/sql/engine/aggregate/ob_aggregate_processor.cpp
+++ b/src/sql/engine/aggregate/ob_aggregate_processor.cpp
@@ -2865,10 +2865,12 @@ int ObAggregateProcessor::check_rows_equal(const ObChunkDatumStore::LastStoredRo
 // When there is stored_row_ reserved_cells, use stored_row_'s reserved_cells_ for calc equal.
 // Other use row_.
 int ObGroupRowHashTable::init(ObIAllocator* allocator, lib::ObMemAttr& mem_attr, ObEvalCtx* eval_ctx,
-    const common::ObIArray* cmp_funcs, int64_t initial_size)
+    const common::ObIArray* cmp_funcs, ObSqlMemMgrProcessor *sql_mem_processor,
+    int64_t initial_size)
 {
   int ret = OB_SUCCESS;
-  if (OB_FAIL(ObExtendHashTable::init(allocator, mem_attr, initial_size))) {
+  if (OB_FAIL(ObExtendHashTable::init(allocator, mem_attr,
+      sql_mem_processor, initial_size))) {
     LOG_WARN("failed to init extended hash table", K(ret));
   } else {
     eval_ctx_ = eval_ctx;
diff --git a/src/sql/engine/aggregate/ob_aggregate_processor.h b/src/sql/engine/aggregate/ob_aggregate_processor.h
index 6a49e677291276166f41711d6b4dc2adabb177b0..9ae63a010c6a2ebc6f68ca50e433cc63532a3fa0 100644
--- a/src/sql/engine/aggregate/ob_aggregate_processor.h
+++ b/src/sql/engine/aggregate/ob_aggregate_processor.h
@@ -543,7 +543,8 @@ public:
   const ObGroupRowItem* get(const ObGroupRowItem& item) const;
 
   int init(ObIAllocator* allocator, lib::ObMemAttr& mem_attr, ObEvalCtx* eval_ctx,
-      const common::ObIArray* cmp_funcs, int64_t initial_size = INITIAL_SIZE);
+      const common::ObIArray* cmp_funcs, ObSqlMemMgrProcessor *sql_mem_processor,
+      int64_t initial_size = INITIAL_SIZE);
 
 private:
   bool compare(const ObGroupRowItem& left, const ObGroupRowItem& right) const;
diff --git a/src/sql/engine/aggregate/ob_exec_hash_struct.h b/src/sql/engine/aggregate/ob_exec_hash_struct.h
index 6e3c19ff6bc525e5b5dbc862a1d7c667c9e22ece..30865abbea2b099921dc58952e403f95d95e1948 100644
--- a/src/sql/engine/aggregate/ob_exec_hash_struct.h
+++ b/src/sql/engine/aggregate/ob_exec_hash_struct.h
@@ -20,6 +20,7 @@
 #include "sql/engine/basic/ob_chunk_row_store.h"
 #include "lib/container/ob_2d_array.h"
 #include "sql/engine/basic/ob_chunk_datum_store.h"
+#include "sql/engine/ob_sql_mem_mgr_processor.h"
 
 namespace oceanbase {
 namespace common {
@@ -35,14 +36,17 @@ class ObExtendHashTable {
 public:
   const static int64_t INITIAL_SIZE = 128;
   const static int64_t SIZE_BUCKET_SCALE = 4;
-  ObExtendHashTable() : initial_bucket_num_(0), size_(0), buckets_(NULL), allocator_(NULL)
+  const static int64_t MAX_MEM_PERCENT = 40;
+  ObExtendHashTable() : initial_bucket_num_(0), size_(0), buckets_(NULL), allocator_(NULL),
+      sql_mem_processor_(nullptr)
   {}
   ~ObExtendHashTable()
   {
     destroy();
   }
-  int init(ObIAllocator* allocator, lib::ObMemAttr& mem_attr, int64_t initial_size = INITIAL_SIZE);
+  int init(ObIAllocator* allocator, lib::ObMemAttr& mem_attr,
+      ObSqlMemMgrProcessor *sql_mem_processor, int64_t initial_size = INITIAL_SIZE);
   bool is_inited() const
   {
     return NULL != buckets_;
@@ -70,7 +74,7 @@
     size_ = 0;
   }
 
-  int resize(ObIAllocator* allocator, int64_t bucket_num);
+  int resize(ObIAllocator *allocator, int64_t bucket_num, ObSqlMemMgrProcessor *sql_mem_processor);
 
   void destroy()
   {
@@ -82,6 +86,7 @@
     allocator_.set_allocator(nullptr);
     size_ = 0;
     initial_bucket_num_ = 0;
+    sql_mem_processor_ = nullptr;
   }
   int64_t mem_used() const
   {
@@ -116,6 +121,9 @@ public:
 protected:
   DISALLOW_COPY_AND_ASSIGN(ObExtendHashTable);
   int extend();
+  int64_t estimate_bucket_num(
+      const int64_t bucket_num,
+      const int64_t max_hash_mem);
 
 protected:
   lib::ObMemAttr mem_attr_;
@@ -124,11 +132,34 @@ protected:
   using BucketArray = common::ObSegmentArray;
   BucketArray* buckets_;
   common::ModulePageAllocator allocator_;
+  ObSqlMemMgrProcessor *sql_mem_processor_;
 };
 
+template <typename Item>
+int64_t ObExtendHashTable<Item>::estimate_bucket_num(
+    const int64_t bucket_num,
+    const int64_t max_hash_mem)
+{
+  int64_t max_bound_size = max_hash_mem * MAX_MEM_PERCENT / 100;
+  int64_t est_bucket_num = common::next_pow2(bucket_num);
+  int64_t est_size = est_bucket_num * sizeof(void*);
+  while (est_size > max_bound_size) {
+    est_bucket_num >>= 1;
+    est_size = est_bucket_num * sizeof(void*);
+  }
+  if (est_bucket_num < INITIAL_SIZE) {
+    est_bucket_num = INITIAL_SIZE;
+  }
+  return est_bucket_num;
+}
+
 template <typename Item>
 int ObExtendHashTable<Item>::init(
-    ObIAllocator* allocator, lib::ObMemAttr& mem_attr, const int64_t initial_size /* INITIAL_SIZE */)
+    ObIAllocator *allocator,
+    lib::ObMemAttr &mem_attr,
+    ObSqlMemMgrProcessor *sql_mem_processor,
+    const int64_t initial_size /* INITIAL_SIZE */)
 {
   int ret = common::OB_SUCCESS;
   if (initial_size < 2) {
@@ -136,6 +167,7 @@ int ObExtendHashTable<Item>::init(
     SQL_ENG_LOG(WARN, "invalid argument", K(ret));
   } else {
     mem_attr_ = mem_attr;
+    sql_mem_processor_ = sql_mem_processor;
     allocator_.set_allocator(allocator);
     allocator_.set_label(mem_attr.label_);
     void* buckets_buf = NULL;
@@ -157,12 +189,13 @@ int ObExtendHashTable<Item>::init(
 }
 
 template <typename Item>
-int ObExtendHashTable<Item>::resize(ObIAllocator* allocator, int64_t bucket_num)
+int ObExtendHashTable<Item>::resize(ObIAllocator* allocator, int64_t bucket_num,
+    ObSqlMemMgrProcessor *sql_mem_processor)
 {
   int ret = OB_SUCCESS;
   if (bucket_num < get_bucket_num() / 2) {
     destroy();
-    if (OB_FAIL(init(allocator, mem_attr_, bucket_num))) {
+    if (OB_FAIL(init(allocator, mem_attr_, sql_mem_processor, bucket_num))) {
       SQL_ENG_LOG(WARN, "failed to reuse with bucket", K(bucket_num), K(ret));
     }
   } else {
@@ -222,46 +255,52 @@ int ObExtendHashTable<Item>::extend()
 {
   common::hash::hash_func hf;
   int ret = common::OB_SUCCESS;
-  const int64_t new_bucket_num =
-      0 == get_bucket_num() ? (0 == initial_bucket_num_ ? INITIAL_SIZE : initial_bucket_num_) : get_bucket_num() * 2;
-  BucketArray* new_buckets = NULL;
-  void* buckets_buf = NULL;
-  if (OB_ISNULL(buckets_buf = allocator_.alloc(sizeof(BucketArray), mem_attr_))) {
-    ret = OB_ALLOCATE_MEMORY_FAILED;
-    SQL_ENG_LOG(WARN, "failed to allocate memory", K(ret));
-  } else {
-    new_buckets = new (buckets_buf) BucketArray(allocator_);
-  }
-  if (OB_FAIL(ret)) {
-    // do nothing
-  } else if (OB_ISNULL(buckets_)) {
-    ret = OB_INVALID_ARGUMENT;
-    SQL_ENG_LOG(WARN, "invalid argument", K(ret), K(buckets_));
-  } else if (OB_FAIL(new_buckets->init(new_bucket_num))) {
-    SQL_ENG_LOG(WARN, "resize bucket array failed", K(ret), K(new_bucket_num));
+  int64_t pre_bucket_num = get_bucket_num();
+  int64_t new_bucket_num = 0 == pre_bucket_num ?
+      (0 == initial_bucket_num_ ? INITIAL_SIZE : initial_bucket_num_)
+      : pre_bucket_num * 2;
+  new_bucket_num = estimate_bucket_num(new_bucket_num, sql_mem_processor_->get_mem_bound());
+  if (new_bucket_num <= pre_bucket_num) {
   } else {
-    for (int64_t i = 0; i < get_bucket_num(); i++) {
-      Item* bucket = buckets_->at(i);
-      while (bucket != NULL) {
-        Item* item = bucket;
-        bucket = bucket->next();
-        Item*& new_bucket = new_buckets->at(hf(*item) & (new_bucket_num - 1));
-        item->next() = new_bucket;
-        new_bucket = item;
-      }
+    BucketArray* new_buckets = NULL;
+    void* buckets_buf = NULL;
+    if (OB_ISNULL(buckets_buf = allocator_.alloc(sizeof(BucketArray), mem_attr_))) {
+      ret = OB_ALLOCATE_MEMORY_FAILED;
+      SQL_ENG_LOG(WARN, "failed to allocate memory", K(ret));
+    } else {
+      new_buckets = new (buckets_buf) BucketArray(allocator_);
     }
-    buckets_->destroy();
-    allocator_.free(buckets_);
+    if (OB_FAIL(ret)) {
+      // do nothing
+    } else if (OB_ISNULL(buckets_)) {
+      ret = OB_INVALID_ARGUMENT;
+      SQL_ENG_LOG(WARN, "invalid argument", K(ret), K(buckets_));
+    } else if (OB_FAIL(new_buckets->init(new_bucket_num))) {
+      SQL_ENG_LOG(WARN, "resize bucket array failed", K(ret), K(new_bucket_num));
+    } else {
+      for (int64_t i = 0; i < get_bucket_num(); i++) {
+        Item* bucket = buckets_->at(i);
+        while (bucket != NULL) {
+          Item* item = bucket;
+          bucket = bucket->next();
+          Item*& new_bucket = new_buckets->at(hf(*item) & (new_bucket_num - 1));
+          item->next() = new_bucket;
+          new_bucket = item;
+        }
+      }
+      buckets_->destroy();
+      allocator_.free(buckets_);
-    buckets_ = new_buckets;
-  }
-  if (OB_FAIL(ret)) {
-    if (buckets_ == new_buckets) {
-      SQL_ENG_LOG(ERROR, "unexpected status: failed allocate new bucket", K(ret));
-    } else if (nullptr != new_buckets) {
-      new_buckets->destroy();
-      allocator_.free(new_buckets);
-      new_buckets = nullptr;
+      buckets_ = new_buckets;
+    }
+    if (OB_FAIL(ret)) {
+      if (buckets_ == new_buckets) {
+        SQL_ENG_LOG(ERROR, "unexpected status: failed allocate new bucket", K(ret));
+      } else if (nullptr != new_buckets) {
+        new_buckets->destroy();
+        allocator_.free(new_buckets);
+        new_buckets = nullptr;
+      }
     }
   }
   return ret;
diff --git a/src/sql/engine/aggregate/ob_hash_groupby.cpp b/src/sql/engine/aggregate/ob_hash_groupby.cpp
index 186e9506f6f20620ae7eb55f40c1987fedd3d299..c212367b27fe069e631a652e2c744cc23b196ee2 100644
--- a/src/sql/engine/aggregate/ob_hash_groupby.cpp
+++ b/src/sql/engine/aggregate/ob_hash_groupby.cpp
@@ -392,7 +392,9 @@ int ObHashGroupBy::load_data(ObExecContext& ctx) const
     level = cur_part->level_;
     part_shift = part_shift + level * CHAR_BIT;
     input_size = cur_part->row_store_.get_file_size();
-    if (OB_FAIL(gby_ctx->group_rows_.resize(&gby_ctx->mem_context_->get_malloc_allocator(), max(2, input_rows)))) {
+    if (OB_FAIL(gby_ctx->group_rows_.resize(&gby_ctx->mem_context_->get_malloc_allocator(),
+        max(2, input_rows),
+        &gby_ctx->sql_mem_processor_))) {
       LOG_WARN("failed to reuse extended hash table", K(ret));
     } else if (OB_FAIL(init_sql_mem_mgr(gby_ctx, input_size))) {
       LOG_WARN("failed to init sql mem manager", K(ret));
@@ -784,7 +786,9 @@ int ObHashGroupBy::inner_get_next_row(ObExecContext& ctx, const ObNewRow*& row)
     ObMemAttr attr(
         ctx.get_my_session()->get_effective_tenant_id(), ObModIds::OB_HASH_NODE_GROUP_ROWS, ObCtxIds::WORK_AREA);
     if (OB_FAIL(
-            groupby_ctx->group_rows_.init(&groupby_ctx->mem_context_->get_malloc_allocator(), attr, init_size))) {
+            groupby_ctx->group_rows_.init(&groupby_ctx->mem_context_->get_malloc_allocator(),
+                attr, &groupby_ctx->sql_mem_processor_,
+                init_size))) {
       LOG_WARN("fail to init hash map", K(ret));
     } else {
       groupby_ctx->op_monitor_info_.otherstat_1_value_ = init_size;
diff --git a/src/sql/engine/aggregate/ob_hash_groupby_op.cpp b/src/sql/engine/aggregate/ob_hash_groupby_op.cpp
index 341ef0b277bf5490ca588b589056296121a7eed9..f2483c3e5964364c44ecc7805fb642c801ad2f06 100644
--- a/src/sql/engine/aggregate/ob_hash_groupby_op.cpp
+++ b/src/sql/engine/aggregate/ob_hash_groupby_op.cpp
@@ -80,7 +80,7 @@ int ObHashGroupByOp::inner_open()
     LOG_WARN("failed to get px size", K(ret));
   } else if (FALSE_IT(est_hash_mem_size = estimate_hash_bucket_size(est_group_cnt))) {
   } else if (FALSE_IT(estimate_mem_size = est_hash_mem_size + MY_SPEC.width_ * est_group_cnt)) {
-  } else if (OB_FAIL(sql_mem_processor_.init(&ctx_.get_allocator(),
+  } else if (OB_FAIL(sql_mem_processor_.init(&mem_context_->get_malloc_allocator(),
                  ctx_.get_my_session()->get_effective_tenant_id(),
                  estimate_mem_size,
                  MY_SPEC.type_,
@@ -93,7 +93,8 @@ int ObHashGroupByOp::inner_open()
   } else if (FALSE_IT(init_size = std::max((int64_t)MIN_GROUP_HT_INIT_SIZE, init_size))) {
   } else if (FALSE_IT(init_size = std::min((int64_t)MAX_GROUP_HT_INIT_SIZE, init_size))) {
   } else if (OB_FAIL(local_group_rows_.init(
-                 &mem_context_->get_malloc_allocator(), attr, &eval_ctx_, &MY_SPEC.cmp_funcs_, init_size))) {
+                 &mem_context_->get_malloc_allocator(), attr, &eval_ctx_,
+                 &MY_SPEC.cmp_funcs_, &sql_mem_processor_, init_size))) {
     LOG_WARN("fail to init hash map", K(ret));
   } else if (OB_FAIL(sql_mem_processor_.update_used_mem_size(get_mem_used_size()))) {
     LOG_WARN("fail to update_used_mem_size", "size", get_mem_used_size(), K(ret));
@@ -124,6 +125,25 @@ int ObHashGroupByOp::inner_open()
   return ret;
 }
 
+int ObHashGroupByOp::init_group_store()
+{
+  int ret = OB_SUCCESS;
+  group_store_.reset();
+  if (OB_FAIL(group_store_.init(0,
+          ctx_.get_my_session()->get_effective_tenant_id(),
+          ObCtxIds::WORK_AREA,
+          ObModIds::OB_HASH_NODE_GROUP_ROWS,
+          false /* disable dump */,
+          0))) {
+    LOG_WARN("failed to init group store", K(ret));
+  } else {
+    group_store_.set_dir_id(sql_mem_processor_.get_dir_id());
+    group_store_.set_callback(&sql_mem_processor_);
+    group_store_.set_allocator(mem_context_->get_malloc_allocator());
+  }
+  return ret;
+}
+
 int ObHashGroupByOp::inner_close()
 {
   sql_mem_processor_.unregister_profile();
@@ -188,7 +208,7 @@ int ObHashGroupByOp::inner_get_next_row()
   LOG_DEBUG("before inner_get_next_row",
       K(get_aggr_used_size()),
       K(get_aggr_used_size()),
-      K(get_local_hash_used_size()),
+      K(get_hash_table_used_size()),
       K(get_dumped_part_used_size()),
       K(get_dump_part_hold_size()),
       K(get_mem_used_size()),
@@ -243,7 +263,7 @@ int ObHashGroupByOp::inner_get_next_row()
   LOG_DEBUG("after inner_get_next_row",
       K(get_aggr_used_size()),
       K(get_aggr_used_size()),
-      K(get_local_hash_used_size()),
+      K(get_hash_table_used_size()),
       K(get_dumped_part_used_size()),
       K(get_dump_part_hold_size()),
       K(get_mem_used_size()),
@@ -279,15 +299,18 @@ int ObHashGroupByOp::load_data()
     part_id = cur_part->part_id_;
     part_shift = part_shift + part_id * CHAR_BIT;
     input_size = cur_part->datum_store_.get_file_size();
-    if (OB_FAIL(local_group_rows_.resize(&mem_context_->get_malloc_allocator(), max(2, input_rows)))) {
+    if (OB_FAIL(local_group_rows_.resize(&mem_context_->get_malloc_allocator(),
+        max(2, input_rows), &sql_mem_processor_))) {
       LOG_WARN("failed to reuse extended hash table", K(ret));
-    } else if (OB_FAIL(sql_mem_processor_.init(&ctx_.get_allocator(),
+    } else if (OB_FAIL(sql_mem_processor_.init(&mem_context_->get_malloc_allocator(),
                   ctx_.get_my_session()->get_effective_tenant_id(),
                   input_size,
                   MY_SPEC.type_,
                   MY_SPEC.id_,
                   &ctx_))) {
       LOG_WARN("failed to init sql mem processor", K(ret));
+    } else if (OB_FAIL(init_group_store())) {
+      LOG_WARN("failed to init group store", K(ret));
     } else {
       LOG_TRACE("scan new partition",
           K(part_id),
@@ -465,7 +488,8 @@ int ObHashGroupByOp::update_mem_status_periodically(
   bool updated = false;
   need_dump = false;
   if (OB_FAIL(sql_mem_processor_.update_max_available_mem_size_periodically(
-          &ctx_.get_allocator(), [&](int64_t cur_cnt) { return nth_cnt > cur_cnt; }, updated))) {
+          &mem_context_->get_malloc_allocator(),
+          [&](int64_t cur_cnt) { return nth_cnt > cur_cnt; }, updated))) {
     LOG_WARN("failed to update usable memory size periodically", K(ret));
   } else if (updated) {
     if (OB_FAIL(sql_mem_processor_.update_used_mem_size(get_mem_used_size()))) {
@@ -475,7 +499,7 @@ int ObHashGroupByOp::update_mem_status_periodically(
       ;
       est_part_cnt = detect_part_cnt(input_row);
       calc_data_mem_ratio(est_part_cnt, data_ratio);
-      need_dump = (get_aggr_used_size() > get_mem_bound_size() * data_ratio);
+      need_dump = is_need_dump(data_ratio);
     }
   }
   return ret;
@@ -483,14 +507,12 @@ int ObHashGroupByOp::update_mem_status_periodically(
 int64_t ObHashGroupByOp::detect_part_cnt(const int64_t rows) const
 {
-  const double group_mem_avg = (double)get_aggr_used_size() / local_group_rows_.size();
+  const double group_mem_avg = (double)get_data_size() / local_group_rows_.size();
   int64_t data_size = rows * ((double)agged_group_cnt_ / agged_row_cnt_) * group_mem_avg;
   int64_t mem_bound = get_mem_bound_size();
-  const double part_skew_factor = 1.2;
-  data_size = data_size * part_skew_factor;
   int64_t part_cnt = (data_size + mem_bound) / mem_bound;
   part_cnt = next_pow2(part_cnt);
-  int64_t availble_mem_size = min(mem_bound - get_aggr_hold_size(), mem_bound * MAX_PART_MEM_RATIO);
+  int64_t availble_mem_size = mem_bound - get_mem_used_size();
   int64_t est_dump_size = part_cnt * ObChunkRowStore::BLOCK_SIZE;
   if (0 < availble_mem_size) {
     while (est_dump_size > availble_mem_size) {
@@ -508,13 +530,12 @@ int64_t ObHashGroupByOp::detect_part_cnt(const int64_t rows) const
       K(group_mem_avg),
       K(get_mem_used_size()),
       K(get_mem_bound_size()),
-      K(part_skew_factor),
       K(agged_group_cnt_),
       K(agged_row_cnt_),
       K(local_group_rows_.size()),
       K(part_cnt),
       K(get_aggr_used_size()),
-      K(get_local_hash_used_size()),
+      K(get_hash_table_used_size()),
       K(get_dumped_part_used_size()),
       K(get_aggr_hold_size()),
       K(get_dump_part_hold_size()),
@@ -526,11 +547,12 @@ int64_t ObHashGroupByOp::detect_part_cnt(const int64_t rows) const
 void ObHashGroupByOp::calc_data_mem_ratio(const int64_t part_cnt, double& data_ratio)
 {
-  int64_t extra_size = (get_local_hash_used_size() + part_cnt * FIX_SIZE_PER_PART) * (1 + EXTRA_MEM_RATIO);
-  int64_t data_size = max(get_aggr_used_size(), (get_mem_bound_size() - extra_size) * 0.8);
-  data_ratio = data_size * 1.0 / (extra_size + data_size);
+  int64_t est_extra_size = (get_mem_used_size() + part_cnt * FIX_SIZE_PER_PART);
+  int64_t data_size = get_mem_used_size();
+  data_ratio = data_size * 1.0 / est_extra_size;
   sql_mem_processor_.set_data_ratio(data_ratio);
-  LOG_TRACE("trace calc data ratio", K(data_ratio), K(extra_size), K(part_cnt), K(data_size), K(get_aggr_used_size()));
+  LOG_TRACE("trace calc data ratio", K(data_ratio), K(est_extra_size),
+      K(part_cnt), K(data_size), K(get_aggr_used_size()));
 }
 
 void ObHashGroupByOp::adjust_part_cnt(int64_t& part_cnt)
@@ -581,12 +603,18 @@ bool ObHashGroupByOp::need_start_dump(const int64_t input_rows, int64_t& est_par
     calc_data_mem_ratio(est_part_cnt, data_ratio);
   }
   // We continue do aggregation after we start dumping, reserve 1/8 memory for it.
-  if (get_aggr_used_size() > data_ratio * mem_bound || check_dump) {
+  if (is_need_dump(data_ratio) || check_dump) {
     int ret = OB_SUCCESS;
     need_dump = true;
     if (OB_FAIL(sql_mem_processor_.extend_max_memory_size(
-            &ctx_.get_allocator(),
-            [&](int64_t max_memory_size) { return get_aggr_used_size() > data_ratio * max_memory_size; },
+            &mem_context_->get_malloc_allocator(),
+            [&](int64_t max_memory_size) {
+              UNUSED(max_memory_size);
+              data_ratio = sql_mem_processor_.get_data_ratio();
+              est_part_cnt = detect_part_cnt(input_rows);
+              calc_data_mem_ratio(est_part_cnt, data_ratio);
+              return is_need_dump(data_ratio);
+            },
             need_dump,
             mem_used))) {
       need_dump = true;
@@ -674,7 +702,7 @@ int ObHashGroupByOp::setup_dump_env(const int64_t part_id, const int64_t input_r
     }
   }
   if (OB_FAIL(ret)) {
-  } else if (OB_FAIL(sql_mem_processor_.get_max_available_mem_size(&ctx_.get_allocator()))) {
+  } else if (OB_FAIL(sql_mem_processor_.get_max_available_mem_size(&mem_context_->get_malloc_allocator()))) {
     LOG_WARN("failed to get max available memory size", K(ret));
   } else if (OB_FAIL(sql_mem_processor_.update_used_mem_size(get_mem_used_size()))) {
     LOG_WARN("failed to update mem size", K(ret));
diff --git a/src/sql/engine/aggregate/ob_hash_groupby_op.h b/src/sql/engine/aggregate/ob_hash_groupby_op.h
index e513f0586bbfe1630ad52a2eb0100091d929e11b..52d4c13626d555ea750ec83040ac9dd48fdbf669 100644
--- a/src/sql/engine/aggregate/ob_hash_groupby_op.h
+++ b/src/sql/engine/aggregate/ob_hash_groupby_op.h
@@ -112,7 +112,7 @@ public:
   {
     return aggr_processor_.get_aggr_hold_size();
   }
-  OB_INLINE int64_t get_local_hash_used_size() const
+  OB_INLINE int64_t get_hash_table_used_size() const
   {
     return local_group_rows_.mem_used();
   }
@@ -126,7 +126,11 @@ public:
   }
   OB_INLINE int64_t get_extra_size() const
   {
-    return get_local_hash_used_size() + get_dumped_part_used_size();
+    return get_dumped_part_used_size();
+  }
+  OB_INLINE int64_t get_data_size() const
+  {
+    return get_aggr_used_size() + sql_mem_processor_.get_data_size();
   }
   OB_INLINE int64_t get_mem_used_size() const
   {
@@ -136,6 +140,10 @@ public:
   {
     return sql_mem_processor_.get_mem_bound();
   }
+  OB_INLINE bool is_need_dump(double data_ratio)
+  {
+    return (get_mem_used_size() > get_mem_bound_size() * data_ratio);
+  }
   OB_INLINE int64_t estimate_hash_bucket_size(const int64_t bucket_cnt) const
   {
     return next_pow2(ObGroupRowHashTable::SIZE_BUCKET_SCALE * bucket_cnt) * sizeof(void*);
@@ -152,6 +160,7 @@ public:
     }
     return (mem_size / sizeof(void*) / ObGroupRowHashTable::SIZE_BUCKET_SCALE);
   }
+  int init_group_store();
   int update_mem_status_periodically(
       const int64_t nth_cnt, const int64_t input_row, int64_t& est_part_cnt, bool& need_dump);
   int64_t detect_part_cnt(const int64_t rows) const;
diff --git a/src/sql/engine/ob_tenant_sql_memory_manager.cpp b/src/sql/engine/ob_tenant_sql_memory_manager.cpp
index 024a0ce1762ad4234480bf4dd9426b7cc1d5725f..61796d7837a0543f3ba62f3f3f8df3d6279625dc 100644
--- a/src/sql/engine/ob_tenant_sql_memory_manager.cpp
+++ b/src/sql/engine/ob_tenant_sql_memory_manager.cpp
@@ -837,14 +837,10 @@ int ObTenantSqlMemoryManager::get_max_work_area_size(int64_t& max_wa_memory_size
   int64_t pre_mem_target = mem_target_;
   double hold_ratio = 1. * tenant_work_area_memory_hold / tenant_work_area_max_size;
   int64_t tmp_max_wa_memory_size = (remain_memory_size > 0)
-      ? (1 - hold_ratio * hold_ratio) * remain_memory_size + total_alloc_size
+      ? (1 - hold_ratio * hold_ratio * hold_ratio) * remain_memory_size + total_alloc_size
       : total_alloc_size;
   double alloc_ratio = total_alloc_size * 1.0 / tmp_max_wa_memory_size;
-  if (total_alloc_size >= tmp_max_wa_memory_size) {
-    max_wa_memory_size = (tmp_max_wa_memory_size >> 1);
-  } else {
-    max_wa_memory_size = tmp_max_wa_memory_size * (1 - alloc_ratio * alloc_ratio);
-  }
+  max_wa_memory_size = tmp_max_wa_memory_size * (1 - alloc_ratio * alloc_ratio);
   max_workarea_size_ = tenant_work_area_max_size;
   workarea_hold_size_ = tenant_work_area_memory_hold;
   max_auto_workarea_size_ = max_wa_memory_size;
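
Note (not part of the patch): the new `estimate_bucket_num()` caps the hash-table bucket array at `MAX_MEM_PERCENT` (40%) of the memory bound reported by the SQL memory manager, halving the power-of-two bucket count until the array fits and never going below `INITIAL_SIZE`; `extend()` feeds it `sql_mem_processor_->get_mem_bound()`. The standalone sketch below only mirrors that arithmetic for illustration; `next_pow2_stub` is a hypothetical stand-in for `common::next_pow2` and the sketch is not OceanBase code.

```cpp
// Standalone sketch of the bucket-capping math introduced by estimate_bucket_num().
// Assumptions: next_pow2_stub stands in for common::next_pow2 (smallest power of two >= n);
// constants mirror the patch (INITIAL_SIZE = 128, MAX_MEM_PERCENT = 40).
#include <cstdint>
#include <cstdio>

static const int64_t INITIAL_SIZE = 128;
static const int64_t MAX_MEM_PERCENT = 40;

static int64_t next_pow2_stub(int64_t n)
{
  int64_t p = 1;
  while (p < n) {
    p <<= 1;
  }
  return p;
}

static int64_t estimate_bucket_num_sketch(int64_t bucket_num, int64_t max_hash_mem)
{
  int64_t max_bound_size = max_hash_mem * MAX_MEM_PERCENT / 100;  // 40% of the mem bound
  int64_t est_bucket_num = next_pow2_stub(bucket_num);
  int64_t est_size = est_bucket_num * sizeof(void*);              // one pointer per bucket
  while (est_size > max_bound_size) {                             // halve until the array fits
    est_bucket_num >>= 1;
    est_size = est_bucket_num * sizeof(void*);
  }
  if (est_bucket_num < INITIAL_SIZE) {                            // never drop below the floor
    est_bucket_num = INITIAL_SIZE;
  }
  return est_bucket_num;
}

int main()
{
  // With a 1 MB bound, a request for 1,000,000 buckets is capped:
  // 40% of 1 MB = 419430 bytes, and the largest fitting power of two is 32768 buckets (256 KB).
  printf("%lld\n", (long long)estimate_bucket_num_sketch(1000000, 1 << 20));  // prints 32768
  return 0;
}
```

This is why `extend()` now compares the capped `new_bucket_num` against `pre_bucket_num` and skips the rebuild when the memory bound would not allow the bucket array to actually grow.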