Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
201d0f2d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
201d0f2d
编写于
6月 06, 2023
作者:
L
liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix stream ci issue
上级
65d323d9
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
91 addition
and
35 deletion
+91
-35
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+1
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+42
-8
tests/script/tsim/stream/distributeInterval0.sim
tests/script/tsim/stream/distributeInterval0.sim
+47
-26
未找到文件。
source/libs/executor/inc/executorInt.h
浏览文件 @
201d0f2d
...
...
@@ -457,6 +457,7 @@ typedef struct SStreamIntervalOperatorInfo {
int64_t
dataVersion
;
SStateStore
statestore
;
bool
recvGetAll
;
SHashObj
*
pFinalPullDataMap
;
}
SStreamIntervalOperatorInfo
;
typedef
struct
SDataGroupInfo
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
201d0f2d
...
...
@@ -1865,7 +1865,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
TSKEY
maxTs
=
pAPI
->
stateStore
.
updateInfoFillBlockData
(
pInfo
->
pUpdateInfo
,
pInfo
->
pRecoverRes
,
pInfo
->
primaryTsIndex
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
}
else
{
pInfo
->
pUpdateInfo
->
maxDataVersion
=
pTaskInfo
->
streamInfo
.
fillHistoryVer2
;
pInfo
->
pUpdateInfo
->
maxDataVersion
=
TMAX
(
pInfo
->
pUpdateInfo
->
maxDataVersion
,
pTaskInfo
->
streamInfo
.
fillHistoryVer2
)
;
doCheckUpdate
(
pInfo
,
pInfo
->
pRecoverRes
->
info
.
window
.
ekey
,
pInfo
->
pRecoverRes
);
}
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
201d0f2d
...
...
@@ -1306,6 +1306,8 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
return
true
;
}
static
int32_t
getChildIndex
(
SSDataBlock
*
pBlock
)
{
return
pBlock
->
info
.
childId
;
}
static
void
doDeleteWindows
(
SOperatorInfo
*
pOperator
,
SInterval
*
pInterval
,
SSDataBlock
*
pBlock
,
SArray
*
pUpWins
,
SSHashObj
*
pUpdatedMap
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -1340,8 +1342,14 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
SWinKey
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
winGpId
};
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
&
winRes
,
sizeof
(
SWinKey
));
if
(
chIds
)
{
getNextTimeWindow
(
pInterval
,
&
win
,
TSDB_ORDER_ASC
);
continue
;
int32_t
childId
=
getChildIndex
(
pBlock
);
SArray
*
chArray
=
*
(
void
**
)
chIds
;
int32_t
index
=
taosArraySearchIdx
(
chArray
,
&
childId
,
compareInt32Val
,
TD_EQ
);
if
(
index
!=
-
1
)
{
qDebug
(
"===stream===try push delete window%"
PRId64
"chId:%d ,continue"
,
win
.
skey
,
childId
);
getNextTimeWindow
(
pInterval
,
&
win
,
TSDB_ORDER_ASC
);
continue
;
}
}
bool
res
=
doDeleteWindow
(
pOperator
,
win
.
skey
,
winGpId
);
if
(
pUpWins
&&
res
)
{
...
...
@@ -1497,6 +1505,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
taosArrayDestroy
(
*
(
void
**
)
pIte
);
}
taosHashCleanup
(
pInfo
->
pPullDataMap
);
taosHashCleanup
(
pInfo
->
pFinalPullDataMap
);
taosArrayDestroy
(
pInfo
->
pPullWins
);
blockDataDestroy
(
pInfo
->
pPullDataRes
);
taosArrayDestroy
(
pInfo
->
pDelWins
);
...
...
@@ -2067,8 +2076,6 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
taosHashPut
(
pMap
,
pWinRes
,
sizeof
(
SWinKey
),
&
childIds
,
sizeof
(
void
*
));
}
static
int32_t
getChildIndex
(
SSDataBlock
*
pBlock
)
{
return
pBlock
->
info
.
childId
;
}
static
void
clearStreamIntervalOperator
(
SStreamIntervalOperatorInfo
*
pInfo
)
{
tSimpleHashClear
(
pInfo
->
aggSup
.
pResultRowHashTable
);
clearDiskbasedBuf
(
pInfo
->
aggSup
.
pResultBuf
);
...
...
@@ -2112,7 +2119,7 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
blockDataUpdateTsWindow
(
pBlock
,
0
);
}
void
processPullOver
(
SSDataBlock
*
pBlock
,
SHashObj
*
pMap
,
S
Interval
*
pInterval
)
{
void
processPullOver
(
SSDataBlock
*
pBlock
,
SHashObj
*
pMap
,
S
HashObj
*
pFinalMap
,
SInterval
*
pInterval
,
SArray
*
pPullWins
,
int32_t
numOfCh
,
SOperatorInfo
*
pOperator
)
{
SColumnInfoData
*
pStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
TSKEY
*
tsData
=
(
TSKEY
*
)
pStartCol
->
pData
;
SColumnInfoData
*
pEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
...
...
@@ -2136,6 +2143,22 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval)
taosArrayDestroy
(
chArray
);
taosHashRemove
(
pMap
,
&
winRes
,
sizeof
(
SWinKey
));
qDebug
(
"===stream===retrive pull data over.window %"
PRId64
,
winRes
.
ts
);
void
*
pFinalCh
=
taosHashGet
(
pFinalMap
,
&
winRes
,
sizeof
(
SWinKey
));
if
(
pFinalCh
)
{
taosHashRemove
(
pFinalMap
,
&
winRes
,
sizeof
(
SWinKey
));
doDeleteWindow
(
pOperator
,
winRes
.
ts
,
winRes
.
groupId
);
STimeWindow
nextWin
=
getFinalTimeWindow
(
winRes
.
ts
,
pInterval
);
SPullWindowInfo
pull
=
{.
window
=
nextWin
,
.
groupId
=
winRes
.
groupId
,
.
calWin
.
skey
=
nextWin
.
skey
,
.
calWin
.
ekey
=
nextWin
.
skey
};
// add pull data request
if
(
savePullWindow
(
&
pull
,
pPullWins
)
==
TSDB_CODE_SUCCESS
)
{
addPullWindow
(
pMap
,
&
winRes
,
numOfCh
);
qDebug
(
"===stream===prepare final retrive for delete %"
PRId64
", size:%d"
,
winRes
.
ts
,
numOfCh
);
}
}
}
}
}
...
...
@@ -2144,7 +2167,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval)
}
}
static
void
addRetriveWindow
(
SArray
*
wins
,
SStreamIntervalOperatorInfo
*
pInfo
)
{
static
void
addRetriveWindow
(
SArray
*
wins
,
SStreamIntervalOperatorInfo
*
pInfo
,
int32_t
childId
)
{
int32_t
size
=
taosArrayGetSize
(
wins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SWinKey
*
winKey
=
taosArrayGet
(
wins
,
i
);
...
...
@@ -2161,6 +2184,14 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
addPullWindow
(
pInfo
->
pPullDataMap
,
winKey
,
pInfo
->
numOfChild
);
qDebug
(
"===stream===prepare retrive for delete %"
PRId64
", size:%d"
,
winKey
->
ts
,
pInfo
->
numOfChild
);
}
}
else
{
SArray
*
chArray
=
*
(
void
**
)
chIds
;
int32_t
index
=
taosArraySearchIdx
(
chArray
,
&
childId
,
compareInt32Val
,
TD_EQ
);
qDebug
(
"===stream===check final retrive %"
PRId64
",chid:%d"
,
winKey
->
ts
,
index
);
if
(
index
==
-
1
)
{
qDebug
(
"===stream===add final retrive %"
PRId64
,
winKey
->
ts
);
taosHashPut
(
pInfo
->
pFinalPullDataMap
,
winKey
,
sizeof
(
SWinKey
),
NULL
,
0
);
}
}
}
}
...
...
@@ -2554,7 +2585,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SArray
*
delWins
=
taosArrayInit
(
8
,
sizeof
(
SWinKey
));
doDeleteWindows
(
pOperator
,
&
pInfo
->
interval
,
pBlock
,
delWins
,
pInfo
->
pUpdatedMap
);
if
(
IS_FINAL_OP
(
pInfo
))
{
addRetriveWindow
(
delWins
,
pInfo
);
int32_t
chId
=
getChildIndex
(
pBlock
);
addRetriveWindow
(
delWins
,
pInfo
,
chId
);
if
(
pBlock
->
info
.
type
!=
STREAM_CLEAR
)
{
taosArrayAddAll
(
pInfo
->
pDelWins
,
delWins
);
}
...
...
@@ -2589,7 +2621,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_PULL_OVER
&&
IS_FINAL_OP
(
pInfo
))
{
processPullOver
(
pBlock
,
pInfo
->
pPullDataMap
,
&
pInfo
->
interval
);
processPullOver
(
pBlock
,
pInfo
->
pPullDataMap
,
pInfo
->
pFinalPullDataMap
,
&
pInfo
->
interval
,
pInfo
->
pPullWins
,
pInfo
->
numOfChild
,
pOperator
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
return
pBlock
;
...
...
@@ -2772,6 +2804,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo
->
pullIndex
=
0
;
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pInfo
->
pPullDataMap
=
taosHashInit
(
64
,
hashFn
,
false
,
HASH_NO_LOCK
);
pInfo
->
pFinalPullDataMap
=
taosHashInit
(
64
,
hashFn
,
false
,
HASH_NO_LOCK
);
pInfo
->
pPullDataRes
=
createSpecialDataBlock
(
STREAM_RETRIEVE
);
pInfo
->
ignoreExpiredData
=
pIntervalPhyNode
->
window
.
igExpired
;
pInfo
->
ignoreExpiredDataSaved
=
false
;
...
...
@@ -4963,6 +4996,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo
->
pPhyNode
=
NULL
;
// create new child
pInfo
->
pPullDataMap
=
NULL
;
pInfo
->
pFinalPullDataMap
=
NULL
;
pInfo
->
pPullWins
=
NULL
;
// SPullWindowInfo
pInfo
->
pullIndex
=
0
;
pInfo
->
pPullDataRes
=
NULL
;
...
...
tests/script/tsim/stream/distributeInterval0.sim
浏览文件 @
201d0f2d
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
sql drop stream if exists stream_t1;
...
...
@@ -248,10 +223,56 @@ sql insert into ts3 values(1648791223002,2,2,3,1.1);
sql insert into ts4 values(1648791233003,3,2,3,2.1);
sql insert into ts3 values(1648791243004,4,2,43,73.1);
sql insert into ts4 values(1648791213002,24,22,23,4.1);
$loop_count = 0
loop032:
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sleep 1000
print 6-0 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop032
endi
if $data01 != 8 then
print =6====data01=$data01
goto loop032
endi
sql insert into ts3 values(1648791243005,4,20,3,3.1);
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
$loop_count = 0
loop033:
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sleep 1000
print 6-1 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop033
endi
if $data01 != 8 then
print =6====data01=$data01
goto loop033
endi
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录