Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
71d161ae
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
71d161ae
编写于
4月 11, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'enh/rocksRevert' of
https://github.com/taosdata/TDengine
into enh/rocksRevert
上级
911f08b3
4c2d0b5f
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
236 addition
and
42 deletion
+236
-42
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+22
-41
tests/script/tsim/stream/distributeInterval0.sim
tests/script/tsim/stream/distributeInterval0.sim
+212
-0
未找到文件。
source/libs/executor/src/groupoperator.c
浏览文件 @
71d161ae
...
...
@@ -1119,7 +1119,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
return
pInfo
->
pDelRes
;
}
break
;
default:
ASSERTS
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
,
"invalid SSDataBlock type"
);
ASSERTS
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
||
pBlock
->
info
.
type
==
STREAM_RETRIEVE
,
"invalid SSDataBlock type"
);
return
pBlock
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
71d161ae
...
...
@@ -1965,6 +1965,7 @@ FETCH_NEXT_BLOCK:
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RETRIEVE
;
copyDataBlock
(
pInfo
->
pUpdateRes
,
pBlock
);
pInfo
->
updateResIndex
=
0
;
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
updateInfoAddCloseWindowSBF
(
pInfo
->
pUpdateInfo
);
}
break
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
71d161ae
...
...
@@ -1366,8 +1366,13 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
continue
;
}
uint64_t
winGpId
=
pGpDatas
[
i
];
bool
res
=
doDeleteWindow
(
pOperator
,
win
.
skey
,
winGpId
);
SWinKey
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
winGpId
};
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
&
winRes
,
sizeof
(
SWinKey
));
if
(
chIds
)
{
getNextTimeWindow
(
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
,
&
win
);
continue
;
}
bool
res
=
doDeleteWindow
(
pOperator
,
win
.
skey
,
winGpId
);
if
(
pUpWins
&&
res
)
{
taosArrayPush
(
pUpWins
,
&
winRes
);
}
...
...
@@ -2169,7 +2174,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
}
bool
isDeletedStreamWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SStreamState
*
pState
,
STimeWindowAggSupp
*
pTwSup
)
{
if
(
pWin
->
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
deleteMark
)
{
if
(
p
TwSup
->
maxTs
!=
INT64_MIN
&&
p
Win
->
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
deleteMark
)
{
SWinKey
key
=
{.
ts
=
pWin
->
skey
,
.
groupId
=
groupId
};
if
(
streamStateCheck
(
pState
,
&
key
))
{
return
true
;
...
...
@@ -2276,17 +2281,18 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SWinKey
*
winKey
=
taosArrayGet
(
wins
,
i
);
STimeWindow
nextWin
=
getFinalTimeWindow
(
winKey
->
ts
,
&
pInfo
->
interval
);
if
(
needDeleteWindowBuf
(
&
nextWin
,
&
pInfo
->
twAggSup
)
&&
!
pInfo
->
ignoreExpiredData
)
{
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
winKey
,
sizeof
(
SWinKey
));
if
(
!
chIds
)
{
SPullWindowInfo
pull
=
{
.
window
=
nextWin
,
.
groupId
=
winKey
->
groupId
,
.
calWin
.
skey
=
nextWin
.
skey
,
.
calWin
.
ekey
=
nextWin
.
skey
};
// add pull data request
if
(
savePullWindow
(
&
pull
,
pInfo
->
pPullWins
)
==
TSDB_CODE_SUCCESS
)
{
int32_t
size1
=
taosArrayGetSize
(
pInfo
->
pChildren
);
addPullWindow
(
pInfo
->
pPullDataMap
,
winKey
,
size1
);
qDebug
(
"===stream===prepare retrive for delete %"
PRId64
", size:%d"
,
winKey
->
ts
,
size1
);
}
if
(
isOverdue
(
nextWin
.
ekey
,
&
pInfo
->
twAggSup
)
&&
pInfo
->
ignoreExpiredData
)
{
continue
;
}
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
winKey
,
sizeof
(
SWinKey
));
if
(
!
chIds
)
{
SPullWindowInfo
pull
=
{
.
window
=
nextWin
,
.
groupId
=
winKey
->
groupId
,
.
calWin
.
skey
=
nextWin
.
skey
,
.
calWin
.
ekey
=
nextWin
.
skey
};
// add pull data request
if
(
savePullWindow
(
&
pull
,
pInfo
->
pPullWins
)
==
TSDB_CODE_SUCCESS
)
{
int32_t
size1
=
taosArrayGetSize
(
pInfo
->
pChildren
);
addPullWindow
(
pInfo
->
pPullDataMap
,
winKey
,
size1
);
qDebug
(
"===stream===prepare retrive for delete %"
PRId64
", size:%d"
,
winKey
->
ts
,
size1
);
}
}
}
...
...
@@ -2447,14 +2453,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
continue
;
}
if
(
IS_FINAL_OP
(
pInfo
)
&&
isClosed
&&
pInfo
->
pChildren
)
{
if
(
IS_FINAL_OP
(
pInfo
)
&&
pInfo
->
pChildren
)
{
bool
ignore
=
true
;
SWinKey
winRes
=
{
.
ts
=
nextWin
.
skey
,
.
groupId
=
groupId
,
};
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
&
winRes
,
sizeof
(
SWinKey
));
if
(
isDeletedStreamWindow
(
&
nextWin
,
groupId
,
pInfo
->
pState
,
&
pInfo
->
twAggSup
)
&&
!
chIds
)
{
if
(
isDeletedStreamWindow
(
&
nextWin
,
groupId
,
pInfo
->
pState
,
&
pInfo
->
twAggSup
)
&&
isClosed
&&
!
chIds
)
{
SPullWindowInfo
pull
=
{
.
window
=
nextWin
,
.
groupId
=
groupId
,
.
calWin
.
skey
=
nextWin
.
skey
,
.
calWin
.
ekey
=
nextWin
.
skey
};
// add pull data request
...
...
@@ -2611,6 +2617,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
streamStateCommit
(
pInfo
->
pState
);
pInfo
->
twAggSup
.
checkPointTs
=
pInfo
->
twAggSup
.
maxTs
;
}
qDebug
(
"===stream===interval final close"
);
}
return
NULL
;
}
else
{
...
...
@@ -2651,12 +2658,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SArray
*
delWins
=
taosArrayInit
(
8
,
sizeof
(
SWinKey
));
doDeleteWindows
(
pOperator
,
&
pInfo
->
interval
,
pBlock
,
delWins
,
pInfo
->
pUpdatedMap
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SStreamIntervalOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
SExprSupp
*
pChildSup
=
&
pChildOp
->
exprSupp
;
doDeleteWindows
(
pChildOp
,
&
pChildInfo
->
interval
,
pBlock
,
NULL
,
NULL
);
rebuildIntervalWindow
(
pOperator
,
delWins
,
pInfo
->
pUpdatedMap
);
addRetriveWindow
(
delWins
,
pInfo
);
taosArrayAddAll
(
pInfo
->
pDelWins
,
delWins
);
taosArrayDestroy
(
delWins
);
...
...
@@ -2698,25 +2699,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
setInputDataBlock
(
pSup
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamIntervalAggImpl
(
pOperator
,
pBlock
,
pBlock
->
info
.
id
.
groupId
,
pInfo
->
pUpdatedMap
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
chIndex
=
getChildIndex
(
pBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
// if chIndex + 1 - size > 0, add new child
for
(
int32_t
i
=
0
;
i
<
chIndex
+
1
-
size
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
createStreamFinalIntervalOperatorInfo
(
NULL
,
pInfo
->
pPhyNode
,
pOperator
->
pTaskInfo
,
0
);
if
(
!
pChildOp
)
{
T_LONG_JMP
(
pOperator
->
pTaskInfo
->
env
,
TSDB_CODE_OUT_OF_MEMORY
);
}
SStreamIntervalOperatorInfo
*
pTmpInfo
=
pChildOp
->
info
;
pTmpInfo
->
twAggSup
.
calTrigger
=
STREAM_TRIGGER_AT_ONCE
;
taosArrayPush
(
pInfo
->
pChildren
,
&
pChildOp
);
qDebug
(
"===stream===add child, id:%d"
,
chIndex
);
}
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
chIndex
);
SStreamIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
setInputDataBlock
(
&
pChildOp
->
exprSupp
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamIntervalAggImpl
(
pChildOp
,
pBlock
,
pBlock
->
info
.
id
.
groupId
,
NULL
);
}
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
watermark
);
pInfo
->
twAggSup
.
minTs
=
TMIN
(
pInfo
->
twAggSup
.
minTs
,
pBlock
->
info
.
window
.
skey
);
...
...
@@ -2726,7 +2708,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if
(
IS_FINAL_OP
(
pInfo
))
{
closeStreamIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pInfo
->
pPullDataMap
,
pInfo
->
pUpdatedMap
,
pInfo
->
pDelWins
,
pOperator
);
closeChildIntervalWindow
(
pOperator
,
pInfo
->
pChildren
,
pInfo
->
twAggSup
.
maxTs
);
}
pInfo
->
binfo
.
pRes
->
info
.
watermark
=
pInfo
->
twAggSup
.
maxTs
;
...
...
tests/script/tsim/stream/distributeInterval0.sim
浏览文件 @
71d161ae
...
...
@@ -58,6 +58,28 @@ sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
$loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 1 select * from streamtST1;
sql select * from streamtST1;
if $rows != 1 then
print =====rows=$rows
goto loop0
endi
if $data01 != 8 then
print =====data01=$data01
goto loop0
endi
sql insert into ts1 values(1648791223002,2,2,3,1.1);
sql insert into ts1 values(1648791233003,3,2,3,2.1);
sql insert into ts2 values(1648791243004,4,2,43,73.1);
...
...
@@ -66,10 +88,162 @@ sql insert into ts1 values(1648791243005,4,20,3,3.1);
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
$loop_count = 0
loop01:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 2 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop01
endi
if $data01 != 8 then
print =====data01=$data01
goto loop01
endi
if $data11 != 1 then
print =====data11=$data11
goto loop01
endi
if $data21 != 1 then
print =====data21=$data21
goto loop01
endi
if $data31 != 11 then
print =====data31=$data31
goto loop01
endi
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
loop011:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 3 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop011
endi
if $data01 != 8 then
print =====data01=$data01
goto loop011
endi
if $data11 != 2 then
print =====data11=$data11
goto loop011
endi
if $data21 != 1 then
print =====data21=$data21
goto loop011
endi
if $data31 != 13 then
print =====data31=$data31
goto loop011
endi
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
$loop_count = 0
loop02:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 4 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop02
endi
if $data01 != 8 then
print =====data01=$data01
goto loop02
endi
if $data11 != 3 then
print =====data11=$data11
goto loop02
endi
if $data21 != 2 then
print =====data21=$data21
goto loop02
endi
if $data31 != 15 then
print =====data31=$data31
goto loop02
endi
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
loop03:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 5 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop03
endi
if $data01 != 8 then
print =====data01=$data01
goto loop03
endi
if $data11 != 3 then
print =====data11=$data11
goto loop03
endi
if $data21 != 2 then
print =====data21=$data21
goto loop03
endi
if $data31 != 15 then
print =====data31=$data31
goto loop03
endi
sql insert into ts3 values(1648791223002,2,2,3,1.1);
sql insert into ts4 values(1648791233003,3,2,3,2.1);
sql insert into ts3 values(1648791243004,4,2,43,73.1);
...
...
@@ -79,6 +253,44 @@ sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
loop04:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 6 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop04
endi
if $data01 != 8 then
print =====data01=$data01
goto loop04
endi
if $data11 != 5 then
print =====data11=$data11
goto loop04
endi
if $data21 != 3 then
print =====data21=$data21
goto loop04
endi
if $data31 != 28 then
print =====data31=$data31
goto loop04
endi
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录