Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b8a1a7f1
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b8a1a7f1
编写于
8月 23, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(stream): state window update
上级
6dd250c2
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
35 addition
and
30 deletion
+35
-30
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+11
-4
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+23
-25
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
b8a1a7f1
...
...
@@ -1016,7 +1016,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool
isOverdue
(
TSKEY
ts
,
STimeWindowAggSupp
*
pSup
);
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
);
bool
isDeletedWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SAggSupporter
*
pSup
);
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
);
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
int32_t
uidCol
,
uint64_t
*
pID
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
b8a1a7f1
...
...
@@ -1086,7 +1086,10 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData
*
pDestStartCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestEndCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestUidCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pDestGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
int32_t
dummy
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
rows
;
i
++
)
{
uint64_t
groupId
=
getGroupId
(
pInfo
->
pTableScanOp
,
uidCol
[
i
]);
...
...
@@ -1100,9 +1103,13 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SResultWindowInfo
*
pEndWin
=
getCurSessionWindow
(
pInfo
->
sessionSup
.
pStreamAggSup
,
endData
[
i
],
endData
[
i
],
groupId
,
0
,
&
dummy
);
ASSERT
(
pEndWin
);
TSKEY
ts
=
INT64_MIN
;
colDataAppend
(
pDestStartCol
,
i
,
(
const
char
*
)
&
pStartWin
->
win
.
skey
,
false
);
colDataAppend
(
pDestEndCol
,
i
,
(
const
char
*
)
&
pEndWin
->
win
.
ekey
,
false
);
colDataAppendNULL
(
pDestUidCol
,
i
);
colDataAppend
(
pDestGpCol
,
i
,
(
const
char
*
)
&
groupId
,
false
);
colDataAppendNULL
(
pDestCalStartTsCol
,
i
);
colDataAppendNULL
(
pDestCalEndTsCol
,
i
);
pDestBlock
->
info
.
rows
++
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1157,13 +1164,13 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return
code
;
}
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
)
{
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
int32_t
uidCol
,
uint64_t
*
pID
)
{
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pUidCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pUidCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
uidCol
);
colDataAppend
(
pStartTsCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
pStartTs
,
false
);
colDataAppend
(
pEndTsCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
pEndTs
,
false
);
colDataAppend
(
pUidCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
p
Uid
,
false
);
colDataAppend
(
pUidCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
p
ID
,
false
);
pBlock
->
info
.
rows
++
;
}
...
...
@@ -1190,7 +1197,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool
closedWin
=
isClosed
&&
isSignleIntervalWindow
(
pInfo
)
&&
isDeletedWindow
(
&
win
,
pBlock
->
info
.
groupId
,
pInfo
->
sessionSup
.
pIntervalAggSup
);
if
((
update
||
closedWin
)
&&
out
)
{
appendOneRow
(
pInfo
->
pUpdateDataRes
,
tsCol
+
rowId
,
tsCol
+
rowId
,
&
pBlock
->
info
.
uid
);
appendOneRow
(
pInfo
->
pUpdateDataRes
,
tsCol
+
rowId
,
tsCol
+
rowId
,
UID_COLUMN_INDEX
,
&
pBlock
->
info
.
uid
);
}
}
if
(
out
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
b8a1a7f1
...
...
@@ -3951,11 +3951,13 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
int32_t
numOfOutput
,
int64_t
gap
,
SArray
*
result
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
SColumnInfoData
*
pGpDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
gpCols
=
(
uint64_t
*
)
pGpDataInfo
->
pData
;
int32_t
step
=
0
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
tsCols
[
i
],
INT64_MIN
,
pBlock
->
info
.
groupId
,
gap
,
&
winIndex
);
getCurSessionWindow
(
pAggSup
,
tsCols
[
i
],
INT64_MIN
,
gpCols
[
i
]
,
gap
,
&
winIndex
);
if
(
!
pCurWin
||
pCurWin
->
pos
.
pageId
==
-
1
)
{
// window has been closed.
step
=
1
;
...
...
@@ -4168,13 +4170,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
0
,
pOperator
->
exprSupp
.
numOfExprs
,
0
,
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pOperator
->
exprSupp
.
numOfExprs
,
0
,
pWins
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
0
,
pChildOp
->
exprSupp
.
numOfExprs
,
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pChildOp
->
exprSupp
.
numOfExprs
,
0
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pBlock
->
info
.
groupId
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
);
}
...
...
@@ -4285,21 +4287,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"sem
s
session"
);
printDataBlock
(
pBInfo
->
pRes
,
"sem
i
session"
);
return
pBInfo
->
pRes
;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
&&
!
pInfo
->
returnDelete
)
{
pInfo
->
returnDelete
=
true
;
printDataBlock
(
pInfo
->
pDelRes
,
"sem
s
session"
);
printDataBlock
(
pInfo
->
pDelRes
,
"sem
i
session"
);
return
pInfo
->
pDelRes
;
}
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
// process the rest of the data
pOperator
->
status
=
OP_OPENED
;
printDataBlock
(
pInfo
->
pUpdateRes
,
"sem
s
session"
);
printDataBlock
(
pInfo
->
pUpdateRes
,
"sem
i
session"
);
return
pInfo
->
pUpdateRes
;
}
// semi interval operator clear disk buffer
...
...
@@ -4318,13 +4320,14 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock
(
pInfo
->
pUpdateRes
);
break
;
}
printDataBlock
(
pBlock
,
"semi session recv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
pSup
,
pBlock
,
0
,
pSup
->
numOfExprs
,
0
,
pWins
);
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
pSup
,
pBlock
,
START_TS_COLUMN_INDEX
,
pSup
->
numOfExprs
,
0
,
pWins
);
removeSessionResults
(
pStUpdated
,
pWins
);
taosArrayDestroy
(
pWins
);
copy
UpdateDataBlock
(
pInfo
->
pUpdateRes
,
pBlock
,
pInfo
->
primaryTsIndex
);
copy
DataBlock
(
pInfo
->
pUpdateRes
,
pBlock
);
break
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
// gap must be 0
...
...
@@ -4364,21 +4367,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"sem
s
session"
);
printDataBlock
(
pBInfo
->
pRes
,
"sem
i
session"
);
return
pBInfo
->
pRes
;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
&&
!
pInfo
->
returnDelete
)
{
pInfo
->
returnDelete
=
true
;
printDataBlock
(
pInfo
->
pDelRes
,
"sem
s
session"
);
printDataBlock
(
pInfo
->
pDelRes
,
"sem
i
session"
);
return
pInfo
->
pDelRes
;
}
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
// process the rest of the data
pOperator
->
status
=
OP_OPENED
;
printDataBlock
(
pInfo
->
pUpdateRes
,
"sem
s
session"
);
printDataBlock
(
pInfo
->
pUpdateRes
,
"sem
i
session"
);
return
pInfo
->
pUpdateRes
;
}
...
...
@@ -4400,8 +4403,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pOperator
->
name
=
"StreamSessionFinalAggOperator"
;
}
else
{
pInfo
->
isFinal
=
false
;
pInfo
->
pUpdateRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
pUpdateRes
->
info
.
type
=
STREAM_CLEAR
;
pInfo
->
pUpdateRes
=
createSpecialDataBlock
(
STREAM_CLEAR
);
blockDataEnsureCapacity
(
pInfo
->
pUpdateRes
,
128
);
pOperator
->
name
=
"StreamSessionSemiAggOperator"
;
pOperator
->
fpSet
=
...
...
@@ -4616,23 +4618,20 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
}
static
void
doClearStateWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
SColumn
*
pCol
,
int32_t
keyIndex
,
SHashObj
*
pSeUpdated
,
SHashObj
*
pSeDeleted
)
{
SColumnInfoData
*
pTsColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
SColumnInfoData
*
p
KeyColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
keyIndex
);
SHashObj
*
pSeUpdated
,
SHashObj
*
pSeDeleted
)
{
SColumnInfoData
*
pTsColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
p
GroupColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
TSKEY
*
tsCol
=
(
TSKEY
*
)
pTsColInfo
->
pData
;
bool
allEqual
=
false
;
int32_t
step
=
1
;
uint64_t
groupId
=
pBlock
->
info
.
groupId
;
uint64_t
*
gpCol
=
(
uint64_t
*
)
pGroupColInfo
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
char
*
pKeyData
=
colDataGetData
(
pKeyColInfo
,
i
);
int32_t
winIndex
=
0
;
SStateWindowInfo
*
pCurWin
=
getStateWindowByTs
(
pAggSup
,
tsCol
[
i
],
g
roupId
,
&
winIndex
);
SStateWindowInfo
*
pCurWin
=
getStateWindowByTs
(
pAggSup
,
tsCol
[
i
],
g
pCol
[
i
]
,
&
winIndex
);
if
(
!
pCurWin
)
{
continue
;
}
step
=
updateStateWindowInfo
(
pAggSup
->
pCurWins
,
winIndex
,
tsCol
,
groupId
,
pKeyColInfo
,
pBlock
->
info
.
rows
,
i
,
&
allEqual
,
pSeDeleted
);
ASSERT
(
isTsInWindow
(
pCurWin
,
tsCol
[
i
])
||
isEqualStateKey
(
pCurWin
,
pKeyData
));
updateSessionWindowInfo
(
&
pCurWin
->
winInfo
,
tsCol
,
NULL
,
0
,
pBlock
->
info
.
rows
,
i
,
0
,
NULL
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
pCurWins
,
winIndex
,
destroyStateWinInfo
);
}
...
...
@@ -4675,7 +4674,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pStDeleted
);
if
(
!
allEqual
)
{
appendOneRow
(
pAggSup
->
pScanBlock
,
&
pCurWin
->
winInfo
.
win
.
skey
,
&
pCurWin
->
winInfo
.
win
.
ekey
,
&
groupId
);
GROUPID_COLUMN_INDEX
,
&
groupId
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
pCurWins
,
winIndex
,
destroyStateWinInfo
);
continue
;
...
...
@@ -4730,8 +4729,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
printDataBlock
(
pBlock
,
"single state recv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
stateCol
,
pInfo
->
stateCol
.
slotId
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录