Commit 0afbb844 authored by ls0, committed by LINGuanRen

patch bugfix to opensource

Parent bc3a22c7
......@@ -659,29 +659,45 @@ int ObStaticEngineCG::generate_spec(ObLogDistinct& op, ObMergeDistinctSpec& spec
return ret;
}
int ObStaticEngineCG::generate_spec(ObLogDistinct& op, ObHashDistinctSpec& spec, const bool in_root_job)
int ObStaticEngineCG::generate_spec(
ObLogDistinct &op, ObHashDistinctSpec &spec, const bool in_root_job)
{
int ret = OB_SUCCESS;
UNUSED(in_root_job);
spec.is_block_mode_ = op.get_block_mode();
if (OB_FAIL(spec.cmp_funcs_.init(op.get_distinct_exprs().count()))) {
int64_t init_count = op.get_distinct_exprs().count();
if (1 != op.get_num_of_child()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected child count of hash distinct", K(ret), K(op.get_num_of_child()));
} else if (OB_FAIL(spec.cmp_funcs_.init(init_count))) {
LOG_WARN("failed to init cmp functions", K(ret));
} else if (OB_FAIL(spec.hash_funcs_.init(init_count))) {
LOG_WARN("failed to init hash functions", K(ret));
} else if (OB_FAIL(spec.sort_collations_.init(init_count))) {
LOG_WARN("failed to init sort functions", K(ret));
} else if (OB_FAIL(spec.hash_funcs_.init(op.get_distinct_exprs().count()))) {
LOG_WARN("failed to init sort functions", K(ret));
} else if (OB_FAIL(spec.sort_collations_.init(op.get_distinct_exprs().count()))) {
LOG_WARN("failed to init sort functions", K(ret));
} else if (OB_FAIL(spec.distinct_exprs_.init(op.get_distinct_exprs().count()))) {
} else if (OB_FAIL(spec.distinct_exprs_.init(op.get_distinct_exprs().count()
+ op.get_child(0)->get_output_exprs().count()))) {
LOG_WARN("failed to init distinct exprs", K(ret));
} else {
ObExpr* expr = nullptr;
ObArray<ObRawExpr *> additional_exprs;
ObExpr *expr = nullptr;
ARRAY_FOREACH(op.get_child(0)->get_output_exprs(), i) {
ObRawExpr* raw_expr = op.get_child(0)->get_output_exprs().at(i);
bool is_distinct_expr = has_exist_in_array(op.get_distinct_exprs(), raw_expr);
if (!is_distinct_expr) {
OZ (additional_exprs.push_back(raw_expr));
}
}
if (OB_SUCC(ret)) {
int64_t dist_cnt = 0;
ARRAY_FOREACH(op.get_distinct_exprs(), i)
{
const ObRawExpr* raw_expr = op.get_distinct_exprs().at(i);
ARRAY_FOREACH(op.get_distinct_exprs(), i) {
ObRawExpr* raw_expr = op.get_distinct_exprs().at(i);
if (OB_ISNULL(raw_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null pointer", K(ret));
} else if (raw_expr->has_flag(IS_CONST) || raw_expr->has_flag(IS_CONST_EXPR)) {
        // Distinct on a const value (e.g. distinct 1) is skipped here. Note that in
        // ObMergeDistinct, if there are no distinct columns, all values are treated as equal by default, which is exactly the expected semantics.
continue;
} else if (OB_FAIL(generate_rt_expr(*raw_expr, expr))) {
LOG_WARN("failed to generate rt expr", K(ret));
......@@ -698,9 +714,10 @@ int ObStaticEngineCG::generate_spec(ObLogDistinct& op, ObHashDistinctSpec& spec,
is_ascending,
(is_null_first(order_direction) ^ is_ascending) ? NULL_LAST : NULL_FIRST);
ObCmpFunc cmp_func;
cmp_func.cmp_func_ = ObDatumFuncs::get_nullsafe_cmp_func(expr->datum_meta_.type_,
cmp_func.cmp_func_ = ObDatumFuncs::get_nullsafe_cmp_func(
expr->datum_meta_.type_,
NULL_LAST, // NULL_FIRST and NULL_LAST both OK.
expr->datum_meta_.type_,
            NULL_LAST,  // NULL_LAST or NULL_FIRST makes no difference here.
expr->datum_meta_.cs_type_,
lib::is_oracle_mode());
ObHashFunc hash_func;
......@@ -708,9 +725,7 @@ int ObStaticEngineCG::generate_spec(ObLogDistinct& op, ObHashDistinctSpec& spec,
if (OB_ISNULL(cmp_func.cmp_func_) || OB_ISNULL(hash_func.hash_func_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cmp_func or hash func is null, check datatype is valid",
K(cmp_func.cmp_func_),
K(hash_func.hash_func_),
K(ret));
K(cmp_func.cmp_func_), K(hash_func.hash_func_), K(ret));
} else if (OB_FAIL(spec.sort_collations_.push_back(field_collation))) {
LOG_WARN("failed to push back sort collation", K(ret));
} else if (OB_FAIL(spec.cmp_funcs_.push_back(cmp_func))) {
......@@ -723,6 +738,25 @@ int ObStaticEngineCG::generate_spec(ObLogDistinct& op, ObHashDistinctSpec& spec,
}
}
}
// complete distinct exprs
if (OB_SUCC(ret) && 0 != additional_exprs.count()) {
ARRAY_FOREACH(additional_exprs, i) {
const ObRawExpr* raw_expr = additional_exprs.at(i);
if (OB_ISNULL(raw_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null pointer", K(ret));
} else if (raw_expr->has_flag(IS_CONST) || raw_expr->has_flag(IS_CONST_EXPR)) {
        // Distinct on a const value (e.g. distinct 1) is skipped here. Note that in
        // ObMergeDistinct, if there are no distinct columns, all values are treated as equal by default, which is exactly the expected semantics.
continue;
} else if (OB_FAIL(generate_rt_expr(*raw_expr, expr))) {
LOG_WARN("failed to generate rt expr", K(ret));
} else if (OB_FAIL(spec.distinct_exprs_.push_back(expr))) {
LOG_WARN("failed to push back expr", K(ret));
}
}
}
}
return ret;
}
......
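For context on the hunk above: the fix appends the child's non-distinct output expressions to `distinct_exprs_` so the hash-distinct operator can still produce complete output rows, which is also why the `init()` call now reserves `op.get_distinct_exprs().count() + op.get_child(0)->get_output_exprs().count()` slots. A minimal standalone sketch of that partitioning step, using plain STL containers instead of the real ObRawExpr/ObExpr types (all names below are illustrative, not the OceanBase API):

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Illustrative stand-in for an expression handle; not the real ObRawExpr/ObExpr.
using Expr = std::string;

// Split the child's output expressions into distinct keys and the "additional"
// expressions that the operator must still carry to materialize full rows.
std::vector<Expr> collect_additional_exprs(const std::vector<Expr> &child_output,
                                           const std::vector<Expr> &distinct_exprs)
{
  std::vector<Expr> additional;
  for (const Expr &e : child_output) {
    const bool is_distinct_key =
        std::find(distinct_exprs.begin(), distinct_exprs.end(), e) != distinct_exprs.end();
    if (!is_distinct_key) {
      additional.push_back(e);  // kept so the output row is complete
    }
  }
  return additional;
}
```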
......@@ -1150,20 +1150,34 @@ int ObChunkDatumStore::load_next_chunk_blocks(ChunkIterator& it)
tmp_read_size = read_size;
chunk_size = read_size + read_off;
}
if (0 == read_size) {
      // nothing left to read; report end of iteration
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS !=
(tmp_ret = read_file(it.chunk_mem_ + read_off, tmp_read_size, it.cur_iter_pos_, it.aio_read_handle_))) {
if (OB_SUCCESS != (tmp_ret = read_file(it.chunk_mem_ + read_off,
tmp_read_size,
it.cur_iter_pos_,
it.aio_read_handle_,
it.file_size_,
it.cur_iter_pos_))) {
LOG_WARN("read blk info from file failed", K(tmp_ret), K_(it.cur_iter_pos));
}
if (OB_ITER_END != tmp_ret) {
LOG_WARN("unexpected status", K(ret), K(tmp_ret));
if (OB_UNLIKELY(OB_SUCCESS == tmp_ret)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected read succ", K(read_size), K(tmp_read_size), K(it), K(read_off));
} else {
ret = tmp_ret;
LOG_WARN("unexpected status", K(ret));
}
} else {
ret = OB_ITER_END;
}
} else if (OB_FAIL(read_file(it.chunk_mem_ + read_off, read_size, it.cur_iter_pos_, it.aio_read_handle_))) {
} else if (OB_FAIL(read_file(it.chunk_mem_ + read_off,
read_size,
it.cur_iter_pos_,
it.aio_read_handle_,
it.file_size_,
it.cur_iter_pos_))) {
LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos));
} else {
int64_t cur_pos = 0;
......@@ -1377,8 +1391,12 @@ int ObChunkDatumStore::load_next_block(ChunkIterator& it)
if (OB_ITER_END != ret) {
LOG_WARN("failed to aio read", K(ret));
}
} else if (!enable_aio &&
OB_FAIL(read_file(it.cur_iter_blk_, read_size, it.cur_iter_pos_, it.aio_read_handle_))) {
} else if (!enable_aio && OB_FAIL(read_file(it.cur_iter_blk_,
read_size,
it.cur_iter_pos_,
it.aio_read_handle_,
it.file_size_,
it.cur_iter_pos_))) {
if (OB_ITER_END != ret) {
LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos));
}
......@@ -1420,7 +1438,9 @@ int ObChunkDatumStore::load_next_block(ChunkIterator& it)
} else if (OB_FAIL(read_file(static_cast<void*>(it.cur_iter_blk_->payload_ + read_size - sizeof(Block)),
ac_size - read_size,
it.cur_iter_pos_ + read_size,
it.aio_read_handle_))) {
it.aio_read_handle_,
it.file_size_,
it.cur_iter_pos_))) {
if (OB_ITER_END != ret) {
LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos));
}
......@@ -1900,8 +1920,8 @@ int ObChunkDatumStore::write_file(void* buf, int64_t size)
return ret;
}
int ObChunkDatumStore::read_file(
void* buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle& handle)
int ObChunkDatumStore::read_file(void *buf, const int64_t size, const int64_t offset,
blocksstable::ObTmpFileIOHandle &handle, const int64_t file_size, const int64_t cur_pos)
{
int ret = OB_SUCCESS;
int64_t timeout_ms = 0;
......@@ -1918,10 +1938,13 @@ int ObChunkDatumStore::read_file(
LOG_WARN("failed to wait write", K(ret));
}
}
if (OB_SUCC(ret) && size > 0) {
this->set_io(size, static_cast<char*>(buf));
io_.io_desc_.category_ = common::USER_IO;
if (OB_FAIL(ret)) {
} else if (0 >= size) {
CK(cur_pos >= file_size);
OX(ret = OB_ITER_END);
} else {
this->set_io(size, static_cast<char *>(buf));
io_.io_desc_.category_ = common::ObIOCategory::USER_IO;
io_.io_desc_.wait_event_no_ = ObWaitEventIds::ROW_STORE_DISK_READ;
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io_, offset, timeout_ms, handle))) {
if (OB_ITER_END != ret) {
......
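The new `read_file` signature above threads `file_size` and `cur_pos` through so that a zero-sized read at the end of the temporary file is reported as `OB_ITER_END` rather than treated as a successful read. A hedged sketch of that end-of-file decision, with simplified status codes standing in for the real OceanBase error codes (the helper name and types are illustrative only):

```cpp
#include <cstdint>

// Simplified stand-ins for the real error codes (OB_SUCCESS / OB_ITER_END / OB_ERR_UNEXPECTED).
enum Status { SUCCESS, ITER_END, ERR_UNEXPECTED };

// Decide what a zero-sized read means: if the iterator has already consumed the
// whole file it is a normal end of iteration, otherwise the state is inconsistent.
Status check_zero_read(int64_t size, int64_t cur_pos, int64_t file_size)
{
  Status ret = SUCCESS;
  if (size <= 0) {
    ret = (cur_pos >= file_size) ? ITER_END : ERR_UNEXPECTED;
  }
  // a positive size falls through to the actual pread() in the real code
  return ret;
}
```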
......@@ -944,7 +944,8 @@ private:
inline int dump_one_block(BlockBuffer* item);
int write_file(void* buf, int64_t size);
int read_file(void* buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle& handle);
int read_file(void *buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle &handle,
const int64_t file_size, const int64_t cur_pos);
int aio_read_file(void* buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle& handle);
int aio_read_file(ChunkIterator& it, int64_t read_size);
bool need_dump(int64_t extra_size);
......
......@@ -995,19 +995,27 @@ int ObChunkRowStore::load_next_chunk_blocks(ChunkIterator& it)
read_size = it.file_size_ - it.cur_iter_pos_;
chunk_size = read_size + read_off;
}
if (0 == read_size) {
      // nothing left to read; report end of iteration
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = read_file(it.chunk_mem_ + read_off, tmp_read_size, it.cur_iter_pos_))) {
if (OB_SUCCESS !=
(tmp_ret = read_file(
it.chunk_mem_ + read_off, tmp_read_size, it.cur_iter_pos_, it.file_size_, it.cur_iter_pos_))) {
LOG_WARN("read blk info from file failed", K(tmp_ret), K_(it.cur_iter_pos));
}
if (OB_ITER_END != tmp_ret) {
if (OB_UNLIKELY(OB_SUCCESS == tmp_ret)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected read succ", K(read_size), K(tmp_read_size), K(it), K(read_off));
} else {
ret = tmp_ret;
LOG_WARN("unexpected status", K(ret), K(tmp_ret));
}
} else {
ret = OB_ITER_END;
}
} else if (OB_FAIL(read_file(it.chunk_mem_ + read_off, read_size, it.cur_iter_pos_))) {
} else if (OB_FAIL(
read_file(it.chunk_mem_ + read_off, read_size, it.cur_iter_pos_, it.file_size_, it.cur_iter_pos_))) {
LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos));
} else {
int64_t cur_pos = 0;
......@@ -1144,7 +1152,6 @@ int ObChunkRowStore::load_next_block(ChunkIterator& it)
if (OB_SUCC(ret)) {
int64_t blk_cap = it.cur_iter_blk_buf_->capacity();
LOG_DEBUG("RowStore need read file",
K(it.cur_iter_blk_buf_->capacity()),
K(it.cur_iter_blk_->blk_size_),
......@@ -1153,7 +1160,7 @@ int ObChunkRowStore::load_next_block(ChunkIterator& it)
K_(it.cur_iter_pos),
K_(it.cur_iter_blk));
// read a normal size of Block first
if (OB_FAIL(read_file(it.cur_iter_blk_, read_size, it.cur_iter_pos_))) {
if (OB_FAIL(read_file(it.cur_iter_blk_, read_size, it.cur_iter_pos_, it.file_size_, it.cur_iter_pos_))) {
if (OB_ITER_END != ret) {
LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos));
}
......@@ -1187,7 +1194,9 @@ int ObChunkRowStore::load_next_block(ChunkIterator& it)
if (OB_FAIL(ret)) {
} else if (OB_FAIL(read_file(static_cast<void*>(it.cur_iter_blk_->payload_ + read_size - sizeof(Block)),
ac_size - read_size,
it.cur_iter_pos_ + read_size))) {
it.cur_iter_pos_ + read_size,
it.file_size_,
it.cur_iter_pos_))) {
if (OB_ITER_END != ret) {
LOG_WARN("read blk info from file failed", K(ret), K_(it.cur_iter_pos));
}
......@@ -1636,7 +1645,8 @@ int ObChunkRowStore::write_file(void* buf, int64_t size)
return ret;
}
int ObChunkRowStore::read_file(void* buf, const int64_t size, const int64_t offset)
int ObChunkRowStore::read_file(
void *buf, const int64_t size, const int64_t offset, const int64_t file_size, const int64_t cur_pos)
{
int ret = OB_SUCCESS;
int64_t timeout_ms = 0;
......@@ -1649,9 +1659,12 @@ int ObChunkRowStore::read_file(void* buf, const int64_t size, const int64_t offs
} else if (OB_FAIL(get_timeout(timeout_ms))) {
LOG_WARN("get timeout failed", K(ret));
}
if (OB_SUCC(ret) && size > 0) {
this->set_io(size, static_cast<char*>(buf));
if (OB_FAIL(ret)) {
} else if (0 >= size) {
CK(cur_pos >= file_size);
OX(ret = OB_ITER_END);
} else {
this->set_io(size, static_cast<char *>(buf));
io_.io_desc_.category_ =
GCONF._large_query_io_percentage.get_value() > 0 ? common::LARGE_QUERY_IO : common::USER_IO;
io_.io_desc_.wait_event_no_ = ObWaitEventIds::ROW_STORE_DISK_READ;
......
......@@ -635,7 +635,7 @@ private:
inline int dump_one_block(BlockBuffer* item);
int write_file(void* buf, int64_t size);
int read_file(void* buf, const int64_t size, const int64_t offset);
int read_file(void *buf, const int64_t size, const int64_t offset, const int64_t file_size, const int64_t cur_pos);
bool need_dump(int64_t extra_size);
BlockBuffer* new_block();
......
......@@ -318,8 +318,10 @@ int ObExprInOrNotIn::ObExprInCtx::init_hashset_vecs(int64_t param_num, int64_t r
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
for (int64_t i = 0; i < (1 << row_dimension); ++i) {
new (&hashset_vecs_[i]) ObExprInHashMap<ObObj>();
}
for (int i = 0; OB_SUCC(ret) && i < (1 << row_dimension); ++i) {
ObExprInHashMap<ObObj>* hashset_ptr = new (&hashset_vecs_[i]) ObExprInHashMap<ObObj>();
hashset_vecs_[i].set_meta_idx(i);
hashset_vecs_[i].set_meta_dimension(row_dimension);
if (OB_FAIL(hashset_vecs_[i].create(param_num))) {
......@@ -350,8 +352,10 @@ int ObExprInOrNotIn::ObExprInCtx::init_static_engine_hashset_vecs(
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
for (int64_t i = 0; i < (1 << row_dimension); ++i) {
new (&static_engine_hashset_vecs_[i]) ObExprInHashMap<ObDatum>();
}
for (int i = 0; OB_SUCC(ret) && i < (1 << row_dimension); ++i) {
ObExprInHashMap<ObDatum>* hashset_ptr = new (&static_engine_hashset_vecs_[i]) ObExprInHashMap<ObDatum>();
static_engine_hashset_vecs_[i].set_meta_idx(i);
static_engine_hashset_vecs_[i].set_meta_dimension(row_dimension);
if (OB_FAIL(static_engine_hashset_vecs_[i].create(param_num))) {
......
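The loop added above placement-constructs every element of `hashset_vecs_` (and likewise `static_engine_hashset_vecs_`) before the fallible `create()` calls run, so an early failure never leaves unconstructed elements behind. A small self-contained sketch of that two-phase initialization pattern over raw memory (generic types, not the ObExprInHashMap API):

```cpp
#include <cstdlib>
#include <new>
#include <vector>

// Generic bucket type standing in for ObExprInHashMap; create() is the fallible step.
struct Bucket {
  std::vector<int> slots;
  bool create(size_t n) { slots.resize(n); return true; }
};

// Two-phase initialization over raw memory: construct everything first, then run
// the fallible setup, so an early failure never leaves unconstructed elements.
bool init_buckets(Bucket *&buckets, size_t count, size_t param_num)
{
  bool ok = true;
  buckets = static_cast<Bucket *>(std::malloc(sizeof(Bucket) * count));
  if (buckets == nullptr) {
    ok = false;
  } else {
    for (size_t i = 0; i < count; ++i) {
      new (&buckets[i]) Bucket();         // phase 1: placement-construct all elements
    }
    for (size_t i = 0; ok && i < count; ++i) {
      ok = buckets[i].create(param_num);  // phase 2: fallible per-element setup
    }
  }
  return ok;  // destruction/free of buckets omitted for brevity
}
```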
......@@ -156,8 +156,14 @@ int ObExprNvl::calc_result2(ObObj& result, const ObObj& obj1, const ObObj& obj2,
LOG_WARN("invalid argument. allocator or session is NULL", K(ret), K(expr_ctx.calc_buf_), K(expr_ctx.my_session_));
} else {
p_res = obj1.is_null() ? &obj2 : &obj1;
//We cannot expect the framework to help us convert types
//to ensure that they are consistent with the deduced results in old engine
//convert collations to avoid unexpected compares like gbk = utf8
bool need_cast = !p_res->is_null()
&& (p_res->get_type() != get_result_type().get_type()
|| p_res->get_collation_type() != get_result_type().get_collation_type());
// no necessary to check p_res is NULL or not
if (!p_res->is_null() && p_res->get_type() != get_result_type().get_type()) {
if (need_cast) {
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE);
cast_ctx.dest_collation_ = get_result_type().get_collation_type();
if (OB_FAIL(ObObjCaster::to_type(get_result_type().get_type(), cast_ctx, *p_res, result))) {
......
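The ObExprNvl change above casts the chosen argument whenever its type or its collation differs from the deduced result type, so later comparisons never mix collations such as gbk and utf8. A minimal sketch of that predicate, with an illustrative stand-in for the result-type metadata (not the real ObExprResType):

```cpp
// Illustrative stand-in for the deduced result meta; not the real ObExprResType.
struct TypeMeta {
  int type;       // e.g. varchar vs. number
  int collation;  // e.g. utf8mb4 vs. gbk
};

// Cast the chosen NVL argument whenever either its type or its collation differs
// from the deduced result type, so comparisons never mix collations.
bool need_cast(bool value_is_null, const TypeMeta &value, const TypeMeta &result)
{
  return !value_is_null && (value.type != result.type || value.collation != result.collation);
}
```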