Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
17603ba2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
17603ba2
编写于
6月 22, 2022
作者:
S
shenglian-zhou
提交者:
GitHub
6月 22, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14098 from taosdata/szhou/feature/merge-interval-2
fix: change merge interval to merge aligned interval
上级
7f8b2601
84ea04ea
变更
11
显示空白变更内容
内联
并排
Showing
11 changed file
with
46 addition
and
44 deletion
+46
-44
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-1
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+1
-1
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+4
-4
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+3
-3
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+26
-24
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+1
-1
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+4
-4
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+3
-3
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+1
-1
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
17603ba2
...
@@ -227,7 +227,7 @@ typedef enum ENodeType {
...
@@ -227,7 +227,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
17603ba2
...
@@ -377,7 +377,7 @@ typedef struct SIntervalPhysiNode {
...
@@ -377,7 +377,7 @@ typedef struct SIntervalPhysiNode {
int8_t
slidingUnit
;
int8_t
slidingUnit
;
}
SIntervalPhysiNode
;
}
SIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SMergeIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SMerge
Aligned
IntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamFinalIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamFinalIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamSemiIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamSemiIntervalPhysiNode
;
...
...
source/libs/command/src/explain.c
浏览文件 @
17603ba2
...
@@ -184,8 +184,8 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
...
@@ -184,8 +184,8 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren
=
indefPhysiNode
->
node
.
pChildren
;
pPhysiChildren
=
indefPhysiNode
->
node
.
pChildren
;
break
;
break
;
}
}
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
{
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
:
{
SMerge
IntervalPhysiNode
*
intPhysiNode
=
(
SMerge
IntervalPhysiNode
*
)
pNode
;
SMerge
AlignedIntervalPhysiNode
*
intPhysiNode
=
(
SMergeAligned
IntervalPhysiNode
*
)
pNode
;
pPhysiChildren
=
intPhysiNode
->
window
.
node
.
pChildren
;
pPhysiChildren
=
intPhysiNode
->
window
.
node
.
pChildren
;
break
;
break
;
}
}
...
@@ -841,8 +841,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
...
@@ -841,8 +841,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
}
break
;
break
;
}
}
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
{
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
:
{
SMerge
IntervalPhysiNode
*
pIntNode
=
(
SMerge
IntervalPhysiNode
*
)
pNode
;
SMerge
AlignedIntervalPhysiNode
*
pIntNode
=
(
SMergeAligned
IntervalPhysiNode
*
)
pNode
;
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_INTERVAL_FORMAT
,
nodesGetNameFromColumnNode
(
pIntNode
->
window
.
pTspk
));
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_INTERVAL_FORMAT
,
nodesGetNameFromColumnNode
(
pIntNode
->
window
.
pTspk
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
if
(
pResNode
->
pExecInfo
)
{
if
(
pResNode
->
pExecInfo
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
17603ba2
...
@@ -729,7 +729,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
...
@@ -729,7 +729,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
,
bool
isStream
);
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
,
bool
isStream
);
SOperatorInfo
*
createMergeIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SOperatorInfo
*
createMerge
Aligned
IntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
SExecTaskInfo
*
pTaskInfo
);
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
17603ba2
...
@@ -4190,8 +4190,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
...
@@ -4190,8 +4190,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr
=
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pTaskInfo
,
isStream
);
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pTaskInfo
,
isStream
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
==
type
)
{
SMerge
IntervalPhysiNode
*
pIntervalPhyNode
=
(
SMerge
IntervalPhysiNode
*
)
pPhyNode
;
SMerge
AlignedIntervalPhysiNode
*
pIntervalPhyNode
=
(
SMergeAligned
IntervalPhysiNode
*
)
pPhyNode
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
num
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
num
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
...
@@ -4204,7 +4204,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
...
@@ -4204,7 +4204,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
};
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
};
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
pOptr
=
createMergeIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
pTaskInfo
);
pOptr
=
createMerge
Aligned
IntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
==
type
)
{
int32_t
children
=
0
;
int32_t
children
=
0
;
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
17603ba2
source/libs/executor/src/timewindowoperator.c
浏览文件 @
17603ba2
...
@@ -3667,23 +3667,23 @@ _error:
...
@@ -3667,23 +3667,23 @@ _error:
return
NULL
;
return
NULL
;
}
}
typedef
struct
SMergeIntervalAggOperatorInfo
{
typedef
struct
SMerge
Aligned
IntervalAggOperatorInfo
{
SIntervalAggOperatorInfo
intervalAggOperatorInfo
;
SIntervalAggOperatorInfo
intervalAggOperatorInfo
;
bool
hasGroupId
;
bool
hasGroupId
;
uint64_t
groupId
;
uint64_t
groupId
;
SSDataBlock
*
prefetchedBlock
;
SSDataBlock
*
prefetchedBlock
;
bool
inputBlocksFinished
;
bool
inputBlocksFinished
;
}
SMergeIntervalAggOperatorInfo
;
}
SMerge
Aligned
IntervalAggOperatorInfo
;
void
destroyMergeIntervalOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
void
destroyMerge
Aligned
IntervalOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SMerge
IntervalAggOperatorInfo
*
miaInfo
=
(
SMerge
IntervalAggOperatorInfo
*
)
param
;
SMerge
AlignedIntervalAggOperatorInfo
*
miaInfo
=
(
SMergeAligned
IntervalAggOperatorInfo
*
)
param
;
destroyIntervalOperatorInfo
(
&
miaInfo
->
intervalAggOperatorInfo
,
numOfOutput
);
destroyIntervalOperatorInfo
(
&
miaInfo
->
intervalAggOperatorInfo
,
numOfOutput
);
}
}
static
int32_t
outputMergeIntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
static
int32_t
outputMerge
Aligned
IntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
TSKEY
wstartTs
)
{
TSKEY
wstartTs
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SMerge
Aligned
IntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
...
@@ -3702,9 +3702,9 @@ static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t
...
@@ -3702,9 +3702,9 @@ static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t
return
0
;
return
0
;
}
}
static
void
doMergeIntervalAggImpl
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pBlock
,
static
void
doMerge
Aligned
IntervalAggImpl
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pBlock
,
int32_t
scanFlag
,
SSDataBlock
*
pResultBlock
)
{
int32_t
scanFlag
,
SSDataBlock
*
pResultBlock
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SMerge
Aligned
IntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
...
@@ -3722,7 +3722,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
...
@@ -3722,7 +3722,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
win
.
ekey
=
win
.
ekey
=
taosTimeAdd
(
win
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
taosTimeAdd
(
win
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
// TODO: remove the hash table
usage
(groupid + winkey => result row position)
// TODO: remove the hash table (groupid + winkey => result row position)
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
...
@@ -3744,13 +3744,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
...
@@ -3744,13 +3744,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
outputMergeIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
outputMerge
Aligned
IntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
currTs
=
tsCols
[
currPos
];
currTs
=
tsCols
[
currPos
];
currWin
.
skey
=
currTs
;
currWin
.
skey
=
currTs
;
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
iaInfo
->
interval
.
precision
)
-
iaInfo
->
interval
.
interval
,
1
;
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
startPos
=
currPos
;
startPos
=
currPos
;
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
currWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
currWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
...
@@ -3763,13 +3764,13 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
...
@@ -3763,13 +3764,13 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
outputMergeIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
outputMerge
Aligned
IntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
}
}
static
SSDataBlock
*
doMergeIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
static
SSDataBlock
*
doMerge
Aligned
IntervalAgg
(
SOperatorInfo
*
pOperator
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperator
->
info
;
SMerge
Aligned
IntervalAggOperatorInfo
*
miaInfo
=
pOperator
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
return
NULL
;
...
@@ -3790,6 +3791,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -3790,6 +3791,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
}
else
{
}
else
{
pBlock
=
miaInfo
->
prefetchedBlock
;
pBlock
=
miaInfo
->
prefetchedBlock
;
miaInfo
->
groupId
=
pBlock
->
info
.
groupId
;
miaInfo
->
groupId
=
pBlock
->
info
.
groupId
;
miaInfo
->
prefetchedBlock
=
NULL
;
}
}
if
(
pBlock
==
NULL
)
{
if
(
pBlock
==
NULL
)
{
...
@@ -3807,7 +3809,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -3807,7 +3809,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
getTableScanInfo
(
pOperator
,
&
iaInfo
->
order
,
&
scanFlag
);
getTableScanInfo
(
pOperator
,
&
iaInfo
->
order
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
iaInfo
->
order
,
scanFlag
,
true
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
iaInfo
->
order
,
scanFlag
,
true
);
doMergeIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
doMerge
Aligned
IntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
break
;
...
@@ -3826,10 +3828,10 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -3826,10 +3828,10 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
return
(
rows
==
0
)
?
NULL
:
pRes
;
return
(
rows
==
0
)
?
NULL
:
pRes
;
}
}
SOperatorInfo
*
createMergeIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SOperatorInfo
*
createMerge
Aligned
IntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
SExecTaskInfo
*
pTaskInfo
)
{
SExecTaskInfo
*
pTaskInfo
)
{
SMerge
IntervalAggOperatorInfo
*
miaInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMerge
IntervalAggOperatorInfo
));
SMerge
AlignedIntervalAggOperatorInfo
*
miaInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMergeAligned
IntervalAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
miaInfo
==
NULL
||
pOperator
==
NULL
)
{
if
(
miaInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
goto
_error
;
...
@@ -3864,8 +3866,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
...
@@ -3864,8 +3866,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
initResultRowInfo
(
&
iaInfo
->
binfo
.
resultRowInfo
);
initResultRowInfo
(
&
iaInfo
->
binfo
.
resultRowInfo
);
pOperator
->
name
=
"TimeMergeIntervalAggOperator"
;
pOperator
->
name
=
"TimeMerge
Aligned
IntervalAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
;
pOperator
->
blocking
=
false
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
...
@@ -3873,8 +3875,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
...
@@ -3873,8 +3875,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
info
=
miaInfo
;
pOperator
->
info
=
miaInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doMergeIntervalAgg
,
NULL
,
NULL
,
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doMerge
Aligned
IntervalAgg
,
NULL
,
NULL
,
destroyMergeIntervalOperatorInfo
,
NULL
,
NULL
,
NULL
);
destroyMerge
Aligned
IntervalOperatorInfo
,
NULL
,
NULL
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
@@ -3884,7 +3886,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
...
@@ -3884,7 +3886,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
return
pOperator
;
return
pOperator
;
_error:
_error:
destroyMergeIntervalOperatorInfo
(
miaInfo
,
numOfCols
);
destroyMerge
Aligned
IntervalOperatorInfo
(
miaInfo
,
numOfCols
);
taosMemoryFreeClear
(
miaInfo
);
taosMemoryFreeClear
(
miaInfo
);
taosMemoryFreeClear
(
pOperator
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
code
;
pTaskInfo
->
code
=
code
;
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
17603ba2
...
@@ -702,7 +702,7 @@ SNode* nodesCloneNode(const SNode* pNode) {
...
@@ -702,7 +702,7 @@ SNode* nodesCloneNode(const SNode* pNode) {
case
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
:
case
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
:
return
physiSysTableScanCopy
((
const
SSystemTableScanPhysiNode
*
)
pNode
,
(
SSystemTableScanPhysiNode
*
)
pDst
);
return
physiSysTableScanCopy
((
const
SSystemTableScanPhysiNode
*
)
pNode
,
(
SSystemTableScanPhysiNode
*
)
pDst
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
17603ba2
...
@@ -236,8 +236,8 @@ const char* nodesNodeName(ENodeType type) {
...
@@ -236,8 +236,8 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiSort"
;
return
"PhysiSort"
;
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
return
"PhysiHashInterval"
;
return
"PhysiHashInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
:
return
"PhysiMergeInterval"
;
return
"PhysiMerge
Aligned
Interval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
"PhysiStreamInterval"
;
return
"PhysiStreamInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
...
@@ -4128,7 +4128,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
...
@@ -4128,7 +4128,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
physiSortNodeToJson
(
pObj
,
pJson
);
return
physiSortNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
...
@@ -4269,7 +4269,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
...
@@ -4269,7 +4269,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
jsonToPhysiSortNode
(
pJson
,
pObj
);
return
jsonToPhysiSortNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
17603ba2
...
@@ -289,8 +289,8 @@ SNode* nodesMakeNode(ENodeType type) {
...
@@ -289,8 +289,8 @@ SNode* nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SSortPhysiNode
));
return
makeNode
(
type
,
sizeof
(
SSortPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SIntervalPhysiNode
));
return
makeNode
(
type
,
sizeof
(
SIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SMergeIntervalPhysiNode
));
return
makeNode
(
type
,
sizeof
(
SMerge
Aligned
IntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SStreamIntervalPhysiNode
));
return
makeNode
(
type
,
sizeof
(
SStreamIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
...
@@ -828,7 +828,7 @@ void nodesDestroyNode(SNode* pNode) {
...
@@ -828,7 +828,7 @@ void nodesDestroyNode(SNode* pNode) {
break
;
break
;
}
}
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
17603ba2
...
@@ -1049,7 +1049,7 @@ static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
...
@@ -1049,7 +1049,7 @@ static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
case
INTERVAL_ALGO_HASH
:
case
INTERVAL_ALGO_HASH
:
return
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
;
return
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
;
case
INTERVAL_ALGO_MERGE
:
case
INTERVAL_ALGO_MERGE
:
return
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
;
return
QUERY_NODE_PHYSICAL_PLAN_MERGE_
ALIGNED_
INTERVAL
;
case
INTERVAL_ALGO_STREAM_FINAL
:
case
INTERVAL_ALGO_STREAM_FINAL
:
return
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
;
return
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
;
case
INTERVAL_ALGO_STREAM_SEMI
:
case
INTERVAL_ALGO_STREAM_SEMI
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录