Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f95d75d1
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f95d75d1
编写于
7月 09, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-225] opt query perf
上级
fe4b5832
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
117 addition
and
94 deletion
+117
-94
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+10
-0
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+2
-3
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+2
-12
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+1
-0
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+3
-1
src/query/inc/tsqlfunction.h
src/query/inc/tsqlfunction.h
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+82
-64
src/query/src/qUtil.c
src/query/src/qUtil.c
+16
-13
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
f95d75d1
...
...
@@ -87,6 +87,16 @@ typedef struct SVgroupTableInfo {
SArray
*
itemList
;
//SArray<STableIdInfo>
}
SVgroupTableInfo
;
static
FORCE_INLINE
SQueryInfo
*
tscGetQueryInfoDetail
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
)
{
assert
(
pCmd
!=
NULL
&&
subClauseIndex
>=
0
&&
subClauseIndex
<
TSDB_MAX_UNION_CLAUSE
);
if
(
pCmd
->
pQueryInfo
==
NULL
||
subClauseIndex
>=
pCmd
->
numOfClause
)
{
return
NULL
;
}
return
pCmd
->
pQueryInfo
[
subClauseIndex
];
}
int32_t
tscCreateDataBlock
(
size_t
initialSize
,
int32_t
rowSize
,
int32_t
startOffset
,
const
char
*
name
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
);
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
);
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
f95d75d1
...
...
@@ -340,13 +340,12 @@ bool stableQueryFunctChanged(int32_t funcId) {
*/
void
resetResultInfo
(
SResultInfo
*
pResInfo
)
{
pResInfo
->
initialized
=
false
;
}
void
setResultInfoBuf
(
SResultInfo
*
pResInfo
,
int32_t
size
,
bool
superTable
)
{
void
setResultInfoBuf
(
SResultInfo
*
pResInfo
,
int32_t
size
,
bool
superTable
,
char
*
buf
)
{
assert
(
pResInfo
->
interResultBuf
==
NULL
);
pResInfo
->
bufLen
=
size
;
pResInfo
->
superTableQ
=
superTable
;
pResInfo
->
interResultBuf
=
calloc
(
1
,
(
size_t
)
size
);
pResInfo
->
interResultBuf
=
buf
;
}
// set the query flag to denote that query is completed
...
...
src/client/src/tscUtil.c
浏览文件 @
f95d75d1
...
...
@@ -1464,16 +1464,6 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return
pQueryInfo
->
pTableMetaInfo
[
tableIndex
];
}
SQueryInfo
*
tscGetQueryInfoDetail
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
)
{
assert
(
pCmd
!=
NULL
&&
subClauseIndex
>=
0
&&
subClauseIndex
<
TSDB_MAX_UNION_CLAUSE
);
if
(
pCmd
->
pQueryInfo
==
NULL
||
subClauseIndex
>=
pCmd
->
numOfClause
)
{
return
NULL
;
}
return
pCmd
->
pQueryInfo
[
subClauseIndex
];
}
int32_t
tscGetQueryInfoDetailSafely
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
SQueryInfo
**
pQueryInfo
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
...
...
@@ -2097,7 +2087,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
}
void
tscGetResultColumnChr
(
SSqlRes
*
pRes
,
SFieldInfo
*
pFieldInfo
,
int32_t
columnIndex
)
{
SFieldSupInfo
*
pInfo
=
taosArrayGet
(
pFieldInfo
->
pSupportInfo
,
columnIndex
);
//tscFieldInfoGetSupp(pFieldInfo, columnIndex);
SFieldSupInfo
*
pInfo
=
taosArrayGet
(
pFieldInfo
->
pSupportInfo
,
columnIndex
);
assert
(
pInfo
->
pSqlExpr
!=
NULL
);
int32_t
type
=
pInfo
->
pSqlExpr
->
resType
;
...
...
@@ -2112,7 +2102,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
if
(
isNull
(
pData
,
type
))
{
pRes
->
tsrow
[
columnIndex
]
=
NULL
;
}
else
{
pRes
->
tsrow
[
columnIndex
]
=
pData
+
VARSTR_HEADER_SIZE
;
pRes
->
tsrow
[
columnIndex
]
=
((
tstr
*
)
pData
)
->
data
;
}
if
(
realLen
<
pInfo
->
pSqlExpr
->
resBytes
-
VARSTR_HEADER_SIZE
)
{
// todo refactor
...
...
src/query/inc/qExecutor.h
浏览文件 @
f95d75d1
...
...
@@ -172,6 +172,7 @@ typedef struct SQueryRuntimeEnv {
bool
topBotQuery
;
// false
bool
groupbyNormalCol
;
// denote if this is a groupby normal column query
bool
hasTagResults
;
// if there are tag values in final result or not
int32_t
interBufSize
;
// intermediate buffer sizse
int32_t
prevGroupId
;
// previous executed group id
SDiskbasedResultBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
}
SQueryRuntimeEnv
;
...
...
src/query/inc/qUtil.h
浏览文件 @
f95d75d1
...
...
@@ -15,6 +15,8 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
int32_t
getOutputInterResultBufSize
(
SQuery
*
pQuery
);
void
clearTimeWindowResBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pOneOutputRes
);
void
copyTimeWindowResBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
dst
,
const
SWindowResult
*
src
);
...
...
@@ -35,7 +37,7 @@ SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
#define curTimeWindow(_winres) ((_winres)->curIndex)
bool
isWindowResClosed
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
);
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
SPosInfo
*
posInfo
);
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
SPosInfo
*
posInfo
,
size_t
interBufSize
);
char
*
getPosInResultPage
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
columnIndex
,
SWindowResult
*
pResult
);
...
...
src/query/inc/tsqlfunction.h
浏览文件 @
f95d75d1
...
...
@@ -272,7 +272,7 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi
bool
stableQueryFunctChanged
(
int32_t
funcId
);
void
resetResultInfo
(
SResultInfo
*
pResInfo
);
void
setResultInfoBuf
(
SResultInfo
*
pResInfo
,
int32_t
size
,
bool
superTable
);
void
setResultInfoBuf
(
SResultInfo
*
pResInfo
,
int32_t
size
,
bool
superTable
,
char
*
buf
);
static
FORCE_INLINE
void
initResultInfo
(
SResultInfo
*
pResInfo
)
{
pResInfo
->
initialized
=
true
;
// the this struct has been initialized flag
...
...
src/query/src/qExecutor.c
浏览文件 @
f95d75d1
...
...
@@ -123,6 +123,14 @@ static void setQueryStatus(SQuery *pQuery, int8_t status);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
// previous time window may not be of the same size of pQuery->intervalTime
#define GET_NEXT_TIMEWINDOW(_q, tw) \
do { \
int32_t factor = GET_FORWARD_DIRECTION_FACTOR((_q)->order.order); \
(tw)->skey += ((_q)->slidingTime * factor); \
(tw)->ekey = (tw)->skey + ((_q)->intervalTime - 1); \
} while (0)
// todo move to utility
static
int32_t
mergeIntoGroupResultImpl
(
SQInfo
*
pQInfo
,
SArray
*
group
);
...
...
@@ -130,7 +138,6 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
static
void
setWindowResOutputBufInitCtx
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pResult
);
static
void
resetMergeResultBuf
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
,
SResultInfo
*
pResultInfo
);
static
bool
functionNeedToExecute
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
int32_t
functionId
);
static
void
getNextTimeWindow
(
SQuery
*
pQuery
,
STimeWindow
*
pTimeWindow
);
static
void
setExecParams
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
,
void
*
inputData
,
TSKEY
*
tsCol
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
*
pStatis
,
void
*
param
,
int32_t
colIndex
);
...
...
@@ -419,7 +426,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
for
(
int32_t
i
=
pWindowResInfo
->
capacity
;
i
<
newCap
;
++
i
)
{
SPosInfo
pos
=
{
-
1
,
-
1
};
createQueryResultInfo
(
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
&
pos
);
createQueryResultInfo
(
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
&
pos
,
pRuntimeEnv
->
interBufSize
);
}
pWindowResInfo
->
capacity
=
newCap
;
}
...
...
@@ -551,19 +558,29 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int
static
int32_t
getForwardStepsInBlock
(
int32_t
numOfRows
,
__block_search_fn_t
searchFn
,
TSKEY
ekey
,
int16_t
pos
,
int16_t
order
,
int64_t
*
pData
)
{
int32_t
endPos
=
searchFn
((
char
*
)
pData
,
numOfRows
,
ekey
,
order
);
int32_t
forwardStep
=
0
;
if
(
endPos
>=
0
)
{
forwardStep
=
(
order
==
TSDB_ORDER_ASC
)
?
(
endPos
-
pos
)
:
(
pos
-
endPos
);
assert
(
forwardStep
>=
0
);
if
(
order
==
TSDB_ORDER_ASC
)
{
int32_t
end
=
searchFn
((
char
*
)
&
pData
[
pos
],
numOfRows
-
pos
,
ekey
,
order
);
if
(
end
>=
0
)
{
forwardStep
=
end
;
// endPos data is equalled to the key so, we do need to read the element in endPos
if
(
pData
[
endPos
]
==
ekey
)
{
forwardStep
+=
1
;
if
(
pData
[
end
+
pos
]
==
ekey
)
{
forwardStep
+=
1
;
}
}
}
else
{
int32_t
end
=
searchFn
((
char
*
)
pData
,
pos
+
1
,
ekey
,
order
);
if
(
end
>=
0
)
{
forwardStep
=
pos
-
end
;
if
(
pData
[
end
]
==
ekey
)
{
forwardStep
+=
1
;
}
}
}
assert
(
forwardStep
>
0
);
return
forwardStep
;
}
...
...
@@ -686,7 +703,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
}
}
assert
(
num
>
=
0
);
assert
(
num
>
0
);
return
num
;
}
...
...
@@ -736,59 +753,60 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
}
}
static
int32_t
getNextQualifiedWindow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STimeWindow
*
pNextWin
,
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
primaryKeys
,
__block_search_fn_t
searchFn
)
{
static
int32_t
getNextQualifiedWindow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STimeWindow
*
pNext
,
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
primaryKeys
,
__block_search_fn_t
searchFn
,
int32_t
prevPosition
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// tumbling time window query, a special case of sliding time window query
if
(
pQuery
->
slidingTime
==
pQuery
->
intervalTime
)
{
// todo opt
}
getNextTimeWindow
(
pQuery
,
pNextWin
);
GET_NEXT_TIMEWINDOW
(
pQuery
,
pNext
);
// next time window is not in current block
if
((
pNext
Win
->
skey
>
pDataBlockInfo
->
window
.
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
pNext
Win
->
ekey
<
pDataBlockInfo
->
window
.
skey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
if
((
pNext
->
skey
>
pDataBlockInfo
->
window
.
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
pNext
->
ekey
<
pDataBlockInfo
->
window
.
skey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
return
-
1
;
}
TSKEY
startKey
=
-
1
;
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
startKey
=
pNext
Win
->
skey
;
startKey
=
pNext
->
skey
;
if
(
startKey
<
pQuery
->
window
.
skey
)
{
startKey
=
pQuery
->
window
.
skey
;
}
}
else
{
startKey
=
pNext
Win
->
ekey
;
startKey
=
pNext
->
ekey
;
if
(
startKey
>
pQuery
->
window
.
skey
)
{
startKey
=
pQuery
->
window
.
skey
;
}
}
int32_t
startPos
=
searchFn
((
char
*
)
primaryKeys
,
pDataBlockInfo
->
rows
,
startKey
,
pQuery
->
order
.
order
);
int32_t
startPos
=
0
;
// tumbling time window query, a special case of sliding time window query
if
(
pQuery
->
slidingTime
==
pQuery
->
intervalTime
&&
prevPosition
!=
-
1
)
{
int32_t
factor
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
startPos
=
prevPosition
+
factor
;
}
else
{
startPos
=
searchFn
((
char
*
)
primaryKeys
,
pDataBlockInfo
->
rows
,
startKey
,
pQuery
->
order
.
order
);
}
/*
* This time window does not cover any data, try next time window,
* this case may happen when the time window is too small
*/
if
(
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
primaryKeys
[
startPos
]
>
pNext
Win
->
ekey
)
{
if
(
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
primaryKeys
[
startPos
]
>
pNext
->
ekey
)
{
TSKEY
next
=
primaryKeys
[
startPos
];
pNext
Win
->
ekey
+=
((
next
-
pNextWin
->
ekey
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
pNext
Win
->
skey
=
pNextWin
->
ekey
-
pQuery
->
intervalTime
+
1
;
}
else
if
((
!
QUERY_IS_ASC_QUERY
(
pQuery
))
&&
primaryKeys
[
startPos
]
<
pNext
Win
->
skey
)
{
pNext
->
ekey
+=
((
next
-
pNext
->
ekey
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
pNext
->
skey
=
pNext
->
ekey
-
pQuery
->
intervalTime
+
1
;
}
else
if
((
!
QUERY_IS_ASC_QUERY
(
pQuery
))
&&
primaryKeys
[
startPos
]
<
pNext
->
skey
)
{
TSKEY
next
=
primaryKeys
[
startPos
];
pNext
Win
->
skey
-=
((
pNextWin
->
skey
-
next
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
pNext
Win
->
ekey
=
pNextWin
->
skey
+
pQuery
->
intervalTime
-
1
;
pNext
->
skey
-=
((
pNext
->
skey
-
next
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
pNext
->
ekey
=
pNext
->
skey
+
pQuery
->
intervalTime
-
1
;
}
return
startPos
;
}
static
TSKEY
reviseWindowEkey
(
SQuery
*
pQuery
,
STimeWindow
*
pWindow
)
{
static
FORCE_INLINE
TSKEY
reviseWindowEkey
(
SQuery
*
pQuery
,
STimeWindow
*
pWindow
)
{
TSKEY
ekey
=
-
1
;
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
ekey
=
pWindow
->
ekey
;
...
...
@@ -924,20 +942,23 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
return
;
}
int32_t
forwardStep
=
0
;
int32_t
startPos
=
pQuery
->
pos
;
if
(
hasTimeWindow
)
{
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
int32_t
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
pQuery
->
pos
,
ekey
,
searchFn
,
true
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
pQuery
->
pos
,
ekey
,
searchFn
,
true
);
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
win
,
pQuery
->
p
os
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
win
,
startP
os
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
}
int32_t
index
=
pWindowResInfo
->
curIndex
;
STimeWindow
nextWin
=
win
;
while
(
1
)
{
int32_t
startPos
=
getNextQualifiedWindow
(
pRuntimeEnv
,
&
nextWin
,
pDataBlockInfo
,
tsCols
,
searchFn
);
int32_t
prevEndPos
=
(
forwardStep
-
1
)
*
step
+
startPos
;
startPos
=
getNextQualifiedWindow
(
pRuntimeEnv
,
&
nextWin
,
pDataBlockInfo
,
tsCols
,
searchFn
,
prevEndPos
);
if
(
startPos
<
0
)
{
break
;
}
...
...
@@ -953,7 +974,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
nextWin
);
int32_t
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
startPos
,
ekey
,
searchFn
,
true
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
startPos
,
ekey
,
searchFn
,
true
);
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
nextWin
,
startPos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
...
...
@@ -1224,7 +1245,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
int32_t
index
=
pWindowResInfo
->
curIndex
;
while
(
1
)
{
getNextTimeWindow
(
pQuery
,
&
nextWin
);
GET_NEXT_TIMEWINDOW
(
pQuery
,
&
nextWin
);
if
(
/*pWindowResInfo->startTime > nextWin.skey ||*/
(
nextWin
.
skey
>
pQuery
->
window
.
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
nextWin
.
skey
<
pQuery
->
window
.
ekey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
...
...
@@ -1236,7 +1257,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// null data, failed to allocate more memory buffer
bool
hasTimeWindow
=
false
;
hasTimeWindow
=
false
;
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
nextWin
,
masterScan
,
&
hasTimeWindow
)
!=
TSDB_CODE_SUCCESS
)
{
break
;
}
...
...
@@ -1459,11 +1480,13 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
}
}
static
FORCE_INLINE
void
setWindowResultInfo
(
SResultInfo
*
pResultInfo
,
SQuery
*
pQuery
,
bool
isStableQuery
)
{
static
FORCE_INLINE
void
setWindowResultInfo
(
SResultInfo
*
pResultInfo
,
SQuery
*
pQuery
,
bool
isStableQuery
,
char
*
buf
)
{
char
*
p
=
buf
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
assert
(
pQuery
->
pSelectExpr
[
i
].
interBytes
<=
DEFAULT_INTERN_BUF_PAGE_SIZE
);
setResultInfoBuf
(
&
pResultInfo
[
i
],
pQuery
->
pSelectExpr
[
i
].
interBytes
,
isStableQuery
);
int32_t
size
=
pQuery
->
pSelectExpr
[
i
].
interBytes
;
setResultInfoBuf
(
&
pResultInfo
[
i
],
size
,
isStableQuery
,
p
);
p
+=
size
;
}
}
...
...
@@ -1542,8 +1565,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
}
}
char
*
buf
=
calloc
(
1
,
pRuntimeEnv
->
interBufSize
);
// set the intermediate result output buffer
setWindowResultInfo
(
pRuntimeEnv
->
resultInfo
,
pQuery
,
pRuntimeEnv
->
stableQuery
);
setWindowResultInfo
(
pRuntimeEnv
->
resultInfo
,
pQuery
,
pRuntimeEnv
->
stableQuery
,
buf
);
// if it is group by normal column, do not set output buffer, the output buffer is pResult
if
(
!
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
&&
!
pRuntimeEnv
->
stableQuery
)
{
...
...
@@ -1581,9 +1606,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tVariantDestroy
(
&
pCtx
->
tag
);
tfree
(
pCtx
->
tagInfo
.
pTagCtxList
);
tfree
(
pRuntimeEnv
->
resultInfo
[
i
].
interResultBuf
);
}
tfree
(
pRuntimeEnv
->
resultInfo
[
0
].
interResultBuf
);
tfree
(
pRuntimeEnv
->
resultInfo
);
tfree
(
pRuntimeEnv
->
pCtx
);
}
...
...
@@ -2017,14 +2042,6 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
return
true
;
}
// previous time window may not be of the same size of pQuery->intervalTime
static
void
getNextTimeWindow
(
SQuery
*
pQuery
,
STimeWindow
*
pTimeWindow
)
{
int32_t
factor
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
pTimeWindow
->
skey
+=
(
pQuery
->
slidingTime
*
factor
);
pTimeWindow
->
ekey
=
pTimeWindow
->
skey
+
(
pQuery
->
intervalTime
-
1
);
}
SArray
*
loadDataBlockOnDemand
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
void
*
pQueryHandle
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
**
pStatis
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -2737,7 +2754,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
setWindowResultInfo
(
pResultInfo
,
pQuery
,
pRuntimeEnv
->
stableQuery
);
char
*
buf
=
calloc
(
1
,
pRuntimeEnv
->
interBufSize
);
setWindowResultInfo
(
pResultInfo
,
pQuery
,
pRuntimeEnv
->
stableQuery
,
buf
);
resetMergeResultBuf
(
pQuery
,
pRuntimeEnv
->
pCtx
,
pResultInfo
);
int64_t
lastTimestamp
=
-
1
;
...
...
@@ -2823,11 +2841,9 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tfree
(
pTree
);
pQInfo
->
offset
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
tfree
(
pResultInfo
[
i
].
interResultBuf
);
}
tfree
(
pResultInfo
);
tfree
(
buf
);
return
pQInfo
->
numOfGroupResultPages
;
}
...
...
@@ -2980,14 +2996,16 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
SPosInfo
*
posInfo
)
{
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
SPosInfo
*
posInfo
,
size_t
interBufSize
)
{
int32_t
numOfCols
=
pQuery
->
numOfOutput
;
pResultRow
->
resultInfo
=
calloc
((
size_t
)
numOfCols
,
sizeof
(
SResultInfo
));
pResultRow
->
pos
=
*
posInfo
;
char
*
buf
=
calloc
(
1
,
interBufSize
);
// set the intermediate result output buffer
setWindowResultInfo
(
pResultRow
->
resultInfo
,
pQuery
,
isSTableQuery
);
setWindowResultInfo
(
pResultRow
->
resultInfo
,
pQuery
,
isSTableQuery
,
buf
);
}
void
resetCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
...
...
@@ -3365,7 +3383,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
// set more initial size of interval/groupby query
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
pRuntimeEnv
->
groupbyNormalCol
)
{
int32_t
initialSize
=
20
;
int32_t
initialSize
=
16
;
int32_t
initialThreshold
=
100
;
initWindowResInfo
(
&
pTableQueryInfo
->
windowResInfo
,
pRuntimeEnv
,
initialSize
,
initialThreshold
,
TSDB_DATA_TYPE_INT
);
}
else
{
// in other aggregate query, do not initialize the windowResInfo
...
...
@@ -4013,7 +4031,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
}
STimeWindow
tw
=
win
;
getNextTimeWindow
(
pQuery
,
&
tw
);
GET_NEXT_TIMEWINDOW
(
pQuery
,
&
tw
);
if
(
pQuery
->
limit
.
offset
==
0
)
{
if
((
tw
.
skey
<=
blockInfo
.
window
.
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
...
...
@@ -4025,7 +4043,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
tw
=
win
;
int32_t
startPos
=
getNextQualifiedWindow
(
pRuntimeEnv
,
&
tw
,
&
blockInfo
,
pColInfoData
->
pData
,
binarySearchForKey
);
getNextQualifiedWindow
(
pRuntimeEnv
,
&
tw
,
&
blockInfo
,
pColInfoData
->
pData
,
binarySearchForKey
,
-
1
);
assert
(
startPos
>=
0
);
// set the abort info
...
...
@@ -4068,7 +4086,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
tw
=
win
;
int32_t
startPos
=
getNextQualifiedWindow
(
pRuntimeEnv
,
&
tw
,
&
blockInfo
,
pColInfoData
->
pData
,
binarySearchForKey
);
getNextQualifiedWindow
(
pRuntimeEnv
,
&
tw
,
&
blockInfo
,
pColInfoData
->
pData
,
binarySearchForKey
,
-
1
);
assert
(
startPos
>=
0
);
// set the abort info
...
...
@@ -4197,7 +4215,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type
=
TSDB_DATA_TYPE_INT
;
// group id
}
initWindowResInfo
(
&
pRuntimeEnv
->
windowResInfo
,
pRuntimeEnv
,
51
2
,
4096
,
type
);
initWindowResInfo
(
&
pRuntimeEnv
->
windowResInfo
,
pRuntimeEnv
,
3
2
,
4096
,
type
);
}
}
else
if
(
pRuntimeEnv
->
groupbyNormalCol
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
...
...
@@ -5736,7 +5754,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STimeWindow
window
=
pQueryMsg
->
window
;
taosArraySort
(
pTableIdList
,
compareTableIdInfo
);
// TODO optimize the STableQueryInfo malloc strategy
pQInfo
->
runtimeEnv
.
interBufSize
=
getOutputInterResultBufSize
(
pQuery
);
pQInfo
->
pBuf
=
calloc
(
pTableGroupInfo
->
numOfTables
,
sizeof
(
STableQueryInfo
));
int32_t
index
=
0
;
...
...
src/query/src/qUtil.c
浏览文件 @
f95d75d1
...
...
@@ -17,15 +17,24 @@
#include "hash.h"
#include "taosmsg.h"
#include "qextbuffer.h"
#include "ttime.h"
#include "qfill.h"
#include "ttime.h"
#include "qExecutor.h"
#include "qUtil.h"
int32_t
getOutputInterResultBufSize
(
SQuery
*
pQuery
)
{
int32_t
size
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
assert
(
pQuery
->
pSelectExpr
[
i
].
interBytes
<=
DEFAULT_INTERN_BUF_PAGE_SIZE
);
size
+=
pQuery
->
pSelectExpr
[
i
].
interBytes
;
}
assert
(
size
>
0
);
return
size
;
}
int32_t
initWindowResInfo
(
SWindowResInfo
*
pWindowResInfo
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
size
,
int32_t
threshold
,
int16_t
type
)
{
pWindowResInfo
->
capacity
=
size
;
...
...
@@ -43,7 +52,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo
->
pResult
=
calloc
(
threshold
,
sizeof
(
SWindowResult
));
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
capacity
;
++
i
)
{
SPosInfo
posInfo
=
{
-
1
,
-
1
};
createQueryResultInfo
(
pRuntimeEnv
->
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
&
posInfo
);
createQueryResultInfo
(
pRuntimeEnv
->
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
&
posInfo
,
pRuntimeEnv
->
interBufSize
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -54,11 +63,7 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
return
;
}
// TODO opt malloc strategy
for
(
int32_t
i
=
0
;
i
<
nOutputCols
;
++
i
)
{
free
(
pWindowRes
->
resultInfo
[
i
].
interResultBuf
);
}
free
(
pWindowRes
->
resultInfo
[
0
].
interResultBuf
);
free
(
pWindowRes
->
resultInfo
);
}
...
...
@@ -241,10 +246,9 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
}
pWindowRes
->
numOfRows
=
0
;
// pWindowRes->nAlloc = 0;
pWindowRes
->
pos
=
(
SPosInfo
){
-
1
,
-
1
};
pWindowRes
->
status
.
closed
=
false
;
pWindowRes
->
window
=
(
STimeWindow
){
0
,
0
}
;
pWindowRes
->
window
=
TSWINDOW_INITIALIZER
;
}
/**
...
...
@@ -254,7 +258,6 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
*/
void
copyTimeWindowResBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
dst
,
const
SWindowResult
*
src
)
{
dst
->
numOfRows
=
src
->
numOfRows
;
// dst->nAlloc = src->nAlloc;
dst
->
window
=
src
->
window
;
dst
->
status
=
src
->
status
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录