Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
428bd3b7
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
428bd3b7
编写于
2月 07, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-225] refactor
上级
602d2d1c
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
127 addition
and
128 deletion
+127
-128
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+33
-22
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+93
-105
未找到文件。
src/query/inc/qExecutor.h
浏览文件 @
428bd3b7
...
...
@@ -186,31 +186,45 @@ typedef struct SSDataBlock {
}
SSDataBlock
;
typedef
struct
SQuery
{
SLimitVal
limit
;
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// TODO used bitwise flag
bool
groupbyColumn
;
// denote if this is a groupby normal column query
bool
hasTagResults
;
// if there are tag values in final result or not
bool
timeWindowInterpo
;
// if the time window start/end required interpolation
bool
queryWindowIdentical
;
// all query time windows are identical for all tables in one group
bool
queryBlockDist
;
// if query data block distribution
bool
stabledev
;
// super table stddev query
int32_t
interBufSize
;
// intermediate buffer sizse
SOrderVal
order
;
int16_t
numOfCols
;
int16_t
numOfTags
;
SOrderVal
order
;
STimeWindow
window
;
SInterval
interval
;
int16_t
precision
;
int16_t
numOfOutput
;
int16_t
fillType
;
int16_t
checkResultBuf
;
// check if the buffer is full during scan each block
SLimitVal
limit
;
int32_t
srcRowSize
;
// todo extract struct
int32_t
resultRowSize
;
int32_t
maxSrcColumnSize
;
int32_t
tagLen
;
// tag value length of current query
SSqlGroupbyExpr
*
pGroupbyExpr
;
SExprInfo
*
pExpr1
;
SExprInfo
*
pExpr2
;
int32_t
numOfExpr2
;
SColumnInfo
*
colList
;
SColumnInfo
*
tagColList
;
int32_t
numOfFilterCols
;
int64_t
*
fillVal
;
SOrderedPrjQueryInfo
prjInfo
;
// limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo
*
pFilterInfo
;
uint32_t
status
;
// query status
SResultRec
rec
;
int32_t
pos
;
...
...
@@ -218,33 +232,23 @@ typedef struct SQuery {
STableQueryInfo
*
current
;
int32_t
numOfCheckedBlocks
;
// number of check data blocks
SOrderedPrjQueryInfo
prjInfo
;
// limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo
*
pFilterInfo
;
SSDataBlock
*
ouptputBuf
;
void
*
tsdb
;
SMemRef
memRef
;
}
SQuery
;
typedef
struct
SQueryRuntimeEnv
{
jmp_buf
env
;
SQuery
*
pQuery
;
SQLFunctionCtx
*
pCtx
;
int32_t
numOfRowsPerPage
;
uint16_t
*
offset
;
uint16_t
scanFlag
;
// denotes reversed scan of data or not
SFillInfo
*
pFillInfo
;
SResultRowInfo
resultRowInfo
;
SQueryCostInfo
summary
;
void
*
pQueryHandle
;
void
*
pSecQueryHandle
;
// another thread for
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// TODO used bitwise flag
bool
groupbyColumn
;
// denote if this is a groupby normal column query
bool
hasTagResults
;
// if there are tag values in final result or not
bool
timeWindowInterpo
;
// if the time window start/end required interpolation
bool
queryWindowIdentical
;
// all query time windows are identical for all tables in one group
bool
queryBlockDist
;
// if query data block distribution
bool
stabledev
;
// super table stddev query
int32_t
interBufSize
;
// intermediate buffer sizse
int32_t
prevGroupId
;
// previous executed group id
SDiskbasedResultBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
...
...
@@ -262,6 +266,11 @@ typedef struct SQueryRuntimeEnv {
SArithmeticSupport
*
sasArray
;
struct
STableScanInfo
*
pi
;
SSDataBlock
*
ouptputBuf
;
int32_t
groupIndex
;
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
}
SQueryRuntimeEnv
;
enum
{
...
...
@@ -273,14 +282,14 @@ typedef struct SQInfo {
void
*
signature
;
int32_t
code
;
// error code to returned to client
int64_t
owner
;
// if it is in execution
void
*
tsdb
;
SMemRef
memRef
;
int32_t
vgId
;
STableGroupInfo
tableGroupInfo
;
// table <tid, last_key> list SArray<STableKeyInfo>
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv
runtimeEnv
;
SQuery
query
;
SHashObj
*
arrTableIdInfo
;
int32_t
groupIndex
;
/*
* the query is executed position on which meter of the whole list.
...
...
@@ -296,6 +305,8 @@ typedef struct SQInfo {
void
*
rspContext
;
// response context
int64_t
startExecTs
;
// start to exec timestamp
char
*
sql
;
// query sql string
SQueryCostInfo
summary
;
}
SQInfo
;
typedef
struct
SQueryParam
{
...
...
src/query/inc/qUtil.h
浏览文件 @
428bd3b7
...
...
@@ -57,7 +57,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
realRowId
=
(
int32_t
)(
pResult
->
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
p
RuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
));
int32_t
realRowId
=
(
int32_t
)(
pResult
->
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
p
Query
->
topBotQuery
,
pQuery
->
stableQuery
));
return
((
char
*
)
page
->
data
)
+
pRuntimeEnv
->
offset
[
columnIndex
]
*
pRuntimeEnv
->
numOfRowsPerPage
+
pQuery
->
pExpr1
[
columnIndex
].
bytes
*
realRowId
;
}
...
...
src/query/src/qExecutor.c
浏览文件 @
428bd3b7
...
...
@@ -190,6 +190,8 @@ static SSDataBlock* createOutputBuf(SQuery* pQuery) {
idata
.
pData
=
calloc
(
4096
,
idata
.
info
.
bytes
);
taosArrayPush
(
res
->
pDataBlock
,
&
idata
);
}
return
res
;
}
bool
doFilterData
(
SQuery
*
pQuery
,
int32_t
elemPos
)
{
...
...
@@ -263,9 +265,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
/*
* the value of number of result needs to be update due to offset value upated.
*/
void
updateNumOfResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
numOfRes
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
void
updateNumOfResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQuery
*
pQuery
,
int32_t
numOfRes
)
{
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
&
pRuntimeEnv
->
pCtx
[
j
]);
...
...
@@ -813,7 +813,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return
num
;
}
static
char
*
getDataBlock
(
SQuery
RuntimeEnv
*
pRuntimeEnv
,
SArithmeticSupport
*
sas
,
int32_t
col
,
int32_t
size
,
SArray
*
pDataBlock
);
static
char
*
getDataBlock
(
SQuery
*
pQuery
,
SArithmeticSupport
*
sas
,
int32_t
col
,
int32_t
size
,
SArray
*
pDataBlock
);
static
void
doBlockwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STimeWindow
*
pWin
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
SArray
*
pDataBlock
)
{
...
...
@@ -826,7 +826,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
pCtx
[
k
].
size
=
forwardStep
;
pCtx
[
k
].
startTs
=
pWin
->
skey
;
char
*
dataBlock
=
getDataBlock
(
p
RuntimeEnv
,
&
pRuntimeEnv
->
sasArray
[
k
],
k
,
numOfTotal
,
pDataBlock
);
char
*
dataBlock
=
getDataBlock
(
p
Query
,
&
pRuntimeEnv
->
sasArray
[
k
],
k
,
numOfTotal
,
pDataBlock
);
int32_t
pos
=
(
QUERY_IS_ASC_QUERY
(
pQuery
))
?
offset
:
offset
-
(
forwardStep
-
1
);
if
(
dataBlock
!=
NULL
)
{
...
...
@@ -854,8 +854,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
}
}
static
void
doRowwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STimeWindow
*
pWin
,
int32_t
offset
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
static
void
doRowwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQuery
*
pQuery
,
STimeWindow
*
pWin
,
int32_t
offset
)
{
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
...
...
@@ -868,10 +867,8 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *
}
}
static
int32_t
getNextQualifiedWindow
(
SQuery
RuntimeEnv
*
pRuntimeEnv
,
STimeWindow
*
pNext
,
SDataBlockInfo
*
pDataBlockInfo
,
static
int32_t
getNextQualifiedWindow
(
SQuery
*
pQuery
,
STimeWindow
*
pNext
,
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
primaryKeys
,
__block_search_fn_t
searchFn
,
int32_t
prevPosition
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
getNextTimeWindow
(
pQuery
,
pNext
);
// next time window is not in current block
...
...
@@ -976,14 +973,12 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) {
}
// todo refactor
static
char
*
getDataBlock
(
SQuery
RuntimeEnv
*
pRuntimeEnv
,
SArithmeticSupport
*
sas
,
int32_t
col
,
int32_t
size
,
SArray
*
pDataBlock
)
{
static
char
*
getDataBlock
(
SQuery
*
pQuery
,
SArithmeticSupport
*
sas
,
int32_t
col
,
int32_t
size
,
SArray
*
pDataBlock
)
{
if
(
pDataBlock
==
NULL
)
{
return
NULL
;
}
char
*
dataBlock
=
NULL
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
functionId
=
pQuery
->
pExpr1
[
col
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_ARITHM
)
{
sas
->
pArithExpr
=
&
pQuery
->
pExpr1
[
col
];
...
...
@@ -1121,13 +1116,13 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY
static
void
doWindowBorderInterpolation
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pDataBlockInfo
,
SArray
*
pDataBlock
,
SResultRow
*
pResult
,
STimeWindow
*
win
,
int32_t
startPos
,
int32_t
forwardStep
)
{
if
(
!
pRuntimeEnv
->
timeWindowInterpo
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
!
pQuery
->
timeWindowInterpo
)
{
return
;
}
assert
(
pDataBlock
!=
NULL
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
fillType
=
pQuery
->
fillType
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
...
...
@@ -1166,7 +1161,7 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
}
}
static
void
aggApplyFunctions
_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
static
void
aggApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
SArray
*
pDataBlock
)
{
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -1178,15 +1173,10 @@ static void aggApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pSt
}
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
char
*
dataBlock
=
getDataBlock
(
p
RuntimeEnv
,
&
pRuntimeEnv
->
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
char
*
dataBlock
=
getDataBlock
(
p
Query
,
&
pRuntimeEnv
->
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
tsCols
,
pDataBlockInfo
,
pStatis
,
&
pQuery
->
pExpr1
[
k
]);
}
/*
* the sqlfunctionCtx parameters should be set done before all functions are invoked,
* since the selectivity + tag_prj query needs all parameters been set done.
* tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY
*/
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pExpr1
[
k
].
base
.
functionId
;
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
...
...
@@ -1221,7 +1211,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
char
*
dataBlock
=
getDataBlock
(
p
RuntimeEnv
,
&
pRuntimeEnv
->
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
char
*
dataBlock
=
getDataBlock
(
p
Query
,
&
pRuntimeEnv
->
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
tsCols
,
pDataBlockInfo
,
pStatis
,
&
pQuery
->
pExpr1
[
k
]);
}
...
...
@@ -1246,7 +1236,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
// prev time window not interpolation yet.
int32_t
curIndex
=
curTimeWindowIndex
(
pWindowResInfo
);
if
(
prevIndex
!=
-
1
&&
prevIndex
<
curIndex
&&
p
RuntimeEnv
->
timeWindowInterpo
)
{
if
(
prevIndex
!=
-
1
&&
prevIndex
<
curIndex
&&
p
Query
->
timeWindowInterpo
)
{
for
(
int32_t
j
=
prevIndex
;
j
<
curIndex
;
++
j
)
{
// previous time window may be all closed already.
SResultRow
*
pRes
=
pWindowResInfo
->
pResult
[
j
];
if
(
pRes
->
closed
)
{
...
...
@@ -1278,7 +1268,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
STimeWindow
nextWin
=
win
;
while
(
1
)
{
int32_t
prevEndPos
=
(
forwardStep
-
1
)
*
step
+
startPos
;
startPos
=
getNextQualifiedWindow
(
p
RuntimeEnv
,
&
nextWin
,
pDataBlockInfo
,
tsCols
,
searchFn
,
prevEndPos
);
startPos
=
getNextQualifiedWindow
(
p
Query
,
&
nextWin
,
pDataBlockInfo
,
tsCols
,
searchFn
,
prevEndPos
);
if
(
startPos
<
0
)
{
break
;
}
...
...
@@ -1313,7 +1303,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
_end:
if
(
p
RuntimeEnv
->
timeWindowInterpo
)
{
if
(
p
Query
->
timeWindowInterpo
)
{
int32_t
rowIndex
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pDataBlockInfo
->
rows
-
1
:
0
;
saveDataBlockLastRow
(
pRuntimeEnv
,
pDataBlockInfo
,
pDataBlock
,
rowIndex
);
}
...
...
@@ -1572,7 +1562,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SColumnInfoData
*
pColumnInfoData
=
(
SColumnInfoData
*
)
taosArrayGet
(
pDataBlock
,
0
);
TSKEY
*
tsCols
=
(
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
?
(
TSKEY
*
)
pColumnInfoData
->
pData
:
NULL
;
bool
groupbyColumnValue
=
p
RuntimeEnv
->
groupbyColumn
;
bool
groupbyColumnValue
=
p
Query
->
groupbyColumn
;
int16_t
type
=
0
;
int16_t
bytes
=
0
;
...
...
@@ -1584,7 +1574,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
char
*
dataBlock
=
getDataBlock
(
p
RuntimeEnv
,
&
pRuntimeEnv
->
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
char
*
dataBlock
=
getDataBlock
(
p
Query
,
&
pRuntimeEnv
->
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
tsCols
,
pDataBlockInfo
,
pStatis
,
&
pQuery
->
pExpr1
[
k
]);
pCtx
[
k
].
size
=
1
;
}
...
...
@@ -1641,7 +1631,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// window start key interpolation
if
(
p
RuntimeEnv
->
timeWindowInterpo
)
{
if
(
p
Query
->
timeWindowInterpo
)
{
// check for the time window end time interpolation
int32_t
curIndex
=
curTimeWindowIndex
(
pWindowResInfo
);
if
(
prevWindowIndex
!=
-
1
&&
prevWindowIndex
<
curIndex
)
{
...
...
@@ -1711,7 +1701,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
}
if
(
p
RuntimeEnv
->
stabledev
)
{
if
(
p
Query
->
stabledev
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pExpr1
[
k
].
base
.
functionId
;
if
(
functionId
!=
TSDB_FUNC_STDDEV_DST
)
{
...
...
@@ -1767,7 +1757,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// In case of all rows in current block are not qualified
if
(
p
RuntimeEnv
->
timeWindowInterpo
&&
prevRowIndex
!=
-
1
)
{
if
(
p
Query
->
timeWindowInterpo
&&
prevRowIndex
!=
-
1
)
{
saveDataBlockLastRow
(
pRuntimeEnv
,
pDataBlockInfo
,
pDataBlock
,
prevRowIndex
);
}
...
...
@@ -1783,7 +1773,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
SResultRowInfo
*
pResultRowInfo
=
&
pRuntimeEnv
->
resultRowInfo
;
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTsBuf
!=
NULL
||
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTsBuf
!=
NULL
||
p
Query
->
groupbyColumn
)
{
rowwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pResultRowInfo
,
pDataBlock
);
}
else
{
blockwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pResultRowInfo
,
searchFn
,
pDataBlock
);
...
...
@@ -1795,9 +1785,9 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied
int32_t
numOfRes
=
0
;
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
p
Query
->
groupbyColumn
)
{
numOfRes
=
pResultRowInfo
->
size
;
updateResultRowIndex
(
pResultRowInfo
,
pTableQueryInfo
,
QUERY_IS_ASC_QUERY
(
pQuery
),
p
RuntimeEnv
->
timeWindowInterpo
);
updateResultRowIndex
(
pResultRowInfo
,
pTableQueryInfo
,
QUERY_IS_ASC_QUERY
(
pQuery
),
p
Query
->
timeWindowInterpo
);
}
else
{
// projection query
numOfRes
=
(
int32_t
)
getNumOfResult
(
pRuntimeEnv
);
...
...
@@ -2054,7 +2044,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
// if it is group by normal column, do not set output buffer, the output buffer is pResult
// fixed output query/multi-output query for normal table
if
(
!
p
RuntimeEnv
->
groupbyColumn
&&
!
pRuntimeEnv
->
stableQuery
&&
!
QUERY_IS_INTERVAL_QUERY
(
pRuntimeEnv
->
pQuery
))
{
if
(
!
p
Query
->
groupbyColumn
&&
!
pRuntimeEnv
->
stableQuery
&&
!
QUERY_IS_INTERVAL_QUERY
(
pRuntimeEnv
->
pQuery
))
{
resetDefaultResInfoOutputBuf
(
pRuntimeEnv
);
}
...
...
@@ -2189,7 +2179,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
}
// Note:top/bottom query is fixed output query
if
(
pRuntimeEnv
->
topBotQuery
||
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
pRuntimeEnv
->
topBotQuery
||
p
Query
->
groupbyColumn
)
{
return
true
;
}
...
...
@@ -2496,7 +2486,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if
(
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
))
{
num
=
128
;
}
else
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
// time window query, allocate one page for each table
size_t
s
=
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
;
size_t
s
=
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
;
num
=
(
int32_t
)(
MAX
(
s
,
INITIAL_RESULT_ROWS_VALUE
));
}
else
{
// for super table query, one page for each subset
num
=
1
;
// pQInfo->pSidSet->numOfSubSet;
...
...
@@ -2822,7 +2812,7 @@ static void expandBuffer(SQueryRuntimeEnv* pRuntimeEnv, int32_t newSize, void* q
static
void
ensureOutputBuffer
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
numOfRows
)
{
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
!
p
RuntimeEnv
->
groupbyColumn
&&
!
isFixedOutputQuery
(
pRuntimeEnv
)
&&
!
isTsCompQuery
(
pQuery
))
{
if
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
!
p
Query
->
groupbyColumn
&&
!
isFixedOutputQuery
(
pRuntimeEnv
)
&&
!
isTsCompQuery
(
pQuery
))
{
SResultRec
*
pRec
=
&
pQuery
->
rec
;
int32_t
remain
=
(
int32_t
)(
pRec
->
capacity
-
pRec
->
rows
);
...
...
@@ -3229,7 +3219,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
// group by normal columns and interval query on normal table
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
resultRowInfo
;
if
(
p
RuntimeEnv
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
if
(
p
Query
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
disableFuncInReverseScanImpl
(
pRuntimeEnv
,
pWindowResInfo
,
order
);
}
else
{
// for simple result of table query,
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
// todo refactor
...
...
@@ -3417,7 +3407,7 @@ bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
bool
toContinue
=
false
;
if
(
p
RuntimeEnv
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
if
(
p
Query
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
// for each group result, call the finalize function for each column
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
resultRowInfo
;
...
...
@@ -3532,22 +3522,10 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) {
}
SET_REVERSE_SCAN_FLAG
(
pRuntimeEnv
);
STsdbQueryCond
cond
=
createTsdbQueryCond
(
pQuery
,
&
pQuery
->
window
);
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
switchCtxOrder
(
pRuntimeEnv
);
disableFuncInReverseScan
(
pQInfo
);
setupQueryRangeForReverseScan
(
pQInfo
);
// clean unused handle
if
(
pRuntimeEnv
->
pSecQueryHandle
!=
NULL
)
{
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pSecQueryHandle
);
}
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
pQInfo
->
tableGroupInfo
,
pQInfo
,
&
pQInfo
->
memRef
);
if
(
pRuntimeEnv
->
pSecQueryHandle
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
}
static
void
clearEnvAfterReverseScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQueryStatusInfo
*
pStatus
)
{
...
...
@@ -3651,7 +3629,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
SQueryStatusInfo
qstatus
=
getQueryStatusInfo
(
pRuntimeEnv
,
start
);
SET_MASTER_SCAN_FLAG
(
pRuntimeEnv
);
if
(
!
p
RuntimeEnv
->
groupbyColumn
&&
pRuntimeEnv
->
hasTagResults
)
{
if
(
!
p
Query
->
groupbyColumn
&&
pRuntimeEnv
->
hasTagResults
)
{
setTagVal
(
pRuntimeEnv
,
pTableQueryInfo
->
pTable
);
}
...
...
@@ -3716,10 +3694,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
void
finalizeQueryResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
p
RuntimeEnv
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
if
(
p
Query
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
// for each group result, call the finalize function for each column
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
resultRowInfo
;
if
(
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
p
Query
->
groupbyColumn
)
{
closeAllResultRows
(
pWindowResInfo
);
}
...
...
@@ -3928,7 +3906,7 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf
int32_t
setParamValue
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
pRuntimeEnv
->
prevResult
==
NULL
||
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
pRuntimeEnv
->
prevResult
==
NULL
||
p
Query
->
groupbyColumn
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4146,14 +4124,14 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
SResultRowInfo
*
pResultRowInfo
=
&
pTableQueryInfo
->
resInfo
;
pQuery
->
pos
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
pDataBlockInfo
->
rows
-
1
;
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTsBuf
!=
NULL
||
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTsBuf
!=
NULL
||
p
Query
->
groupbyColumn
)
{
rowwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pResultRowInfo
,
pDataBlock
);
}
else
{
blockwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pResultRowInfo
,
searchFn
,
pDataBlock
);
}
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
updateResultRowIndex
(
pResultRowInfo
,
pTableQueryInfo
,
QUERY_IS_ASC_QUERY
(
pQuery
),
p
RuntimeEnv
->
timeWindowInterpo
);
updateResultRowIndex
(
pResultRowInfo
,
pTableQueryInfo
,
QUERY_IS_ASC_QUERY
(
pQuery
),
p
Query
->
timeWindowInterpo
);
}
}
...
...
@@ -4188,7 +4166,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR
return
numOfTotal
>
0
;
}
else
{
// there are results waiting for returned to client.
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
)
&&
hasRemainData
(
pGroupResInfo
)
&&
(
p
RuntimeEnv
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
)))
{
(
p
Query
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
)))
{
return
true
;
}
}
...
...
@@ -4304,7 +4282,7 @@ void queryCostStatis(SQInfo *pQInfo) {
SQueryCostInfo
*
pSummary
=
&
pRuntimeEnv
->
summary
;
uint64_t
hashSize
=
taosHashGetMemSize
(
pQInfo
->
runtimeEnv
.
pResultRowHashTable
);
hashSize
+=
taosHashGetMemSize
(
p
QInfo
->
tableqinfoGroupInfo
.
map
);
hashSize
+=
taosHashGetMemSize
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
map
);
pSummary
->
hashSize
=
hashSize
;
// add the merge time
...
...
@@ -4475,7 +4453,7 @@ static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* w
tw
=
*
win
;
int32_t
startPos
=
getNextQualifiedWindow
(
p
RuntimeEnv
,
&
tw
,
pBlockInfo
,
pColInfoData
->
pData
,
binarySearchForKey
,
-
1
);
getNextQualifiedWindow
(
p
Query
,
&
tw
,
pBlockInfo
,
pColInfoData
->
pData
,
binarySearchForKey
,
-
1
);
assert
(
startPos
>=
0
);
// set the abort info
...
...
@@ -4595,7 +4573,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
}
else
{
tw
=
win
;
int32_t
startPos
=
getNextQualifiedWindow
(
p
RuntimeEnv
,
&
tw
,
&
blockInfo
,
pColInfoData
->
pData
,
binarySearchForKey
,
-
1
);
getNextQualifiedWindow
(
p
Query
,
&
tw
,
&
blockInfo
,
pColInfoData
->
pData
,
binarySearchForKey
,
-
1
);
assert
(
startPos
>=
0
);
// set the abort info
...
...
@@ -4635,7 +4613,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
STsdbQueryCond
cond
=
createTsdbQueryCond
(
pQuery
,
&
pQuery
->
window
);
if
(
!
isSTableQuery
&&
(
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
)
&&
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
1
)
&&
(
cond
.
order
==
TSDB_ORDER_ASC
)
&&
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
&&
(
!
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
))
...
...
@@ -4653,7 +4631,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
// update the query time window
pQuery
->
window
=
cond
.
twindow
;
if
(
pQInfo
->
tableGroupInfo
.
numOfTables
==
0
)
{
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
=
0
;
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
=
0
;
}
else
{
size_t
numOfGroups
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
...
...
@@ -4711,7 +4689,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv
->
topBotQuery
=
isTopBottomQuery
(
pQuery
);
pRuntimeEnv
->
hasTagResults
=
hasTagValOutput
(
pQuery
);
p
RuntimeEnv
->
timeWindowInterpo
=
timeWindowInterpoRequired
(
pQuery
);
p
Query
->
timeWindowInterpo
=
timeWindowInterpoRequired
(
pQuery
);
pRuntimeEnv
->
prevResult
=
prevResult
;
setScanLimitationByResultBuffer
(
pQuery
);
...
...
@@ -4730,7 +4708,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv
->
cur
.
vgroupIndex
=
-
1
;
pRuntimeEnv
->
stableQuery
=
isSTableQuery
;
pRuntimeEnv
->
prevGroupId
=
INT32_MIN
;
p
RuntimeEnv
->
groupbyColumn
=
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
);
p
Query
->
groupbyColumn
=
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
);
if
(
needReverseScan
(
pQuery
))
{
pRuntimeEnv
->
pi
=
createBiDirectionTableScanInfo
(
pRuntimeEnv
->
pQueryHandle
,
pQInfo
,
getNumOfScanTimes
(
pQuery
),
1
);
...
...
@@ -4756,7 +4734,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
if
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
int16_t
type
=
TSDB_DATA_TYPE_NULL
;
if
(
p
RuntimeEnv
->
groupbyColumn
)
{
// group by columns not tags;
if
(
p
Query
->
groupbyColumn
)
{
// group by columns not tags;
type
=
getGroupbyColumnType
(
pQuery
,
pQuery
->
pGroupbyExpr
);
}
else
{
type
=
TSDB_DATA_TYPE_INT
;
// group id
...
...
@@ -4767,7 +4745,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
return
code
;
}
}
}
else
if
(
p
RuntimeEnv
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
(
!
isSTableQuery
))
{
}
else
if
(
p
Query
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
(
!
isSTableQuery
))
{
int32_t
numOfResultRows
=
getInitialPageNum
(
pQInfo
);
getIntermediateBufInfo
(
pRuntimeEnv
,
&
ps
,
&
rowsize
);
code
=
createDiskbasedResultBuffer
(
&
pRuntimeEnv
->
pResultBuf
,
rowsize
,
ps
,
TENMB
,
pQInfo
);
...
...
@@ -4776,7 +4754,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
}
int16_t
type
=
TSDB_DATA_TYPE_NULL
;
if
(
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
p
Query
->
groupbyColumn
)
{
type
=
getGroupbyColumnType
(
pQuery
,
pQuery
->
pGroupbyExpr
);
}
else
{
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
...
...
@@ -4877,7 +4855,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
}
tsdbRetrieveDataBlockInfo
(
pQueryHandle
,
&
blockInfo
);
STableQueryInfo
**
pTableQueryInfo
=
(
STableQueryInfo
**
)
taosHashGet
(
p
QInfo
->
tableqinfoGroupInfo
.
map
,
&
blockInfo
.
tid
,
sizeof
(
blockInfo
.
tid
));
STableQueryInfo
**
pTableQueryInfo
=
(
STableQueryInfo
**
)
taosHashGet
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
map
,
&
blockInfo
.
tid
,
sizeof
(
blockInfo
.
tid
));
if
(
pTableQueryInfo
==
NULL
)
{
break
;
}
...
...
@@ -4885,7 +4863,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
pQuery
->
current
=
*
pTableQueryInfo
;
doTableQueryInfoTimeWindowCheck
(
pQuery
,
*
pTableQueryInfo
);
if
(
!
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
!
p
Query
->
groupbyColumn
)
{
setEnvForEachBlock
(
pQInfo
,
*
pTableQueryInfo
,
&
blockInfo
);
}
...
...
@@ -5154,7 +5132,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
break
;
}
}
}
else
if
(
p
RuntimeEnv
->
groupbyColumn
)
{
// group-by on normal columns query
}
else
if
(
p
Query
->
groupbyColumn
)
{
// group-by on normal columns query
while
(
pQInfo
->
groupIndex
<
numOfGroups
)
{
SArray
*
group
=
taosArrayGetP
(
pQInfo
->
tableGroupInfo
.
pGroupList
,
pQInfo
->
groupIndex
);
...
...
@@ -5233,8 +5211,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
resetDefaultResInfoOutputBuf
(
pRuntimeEnv
);
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
0
);
assert
(
taosArrayGetSize
(
group
)
==
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
&&
1
==
taosArrayGetSize
(
p
QInfo
->
tableqinfoGroupInfo
.
pGroupList
));
assert
(
taosArrayGetSize
(
group
)
==
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
&&
1
==
taosArrayGetSize
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
pGroupList
));
void
*
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
if
(
pQueryHandle
==
NULL
)
{
...
...
@@ -5268,7 +5246,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
tsdbRetrieveDataBlockInfo
(
pQueryHandle
,
&
blockInfo
);
STableQueryInfo
**
pTableQueryInfo
=
(
STableQueryInfo
**
)
taosHashGet
(
p
QInfo
->
tableqinfoGroupInfo
.
map
,
&
blockInfo
.
tid
,
sizeof
(
blockInfo
.
tid
));
(
STableQueryInfo
**
)
taosHashGet
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
map
,
&
blockInfo
.
tid
,
sizeof
(
blockInfo
.
tid
));
if
(
pTableQueryInfo
==
NULL
)
{
break
;
}
...
...
@@ -5380,7 +5358,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
// all data have returned already
if
(
pQInfo
->
tableIndex
>=
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
)
{
if
(
pQInfo
->
tableIndex
>=
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
)
{
return
;
}
...
...
@@ -5388,10 +5366,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
resetResultRowInfo
(
pRuntimeEnv
,
&
pRuntimeEnv
->
resultRowInfo
);
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
0
);
assert
(
taosArrayGetSize
(
group
)
==
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
&&
1
==
taosArrayGetSize
(
p
QInfo
->
tableqinfoGroupInfo
.
pGroupList
));
assert
(
taosArrayGetSize
(
group
)
==
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
&&
1
==
taosArrayGetSize
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
pGroupList
));
while
(
pQInfo
->
tableIndex
<
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
)
{
while
(
pQInfo
->
tableIndex
<
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
)
{
if
(
isQueryKilled
(
pQInfo
))
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
...
...
@@ -5454,7 +5432,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
}
if
(
pQInfo
->
tableIndex
>=
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
)
{
if
(
pQInfo
->
tableIndex
>=
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
)
{
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
}
...
...
@@ -5479,7 +5457,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
qDebug
(
"QInfo %p numOfTables:%"
PRIu64
", index:%d, numOfGroups:%"
PRIzu
", %"
PRId64
" points returned, total:%"
PRId64
", offset:%"
PRId64
,
pQInfo
,
(
uint64_t
)
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
,
pQInfo
->
tableIndex
,
numOfGroups
,
pQuery
->
rec
.
rows
,
pQInfo
,
(
uint64_t
)
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
,
pQInfo
->
tableIndex
,
numOfGroups
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
,
pQuery
->
limit
.
offset
);
}
}
...
...
@@ -5680,7 +5658,7 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
tfree
(
arithSup
.
data
);
}
static
SSDataBlock
*
doScanImpl
(
STableScanInfo
*
pTableScanInfo
)
{
static
SSDataBlock
*
doScan
Table
Impl
(
STableScanInfo
*
pTableScanInfo
)
{
SSDataBlock
*
pBlock
=
&
pTableScanInfo
->
block
;
while
(
tsdbNextDataBlock
(
pTableScanInfo
->
pQueryHandle
))
{
...
...
@@ -5710,7 +5688,7 @@ static SSDataBlock* doTableScan(void* param) {
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pTableScanInfo
->
pQInfo
->
runtimeEnv
;
while
(
pTableScanInfo
->
current
<
pTableScanInfo
->
times
)
{
SSDataBlock
*
p
=
doScanImpl
(
pTableScanInfo
);
SSDataBlock
*
p
=
doScan
Table
Impl
(
pTableScanInfo
);
if
(
p
!=
NULL
)
{
return
p
;
}
...
...
@@ -5758,7 +5736,7 @@ static SSDataBlock* doTableScan(void* param) {
pTableScanInfo
->
times
=
1
;
pTableScanInfo
->
current
=
0
;
SSDataBlock
*
p
=
doScanImpl
(
pTableScanInfo
);
SSDataBlock
*
p
=
doScan
Table
Impl
(
pTableScanInfo
);
if
(
p
!=
NULL
)
{
return
p
;
}
...
...
@@ -5799,11 +5777,17 @@ static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) {
return
pTableScanInfo
->
current
;
}
static
UNUSED_FUNC
int32_t
getScanOrder
(
STableScanInfo
*
pTableScanInfo
)
{
return
pTableScanInfo
->
order
;
}
// this is a blocking operator
static
SSDataBlock
*
doAggOperator
(
void
*
param
)
{
SAggOperatorInfo
*
pInfo
=
(
SAggOperatorInfo
*
)
param
;
int32_t
countId
=
0
;
int32_t
order
=
getScanOrder
(
pInfo
->
pTableScanInfo
);
while
(
1
)
{
SSDataBlock
*
pBlock
=
pInfo
->
pTableScanInfo
->
apply
(
pInfo
->
pTableScanInfo
);
if
(
pBlock
==
NULL
)
{
...
...
@@ -5812,16 +5796,22 @@ static SSDataBlock* doAggOperator(void* param) {
if
(
countId
!=
getTableScanTime
(
pInfo
->
pTableScanInfo
))
{
needRepeatScan
(
pInfo
->
pRuntimeEnv
);
countId
=
getTableScanTime
(
pInfo
->
pTableScanInfo
);
}
if
(
order
!=
getScanOrder
(
pInfo
->
pTableScanInfo
))
{
setEnvBeforeReverseScan_rv
(
pInfo
->
pRuntimeEnv
);
order
=
getScanOrder
(
pInfo
->
pTableScanInfo
);
}
aggApplyFunctions
_rv
(
pInfo
->
pRuntimeEnv
,
pBlock
->
pBlockStatis
,
&
pBlock
->
info
,
pBlock
->
pDataBlock
);
aggApplyFunctions
(
pInfo
->
pRuntimeEnv
,
pBlock
->
pBlockStatis
,
&
pBlock
->
info
,
pBlock
->
pDataBlock
);
}
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
setQueryStatus
(
p
Info
->
pRuntimeEnv
->
p
Query
,
QUERY_COMPLETED
);
finalizeQueryResult
(
pInfo
->
pRuntimeEnv
);
res
->
info
.
rows
=
getNumOfResult
(
pInfo
->
pRuntimeEnv
);
return
res
;
pInfo
->
pRuntimeEnv
->
pQuery
->
ouptputBuf
->
info
.
rows
=
getNumOfResult
(
pInfo
->
pRuntimeEnv
);
return
pInfo
->
pRuntimeEnv
->
pQuery
->
ouptputBuf
;
}
// todo set the attribute of query scan count
...
...
@@ -5866,8 +5856,6 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
SAggOperatorInfo
*
pAggInfo
=
createAggOperatorInfo
(
&
pRuntimeEnv
->
resultRowInfo
,
pQuery
->
current
,
pRuntimeEnv
,
pRuntimeEnv
->
pi
);
SSDataBlock
*
pResBlock
=
pAggInfo
->
apply
(
pAggInfo
);
// scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
// since the numOfRows must be identical for all functions that are allowed to be executed simutaneously.
// pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
pQuery
->
rec
.
rows
=
pResBlock
->
info
.
rows
;
//getNumOfResult(pRuntimeEnv);
...
...
@@ -5973,7 +5961,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
TSKEY
newStartKey
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
INT64_MIN
:
INT64_MAX
;
// skip blocks without load the actual data block from file if no filter condition present
if
(
!
p
RuntimeEnv
->
groupbyColumn
)
{
if
(
!
p
Query
->
groupbyColumn
)
{
skipTimeInterval
(
pRuntimeEnv
,
&
newStartKey
);
if
(
pQuery
->
limit
.
offset
>
0
&&
pQuery
->
numOfFilterCols
==
0
&&
pRuntimeEnv
->
pFillInfo
==
NULL
)
{
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
...
...
@@ -6049,14 +6037,14 @@ void tableQueryImpl(SQInfo *pQInfo) {
pQuery
->
rec
.
rows
=
0
;
int64_t
st
=
taosGetTimestampUs
();
assert
(
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
);
assert
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
1
);
SArray
*
g
=
GET_TABLEGROUP
(
pQInfo
,
0
);
STableQueryInfo
*
item
=
taosArrayGetP
(
g
,
0
);
pQuery
->
current
=
item
;
// group by normal column, sliding window query, interval query are handled by interval query processor
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
p
RuntimeEnv
->
groupbyColumn
)
{
// interval (down sampling operation)
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
p
Query
->
groupbyColumn
)
{
// interval (down sampling operation)
tableIntervalProcess
(
pQInfo
,
item
);
}
else
if
(
isFixedOutputQuery
(
pRuntimeEnv
))
{
tableAggregationProcess
(
pQInfo
,
item
);
...
...
@@ -6067,7 +6055,7 @@ void tableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time
pRuntimeEnv
->
summary
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
assert
(
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
);
assert
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
1
);
}
void
buildTableBlockDistResult
(
SQInfo
*
pQInfo
)
{
...
...
@@ -6131,10 +6119,10 @@ void stableQueryImpl(SQInfo *pQInfo) {
int64_t
st
=
taosGetTimestampUs
();
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
(
isFixedOutputQuery
(
pRuntimeEnv
)
&&
(
!
isPointInterpoQuery
(
pQuery
))
&&
(
!
p
RuntimeEnv
->
groupbyColumn
)))
{
(
isFixedOutputQuery
(
pRuntimeEnv
)
&&
(
!
isPointInterpoQuery
(
pQuery
))
&&
(
!
p
Query
->
groupbyColumn
)))
{
multiTableQueryProcess
(
pQInfo
);
}
else
{
assert
(
pQuery
->
checkResultBuf
==
1
||
isPointInterpoQuery
(
pQuery
)
||
p
RuntimeEnv
->
groupbyColumn
);
assert
(
pQuery
->
checkResultBuf
==
1
||
isPointInterpoQuery
(
pQuery
)
||
p
Query
->
groupbyColumn
);
sequentialTableProcess
(
pQInfo
);
}
...
...
@@ -6938,7 +6926,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
size_t
numOfGroups
=
0
;
if
(
pTableGroupInfo
->
pGroupList
!=
NULL
)
{
numOfGroups
=
taosArrayGetSize
(
pTableGroupInfo
->
pGroupList
);
STableGroupInfo
*
pTableqinfo
=
&
p
QInfo
->
tableqinfoGroupInfo
;
STableGroupInfo
*
pTableqinfo
=
&
p
RuntimeEnv
->
tableqinfoGroupInfo
;
pTableqinfo
->
pGroupList
=
taosArrayInit
(
numOfGroups
,
POINTER_BYTES
);
pTableqinfo
->
numOfTables
=
pTableGroupInfo
->
numOfTables
;
...
...
@@ -6977,7 +6965,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
goto
_cleanup
;
}
taosArrayPush
(
p
QInfo
->
tableqinfoGroupInfo
.
pGroupList
,
&
p1
);
taosArrayPush
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
pGroupList
,
&
p1
);
for
(
int32_t
j
=
0
;
j
<
s
;
++
j
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pa
,
j
);
...
...
@@ -6997,7 +6985,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
taosArrayPush
(
p1
,
&
item
);
STableId
*
id
=
TSDB_TABLEID
(
info
->
pTable
);
taosHashPut
(
p
QInfo
->
tableqinfoGroupInfo
.
map
,
&
id
->
tid
,
sizeof
(
id
->
tid
),
&
item
,
POINTER_BYTES
);
taosHashPut
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
map
,
&
id
->
tid
,
sizeof
(
id
->
tid
),
&
item
,
POINTER_BYTES
);
index
+=
1
;
}
}
...
...
@@ -7076,12 +7064,12 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
qDebug
(
"QInfo:%p no result in time range %"
PRId64
"-%"
PRId64
", order %d"
,
pQInfo
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
=
0
;
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
=
0
;
// todo free memory
return
TSDB_CODE_SUCCESS
;
}
if
(
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
if
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:%p no table qualified for tag filter, abort query"
,
pQInfo
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -7161,7 +7149,7 @@ void freeQInfo(SQInfo *pQInfo) {
qDebug
(
"QInfo:%p start to free QInfo"
,
pQInfo
);
releaseQueryBuf
(
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
);
releaseQueryBuf
(
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
);
teardownQueryRuntimeEnv
(
&
pQInfo
->
runtimeEnv
);
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
...
...
@@ -7206,7 +7194,7 @@ void freeQInfo(SQInfo *pQInfo) {
tfree
(
pQuery
);
}
doDestroyTableQueryInfo
(
&
p
QInfo
->
tableqinfoGroupInfo
);
doDestroyTableQueryInfo
(
&
p
RuntimeEnv
->
tableqinfoGroupInfo
);
tfree
(
pQInfo
->
pBuf
);
tfree
(
pQInfo
->
sql
);
...
...
@@ -7348,7 +7336,7 @@ void buildTagQueryResult(SQInfo* pQInfo) {
SArray
*
pa
=
GET_TABLEGROUP
(
pQInfo
,
0
);
size_t
num
=
taosArrayGetSize
(
pa
);
assert
(
num
==
p
QInfo
->
tableqinfoGroupInfo
.
numOfTables
);
assert
(
num
==
p
RuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
);
int32_t
count
=
0
;
int32_t
functionId
=
pQuery
->
pExpr1
[
0
].
base
.
functionId
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录