Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
02994550
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
02994550
编写于
7月 02, 2022
作者:
L
liuyao
提交者:
GitHub
7月 02, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14446 from taosdata/feature/TD-16874
ci(stream): add ignore expired data
上级
3a904846
c9f133a5
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
213 addition
and
21 deletion
+213
-21
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-4
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+4
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+19
-14
tests/script/tsim/stream/distributeInterval0.sim
tests/script/tsim/stream/distributeInterval0.sim
+1
-1
tests/script/tsim/stream/distributeSession0.sim
tests/script/tsim/stream/distributeSession0.sim
+24
-0
tests/script/tsim/stream/ignoreExpiredData.sim
tests/script/tsim/stream/ignoreExpiredData.sim
+161
-0
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
02994550
...
...
@@ -428,7 +428,7 @@ typedef struct SIntervalAggOperatorInfo {
STimeWindowAggSupp
twAggSup
;
bool
invertible
;
SArray
*
pPrevValues
;
// SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool
ignore
CloseWindow
;
bool
ignore
ExpiredData
;
}
SIntervalAggOperatorInfo
;
typedef
struct
SStreamFinalIntervalOperatorInfo
{
...
...
@@ -450,7 +450,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
SArray
*
pPullWins
;
// SPullWindowInfo
int32_t
pullIndex
;
SSDataBlock
*
pPullDataRes
;
bool
ignore
CloseWindow
;
bool
ignore
ExpiredData
;
}
SStreamFinalIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
...
...
@@ -588,7 +588,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SArray
*
pChildren
;
// cache for children's result; final stream operator
SPhysiNode
*
pPhyNode
;
// create new child
bool
isFinal
;
bool
ignore
CloseWindow
;
bool
ignore
ExpiredData
;
}
SStreamSessionAggOperatorInfo
;
typedef
struct
STimeSliceOperatorInfo
{
...
...
@@ -632,7 +632,7 @@ typedef struct SStreamStateAggOperatorInfo {
void
*
pDelIterator
;
SArray
*
pScanWindow
;
SArray
*
pChildren
;
// cache for children's result;
bool
ignore
CloseWindow
;
bool
ignore
ExpiredData
;
}
SStreamStateAggOperatorInfo
;
typedef
struct
SSortedMergeOperatorInfo
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
02994550
...
...
@@ -1031,10 +1031,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
}
else
{
if
(
isStateWindow
(
pInfo
)
&&
taosArrayGetSize
(
pInfo
->
sessionSup
.
pStreamAggSup
->
pScanWindow
)
>
0
)
{
if
(
isStateWindow
(
pInfo
))
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
pInfo
->
updateResIndex
=
pInfo
->
pUpdateRes
->
info
.
rows
;
prepareDataScan
(
pInfo
,
pInfo
->
pUpdateRes
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
updateResIndex
);
if
(
!
prepareDataScan
(
pInfo
,
pInfo
->
pUpdateRes
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
updateResIndex
))
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
}
}
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER
)
{
SSDataBlock
*
pSDB
=
doDataScan
(
pInfo
,
pInfo
->
pUpdateRes
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
updateResIndex
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
02994550
...
...
@@ -838,7 +838,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
&
pInfo
->
win
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
(
!
pInfo
->
ignore
CloseWindow
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
{
if
(
!
pInfo
->
ignore
ExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
{
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
...
...
@@ -871,7 +871,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation
(
pInfo
,
pBlock
,
pResult
,
&
win
,
startPos
,
forwardRows
,
pSup
);
}
if
(
!
pInfo
->
ignore
CloseWindow
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
{
if
(
!
pInfo
->
ignore
ExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
{
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
order
);
...
...
@@ -886,7 +886,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if
(
startPos
<
0
)
{
break
;
}
if
(
pInfo
->
ignore
CloseWindow
&&
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
))
{
if
(
pInfo
->
ignore
ExpiredData
&&
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
))
{
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
order
);
...
...
@@ -1535,7 +1535,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo
->
interval
=
*
pInterval
;
pInfo
->
execModel
=
pTaskInfo
->
execModel
;
pInfo
->
twAggSup
=
*
pTwAggSupp
;
pInfo
->
ignore
CloseWindow
=
false
;
pInfo
->
ignore
ExpiredData
=
pPhyNode
->
window
.
igExpired
;
if
(
pPhyNode
->
window
.
pExprs
!=
NULL
)
{
int32_t
numOfScalar
=
0
;
...
...
@@ -2292,7 +2292,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
pInfo
->
interval
.
precision
,
NULL
);
while
(
1
)
{
bool
isClosed
=
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
);
if
(
pInfo
->
ignore
CloseWindow
&&
isClosed
)
{
if
(
pInfo
->
ignore
ExpiredData
&&
isClosed
)
{
startPos
=
getNexWindowPos
(
&
pInfo
->
interval
,
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
&
nextWin
);
if
(
startPos
<
0
)
{
break
;
...
...
@@ -2710,7 +2710,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pInfo
->
pPullDataMap
=
taosHashInit
(
64
,
hashFn
,
false
,
HASH_NO_LOCK
);
pInfo
->
pPullDataRes
=
createPullDataBlock
();
pInfo
->
ignore
CloseWindow
=
false
;
pInfo
->
ignore
ExpiredData
=
pIntervalPhyNode
->
window
.
igExpired
;
pOperator
->
operatorType
=
pPhyNode
->
type
;
pOperator
->
blocking
=
true
;
...
...
@@ -2857,7 +2857,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo
->
pChildren
=
NULL
;
pInfo
->
isFinal
=
false
;
pInfo
->
pPhyNode
=
pPhyNode
;
pInfo
->
ignore
CloseWindow
=
false
;
pInfo
->
ignore
ExpiredData
=
pSessionNode
->
window
.
igExpired
;
pOperator
->
name
=
"StreamSessionWindowAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
;
...
...
@@ -3133,7 +3133,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
for
(
int32_t
i
=
0
;
i
<
pSDataBlock
->
info
.
rows
;)
{
if
(
pInfo
->
ignore
CloseWindow
&&
isOverdue
(
endTsCols
[
i
],
&
pInfo
->
twAggSup
))
{
if
(
pInfo
->
ignore
ExpiredData
&&
isOverdue
(
endTsCols
[
i
],
&
pInfo
->
twAggSup
))
{
i
++
;
continue
;
}
...
...
@@ -3413,8 +3413,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForSession
,
pInfo
->
ignore
CloseWindow
);
closeChildSessionWindow
(
pInfo
->
pChildren
,
pInfo
->
twAggSup
.
maxTs
,
pInfo
->
ignore
CloseWindow
);
getResWinForSession
,
pInfo
->
ignore
ExpiredData
);
closeChildSessionWindow
(
pInfo
->
pChildren
,
pInfo
->
twAggSup
.
maxTs
,
pInfo
->
ignore
ExpiredData
);
copyUpdateResult
(
pStUpdated
,
pUpdated
);
taosHashCleanup
(
pStUpdated
);
...
...
@@ -3822,7 +3822,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
SColumnInfoData
*
pKeyColInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
stateCol
.
slotId
);
for
(
int32_t
i
=
0
;
i
<
pSDataBlock
->
info
.
rows
;
i
+=
winRows
)
{
if
(
pInfo
->
ignore
CloseWindow
&&
isOverdue
(
tsCols
[
i
],
&
pInfo
->
twAggSup
))
{
if
(
pInfo
->
ignore
ExpiredData
&&
isOverdue
(
tsCols
[
i
],
&
pInfo
->
twAggSup
))
{
i
++
;
continue
;
}
...
...
@@ -3866,12 +3866,14 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteDataBlock
(
pInfo
->
pSeDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single state"
);
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
printDataBlock
(
pBInfo
->
pRes
,
"single state"
);
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
}
...
...
@@ -3884,6 +3886,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if
(
pBlock
==
NULL
)
{
break
;
}
printDataBlock
(
pBlock
,
"single state recv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
stateCol
,
pInfo
->
stateCol
.
slotId
,
...
...
@@ -3903,8 +3906,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForState
,
pInfo
->
ignore
CloseWindow
);
closeChildSessionWindow
(
pInfo
->
pChildren
,
pInfo
->
twAggSup
.
maxTs
,
pInfo
->
ignore
CloseWindow
);
getResWinForState
,
pInfo
->
ignore
ExpiredData
);
closeChildSessionWindow
(
pInfo
->
pChildren
,
pInfo
->
twAggSup
.
maxTs
,
pInfo
->
ignore
ExpiredData
);
copyUpdateResult
(
pSeUpdated
,
pUpdated
);
taosHashCleanup
(
pSeUpdated
);
...
...
@@ -3914,9 +3917,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildDeleteDataBlock
(
pInfo
->
pSeDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single state"
);
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
printDataBlock
(
pBInfo
->
pRes
,
"single state"
);
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
}
...
...
@@ -3978,7 +3983,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE
;
blockDataEnsureCapacity
(
pInfo
->
pDelRes
,
64
);
pInfo
->
pChildren
=
NULL
;
pInfo
->
ignore
CloseWindow
=
false
;
pInfo
->
ignore
ExpiredData
=
pStateNode
->
window
.
igExpired
;
pOperator
->
name
=
"StreamStateAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
;
...
...
tests/script/tsim/stream/distributeInterval0.sim
浏览文件 @
02994550
...
...
@@ -83,9 +83,9 @@ sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0)
$loop_count = 0
loop1:
sleep 300
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
...
...
tests/script/tsim/stream/distributeSession0.sim
浏览文件 @
02994550
...
...
@@ -10,6 +10,30 @@ sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
...
...
tests/script/tsim/stream/ignoreExpiredData.sim
0 → 100644
浏览文件 @
02994550
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED into streamt1 as select _wstartts, count(*) c1, sum(a) c3 from t1 interval(10s);
sql create stream streams2 trigger at_once IGNORE EXPIRED into streamt2 as select _wstartts, count(*) c1, sum(a) c3 from t1 session(ts,10s);
sql create stream streams3 trigger at_once IGNORE EXPIRED into streamt3 as select _wstartts, count(*) c1, sum(a) c3 from t1 state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.1);
sql insert into t1 values(1648791233002,2,2,3,2.1);
sql insert into t1 values(1648791243003,2,2,3,3.1);
sql insert into t1 values(1648791200000,4,2,3,4.1);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop1
endi
$loop_count = 0
loop2:
sleep 300
sql select * from streamt2;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop2
endi
$loop_count = 0
loop3:
sleep 300
sql select * from streamt3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop3
endi
print =============== create database
sql create database test1 vgroups 4
sql show databases
print ======database=$rows
sql use test1
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql create stream stream_t2 trigger at_once IGNORE EXPIRED into streamtST2 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791222001,2,2,3);
$loop_count = 0
loop4:
sleep 300
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop4
endi
if $data02 != 1 then
print =====data02=$data02
goto loop4
endi
$loop_count = 0
loop5:
sleep 300
sql select * from streamtST2;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop5
endi
if $data02 != 1 then
print =====data02=$data02
goto loop5
endi
system sh/stop_dnodes.sh
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录