Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2a74aa31
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2a74aa31
编写于
4月 19, 2023
作者:
L
liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
inc sleep time
上级
5b7ccc42
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
34 addition
and
51 deletion
+34
-51
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+5
-31
tests/script/tsim/stream/distributeInterval0.sim
tests/script/tsim/stream/distributeInterval0.sim
+9
-9
tests/script/tsim/stream/fillHistoryBasic1.sim
tests/script/tsim/stream/fillHistoryBasic1.sim
+20
-11
未找到文件。
source/libs/executor/src/timewindowoperator.c
浏览文件 @
2a74aa31
...
...
@@ -1455,18 +1455,6 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
return
w
;
}
static
void
closeChildIntervalWindow
(
SOperatorInfo
*
pOperator
,
SArray
*
pChildren
,
TSKEY
maxTs
)
{
int32_t
size
=
taosArrayGetSize
(
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pChildren
,
i
);
SStreamIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
ASSERTS
(
pChInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
,
"children trigger type should be at once"
);
pChInfo
->
twAggSup
.
maxTs
=
TMAX
(
pChInfo
->
twAggSup
.
maxTs
,
maxTs
);
closeStreamIntervalWindow
(
pChInfo
->
aggSup
.
pResultRowHashTable
,
&
pChInfo
->
twAggSup
,
&
pChInfo
->
interval
,
NULL
,
NULL
,
NULL
,
pOperator
);
}
}
static
void
doBuildDeleteResult
(
SStreamIntervalOperatorInfo
*
pInfo
,
SArray
*
pWins
,
int32_t
*
index
,
SSDataBlock
*
pBlock
)
{
blockDataCleanup
(
pBlock
);
...
...
@@ -1534,6 +1522,10 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
cleanupBasicInfo
(
&
pInfo
->
binfo
);
cleanupAggSup
(
&
pInfo
->
aggSup
);
// it should be empty.
void
*
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pInfo
->
pPullDataMap
,
pIte
))
!=
NULL
)
{
taosArrayDestroy
(
*
(
void
**
)
pIte
);
}
taosHashCleanup
(
pInfo
->
pPullDataMap
);
taosArrayDestroy
(
pInfo
->
pPullWins
);
blockDataDestroy
(
pInfo
->
pPullDataRes
);
...
...
@@ -4426,23 +4418,6 @@ void destroyMergeIntervalOperatorInfo(void* param) {
taosMemoryFreeClear
(
param
);
}
static
int32_t
finalizeWindowResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
STimeWindow
*
win
,
SSDataBlock
*
pResultBlock
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
bool
ascScan
=
(
iaInfo
->
inputOrder
==
TSDB_ORDER_ASC
);
SExprSupp
*
pExprSup
=
&
pOperatorInfo
->
exprSupp
;
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
win
->
skey
,
TSDB_KEYSIZE
,
tableGroupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
tSimpleHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
p1
!=
NULL
);
// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
tSimpleHashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
outputPrevIntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
STimeWindow
*
newWin
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
...
...
@@ -4463,7 +4438,6 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow
*
prevWin
=
&
prevGrpWin
->
window
;
if
((
ascScan
&&
newWin
->
skey
>
prevWin
->
ekey
)
||
((
!
ascScan
)
&&
newWin
->
skey
<
prevWin
->
ekey
))
{
// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode
(
miaInfo
->
groupIntervals
,
listNode
);
}
}
...
...
@@ -4623,7 +4597,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
if
(
listNode
!=
NULL
)
{
SGroupTimeWindow
*
grpWin
=
(
SGroupTimeWindow
*
)(
listNode
->
data
);
// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes
->
info
.
id
.
groupId
=
grpWin
->
groupId
;
}
}
...
...
@@ -4768,6 +4741,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doDeleteWindows
(
pOperator
,
&
pInfo
->
interval
,
pBlock
,
pInfo
->
pDelWins
,
pInfo
->
pUpdatedMap
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
qDebug
(
"===stream===single interval recv|block type STREAM_GET_ALL"
);
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
pUpdatedMap
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
...
...
tests/script/tsim/stream/distributeInterval0.sim
浏览文件 @
2a74aa31
...
...
@@ -62,7 +62,7 @@ $loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
@@ -93,7 +93,7 @@ $loop_count = 0
loop01:
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
@@ -132,7 +132,7 @@ $loop_count = 0
loop011:
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
@@ -171,7 +171,7 @@ $loop_count = 0
loop02:
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
@@ -211,7 +211,7 @@ $loop_count = 0
loop03:
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
@@ -258,7 +258,7 @@ $loop_count = 0
loop04:
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
@@ -300,7 +300,7 @@ sleep 2000
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
@@ -429,7 +429,7 @@ sql select * from streamtST1;
sleep 2000
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
@@ -468,7 +468,7 @@ sql select * from streamtST3;
sleep 2000
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
3
0 then
return -1
endi
...
...
tests/script/tsim/stream/fillHistoryBasic1.sim
浏览文件 @
2a74aa31
...
...
@@ -481,64 +481,73 @@ sql insert into t1 values(1648791213004,4,2,3,4.1);
sql create stream stream2 trigger at_once fill_history 1 IGNORE EXPIRED 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
$loop_count = 0
loop0:
sleep 5000
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
goto loop0
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
goto loop0
endi
if $data02 != 2 then
print ======$data02
return -1
goto loop0
endi
if $data03 != 5 then
print ======$data03
return -1
goto loop0
endi
if $data04 != 2 then
print ======$data04
return -1
goto loop0
endi
if $data05 != 3 then
print ======$data05
return -1
goto loop0
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
goto loop0
endi
if $data12 != 1 then
print ======$data12
return -1
goto loop0
endi
if $data13 != 2 then
print ======$data13
return -1
goto loop0
endi
if $data14 != 2 then
print ======$data14
return -1
goto loop0
endi
if $data15 != 3 then
print ======$data15
return -1
goto loop0
endi
# row 2
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录