Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b542b8f4
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b542b8f4
编写于
7月 10, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: some problem of parser and planner
上级
bd210bf3
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
100 addition
and
100 deletion
+100
-100
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+11
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+5
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+83
-94
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+1
-4
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
b542b8f4
...
...
@@ -473,6 +473,17 @@ typedef struct SIntervalAggOperatorInfo {
SNode
*
pCondition
;
}
SIntervalAggOperatorInfo
;
typedef
struct
SMergeAlignedIntervalAggOperatorInfo
{
SIntervalAggOperatorInfo
*
intervalAggOperatorInfo
;
bool
hasGroupId
;
uint64_t
groupId
;
SSDataBlock
*
prefetchedBlock
;
bool
inputBlocksFinished
;
SNode
*
pCondition
;
}
SMergeAlignedIntervalAggOperatorInfo
;
typedef
struct
SStreamFinalIntervalOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
// basic info
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
b542b8f4
...
...
@@ -3994,8 +3994,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
int32_t
num
=
0
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyFillNode
->
node
.
pOutputDataBlockDesc
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pTargets
,
NULL
,
&
num
);
SInterval
*
pInterval
=
&
((
SIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
interval
;
int32_t
type
=
convertFillType
(
pPhyFillNode
->
mode
);
SInterval
*
pInterval
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
==
downstream
->
operatorType
?
&
((
SMergeAlignedIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
intervalAggOperatorInfo
->
interval
:
&
((
SIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
interval
;
int32_t
type
=
convertFillType
(
pPhyFillNode
->
mode
);
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
initResultSizeInfo
(
pOperator
,
4096
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
b542b8f4
...
...
@@ -80,11 +80,11 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T
}
static
STimeWindow
getFirstQualifiedTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
,
SInterval
*
pInterval
,
int32_t
order
)
{
int32_t
factor
=
(
order
==
TSDB_ORDER_ASC
)
?
-
1
:
1
;
int32_t
factor
=
(
order
==
TSDB_ORDER_ASC
)
?
-
1
:
1
;
STimeWindow
win
=
*
pWindow
;
STimeWindow
save
=
win
;
while
(
win
.
skey
<=
ts
&&
win
.
ekey
>=
ts
)
{
while
(
win
.
skey
<=
ts
&&
win
.
ekey
>=
ts
)
{
save
=
win
;
win
.
skey
=
taosTimeAdd
(
win
.
skey
,
factor
*
pInterval
->
sliding
,
pInterval
->
slidingUnit
,
pInterval
->
precision
);
win
.
ekey
=
taosTimeAdd
(
win
.
ekey
,
factor
*
pInterval
->
sliding
,
pInterval
->
slidingUnit
,
pInterval
->
precision
);
...
...
@@ -133,7 +133,7 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
if
(
order
==
TSDB_ORDER_ASC
)
{
if
(
w
.
skey
<=
ts
&&
w
.
ekey
>=
ts
)
{
// ts is resident in current time window, but we need to find the first
//qualified time window that cover this timestamp
//
qualified time window that cover this timestamp
w
=
getFirstQualifiedTimeWindow
(
ts
,
&
w
,
pInterval
,
order
);
}
else
{
// todo refactor:
...
...
@@ -889,7 +889,7 @@ static void removeResults(SArray* pWins, SArray* pUpdated) {
}
int64_t
getWinReskey
(
void
*
data
,
int32_t
index
)
{
SArray
*
res
=
(
SArray
*
)
data
;
SArray
*
res
=
(
SArray
*
)
data
;
SWinRes
*
pos
=
taosArrayGet
(
res
,
index
);
return
pos
->
ts
;
}
...
...
@@ -899,15 +899,14 @@ static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) {
int32_t
delSize
=
taosArrayGetSize
(
pDelWins
);
for
(
int32_t
i
=
0
;
i
<
upSize
;
i
++
)
{
SResKeyPos
*
pResKey
=
taosArrayGetP
(
pUpdated
,
i
);
int64_t
key
=
*
(
int64_t
*
)
pResKey
->
key
;
int32_t
index
=
binarySearch
(
pDelWins
,
delSize
,
key
,
TSDB_ORDER_DESC
,
getWinReskey
);
int64_t
key
=
*
(
int64_t
*
)
pResKey
->
key
;
int32_t
index
=
binarySearch
(
pDelWins
,
delSize
,
key
,
TSDB_ORDER_DESC
,
getWinReskey
);
if
(
index
>=
0
&&
key
==
getWinReskey
(
pDelWins
,
index
))
{
taosArrayRemove
(
pDelWins
,
index
);
}
}
}
bool
isOverdue
(
TSKEY
ts
,
STimeWindowAggSupp
*
pSup
)
{
ASSERT
(
pSup
->
maxTs
==
INT64_MIN
||
pSup
->
maxTs
>
0
);
return
pSup
->
maxTs
!=
INT64_MIN
&&
ts
<
pSup
->
maxTs
-
pSup
->
waterMark
;
...
...
@@ -1206,7 +1205,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
while
(
1
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
...
...
@@ -1221,7 +1220,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
}
}
pOperator
->
resultInfo
.
totalRows
+=
pBInfo
->
pRes
->
info
.
rows
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
int32_t
order
=
TSDB_ORDER_ASC
;
...
...
@@ -1247,7 +1246,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
...
...
@@ -1262,7 +1261,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
}
}
pOperator
->
resultInfo
.
totalRows
+=
pBInfo
->
pRes
->
info
.
rows
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
static
SSDataBlock
*
doBuildIntervalResult
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -1384,13 +1383,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
void
doDeleteSpecifyIntervalWindow
(
SAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
SArray
*
pUpWins
,
SInterval
*
pInterval
)
{
SColumnInfoData
*
pStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
tsStarts
=
(
TSKEY
*
)
pStartCol
->
pData
;
TSKEY
*
tsStarts
=
(
TSKEY
*
)
pStartCol
->
pData
;
SColumnInfoData
*
pGroupCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
groupIds
=
(
uint64_t
*
)
pGroupCol
->
pData
;
uint64_t
*
groupIds
=
(
uint64_t
*
)
pGroupCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
SResultRowInfo
dumyInfo
;
dumyInfo
.
cur
.
pageId
=
-
1
;
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsStarts
[
i
],
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
);
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsStarts
[
i
],
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
);
doDeleteIntervalWindow
(
pAggSup
,
win
.
skey
,
groupIds
[
i
]);
if
(
pUpWins
)
{
SWinRes
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
groupIds
[
i
]};
...
...
@@ -1399,8 +1399,8 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
}
}
static
void
doClearWindows
(
SAggSupporter
*
pAggSup
,
SExprSupp
*
pSup1
,
SInterval
*
pInterval
,
int32_t
numOfOutput
,
SSDataBlock
*
pBlock
,
SArray
*
pUpWins
)
{
static
void
doClearWindows
(
SAggSupporter
*
pAggSup
,
SExprSupp
*
pSup1
,
SInterval
*
pInterval
,
int32_t
numOfOutput
,
SSDataBlock
*
pBlock
,
SArray
*
pUpWins
)
{
SColumnInfoData
*
pTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pTsCol
->
pData
;
uint64_t
*
pGpDatas
=
NULL
;
...
...
@@ -1415,7 +1415,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsCols
[
i
],
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
);
step
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
i
,
win
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
uint64_t
winGpId
=
pGpDatas
?
pGpDatas
[
i
]
:
pBlock
->
info
.
groupId
;
bool
res
=
doClearWindow
(
pAggSup
,
pSup1
,
(
char
*
)
&
win
.
skey
,
sizeof
(
TKEY
),
winGpId
,
numOfOutput
);
bool
res
=
doClearWindow
(
pAggSup
,
pSup1
,
(
char
*
)
&
win
.
skey
,
sizeof
(
TKEY
),
winGpId
,
numOfOutput
);
if
(
pUpWins
&&
res
)
{
SWinRes
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
winGpId
};
taosArrayPush
(
pUpWins
,
&
winRes
);
...
...
@@ -1440,9 +1440,9 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
closeIntervalWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pSup
,
SInterval
*
pInterval
,
SHashObj
*
pPullDataMap
,
SArray
*
closeWin
s
,
SArray
*
pRecyPages
,
SDiskbasedBuf
*
pDiscBuf
)
{
static
int32_t
closeIntervalWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pSup
,
SInterval
*
pInterval
,
SHashObj
*
pPullDataMap
,
SArray
*
closeWins
,
SArray
*
pRecyPage
s
,
SDiskbasedBuf
*
pDiscBuf
)
{
void
*
pIte
=
NULL
;
size_t
keyLen
=
0
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
...
...
@@ -1497,8 +1497,8 @@ static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) {
SStreamFinalIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
ASSERT
(
pChInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
);
pChInfo
->
twAggSup
.
maxTs
=
TMAX
(
pChInfo
->
twAggSup
.
maxTs
,
maxTs
);
closeIntervalWindow
(
pChInfo
->
aggSup
.
pResultRowHashTable
,
&
pChInfo
->
twAggSup
,
&
pChInfo
->
interval
,
NULL
,
NULL
,
NULL
,
pChInfo
->
aggSup
.
pResultBuf
);
closeIntervalWindow
(
pChInfo
->
aggSup
.
pResultRowHashTable
,
&
pChInfo
->
twAggSup
,
&
pChInfo
->
interval
,
NULL
,
NULL
,
NULL
,
pChInfo
->
aggSup
.
pResultBuf
);
}
}
...
...
@@ -1544,7 +1544,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteResult
(
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
doBuildDeleteResult
(
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
return
pInfo
->
pDelRes
;
}
...
...
@@ -1559,7 +1559,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
// SResKeyPos
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
// SResKeyPos
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
...
...
@@ -1568,11 +1568,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock
(
pBlock
,
"single interval recv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
doClearWindows
(
&
pInfo
->
aggSup
,
&
pOperator
->
exprSupp
,
&
pInfo
->
interval
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
NULL
);
doClearWindows
(
&
pInfo
->
aggSup
,
&
pOperator
->
exprSupp
,
&
pInfo
->
interval
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
}
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
}
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
doDeleteSpecifyIntervalWindow
(
&
pInfo
->
aggSup
,
pBlock
,
pInfo
->
pDelWins
,
&
pInfo
->
interval
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
...
...
@@ -1597,8 +1598,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
MAIN_SCAN
,
pUpdated
);
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
NULL
,
pUpdated
,
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
NULL
,
pUpdated
,
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
finalizeUpdatedResult
(
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pUpdated
,
pSup
->
rowEntryInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
...
...
@@ -1617,7 +1618,7 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo
*
pInfo
=
(
SStateWindowOperatorInfo
*
)
param
;
cleanupBasicInfo
(
&
pInfo
->
binfo
);
taosMemoryFreeClear
(
pInfo
->
stateKey
.
pData
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -1626,7 +1627,7 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo
(
&
pInfo
->
binfo
);
cleanupAggSup
(
&
pInfo
->
aggSup
);
taosArrayDestroy
(
pInfo
->
pRecycledPages
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -1650,7 +1651,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
}
}
nodesDestroyNode
((
SNode
*
)
pInfo
->
pPhyNode
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -1793,8 +1794,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo
->
pDelWins
=
taosArrayInit
(
4
,
sizeof
(
SWinRes
));
pInfo
->
delIndex
=
0
;
// pInfo->pDelRes = createPullDataBlock(); todo(liuyao) for delete
pInfo
->
pDelRes
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
// todo(liuyao) for delete
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
// todo(liuyao) for delete
pInfo
->
pDelRes
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
// todo(liuyao) for delete
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
// todo(liuyao) for delete
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
...
...
@@ -1962,7 +1963,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
while
(
1
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
...
...
@@ -1977,7 +1978,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
}
}
pOperator
->
resultInfo
.
totalRows
+=
pBInfo
->
pRes
->
info
.
rows
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
int64_t
st
=
taosGetTimestampUs
();
...
...
@@ -2006,7 +2007,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
...
...
@@ -2021,7 +2022,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
}
}
pOperator
->
resultInfo
.
totalRows
+=
pBInfo
->
pRes
->
info
.
rows
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
static
void
doKeepPrevRows
(
STimeSliceOperatorInfo
*
pSliceInfo
,
const
SSDataBlock
*
pBlock
,
int32_t
rowIndex
)
{
...
...
@@ -2385,13 +2386,14 @@ _error:
void
destroySWindowOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SSessionAggOperatorInfo
*
pInfo
=
(
SSessionAggOperatorInfo
*
)
param
;
cleanupBasicInfo
(
&
pInfo
->
binfo
);
taosMemoryFreeClear
(
param
);
}
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
)
{
STimeWindowAggSupp
*
pTwAggSupp
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
)
{
SSessionAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSessionAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -2470,20 +2472,20 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) {
return
p1
!=
NULL
;
}
static
void
rebuildIntervalWindow
(
SStreamFinalIntervalOperatorInfo
*
pInfo
,
SExprSupp
*
pSup
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
,
SArray
*
pUpdated
)
{
static
void
rebuildIntervalWindow
(
SStreamFinalIntervalOperatorInfo
*
pInfo
,
SExprSupp
*
pSup
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
,
SArray
*
pUpdated
)
{
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
if
(
!
pInfo
->
pChildren
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SWinRes
*
pWinRes
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
STimeWindow
ParentWin
=
{.
skey
=
pWinRes
->
ts
,
.
ekey
=
pWinRes
->
ts
+
1
};
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
ParentWin
,
true
,
&
pCurResult
,
pWinRes
->
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
SWinRes
*
pWinRes
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
STimeWindow
ParentWin
=
{.
skey
=
pWinRes
->
ts
,
.
ekey
=
pWinRes
->
ts
+
1
};
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
ParentWin
,
true
,
&
pCurResult
,
pWinRes
->
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
bool
find
=
true
;
bool
find
=
true
;
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SIntervalAggOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
...
...
@@ -2493,8 +2495,9 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
}
find
=
true
;
SResultRow
*
pChResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pChInfo
->
binfo
.
resultRowInfo
,
&
ParentWin
,
true
,
&
pChResult
,
pWinRes
->
groupId
,
pChildSup
->
pCtx
,
pChildSup
->
numOfExprs
,
pChildSup
->
rowEntryInfoOffset
,
&
pChInfo
->
aggSup
,
pTaskInfo
);
setTimeWindowOutputBuf
(
&
pChInfo
->
binfo
.
resultRowInfo
,
&
ParentWin
,
true
,
&
pChResult
,
pWinRes
->
groupId
,
pChildSup
->
pCtx
,
pChildSup
->
numOfExprs
,
pChildSup
->
rowEntryInfoOffset
,
&
pChInfo
->
aggSup
,
pTaskInfo
);
compactFunctions
(
pSup
->
pCtx
,
pChildSup
->
pCtx
,
numOfOutput
,
pTaskInfo
);
}
if
(
find
&&
pUpdated
)
{
...
...
@@ -2783,18 +2786,16 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
SWinRes
));
doClearWindows
(
&
pInfo
->
aggSup
,
pSup
,
&
pInfo
->
interval
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
pUpWins
);
doClearWindows
(
&
pInfo
->
aggSup
,
pSup
,
&
pInfo
->
interval
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
pUpWins
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SStreamFinalIntervalOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
SExprSupp
*
pChildSup
=
&
pChildOp
->
exprSupp
;
doClearWindows
(
&
pChildInfo
->
aggSup
,
pChildSup
,
&
pChildInfo
->
interval
,
pChildSup
->
numOfExprs
,
pBlock
,
NULL
);
rebuildIntervalWindow
(
pInfo
,
pSup
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
->
pTaskInfo
,
NULL
);
doClearWindows
(
&
pChildInfo
->
aggSup
,
pChildSup
,
&
pChildInfo
->
interval
,
pChildSup
->
numOfExprs
,
pBlock
,
NULL
);
rebuildIntervalWindow
(
pInfo
,
pSup
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
->
pTaskInfo
,
NULL
);
taosArrayDestroy
(
pUpWins
);
continue
;
}
...
...
@@ -2812,7 +2813,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp
*
pChildSup
=
&
pChildOp
->
exprSupp
;
doDeleteSpecifyIntervalWindow
(
&
pChildInfo
->
aggSup
,
pBlock
,
NULL
,
&
pChildInfo
->
interval
);
rebuildIntervalWindow
(
pInfo
,
pSup
,
pInfo
->
pDelWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
->
pTaskInfo
,
pUpdated
);
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
->
pTaskInfo
,
pUpdated
);
continue
;
}
removeResults
(
pInfo
->
pDelWins
,
pUpdated
);
...
...
@@ -2862,8 +2863,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
if
(
IS_FINAL_OP
(
pInfo
))
{
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pInfo
->
pPullDataMap
,
pUpdated
,
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pInfo
->
pPullDataMap
,
pUpdated
,
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
closeChildIntervalWindow
(
pInfo
->
pChildren
,
pInfo
->
twAggSup
.
maxTs
);
}
...
...
@@ -3005,8 +3006,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo
->
pPullDataRes
=
createPullDataBlock
();
pInfo
->
ignoreExpiredData
=
pIntervalPhyNode
->
window
.
igExpired
;
// pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete
pInfo
->
pDelRes
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
// todo(liuyao) for delete
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
// todo(liuyao) for delete
pInfo
->
pDelRes
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
// todo(liuyao) for delete
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
// todo(liuyao) for delete
pInfo
->
delIndex
=
0
;
pInfo
->
pDelWins
=
taosArrayInit
(
4
,
sizeof
(
SWinRes
));
...
...
@@ -3063,7 +3064,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear
(
pChInfo
);
}
}
taosMemoryFreeClear
(
param
);
}
...
...
@@ -3152,8 +3153,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo
->
pStDeleted
=
taosHashInit
(
64
,
hashFn
,
true
,
HASH_NO_LOCK
);
pInfo
->
pDelIterator
=
NULL
;
// pInfo->pDelRes = createPullDataBlock();
pInfo
->
pDelRes
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
// todo(liuyao) for delete
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
// todo(liuyao) for delete
pInfo
->
pDelRes
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
// todo(liuyao) for delete
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
// todo(liuyao) for delete
pInfo
->
pChildren
=
NULL
;
pInfo
->
isFinal
=
false
;
pInfo
->
pPhyNode
=
pPhyNode
;
...
...
@@ -3206,9 +3207,7 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
return
false
;
}
bool
isInWindow
(
SResultWindowInfo
*
pWinInfo
,
TSKEY
ts
,
int64_t
gap
)
{
return
isInTimeWindow
(
&
pWinInfo
->
win
,
ts
,
gap
);
}
bool
isInWindow
(
SResultWindowInfo
*
pWinInfo
,
TSKEY
ts
,
int64_t
gap
)
{
return
isInTimeWindow
(
&
pWinInfo
->
win
,
ts
,
gap
);
}
static
SResultWindowInfo
*
insertNewSessionWindow
(
SArray
*
pWinInfos
,
TSKEY
ts
,
int32_t
index
)
{
SResultWindowInfo
win
=
{.
pos
.
offset
=
-
1
,
.
pos
.
pageId
=
-
1
,
.
win
.
skey
=
ts
,
.
win
.
ekey
=
ts
,
.
isOutput
=
false
};
...
...
@@ -3234,7 +3233,7 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
// don't add new window
SResultWindowInfo
*
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
)
{
int64_t
gap
,
int32_t
*
pIndex
)
{
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
pAggSup
->
pCurWins
=
pWinInfos
;
...
...
@@ -3243,7 +3242,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
return
NULL
;
}
// find the first position which is smaller than the key
int32_t
index
=
binarySearch
(
pWinInfos
,
size
,
startTs
,
TSDB_ORDER_DESC
,
getSessionWindowEndkey
);
int32_t
index
=
binarySearch
(
pWinInfos
,
size
,
startTs
,
TSDB_ORDER_DESC
,
getSessionWindowEndkey
);
SResultWindowInfo
*
pWin
=
NULL
;
if
(
index
>=
0
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
);
...
...
@@ -3514,16 +3513,15 @@ void deleteWindow(SArray* pWinInfos, int32_t index) {
static
void
doDeleteTimeWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
int64_t
gap
,
SArray
*
result
)
{
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startDatas
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
TSKEY
*
startDatas
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
TSKEY
*
endDatas
=
(
TSKEY
*
)
pEndTsCol
->
pData
;
TSKEY
*
endDatas
=
(
TSKEY
*
)
pEndTsCol
->
pData
;
SColumnInfoData
*
pGroupCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
uint64_t
*
gpDatas
=
(
uint64_t
*
)
pGroupCol
->
pData
;
uint64_t
*
gpDatas
=
(
uint64_t
*
)
pGroupCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
int32_t
winIndex
=
0
;
while
(
1
)
{
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
startDatas
[
i
],
endDatas
[
i
],
gpDatas
[
i
],
gap
,
&
winIndex
);
while
(
1
)
{
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
startDatas
[
i
],
endDatas
[
i
],
gpDatas
[
i
],
gap
,
&
winIndex
);
if
(
!
pCurWin
)
{
break
;
}
...
...
@@ -3755,8 +3753,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
0
,
pOperator
->
exprSupp
.
numOfExprs
,
0
,
pWins
);
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
0
,
pOperator
->
exprSupp
.
numOfExprs
,
0
,
pWins
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
...
...
@@ -3910,7 +3908,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
break
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
// gap must be 0
doDeleteTimeWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
);
doDeleteTimeWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
);
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
break
;
...
...
@@ -4398,8 +4396,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo
->
pSeDeleted
=
taosHashInit
(
64
,
hashFn
,
true
,
HASH_NO_LOCK
);
pInfo
->
pDelIterator
=
NULL
;
// pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete
pInfo
->
pDelRes
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
// todo(liuyao) for delete
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
// todo(liuyao) for delete
pInfo
->
pDelRes
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
// todo(liuyao) for delete
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
// todo(liuyao) for delete
pInfo
->
pChildren
=
NULL
;
pInfo
->
ignoreExpiredData
=
pStateNode
->
window
.
igExpired
;
...
...
@@ -4428,17 +4426,6 @@ _error:
return
NULL
;
}
typedef
struct
SMergeAlignedIntervalAggOperatorInfo
{
SIntervalAggOperatorInfo
*
intervalAggOperatorInfo
;
bool
hasGroupId
;
uint64_t
groupId
;
SSDataBlock
*
prefetchedBlock
;
bool
inputBlocksFinished
;
SNode
*
pCondition
;
}
SMergeAlignedIntervalAggOperatorInfo
;
void
destroyMergeAlignedIntervalOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SMergeAlignedIntervalAggOperatorInfo
*
miaInfo
=
(
SMergeAlignedIntervalAggOperatorInfo
*
)
param
;
destroyIntervalOperatorInfo
(
miaInfo
->
intervalAggOperatorInfo
,
numOfOutput
);
...
...
@@ -4514,7 +4501,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currTs
=
tsCols
[
currPos
];
currWin
.
skey
=
currTs
;
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
iaInfo
->
interval
.
precision
)
-
1
;
startPos
=
currPos
;
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
currWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
...
...
@@ -4593,7 +4581,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo
*
createMergeAlignedIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
primaryTsSlotId
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
)
{
SMergeAlignedIntervalAggOperatorInfo
*
miaInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMergeAlignedIntervalAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
miaInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -4683,7 +4672,7 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo
*
miaInfo
=
(
SMergeIntervalAggOperatorInfo
*
)
param
;
tdListFree
(
miaInfo
->
groupIntervals
);
destroyIntervalOperatorInfo
(
&
miaInfo
->
intervalAggOperatorInfo
,
numOfOutput
);
taosMemoryFreeClear
(
param
);
}
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
b542b8f4
...
...
@@ -1317,10 +1317,7 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pFill
->
pWStartTs
=
nodesCloneNode
(
pFillNode
->
pWStartTs
);
if
(
NULL
==
pFill
->
pWStartTs
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
setNodeSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pFillNode
->
pWStartTs
,
&
pFill
->
pWStartTs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pFillNode
->
pValues
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录