Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
140e15bc
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
140e15bc
编写于
6月 13, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: merge interval operator optimization and explain analyze multimerge operator
上级
ed580475
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
42 addition
and
97 deletion
+42
-97
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+1
-0
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+40
-96
未找到文件。
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
140e15bc
...
...
@@ -3157,6 +3157,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
}
// handle data in cache situation
// bool tsdbNextDataBlock(tsdbReaderT pHandle, uint64_t uid)
bool
tsdbNextDataBlock
(
tsdbReaderT
pHandle
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
(
STsdbReadHandle
*
)
pHandle
;
...
...
source/libs/command/src/explain.c
浏览文件 @
140e15bc
...
...
@@ -1036,7 +1036,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if
(
EXPLAIN_MODE_ANALYZE
==
ctx
->
mode
)
{
// sort key
EXPLAIN_ROW_NEW
(
level
+
1
,
"
Sort
Key: "
);
EXPLAIN_ROW_NEW
(
level
+
1
,
"
Merge
Key: "
);
if
(
pResNode
->
pExecInfo
)
{
for
(
int32_t
i
=
0
;
i
<
LIST_LENGTH
(
pMergeNode
->
pMergeKeys
);
++
i
)
{
SOrderByExprNode
*
ptn
=
nodesListGetNode
(
pMergeNode
->
pMergeKeys
,
i
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
140e15bc
...
...
@@ -3195,7 +3195,6 @@ _error:
typedef
struct
SMergeIntervalAggOperatorInfo
{
SIntervalAggOperatorInfo
intervalAggOperatorInfo
;
SHashObj
*
groupIntervalHash
;
bool
hasGroupId
;
uint64_t
groupId
;
SSDataBlock
*
prefetchedBlock
;
...
...
@@ -3204,39 +3203,24 @@ typedef struct SMergeIntervalAggOperatorInfo {
void
destroyMergeIntervalOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
(
SMergeIntervalAggOperatorInfo
*
)
param
;
taosHashCleanup
(
miaInfo
->
groupIntervalHash
);
destroyIntervalOperatorInfo
(
&
miaInfo
->
intervalAggOperatorInfo
,
numOfOutput
);
}
static
int32_t
outputPrevIntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
STimeWindow
*
newWin
)
{
static
int32_t
outputMergeIntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
TSKEY
wstartTs
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
bool
ascScan
=
(
iaInfo
->
order
==
TSDB_ORDER_ASC
);
STimeWindow
*
prevWin
=
taosHashGet
(
miaInfo
->
groupIntervalHash
,
&
tableGroupId
,
sizeof
(
tableGroupId
));
if
(
prevWin
==
NULL
)
{
taosHashPut
(
miaInfo
->
groupIntervalHash
,
&
tableGroupId
,
sizeof
(
tableGroupId
),
newWin
,
sizeof
(
STimeWindow
));
return
0
;
}
if
(
newWin
==
NULL
||
(
ascScan
&&
newWin
->
skey
>
prevWin
->
ekey
||
(
!
ascScan
)
&&
newWin
->
skey
<
prevWin
->
ekey
)
)
{
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
prevWin
->
skey
,
TSDB_KEYSIZE
,
tableGroupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
p1
!=
NULL
);
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
wstartTs
,
TSDB_KEYSIZE
,
tableGroupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
p1
!=
NULL
);
finalizeResultRowIntoResultDataBlock
(
iaInfo
->
aggSup
.
pResultBuf
,
p1
,
iaInfo
->
binfo
.
pCtx
,
pOperatorInfo
->
pExpr
,
pOperatorInfo
->
numOfExprs
,
iaInfo
->
binfo
.
rowCellInfoOffset
,
pResultBlock
,
pTaskInfo
);
taosHashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
if
(
newWin
==
NULL
)
{
taosHashRemove
(
miaInfo
->
groupIntervalHash
,
&
tableGroupId
,
sizeof
(
tableGroupId
));
}
else
{
taosHashPut
(
miaInfo
->
groupIntervalHash
,
&
tableGroupId
,
sizeof
(
tableGroupId
),
newWin
,
sizeof
(
STimeWindow
));
}
}
finalizeResultRowIntoResultDataBlock
(
iaInfo
->
aggSup
.
pResultBuf
,
p1
,
iaInfo
->
binfo
.
pCtx
,
pOperatorInfo
->
pExpr
,
pOperatorInfo
->
numOfExprs
,
iaInfo
->
binfo
.
rowCellInfoOffset
,
pResultBlock
,
pTaskInfo
);
taosHashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
return
0
;
}
...
...
@@ -3252,13 +3236,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
int32_t
numOfOutput
=
pOperatorInfo
->
numOfExprs
;
int64_t
*
tsCols
=
extractTsCol
(
pBlock
,
iaInfo
);
uint64_t
tableGroupId
=
pBlock
->
info
.
groupId
;
bool
ascScan
=
(
iaInfo
->
order
==
TSDB_ORDER_ASC
);
TSKEY
blockStartTs
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
iaInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
blockStartTs
,
&
iaInfo
->
interval
,
iaInfo
->
interval
.
precision
,
&
iaInfo
->
win
);
STimeWindow
win
;
win
.
skey
=
blockStartTs
;
win
.
ekey
=
taosTimeAdd
(
win
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
//TODO: remove the hash table usage (groupid + winkey => result row position)
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
iaInfo
->
binfo
.
pCtx
,
numOfOutput
,
iaInfo
->
binfo
.
rowCellInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
...
...
@@ -3266,72 +3251,39 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
TSKEY
ekey
=
ascScan
?
win
.
ekey
:
win
.
skey
;
int32_t
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
order
);
ASSERT
(
forwardRows
>
0
);
// prev time window not interpolation yet.
if
(
iaInfo
->
timeWindowInterpo
)
{
SResultRowPosition
pos
=
addToOpenWindowList
(
pResultRowInfo
,
pResult
);
doInterpUnclosedTimeWindow
(
pOperatorInfo
,
numOfOutput
,
pResultRowInfo
,
pBlock
,
scanFlag
,
tsCols
,
&
pos
);
// restore current time window
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
iaInfo
->
binfo
.
pCtx
,
numOfOutput
,
iaInfo
->
binfo
.
rowCellInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
// window start key interpolation
doWindowBorderInterpolation
(
iaInfo
,
pBlock
,
numOfOutput
,
iaInfo
->
binfo
.
pCtx
,
pResult
,
&
win
,
startPos
,
forwardRows
);
}
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
iaInfo
->
binfo
.
pCtx
,
&
win
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
doCloseWindow
(
pResultRowInfo
,
iaInfo
,
pResult
);
// output previous interval results after this interval (&win) is closed
outputPrevIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
&
win
);
STimeWindow
nextWin
=
win
;
TSKEY
currTs
=
blockStartTs
;
TSKEY
currPos
=
startPos
;
STimeWindow
currWin
=
win
;
while
(
1
)
{
int32_t
prevEndPos
=
forwardRows
-
1
+
startPos
;
startPos
=
getNextQualifiedWindow
(
&
iaInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
iaInfo
->
order
);
if
(
startPos
<
0
)
{
++
currPos
;
if
(
currPos
>=
pBlock
->
info
.
rows
)
{
break
;
}
// null data, failed to allocate more memory buffer
int32_t
code
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
nextWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
iaInfo
->
binfo
.
pCtx
,
numOfOutput
,
iaInfo
->
binfo
.
rowCellInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
if
(
tsCols
[
currPos
]
==
currTs
)
{
continue
;
}
else
{
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
currWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
iaInfo
->
binfo
.
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
outputMergeIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
currTs
=
tsCols
[
currPos
];
currWin
.
skey
=
currTs
;
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
startPos
=
currPos
;
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
currWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
iaInfo
->
binfo
.
pCtx
,
numOfOutput
,
iaInfo
->
binfo
.
rowCellInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
order
);
// window start(end) key interpolation
doWindowBorderInterpolation
(
iaInfo
,
pBlock
,
numOfOutput
,
iaInfo
->
binfo
.
pCtx
,
pResult
,
&
nextWin
,
startPos
,
forwardRows
);
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
iaInfo
->
binfo
.
pCtx
,
&
nextWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
doCloseWindow
(
pResultRowInfo
,
iaInfo
,
pResult
);
// output previous interval results after this interval (&nextWin) is closed
outputPrevIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
&
nextWin
);
}
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
currWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
iaInfo
->
binfo
.
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
if
(
iaInfo
->
timeWindowInterpo
)
{
saveDataBlockLastRow
(
iaInfo
->
pPrevValues
,
pBlock
,
iaInfo
->
pInterpCols
);
}
outputMergeIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
}
static
SSDataBlock
*
doMergeIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -3385,13 +3337,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
}
pRes
->
info
.
groupId
=
miaInfo
->
groupId
;
}
else
{
void
*
p
=
taosHashIterate
(
miaInfo
->
groupIntervalHash
,
NULL
);
if
(
p
!=
NULL
)
{
size_t
len
=
0
;
uint64_t
*
pKey
=
taosHashGetKey
(
p
,
&
len
);
outputPrevIntervalResult
(
pOperator
,
*
pKey
,
pRes
,
NULL
);
}
}
if
(
pRes
->
info
.
rows
==
0
)
{
...
...
@@ -3421,7 +3366,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
iaInfo
->
execModel
=
pTaskInfo
->
execModel
;
iaInfo
->
primaryTsIndex
=
primaryTsSlotId
;
miaInfo
->
groupIntervalHash
=
taosHashInit
(
10
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_NO_LOCK
);
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
pOperator
,
4096
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录