Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
11bac622
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
11bac622
编写于
7月 29, 2022
作者:
X
Xiaoyu Wang
提交者:
GitHub
7月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15523 from taosdata/feature/3.0_debug_wxy
enh: last function optimize
上级
7f5cb448
0550ab53
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
268 addition
and
120 deletion
+268
-120
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+6
-0
source/libs/command/inc/commandInt.h
source/libs/command/inc/commandInt.h
+5
-1
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+8
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-4
source/libs/executor/inc/tfill.h
source/libs/executor/inc/tfill.h
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-4
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+4
-4
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+20
-13
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+38
-40
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+2
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+21
-0
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+2
-0
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+135
-50
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+3
-0
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+1
-1
source/libs/planner/test/planOptimizeTest.cpp
source/libs/planner/test/planOptimizeTest.cpp
+13
-0
未找到文件。
include/libs/nodes/plannodes.h
浏览文件 @
11bac622
...
...
@@ -96,6 +96,7 @@ typedef struct SScanLogicNode {
bool
groupSort
;
int8_t
cacheLastMode
;
bool
hasNormalCols
;
// neither tag column nor primary key tag column
bool
sortPrimaryKey
;
}
SScanLogicNode
;
typedef
struct
SJoinLogicNode
{
...
...
@@ -204,6 +205,7 @@ typedef struct SWindowLogicNode {
int8_t
igExpired
;
EWindowAlgorithm
windowAlgo
;
EOrder
inputTsOrder
;
EOrder
outputTsOrder
;
}
SWindowLogicNode
;
typedef
struct
SFillLogicNode
{
...
...
@@ -212,6 +214,7 @@ typedef struct SFillLogicNode {
SNode
*
pWStartTs
;
SNode
*
pValues
;
// SNodeListNode
STimeWindow
timeRange
;
EOrder
inputTsOrder
;
}
SFillLogicNode
;
typedef
struct
SSortLogicNode
{
...
...
@@ -410,6 +413,8 @@ typedef struct SWinodwPhysiNode {
int8_t
triggerType
;
int64_t
watermark
;
int8_t
igExpired
;
EOrder
inputTsOrder
;
EOrder
outputTsOrder
;
}
SWinodwPhysiNode
;
typedef
struct
SIntervalPhysiNode
{
...
...
@@ -434,6 +439,7 @@ typedef struct SFillPhysiNode {
SNode
*
pValues
;
// SNodeListNode
SNodeList
*
pTargets
;
STimeWindow
timeRange
;
EOrder
inputTsOrder
;
}
SFillPhysiNode
;
typedef
struct
SMultiTableIntervalPhysiNode
{
...
...
source/libs/command/inc/commandInt.h
浏览文件 @
11bac622
...
...
@@ -19,6 +19,8 @@
#ifdef __cplusplus
extern
"C"
{
#endif
// clang-format off
#include "nodes.h"
#include "plannodes.h"
#include "ttime.h"
...
...
@@ -77,6 +79,8 @@ extern "C" {
#define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64
#define EXPLAIN_MODE_FORMAT "mode=%s"
#define EXPLAIN_STRING_TYPE_FORMAT "%s"
#define EXPLAIN_INPUT_ORDER_FORMAT "input_order=%s"
#define EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT "output_order=%s"
#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
...
...
@@ -122,7 +126,7 @@ typedef struct SExplainCtx {
SHashObj
*
groupHash
;
// Hash<SExplainGroup>
}
SExplainCtx
;
#define EXPLAIN_ORDER_STRING(_order) ((
TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending
")
#define EXPLAIN_ORDER_STRING(_order) ((
ORDER_ASC == _order) ? "asc" : "desc
")
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u)))
...
...
source/libs/command/src/explain.c
浏览文件 @
11bac622
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#include "commandInt.h"
#include "plannodes.h"
#include "query.h"
...
...
@@ -849,6 +850,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_FUNCTIONS_FORMAT
,
pIntNode
->
window
.
pFuncs
->
length
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pIntNode
->
window
.
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pIntNode
->
window
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT
,
EXPLAIN_ORDER_STRING
(
pIntNode
->
window
.
outputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -1154,7 +1159,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_MERGE_KEYS_FORMAT
);
for
(
int32_t
i
=
0
;
i
<
LIST_LENGTH
(
pMergeNode
->
pMergeKeys
);
++
i
)
{
SOrderByExprNode
*
ptn
=
(
SOrderByExprNode
*
)
nodesListGetNode
(
pMergeNode
->
pMergeKeys
,
i
);
EXPLAIN_ROW_APPEND
(
"%s "
,
nodesGetNameFromColumnNode
(
ptn
->
pExpr
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_STRING_TYPE_FORMAT
,
nodesGetNameFromColumnNode
(
ptn
->
pExpr
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_STRING_TYPE_FORMAT
,
EXPLAIN_ORDER_STRING
(
ptn
->
order
));
}
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
11bac622
...
...
@@ -523,7 +523,8 @@ typedef struct SIntervalAggOperatorInfo {
STimeWindow
win
;
// query time range
bool
timeWindowInterpo
;
// interpolation needed or not
SArray
*
pInterpCols
;
// interpolation columns
int32_t
order
;
// current SSDataBlock scan order
int32_t
resultTsOrder
;
// result timestamp order
int32_t
inputOrder
;
// input data ts order
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
STimeWindowAggSupp
twAggSup
;
bool
invertible
;
...
...
@@ -533,8 +534,7 @@ typedef struct SIntervalAggOperatorInfo {
SArray
*
pDelWins
;
// SWinRes
int32_t
delIndex
;
SSDataBlock
*
pDelRes
;
SNode
*
pCondition
;
SNode
*
pCondition
;
}
SIntervalAggOperatorInfo
;
typedef
struct
SMergeAlignedIntervalAggOperatorInfo
{
...
...
@@ -804,7 +804,7 @@ typedef struct STagFilterOperatorInfo {
typedef
struct
SJoinOperatorInfo
{
SSDataBlock
*
pRes
;
int32_t
joinType
;
int32_t
input
Ts
Order
;
int32_t
inputOrder
;
SSDataBlock
*
pLeft
;
int32_t
leftPos
;
...
...
source/libs/executor/inc/tfill.h
浏览文件 @
11bac622
...
...
@@ -74,9 +74,9 @@ void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataB
struct
SFillColInfo
*
createFillColInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
const
struct
SNodeListNode
*
val
);
bool
taosFillHasMoreResults
(
struct
SFillInfo
*
pFillInfo
);
SFillInfo
*
taosCreateFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
SFillInfo
*
taosCreateFillInfo
(
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
int32_t
fillType
,
struct
SFillColInfo
*
pCol
,
int32_t
slotId
,
const
char
*
id
);
int32_t
order
,
const
char
*
id
);
void
*
taosDestroyFillInfo
(
struct
SFillInfo
*
pFillInfo
);
int64_t
taosFillResultDataBlock
(
struct
SFillInfo
*
pFillInfo
,
SSDataBlock
*
p
,
int32_t
capacity
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
11bac622
...
...
@@ -3586,15 +3586,14 @@ void doDestroyExchangeOperatorInfo(void* param) {
}
static
int32_t
initFillInfo
(
SFillOperatorInfo
*
pInfo
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SNodeListNode
*
pValNode
,
STimeWindow
win
,
int32_t
capacity
,
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
)
{
STimeWindow
win
,
int32_t
capacity
,
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
,
int32_t
order
)
{
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfCols
,
pValNode
);
STimeWindow
w
=
getAlignQueryTimeWindow
(
pInterval
,
pInterval
->
precision
,
win
.
skey
);
w
=
getFirstQualifiedTimeWindow
(
win
.
skey
,
&
w
,
pInterval
,
TSDB_ORDER_ASC
);
int32_t
order
=
TSDB_ORDER_ASC
;
pInfo
->
pFillInfo
=
taosCreateFillInfo
(
order
,
w
.
skey
,
0
,
capacity
,
numOfCols
,
pInterval
,
fillType
,
pColInfo
,
pInfo
->
primaryTsCol
,
id
);
taosCreateFillInfo
(
w
.
skey
,
0
,
capacity
,
numOfCols
,
pInterval
,
fillType
,
pColInfo
,
pInfo
->
primaryTsCol
,
order
,
id
);
pInfo
->
win
=
win
;
pInfo
->
p
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
...
...
@@ -3624,6 +3623,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
?
&
((
SMergeAlignedIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
intervalAggOperatorInfo
->
interval
:
&
((
SIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
interval
;
int32_t
order
=
(
pPhyFillNode
->
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
type
=
convertFillType
(
pPhyFillNode
->
mode
);
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
...
...
@@ -3635,7 +3635,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
int32_t
code
=
initFillInfo
(
pInfo
,
pExprInfo
,
num
,
(
SNodeListNode
*
)
pPhyFillNode
->
pValues
,
pPhyFillNode
->
timeRange
,
pResultInfo
->
capacity
,
pTaskInfo
->
id
.
str
,
pInterval
,
type
);
pResultInfo
->
capacity
,
pTaskInfo
->
id
.
str
,
pInterval
,
type
,
order
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
11bac622
...
...
@@ -77,11 +77,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo
->
pCondAfterMerge
=
NULL
;
}
pInfo
->
input
Ts
Order
=
TSDB_ORDER_ASC
;
pInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
if
(
pJoinNode
->
inputTsOrder
==
ORDER_ASC
)
{
pInfo
->
input
Ts
Order
=
TSDB_ORDER_ASC
;
pInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
}
else
if
(
pJoinNode
->
inputTsOrder
==
ORDER_DESC
)
{
pInfo
->
input
Ts
Order
=
TSDB_ORDER_DESC
;
pInfo
->
inputOrder
=
TSDB_ORDER_DESC
;
}
pOperator
->
fpSet
=
...
...
@@ -312,7 +312,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
int32_t
nrows
=
pRes
->
info
.
rows
;
bool
asc
=
(
pJoinInfo
->
input
Ts
Order
==
TSDB_ORDER_ASC
)
?
true
:
false
;
bool
asc
=
(
pJoinInfo
->
inputOrder
==
TSDB_ORDER_ASC
)
?
true
:
false
;
while
(
1
)
{
int64_t
leftTs
=
0
;
...
...
source/libs/executor/src/tfill.c
浏览文件 @
11bac622
...
...
@@ -66,7 +66,7 @@ static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) {
static
void
doSetVal
(
SColumnInfoData
*
pDstColInfoData
,
int32_t
rowIndex
,
const
SGroupKeys
*
pKey
);
static
void
doSetUserSpecifiedValue
(
SColumnInfoData
*
pDst
,
SVariant
*
pVar
,
int32_t
rowIndex
,
int64_t
currentKey
)
{
static
void
doSetUserSpecifiedValue
(
SColumnInfoData
*
pDst
,
SVariant
*
pVar
,
int32_t
rowIndex
,
int64_t
currentKey
)
{
if
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_FLOAT
)
{
float
v
=
0
;
GET_TYPED_DATA
(
v
,
float
,
pVar
->
nType
,
&
pVar
->
i
);
...
...
@@ -184,7 +184,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
continue
;
}
SVariant
*
pVar
=
&
pFillInfo
->
pFillCol
[
i
].
fillVal
;
SVariant
*
pVar
=
&
pFillInfo
->
pFillCol
[
i
].
fillVal
;
SColumnInfoData
*
pDst
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
doSetUserSpecifiedValue
(
pDst
,
pVar
,
index
,
pFillInfo
->
currentKey
);
}
...
...
@@ -298,7 +298,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pFillInfo
->
pSrcBlock
->
pDataBlock
,
srcSlotId
);
char
*
src
=
colDataGetData
(
pSrc
,
pFillInfo
->
index
);
if
(
/*i == 0 || (*/
!
colDataIsNull_s
(
pSrc
,
pFillInfo
->
index
))
{
if
(
/*i == 0 || (*/
!
colDataIsNull_s
(
pSrc
,
pFillInfo
->
index
))
{
bool
isNull
=
colDataIsNull_s
(
pSrc
,
pFillInfo
->
index
);
colDataAppend
(
pDst
,
index
,
src
,
isNull
);
saveColData
(
pFillInfo
->
prev
,
i
,
src
,
isNull
);
...
...
@@ -313,7 +313,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
}
else
if
(
pFillInfo
->
type
==
TSDB_FILL_LINEAR
)
{
bool
isNull
=
colDataIsNull_s
(
pSrc
,
pFillInfo
->
index
);
colDataAppend
(
pDst
,
index
,
src
,
isNull
);
saveColData
(
pFillInfo
->
prev
,
i
,
src
,
isNull
);
// todo:
saveColData
(
pFillInfo
->
prev
,
i
,
src
,
isNull
);
// todo:
}
else
if
(
pFillInfo
->
type
==
TSDB_FILL_NULL
)
{
colDataAppendNULL
(
pDst
,
index
);
}
else
if
(
pFillInfo
->
type
==
TSDB_FILL_NEXT
)
{
...
...
@@ -433,9 +433,9 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
return
pFillInfo
->
numOfRows
-
pFillInfo
->
index
;
}
struct
SFillInfo
*
taosCreateFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
int32_t
fillType
,
struct
SFillColInfo
*
pCol
,
int32_t
primaryTsSlotId
,
const
char
*
id
)
{
struct
SFillInfo
*
taosCreateFillInfo
(
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
int32_t
fillType
,
struct
SFillColInfo
*
pCol
,
int32_t
primaryTsSlotId
,
int32_t
order
,
const
char
*
id
)
{
if
(
fillType
==
TSDB_FILL_NONE
)
{
return
NULL
;
}
...
...
@@ -446,10 +446,9 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
return
NULL
;
}
pFillInfo
->
order
=
order
;
pFillInfo
->
tsSlotId
=
primaryTsSlotId
;
taosResetFillInfo
(
pFillInfo
,
skey
);
pFillInfo
->
order
=
order
;
switch
(
fillType
)
{
case
FILL_MODE_NONE
:
...
...
@@ -535,6 +534,14 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
return
NULL
;
}
void
taosFillSetDataOrderInfo
(
SFillInfo
*
pFillInfo
,
int32_t
order
)
{
if
(
pFillInfo
==
NULL
||
(
order
!=
TSDB_ORDER_ASC
&&
order
!=
TSDB_ORDER_DESC
))
{
return
;
}
pFillInfo
->
order
=
order
;
}
void
taosFillSetStartInfo
(
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
)
{
if
(
pFillInfo
->
type
==
TSDB_FILL_NONE
)
{
return
;
...
...
@@ -581,7 +588,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
int64_t
numOfRes
=
-
1
;
if
(
numOfRows
>
0
)
{
// still fill gap within current data block, not generating data after the result set.
TSKEY
lastKey
=
tsList
[
pFillInfo
->
numOfRows
-
1
]
;
TSKEY
lastKey
=
(
TSDB_ORDER_ASC
==
pFillInfo
->
order
?
tsList
[
pFillInfo
->
numOfRows
-
1
]
:
tsList
[
0
])
;
numOfRes
=
taosTimeCountInterval
(
lastKey
,
pFillInfo
->
currentKey
,
pFillInfo
->
interval
.
sliding
,
pFillInfo
->
interval
.
slidingUnit
,
pFillInfo
->
interval
.
precision
);
numOfRes
+=
1
;
...
...
@@ -626,9 +633,9 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca
}
qDebug
(
"fill:%p, generated fill result, src block:%d, index:%d, brange:%"
PRId64
"-%"
PRId64
", currentKey:%"
PRId64
", current : % d, total : % d, %s"
,
pFillInfo
,
pFillInfo
->
numOfRows
,
pFillInfo
->
index
,
pFillInfo
->
start
,
pFillInfo
->
end
,
pFillInfo
->
currentKey
,
pFillInfo
->
numOfCurrent
,
pFillInfo
->
numOfTotal
,
pFillInfo
->
id
);
", current : % d, total : % d, %s"
,
pFillInfo
,
pFillInfo
->
numOfRows
,
pFillInfo
->
index
,
pFillInfo
->
start
,
pFillInfo
->
end
,
pFillInfo
->
currentKey
,
pFillInfo
->
numOfCurrent
,
pFillInfo
->
numOfTotal
,
pFillInfo
->
id
);
return
numOfRes
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
11bac622
...
...
@@ -362,7 +362,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in
static
bool
setTimeWindowInterpolationStartTs
(
SIntervalAggOperatorInfo
*
pInfo
,
int32_t
pos
,
SSDataBlock
*
pBlock
,
const
TSKEY
*
tsCols
,
STimeWindow
*
win
,
SExprSupp
*
pSup
)
{
bool
ascQuery
=
(
pInfo
->
o
rder
==
TSDB_ORDER_ASC
);
bool
ascQuery
=
(
pInfo
->
inputO
rder
==
TSDB_ORDER_ASC
);
TSKEY
curTs
=
tsCols
[
pos
];
...
...
@@ -392,7 +392,7 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
static
bool
setTimeWindowInterpolationEndTs
(
SIntervalAggOperatorInfo
*
pInfo
,
SExprSupp
*
pSup
,
int32_t
endRowIndex
,
SArray
*
pDataBlock
,
const
TSKEY
*
tsCols
,
TSKEY
blockEkey
,
STimeWindow
*
win
)
{
int32_t
order
=
pInfo
->
o
rder
;
int32_t
order
=
pInfo
->
inputO
rder
;
TSKEY
actualEndKey
=
tsCols
[
endRowIndex
];
TSKEY
key
=
(
order
==
TSDB_ORDER_ASC
)
?
win
->
ekey
:
win
->
skey
;
...
...
@@ -550,7 +550,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
if
(
!
done
)
{
int32_t
endRowIndex
=
startPos
+
forwardRows
-
1
;
TSKEY
endKey
=
(
pInfo
->
o
rder
==
TSDB_ORDER_ASC
)
?
pBlock
->
info
.
window
.
ekey
:
pBlock
->
info
.
window
.
skey
;
TSKEY
endKey
=
(
pInfo
->
inputO
rder
==
TSDB_ORDER_ASC
)
?
pBlock
->
info
.
window
.
ekey
:
pBlock
->
info
.
window
.
skey
;
bool
interp
=
setTimeWindowInterpolationEndTs
(
pInfo
,
pSup
,
endRowIndex
,
pBlock
->
pDataBlock
,
tsCols
,
endKey
,
win
);
if
(
interp
)
{
setResultRowInterpo
(
pResult
,
RESULT_ROW_END_INTERP
);
...
...
@@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setNotInterpoWindowKey
(
pSup
->
pCtx
,
numOfExprs
,
RESULT_ROW_START_INTERP
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
w
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
0
,
tsCols
,
pBlock
->
info
.
rows
,
numOfExprs
,
pInfo
->
o
rder
);
numOfExprs
,
pInfo
->
inputO
rder
);
if
(
isResultRowInterpolated
(
pResult
,
RESULT_ROW_END_INTERP
))
{
closeResultRow
(
pr
);
...
...
@@ -924,11 +924,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int64_t
*
tsCols
=
extractTsCol
(
pBlock
,
pInfo
);
uint64_t
tableGroupId
=
pBlock
->
info
.
groupId
;
bool
ascScan
=
(
pInfo
->
o
rder
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
pInfo
->
inputO
rder
==
TSDB_ORDER_ASC
);
TSKEY
ts
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
o
rder
);
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
inputO
rder
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
...
...
@@ -946,7 +946,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY
ekey
=
ascScan
?
win
.
ekey
:
win
.
skey
;
int32_t
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
o
rder
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
inputO
rder
);
ASSERT
(
forwardRows
>
0
);
// prev time window not interpolation yet.
...
...
@@ -969,7 +969,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
o
rder
);
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
inputO
rder
);
}
doCloseWindow
(
pResultRowInfo
,
pInfo
,
pResult
);
...
...
@@ -977,14 +977,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow
nextWin
=
win
;
while
(
1
)
{
int32_t
prevEndPos
=
forwardRows
-
1
+
startPos
;
startPos
=
getNextQualifiedWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
pInfo
->
o
rder
);
startPos
=
getNextQualifiedWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
pInfo
->
inputO
rder
);
if
(
startPos
<
0
)
{
break
;
}
if
(
pInfo
->
ignoreExpiredData
&&
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
))
{
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
o
rder
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
inputO
rder
);
continue
;
}
...
...
@@ -1002,14 +1002,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
o
rder
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
inputO
rder
);
// window start(end) key interpolation
doWindowBorderInterpolation
(
pInfo
,
pBlock
,
pResult
,
&
nextWin
,
startPos
,
forwardRows
,
pSup
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
o
rder
);
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
inputO
rder
);
doCloseWindow
(
pResultRowInfo
,
pInfo
,
pResult
);
}
...
...
@@ -1082,7 +1082,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
break
;
}
getTableScanInfo
(
pOperator
,
&
pInfo
->
o
rder
,
&
scanFlag
);
getTableScanInfo
(
pOperator
,
&
pInfo
->
inputO
rder
,
&
scanFlag
);
if
(
pInfo
->
scalarSupp
.
pExprInfo
!=
NULL
)
{
SExprSupp
*
pExprSup
=
&
pInfo
->
scalarSupp
;
...
...
@@ -1090,13 +1090,13 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
o
rder
,
scanFlag
,
true
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
inputO
rder
,
scanFlag
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
}
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
o
rder
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
resultTsO
rder
);
OPTR_SET_OPENED
(
pOperator
);
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
...
...
@@ -1249,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
}
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
...
...
@@ -1550,7 +1549,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
pInfo
->
o
rder
=
TSDB_ORDER_ASC
;
pInfo
->
inputO
rder
=
TSDB_ORDER_ASC
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
...
...
@@ -1610,7 +1609,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
o
rder
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
inputO
rder
,
MAIN_SCAN
,
true
);
if
(
pInfo
->
invertible
)
{
setInverFunction
(
pSup
->
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
->
info
.
type
);
}
...
...
@@ -1790,7 +1789,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
order
=
TSDB_ORDER_ASC
;
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
execModel
=
pTaskInfo
->
execModel
;
pInfo
->
twAggSup
=
*
pTwAggSupp
;
...
...
@@ -1807,7 +1807,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pInfo
->
primaryTsIndex
=
primaryTsSlotId
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
...
...
@@ -1879,7 +1878,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
goto
_error
;
}
pInfo
->
o
rder
=
TSDB_ORDER_ASC
;
pInfo
->
inputO
rder
=
TSDB_ORDER_ASC
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
execModel
=
OPTR_EXEC_MODEL_STREAM
;
pInfo
->
win
=
pTaskInfo
->
window
;
...
...
@@ -4593,7 +4592,7 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
bool
ascScan
=
(
iaInfo
->
o
rder
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
iaInfo
->
inputO
rder
==
TSDB_ORDER_ASC
);
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
wstartTs
,
TSDB_KEYSIZE
,
tableGroupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
...
...
@@ -4647,7 +4646,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
}
else
{
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
currWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
o
rder
);
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
inputO
rder
);
outputMergeAlignedIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
...
...
@@ -4666,7 +4665,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
}
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
currWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
o
rder
);
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
inputO
rder
);
outputMergeAlignedIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
}
...
...
@@ -4711,8 +4710,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
break
;
}
getTableScanInfo
(
pOperator
,
&
iaInfo
->
o
rder
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
iaInfo
->
o
rder
,
scanFlag
,
true
);
getTableScanInfo
(
pOperator
,
&
iaInfo
->
inputO
rder
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
iaInfo
->
inputO
rder
,
scanFlag
,
true
);
doMergeAlignedIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
doFilter
(
miaInfo
->
pCondition
,
pRes
,
NULL
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
capacity
)
{
...
...
@@ -4753,7 +4752,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
miaInfo
->
pCondition
=
pCondition
;
iaInfo
->
win
=
pTaskInfo
->
window
;
iaInfo
->
o
rder
=
TSDB_ORDER_ASC
;
iaInfo
->
inputO
rder
=
TSDB_ORDER_ASC
;
iaInfo
->
interval
=
*
pInterval
;
iaInfo
->
execModel
=
pTaskInfo
->
execModel
;
iaInfo
->
primaryTsIndex
=
primaryTsSlotId
;
...
...
@@ -4835,7 +4834,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
bool
ascScan
=
(
iaInfo
->
o
rder
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
iaInfo
->
inputO
rder
==
TSDB_ORDER_ASC
);
SExprSupp
*
pExprSup
=
&
pOperatorInfo
->
exprSupp
;
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
win
->
skey
,
TSDB_KEYSIZE
,
tableGroupId
);
...
...
@@ -4853,7 +4852,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
bool
ascScan
=
(
iaInfo
->
o
rder
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
iaInfo
->
inputO
rder
==
TSDB_ORDER_ASC
);
SExprSupp
*
pExprSup
=
&
pOperatorInfo
->
exprSupp
;
SGroupTimeWindow
groupTimeWindow
=
{.
groupId
=
tableGroupId
,
.
window
=
*
newWin
};
...
...
@@ -4889,12 +4888,12 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
int32_t
numOfOutput
=
pExprSup
->
numOfExprs
;
int64_t
*
tsCols
=
extractTsCol
(
pBlock
,
iaInfo
);
uint64_t
tableGroupId
=
pBlock
->
info
.
groupId
;
bool
ascScan
=
(
iaInfo
->
o
rder
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
iaInfo
->
inputO
rder
==
TSDB_ORDER_ASC
);
TSKEY
blockStartTs
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
iaInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
blockStartTs
,
&
iaInfo
->
interval
,
iaInfo
->
o
rder
);
getActiveTimeWindow
(
iaInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
blockStartTs
,
&
iaInfo
->
interval
,
iaInfo
->
inputO
rder
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pExprSup
->
pCtx
,
...
...
@@ -4905,7 +4904,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
TSKEY
ekey
=
ascScan
?
win
.
ekey
:
win
.
skey
;
int32_t
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
o
rder
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
inputO
rder
);
ASSERT
(
forwardRows
>
0
);
// prev time window not interpolation yet.
...
...
@@ -4926,7 +4925,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pExprSup
->
pCtx
,
&
win
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
o
rder
);
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
inputO
rder
);
doCloseWindow
(
pResultRowInfo
,
iaInfo
,
pResult
);
// output previous interval results after this interval (&win) is closed
...
...
@@ -4935,7 +4934,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
STimeWindow
nextWin
=
win
;
while
(
1
)
{
int32_t
prevEndPos
=
forwardRows
-
1
+
startPos
;
startPos
=
getNextQualifiedWindow
(
&
iaInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
iaInfo
->
o
rder
);
startPos
=
getNextQualifiedWindow
(
&
iaInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
iaInfo
->
inputO
rder
);
if
(
startPos
<
0
)
{
break
;
}
...
...
@@ -4950,14 +4949,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
o
rder
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
inputO
rder
);
// window start(end) key interpolation
doWindowBorderInterpolation
(
iaInfo
,
pBlock
,
pResult
,
&
nextWin
,
startPos
,
forwardRows
,
pExprSup
);
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pExprSup
->
pCtx
,
&
nextWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
o
rder
);
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
inputO
rder
);
doCloseWindow
(
pResultRowInfo
,
iaInfo
,
pResult
);
// output previous interval results after this interval (&nextWin) is closed
...
...
@@ -5011,8 +5010,8 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
break
;
}
getTableScanInfo
(
pOperator
,
&
iaInfo
->
o
rder
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pExpSupp
->
pCtx
,
pBlock
,
iaInfo
->
o
rder
,
scanFlag
,
true
);
getTableScanInfo
(
pOperator
,
&
iaInfo
->
inputO
rder
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pExpSupp
->
pCtx
,
pBlock
,
iaInfo
->
inputO
rder
,
scanFlag
,
true
);
doMergeIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
...
...
@@ -5054,9 +5053,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
miaInfo
->
groupIntervals
=
tdListNew
(
sizeof
(
SGroupTimeWindow
));
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
iaInfo
->
win
=
pTaskInfo
->
window
;
iaInfo
->
o
rder
=
TSDB_ORDER_ASC
;
iaInfo
->
inputO
rder
=
TSDB_ORDER_ASC
;
iaInfo
->
interval
=
*
pInterval
;
iaInfo
->
execModel
=
pTaskInfo
->
execModel
;
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
11bac622
...
...
@@ -443,6 +443,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD
(
igExpired
);
COPY_SCALAR_FIELD
(
windowAlgo
);
COPY_SCALAR_FIELD
(
inputTsOrder
);
COPY_SCALAR_FIELD
(
outputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -452,6 +453,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
CLONE_NODE_FIELD
(
pWStartTs
);
CLONE_NODE_FIELD
(
pValues
);
COPY_OBJECT_FIELD
(
timeRange
,
sizeof
(
STimeWindow
));
COPY_SCALAR_FIELD
(
inputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
11bac622
...
...
@@ -1936,6 +1936,8 @@ static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
static
const
char
*
jkWindowPhysiPlanTriggerType
=
"TriggerType"
;
static
const
char
*
jkWindowPhysiPlanWatermark
=
"Watermark"
;
static
const
char
*
jkWindowPhysiPlanIgnoreExpired
=
"IgnoreExpired"
;
static
const
char
*
jkWindowPhysiPlanInputTsOrder
=
"inputTsOrder"
;
static
const
char
*
jkWindowPhysiPlanOutputTsOrder
=
"outputTsOrder"
;
static
int32_t
physiWindowNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SWinodwPhysiNode
*
pNode
=
(
const
SWinodwPhysiNode
*
)
pObj
;
...
...
@@ -1962,6 +1964,12 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowPhysiPlanIgnoreExpired
,
pNode
->
igExpired
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowPhysiPlanOutputTsOrder
,
pNode
->
outputTsOrder
);
}
return
code
;
}
...
...
@@ -1991,6 +1999,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkWindowPhysiPlanIgnoreExpired
,
&
pNode
->
igExpired
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkWindowPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkWindowPhysiPlanOutputTsOrder
,
pNode
->
outputTsOrder
,
code
);
}
return
code
;
}
...
...
@@ -2053,6 +2067,7 @@ static const char* jkFillPhysiPlanValues = "Values";
static
const
char
*
jkFillPhysiPlanTargets
=
"Targets"
;
static
const
char
*
jkFillPhysiPlanStartTime
=
"StartTime"
;
static
const
char
*
jkFillPhysiPlanEndTime
=
"EndTime"
;
static
const
char
*
jkFillPhysiPlanInputTsOrder
=
"inputTsOrder"
;
static
int32_t
physiFillNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SFillPhysiNode
*
pNode
=
(
const
SFillPhysiNode
*
)
pObj
;
...
...
@@ -2076,6 +2091,9 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkFillPhysiPlanEndTime
,
pNode
->
timeRange
.
ekey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkFillPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
);
}
return
code
;
}
...
...
@@ -2103,6 +2121,9 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkFillPhysiPlanEndTime
,
&
pNode
->
timeRange
.
ekey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkFillPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
,
code
);
}
return
code
;
}
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
11bac622
...
...
@@ -632,6 +632,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
pWindow
->
igExpired
=
pCxt
->
pPlanCxt
->
igExpired
;
}
pWindow
->
inputTsOrder
=
ORDER_ASC
;
pWindow
->
outputTsOrder
=
ORDER_ASC
;
int32_t
code
=
nodesCollectFuncs
(
pSelect
,
SQL_CLAUSE_WINDOW
,
fmIsWindowClauseFunc
,
&
pWindow
->
pFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -764,6 +765,7 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pFill
->
node
.
groupAction
=
GROUP_ACTION_KEEP
;
pFill
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_IN_GROUP
;
pFill
->
node
.
resultDataOrder
=
DATA_ORDER_LEVEL_IN_GROUP
;
pFill
->
inputTsOrder
=
ORDER_ASC
;
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_WINDOW
,
NULL
,
COLLECT_COL_TYPE_ALL
,
&
pFill
->
node
.
pTargets
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
==
pFill
->
node
.
pTargets
)
{
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
11bac622
...
...
@@ -38,10 +38,13 @@ typedef struct SOptimizeRule {
FOptimize
optimizeFunc
;
}
SOptimizeRule
;
typedef
enum
EScanOrder
{
SCAN_ORDER_ASC
=
1
,
SCAN_ORDER_DESC
,
SCAN_ORDER_BOTH
}
EScanOrder
;
typedef
struct
SOsdInfo
{
SScanLogicNode
*
pScan
;
SNodeList
*
pSdrFuncs
;
SNodeList
*
pDsoFuncs
;
EScanOrder
scanOrder
;
}
SOsdInfo
;
typedef
struct
SCpdIsMultiTableCondCxt
{
...
...
@@ -97,6 +100,27 @@ static EDealRes optRebuildTbanme(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
static
void
optSetParentOrder
(
SLogicNode
*
pNode
,
EOrder
order
)
{
if
(
NULL
==
pNode
)
{
return
;
}
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
((
SWindowLogicNode
*
)
pNode
)
->
inputTsOrder
=
order
;
// window has a sorting function, and the operator behind it uses its output order
return
;
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
((
SJoinLogicNode
*
)
pNode
)
->
inputTsOrder
=
order
;
break
;
case
QUERY_NODE_LOGIC_PLAN_FILL
:
((
SFillLogicNode
*
)
pNode
)
->
inputTsOrder
=
order
;
break
;
default:
break
;
}
optSetParentOrder
(
pNode
->
pParent
,
order
);
}
EDealRes
scanPathOptHaveNormalColImpl
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
// *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
...
...
@@ -179,16 +203,18 @@ static int32_t scanPathOptGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSd
SNodeList
*
pAllFuncs
=
scanPathOptGetAllFuncs
(
pScan
->
node
.
pParent
);
SNodeList
*
pTmpSdrFuncs
=
NULL
;
SNodeList
*
pTmpDsoFuncs
=
NULL
;
SNode
*
p
Func
=
NULL
;
SNode
*
p
Node
=
NULL
;
bool
otherFunc
=
false
;
FOREACH
(
pFunc
,
pAllFuncs
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
scanPathOptNeedOptimizeDataRequire
((
SFunctionNode
*
)
pFunc
))
{
code
=
nodesListMakeStrictAppend
(
&
pTmpSdrFuncs
,
nodesCloneNode
(
pFunc
));
}
else
if
(
scanPathOptNeedDynOptimize
((
SFunctionNode
*
)
pFunc
))
{
code
=
nodesListMakeStrictAppend
(
&
pTmpDsoFuncs
,
nodesCloneNode
(
pFunc
));
FOREACH
(
pNode
,
pAllFuncs
)
{
SFunctionNode
*
pFunc
=
(
SFunctionNode
*
)
pNode
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
scanPathOptNeedOptimizeDataRequire
(
pFunc
))
{
code
=
nodesListMakeStrictAppend
(
&
pTmpSdrFuncs
,
nodesCloneNode
(
pNode
));
}
else
if
(
scanPathOptNeedDynOptimize
(
pFunc
))
{
code
=
nodesListMakeStrictAppend
(
&
pTmpDsoFuncs
,
nodesCloneNode
(
pNode
));
}
else
{
otherFunc
=
true
;
break
;
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyList
(
pTmpSdrFuncs
);
...
...
@@ -206,12 +232,46 @@ static int32_t scanPathOptGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSd
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
scanPathOptGetScanOrder
(
SScanLogicNode
*
pScan
,
EScanOrder
*
pScanOrder
)
{
SNodeList
*
pAllFuncs
=
scanPathOptGetAllFuncs
(
pScan
->
node
.
pParent
);
SNode
*
pNode
=
NULL
;
bool
hasFirst
=
false
;
bool
hasLast
=
false
;
bool
otherFunc
=
false
;
FOREACH
(
pNode
,
pAllFuncs
)
{
SFunctionNode
*
pFunc
=
(
SFunctionNode
*
)
pNode
;
if
(
FUNCTION_TYPE_FIRST
==
pFunc
->
funcType
)
{
hasFirst
=
true
;
}
else
if
(
FUNCTION_TYPE_LAST
==
pFunc
->
funcType
)
{
hasLast
=
true
;
}
else
if
(
FUNCTION_TYPE_SELECT_VALUE
!=
pFunc
->
funcType
)
{
otherFunc
=
true
;
}
}
if
(
hasFirst
&&
hasLast
&&
!
otherFunc
)
{
*
pScanOrder
=
SCAN_ORDER_BOTH
;
}
else
if
(
hasLast
)
{
*
pScanOrder
=
SCAN_ORDER_DESC
;
}
else
{
*
pScanOrder
=
SCAN_ORDER_ASC
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
scanPathOptSetOsdInfo
(
SOsdInfo
*
pInfo
)
{
int32_t
code
=
scanPathOptGetRelatedFuncs
(
pInfo
->
pScan
,
&
pInfo
->
pSdrFuncs
,
&
pInfo
->
pDsoFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
scanPathOptGetScanOrder
(
pInfo
->
pScan
,
&
pInfo
->
scanOrder
);
}
return
code
;
}
static
int32_t
scanPathOptMatch
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SOsdInfo
*
pInfo
)
{
pInfo
->
pScan
=
(
SScanLogicNode
*
)
optFindPossibleNode
(
pLogicNode
,
scanPathOptMayBeOptimized
);
if
(
NULL
==
pInfo
->
pScan
)
{
return
TSDB_CODE_SUCCESS
;
}
return
scanPathOpt
GetRelatedFuncs
(
pInfo
->
pScan
,
&
pInfo
->
pSdrFuncs
,
&
pInfo
->
pDsoFuncs
);
return
scanPathOpt
SetOsdInfo
(
pInfo
);
}
static
EFuncDataRequired
scanPathOptPromoteDataRequired
(
EFuncDataRequired
l
,
EFuncDataRequired
r
)
{
...
...
@@ -258,15 +318,42 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
}
}
static
void
scanPathOptSetScanOrder
(
EScanOrder
scanOrder
,
SScanLogicNode
*
pScan
)
{
if
(
pScan
->
sortPrimaryKey
||
pScan
->
scanSeq
[
0
]
>
1
||
pScan
->
scanSeq
[
1
]
>
1
)
{
return
;
}
switch
(
scanOrder
)
{
case
SCAN_ORDER_ASC
:
pScan
->
scanSeq
[
0
]
=
1
;
pScan
->
scanSeq
[
1
]
=
0
;
optSetParentOrder
(
pScan
->
node
.
pParent
,
ORDER_ASC
);
break
;
case
SCAN_ORDER_DESC
:
pScan
->
scanSeq
[
0
]
=
0
;
pScan
->
scanSeq
[
1
]
=
1
;
optSetParentOrder
(
pScan
->
node
.
pParent
,
ORDER_DESC
);
break
;
case
SCAN_ORDER_BOTH
:
pScan
->
scanSeq
[
0
]
=
1
;
pScan
->
scanSeq
[
1
]
=
1
;
break
;
default:
break
;
}
}
static
int32_t
scanPathOptimize
(
SOptimizeContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
)
{
SOsdInfo
info
=
{
0
};
SOsdInfo
info
=
{
.
scanOrder
=
SCAN_ORDER_ASC
};
int32_t
code
=
scanPathOptMatch
(
pCxt
,
pLogicSubplan
->
pNode
,
&
info
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
info
.
pScan
)
{
scanPathOptSetScanWin
((
SScanLogicNode
*
)
info
.
pScan
);
scanPathOptSetScanWin
(
info
.
pScan
);
scanPathOptSetScanOrder
(
info
.
scanOrder
,
info
.
pScan
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
(
NULL
!=
info
.
pDsoFuncs
||
NULL
!=
info
.
pSdrFuncs
))
{
info
.
pScan
->
dataRequired
=
scanPathOptGetDataRequired
(
info
.
pSdrFuncs
);
info
.
pScan
->
pDynamicScanFuncs
=
info
.
pDsoFuncs
;
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
info
.
pScan
)
{
OPTIMIZE_FLAG_SET_MASK
(
info
.
pScan
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_SCAN_PATH
);
pCxt
->
optimized
=
true
;
}
...
...
@@ -987,12 +1074,13 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
}
SSortLogicNode
*
pSort
=
(
SSortLogicNode
*
)
pNode
;
if
(
pSort
->
groupSort
||
!
sortPriKeyOptIsPriKeyOrderBy
(
pSort
->
pSortKeys
)
||
1
!=
LIST_LENGTH
(
pSort
->
node
.
pChildren
))
{
return
TSDB_CODE_SUCCESS
;
return
false
;
}
return
true
;
}
static
int32_t
sortPriKeyOptGetScanNodesImpl
(
SLogicNode
*
pNode
,
bool
*
pNotOptimize
,
SNodeList
**
pScanNodes
)
{
static
int32_t
sortPriKeyOptGetSequencingNodesImpl
(
SLogicNode
*
pNode
,
bool
*
pNotOptimize
,
SNodeList
**
pSequencingNodes
)
{
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pNode
;
...
...
@@ -1000,17 +1088,19 @@ static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimi
*
pNotOptimize
=
true
;
return
TSDB_CODE_SUCCESS
;
}
return
nodesListMakeAppend
(
pS
can
Nodes
,
(
SNode
*
)
pNode
);
return
nodesListMakeAppend
(
pS
equencing
Nodes
,
(
SNode
*
)
pNode
);
}
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
{
int32_t
code
=
sortPriKeyOptGetScanNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pScan
Nodes
);
int32_t
code
=
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pSequencing
Nodes
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
sortPriKeyOptGetScanNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
1
),
pNotOptimize
,
pScan
Nodes
);
code
=
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
1
),
pNotOptimize
,
pSequencing
Nodes
);
}
return
code
;
}
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
nodesListMakeAppend
(
pSequencingNodes
,
(
SNode
*
)
pNode
);
case
QUERY_NODE_LOGIC_PLAN_AGG
:
case
QUERY_NODE_LOGIC_PLAN_PARTITION
:
*
pNotOptimize
=
true
;
...
...
@@ -1024,14 +1114,15 @@ static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimi
return
TSDB_CODE_SUCCESS
;
}
return
sortPriKeyOptGetScanNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pScanNodes
);
return
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pSequencingNodes
);
}
static
int32_t
sortPriKeyOptGetS
canNodes
(
SLogicNode
*
pNode
,
SNodeList
**
pScan
Nodes
)
{
static
int32_t
sortPriKeyOptGetS
equencingNodes
(
SLogicNode
*
pNode
,
SNodeList
**
pSequencing
Nodes
)
{
bool
notOptimize
=
false
;
int32_t
code
=
sortPriKeyOptGetS
canNodesImpl
(
pNode
,
&
notOptimize
,
pScan
Nodes
);
int32_t
code
=
sortPriKeyOptGetS
equencingNodesImpl
(
pNode
,
&
notOptimize
,
pSequencing
Nodes
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
notOptimize
)
{
nodesClearList
(
*
pS
can
Nodes
);
nodesClearList
(
*
pS
equencing
Nodes
);
}
return
code
;
}
...
...
@@ -1040,33 +1131,26 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) {
return
((
SOrderByExprNode
*
)
nodesListGetNode
(
pSort
->
pSortKeys
,
0
))
->
order
;
}
static
void
sortPriKeyOptSetParentOrder
(
SLogicNode
*
pNode
,
EOrder
order
)
{
if
(
NULL
==
pNode
)
{
return
;
}
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
))
{
((
SWindowLogicNode
*
)
pNode
)
->
inputTsOrder
=
order
;
}
else
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
==
nodeType
(
pNode
))
{
((
SJoinLogicNode
*
)
pNode
)
->
inputTsOrder
=
order
;
}
sortPriKeyOptSetParentOrder
(
pNode
->
pParent
,
order
);
}
static
int32_t
sortPriKeyOptApply
(
SOptimizeContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SSortLogicNode
*
pSort
,
SNodeList
*
pS
can
Nodes
)
{
SNodeList
*
pS
equencing
Nodes
)
{
EOrder
order
=
sortPriKeyOptGetPriKeyOrder
(
pSort
);
SNode
*
pScanNode
=
NULL
;
FOREACH
(
pScanNode
,
pScanNodes
)
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pScanNode
;
if
(
ORDER_DESC
==
order
&&
pScan
->
scanSeq
[
0
]
>
0
)
{
TSWAP
(
pScan
->
scanSeq
[
0
],
pScan
->
scanSeq
[
1
]);
}
if
(
TSDB_SUPER_TABLE
==
pScan
->
tableType
)
{
pScan
->
scanType
=
SCAN_TYPE_TABLE_MERGE
;
pScan
->
node
.
resultDataOrder
=
DATA_ORDER_LEVEL_GLOBAL
;
pScan
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_GLOBAL
;
SNode
*
pSequencingNode
=
NULL
;
FOREACH
(
pSequencingNode
,
pSequencingNodes
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pSequencingNode
))
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pSequencingNode
;
if
((
ORDER_DESC
==
order
&&
pScan
->
scanSeq
[
0
]
>
0
)
||
(
ORDER_ASC
==
order
&&
pScan
->
scanSeq
[
1
]
>
0
))
{
TSWAP
(
pScan
->
scanSeq
[
0
],
pScan
->
scanSeq
[
1
]);
}
if
(
TSDB_SUPER_TABLE
==
pScan
->
tableType
)
{
pScan
->
scanType
=
SCAN_TYPE_TABLE_MERGE
;
pScan
->
node
.
resultDataOrder
=
DATA_ORDER_LEVEL_GLOBAL
;
pScan
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_GLOBAL
;
}
pScan
->
sortPrimaryKey
=
true
;
}
else
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pSequencingNode
))
{
((
SWindowLogicNode
*
)
pSequencingNode
)
->
outputTsOrder
=
order
;
}
sortPriKeyOptSetParentOrder
(
pScan
->
node
.
pParent
,
order
);
optSetParentOrder
(((
SLogicNode
*
)
pSequencingNode
)
->
pParent
,
order
);
}
SLogicNode
*
pChild
=
(
SLogicNode
*
)
nodesListGetNode
(
pSort
->
node
.
pChildren
,
0
);
...
...
@@ -1083,12 +1167,13 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
}
static
int32_t
sortPrimaryKeyOptimizeImpl
(
SOptimizeContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SSortLogicNode
*
pSort
)
{
SNodeList
*
pScanNodes
=
NULL
;
int32_t
code
=
sortPriKeyOptGetScanNodes
((
SLogicNode
*
)
nodesListGetNode
(
pSort
->
node
.
pChildren
,
0
),
&
pScanNodes
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pScanNodes
)
{
code
=
sortPriKeyOptApply
(
pCxt
,
pLogicSubplan
,
pSort
,
pScanNodes
);
SNodeList
*
pSequencingNodes
=
NULL
;
int32_t
code
=
sortPriKeyOptGetSequencingNodes
((
SLogicNode
*
)
nodesListGetNode
(
pSort
->
node
.
pChildren
,
0
),
&
pSequencingNodes
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pSequencingNodes
)
{
code
=
sortPriKeyOptApply
(
pCxt
,
pLogicSubplan
,
pSort
,
pSequencingNodes
);
}
nodesClearList
(
pS
can
Nodes
);
nodesClearList
(
pS
equencing
Nodes
);
return
code
;
}
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
11bac622
...
...
@@ -1089,6 +1089,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
pWindow
->
triggerType
=
pWindowLogicNode
->
triggerType
;
pWindow
->
watermark
=
pWindowLogicNode
->
watermark
;
pWindow
->
igExpired
=
pWindowLogicNode
->
igExpired
;
pWindow
->
inputTsOrder
=
pWindowLogicNode
->
inputTsOrder
;
pWindow
->
outputTsOrder
=
pWindowLogicNode
->
outputTsOrder
;
SNodeList
*
pPrecalcExprs
=
NULL
;
SNodeList
*
pFuncs
=
NULL
;
...
...
@@ -1363,6 +1365,7 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
pFill
->
mode
=
pFillNode
->
mode
;
pFill
->
timeRange
=
pFillNode
->
timeRange
;
pFill
->
inputTsOrder
=
pFillNode
->
inputTsOrder
;
SDataBlockDescNode
*
pChildTupe
=
(((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
pOutputDataBlockDesc
);
int32_t
code
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pFillNode
->
node
.
pTargets
,
&
pFill
->
pTargets
);
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
11bac622
...
...
@@ -492,7 +492,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
windowAlgo
=
INTERVAL_ALGO_MERGE
;
SNodeList
*
pMergeKeys
=
NULL
;
code
=
stbSplCreateMergeKeysByPrimaryKey
(((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
pTspk
,
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
in
putTsOrder
,
&
pMergeKeys
);
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
out
putTsOrder
,
&
pMergeKeys
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
NULL
,
pInfo
->
pSplitNode
,
pMergeKeys
,
pPartWindow
,
true
);
}
...
...
source/libs/planner/test/planOptimizeTest.cpp
浏览文件 @
11bac622
...
...
@@ -30,6 +30,11 @@ TEST_F(PlanOptimizeTest, scanPath) {
run
(
"SELECT COUNT(CAST(c1 AS BIGINT)) FROM t1"
);
run
(
"SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1"
);
run
(
"SELECT LAST(c1) FROM t1"
);
run
(
"SELECT LAST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) "
"FILL(LINEAR)"
);
}
TEST_F
(
PlanOptimizeTest
,
pushDownCondition
)
{
...
...
@@ -57,7 +62,15 @@ TEST_F(PlanOptimizeTest, sortPrimaryKey) {
run
(
"SELECT c1 FROM t1 ORDER BY ts DESC"
);
run
(
"SELECT c1 FROM st1 ORDER BY ts DESC"
);
run
(
"SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTART DESC"
);
run
(
"SELECT FIRST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) "
"FILL(LINEAR) ORDER BY _WSTART DESC"
);
run
(
"SELECT LAST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) "
"FILL(LINEAR) ORDER BY _WSTART"
);
}
TEST_F
(
PlanOptimizeTest
,
PartitionTags
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录