Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Metz
oceanbase
提交
d1636b90
O
oceanbase
项目概览
Metz
/
oceanbase
与 Fork 源项目一致
Fork自
oceanbase / oceanbase
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oceanbase
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
d1636b90
编写于
6月 06, 2022
作者:
S
stdliu
提交者:
wangzelin.wzl
6月 06, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[CP] [CP] [CP] Fix rollup SIGSEGV when building stored row
上级
bc26b2f1
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
184 addition
and
324 deletion
+184
-324
src/sql/engine/aggregate/ob_aggregate_processor.cpp
src/sql/engine/aggregate/ob_aggregate_processor.cpp
+6
-6
src/sql/engine/aggregate/ob_aggregate_processor.h
src/sql/engine/aggregate/ob_aggregate_processor.h
+3
-3
src/sql/engine/aggregate/ob_merge_distinct_op.h
src/sql/engine/aggregate/ob_merge_distinct_op.h
+2
-2
src/sql/engine/aggregate/ob_merge_groupby_op.h
src/sql/engine/aggregate/ob_merge_groupby_op.h
+1
-1
src/sql/engine/basic/ob_chunk_datum_store.cpp
src/sql/engine/basic/ob_chunk_datum_store.cpp
+66
-136
src/sql/engine/basic/ob_chunk_datum_store.h
src/sql/engine/basic/ob_chunk_datum_store.h
+60
-77
src/sql/engine/basic/ob_limit_op.h
src/sql/engine/basic/ob_limit_op.h
+1
-1
src/sql/engine/connect_by/ob_cnnt_by_pump.cpp
src/sql/engine/connect_by/ob_cnnt_by_pump.cpp
+3
-26
src/sql/engine/dml/ob_multi_table_insert_up_op.cpp
src/sql/engine/dml/ob_multi_table_insert_up_op.cpp
+1
-25
src/sql/engine/join/ob_hash_join_op.h
src/sql/engine/join/ob_hash_join_op.h
+1
-1
src/sql/engine/join/ob_merge_join_op.h
src/sql/engine/join/ob_merge_join_op.h
+1
-1
src/sql/engine/join/ob_nested_loop_join_op.h
src/sql/engine/join/ob_nested_loop_join_op.h
+1
-1
src/sql/engine/pdml/static/ob_pdml_op_data_driver.h
src/sql/engine/pdml/static/ob_pdml_op_data_driver.h
+1
-1
src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp
src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp
+6
-6
src/sql/engine/px/exchange/ob_px_ms_coord_op.h
src/sql/engine/px/exchange/ob_px_ms_coord_op.h
+5
-5
src/sql/engine/px/exchange/ob_px_transmit_op.h
src/sql/engine/px/exchange/ob_px_transmit_op.h
+1
-1
src/sql/engine/px/exchange/ob_row_heap.cpp
src/sql/engine/px/exchange/ob_row_heap.cpp
+1
-1
src/sql/engine/px/exchange/ob_row_heap.h
src/sql/engine/px/exchange/ob_row_heap.h
+2
-2
src/sql/engine/recursive_cte/ob_search_method_op.cpp
src/sql/engine/recursive_cte/ob_search_method_op.cpp
+1
-1
src/sql/engine/set/ob_merge_set_op.h
src/sql/engine/set/ob_merge_set_op.h
+1
-1
src/sql/engine/sort/ob_sort_op_impl.cpp
src/sql/engine/sort/ob_sort_op_impl.cpp
+16
-22
src/sql/engine/sort/ob_sort_op_impl.h
src/sql/engine/sort/ob_sort_op_impl.h
+1
-1
src/sql/engine/subquery/ob_subplan_filter_op.cpp
src/sql/engine/subquery/ob_subplan_filter_op.cpp
+1
-1
src/sql/engine/window_function/ob_window_function_op.h
src/sql/engine/window_function/ob_window_function_op.h
+2
-2
未找到文件。
src/sql/engine/aggregate/ob_aggregate_processor.cpp
浏览文件 @
d1636b90
...
...
@@ -394,7 +394,7 @@ int64_t ObAggregateProcessor::ExtraResult::to_string(char* buf, const int64_t bu
return
pos
;
}
int
ObAggrInfo
::
eval_aggr
(
ObChunkDatumStore
::
ShadowStoredRow
<>
&
curr_row_results
,
ObEvalCtx
&
ctx
)
const
int
ObAggrInfo
::
eval_aggr
(
ObChunkDatumStore
::
ShadowStoredRow
&
curr_row_results
,
ObEvalCtx
&
ctx
)
const
{
int
ret
=
OB_SUCCESS
;
if
(
param_exprs_
.
empty
()
&&
T_FUN_COUNT
==
get_expr_type
())
{
...
...
@@ -1908,7 +1908,7 @@ int ObAggregateProcessor::collect_aggr_result(AggrCell& aggr_cell, const ObExpr*
const
ObChunkDatumStore
::
StoredRow
*
storted_row
=
NULL
;
int64_t
rank_num
=
0
;
bool
need_check_order_equal
=
T_FUN_GROUP_DENSE_RANK
==
aggr_fun
;
ObChunkDatumStore
::
LastStoredRow
<>
prev_row
(
aggr_alloc_
);
ObChunkDatumStore
::
LastStoredRow
prev_row
(
aggr_alloc_
);
int64_t
total_sort_row_cnt
=
extra
->
get_row_count
();
bool
is_first
=
true
;
while
(
OB_SUCC
(
ret
)
&&
OB_SUCC
(
extra
->
get_next_row
(
storted_row
)))
{
...
...
@@ -2044,7 +2044,7 @@ int ObAggregateProcessor::collect_aggr_result(AggrCell& aggr_cell, const ObExpr*
const
int64_t
total_row_count
=
extra
->
get_row_count
();
char
buf_alloc
[
number
::
ObNumber
::
MAX_CALC_BYTE_LEN
];
ObDataBuffer
allocator
(
buf_alloc
,
number
::
ObNumber
::
MAX_CALC_BYTE_LEN
);
ObChunkDatumStore
::
LastStoredRow
<>
prev_row
(
aggr_alloc_
);
ObChunkDatumStore
::
LastStoredRow
prev_row
(
aggr_alloc_
);
number
::
ObNumber
factor
;
bool
need_linear_inter
=
false
;
int64_t
not_null_start_loc
=
0
;
...
...
@@ -2163,7 +2163,7 @@ int ObAggregateProcessor::collect_aggr_result(AggrCell& aggr_cell, const ObExpr*
LOG_WARN
(
"finish_add_row failed"
,
KPC
(
extra
),
K
(
ret
));
}
else
{
const
ObChunkDatumStore
::
StoredRow
*
storted_row
=
NULL
;
ObChunkDatumStore
::
LastStoredRow
<>
first_row
(
aggr_alloc_
);
ObChunkDatumStore
::
LastStoredRow
first_row
(
aggr_alloc_
);
bool
is_first
=
true
;
while
(
OB_SUCC
(
ret
)
&&
OB_SUCC
(
extra
->
get_next_row
(
storted_row
)))
{
bool
is_equal
=
false
;
...
...
@@ -2925,7 +2925,7 @@ int ObAggregateProcessor::compare_calc(const ObDatum& left_value, const ObDatum&
return
ret
;
}
int
ObAggregateProcessor
::
check_rows_equal
(
const
ObChunkDatumStore
::
LastStoredRow
<>
&
prev_row
,
int
ObAggregateProcessor
::
check_rows_equal
(
const
ObChunkDatumStore
::
LastStoredRow
&
prev_row
,
const
ObChunkDatumStore
::
StoredRow
&
cur_row
,
const
ObAggrInfo
&
aggr_info
,
bool
&
is_equal
)
{
int
ret
=
OB_SUCCESS
;
...
...
@@ -3190,7 +3190,7 @@ int ObAggregateProcessor::get_wm_concat_result(
LOG_WARN
(
"finish_add_row failed"
,
KPC
(
extra
),
K
(
ret
));
}
else
{
ObString
sep_str
=
ObCharsetUtils
::
get_const_str
(
aggr_info
.
expr_
->
datum_meta_
.
cs_type_
,
','
);
ObChunkDatumStore
::
LastStoredRow
<>
first_row
(
aggr_alloc_
);
ObChunkDatumStore
::
LastStoredRow
first_row
(
aggr_alloc_
);
const
ObChunkDatumStore
::
StoredRow
*
storted_row
=
NULL
;
bool
is_first
=
true
;
bool
need_continue
=
true
;
...
...
src/sql/engine/aggregate/ob_aggregate_processor.h
浏览文件 @
d1636b90
...
...
@@ -62,7 +62,7 @@ public:
?
1
:
((
T_FUN_COUNT
==
get_expr_type
()
&&
param_exprs_
.
empty
())
?
0
:
param_exprs_
.
count
());
}
int
eval_aggr
(
ObChunkDatumStore
::
ShadowStoredRow
<>
&
curr_row_results
,
ObEvalCtx
&
ctx
)
const
;
int
eval_aggr
(
ObChunkDatumStore
::
ShadowStoredRow
&
curr_row_results
,
ObEvalCtx
&
ctx
)
const
;
inline
void
set_implicit_first_aggr
()
{
is_implicit_first_aggr_
=
true
;
...
...
@@ -326,7 +326,7 @@ public:
}
public:
ObChunkDatumStore
::
ShadowStoredRow
<>
curr_row_results_
;
ObChunkDatumStore
::
ShadowStoredRow
curr_row_results_
;
private:
// for avg/count
...
...
@@ -466,7 +466,7 @@ private:
int
rollup_distinct
(
AggrCell
&
aggr_cell
,
AggrCell
&
rollup_cell
);
int
compare_calc
(
const
ObDatum
&
left_value
,
const
ObDatum
&
right_value
,
const
ObAggrInfo
&
aggr_info
,
int64_t
index
,
int
&
compare_result
,
bool
&
is_asc
);
int
check_rows_equal
(
const
ObChunkDatumStore
::
LastStoredRow
<>
&
prev_row
,
const
ObChunkDatumStore
::
StoredRow
&
cur_row
,
int
check_rows_equal
(
const
ObChunkDatumStore
::
LastStoredRow
&
prev_row
,
const
ObChunkDatumStore
::
StoredRow
&
cur_row
,
const
ObAggrInfo
&
aggr_info
,
bool
&
is_equal
);
int
get_wm_concat_result
(
const
ObAggrInfo
&
aggr_info
,
GroupConcatExtraResult
*&
extra
,
bool
is_keep_group_concat
,
ObDatum
&
concat_result
);
...
...
src/sql/engine/aggregate/ob_merge_distinct_op.h
浏览文件 @
d1636b90
...
...
@@ -50,7 +50,7 @@ public:
};
private:
typedef
ObChunkDatumStore
::
LastStoredRow
<
ObChunkDatumStore
::
StoredRow
>
LastStoreRow
;
typedef
ObChunkDatumStore
::
LastStoredRow
LastStoreRow
;
bool
first_got_row_
;
common
::
ObArenaAllocator
alloc_
;
LastStoreRow
last_row_
;
...
...
@@ -60,4 +60,4 @@ private:
}
// end namespace sql
}
// end namespace oceanbase
#endif
/* OCEANBASE_SRC_SQL_ENGINE_AGGREGATE_OB_MERGE_DISTINCT_OP_H_ */
\ No newline at end of file
#endif
/* OCEANBASE_SRC_SQL_ENGINE_AGGREGATE_OB_MERGE_DISTINCT_OP_H_ */
src/sql/engine/aggregate/ob_merge_groupby_op.h
浏览文件 @
d1636b90
...
...
@@ -97,7 +97,7 @@ private:
// added to support groupby with rollup
int64_t
cur_output_group_id_
;
int64_t
first_output_group_id_
;
ObChunkDatumStore
::
LastStoredRow
<>
last_child_output_
;
ObChunkDatumStore
::
LastStoredRow
last_child_output_
;
DatumFixedArray
curr_groupby_datums_
;
int64_t
dir_id_
;
};
...
...
src/sql/engine/basic/ob_chunk_datum_store.cpp
浏览文件 @
d1636b90
...
...
@@ -57,113 +57,6 @@ void point2pointer(T*& dst_pointer, B* dst_base, T* src_pointer, const B* src_ba
dst_pointer
=
reinterpret_cast
<
T
*>
(
reinterpret_cast
<
char
*>
(
dst_base
)
+
reinterpret_cast
<
intptr_t
>
(
src_pointer
)
-
reinterpret_cast
<
const
char
*>
(
src_base
));
}
}
// namespace chunk_datum_store
int
ObChunkDatumStore
::
StoredRow
::
copy_datums
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
,
char
*
buf
,
const
int64_t
size
,
const
int64_t
row_size
,
const
uint32_t
row_extend_size
)
{
int
ret
=
OB_SUCCESS
;
if
(
payload_
!=
buf
||
size
<
0
)
{
ret
=
OB_INVALID_ARGUMENT
;
LOG_WARN
(
"invalid argument"
,
K
(
ret
),
KP
(
payload_
),
KP
(
buf
),
K
(
size
));
}
else
{
cnt_
=
static_cast
<
uint32_t
>
(
exprs
.
count
());
int64_t
pos
=
sizeof
(
ObDatum
)
*
cnt_
+
row_extend_size
;
row_size_
=
static_cast
<
int32_t
>
(
row_size
);
for
(
int64_t
i
=
0
;
OB_SUCC
(
ret
)
&&
i
<
cnt_
;
++
i
)
{
ObDatum
&
in_datum
=
static_cast
<
ObDatum
&>
(
exprs
.
at
(
i
)
->
locate_expr_datum
(
ctx
));
ObDatum
*
datum
=
new
(
&
cells
()[
i
])
ObDatum
();
if
(
OB_FAIL
(
datum
->
deep_copy
(
in_datum
,
buf
,
size
,
pos
)))
{
LOG_WARN
(
"failed to copy datum"
,
K
(
ret
),
K
(
i
),
K
(
pos
),
K
(
size
),
K
(
row_size
),
K
(
in_datum
),
K
(
*
datum
));
}
else
{
LOG_DEBUG
(
"succ to copy_datums"
,
K
(
cnt_
),
K
(
i
),
K
(
size
),
K
(
row_size
),
K
(
in_datum
),
K
(
*
datum
));
}
}
}
return
ret
;
}
int
ObChunkDatumStore
::
StoredRow
::
copy_datums
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
,
int64_t
&
row_size
,
char
*
buf
,
const
int64_t
max_buf_size
,
const
uint32_t
row_extend_size
)
{
int
ret
=
OB_SUCCESS
;
if
(
max_buf_size
<
0
)
{
ret
=
OB_INVALID_ARGUMENT
;
LOG_WARN
(
"invalid argument"
,
K
(
ret
),
KP
(
payload_
),
KP
(
buf
),
K
(
max_buf_size
));
}
else
{
cnt_
=
static_cast
<
uint32_t
>
(
exprs
.
count
());
int64_t
pos
=
sizeof
(
ObDatum
)
*
cnt_
+
row_extend_size
+
sizeof
(
StoredRow
);
ObDatum
*
datums
=
cells
();
if
(
pos
>
max_buf_size
)
{
ret
=
OB_BUF_NOT_ENOUGH
;
}
else
{
for
(
int64_t
i
=
0
;
OB_SUCC
(
ret
)
&&
i
<
cnt_
;
++
i
)
{
ObDatum
*
in_datum
=
nullptr
;
ObDatum
*
datum
=
(
ObDatum
*
)
datums
;
if
(
OB_FAIL
(
exprs
.
at
(
i
)
->
eval
(
ctx
,
in_datum
)))
{
LOG_WARN
(
"failed to eval datum"
,
K
(
ret
));
}
else
if
(
OB_FAIL
(
datum
->
deep_copy
(
*
in_datum
,
buf
,
max_buf_size
,
pos
)))
{
if
(
OB_BUF_NOT_ENOUGH
!=
ret
)
{
LOG_WARN
(
"failed to copy datum"
,
K
(
ret
),
K
(
i
),
K
(
pos
),
K
(
max_buf_size
),
KP
(
in_datum
),
K
(
*
datum
));
}
}
else
{
LOG_DEBUG
(
"succ to copy_datums"
,
K
(
cnt_
),
K
(
i
),
K
(
max_buf_size
),
KP
(
in_datum
),
K
(
*
datum
));
}
++
datums
;
}
row_size_
=
static_cast
<
int32_t
>
(
pos
);
row_size
=
row_size_
;
}
}
return
ret
;
}
int
ObChunkDatumStore
::
StoredRow
::
copy_datums
(
common
::
ObDatum
**
datums
,
const
int64_t
cnt
,
char
*
buf
,
const
int64_t
size
,
const
int64_t
row_size
,
const
uint32_t
row_extend_size
)
{
int
ret
=
OB_SUCCESS
;
if
(
payload_
!=
buf
||
size
<
0
||
nullptr
==
datums
)
{
ret
=
OB_INVALID_ARGUMENT
;
LOG_WARN
(
"invalid argument"
,
K
(
ret
),
KP
(
payload_
),
KP
(
buf
),
K
(
size
),
K
(
datums
));
}
else
{
cnt_
=
static_cast
<
uint32_t
>
(
cnt
);
int64_t
pos
=
sizeof
(
ObDatum
)
*
cnt_
+
row_extend_size
;
row_size_
=
static_cast
<
int32_t
>
(
row_size
);
for
(
int64_t
i
=
0
;
OB_SUCC
(
ret
)
&&
i
<
cnt_
;
++
i
)
{
if
(
nullptr
==
datums
[
i
])
{
ret
=
OB_INVALID_ARGUMENT
;
LOG_WARN
(
"invalid argument"
,
K
(
ret
),
KP
(
payload_
),
KP
(
buf
),
K
(
size
),
K
(
datums
));
}
else
{
ObDatum
*
datum
=
new
(
&
cells
()[
i
])
ObDatum
();
if
(
OB_FAIL
(
datum
->
deep_copy
(
*
datums
[
i
],
buf
,
size
,
pos
)))
{
LOG_WARN
(
"failed to copy datum"
,
K
(
ret
),
K
(
i
),
K
(
pos
),
K
(
size
),
K
(
row_size
),
K
(
*
datums
[
i
]),
K
(
datums
[
i
]
->
len_
));
}
}
}
}
return
ret
;
}
int
ObChunkDatumStore
::
StoredRow
::
copy_datums
(
common
::
ObDatum
*
datums
,
const
int64_t
cnt
,
char
*
buf
,
const
int64_t
size
,
const
int64_t
row_size
,
const
uint32_t
row_extend_size
)
{
int
ret
=
OB_SUCCESS
;
if
(
payload_
!=
buf
||
size
<
0
||
nullptr
==
datums
)
{
ret
=
OB_INVALID_ARGUMENT
;
LOG_WARN
(
"invalid argument"
,
K
(
ret
),
KP
(
payload_
),
KP
(
buf
),
K
(
size
),
K
(
datums
));
}
else
{
cnt_
=
static_cast
<
uint32_t
>
(
cnt
);
row_size_
=
static_cast
<
int32_t
>
(
row_size
);
MEMCPY
(
buf
,
static_cast
<
const
void
*>
(
datums
),
size
);
int64_t
pos
=
sizeof
(
ObDatum
)
*
cnt_
+
row_extend_size
;
for
(
int64_t
i
=
0
;
i
<
cnt_
;
++
i
)
{
cells
()[
i
].
ptr_
=
buf
+
pos
;
pos
+=
cells
()[
i
].
len_
;
}
}
return
ret
;
}
int
ObChunkDatumStore
::
StoredRow
::
to_expr
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
)
const
...
...
@@ -230,6 +123,54 @@ void ObChunkDatumStore::StoredRow::swizzling(char* base /*= NULL*/)
}
}
int
ObChunkDatumStore
::
StoredRow
::
build
(
StoredRow
*&
sr
,
const
ObExprPtrIArray
&
exprs
,
ObEvalCtx
&
ctx
,
char
*
buf
,
const
int64_t
buf_len
,
const
uint32_t
extra_size
/* = 0 */
)
{
int
ret
=
OB_SUCCESS
;
sr
=
reinterpret_cast
<
StoredRow
*>
(
buf
);
int64_t
pos
=
sizeof
(
*
sr
)
+
sizeof
(
ObDatum
)
*
exprs
.
count
()
+
extra_size
;
if
(
pos
>
buf_len
)
{
ret
=
OB_BUF_NOT_ENOUGH
;
}
else
{
sr
->
cnt_
=
exprs
.
count
();
ObDatum
*
datums
=
sr
->
cells
();
for
(
int64_t
i
=
0
;
i
<
exprs
.
count
()
&&
OB_SUCC
(
ret
);
i
++
)
{
ObExpr
*
expr
=
exprs
.
at
(
i
);
ObDatum
*
in_datum
=
NULL
;
if
(
OB_UNLIKELY
(
NULL
==
expr
))
{
// Set datum to NULL for NULL expr
datums
[
i
].
set_null
();
}
else
if
(
OB_FAIL
(
expr
->
eval
(
ctx
,
in_datum
)))
{
LOG_WARN
(
"expression evaluate failed"
,
K
(
ret
));
}
else
{
ret
=
datums
[
i
].
deep_copy
(
*
in_datum
,
buf
,
buf_len
,
pos
);
}
}
if
(
OB_SUCC
(
ret
))
{
sr
->
row_size_
=
static_cast
<
int32_t
>
(
pos
);
}
}
return
ret
;
}
int
ObChunkDatumStore
::
StoredRow
::
build
(
StoredRow
*&
sr
,
const
ObExprPtrIArray
&
exprs
,
ObEvalCtx
&
ctx
,
common
::
ObIAllocator
&
alloc
,
const
uint32_t
extra_size
/* = 0 */
)
{
int
ret
=
OB_SUCCESS
;
int64_t
size
=
0
;
char
*
buf
=
NULL
;
if
(
OB_FAIL
(
Block
::
row_store_size
(
exprs
,
ctx
,
size
,
extra_size
)))
{
LOG_WARN
(
"get row store size failed"
,
K
(
ret
));
}
else
if
(
NULL
==
(
buf
=
static_cast
<
char
*>
(
alloc
.
alloc
(
size
))))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_WARN
(
"allocate memory failed"
,
K
(
ret
),
K
(
size
));
}
else
if
(
OB_FAIL
(
build
(
sr
,
exprs
,
ctx
,
buf
,
size
,
extra_size
)))
{
LOG_WARN
(
"build stored row failed"
,
K
(
ret
));
}
return
ret
;
}
int
ObChunkDatumStore
::
Block
::
add_row
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
,
const
int64_t
row_size
,
uint32_t
row_extend_size
,
StoredRow
**
stored_row
)
{
...
...
@@ -242,18 +183,15 @@ int ObChunkDatumStore::Block::add_row(const common::ObIArray<ObExpr*>& exprs, Ob
ret
=
OB_BUF_NOT_ENOUGH
;
LOG_WARN
(
"buffer not enough"
,
K
(
row_size
),
"remain"
,
buf
->
remain
());
}
else
{
StoredRow
*
sr
=
(
StoredRow
*
)
buf
->
head
();
if
(
OB_FAIL
(
sr
->
copy_datums
(
exprs
,
ctx
,
buf
->
head
()
+
ROW_HEAD_SIZE
,
row_size
-
ROW_HEAD_SIZE
,
row_size
,
row_extend_size
)))
{
LOG_WARN
(
"copy row failed"
,
K
(
ret
),
K
(
row_size
));
StoredRow
*
sr
=
NULL
;
if
(
OB_FAIL
(
StoredRow
::
build
(
sr
,
exprs
,
ctx
,
buf
->
head
(),
row_size
,
row_extend_size
)))
{
LOG_WARN
(
"build stored row failed"
,
K
(
ret
));
}
else
if
(
OB_FAIL
(
buf
->
advance
(
row_size
)))
{
LOG_WARN
(
"fill buffer head failed"
,
K
(
ret
),
K
(
buf
),
K
(
row_size
));
}
else
{
if
(
OB_FAIL
(
buf
->
advance
(
row_size
)))
{
LOG_WARN
(
"fill buffer head failed"
,
K
(
ret
),
K
(
buf
),
K
(
row_size
));
}
else
{
rows_
++
;
if
(
NULL
!=
stored_row
)
{
*
stored_row
=
sr
;
}
rows_
++
;
if
(
NULL
!=
stored_row
)
{
*
stored_row
=
sr
;
}
}
}
...
...
@@ -268,25 +206,17 @@ int ObChunkDatumStore::Block::append_row(const common::ObIArray<ObExpr*>& exprs,
ret
=
OB_INVALID_ARGUMENT
;
LOG_WARN
(
"invalid argument"
,
K
(
ret
),
K
(
buf
));
}
else
{
int64_t
max_size
=
buf
->
remain
();
if
(
ROW_HEAD_SIZE
>
max_size
)
{
ret
=
OB_BUF_NOT_ENOUGH
;
StoredRow
*
sr
=
NULL
;
if
(
OB_FAIL
(
StoredRow
::
build
(
sr
,
exprs
,
*
ctx
,
buf
->
head
(),
buf
->
remain
(),
row_extend_size
)))
{
if
(
OB_BUF_NOT_ENOUGH
!=
ret
)
{
LOG_WARN
(
"build stored row failed"
,
K
(
ret
));
}
}
else
if
(
OB_FAIL
(
buf
->
advance
(
sr
->
row_size_
)))
{
LOG_WARN
(
"buffer advance failed"
,
K
(
ret
));
}
else
{
StoredRow
*
sr
=
(
StoredRow
*
)
buf
->
head
();
int64_t
row_size
=
0
;
if
(
OB_FAIL
(
sr
->
copy_datums
(
exprs
,
*
ctx
,
row_size
,
buf
->
head
(),
max_size
,
row_extend_size
)))
{
if
(
OB_BUF_NOT_ENOUGH
!=
ret
)
{
LOG_WARN
(
"copy row failed"
,
K
(
ret
),
K
(
max_size
));
}
}
else
{
if
(
OB_FAIL
(
buf
->
advance
(
row_size
)))
{
LOG_WARN
(
"fill buffer head failed"
,
K
(
ret
),
K
(
buf
));
}
else
{
++
rows_
;
if
(
NULL
!=
stored_row
)
{
*
stored_row
=
sr
;
}
}
++
rows_
;
if
(
NULL
!=
stored_row
)
{
*
stored_row
=
sr
;
}
}
}
...
...
src/sql/engine/basic/ob_chunk_datum_store.h
浏览文件 @
d1636b90
...
...
@@ -34,36 +34,6 @@ class ObChunkDatumStore {
OB_UNIS_VERSION_V
(
1
);
public:
static
inline
int
row_copy_size
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
,
int64_t
&
size
)
{
int
ret
=
OB_SUCCESS
;
common
::
ObDatum
*
datum
=
nullptr
;
size
=
DATUM_SIZE
*
exprs
.
count
();
for
(
int64_t
i
=
0
;
i
<
exprs
.
count
()
&&
OB_SUCC
(
ret
);
++
i
)
{
if
(
OB_FAIL
(
exprs
.
at
(
i
)
->
eval
(
ctx
,
datum
)))
{
SQL_ENG_LOG
(
WARN
,
"failed to eval expr datum"
,
KPC
(
exprs
.
at
(
i
)),
K
(
ret
));
}
else
{
size
+=
datum
->
len_
;
}
}
return
ret
;
}
static
inline
int64_t
row_copy_size
(
common
::
ObDatum
**
datums
,
const
int64_t
cnt
)
{
int64_t
size
=
DATUM_SIZE
*
cnt
;
for
(
int64_t
i
=
0
;
i
<
cnt
;
++
i
)
{
size
+=
datums
[
i
]
->
len_
;
}
return
size
;
}
static
inline
int64_t
row_copy_size
(
common
::
ObDatum
*
datums
,
const
int64_t
cnt
)
{
int64_t
size
=
DATUM_SIZE
*
cnt
;
for
(
int64_t
i
=
0
;
i
<
cnt
;
++
i
)
{
size
+=
datums
[
i
].
len_
;
}
return
size
;
}
/*
* StoredRow memory layout
* N Datum + extend_size(can be 0) + real data
...
...
@@ -75,16 +45,22 @@ public:
struct
StoredRow
{
StoredRow
()
:
cnt_
(
0
),
row_size_
(
0
)
{}
int
copy_datums
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
,
char
*
buf
,
const
int64_t
size
,
const
int64_t
row_size
,
const
uint32_t
row_extend_size
);
int
copy_datums
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
,
int64_t
&
row_size
,
char
*
buf
,
const
int64_t
max_buf_size
,
const
uint32_t
row_extend_size
);
int
copy_datums
(
common
::
ObDatum
**
datums
,
const
int64_t
cnt
,
char
*
buf
,
const
int64_t
size
,
const
int64_t
row_size
,
const
uint32_t
row_extend_size
);
int
copy_datums
(
common
::
ObDatum
*
datums
,
const
int64_t
cnt
,
char
*
buf
,
const
int64_t
size
,
const
int64_t
row_size
,
const
uint32_t
row_extend_size
);
int
to_expr
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
)
const
;
int
to_expr
(
const
common
::
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
ctx
,
int64_t
count
)
const
;
// Build a stored row by exprs.
// @param [out] sr, result stored row
// @param epxrs,
// @param ctx
// @param buf
// @param buf_len, use Block::row_store_size() to detect the needed buffer size.
// @param extra_size, extra store size
// @param unswizzling
// @return OB_SUCCESS or OB_BUF_NOT_ENOUGH if buf not enough
static
int
build
(
StoredRow
*&
sr
,
const
ObExprPtrIArray
&
exprs
,
ObEvalCtx
&
ctx
,
char
*
buf
,
const
int64_t
buf_len
,
const
uint32_t
extra_size
=
0
);
static
int
build
(
StoredRow
*&
sr
,
const
ObExprPtrIArray
&
exprs
,
ObEvalCtx
&
ctx
,
common
::
ObIAllocator
&
alloc
,
const
uint32_t
extra_size
=
0
);
inline
common
::
ObDatum
*
cells
()
{
...
...
@@ -117,7 +93,6 @@ public:
* 2) Provide reuse mode, memory can be reused
* 3) Provide conversion from StoredRow to ObIArray<ObExpr*>
*/
template
<
typename
T
=
ObChunkDatumStore
::
StoredRow
>
class
LastStoredRow
{
public:
LastStoredRow
(
ObIAllocator
&
alloc
)
...
...
@@ -138,13 +113,13 @@ public:
char
*
buf
=
NULL
;
int64_t
row_size
=
0
;
int64_t
buffer_len
=
0
;
T
*
new_row
=
NULL
;
StoredRow
*
new_row
=
NULL
;
if
(
0
==
exprs
.
count
())
{
// no column. scenario like distinct 1
}
else
if
(
OB_FAIL
(
ObChunkDatumStore
::
row_copy_size
(
exprs
,
ctx
,
row_size
)))
{
SQL_ENG_LOG
(
WARN
,
"failed to calc copy size"
,
K
(
ret
));
}
else
{
int64_t
head_size
=
sizeof
(
T
);
int64_t
head_size
=
sizeof
(
StoredRow
);
reuse
=
OB_ISNULL
(
store_row_
)
?
false
:
reuse
&&
(
max_size_
>=
row_size
+
head_size
+
extra_size
);
if
(
reuse
&&
OB_NOT_NULL
(
store_row_
))
{
// switch buffer for write
...
...
@@ -163,24 +138,21 @@ public:
}
else
if
(
OB_ISNULL
(
buf2
=
reinterpret_cast
<
char
*>
(
alloc_
.
alloc
(
buffer_len
))))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
SQL_ENG_LOG
(
ERROR
,
"alloc buf failed"
,
K
(
ret
));
}
else
if
(
OB_ISNULL
(
pre_alloc_row1_
=
new
(
buf1
)
T
()))
{
}
else
if
(
OB_ISNULL
(
pre_alloc_row1_
=
new
(
buf1
)
StoredRow
()))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
SQL_ENG_LOG
(
ERROR
,
"failed to new row"
,
K
(
ret
));
}
else
if
(
OB_ISNULL
(
pre_alloc_row2_
=
new
(
buf2
)
T
()))
{
}
else
if
(
OB_ISNULL
(
pre_alloc_row2_
=
new
(
buf2
)
StoredRow
()))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
SQL_ENG_LOG
(
ERROR
,
"failed to new row"
,
K
(
ret
));
}
else
{
buf
=
buf1
;
new_row
=
pre_alloc_row1_
;
}
}
if
(
OB_SUCC
(
ret
))
{
int64_t
pos
=
head_size
;
if
(
OB_FAIL
(
new_row
->
copy_datums
(
exprs
,
ctx
,
buf
+
pos
,
buffer_len
-
head_size
,
row_size
,
extra_size
)))
{
SQL_ENG_LOG
(
WARN
,
"failed to deep copy row"
,
K
(
ret
),
K
(
buffer_len
),
K
(
row_size
));
if
(
OB_FAIL
(
StoredRow
::
build
(
store_row_
,
exprs
,
ctx
,
buf
,
buffer_len
,
extra_size
)))
{
SQL_ENG_LOG
(
WARN
,
"failed to build stored row"
,
K
(
ret
),
K
(
buffer_len
),
K
(
row_size
));
}
else
{
max_size_
=
buffer_len
;
store_row_
=
new_row
;
}
}
}
...
...
@@ -192,9 +164,9 @@ public:
bool
reuse
=
reuse_
;
char
*
buf
=
NULL
;
int64_t
buffer_len
=
0
;
T
*
new_row
=
NULL
;
StoredRow
*
new_row
=
NULL
;
int64_t
row_size
=
row
.
row_size_
;
int64_t
head_size
=
sizeof
(
T
);
int64_t
head_size
=
sizeof
(
StoredRow
);
reuse
=
OB_ISNULL
(
store_row_
)
?
false
:
reuse
&&
(
max_size_
>=
row_size
+
head_size
+
extra_size
);
if
(
reuse
&&
OB_NOT_NULL
(
store_row_
))
{
buf
=
reinterpret_cast
<
char
*>
(
store_row_
);
...
...
@@ -205,20 +177,15 @@ public:
if
(
OB_ISNULL
(
buf
=
reinterpret_cast
<
char
*>
(
alloc_
.
alloc
(
buffer_len
))))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
SQL_ENG_LOG
(
ERROR
,
"alloc buf failed"
,
K
(
ret
));
}
else
if
(
OB_ISNULL
(
new_row
=
new
(
buf
)
T
()))
{
}
else
if
(
OB_ISNULL
(
new_row
=
new
(
buf
)
StoredRow
()))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
SQL_ENG_LOG
(
ERROR
,
"failed to new row"
,
K
(
ret
));
}
}
if
(
OB_SUCC
(
ret
))
{
int64_t
pos
=
head_size
;
if
(
OB_FAIL
(
new_row
->
copy_datums
(
const_cast
<
common
::
ObDatum
*>
(
row
.
cells
()),
row
.
cnt_
,
buf
+
pos
,
buffer_len
-
head_size
,
row_size
,
extra_size
)))
{
SQL_ENG_LOG
(
WARN
,
"failed to deep copy row"
,
K
(
ret
),
K
(
buffer_len
),
K
(
row_size
));
if
(
OB_FAIL
(
new_row
->
assign
(
&
row
)))
{
SQL_ENG_LOG
(
WARN
,
"stored row assign failed"
,
K
(
ret
));
}
else
{
max_size_
=
buffer_len
;
store_row_
=
new_row
;
...
...
@@ -226,7 +193,7 @@ public:
}
return
ret
;
}
void
set_store_row
(
T
*
in_store_row
)
void
set_store_row
(
StoredRow
*
in_store_row
)
{
store_row_
=
in_store_row
;
}
...
...
@@ -237,18 +204,17 @@ public:
}
TO_STRING_KV
(
K_
(
max_size
),
K_
(
reuse
),
KPC_
(
store_row
));
T
*
store_row_
;
StoredRow
*
store_row_
;
ObIAllocator
&
alloc_
;
int64_t
max_size_
;
bool
reuse_
;
private:
// To avoid writing memory overwrite, alloc 2 row for alternate writing
T
*
pre_alloc_row1_
;
T
*
pre_alloc_row2_
;
StoredRow
*
pre_alloc_row1_
;
StoredRow
*
pre_alloc_row2_
;
};
template
<
typename
T
=
ObChunkDatumStore
::
StoredRow
>
class
ShadowStoredRow
{
public:
ShadowStoredRow
()
:
alloc_
(
nullptr
),
store_row_
(
nullptr
),
saved_
(
false
)
...
...
@@ -261,7 +227,7 @@ public:
int
init
(
common
::
ObIAllocator
&
allocator
,
int64_t
datum_cnt
)
{
int
ret
=
OB_SUCCESS
;
int64_t
buffer_len
=
datum_cnt
*
sizeof
(
ObDatum
)
+
sizeof
(
T
);
int64_t
buffer_len
=
datum_cnt
*
sizeof
(
ObDatum
)
+
sizeof
(
StoredRow
);
char
*
buf
=
nullptr
;
if
(
NULL
!=
alloc_
)
{
ret
=
common
::
OB_INIT_TWICE
;
...
...
@@ -271,9 +237,9 @@ public:
SQL_ENG_LOG
(
ERROR
,
"alloc buf failed"
,
K
(
ret
));
}
else
{
alloc_
=
&
allocator
;
store_row_
=
new
(
buf
)
T
();
store_row_
=
new
(
buf
)
StoredRow
();
store_row_
->
cnt_
=
datum_cnt
;
store_row_
->
row_size_
=
datum_cnt
*
sizeof
(
ObDatum
);
store_row_
->
row_size_
=
datum_cnt
*
sizeof
(
ObDatum
)
+
sizeof
(
StoredRow
)
;
saved_
=
false
;
}
return
ret
;
...
...
@@ -322,7 +288,7 @@ public:
saved_
=
false
;
}
T
*
get_store_row
()
const
StoredRow
*
get_store_row
()
const
{
return
store_row_
;
}
...
...
@@ -330,7 +296,7 @@ public:
private:
common
::
ObIAllocator
*
alloc_
;
T
*
store_row_
;
StoredRow
*
store_row_
;
bool
saved_
;
};
...
...
@@ -370,18 +336,10 @@ public:
}
// following interface for ObDatum only,unused for now
static
int64_t
inline
min_buf_size
(
common
::
ObDatum
**
datums
,
const
int64_t
cnt
)
{
return
BlockBuffer
::
HEAD_SIZE
+
sizeof
(
BlockBuffer
)
+
row_store_size
(
datums
,
cnt
);
}
static
int64_t
inline
min_buf_size
(
common
::
ObDatum
*
datums
,
const
int64_t
cnt
)
{
return
BlockBuffer
::
HEAD_SIZE
+
sizeof
(
BlockBuffer
)
+
row_store_size
(
datums
,
cnt
);
}
static
int64_t
inline
row_store_size
(
common
::
ObDatum
**
datums
,
const
int64_t
cnt
,
uint32_t
row_extend_size
=
0
)
{
return
ROW_HEAD_SIZE
+
row_extend_size
+
ObChunkDatumStore
::
row_copy_size
(
datums
,
cnt
);
}
static
int64_t
inline
row_store_size
(
common
::
ObDatum
*
datums
,
const
int64_t
cnt
,
uint32_t
row_extend_size
=
0
)
{
return
ROW_HEAD_SIZE
+
row_extend_size
+
ObChunkDatumStore
::
row_copy_size
(
datums
,
cnt
);
...
...
@@ -970,6 +928,30 @@ private:
callback_
->
free
(
size
);
}
static
inline
int
row_copy_size
(
const
common
::
ObIArray
<
ObExpr
*>
&
exprs
,
ObEvalCtx
&
ctx
,
int64_t
&
size
)
{
int
ret
=
OB_SUCCESS
;
common
::
ObDatum
*
datum
=
nullptr
;
size
=
DATUM_SIZE
*
exprs
.
count
();
for
(
int64_t
i
=
0
;
i
<
exprs
.
count
()
&&
OB_SUCC
(
ret
);
++
i
)
{
if
(
OB_FAIL
(
exprs
.
at
(
i
)
->
eval
(
ctx
,
datum
)))
{
SQL_ENG_LOG
(
WARN
,
"failed to eval expr datum"
,
KPC
(
exprs
.
at
(
i
)),
K
(
ret
));
}
else
{
size
+=
datum
->
len_
;
}
}
return
ret
;
}
static
inline
int64_t
row_copy_size
(
common
::
ObDatum
*
datums
,
const
int64_t
cnt
)
{
int64_t
size
=
DATUM_SIZE
*
cnt
;
for
(
int64_t
i
=
0
;
i
<
cnt
;
++
i
)
{
size
+=
datums
[
i
].
len_
;
}
return
size
;
}
private:
bool
inited_
;
uint64_t
tenant_id_
;
...
...
@@ -1011,6 +993,8 @@ private:
DISALLOW_COPY_AND_ASSIGN
(
ObChunkDatumStore
);
};
typedef
ObChunkDatumStore
::
StoredRow
ObStoredDatumRow
;
inline
int
ObChunkDatumStore
::
BlockBuffer
::
advance
(
int64_t
size
)
{
int
ret
=
common
::
OB_SUCCESS
;
...
...
@@ -1026,7 +1010,6 @@ inline int ObChunkDatumStore::BlockBuffer::advance(int64_t size)
}
return
ret
;
}
}
// end namespace sql
}
// end namespace oceanbase
...
...
src/sql/engine/basic/ob_limit_op.h
浏览文件 @
d1636b90
...
...
@@ -68,7 +68,7 @@ private:
int64_t
total_cnt_
;
bool
is_percent_first_
;
ObChunkDatumStore
::
LastStoredRow
<>
pre_sort_columns_
;
ObChunkDatumStore
::
LastStoredRow
pre_sort_columns_
;
};
}
// end namespace sql
...
...
src/sql/engine/connect_by/ob_cnnt_by_pump.cpp
浏览文件 @
d1636b90
...
...
@@ -21,32 +21,9 @@ using namespace oceanbase::common;
int
ObConnectByOpPumpBase
::
deep_copy_row
(
const
ObIArray
<
ObExpr
*>&
exprs
,
const
ObChunkDatumStore
::
StoredRow
*&
dst_row
)
{
int
ret
=
OB_SUCCESS
;
char
*
buf
=
NULL
;
int64_t
row_size
=
0
;
int64_t
buffer_len
=
0
;
int64_t
extra_size
=
0
;
int64_t
head_size
=
sizeof
(
ObChunkDatumStore
::
StoredRow
);
int64_t
pos
=
head_size
;
ObChunkDatumStore
::
StoredRow
*
new_row
=
nullptr
;
if
(
OB_FAIL
(
ObChunkDatumStore
::
row_copy_size
(
exprs
,
*
eval_ctx_
,
row_size
)))
{
SQL_ENG_LOG
(
WARN
,
"failed to calc copy size"
,
K
(
ret
));
}
else
{
row_size
+=
head_size
;
buffer_len
=
row_size
+
extra_size
;
if
(
OB_ISNULL
(
buf
=
reinterpret_cast
<
char
*>
(
allocator_
.
alloc
(
buffer_len
))))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
SQL_ENG_LOG
(
ERROR
,
"alloc buf failed"
,
K
(
ret
));
}
else
if
(
OB_ISNULL
(
new_row
=
new
(
buf
)
ObChunkDatumStore
::
StoredRow
()))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
SQL_ENG_LOG
(
ERROR
,
"failed to new row"
,
K
(
ret
));
}
else
if
(
OB_FAIL
(
new_row
->
copy_datums
(
exprs
,
*
eval_ctx_
,
buf
+
pos
,
buffer_len
-
head_size
,
row_size
,
extra_size
)))
{
SQL_ENG_LOG
(
WARN
,
"failed to deep copy row"
,
K
(
ret
),
K
(
buffer_len
),
K
(
row_size
));
}
else
{
dst_row
=
new_row
;
}
}
ObChunkDatumStore
::
StoredRow
*
sr
=
NULL
;
int
ret
=
ObChunkDatumStore
::
StoredRow
::
build
(
sr
,
exprs
,
*
eval_ctx_
,
allocator_
);
dst_row
=
sr
;
return
ret
;
}
...
...
src/sql/engine/dml/ob_multi_table_insert_up_op.cpp
浏览文件 @
d1636b90
...
...
@@ -326,31 +326,7 @@ int ObMultiTableInsertUpOp::shuffle_update_row(const ObChunkDatumStore::StoredRo
int
ObMultiTableInsertUpOp
::
convert_exprs_to_stored_row
(
ObIAllocator
&
allocator
,
ObEvalCtx
&
eval_ctx
,
const
ObExprPtrIArray
&
exprs
,
ObChunkDatumStore
::
StoredRow
*&
new_row
)
{
int
ret
=
OB_SUCCESS
;
new_row
=
NULL
;
char
*
buf
=
NULL
;
int64_t
row_size
=
0
;
if
(
OB_FAIL
(
ObChunkDatumStore
::
row_copy_size
(
exprs
,
eval_ctx
,
row_size
)))
{
LOG_WARN
(
"failed to calc copy size"
,
K
(
ret
));
}
else
{
const
int64_t
STORE_ROW_HEADER_SIZE
=
sizeof
(
ObChunkDatumStore
::
StoredRow
);
int64_t
buffer_len
=
STORE_ROW_HEADER_SIZE
+
row_size
;
if
(
OB_ISNULL
(
buf
=
reinterpret_cast
<
char
*>
(
allocator
.
alloc
(
buffer_len
))))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_ERROR
(
"alloc buf failed"
,
K
(
ret
));
}
else
if
(
OB_ISNULL
(
new_row
=
new
(
buf
)
ObChunkDatumStore
::
StoredRow
()))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_ERROR
(
"failed to new row"
,
K
(
ret
));
}
else
{
int64_t
pos
=
STORE_ROW_HEADER_SIZE
;
if
(
OB_FAIL
(
new_row
->
copy_datums
(
exprs
,
eval_ctx
,
buf
+
pos
,
buffer_len
-
STORE_ROW_HEADER_SIZE
,
row_size
,
0
/*extra_size*/
)))
{
LOG_WARN
(
"failed to deep copy row"
,
K
(
ret
),
K
(
buffer_len
));
}
}
}
return
ret
;
return
ObChunkDatumStore
::
StoredRow
::
build
(
new_row
,
exprs
,
eval_ctx
,
allocator
);
}
}
// namespace sql
...
...
src/sql/engine/join/ob_hash_join_op.h
浏览文件 @
d1636b90
...
...
@@ -689,7 +689,7 @@ private:
bool
postprocessed_left_
;
bool
has_fill_right_row_
;
bool
has_fill_left_row_
;
ObChunkDatumStore
::
ShadowStoredRow
<
ObHashJoinStoredJoinRow
>
right_last_row_
;
ObChunkDatumStore
::
ShadowStoredRow
right_last_row_
;
bool
need_return_
;
bool
iter_end_
;
bool
opt_cache_aware_
;
...
...
src/sql/engine/join/ob_merge_join_op.h
浏览文件 @
d1636b90
...
...
@@ -192,7 +192,7 @@ private:
bool
has_backup_row_
;
bool
reach_end_
;
// child iterator end
bool
*
left_row_joined_
;
ObChunkDatumStore
::
ShadowStoredRow
<>
store_row_
;
ObChunkDatumStore
::
ShadowStoredRow
store_row_
;
ObOperator
*
child_
;
};
...
...
src/sql/engine/join/ob_nested_loop_join_op.h
浏览文件 @
d1636b90
...
...
@@ -102,7 +102,7 @@ public:
ObChunkDatumStore
left_store_
;
ObChunkDatumStore
::
Iterator
left_store_iter_
;
bool
is_left_end_
;
ObChunkDatumStore
::
ShadowStoredRow
<>
last_store_row_
;
ObChunkDatumStore
::
ShadowStoredRow
last_store_row_
;
bool
save_last_row_
;
private:
...
...
src/sql/engine/pdml/static/ob_pdml_op_data_driver.h
浏览文件 @
d1636b90
...
...
@@ -132,7 +132,7 @@ private:
DriverState
state_
;
// Driver current state
ObEvalCtx
*
eval_ctx_
;
ObChunkDatumStore
::
LastStoredRow
<>
last_row_
;
ObChunkDatumStore
::
LastStoredRow
last_row_
;
int64_t
last_row_part_id_
;
const
ObExprPtrIArray
*
last_row_expr_
;
int64_t
op_id_
;
...
...
src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp
浏览文件 @
d1636b90
...
...
@@ -129,12 +129,12 @@ int ObPxMSCoordOp::init_store_rows(int64_t n_ways)
LOG_WARN
(
"unexpected status: store rows is not empty"
,
K
(
ret
),
K
(
n_ways
));
}
for
(
int64_t
i
=
0
;
i
<
n_ways
&&
OB_SUCC
(
ret
);
++
i
)
{
void
*
buf
=
alloc_
.
alloc
(
sizeof
(
ObChunkDatumStore
::
LastStoredRow
<>
));
void
*
buf
=
alloc_
.
alloc
(
sizeof
(
ObChunkDatumStore
::
LastStoredRow
));
if
(
OB_ISNULL
(
buf
))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_WARN
(
"failed to alloca memory"
,
K
(
ret
));
}
else
{
ObChunkDatumStore
::
LastStoredRow
<>*
store_row
=
new
(
buf
)
ObChunkDatumStore
::
LastStoredRow
<>
(
alloc_
);
ObChunkDatumStore
::
LastStoredRow
*
store_row
=
new
(
buf
)
ObChunkDatumStore
::
LastStoredRow
(
alloc_
);
if
(
OB_FAIL
(
store_rows_
.
push_back
(
store_row
)))
{
LOG_WARN
(
"failed to push back"
,
K
(
ret
));
}
...
...
@@ -160,7 +160,7 @@ int ObPxMSCoordOp::free_allocator()
last_pop_row_
=
nullptr
;
// the heap shoud be empty before store_rows_.reset();
while
(
OB_SUCC
(
ret
)
&&
row_heap_
.
count
()
>
0
)
{
const
ObChunkDatumStore
::
LastStoredRow
<>
*
pop_row
=
nullptr
;
const
ObChunkDatumStore
::
LastStoredRow
*
pop_row
=
nullptr
;
if
(
OB_SUCC
(
row_heap_
.
raw_pop
(
pop_row
)))
{
row_heap_
.
shrink
();
}
else
{
...
...
@@ -324,7 +324,7 @@ int ObPxMSCoordOp::next_row(bool& wait_next_msg)
LOG_WARN
(
"fail push row to heap"
,
K
(
ret
));
}
}
else
{
ObChunkDatumStore
::
LastStoredRow
<>
*
cur_row
=
nullptr
;
ObChunkDatumStore
::
LastStoredRow
*
cur_row
=
nullptr
;
if
(
OB_FAIL
(
store_rows_
.
at
(
row_heap_
.
writable_channel_idx
(),
cur_row
)))
{
LOG_WARN
(
"failed to get store row"
,
K
(
ret
),
K
(
row_heap_
.
writable_channel_idx
()));
}
else
if
(
OB_FAIL
(
cur_row
->
save_store_row
(
MY_SPEC
.
all_exprs_
,
eval_ctx_
,
0
)))
{
...
...
@@ -346,14 +346,14 @@ int ObPxMSCoordOp::next_row(bool& wait_next_msg)
all_rows_finish_
=
true
;
metric_
.
mark_last_out
();
}
else
if
(
row_heap_
.
capacity
()
==
row_heap_
.
count
())
{
const
ObChunkDatumStore
::
LastStoredRow
<>
*
pop_row
=
nullptr
;
const
ObChunkDatumStore
::
LastStoredRow
*
pop_row
=
nullptr
;
if
(
OB_FAIL
(
row_heap_
.
pop
(
pop_row
)))
{
LOG_WARN
(
"failed to pop row"
,
K
(
ret
));
}
else
if
(
OB_FAIL
(
pop_row
->
store_row_
->
to_expr
(
MY_SPEC
.
all_exprs_
,
eval_ctx_
)))
{
LOG_WARN
(
"failed to to exprs"
,
K
(
ret
));
}
else
{
wait_next_msg
=
false
;
last_pop_row_
=
const_cast
<
ObChunkDatumStore
::
LastStoredRow
<>
*>
(
pop_row
);
last_pop_row_
=
const_cast
<
ObChunkDatumStore
::
LastStoredRow
*>
(
pop_row
);
}
metric_
.
count
();
metric_
.
mark_first_out
();
...
...
src/sql/engine/px/exchange/ob_px_ms_coord_op.h
浏览文件 @
d1636b90
...
...
@@ -72,7 +72,7 @@ public:
};
class
ObMsgReceiveFilter
:
public
dtl
::
ObIDltChannelLoopPred
{
public:
ObMsgReceiveFilter
(
ObRowHeap
<
ObMaxDatumRowCompare
,
ObChunkDatumStore
::
LastStoredRow
<>
>&
heap
)
ObMsgReceiveFilter
(
ObRowHeap
<
ObMaxDatumRowCompare
,
ObChunkDatumStore
::
LastStoredRow
>&
heap
)
:
data_ch_idx_start_
(
-
1
),
data_ch_idx_end_
(
-
1
),
heap_
(
heap
)
{}
~
ObMsgReceiveFilter
()
=
default
;
...
...
@@ -96,7 +96,7 @@ public:
private:
int64_t
data_ch_idx_start_
;
int64_t
data_ch_idx_end_
;
ObRowHeap
<
ObMaxDatumRowCompare
,
ObChunkDatumStore
::
LastStoredRow
<>
>&
heap_
;
ObRowHeap
<
ObMaxDatumRowCompare
,
ObChunkDatumStore
::
LastStoredRow
>&
heap_
;
};
public:
...
...
@@ -127,9 +127,9 @@ private:
ObBarrierPieceMsgP
barrier_piece_msg_proc_
;
ObWinbufPieceMsgP
winbuf_piece_msg_proc_
;
ObPxQcInterruptedP
interrupt_proc_
;
ObArray
<
ObChunkDatumStore
::
LastStoredRow
<>
*>
store_rows_
;
ObChunkDatumStore
::
LastStoredRow
<>
*
last_pop_row_
;
ObRowHeap
<
ObMaxDatumRowCompare
,
ObChunkDatumStore
::
LastStoredRow
<>
>
row_heap_
;
ObArray
<
ObChunkDatumStore
::
LastStoredRow
*>
store_rows_
;
ObChunkDatumStore
::
LastStoredRow
*
last_pop_row_
;
ObRowHeap
<
ObMaxDatumRowCompare
,
ObChunkDatumStore
::
LastStoredRow
>
row_heap_
;
ObMsgReceiveFilter
receive_order_
;
common
::
ObArenaAllocator
alloc_
;
};
...
...
src/sql/engine/px/exchange/ob_px_transmit_op.h
浏览文件 @
d1636b90
...
...
@@ -135,7 +135,7 @@ protected:
common
::
ObArenaAllocator
px_row_allocator_
;
ObPxTaskChSet
task_ch_set_
;
bool
transmited_
;
// const ObChunkDatum::LastStoredRow
<>
first_row_;
// const ObChunkDatum::LastStoredRow first_row_;
bool
iter_end_
;
bool
consume_first_row_
;
dtl
::
ObDtlUnblockingMsgP
dfc_unblock_msg_proc_
;
...
...
src/sql/engine/px/exchange/ob_row_heap.cpp
浏览文件 @
d1636b90
...
...
@@ -129,7 +129,7 @@ ObMaxDatumRowCompare::ObMaxDatumRowCompare()
int
ObMaxDatumRowCompare
::
init
(
const
ObIArray
<
ObSortFieldCollation
>*
sort_collations
,
const
ObIArray
<
ObSortCmpFunc
>*
sort_cmp_funs
,
const
common
::
ObIArray
<
const
ObChunkDatumStore
::
LastStoredRow
<>
*>&
rows
)
const
common
::
ObIArray
<
const
ObChunkDatumStore
::
LastStoredRow
*>&
rows
)
{
int
ret
=
OB_SUCCESS
;
bool
is_static_cmp
=
false
;
...
...
src/sql/engine/px/exchange/ob_row_heap.h
浏览文件 @
d1636b90
...
...
@@ -84,7 +84,7 @@ class ObMaxDatumRowCompare {
public:
ObMaxDatumRowCompare
();
int
init
(
const
ObIArray
<
ObSortFieldCollation
>*
sort_collations
,
const
ObIArray
<
ObSortCmpFunc
>*
sort_cmp_funs
,
const
common
::
ObIArray
<
const
ObChunkDatumStore
::
LastStoredRow
<>
*>&
rows
);
const
common
::
ObIArray
<
const
ObChunkDatumStore
::
LastStoredRow
*>&
rows
);
// compare function for quick sort.
bool
operator
()(
int64_t
row_idx1
,
int64_t
row_idx2
);
...
...
@@ -113,7 +113,7 @@ public:
int
ret_
;
const
ObIArray
<
ObSortFieldCollation
>*
sort_collations_
;
const
ObIArray
<
ObSortCmpFunc
>*
sort_cmp_funs_
;
const
common
::
ObIArray
<
const
ObChunkDatumStore
::
LastStoredRow
<>
*>*
rows_
;
const
common
::
ObIArray
<
const
ObChunkDatumStore
::
LastStoredRow
*>*
rows_
;
};
/*
...
...
src/sql/engine/recursive_cte/ob_search_method_op.cpp
浏览文件 @
d1636b90
...
...
@@ -70,7 +70,7 @@ bool ObSearchMethodOp::ObCycleHash::operator==(const ObCycleHash& other) const
int
ObSearchMethodOp
::
add_row
(
const
ObIArray
<
ObExpr
*>&
exprs
,
ObEvalCtx
&
eval_ctx
)
{
int
ret
=
OB_SUCCESS
;
ObChunkDatumStore
::
LastStoredRow
<>
last_row
(
allocator_
);
ObChunkDatumStore
::
LastStoredRow
last_row
(
allocator_
);
if
(
input_rows_
.
empty
()
&&
0
==
input_rows_
.
get_capacity
()
&&
OB_FAIL
(
input_rows_
.
reserve
(
INIT_ROW_COUNT
)))
{
LOG_WARN
(
"Failed to pre allocate array"
,
K
(
ret
));
}
else
if
(
OB_UNLIKELY
(
exprs
.
empty
()))
{
...
...
src/sql/engine/set/ob_merge_set_op.h
浏览文件 @
d1636b90
...
...
@@ -70,7 +70,7 @@ protected:
protected:
common
::
ObArenaAllocator
alloc_
;
ObChunkDatumStore
::
LastStoredRow
<>
last_row_
;
ObChunkDatumStore
::
LastStoredRow
last_row_
;
Compare
cmp_
;
bool
need_skip_init_row_
;
bool
iter_end_
;
...
...
src/sql/engine/sort/ob_sort_op_impl.cpp
浏览文件 @
d1636b90
...
...
@@ -1425,21 +1425,19 @@ int ObInMemoryTopnSortImpl::add_row(const common::ObIArray<ObExpr*>& exprs, bool
// optimize for hit-rate: enlarge first Limit-Count row's space,
// so following rows are more likely to fit in.
int64_t
row_size
=
0
;
if
(
OB_FAIL
(
ObChunkDatumStore
::
row_copy_size
(
exprs
,
*
eval_ctx_
,
row_size
)))
{
if
(
OB_FAIL
(
ObChunkDatumStore
::
Block
::
row_store_size
(
exprs
,
*
eval_ctx_
,
row_size
,
STORE_ROW_EXTRA_SIZE
)))
{
LOG_WARN
(
"failed to calc copy size"
,
K
(
ret
));
}
else
{
int64_t
buffer_len
=
STORE_ROW_HEADER_SIZE
+
2
*
row_size
+
STORE_ROW_EXTRA_SIZE
;
if
(
OB_ISNULL
(
buf
=
reinterpret_cast
<
char
*>
(
cur_alloc_
.
alloc
(
buffer_len
))))
{
int64_t
buffer_len
=
2
*
row_size
;
if
(
OB_ISNULL
(
buf
=
reinterpret_cast
<
char
*>
(
cur_alloc_
.
alloc
(
buffer_len
))))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_ERROR
(
"alloc buf failed"
,
K
(
ret
));
}
else
if
(
OB_ISNULL
(
new_row
=
new
(
buf
)
SortStoredRow
()))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_ERROR
(
"failed to new row"
,
K
(
ret
));
}
else
{
int64_t
pos
=
STORE_ROW_HEADER_SIZE
;
if
(
OB_FAIL
(
new_row
->
copy_datums
(
exprs
,
*
eval_ctx_
,
buf
+
pos
,
buffer_len
-
STORE_ROW_HEADER_SIZE
,
row_size
,
STORE_ROW_EXTRA_SIZE
)))
{
LOG_WARN
(
"failed to deep copy row"
,
K
(
ret
),
K
(
buffer_len
));
ObChunkDatumStore
::
StoredRow
*
sr
=
NULL
;
if
(
OB_FAIL
(
ObChunkDatumStore
::
StoredRow
::
build
(
sr
,
exprs
,
*
eval_ctx_
,
buf
,
buffer_len
,
STORE_ROW_EXTRA_SIZE
)))
{
LOG_WARN
(
"build stored row failed"
,
K
(
ret
));
}
else
if
(
FALSE_IT
(
new_row
=
static_cast
<
SortStoredRow
*>
(
sr
)))
{
}
else
if
(
OB_FAIL
(
heap_
.
push
(
new_row
)))
{
LOG_WARN
(
"failed to push back row"
,
K
(
ret
),
K
(
buffer_len
));
}
else
{
...
...
@@ -1469,37 +1467,33 @@ int ObInMemoryTopnSortImpl::adjust_topn_heap(const common::ObIArray<ObExpr*>& ex
char
*
buf
=
NULL
;
int64_t
row_size
=
0
;
int64_t
buffer_len
=
0
;
if
(
OB_FAIL
(
ObChunkDatumStore
::
row_copy_size
(
exprs
,
*
eval_ctx_
,
row_size
)))
{
if
(
OB_FAIL
(
ObChunkDatumStore
::
Block
::
row_store_size
(
exprs
,
*
eval_ctx_
,
row_size
,
STORE_ROW_EXTRA_SIZE
)))
{
LOG_WARN
(
"failed to calc copy size"
,
K
(
ret
));
}
else
{
// check to see whether this old row's space is adequate for new one
if
(
dt_row
->
get_max_size
()
>=
row_size
+
STORE_ROW_HEADER_SIZE
+
STORE_ROW_EXTRA_SIZE
)
{
if
(
dt_row
->
get_max_size
()
>=
row_size
)
{
buf
=
reinterpret_cast
<
char
*>
(
dt_row
);
new_row
=
dt_row
;
buffer_len
=
dt_row
->
get_max_size
();
}
else
{
buffer_len
=
row_size
*
2
+
STORE_ROW_HEADER_SIZE
+
STORE_ROW_EXTRA_SIZE
;
buffer_len
=
row_size
*
2
;
if
(
OB_ISNULL
(
buf
=
reinterpret_cast
<
char
*>
(
cur_alloc_
.
alloc
(
buffer_len
))))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_ERROR
(
"alloc buf failed"
,
K
(
ret
));
}
else
if
(
OB_ISNULL
(
new_row
=
new
(
buf
)
SortStoredRow
()))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_ERROR
(
"failed to new row"
,
K
(
ret
));
}
}
}
if
(
OB_SUCC
(
ret
))
{
int64_t
pos
=
STORE_ROW_HEADER_SIZE
;
if
(
OB_FAIL
(
new_row
->
copy_datums
(
exprs
,
*
eval_ctx_
,
buf
+
pos
,
buffer_len
-
STORE_ROW_HEADER_SIZE
,
row_size
,
STORE_ROW_EXTRA_SIZE
)))
{
LOG_WARN
(
"failed to deep copy row"
,
K
(
ret
),
K
(
buffer_len
),
K
(
row_size
));
ObChunkDatumStore
::
StoredRow
*
sr
=
NULL
;
if
(
OB_FAIL
(
ObChunkDatumStore
::
StoredRow
::
build
(
sr
,
exprs
,
*
eval_ctx_
,
buf
,
buffer_len
,
STORE_ROW_EXTRA_SIZE
)))
{
LOG_WARN
(
"build stored row failed"
,
K
(
ret
));
}
else
if
(
FALSE_IT
(
new_row
=
static_cast
<
SortStoredRow
*>
(
sr
)))
{
}
else
if
(
OB_FAIL
(
heap_
.
replace_top
(
new_row
)))
{
LOG_WARN
(
"failed to replace top"
,
K
(
ret
));
}
else
{
new_row
->
set_max_size
(
buffer_len
);
last_row_
=
new_row
;
// LOG_TRACE("in memory topn sort check replace row", KPC(new_row),
// K(buffer_len), K(row_size), K(new_row->get_max_size()));
}
}
}
else
{
...
...
src/sql/engine/sort/ob_sort_op_impl.h
浏览文件 @
d1636b90
...
...
@@ -340,7 +340,7 @@ private:
const
ObChunkDatumStore
::
StoredRow
*
prev_row_
;
// when got new prefix, save the row to to %next_prefix_row_
ObChunkDatumStore
::
ShadowStoredRow
<>
next_prefix_row_store_
;
ObChunkDatumStore
::
ShadowStoredRow
next_prefix_row_store_
;
ObChunkDatumStore
::
StoredRow
*
next_prefix_row_
;
ObOperator
*
child_
;
...
...
src/sql/engine/subquery/ob_subplan_filter_op.cpp
浏览文件 @
d1636b90
...
...
@@ -470,7 +470,7 @@ int ObSubPlanFilterOp::handle_update_set()
ret
=
OB_ERR_UNEXPECTED
;
LOG_WARN
(
"too many subplan unexpected"
,
K
(
ret
),
K
(
subplan_iters_
.
count
()));
}
else
{
ObChunkDatumStore
::
LastStoredRow
<>
row_val
(
update_set_mem_
->
get_arena_allocator
());
ObChunkDatumStore
::
LastStoredRow
row_val
(
update_set_mem_
->
get_arena_allocator
());
Iterator
*
iter
=
subplan_iters_
.
at
(
0
);
if
(
OB_ISNULL
(
iter
))
{
ret
=
OB_ERR_UNEXPECTED
;
...
...
src/sql/engine/window_function/ob_window_function_op.h
浏览文件 @
d1636b90
...
...
@@ -272,7 +272,7 @@ public:
int64_t
wf_idx_
;
int64_t
part_first_row_idx_
;
ObChunkDatumStore
::
LastStoredRow
<>
part_values_
;
ObChunkDatumStore
::
LastStoredRow
part_values_
;
RowsStore
part_rows_store_
;
Frame
last_valid_frame_
;
...
...
@@ -494,7 +494,7 @@ private:
RowsStore
rows_store_
;
WinFuncCellList
wf_list_
;
// shadow copy the next and restore it before get next row from child.
ObChunkDatumStore
::
ShadowStoredRow
<>
next_row_
;
ObChunkDatumStore
::
ShadowStoredRow
next_row_
;
bool
next_row_valid_
;
// TODO
DatumFixedArray
curr_row_collect_values_
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录