Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3cbd3db8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3cbd3db8
编写于
9月 08, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(stream): stream delete for session&state
上级
78abfdd2
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
781 addition
and
43 deletion
+781
-43
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+44
-43
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+6
-0
tests/script/tsim/stream/deleteSession.sim
tests/script/tsim/stream/deleteSession.sim
+532
-0
tests/script/tsim/stream/deleteState.sim
tests/script/tsim/stream/deleteState.sim
+198
-0
tests/script/tsim/stream/partitionbyColumnInterval.sim
tests/script/tsim/stream/partitionbyColumnInterval.sim
+0
-0
tests/script/tsim/stream/partitionbyColumnSession.sim
tests/script/tsim/stream/partitionbyColumnSession.sim
+0
-0
tests/script/tsim/stream/partitionbyColumnState.sim
tests/script/tsim/stream/partitionbyColumnState.sim
+0
-0
未找到文件。
source/libs/executor/src/groupoperator.c
浏览文件 @
3cbd3db8
...
...
@@ -932,6 +932,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case
STREAM_DELETE_DATA
:
{
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
return
pInfo
->
pDelRes
;
}
break
;
default:
return
pBlock
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
3cbd3db8
...
...
@@ -1424,7 +1424,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
taosHashRemove
(
pUpdatedMap
,
&
winRes
,
sizeof
(
SWinKey
));
}
getNextTimeWindow
(
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
,
&
win
);
}
while
(
win
.
skey
<
tsEnds
[
i
]);
}
while
(
win
.
skey
<
=
tsEnds
[
i
]);
}
}
...
...
@@ -3595,7 +3595,8 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
// don't add new window
SResultWindowInfo
*
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
)
{
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
STimeWindow
searchWin
=
{.
skey
=
startTs
,
.
ekey
=
endTs
};
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
pAggSup
->
pCurWins
=
pWinInfos
;
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
...
...
@@ -3607,7 +3608,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
SResultWindowInfo
*
pWin
=
NULL
;
if
(
index
>=
0
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
);
if
(
isInWindow
(
pWin
,
startTs
,
gap
))
{
if
(
isInWindow
(
pWin
,
startTs
,
gap
)
||
isInTimeWindow
(
&
searchWin
,
pWin
->
win
.
skey
,
gap
)
)
{
*
pIndex
=
index
;
return
pWin
;
}
...
...
@@ -3615,7 +3616,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
if
(
index
+
1
<
size
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
+
1
);
if
(
isInWindow
(
pWin
,
startTs
,
gap
))
{
if
(
isInWindow
(
pWin
,
startTs
,
gap
)
||
isInTimeWindow
(
&
searchWin
,
pWin
->
win
.
skey
,
gap
)
)
{
*
pIndex
=
index
+
1
;
return
pWin
;
}
else
if
(
endTs
!=
INT64_MIN
&&
isInWindow
(
pWin
,
endTs
,
gap
))
{
...
...
@@ -3793,7 +3794,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pCurWin
->
win
,
true
);
compactFunctions
(
pSup
->
pCtx
,
pInfo
->
pDummyCtx
,
numOfOutput
,
pTaskInfo
,
&
pInfo
->
twAggSup
.
timeWindowData
);
taosHashRemove
(
pStUpdated
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
));
if
(
pWinInfo
->
isOutput
)
{
if
(
pWinInfo
->
isOutput
&&
pStDeleted
)
{
SWinKey
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStDeleted
,
&
res
,
sizeof
(
SWinKey
),
&
res
,
sizeof
(
SWinKey
));
pWinInfo
->
isOutput
=
false
;
...
...
@@ -3886,19 +3887,24 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
SColumnInfoData
*
pGroupCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
gpDatas
=
(
uint64_t
*
)
pGroupCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
int32_t
winIndex
=
0
;
while
(
1
)
{
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
startDatas
[
i
],
endDatas
[
i
],
gpDatas
[
i
],
gap
,
&
winIndex
);
if
(
!
pCurWin
)
{
break
;
}
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
startDatas
[
i
],
endDatas
[
i
],
gpDatas
[
i
],
gap
,
&
winIndex
);
if
(
!
pCurWin
)
{
continue
;
}
do
{
SResultWindowInfo
delWin
=
*
pCurWin
;
deleteWindow
(
pAggSup
->
pCurWins
,
winIndex
,
fp
);
if
(
result
)
{
delWin
.
groupId
=
gpDatas
[
i
];
taosArrayPush
(
result
,
&
delWin
);
}
}
if
(
winIndex
>=
taosArrayGetSize
(
pAggSup
->
pCurWins
))
{
break
;
}
pCurWin
=
taosArrayGet
(
pAggSup
->
pCurWins
,
winIndex
);
}
while
(
pCurWin
->
win
.
skey
<=
endDatas
[
i
]);
}
}
...
...
@@ -3979,26 +3985,16 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
}
static
void
rebuildTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
SArray
*
pWinArray
,
int32_t
numOfOutput
,
SOperatorInfo
*
pOperator
,
SHashObj
*
pStUpdated
,
bool
needCreate
)
{
SOperatorInfo
*
pOperator
,
SHashObj
*
pStUpdated
)
{
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SResultWindowInfo
*
pParentWin
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
uint64_t
groupId
=
pParentWin
->
groupId
;
int32_t
winIndex
=
0
;
if
(
needCreate
)
{
pParentWin
=
getSessionTimeWindow
(
&
pInfo
->
streamAggSup
,
pParentWin
->
win
.
skey
,
pParentWin
->
win
.
ekey
,
groupId
,
0
,
&
winIndex
);
}
setWindowOutputBuf
(
pParentWin
,
&
pCurResult
,
pSup
->
pCtx
,
groupId
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
streamAggSup
,
pTaskInfo
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
int32_t
num
=
0
;
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SStreamSessionAggOperatorInfo
*
pChInfo
=
pChild
->
info
;
...
...
@@ -4011,31 +4007,36 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
for
(
int32_t
k
=
index
;
k
<
chWinSize
;
k
++
)
{
SResultWindowInfo
*
pChWin
=
taosArrayGet
(
pChWins
,
k
);
if
(
pParentWin
->
win
.
skey
<=
pChWin
->
win
.
skey
&&
pChWin
->
win
.
ekey
<=
pParentWin
->
win
.
ekey
)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pNewParWin
=
getSessionTimeWindow
(
&
pInfo
->
streamAggSup
,
pChWin
->
win
.
skey
,
pChWin
->
win
.
ekey
,
groupId
,
0
,
&
winIndex
);
SResultRow
*
pPareResult
=
NULL
;
setWindowOutputBuf
(
pNewParWin
,
&
pPareResult
,
pSup
->
pCtx
,
groupId
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
streamAggSup
,
pTaskInfo
);
SResultRow
*
pChResult
=
NULL
;
setWindowOutputBuf
(
pChWin
,
&
pChResult
,
pChild
->
exprSupp
.
pCtx
,
groupId
,
numOfOutput
,
pChild
->
exprSupp
.
rowEntryInfoOffset
,
&
pChInfo
->
streamAggSup
,
pTaskInfo
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
p
Ch
Win
->
win
,
true
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
p
NewPar
Win
->
win
,
true
);
compactFunctions
(
pSup
->
pCtx
,
pChild
->
exprSupp
.
pCtx
,
numOfOutput
,
pTaskInfo
,
&
pInfo
->
twAggSup
.
timeWindowData
);
int32_t
winNum
=
getNumCompactWindow
(
pInfo
->
streamAggSup
.
pCurWins
,
winIndex
,
pInfo
->
gap
);
if
(
winNum
>
0
)
{
compactTimeWindow
(
pInfo
,
winIndex
,
winNum
,
groupId
,
numOfOutput
,
pStUpdated
,
NULL
,
pOperator
);
}
SFilePage
*
bufPage
=
getBufPage
(
pChInfo
->
streamAggSup
.
pResultBuf
,
pChWin
->
pos
.
pageId
);
releaseBufPage
(
pChInfo
->
streamAggSup
.
pResultBuf
,
bufPage
);
num
++
;
continue
;
bufPage
=
getBufPage
(
pInfo
->
streamAggSup
.
pResultBuf
,
pNewParWin
->
pos
.
pageId
);
setBufPageDirty
(
bufPage
,
true
);
releaseBufPage
(
pInfo
->
streamAggSup
.
pResultBuf
,
bufPage
);
SWinKey
value
=
{.
ts
=
pNewParWin
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStUpdated
,
&
pNewParWin
->
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWinKey
));
}
else
if
(
!
pChWin
->
isClosed
)
{
break
;
}
}
}
if
(
num
==
0
&&
needCreate
)
{
deleteWindow
(
pInfo
->
streamAggSup
.
pCurWins
,
winIndex
,
NULL
);
}
if
(
pStUpdated
&&
num
>
0
)
{
SWinKey
value
=
{.
ts
=
pParentWin
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStUpdated
,
&
pParentWin
->
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWinKey
));
}
SFilePage
*
bufPage
=
getBufPage
(
pInfo
->
streamAggSup
.
pResultBuf
,
pParentWin
->
pos
.
pageId
);
ASSERT
(
size
>
0
);
setBufPageDirty
(
bufPage
,
true
);
releaseBufPage
(
pInfo
->
streamAggSup
.
pResultBuf
,
bufPage
);
}
}
...
...
@@ -4196,7 +4197,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pChildOp
->
exprSupp
.
numOfExprs
,
0
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
,
NULL
,
false
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
,
pStUpdated
);
}
taosArrayDestroy
(
pWins
);
continue
;
...
...
@@ -4210,7 +4211,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
// gap must be 0
doDeleteTimeWindows
(
&
pChildInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
,
pStUpdated
,
true
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
,
pStUpdated
);
}
copyDeleteWindowInfo
(
pWins
,
pInfo
->
pStDeleted
);
removeSessionResults
(
pStUpdated
,
pWins
);
...
...
@@ -4747,7 +4748,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doDeleteTimeWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
pWins
,
destroyStateWinInfo
);
copyDeleteWindowInfo
(
pWins
,
pInfo
->
pSeDeleted
);
...
...
@@ -5674,7 +5675,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
doDeleteSpecifyIntervalWindow
(
&
pInfo
->
aggSup
,
pBlock
,
pInfo
->
pDelWins
,
&
pInfo
->
interval
,
pUpdatedMap
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
...
...
tests/script/jenkins/basic.txt
浏览文件 @
3cbd3db8
...
...
@@ -248,6 +248,12 @@
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/sliding.sim
#./test.sh -f tsim/stream/partitionbyColumnInterval.sim
#./test.sh -f tsim/stream/partitionbyColumnSession.sim
#./test.sh -f tsim/stream/partitionbyColumnState.sim
#./test.sh -f tsim/stream/deleteInterval.sim
#./test.sh -f tsim/stream/deleteSession.sim
#./test.sh -f tsim/stream/deleteState.sim
# ---- transaction ----
./test.sh -f tsim/trans/lossdata1.sim
...
...
tests/script/tsim/stream/deleteSession.sim
0 → 100644
浏览文件 @
3cbd3db8
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 200
sql connect
sql drop stream if exists streams0;
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop stream if exists streams4;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from t1 session(ts, 5s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sleep 200
sql delete from t1 where ts = 1648791213000;
$loop_count = 0
loop0:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop0
endi
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop1:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != NULL then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213001,2,2,2,2.0);
sql insert into t1 values(1648791213002,3,3,3,3.0);
sql insert into t1 values(1648791213003,4,4,4,4.0);
sleep 200
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
$loop_count = 0
loop3:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data02 != 4 then
print =====data02=$data02
goto loop3
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
$loop_count = 0
loop4:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop4
endi
sleep 200
sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
$loop_count = 0
loop5:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
if $data02 != 4 then
print =====data02=$data02
goto loop5
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213005,2,2,2,2.0);
sql insert into t1 values(1648791213006,3,3,3,3.0);
sql insert into t1 values(1648791213007,4,4,4,4.0);
sql insert into t1 values(1648791223000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,2.0);
sql insert into t1 values(1648791223002,3,3,3,3.0);
sql insert into t1 values(1648791223003,4,4,4,4.0);
sql insert into t1 values(1648791233000,1,1,1,1.0);
sql insert into t1 values(1648791233001,2,2,2,2.0);
sql insert into t1 values(1648791233008,3,3,3,3.0);
sql insert into t1 values(1648791233009,4,4,4,4.0);
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
$loop_count = 0
loop6:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop6
endi
if $data02 != 1 then
print =====data02=$data02
goto loop6
endi
if $data11 != 2 then
print =====data11=$data11
goto loop6
endi
if $data12 != 4 then
print =====data12=$data12
goto loop6
endi
sql drop stream if exists streams2;
sql drop database if exists test2;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st session(ts,5s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop7:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop7
endi
sleep 200
sql delete from t1 where ts = 1648791213000;
$loop_count = 0
loop8:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop8
endi
if $data02 != NULL then
print =====data02=$data02
goto loop8
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t2 values(1648791223000,1,2,3,1.0);
sql insert into t2 values(1648791223001,1,2,3,1.0);
sql insert into t2 values(1648791223002,3,2,3,1.0);
sql insert into t2 values(1648791223003,3,2,3,1.0);
sleep 200
sql delete from t2 where ts >= 1648791223000 and ts <= 1648791223001;
$loop_count = 0
loop11:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop11
endi
if $data02 != NULL then
print =====data02=$data02
goto loop11
endi
if $data11 != 6 then
print =====data11=$data11
goto loop11
endi
if $data12 != 3 then
print =====data12=$data12
goto loop11
endi
sleep 200
sql delete from st where ts >= 1648791223000 and ts <= 1648791223003;
$loop_count = 0
loop12:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop12
endi
if $data01 != 1 then
print =====data01=$data01
goto loop12
endi
if $data02 != NULL then
print =====data02=$data02
goto loop12
endi
sql insert into t1 values(1648791213004,3,2,3,1.0);
sql insert into t1 values(1648791213005,3,2,3,1.0);
sql insert into t1 values(1648791213006,3,2,3,1.0);
sql insert into t1 values(1648791223004,1,2,3,1.0);
sql insert into t2 values(1648791213004,3,2,3,1.0);
sql insert into t2 values(1648791213005,3,2,3,1.0);
sql insert into t2 values(1648791213006,3,2,3,1.0);
sql insert into t2 values(1648791223004,1,2,3,1.0);
sleep 200
sql delete from t2 where ts >= 1648791213004 and ts <= 1648791213006;
$loop_count = 0
loop13:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop13
endi
if $data01 != 4 then
print =====data01=$data01
goto loop13
endi
if $data02 != 3 then
print =====data02=$data02
goto loop13
endi
if $data11 != 2 then
print =====data11=$data11
goto loop13
endi
if $data12 != 1 then
print =====data12=$data12
goto loop13
endi
sql insert into t1 values(1648791223005,1,2,3,1.0);
sql insert into t1 values(1648791223006,1,2,3,1.0);
sql insert into t2 values(1648791223005,1,2,3,1.0);
sql insert into t2 values(1648791223006,1,2,3,1.0);
sql insert into t1 values(1648791233005,4,2,3,1.0);
sql insert into t1 values(1648791233006,2,2,3,1.0);
sql insert into t2 values(1648791233005,5,2,3,1.0);
sql insert into t2 values(1648791233006,3,2,3,1.0);
sleep 200
sql delete from st where ts >= 1648791213001 and ts <= 1648791233005;
$loop_count = 0
loop14:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop14
endi
if $data01 != 1 then
print =====data01=$data01
goto loop14
endi
if $data02 != NULL then
print =====data02=$data02
goto loop14
endi
if $data11 != 2 then
print =====data11=$data11
goto loop14
endi
if $data12 != 3 then
print =====data12=$data12
goto loop14
endi
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop database if exists test3;
sql drop database if exists test;
sql create database test3 vgroups 4;
sql create database test vgroups 1;
sql use test3;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams3 trigger at_once into test.streamt3 as select _wstart c1, count(*) c2, max(a) c3 from st session(ts,5s);
sql insert into t1 values(1648791210000,1,1,1,NULL);
sql insert into t1 values(1648791210001,2,2,2,NULL);
sql insert into t2 values(1648791213001,3,3,3,NULL);
sql insert into t2 values(1648791213003,4,4,4,NULL);
sql insert into t1 values(1648791216000,5,5,5,NULL);
sql insert into t1 values(1648791216002,6,6,6,NULL);
sql insert into t1 values(1648791216004,7,7,7,NULL);
sql insert into t2 values(1648791218001,8,8,8,NULL);
sql insert into t2 values(1648791218003,9,9,9,NULL);
sql insert into t1 values(1648791222000,10,10,10,NULL);
sql insert into t1 values(1648791222003,11,11,11,NULL);
sql insert into t1 values(1648791222005,12,12,12,NULL);
sql insert into t1 values(1648791232005,13,13,13,NULL);
sql insert into t2 values(1648791242005,14,14,14,NULL);
$loop_count = 0
loop19:
sleep 200
sql select * from test.streamt3 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print =====rows=$rows
goto loop19
endi
sql delete from t2 where ts >= 1648791213001 and ts <= 1648791218003;
$loop_count = 0
loop20:
sleep 200
sql select * from test.streamt3 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 5 then
print =====rows=$rows
goto loop20
endi
if $data01 != 2 then
print =====data01=$data01
goto loop20
endi
if $data02 != 2 then
print =====data02=$data02
goto loop20
endi
if $data11 != 3 then
print =====data11=$data11
goto loop20
endi
if $data12 != 7 then
print =====data12=$data12
goto loop20
endi
if $data21 != 3 then
print =====data21=$data21
goto loop20
endi
if $data22 != 12 then
print =====data22=$data22
goto loop20
endi
if $data31 != 1 then
print =====data31=$data31
goto loop20
endi
if $data32 != 13 then
print =====data32=$data32
goto loop20
endi
if $data41 != 1 then
print =====data41=$data41
goto loop20
endi
if $data42 != 14 then
print =====data42=$data42
goto loop20
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
system sh/stop_dnodes.sh
#goto looptest
\ No newline at end of file
tests/script/tsim/stream/deleteState.sim
0 → 100644
浏览文件 @
3cbd3db8
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 200
sql connect
sql drop stream if exists streams0;
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop stream if exists streams4;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 state_window(a);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sleep 200
sql delete from t1 where ts = 1648791213000;
$loop_count = 0
loop0:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop0
endi
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop1:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != NULL then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213001,1,2,2,2.0);
sql insert into t1 values(1648791213002,1,3,3,3.0);
sql insert into t1 values(1648791213003,1,4,4,4.0);
sleep 200
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
$loop_count = 0
loop3:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data02 != 4 then
print =====data02=$data02
goto loop3
endi
sql insert into t1 values(1648791223000,2,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.0);
sql insert into t1 values(1648791223002,2,2,3,1.0);
sql insert into t1 values(1648791223003,2,2,3,1.0);
$loop_count = 0
loop4:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop4
endi
sleep 200
sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
$loop_count = 0
loop5:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
if $data02 != 4 then
print =====data02=$data02
goto loop5
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213005,1,2,2,2.0);
sql insert into t1 values(1648791213006,1,3,3,3.0);
sql insert into t1 values(1648791213007,1,4,4,4.0);
sql insert into t1 values(1648791223000,2,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,2.0);
sql insert into t1 values(1648791223002,2,3,3,3.0);
sql insert into t1 values(1648791223003,2,4,4,4.0);
sql insert into t1 values(1648791233000,3,1,1,1.0);
sql insert into t1 values(1648791233001,3,2,2,2.0);
sql insert into t1 values(1648791233008,3,3,3,3.0);
sql insert into t1 values(1648791233009,3,4,4,4.0);
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
$loop_count = 0
loop6:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop6
endi
if $data01 != 1 then
print =====data01=$data01
goto loop6
endi
if $data02 != 1 then
print =====data02=$data02
goto loop6
endi
if $data11 != 2 then
print =====data11=$data11
goto loop6
endi
if $data12 != 4 then
print =====data12=$data12
goto loop6
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
system sh/stop_dnodes.sh
#goto looptest
\ No newline at end of file
tests/script/tsim/stream/partitionbyColumn
0
.sim
→
tests/script/tsim/stream/partitionbyColumn
Interval
.sim
浏览文件 @
3cbd3db8
文件已移动
tests/script/tsim/stream/partitionbyColumn
1
.sim
→
tests/script/tsim/stream/partitionbyColumn
Session
.sim
浏览文件 @
3cbd3db8
文件已移动
tests/script/tsim/stream/partitionbyColumn
2
.sim
→
tests/script/tsim/stream/partitionbyColumn
State
.sim
浏览文件 @
3cbd3db8
文件已移动
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录