Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7eb75fea
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
7eb75fea
编写于
9月 20, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(steam):optimize window close
上级
bc9c0dfd
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
21 addition
and
17 deletion
+21
-17
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-7
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+19
-10
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
7eb75fea
...
@@ -1064,6 +1064,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
...
@@ -1064,6 +1064,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool
isOverdue
(
TSKEY
ts
,
STimeWindowAggSupp
*
pSup
);
bool
isOverdue
(
TSKEY
ts
,
STimeWindowAggSupp
*
pSup
);
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
);
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
);
bool
isDeletedWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SAggSupporter
*
pSup
);
bool
isDeletedWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SAggSupporter
*
pSup
);
bool
isDeletedStreamWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SOperatorInfo
*
pOperator
,
STimeWindowAggSupp
*
pTwSup
);
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
,
uint64_t
*
pGp
);
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
,
uint64_t
*
pGp
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
7eb75fea
...
@@ -1331,7 +1331,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
...
@@ -1331,7 +1331,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
// must check update info first.
// must check update info first.
bool
update
=
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
pBlock
->
info
.
uid
,
tsCol
[
rowId
]);
bool
update
=
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
pBlock
->
info
.
uid
,
tsCol
[
rowId
]);
bool
closedWin
=
isClosed
&&
isSignleIntervalWindow
(
pInfo
)
&&
bool
closedWin
=
isClosed
&&
isSignleIntervalWindow
(
pInfo
)
&&
isDeleted
Window
(
&
win
,
pBlock
->
info
.
groupId
,
pInfo
->
windowSup
.
pInterval
AggSup
);
isDeleted
StreamWindow
(
&
win
,
pBlock
->
info
.
groupId
,
pInfo
->
pTableScanOp
,
&
pInfo
->
tw
AggSup
);
if
((
update
||
closedWin
)
&&
out
)
{
if
((
update
||
closedWin
)
&&
out
)
{
qDebug
(
"stream update check not pass, update %d, closedWin %d"
,
update
,
closedWin
);
qDebug
(
"stream update check not pass, update %d, closedWin %d"
,
update
,
closedWin
);
uint64_t
gpId
=
closedWin
&&
pInfo
->
partitionSup
.
needCalc
uint64_t
gpId
=
closedWin
&&
pInfo
->
partitionSup
.
needCalc
...
@@ -1931,11 +1931,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
...
@@ -1931,11 +1931,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
pTagCond
=
pTagCond
;
pInfo
->
pTagCond
=
pTagCond
;
pInfo
->
pGroupTags
=
pTableScanNode
->
pGroupTags
;
pInfo
->
pGroupTags
=
pTableScanNode
->
pGroupTags
;
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
){
.
waterMark
=
pTableScanNode
->
watermark
,
.
calTrigger
=
pTableScanNode
->
triggerType
,
.
maxTs
=
INT64_MIN
,
};
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
...
@@ -1985,7 +1980,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
...
@@ -1985,7 +1980,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
pUpdateInfo
=
NULL
;
pInfo
->
pUpdateInfo
=
NULL
;
pInfo
->
pTableScanOp
=
pTableScanOp
;
pInfo
->
pTableScanOp
=
pTableScanOp
;
pInfo
->
interval
=
pTSInfo
->
pdInfo
.
interval
;
pInfo
->
readHandle
=
*
pHandle
;
pInfo
->
readHandle
=
*
pHandle
;
pInfo
->
tableUid
=
pScanPhyNode
->
uid
;
pInfo
->
tableUid
=
pScanPhyNode
->
uid
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
7eb75fea
...
@@ -1753,16 +1753,17 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
...
@@ -1753,16 +1753,17 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
}
}
void
initIntervalDownStream
(
SOperatorInfo
*
downstream
,
uint16_t
type
,
SAggSupporter
*
pSup
,
SInterval
*
pInterval
,
void
initIntervalDownStream
(
SOperatorInfo
*
downstream
,
uint16_t
type
,
SAggSupporter
*
pSup
,
SInterval
*
pInterval
,
int64_t
waterMark
)
{
STimeWindowAggSupp
*
pTwSup
)
{
if
(
downstream
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
downstream
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
initIntervalDownStream
(
downstream
->
pDownstream
[
0
],
type
,
pSup
,
pInterval
,
waterMark
);
initIntervalDownStream
(
downstream
->
pDownstream
[
0
],
type
,
pSup
,
pInterval
,
pTwSup
);
return
;
return
;
}
}
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
windowSup
.
parentType
=
type
;
pScanInfo
->
windowSup
.
parentType
=
type
;
pScanInfo
->
windowSup
.
pIntervalAggSup
=
pSup
;
pScanInfo
->
windowSup
.
pIntervalAggSup
=
pSup
;
pScanInfo
->
pUpdateInfo
=
updateInfoInitP
(
pInterval
,
waterMark
);
pScanInfo
->
pUpdateInfo
=
updateInfoInitP
(
pInterval
,
pTwSup
->
waterMark
);
pScanInfo
->
interval
=
*
pInterval
;
pScanInfo
->
interval
=
*
pInterval
;
pScanInfo
->
twAggSup
=
*
pTwSup
;
}
}
void
initStreamFunciton
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
)
{
void
initStreamFunciton
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
)
{
...
@@ -1847,11 +1848,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
...
@@ -1847,11 +1848,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenIntervalAgg
,
doBuildIntervalResult
,
NULL
,
NULL
,
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenIntervalAgg
,
doBuildIntervalResult
,
NULL
,
NULL
,
destroyIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
destroyIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
if
(
nodeType
(
pPhyNode
)
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
)
{
initIntervalDownStream
(
downstream
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
pInfo
->
twAggSup
.
waterMark
);
}
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
...
@@ -2868,6 +2864,19 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
...
@@ -2868,6 +2864,19 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
return
p1
==
NULL
;
return
p1
==
NULL
;
}
}
bool
isDeletedStreamWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SOperatorInfo
*
pOperator
,
STimeWindowAggSupp
*
pTwSup
)
{
if
(
pWin
->
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
deleteMark
)
{
SWinKey
key
=
{.
ts
=
pWin
->
skey
,
.
groupId
=
groupId
};
void
*
pVal
=
NULL
;
int32_t
size
=
0
;
if
(
streamStateGet
(
pOperator
->
pTaskInfo
->
streamInfo
.
pState
,
&
key
,
&
pVal
,
&
size
)
<
0
)
{
return
false
;
}
streamStateReleaseBuf
(
pOperator
->
pTaskInfo
->
streamInfo
.
pState
,
&
key
,
pVal
);
}
return
false
;
}
int32_t
getNexWindowPos
(
SInterval
*
pInterval
,
SDataBlockInfo
*
pBlockInfo
,
TSKEY
*
tsCols
,
int32_t
startPos
,
TSKEY
eKey
,
int32_t
getNexWindowPos
(
SInterval
*
pInterval
,
SDataBlockInfo
*
pBlockInfo
,
TSKEY
*
tsCols
,
int32_t
startPos
,
TSKEY
eKey
,
STimeWindow
*
pNextWin
)
{
STimeWindow
*
pNextWin
)
{
int32_t
forwardRows
=
int32_t
forwardRows
=
...
@@ -3425,7 +3434,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -3425,7 +3434,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
if
(
pPhyNode
->
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
)
{
if
(
pPhyNode
->
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
)
{
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
pInfo
->
twAggSup
.
waterMark
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
&
pInfo
->
twAggSup
);
}
}
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
@@ -5944,7 +5953,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
...
@@ -5944,7 +5953,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
NULL
,
destroyStreamIntervalOperatorInfo
,
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
NULL
,
destroyStreamIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
pInfo
->
twAggSup
.
waterMark
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
&
pInfo
->
twAggSup
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录