From 0afbb84439867460d835b1a74eb37088b8b1fa5f Mon Sep 17 00:00:00 2001 From: ls0 Date: Mon, 27 Dec 2021 20:52:43 +0800 Subject: [PATCH] patch bugfix to opensource --- .../code_generator/ob_static_engine_cg.cpp | 134 +++++++++++------- src/sql/engine/basic/ob_chunk_datum_store.cpp | 51 +++++-- src/sql/engine/basic/ob_chunk_datum_store.h | 3 +- src/sql/engine/basic/ob_chunk_row_store.cpp | 35 +++-- src/sql/engine/basic/ob_chunk_row_store.h | 2 +- src/sql/engine/expr/ob_expr_in.cpp | 8 +- src/sql/engine/expr/ob_expr_nvl.cpp | 8 +- 7 files changed, 161 insertions(+), 80 deletions(-) diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index af0c2b9ff..94c4c141d 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -659,69 +659,103 @@ int ObStaticEngineCG::generate_spec(ObLogDistinct& op, ObMergeDistinctSpec& spec return ret; } -int ObStaticEngineCG::generate_spec(ObLogDistinct& op, ObHashDistinctSpec& spec, const bool in_root_job) +int ObStaticEngineCG::generate_spec( + ObLogDistinct &op, ObHashDistinctSpec &spec, const bool in_root_job) { int ret = OB_SUCCESS; UNUSED(in_root_job); spec.is_block_mode_ = op.get_block_mode(); - if (OB_FAIL(spec.cmp_funcs_.init(op.get_distinct_exprs().count()))) { - LOG_WARN("failed to init sort functions", K(ret)); - } else if (OB_FAIL(spec.hash_funcs_.init(op.get_distinct_exprs().count()))) { - LOG_WARN("failed to init sort functions", K(ret)); - } else if (OB_FAIL(spec.sort_collations_.init(op.get_distinct_exprs().count()))) { + int64_t init_count = op.get_distinct_exprs().count(); + if (1 != op.get_num_of_child()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected child count of hash distinct", K(ret), K(op.get_num_of_child())); + } else if (OB_FAIL(spec.cmp_funcs_.init(init_count))) { + LOG_WARN("failed to init cmp functions", K(ret)); + } else if (OB_FAIL(spec.hash_funcs_.init(init_count))) { + LOG_WARN("failed to init hash functions", K(ret)); + } else if (OB_FAIL(spec.sort_collations_.init(init_count))) { LOG_WARN("failed to init sort functions", K(ret)); - } else if (OB_FAIL(spec.distinct_exprs_.init(op.get_distinct_exprs().count()))) { + } else if (OB_FAIL(spec.distinct_exprs_.init(op.get_distinct_exprs().count() + + op.get_child(0)->get_output_exprs().count()))) { LOG_WARN("failed to init distinct exprs", K(ret)); } else { - ObExpr* expr = nullptr; - int64_t dist_cnt = 0; - ARRAY_FOREACH(op.get_distinct_exprs(), i) - { - const ObRawExpr* raw_expr = op.get_distinct_exprs().at(i); - if (OB_ISNULL(raw_expr)) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("null pointer", K(ret)); - } else if (raw_expr->has_flag(IS_CONST) || raw_expr->has_flag(IS_CONST_EXPR)) { - continue; - } else if (OB_FAIL(generate_rt_expr(*raw_expr, expr))) { - LOG_WARN("failed to generate rt expr", K(ret)); - } else if (OB_FAIL(spec.distinct_exprs_.push_back(expr))) { - LOG_WARN("failed to push back expr", K(ret)); - } else if (OB_ISNULL(expr->basic_funcs_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected status: basic funcs is not init", K(ret)); - } else { - ObOrderDirection order_direction = default_asc_direction(); - bool is_ascending = is_ascending_direction(order_direction); - ObSortFieldCollation field_collation(dist_cnt, + ObArray additional_exprs; + ObExpr *expr = nullptr; + ARRAY_FOREACH(op.get_child(0)->get_output_exprs(), i) { + ObRawExpr* raw_expr = op.get_child(0)->get_output_exprs().at(i); + bool is_distinct_expr = has_exist_in_array(op.get_distinct_exprs(), raw_expr); + if (!is_distinct_expr) { + OZ (additional_exprs.push_back(raw_expr)); + } + } + if (OB_SUCC(ret)) { + int64_t dist_cnt = 0; + ARRAY_FOREACH(op.get_distinct_exprs(), i) { + ObRawExpr* raw_expr = op.get_distinct_exprs().at(i); + if (OB_ISNULL(raw_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("null pointer", K(ret)); + } else if (raw_expr->has_flag(IS_CONST) || raw_expr->has_flag(IS_CONST_EXPR)) { + // distinct const value, 这里需要注意:distinct 1被跳过了, + // 但ObMergeDistinct中,如果没有distinct列,则默认所有值都相等,这个语义正好是符合预期的。 + continue; + } else if (OB_FAIL(generate_rt_expr(*raw_expr, expr))) { + LOG_WARN("failed to generate rt expr", K(ret)); + } else if (OB_FAIL(spec.distinct_exprs_.push_back(expr))) { + LOG_WARN("failed to push back expr", K(ret)); + } else if (OB_ISNULL(expr->basic_funcs_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected status: basic funcs is not init", K(ret)); + } else { + ObOrderDirection order_direction = default_asc_direction(); + bool is_ascending = is_ascending_direction(order_direction); + ObSortFieldCollation field_collation(dist_cnt, expr->datum_meta_.cs_type_, is_ascending, (is_null_first(order_direction) ^ is_ascending) ? NULL_LAST : NULL_FIRST); - ObCmpFunc cmp_func; - cmp_func.cmp_func_ = ObDatumFuncs::get_nullsafe_cmp_func(expr->datum_meta_.type_, - expr->datum_meta_.type_, - NULL_LAST, // NULL_FIRST and NULL_LAST both OK. - expr->datum_meta_.cs_type_, - lib::is_oracle_mode()); - ObHashFunc hash_func; - hash_func.hash_func_ = expr->basic_funcs_->murmur_hash_; - if (OB_ISNULL(cmp_func.cmp_func_) || OB_ISNULL(hash_func.hash_func_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("cmp_func or hash func is null, check datatype is valid", - K(cmp_func.cmp_func_), - K(hash_func.hash_func_), - K(ret)); - } else if (OB_FAIL(spec.sort_collations_.push_back(field_collation))) { - LOG_WARN("failed to push back sort collation", K(ret)); - } else if (OB_FAIL(spec.cmp_funcs_.push_back(cmp_func))) { - LOG_WARN("failed to push back sort function", K(ret)); - } else if (OB_FAIL(spec.hash_funcs_.push_back(hash_func))) { - LOG_WARN("failed to push back hash funcs", K(ret)); - } else { - ++dist_cnt; + ObCmpFunc cmp_func; + cmp_func.cmp_func_ = ObDatumFuncs::get_nullsafe_cmp_func( + expr->datum_meta_.type_, + expr->datum_meta_.type_, + NULL_LAST,//这里null last还是first无所谓 + expr->datum_meta_.cs_type_, + lib::is_oracle_mode()); + ObHashFunc hash_func; + hash_func.hash_func_ = expr->basic_funcs_->murmur_hash_; + if (OB_ISNULL(cmp_func.cmp_func_) || OB_ISNULL(hash_func.hash_func_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cmp_func or hash func is null, check datatype is valid", + K(cmp_func.cmp_func_), K(hash_func.hash_func_), K(ret)); + } else if (OB_FAIL(spec.sort_collations_.push_back(field_collation))) { + LOG_WARN("failed to push back sort collation", K(ret)); + } else if (OB_FAIL(spec.cmp_funcs_.push_back(cmp_func))) { + LOG_WARN("failed to push back sort function", K(ret)); + } else if (OB_FAIL(spec.hash_funcs_.push_back(hash_func))) { + LOG_WARN("failed to push back hash funcs", K(ret)); + } else { + ++dist_cnt; + } } } } + // complete distinct exprs + if (OB_SUCC(ret) && 0 != additional_exprs.count()) { + ARRAY_FOREACH(additional_exprs, i) { + const ObRawExpr* raw_expr = additional_exprs.at(i); + if (OB_ISNULL(raw_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("null pointer", K(ret)); + } else if (raw_expr->has_flag(IS_CONST) || raw_expr->has_flag(IS_CONST_EXPR)) { + // distinct const value, 这里需要注意:distinct 1被跳过了, + // 但ObMergeDistinct中,如果没有distinct列,则默认所有值都相等,这个语义正好是符合预期的。 + continue; + } else if (OB_FAIL(generate_rt_expr(*raw_expr, expr))) { + LOG_WARN("failed to generate rt expr", K(ret)); + } else if (OB_FAIL(spec.distinct_exprs_.push_back(expr))) { + LOG_WARN("failed to push back expr", K(ret)); + } + } + } } return ret; } diff --git a/src/sql/engine/basic/ob_chunk_datum_store.cpp b/src/sql/engine/basic/ob_chunk_datum_store.cpp index a1dc37598..b5594d627 100644 --- a/src/sql/engine/basic/ob_chunk_datum_store.cpp +++ b/src/sql/engine/basic/ob_chunk_datum_store.cpp @@ -1150,20 +1150,34 @@ int ObChunkDatumStore::load_next_chunk_blocks(ChunkIterator& it) tmp_read_size = read_size; chunk_size = read_size + read_off; } - if (0 == read_size) { // ret end int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != - (tmp_ret = read_file(it.chunk_mem_ + read_off, tmp_read_size, it.cur_iter_pos_, it.aio_read_handle_))) { + if (OB_SUCCESS != (tmp_ret = read_file(it.chunk_mem_ + read_off, + tmp_read_size, + it.cur_iter_pos_, + it.aio_read_handle_, + it.file_size_, + it.cur_iter_pos_))) { LOG_WARN("read blk info from file failed", K(tmp_ret), K_(it.cur_iter_pos)); } if (OB_ITER_END != tmp_ret) { - LOG_WARN("unexpected status", K(ret), K(tmp_ret)); + if (OB_UNLIKELY(OB_SUCCESS == tmp_ret)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected read succ", K(read_size), K(tmp_read_size), K(it), K(read_off)); + } else { + ret = tmp_ret; + LOG_WARN("unexpected status", K(ret)); + } } else { ret = OB_ITER_END; } - } else if (OB_FAIL(read_file(it.chunk_mem_ + read_off, read_size, it.cur_iter_pos_, it.aio_read_handle_))) { + } else if (OB_FAIL(read_file(it.chunk_mem_ + read_off, + read_size, + it.cur_iter_pos_, + it.aio_read_handle_, + it.file_size_, + it.cur_iter_pos_))) { LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos)); } else { int64_t cur_pos = 0; @@ -1377,8 +1391,12 @@ int ObChunkDatumStore::load_next_block(ChunkIterator& it) if (OB_ITER_END != ret) { LOG_WARN("failed to aio read", K(ret)); } - } else if (!enable_aio && - OB_FAIL(read_file(it.cur_iter_blk_, read_size, it.cur_iter_pos_, it.aio_read_handle_))) { + } else if (!enable_aio && OB_FAIL(read_file(it.cur_iter_blk_, + read_size, + it.cur_iter_pos_, + it.aio_read_handle_, + it.file_size_, + it.cur_iter_pos_))) { if (OB_ITER_END != ret) { LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos)); } @@ -1420,7 +1438,9 @@ int ObChunkDatumStore::load_next_block(ChunkIterator& it) } else if (OB_FAIL(read_file(static_cast(it.cur_iter_blk_->payload_ + read_size - sizeof(Block)), ac_size - read_size, it.cur_iter_pos_ + read_size, - it.aio_read_handle_))) { + it.aio_read_handle_, + it.file_size_, + it.cur_iter_pos_))) { if (OB_ITER_END != ret) { LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos)); } @@ -1900,8 +1920,8 @@ int ObChunkDatumStore::write_file(void* buf, int64_t size) return ret; } -int ObChunkDatumStore::read_file( - void* buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle& handle) +int ObChunkDatumStore::read_file(void *buf, const int64_t size, const int64_t offset, + blocksstable::ObTmpFileIOHandle &handle, const int64_t file_size, const int64_t cur_pos) { int ret = OB_SUCCESS; int64_t timeout_ms = 0; @@ -1918,10 +1938,13 @@ int ObChunkDatumStore::read_file( LOG_WARN("failed to wait write", K(ret)); } } - - if (OB_SUCC(ret) && size > 0) { - this->set_io(size, static_cast(buf)); - io_.io_desc_.category_ = common::USER_IO; + if (OB_FAIL(ret)) { + } else if (0 >= size) { + CK(cur_pos >= file_size); + OX(ret = OB_ITER_END); + } else { + this->set_io(size, static_cast(buf)); + io_.io_desc_.category_ = common::ObIOCategory::USER_IO; io_.io_desc_.wait_event_no_ = ObWaitEventIds::ROW_STORE_DISK_READ; if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io_, offset, timeout_ms, handle))) { if (OB_ITER_END != ret) { diff --git a/src/sql/engine/basic/ob_chunk_datum_store.h b/src/sql/engine/basic/ob_chunk_datum_store.h index 0224de3b8..d54a13562 100644 --- a/src/sql/engine/basic/ob_chunk_datum_store.h +++ b/src/sql/engine/basic/ob_chunk_datum_store.h @@ -944,7 +944,8 @@ private: inline int dump_one_block(BlockBuffer* item); int write_file(void* buf, int64_t size); - int read_file(void* buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle& handle); + int read_file(void *buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle &handle, + const int64_t file_size, const int64_t cur_pos); int aio_read_file(void* buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle& handle); int aio_read_file(ChunkIterator& it, int64_t read_size); bool need_dump(int64_t extra_size); diff --git a/src/sql/engine/basic/ob_chunk_row_store.cpp b/src/sql/engine/basic/ob_chunk_row_store.cpp index 5b284dd59..c6a740064 100644 --- a/src/sql/engine/basic/ob_chunk_row_store.cpp +++ b/src/sql/engine/basic/ob_chunk_row_store.cpp @@ -995,19 +995,27 @@ int ObChunkRowStore::load_next_chunk_blocks(ChunkIterator& it) read_size = it.file_size_ - it.cur_iter_pos_; chunk_size = read_size + read_off; } - if (0 == read_size) { // ret end int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = read_file(it.chunk_mem_ + read_off, tmp_read_size, it.cur_iter_pos_))) { + if (OB_SUCCESS != + (tmp_ret = read_file( + it.chunk_mem_ + read_off, tmp_read_size, it.cur_iter_pos_, it.file_size_, it.cur_iter_pos_))) { LOG_WARN("read blk info from file failed", K(tmp_ret), K_(it.cur_iter_pos)); } if (OB_ITER_END != tmp_ret) { - LOG_WARN("unexpected status", K(ret), K(tmp_ret)); + if (OB_UNLIKELY(OB_SUCCESS == tmp_ret)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected read succ", K(read_size), K(tmp_read_size), K(it), K(read_off)); + } else { + ret = tmp_ret; + LOG_WARN("unexpected status", K(ret), K(tmp_ret)); + } } else { ret = OB_ITER_END; } - } else if (OB_FAIL(read_file(it.chunk_mem_ + read_off, read_size, it.cur_iter_pos_))) { + } else if (OB_FAIL( + read_file(it.chunk_mem_ + read_off, read_size, it.cur_iter_pos_, it.file_size_, it.cur_iter_pos_))) { LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos)); } else { int64_t cur_pos = 0; @@ -1144,7 +1152,6 @@ int ObChunkRowStore::load_next_block(ChunkIterator& it) if (OB_SUCC(ret)) { int64_t blk_cap = it.cur_iter_blk_buf_->capacity(); - LOG_DEBUG("RowStore need read file", K(it.cur_iter_blk_buf_->capacity()), K(it.cur_iter_blk_->blk_size_), @@ -1153,7 +1160,7 @@ int ObChunkRowStore::load_next_block(ChunkIterator& it) K_(it.cur_iter_pos), K_(it.cur_iter_blk)); // read a normal size of Block first - if (OB_FAIL(read_file(it.cur_iter_blk_, read_size, it.cur_iter_pos_))) { + if (OB_FAIL(read_file(it.cur_iter_blk_, read_size, it.cur_iter_pos_, it.file_size_, it.cur_iter_pos_))) { if (OB_ITER_END != ret) { LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos)); } @@ -1187,7 +1194,9 @@ int ObChunkRowStore::load_next_block(ChunkIterator& it) if (OB_FAIL(ret)) { } else if (OB_FAIL(read_file(static_cast(it.cur_iter_blk_->payload_ + read_size - sizeof(Block)), ac_size - read_size, - it.cur_iter_pos_ + read_size))) { + it.cur_iter_pos_ + read_size, + it.file_size_, + it.cur_iter_pos_))) { if (OB_ITER_END != ret) { LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos)); } @@ -1636,7 +1645,8 @@ int ObChunkRowStore::write_file(void* buf, int64_t size) return ret; } -int ObChunkRowStore::read_file(void* buf, const int64_t size, const int64_t offset) +int ObChunkRowStore::read_file( + void *buf, const int64_t size, const int64_t offset, const int64_t file_size, const int64_t cur_pos) { int ret = OB_SUCCESS; int64_t timeout_ms = 0; @@ -1649,9 +1659,12 @@ int ObChunkRowStore::read_file(void* buf, const int64_t size, const int64_t offs } else if (OB_FAIL(get_timeout(timeout_ms))) { LOG_WARN("get timeout failed", K(ret)); } - - if (OB_SUCC(ret) && size > 0) { - this->set_io(size, static_cast(buf)); + if (OB_FAIL(ret)) { + } else if (0 >= size) { + CK(cur_pos >= file_size); + OX(ret = OB_ITER_END); + } else { + this->set_io(size, static_cast(buf)); io_.io_desc_.category_ = GCONF._large_query_io_percentage.get_value() > 0 ? common::LARGE_QUERY_IO : common::USER_IO; io_.io_desc_.wait_event_no_ = ObWaitEventIds::ROW_STORE_DISK_READ; diff --git a/src/sql/engine/basic/ob_chunk_row_store.h b/src/sql/engine/basic/ob_chunk_row_store.h index 6274d28ab..94c2c57eb 100644 --- a/src/sql/engine/basic/ob_chunk_row_store.h +++ b/src/sql/engine/basic/ob_chunk_row_store.h @@ -635,7 +635,7 @@ private: inline int dump_one_block(BlockBuffer* item); int write_file(void* buf, int64_t size); - int read_file(void* buf, const int64_t size, const int64_t offset); + int read_file(void *buf, const int64_t size, const int64_t offset, const int64_t file_size, const int64_t cur_pos); bool need_dump(int64_t extra_size); BlockBuffer* new_block(); diff --git a/src/sql/engine/expr/ob_expr_in.cpp b/src/sql/engine/expr/ob_expr_in.cpp index 4b1299a29..dd190439a 100644 --- a/src/sql/engine/expr/ob_expr_in.cpp +++ b/src/sql/engine/expr/ob_expr_in.cpp @@ -318,8 +318,10 @@ int ObExprInOrNotIn::ObExprInCtx::init_hashset_vecs(int64_t param_num, int64_t r ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate memory", K(ret)); } else { + for (int64_t i = 0; i < (1 << row_dimension); ++i) { + new (&hashset_vecs_[i]) ObExprInHashMap(); + } for (int i = 0; OB_SUCC(ret) && i < (1 << row_dimension); ++i) { - ObExprInHashMap* hashset_ptr = new (&hashset_vecs_[i]) ObExprInHashMap(); hashset_vecs_[i].set_meta_idx(i); hashset_vecs_[i].set_meta_dimension(row_dimension); if (OB_FAIL(hashset_vecs_[i].create(param_num))) { @@ -350,8 +352,10 @@ int ObExprInOrNotIn::ObExprInCtx::init_static_engine_hashset_vecs( ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate memory", K(ret)); } else { + for (int64_t i = 0; i < (1 << row_dimension); ++i) { + new (&static_engine_hashset_vecs_[i]) ObExprInHashMap(); + } for (int i = 0; OB_SUCC(ret) && i < (1 << row_dimension); ++i) { - ObExprInHashMap* hashset_ptr = new (&static_engine_hashset_vecs_[i]) ObExprInHashMap(); static_engine_hashset_vecs_[i].set_meta_idx(i); static_engine_hashset_vecs_[i].set_meta_dimension(row_dimension); if (OB_FAIL(static_engine_hashset_vecs_[i].create(param_num))) { diff --git a/src/sql/engine/expr/ob_expr_nvl.cpp b/src/sql/engine/expr/ob_expr_nvl.cpp index 185418106..fd78bfd3c 100644 --- a/src/sql/engine/expr/ob_expr_nvl.cpp +++ b/src/sql/engine/expr/ob_expr_nvl.cpp @@ -156,8 +156,14 @@ int ObExprNvl::calc_result2(ObObj& result, const ObObj& obj1, const ObObj& obj2, LOG_WARN("invalid argument. allocator or session is NULL", K(ret), K(expr_ctx.calc_buf_), K(expr_ctx.my_session_)); } else { p_res = obj1.is_null() ? &obj2 : &obj1; + //We cannot expect the framework to help us convert types + //to ensure that they are consistent with the deduced results in old engine + //convert collations to avoid unexpected compares like gbk = utf8 + bool need_cast = !p_res->is_null() + && (p_res->get_type() != get_result_type().get_type() + || p_res->get_collation_type() != get_result_type().get_collation_type()); // no necessary to check p_res is NULL or not - if (!p_res->is_null() && p_res->get_type() != get_result_type().get_type()) { + if (need_cast) { EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE); cast_ctx.dest_collation_ = get_result_type().get_collation_type(); if (OB_FAIL(ObObjCaster::to_type(get_result_type().get_type(), cast_ctx, *p_res, result))) { -- GitLab