Commit a8399213
Authored Jul 13, 2020 by Haojun Liao

[td-225]opt query perf.

Parent: f2d5fe86

Showing 5 changed files with 162 additions and 207 deletions (+162 -207)
src/client/src/tscFunctionImpl.c   +84  -95
src/client/src/tscSQLParser.c       +2   -2
src/query/inc/qExecutor.h           +1   -0
src/query/inc/tsqlfunction.h        +5  -19
src/query/src/qExecutor.c          +70  -91
src/client/src/tscFunctionImpl.c (view file @ a8399213)

@@ -330,10 +330,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
   return TSDB_CODE_SUCCESS;
 }
 
-bool stableQueryFunctChanged(int32_t funcId) {
-  return (aAggs[funcId].stableFuncId != funcId);
-}
-
 /**
  * the numOfRes should be kept, since it may be used later
  * and allow the ResultInfo to be re initialized

@@ -361,7 +357,6 @@ static bool function_setup(SQLFunctionCtx *pCtx) {
   }
 
   memset(pCtx->aOutputBuf, 0, (size_t)pCtx->outputBytes);
-  initResultInfo(pResInfo);
   return true;
 }

@@ -675,16 +670,16 @@ static void sum_func_second_merge(SQLFunctionCtx *pCtx) {
   }
 }
 
-static int32_t precal_req_load_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
+static int32_t statisRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
   return BLK_DATA_STATIS_NEEDED;
 }
 
-static int32_t data_req_load_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
+static int32_t dataBlockRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
   return BLK_DATA_ALL_NEEDED;
 }
 
 // todo: if column in current data block are null, opt for this case
-static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
+static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
   if (pCtx->order == TSDB_ORDER_DESC) {
     return BLK_DATA_NO_NEEDED;
   }
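To make the renamed callbacks above concrete: each aggregate function advertises how much of a data block it really needs. This is a minimal, self-contained sketch (the struct and names here are simplified stand-ins, not TDengine code) of how an aggregate such as max() can be satisfied by the pre-computed block statistics, while one such as percentile() needs every raw row.

```c
#include <stdio.h>
#include <stdint.h>

/* Requirement levels, mirroring the BLK_DATA_* values used in this diff. */
enum { BLK_DATA_NO_NEEDED = 0x0, BLK_DATA_STATIS_NEEDED = 0x1, BLK_DATA_ALL_NEEDED = 0x3 };

/* Simplified stand-in for SQLFunctionCtx: only what the sketch needs. */
typedef struct { int32_t functionId; } DemoCtx;

/* max/min/sum-style functions can be answered from per-block statistics alone. */
static int32_t statisRequiredDemo(DemoCtx *ctx) { (void)ctx; return BLK_DATA_STATIS_NEEDED; }

/* percentile/stddev-style functions need every raw row of the block. */
static int32_t dataBlockRequiredDemo(DemoCtx *ctx) { (void)ctx; return BLK_DATA_ALL_NEEDED; }

int main(void) {
  DemoCtx maxCtx = {0}, percentileCtx = {1};  /* ids are demo values only */
  printf("max -> 0x%x, percentile -> 0x%x\n",
         (unsigned)statisRequiredDemo(&maxCtx), (unsigned)dataBlockRequiredDemo(&percentileCtx));
  return 0;
}
```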
@@ -697,7 +692,7 @@ static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end,
   }
 }
 
-static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
+static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
   if (pCtx->order != pCtx->param[0].i64Key) {
     return BLK_DATA_NO_NEEDED;
   }

@@ -709,34 +704,30 @@ static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end,
   }
 }
 
-static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
+static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
   if (pCtx->order == TSDB_ORDER_DESC) {
     return BLK_DATA_NO_NEEDED;
   }
 
-  // result buffer has not been set yet.
-  return BLK_DATA_ALL_NEEDED;
-  //todo optimize the filter info
-//  SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
-//  if (pInfo->hasResult != DATA_SET_FLAG) {
-//    return BLK_DATA_ALL_NEEDED;
-//  } else { // data in current block is not earlier than current result
-//    return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
-//  }
+  SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
+  if (pInfo->hasResult != DATA_SET_FLAG) {
+    return BLK_DATA_ALL_NEEDED;
+  } else {  // data in current block is not earlier than current result
+    return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
+  }
 }
 
-static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
+static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
   if (pCtx->order != pCtx->param[0].i64Key) {
     return BLK_DATA_NO_NEEDED;
   }
 
-  return BLK_DATA_ALL_NEEDED;
-//  SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
-//  if (pInfo->hasResult != DATA_SET_FLAG) {
-//    return BLK_DATA_ALL_NEEDED;
-//  } else {
-//    return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
-//  }
+  SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
+  if (pInfo->hasResult != DATA_SET_FLAG) {
+    return BLK_DATA_ALL_NEEDED;
+  } else {
+    return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
+  }
 }
 
 //////////////////////////////////////////////////////////////////////////////////////////////
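The previously commented-out logic above is now enabled: once first()/last() already holds a result, a block whose time range cannot improve that result is skipped entirely. A minimal, self-contained sketch of the same decision, using stand-in types rather than the real TDengine structs:

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef int64_t TSKEY;
enum { BLK_DATA_NO_NEEDED = 0x0, BLK_DATA_ALL_NEEDED = 0x3 };

/* Stand-in for the relevant part of SFirstLastInfo. */
typedef struct { TSKEY ts; bool hasResult; } DemoFirstLastInfo;

/* For first(): a block starting at or after the timestamp we already hold
 * cannot produce an earlier value, so it does not need to be loaded. */
static int32_t firstRequiredDemo(const DemoFirstLastInfo *info, TSKEY blockStart) {
  if (!info->hasResult) return BLK_DATA_ALL_NEEDED;
  return (info->ts <= blockStart) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}

/* For last(): a block ending at or before the timestamp we already hold
 * cannot produce a later value. */
static int32_t lastRequiredDemo(const DemoFirstLastInfo *info, TSKEY blockEnd) {
  if (!info->hasResult) return BLK_DATA_ALL_NEEDED;
  return (info->ts > blockEnd) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}

int main(void) {
  DemoFirstLastInfo info = { .ts = 1000, .hasResult = true };
  printf("first, block [1200,1300] -> 0x%x\n", (unsigned)firstRequiredDemo(&info, 1200)); /* skip */
  printf("last,  block [1200,1300] -> 0x%x\n", (unsigned)lastRequiredDemo(&info, 1300));  /* load */
  return 0;
}
```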
@@ -2123,74 +2114,72 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
   tfree(pData);
 }
 
-bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *minval, char *maxval) {
-  STopBotInfo *pTopBotInfo = (STopBotInfo *)GET_RES_INFO(pCtx)->interResultBuf;
-  int32_t      numOfExistsRes = pTopBotInfo->num;
+/*
+ * Parameters values:
+ * 1. param[0]: maximum allowable results
+ * 2. param[1]: order by type (time or value)
+ * 3. param[2]: asc/desc order
+ *
+ * top/bottom use the intermediate result buffer to keep the intermediate result
+ */
+static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
+  SResultInfo *pResInfo = GET_RES_INFO(pCtx);
+
+  // only the first_stage_merge is directly written data into final output buffer
+  if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
+    return (STopBotInfo*) pCtx->aOutputBuf;
+  } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
+    return pResInfo->interResultBuf;
+  }
+}
+
+bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval) {
+  STopBotInfo *pTopBotInfo = getTopBotOutputInfo(pCtx);
 
   // required number of results are not reached, continue load data block
-  if (numOfExistsRes < pCtx->param[0].i64Key) {
+  if (pTopBotInfo->num < pCtx->param[0].i64Key) {
     return true;
   }
 
-  tValuePair *pRes = (tValuePair*) pTopBotInfo->res;
+  tValuePair **pRes = (tValuePair**) pTopBotInfo->res;
 
   if (functionId == TSDB_FUNC_TOP) {
     switch (pCtx->inputType) {
       case TSDB_DATA_TYPE_TINYINT:
-        return GET_INT8_VAL(maxval) > pRes[0].v.i64Key;
+        return GET_INT8_VAL(maxval) > pRes[0]->v.i64Key;
       case TSDB_DATA_TYPE_SMALLINT:
-        return GET_INT16_VAL(maxval) > pRes[0].v.i64Key;
+        return GET_INT16_VAL(maxval) > pRes[0]->v.i64Key;
       case TSDB_DATA_TYPE_INT:
-        return GET_INT32_VAL(maxval) > pRes[0].v.i64Key;
+        return GET_INT32_VAL(maxval) > pRes[0]->v.i64Key;
       case TSDB_DATA_TYPE_BIGINT:
-        return GET_INT64_VAL(maxval) > pRes[0].v.i64Key;
+        return GET_INT64_VAL(maxval) > pRes[0]->v.i64Key;
       case TSDB_DATA_TYPE_FLOAT:
-        return GET_FLOAT_VAL(maxval) > pRes[0].v.dKey;
+        return GET_FLOAT_VAL(maxval) > pRes[0]->v.dKey;
       case TSDB_DATA_TYPE_DOUBLE:
-        return GET_DOUBLE_VAL(maxval) > pRes[0].v.dKey;
+        return GET_DOUBLE_VAL(maxval) > pRes[0]->v.dKey;
      default:
        return true;
    }
  } else {
    switch (pCtx->inputType) {
      case TSDB_DATA_TYPE_TINYINT:
-        return GET_INT8_VAL(minval) < pRes[0].v.i64Key;
+        return GET_INT8_VAL(minval) < pRes[0]->v.i64Key;
      case TSDB_DATA_TYPE_SMALLINT:
-        return GET_INT16_VAL(minval) < pRes[0].v.i64Key;
+        return GET_INT16_VAL(minval) < pRes[0]->v.i64Key;
      case TSDB_DATA_TYPE_INT:
-        return GET_INT32_VAL(minval) < pRes[0].v.i64Key;
+        return GET_INT32_VAL(minval) < pRes[0]->v.i64Key;
      case TSDB_DATA_TYPE_BIGINT:
-        return GET_INT64_VAL(minval) < pRes[0].v.i64Key;
+        return GET_INT64_VAL(minval) < pRes[0]->v.i64Key;
      case TSDB_DATA_TYPE_FLOAT:
-        return GET_FLOAT_VAL(minval) < pRes[0].v.dKey;
+        return GET_FLOAT_VAL(minval) < pRes[0]->v.dKey;
      case TSDB_DATA_TYPE_DOUBLE:
-        return GET_DOUBLE_VAL(minval) < pRes[0].v.dKey;
+        return GET_DOUBLE_VAL(minval) < pRes[0]->v.dKey;
      default:
        return true;
    }
  }
}
 
-/*
- * Parameters values:
- * 1. param[0]: maximum allowable results
- * 2. param[1]: order by type (time or value)
- * 3. param[2]: asc/desc order
- *
- * top/bottom use the intermediate result buffer to keep the intermediate result
- */
-static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
-  SResultInfo *pResInfo = GET_RES_INFO(pCtx);
-
-  // only the first_stage_merge is directly written data into final output buffer
-  if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
-    return (STopBotInfo*) pCtx->aOutputBuf;
-  } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
-    return pResInfo->interResultBuf;
-  }
-}
-
 /*
  * keep the intermediate results during scan data blocks in the format of:
  * +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
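The hunk above moves getTopBotOutputInfo ahead of the filter, renames the filter to topbot_datablock_filter, and treats res as an array of tValuePair pointers. The pruning idea itself is unchanged: once enough candidates are kept, a block is only worth loading if its max can beat the current smallest TOP candidate (or its min can undercut the current largest BOTTOM candidate). A minimal, self-contained sketch of that test, using plain doubles instead of the TDengine union types:

```c
#include <stdbool.h>
#include <stdio.h>

/* Stand-in state for top(k)/bottom(k): the boundary values currently kept. */
typedef struct { double smallestKeptTop; double largestKeptBottom; int num; int k; } DemoTopBotState;

/* Returns true when the block still has to be loaded. */
static bool topBlockNeeded(const DemoTopBotState *s, double blockMax) {
  if (s->num < s->k) return true;        /* not enough candidates yet */
  return blockMax > s->smallestKeptTop;  /* block can displace a kept value */
}

static bool bottomBlockNeeded(const DemoTopBotState *s, double blockMin) {
  if (s->num < s->k) return true;
  return blockMin < s->largestKeptBottom;
}

int main(void) {
  DemoTopBotState s = { .smallestKeptTop = 42.0, .largestKeptBottom = 3.0, .num = 10, .k = 10 };
  printf("top, block max 40 -> %d\n", topBlockNeeded(&s, 40.0));      /* 0: skip */
  printf("top, block max 50 -> %d\n", topBlockNeeded(&s, 50.0));      /* 1: load */
  printf("bottom, block min 1 -> %d\n", bottomBlockNeeded(&s, 1.0));  /* 1: load */
  return 0;
}
```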
@@ -3376,7 +3365,7 @@ static void spread_function(SQLFunctionCtx *pCtx) {
   SResultInfo *pResInfo = GET_RES_INFO(pCtx);
   SSpreadInfo *pInfo = pResInfo->interResultBuf;
 
-  int32_t numOfElems = pCtx->size;
+  int32_t numOfElems = 0;
 
   // todo : opt with pre-calculated result
   // column missing cause the hasNull to be true

@@ -4412,7 +4401,7 @@ static void sumrate_finalizer(SQLFunctionCtx *pCtx) {
  * e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last...
  *
  */
-int32_t funcCompatDefList[] = {
+int32_t functionCompatList[] = {
     // count, sum, avg, min, max, stddev, percentile, apercentile, first, last
     1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
     // last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z

@@ -4451,7 +4440,7 @@ SQLAggFuncElem aAggs[] = {{
     function_finalizer,
     sum_func_merge,
     sum_func_second_merge,
-    precal_req_load_info,
+    statisRequired,
   },
   {
     // 2

@@ -4466,7 +4455,7 @@ SQLAggFuncElem aAggs[] = {{
     avg_finalizer,
     avg_func_merge,
     avg_func_second_merge,
-    precal_req_load_info,
+    statisRequired,
   },
   {
     // 3

@@ -4481,7 +4470,7 @@ SQLAggFuncElem aAggs[] = {{
     function_finalizer,
     min_func_merge,
     min_func_second_merge,
-    precal_req_load_info,
+    statisRequired,
   },
   {
     // 4

@@ -4496,7 +4485,7 @@ SQLAggFuncElem aAggs[] = {{
     function_finalizer,
     max_func_merge,
     max_func_second_merge,
-    precal_req_load_info,
+    statisRequired,
   },
   {
     // 5

@@ -4511,7 +4500,7 @@ SQLAggFuncElem aAggs[] = {{
     stddev_finalizer,
     noop1,
     noop1,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 6

@@ -4526,7 +4515,7 @@ SQLAggFuncElem aAggs[] = {{
     percentile_finalizer,
     noop1,
     noop1,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 7

@@ -4541,7 +4530,7 @@ SQLAggFuncElem aAggs[] = {{
     apercentile_finalizer,
     apercentile_func_merge,
     apercentile_func_second_merge,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 8

@@ -4556,7 +4545,7 @@ SQLAggFuncElem aAggs[] = {{
     function_finalizer,
     noop1,
     noop1,
-    first_data_req_info,
+    firstFuncRequired,
   },
   {
     // 9

@@ -4571,7 +4560,7 @@ SQLAggFuncElem aAggs[] = {{
     function_finalizer,
     noop1,
     noop1,
-    last_data_req_info,
+    lastFuncRequired,
   },
   {
     // 10

@@ -4587,7 +4576,7 @@ SQLAggFuncElem aAggs[] = {{
     last_row_finalizer,
     noop1,
     last_dist_func_second_merge,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 11

@@ -4603,7 +4592,7 @@ SQLAggFuncElem aAggs[] = {{
     top_bottom_func_finalizer,
     top_func_merge,
     top_func_second_merge,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 12

@@ -4619,7 +4608,7 @@ SQLAggFuncElem aAggs[] = {{
     top_bottom_func_finalizer,
     bottom_func_merge,
     bottom_func_second_merge,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 13

@@ -4649,7 +4638,7 @@ SQLAggFuncElem aAggs[] = {{
     twa_function_finalizer,
     twa_func_merge,
     twa_function_copy,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 15

@@ -4664,7 +4653,7 @@ SQLAggFuncElem aAggs[] = {{
     leastsquares_finalizer,
     noop1,
     noop1,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 16

@@ -4694,7 +4683,7 @@ SQLAggFuncElem aAggs[] = {{
     doFinalizer,
     copy_function,
     copy_function,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 18

@@ -4724,7 +4713,7 @@ SQLAggFuncElem aAggs[] = {{
     ts_comp_finalize,
     copy_function,
     copy_function,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 20

@@ -4754,7 +4743,7 @@ SQLAggFuncElem aAggs[] = {{
     doFinalizer,
     copy_function,
     copy_function,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 22, multi-output, tag function has only one result

@@ -4784,7 +4773,7 @@ SQLAggFuncElem aAggs[] = {{
     doFinalizer,
     copy_function,
     copy_function,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 24

@@ -4799,7 +4788,7 @@ SQLAggFuncElem aAggs[] = {{
     doFinalizer,
     noop1,
     noop1,
-    data_req_load_info,
+    dataBlockRequired,
   },
   // distributed version used in two-stage aggregation processes
   {

@@ -4815,7 +4804,7 @@ SQLAggFuncElem aAggs[] = {{
     function_finalizer,
     first_dist_func_merge,
     first_dist_func_second_merge,
-    first_dist_data_req_info,
+    firstDistFuncRequired,
   },
   {
     // 26

@@ -4830,7 +4819,7 @@ SQLAggFuncElem aAggs[] = {{
     function_finalizer,
     last_dist_func_merge,
     last_dist_func_second_merge,
-    last_dist_data_req_info,
+    lastDistFuncRequired,
   },
   {
     // 27

@@ -4845,7 +4834,7 @@ SQLAggFuncElem aAggs[] = {{
     doFinalizer,
     noop1,
     copy_function,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 28

@@ -4860,7 +4849,7 @@ SQLAggFuncElem aAggs[] = {{
     rate_finalizer,
     rate_func_merge,
     rate_func_copy,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 29

@@ -4875,7 +4864,7 @@ SQLAggFuncElem aAggs[] = {{
     rate_finalizer,
     rate_func_merge,
     rate_func_copy,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 30

@@ -4890,7 +4879,7 @@ SQLAggFuncElem aAggs[] = {{
     sumrate_finalizer,
     sumrate_func_merge,
     sumrate_func_second_merge,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 31

@@ -4905,7 +4894,7 @@ SQLAggFuncElem aAggs[] = {{
     sumrate_finalizer,
     sumrate_func_merge,
     sumrate_func_second_merge,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 32

@@ -4920,7 +4909,7 @@ SQLAggFuncElem aAggs[] = {{
     sumrate_finalizer,
     sumrate_func_merge,
     sumrate_func_second_merge,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 33

@@ -4935,7 +4924,7 @@ SQLAggFuncElem aAggs[] = {{
     sumrate_finalizer,
     sumrate_func_merge,
     sumrate_func_second_merge,
-    data_req_load_info,
+    dataBlockRequired,
   },
   {
     // 34

@@ -4950,5 +4939,5 @@ SQLAggFuncElem aAggs[] = {{
     noop1,
     noop1,
     noop1,
-    data_req_load_info,
+    dataBlockRequired,
   }};
src/client/src/tscSQLParser.c (view file @ a8399213)

@@ -2471,7 +2471,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
     startIdx++;
   }
 
-  int32_t factor = funcCompatDefList[tscSqlExprGet(pQueryInfo, startIdx)->functionId];
+  int32_t factor = functionCompatList[tscSqlExprGet(pQueryInfo, startIdx)->functionId];
 
   // diff function cannot be executed with other function
   // arithmetic function can be executed with other arithmetic functions

@@ -2489,7 +2489,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
       continue;
     }
 
-    if (funcCompatDefList[functionId] != factor) {
+    if (functionCompatList[functionId] != factor) {
       return false;
     }
   }
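The parser change above only follows the rename of the table; the compatibility rule stays the same: every function selected in one query must map to the same "factor" in the table, otherwise the query is rejected. A minimal, self-contained sketch of that check (the table values and the omitted special cases here are illustrative, not the real functionCompatList contents):

```c
#include <stdbool.h>
#include <stdio.h>

/* Illustrative compatibility factors per function id (demo values only). */
static const int demoCompatList[] = { 1, 1, 1, 4, 1, -1 };

/* All functions used by one query must share the same factor. */
static bool functionsCompatible(const int *functionIds, int numOfFuncs) {
  if (numOfFuncs == 0) return true;
  int factor = demoCompatList[functionIds[0]];
  for (int i = 1; i < numOfFuncs; ++i) {
    if (demoCompatList[functionIds[i]] != factor) {
      return false;
    }
  }
  return true;
}

int main(void) {
  int ok[]  = {0, 1, 2};  /* all factor 1 -> compatible */
  int bad[] = {0, 3};     /* factors 1 and 4 -> rejected */
  printf("%d %d\n", functionsCompatible(ok, 3), functionsCompatible(bad, 2));
  return 0;
}
```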
src/query/inc/qExecutor.h (view file @ a8399213)

@@ -121,6 +121,7 @@ typedef struct SQueryCostInfo {
   uint32_t loadBlockStatis;
   uint32_t discardBlocks;
   uint64_t elapsedTime;
+  uint64_t ioTime;
   uint64_t computTime;
 } SQueryCostInfo;
src/query/inc/tsqlfunction.h (view file @ a8399213)

@@ -224,25 +224,14 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
 #define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
 #define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0)
 
-/*
- * the status of one block, used in metric query. all blocks are mixed together,
- * we need the status to decide if one block is a first/end/inter block of one meter
- */
-enum {
-  BLK_FILE_BLOCK   = 0x1,
-  BLK_BLOCK_LOADED = 0x2,
-  BLK_CACHE_BLOCK  = 0x4,  // in case of cache block, block must be loaded
-};
-
 /* determine the real data need to calculated the result */
 enum {
-  BLK_DATA_NO_NEEDED = 0x0,
+  BLK_DATA_NO_NEEDED     = 0x0,
   BLK_DATA_STATIS_NEEDED = 0x1,
-  BLK_DATA_ALL_NEEDED = 0x3,
+  BLK_DATA_ALL_NEEDED    = 0x3,
+  BLK_DATA_DISCARD       = 0x4,  // discard current data block since it is not qualified for filter
 };
 
-#define SET_DATA_BLOCK_NOT_LOADED(x) ((x) &= (~BLK_BLOCK_LOADED));
-
 typedef struct STwaInfo {
   TSKEY    lastKey;
   int8_t   hasResult;  // flag to denote has value

@@ -264,12 +253,9 @@ typedef struct STwaInfo {
 /* global sql function array */
 extern struct SQLAggFuncElem aAggs[];
 
-/* compatible check array list */
-extern int32_t funcCompatDefList[];
-
-bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *minval, char *maxval);
+extern int32_t functionCompatList[];  // compatible check array list
 
+bool stableQueryFunctChanged(int32_t funcId);
+bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval);
 
 void resetResultInfo(SResultInfo *pResInfo);
 void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char *buf);
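The values in the reworked enum are chosen so that stricter needs dominate when several functions vote on the same block: BLK_DATA_ALL_NEEDED (0x3) contains the BLK_DATA_STATIS_NEEDED bit (0x1), so OR-ing the per-function answers together (as loadDataBlockOnDemand does in qExecutor.c below) yields the strongest requirement. A minimal sketch of that composition:

```c
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>

enum {
  BLK_DATA_NO_NEEDED     = 0x0,
  BLK_DATA_STATIS_NEEDED = 0x1,
  BLK_DATA_ALL_NEEDED    = 0x3,
  BLK_DATA_DISCARD       = 0x4,
};

int main(void) {
  /* e.g. a query with max(col) and percentile(col): one answer per output column */
  uint32_t perFunction[] = { BLK_DATA_STATIS_NEEDED, BLK_DATA_ALL_NEEDED };

  uint32_t status = 0;
  for (size_t i = 0; i < sizeof(perFunction) / sizeof(perFunction[0]); ++i) {
    status |= perFunction[i];  /* the strongest requirement wins */
  }

  printf("combined status = 0x%x\n", (unsigned)status);  /* 0x3: the full block must be loaded */
  return 0;
}
```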
src/query/src/qExecutor.c (view file @ a8399213)

@@ -1928,73 +1928,45 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
          pQuery->pSelectExpr[columnIndex].bytes * realRowId;
 }
 
-/**
- * decrease the refcount for each table involved in this query
- * @param pQInfo
- */
-UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
-  if (pQInfo != NULL) {
-    //    assert(taosHashGetSize(pQInfo->tableqinfoGroupInfo) >= 1);
-  }
-
-#if 0
-  if (pQInfo == NULL || pQInfo->tableqinfoGroupInfo.numOfTables == 1) {
-    atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1);
-    qDebug("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode,
-           pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries);
-  } else {
-    int32_t num = 0;
-    for (int32_t i = 0; i < pQInfo->tableqinfoGroupInfo.numOfTables; ++i) {
-      SMeterObj *pMeter = getMeterObj(pQInfo->tableqinfoGroupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
-      atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
-
-      if (pMeter->numOfQueries > 0) {
-        qDebug("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
-               pMeter->meterId, pMeter->numOfQueries);
-        num++;
-      }
-    }
-
-    /*
-     * in order to reduce log output, for all meters of which numOfQueries count are 0,
-     * we do not output corresponding information
-     */
-    num = pQInfo->tableqinfoGroupInfo.numOfTables - num;
-    qDebug("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo,
-           pQInfo->tableqinfoGroupInfo.numOfTables, num);
-  }
-#endif
-}
+#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_DOUBLE && (_t) != TSDB_DATA_TYPE_FLOAT)
 
-static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfTotalPoints) {
-  if (pDataStatis == NULL) {
+static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx,
+                                int32_t numOfRows) {
+  SQuery* pQuery = pRuntimeEnv->pQuery;
+  if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pRuntimeEnv->topBotQuery))) {
     return true;
   }
 
-#if 0
   for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
     SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
 
-    int32_t colIndex = pFilterInfo->info.colIndex;
-
-    // this column not valid in current data block
-    if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) {
+    int32_t index = -1;
+    for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
+      if (pDataStatis[i].colId == pFilterInfo->info.colId) {
+        index = i;
+        break;
+      }
+    }
+
+    if (index == -1) {
       continue;
     }
 
     // not support pre-filter operation on binary/nchar data type
-    if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) {
+    if (!IS_PREFILTER_TYPE(pFilterInfo->info.type)) {
      continue;
    }
 
    // all points in current column are NULL, no need to check its boundary value
-    if (pDataStatis[colIndex].numOfNull == numOfTotalPoints) {
+    if (pDataStatis[index].numOfNull == numOfRows) {
      continue;
    }
 
-    if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) {
-      float minval = *(double *)(&pDataStatis[colIndex].min);
-      float maxval = *(double *)(&pDataStatis[colIndex].max);
+    SDataStatis* pDataBlockst = &pDataStatis[index];
+
+    if (pFilterInfo->info.type == TSDB_DATA_TYPE_FLOAT) {
+      float minval = *(double *)(&pDataBlockst->min);
+      float maxval = *(double *)(&pDataBlockst->max);
 
      for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) {
        if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) {
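With this hunk, needToLoadDataBlock answers the question "can this block possibly contain qualifying rows?" from the per-block min/max statistics alone, before any raw column data is read. A minimal, self-contained sketch of that idea for a single numeric filter (stand-in types; the real code dispatches through pFilterInfo->pFilters[i].fp):

```c
#include <stdbool.h>
#include <stdio.h>

/* Stand-in for the per-block, per-column statistics (SDataStatis). */
typedef struct { double min; double max; int numOfNull; } DemoColStatis;

/* Filter "col > threshold": the block only needs loading when its max exceeds the threshold. */
static bool blockMayQualify(const DemoColStatis *st, int numOfRows, double threshold) {
  if (st->numOfNull == numOfRows) return false;  /* whole column is NULL in this block */
  return st->max > threshold;
}

int main(void) {
  DemoColStatis st = { .min = 10.0, .max = 25.0, .numOfNull = 0 };
  printf("threshold 30 -> load? %d\n", blockMayQualify(&st, 1000, 30.0)); /* 0: skip block */
  printf("threshold 20 -> load? %d\n", blockMayQualify(&st, 1000, 20.0)); /* 1: load block */
  return 0;
}
```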
@@ -2003,53 +1975,50 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
       }
     } else {
       for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) {
-        if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&pDataStatis[colIndex].min,
-                                        (char *)&pDataStatis[colIndex].max)) {
+        if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&pDataBlockst->min, (char *)&pDataBlockst->max)) {
           return true;
         }
       }
     }
   }
 
-  // todo disable this opt code block temporarily
-  //  for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
-  //    int32_t functId = pQuery->pSelectExpr[i].base.functionId;
-  //    if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
-  //      return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max);
-  //    }
-  //  }
+  if (pRuntimeEnv->topBotQuery) {
+    for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
+      int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
+      if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
+        return topbot_datablock_filter(&pCtx[i], functionId, (char *)&pDataStatis[i].min, (char *)&pDataStatis[i].max);
+      }
+    }
+  }
-#endif
 
-  return true;
+  return false;
 }
 
-SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis) {
+int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo,
+                              SDataStatis **pStatis, SArray** pDataBlock) {
   SQuery *pQuery = pRuntimeEnv->pQuery;
 
-  uint32_t r = 0;
-  SArray * pDataBlock = NULL;
+  uint32_t status = 0;
 
   if (pQuery->numOfFilterCols > 0) {
-    r = BLK_DATA_ALL_NEEDED;
+    status = BLK_DATA_ALL_NEEDED;
   } else { // check if this data block is required to load
     for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
       SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
 
       int32_t functionId = pSqlFunc->functionId;
       int32_t colId = pSqlFunc->colInfo.colId;
-      r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId);
+      status |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
     }
 
     if (pRuntimeEnv->pTSBuf > 0 || QUERY_IS_INTERVAL_QUERY(pQuery)) {
-      r |= BLK_DATA_ALL_NEEDED;
+      status |= BLK_DATA_ALL_NEEDED;
    }
  }
 
-  if (r == BLK_DATA_NO_NEEDED) {
-    qDebug("QInfo:%p data block discard, rows:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->rows);
+  if (status == BLK_DATA_NO_NEEDED) {
+    qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
+           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
     pRuntimeEnv->summary.discardBlocks += 1;
-  } else if (r == BLK_DATA_STATIS_NEEDED) {
+  } else if (status == BLK_DATA_STATIS_NEEDED) {
     if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
       //        return DISK_DATA_LOAD_FAILED;
     }
@@ -2057,32 +2026,34 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
     pRuntimeEnv->summary.loadBlockStatis += 1;
 
     if (*pStatis == NULL) { // data block statistics does not exist, load data block
-      pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
+      *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
       pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
     }
   } else {
-    assert(r == BLK_DATA_ALL_NEEDED);
+    assert(status == BLK_DATA_ALL_NEEDED);
 
     // load the data block statistics to perform further filter
-    pRuntimeEnv->summary.loadBlockStatis += 1;
+    pRuntimeEnv->summary.loadBlockStatis += 1;
     if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
     }
 
-    if (!needToLoadDataBlock(pQuery, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
+    if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
 #if defined(_DEBUG_VIEW)
       qDebug("QInfo:%p block discarded by per-filter", GET_QINFO_ADDR(pRuntimeEnv));
 #endif
       // current block has been discard due to filter applied
       pRuntimeEnv->summary.discardBlocks += 1;
-      //        return DISK_DATA_DISCARDED;
+      qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
+             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
+      return BLK_DATA_DISCARD;
    }
 
    pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
    pRuntimeEnv->summary.loadBlocks += 1;
-    pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
+    *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
  }
 
-  return pDataBlock;
+  return TSDB_CODE_SUCCESS;
 }
 
 int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
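With this change loadDataBlockOnDemand reports its outcome through the return value and hands the loaded block back through the new pDataBlock out-parameter; a caller is expected to treat BLK_DATA_DISCARD as "skip this block and advance the scan position", exactly as the updated doScanAllDataBlocks and scanMultiTableDataBlocks hunks below do. A minimal sketch of that calling pattern (stand-in types and a stubbed loader, not the real executor API):

```c
#include <stdint.h>
#include <stdio.h>

enum { BLK_DATA_DISCARD = 0x4 };
enum { DEMO_CODE_SUCCESS = 0 };

typedef struct { int64_t skey; int64_t ekey; int rows; } DemoBlockInfo;

/* Stub standing in for loadDataBlockOnDemand: discard empty blocks,
 * otherwise "load" them and report success through the out-parameter. */
static int32_t demoLoadOnDemand(const DemoBlockInfo *info, const int **dataOut) {
  static const int fakeRows[] = {1, 2, 3};
  if (info->rows == 0) return BLK_DATA_DISCARD;
  *dataOut = fakeRows;
  return DEMO_CODE_SUCCESS;
}

int main(void) {
  DemoBlockInfo blocks[] = { {0, 99, 0}, {100, 199, 3} };
  int64_t lastKey = -1;

  for (int i = 0; i < 2; ++i) {
    const int *pDataBlock = NULL;
    if (demoLoadOnDemand(&blocks[i], &pDataBlock) == BLK_DATA_DISCARD) {
      lastKey = blocks[i].ekey + 1;  /* skip the block, move the scan position forward */
      continue;
    }
    printf("processing block %d (lastKey=%lld)\n", i, (long long)lastKey);
  }
  return 0;
}
```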
@@ -2225,6 +2196,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
          pQuery->order.order);
   TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv) ? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
+  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
 
   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
   while (tsdbNextDataBlock(pQueryHandle)) {

@@ -2259,7 +2231,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
     ensureOutputBuffer(pRuntimeEnv, &blockInfo);
 
     SDataStatis *pStatis = NULL;
-    SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
+    SArray *pDataBlock = NULL;
+    if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) {
+      pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step;
+      continue;
+    }
 
     // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
     pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;

@@ -2282,8 +2258,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
   if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
     if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
-      //      int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
-
       closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
       //      removeRedundantWindow(&pRuntimeEnv->windowResInfo, pTableQueryInfo->lastKey - step, step);
       pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1;  // point to the last time window

@@ -3700,7 +3674,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) {
   }
 }
 
-void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
+static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
                                  SArray *pDataBlock, __block_search_fn_t searchFn) {
   SQuery *          pQuery = pRuntimeEnv->pQuery;
   STableQueryInfo*  pTableQueryInfo = pQuery->current;

@@ -3859,9 +3833,10 @@ static void queryCostStatis(SQInfo *pQInfo) {
   //      pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
   //      pSummary->skippedFileBlocks, pSummary->totalGenData);
 
-  qDebug("QInfo:%p :cost summary: elpased time:%" PRId64 " us, total blocks:%d, use block statis:%d, use block data:%d, "
-         "total rows:%" PRId64 ", check rows:%" PRId64, pQInfo, pSummary->elapsedTime, pSummary->totalBlocks,
-         pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
+  qDebug("QInfo:%p :cost summary: elapsed time:%" PRId64 " us, io time:%" PRId64 " us, total blocks:%d, load block statis:%d,"
+         " load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64, pQInfo, pSummary->elapsedTime,
+         pSummary->ioTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows,
+         pSummary->totalCheckedRows);
 
   // qDebug("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
   //

@@ -4247,10 +4222,11 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
   TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv) ? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
+  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
 
   while (tsdbNextDataBlock(pQueryHandle)) {
     summary->totalBlocks += 1;
     if (IS_QUERY_KILLED(pQInfo)) {
       finalizeQueryResult(pRuntimeEnv);  // clean up allocated resource during query
       longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
     }

@@ -4263,12 +4239,8 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
     assert(*pTableQueryInfo != NULL);
     SET_CURRENT_QUERY_TABLE_INFO(pRuntimeEnv, *pTableQueryInfo);
 
-    SDataStatis *pStatis = NULL;
-    SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
-
     if (!pRuntimeEnv->groupbyNormalCol) {
       if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
-        int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
         setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
       } else {  // interval query
         TSKEY nextKey = blockInfo.window.skey;

@@ -4280,6 +4252,13 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
       }
     }
 
+    SDataStatis *pStatis = NULL;
+    SArray *pDataBlock = NULL;
+    if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) {
+      pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step;
+      continue;
+    }
+
     summary->totalRows += blockInfo.rows;
     stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey);