Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
962fab4f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
962fab4f
编写于
12月 26, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix:remove assert
上级
233322db
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
7 addition
and
27 deletion
+7
-27
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+7
-27
未找到文件。
source/libs/executor/src/timewindowoperator.c
浏览文件 @
962fab4f
...
...
@@ -907,7 +907,7 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
}
bool
isOverdue
(
TSKEY
ekey
,
STimeWindowAggSupp
*
pTwSup
)
{
ASSERT
(
pTwSup
->
maxTs
==
INT64_MIN
||
pTwSup
->
maxTs
>
0
);
ASSERT
S
(
pTwSup
->
maxTs
==
INT64_MIN
||
pTwSup
->
maxTs
>
0
,
"maxts should greater than 0"
);
return
pTwSup
->
maxTs
!=
INT64_MIN
&&
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
waterMark
;
}
...
...
@@ -1396,7 +1396,6 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
while
((
pIte
=
tSimpleHashIterate
(
pHashMap
,
pIte
,
&
iter
))
!=
NULL
)
{
void
*
key
=
tSimpleHashGetKey
(
pIte
,
&
keyLen
);
uint64_t
groupId
=
*
(
uint64_t
*
)
key
;
ASSERT
(
keyLen
==
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
)));
TSKEY
ts
=
*
(
int64_t
*
)((
char
*
)
key
+
sizeof
(
uint64_t
));
SResultRowPosition
*
pPos
=
(
SResultRowPosition
*
)
pIte
;
int32_t
code
=
saveWinResult
(
ts
,
pPos
->
pageId
,
pPos
->
offset
,
groupId
,
resWins
);
...
...
@@ -1547,7 +1546,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pChildren
,
i
);
SStreamIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
ASSERT
(
pChInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
);
ASSERT
S
(
pChInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
,
"children trigger type should be at once"
);
pChInfo
->
twAggSup
.
maxTs
=
TMAX
(
pChInfo
->
twAggSup
.
maxTs
,
maxTs
);
closeStreamIntervalWindow
(
pChInfo
->
aggSup
.
pResultRowHashTable
,
&
pChInfo
->
twAggSup
,
&
pChInfo
->
interval
,
NULL
,
NULL
,
NULL
,
pOperator
);
...
...
@@ -1767,8 +1766,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
.
maxTs
=
INT64_MIN
,
};
ASSERT
(
as
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
...
...
@@ -2252,7 +2249,6 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
return
;
}
blockDataEnsureCapacity
(
pBlock
,
size
-
(
*
pIndex
));
ASSERT
(
3
<=
taosArrayGetSize
(
pBlock
->
pDataBlock
));
SColumnInfoData
*
pStartTs
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTs
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pGroupId
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
...
...
@@ -2359,7 +2355,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
SResultRow
*
pResult
=
NULL
;
int32_t
forwardRows
=
0
;
ASSERT
(
pSDataBlock
->
pDataBlock
!=
NULL
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
...
...
@@ -2482,7 +2477,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
ASSERT
(
IS_FINAL_OP
(
pInfo
));
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
return
pInfo
->
pPullDataRes
;
}
...
...
@@ -2543,7 +2537,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo
->
numOfDatapack
++
;
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"interval final recv"
:
"interval semi recv"
);
ASSERT
(
pBlock
->
info
.
type
!=
STREAM_INVERT
);
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
)
{
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
...
...
@@ -2633,7 +2626,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
ASSERT
(
IS_FINAL_OP
(
pInfo
));
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
return
pInfo
->
pPullDataRes
;
}
...
...
@@ -2688,7 +2680,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.
deleteMarkSaved
=
0
,
.
calTriggerSaved
=
0
,
};
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
ASSERT
S
(
pInfo
->
twAggSup
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
,
"trigger type should not be max delay"
);
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
...
...
@@ -2713,7 +2705,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initStreamFunciton
(
pOperator
->
exprSupp
.
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
);
ASSERT
(
numOfCols
>
0
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
pInfo
->
pState
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamState
));
...
...
@@ -2724,6 +2715,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo
->
pChildren
=
NULL
;
if
(
numOfChild
>
0
)
{
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
void
*
));
if
(
!
pInfo
->
pChildren
)
{
goto
_error
;
}
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
createStreamFinalIntervalOperatorInfo
(
NULL
,
pPhyNode
,
pTaskInfo
,
0
);
if
(
pChildOp
)
{
...
...
@@ -2746,7 +2740,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
// semi interval operator does not catch result
pInfo
->
isFinal
=
false
;
pOperator
->
name
=
"StreamSemiIntervalOperator"
;
ASSERT
(
pInfo
->
aggSup
.
currentPageId
==
-
1
);
}
if
(
!
IS_FINAL_OP
(
pInfo
)
||
numOfChild
==
0
)
{
...
...
@@ -3162,15 +3155,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
}
void
deleteWindow
(
SArray
*
pWinInfos
,
int32_t
index
,
FDelete
fp
)
{
ASSERT
(
index
>=
0
&&
index
<
taosArrayGetSize
(
pWinInfos
));
if
(
fp
)
{
void
*
ptr
=
taosArrayGet
(
pWinInfos
,
index
);
fp
(
ptr
);
}
taosArrayRemove
(
pWinInfos
,
index
);
}
static
void
doDeleteTimeWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
SArray
*
result
)
{
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startDatas
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
...
...
@@ -3218,7 +3202,6 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
int32_t
iter
=
0
;
while
((
pIte
=
tSimpleHashIterate
(
pStUpdated
,
pIte
,
&
iter
))
!=
NULL
)
{
void
*
key
=
tSimpleHashGetKey
(
pIte
,
&
keyLen
);
ASSERT
(
keyLen
==
sizeof
(
SSessionKey
));
taosArrayPush
(
pUpdated
,
key
);
}
taosArraySort
(
pUpdated
,
sessionKeyCompareAsc
);
...
...
@@ -3279,7 +3262,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SSessionKey
*
pWinKey
=
taosArrayGet
(
pWinArray
,
i
);
...
...
@@ -3380,7 +3362,6 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
void
initGroupResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
)
{
pGroupResInfo
->
pRows
=
pArrayList
;
pGroupResInfo
->
index
=
0
;
ASSERT
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
}
void
doBuildSessionResult
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SGroupResInfo
*
pGroupResInfo
,
...
...
@@ -4811,7 +4792,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
ASSERT
(
numOfCols
>
0
);
SSDataBlock
*
pResBlock
=
createDataBlockFromDescNode
(
pPhyNode
->
pOutputDataBlockDesc
);
SInterval
interval
=
{
...
...
@@ -4831,7 +4811,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.
deleteMark
=
getDeleteMark
(
pIntervalPhyNode
),
};
ASSERT
(
twAggSupp
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
ASSERT
S
(
twAggSupp
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
,
"trigger type should not be max delay"
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
interval
=
interval
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录