Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
55d41c8f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
55d41c8f
编写于
6月 05, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: sql command 'delete from'
上级
a4166b73
变更
18
展开全部
隐藏空白更改
内联
并排
Showing
18 changed file
with
1085 addition
and
738 deletion
+1085
-738
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+4
-2
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+69
-49
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+12
-10
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+8
-8
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+454
-496
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+12
-5
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+186
-17
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+2
-2
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+11
-8
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+16
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+2
-1
source/libs/parser/test/parInitialDTest.cpp
source/libs/parser/test/parInitialDTest.cpp
+4
-0
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+112
-19
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+3
-3
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+103
-61
source/libs/planner/src/planScaleOut.c
source/libs/planner/src/planScaleOut.c
+40
-31
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+35
-26
source/libs/planner/test/planOtherTest.cpp
source/libs/planner/test/planOtherTest.cpp
+12
-0
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
55d41c8f
...
...
@@ -188,7 +188,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_JOIN
,
QUERY_NODE_LOGIC_PLAN_AGG
,
QUERY_NODE_LOGIC_PLAN_PROJECT
,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
Y
,
QUERY_NODE_LOGIC_PLAN_EXCHANGE
,
QUERY_NODE_LOGIC_PLAN_MERGE
,
QUERY_NODE_LOGIC_PLAN_WINDOW
,
...
...
@@ -210,7 +210,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
,
...
...
@@ -223,6 +224,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_PARTITION
,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
,
QUERY_NODE_PHYSICAL_PLAN_INSERT
,
QUERY_NODE_PHYSICAL_PLAN_DELETE
,
QUERY_NODE_PHYSICAL_SUBPLAN
,
QUERY_NODE_PHYSICAL_PLAN
}
ENodeType
;
...
...
include/libs/nodes/plannodes.h
浏览文件 @
55d41c8f
...
...
@@ -37,29 +37,30 @@ typedef struct SLogicNode {
typedef
enum
EScanType
{
SCAN_TYPE_TAG
=
1
,
SCAN_TYPE_TABLE
,
SCAN_TYPE_SYSTEM_TABLE
,
SCAN_TYPE_STREAM
}
EScanType
;
typedef
struct
SScanLogicNode
{
SLogicNode
node
;
SNodeList
*
pScanCols
;
SNodeList
*
pScanPseudoCols
;
struct
STableMeta
*
pMeta
;
SVgroupsInfo
*
pVgroupList
;
EScanType
scanType
;
uint8_t
scanSeq
[
2
];
// first is scan count, and second is reverse scan count
STimeWindow
scanRange
;
SName
tableName
;
bool
showRewrite
;
double
ratio
;
SNodeList
*
pDynamicScanFuncs
;
int32_t
dataRequired
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SNode
*
pTagCond
;
int8_t
triggerType
;
int64_t
watermark
;
int16_t
tsColId
;
double
filesFactor
;
SLogicNode
node
;
SNodeList
*
pScanCols
;
SNodeList
*
pScanPseudoCols
;
int8_t
tableType
;
uint64_t
tableId
;
SVgroupsInfo
*
pVgroupList
;
EScanType
scanType
;
uint8_t
scanSeq
[
2
];
// first is scan count, and second is reverse scan count
STimeWindow
scanRange
;
SName
tableName
;
bool
showRewrite
;
double
ratio
;
SNodeList
*
pDynamicScanFuncs
;
int32_t
dataRequired
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SNode
*
pTagCond
;
int8_t
triggerType
;
int64_t
watermark
;
int16_t
tsColId
;
double
filesFactor
;
}
SScanLogicNode
;
typedef
struct
SJoinLogicNode
{
...
...
@@ -85,12 +86,20 @@ typedef struct SProjectLogicNode {
int64_t
soffset
;
}
SProjectLogicNode
;
typedef
struct
SVnodeModifLogicNode
{
SLogicNode
node
;
int32_t
msgType
;
SArray
*
pDataBlocks
;
SVgDataBlocks
*
pVgDataBlocks
;
}
SVnodeModifLogicNode
;
typedef
enum
EModifyTableType
{
MODIFY_TABLE_TYPE_INSERT
=
1
,
MODIFY_TABLE_TYPE_DELETE
}
EModifyTableType
;
typedef
struct
SVnodeModifyLogicNode
{
SLogicNode
node
;
EModifyTableType
modifyType
;
int32_t
msgType
;
SArray
*
pDataBlocks
;
SVgDataBlocks
*
pVgDataBlocks
;
SNode
*
pModifyRows
;
// SColumnNode
uint64_t
tableId
;
int8_t
tableType
;
// table type
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
STimeWindow
deleteTimeRange
;
}
SVnodeModifyLogicNode
;
typedef
struct
SExchangeLogicNode
{
SLogicNode
node
;
...
...
@@ -106,28 +115,30 @@ typedef struct SMergeLogicNode {
typedef
enum
EWindowType
{
WINDOW_TYPE_INTERVAL
=
1
,
WINDOW_TYPE_SESSION
,
WINDOW_TYPE_STATE
}
EWindowType
;
typedef
enum
EStreamIntervalAlgorithm
{
STREAM_INTERVAL_ALGO_FINAL
=
1
,
STREAM_INTERVAL_ALGO_SEMI
,
STREAM_INTERVAL_ALGO_SINGLE
}
EStreamIntervalAlgorithm
;
typedef
enum
EIntervalAlgorithm
{
INTERVAL_ALGO_HASH
=
1
,
INTERVAL_ALGO_SORT_MERGE
,
INTERVAL_ALGO_STREAM_FINAL
,
INTERVAL_ALGO_STREAM_SEMI
,
INTERVAL_ALGO_STREAM_SINGLE
,
}
EIntervalAlgorithm
;
typedef
struct
SWindowLogicNode
{
SLogicNode
node
;
EWindowType
winType
;
SNodeList
*
pFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int64_t
sessionGap
;
SNode
*
pTspk
;
SNode
*
pStateExpr
;
int8_t
triggerType
;
int64_t
watermark
;
double
filesFactor
;
E
StreamIntervalAlgorithm
stmInter
Algo
;
SLogicNode
node
;
EWindowType
winType
;
SNodeList
*
pFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int64_t
sessionGap
;
SNode
*
pTspk
;
SNode
*
pStateExpr
;
int8_t
triggerType
;
int64_t
watermark
;
double
filesFactor
;
E
IntervalAlgorithm
interval
Algo
;
}
SWindowLogicNode
;
typedef
struct
SFillLogicNode
{
...
...
@@ -308,6 +319,7 @@ typedef struct SIntervalPhysiNode {
int8_t
slidingUnit
;
}
SIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SSortMergeIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamFinalIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamSemiIntervalPhysiNode
;
...
...
@@ -370,6 +382,14 @@ typedef struct SDataInserterNode {
char
*
pData
;
}
SDataInserterNode
;
typedef
struct
SDataDeleterNode
{
SDataSinkNode
sink
;
uint64_t
tableId
;
int8_t
tableType
;
// table type
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
STimeWindow
deleteTimeRange
;
}
SDataDeleterNode
;
typedef
struct
SSubplan
{
ENodeType
type
;
SSubplanId
id
;
// unique id of the subplan
...
...
source/libs/command/src/explain.c
浏览文件 @
55d41c8f
...
...
@@ -153,7 +153,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren
=
pSortNode
->
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
{
case
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
:
{
SIntervalPhysiNode
*
pIntNode
=
(
SIntervalPhysiNode
*
)
pNode
;
pPhysiChildren
=
pIntNode
->
window
.
node
.
pChildren
;
break
;
...
...
@@ -164,12 +164,12 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
{
SStateWinodwPhysiNode
*
pStateNode
=
(
SStateWinodwPhysiNode
*
)
pNode
;
SStateWinodwPhysiNode
*
pStateNode
=
(
SStateWinodwPhysiNode
*
)
pNode
;
pPhysiChildren
=
pStateNode
->
window
.
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_PARTITION
:
{
SPartitionPhysiNode
*
partitionPhysiNode
=
(
SPartitionPhysiNode
*
)
pNode
;
SPartitionPhysiNode
*
partitionPhysiNode
=
(
SPartitionPhysiNode
*
)
pNode
;
pPhysiChildren
=
partitionPhysiNode
->
node
.
pChildren
;
break
;
}
...
...
@@ -397,7 +397,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
int32_t
nodeNum
=
taosArrayGetSize
(
pResNode
->
pExecInfo
);
for
(
int32_t
i
=
0
;
i
<
nodeNum
;
++
i
)
{
SExplainExecInfo
*
execInfo
=
taosArrayGet
(
pResNode
->
pExecInfo
,
i
);
SExplainExecInfo
*
execInfo
=
taosArrayGet
(
pResNode
->
pExecInfo
,
i
);
STableScanAnalyzeInfo
*
pScanInfo
=
(
STableScanAnalyzeInfo
*
)
execInfo
->
verboseInfo
;
EXPLAIN_ROW_APPEND
(
"total_blocks=%d"
,
pScanInfo
->
totalBlocks
);
...
...
@@ -429,7 +429,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_TIMERANGE_FORMAT
,
pTblScanNode
->
scanRange
.
skey
,
pTblScanNode
->
scanRange
.
ekey
);
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_TIMERANGE_FORMAT
,
pTblScanNode
->
scanRange
.
skey
,
pTblScanNode
->
scanRange
.
ekey
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
...
...
@@ -641,7 +642,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
}
SDataBlockDescNode
*
pDescNode
=
pSortNode
->
node
.
pOutputDataBlockDesc
;
SDataBlockDescNode
*
pDescNode
=
pSortNode
->
node
.
pOutputDataBlockDesc
;
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
nodesGetOutputNumFromSlotList
(
pDescNode
->
pSlots
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pDescNode
->
totalRowSize
);
...
...
@@ -667,7 +668,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
int32_t
nodeNum
=
taosArrayGetSize
(
pResNode
->
pExecInfo
);
SExplainExecInfo
*
execInfo
=
taosArrayGet
(
pResNode
->
pExecInfo
,
0
);
SSortExecInfo
*
pExecInfo
=
(
SSortExecInfo
*
)
execInfo
->
verboseInfo
;
SSortExecInfo
*
pExecInfo
=
(
SSortExecInfo
*
)
execInfo
->
verboseInfo
;
EXPLAIN_ROW_APPEND
(
"%s"
,
pExecInfo
->
sortMethod
==
SORT_QSORT_T
?
"quicksort"
:
"merge sort"
);
if
(
pExecInfo
->
sortBuffer
>
1024
*
1024
)
{
EXPLAIN_ROW_APPEND
(
" Buffers:%.2f Mb"
,
pExecInfo
->
sortBuffer
/
(
1024
*
1024
.
0
));
...
...
@@ -701,7 +702,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
{
case
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
:
{
SIntervalPhysiNode
*
pIntNode
=
(
SIntervalPhysiNode
*
)
pNode
;
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_INTERVAL_FORMAT
,
nodesGetNameFromColumnNode
(
pIntNode
->
window
.
pTspk
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
...
...
@@ -784,7 +785,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
{
SStateWinodwPhysiNode
*
pStateNode
=
(
SStateWinodwPhysiNode
*
)
pNode
;
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_STATE_WINDOW_FORMAT
,
nodesGetNameFromColumnNode
(((
STargetNode
*
)
pStateNode
->
pStateKey
)
->
pExpr
));
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_STATE_WINDOW_FORMAT
,
nodesGetNameFromColumnNode
(((
STargetNode
*
)
pStateNode
->
pStateKey
)
->
pExpr
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
&
tlen
));
...
...
@@ -823,7 +825,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
case
QUERY_NODE_PHYSICAL_PLAN_PARTITION
:
{
SPartitionPhysiNode
*
pPartNode
=
(
SPartitionPhysiNode
*
)
pNode
;
SNode
*
p
=
nodesListGetNode
(
pPartNode
->
pPartitionKeys
,
0
);
SNode
*
p
=
nodesListGetNode
(
pPartNode
->
pPartitionKeys
,
0
);
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_PARITION_FORMAT
,
nodesGetNameFromColumnNode
(
p
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
if
(
pResNode
->
pExecInfo
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
55d41c8f
...
...
@@ -4052,12 +4052,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
goto
_error
;
}
pInfo
->
limit
=
*
pLimit
;
pInfo
->
slimit
=
*
pSlimit
;
pInfo
->
curOffset
=
pLimit
->
offset
;
pInfo
->
limit
=
*
pLimit
;
pInfo
->
slimit
=
*
pSlimit
;
pInfo
->
curOffset
=
pLimit
->
offset
;
pInfo
->
curSOffset
=
pSlimit
->
offset
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFilterNode
=
pCondition
;
pInfo
->
pFilterNode
=
pCondition
;
int32_t
numOfCols
=
num
;
int32_t
numOfRows
=
4096
;
...
...
@@ -4487,7 +4487,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
SLimit
limit
=
{.
limit
=
pProjPhyNode
->
limit
,
.
offset
=
pProjPhyNode
->
offset
};
SLimit
slimit
=
{.
limit
=
pProjPhyNode
->
slimit
,
.
offset
=
pProjPhyNode
->
soffset
};
pOptr
=
createProjectOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
limit
,
&
slimit
,
pProjPhyNode
->
node
.
pConditions
,
pTaskInfo
);
pOptr
=
createProjectOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
limit
,
&
slimit
,
pProjPhyNode
->
node
.
pConditions
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_AGG
==
type
)
{
SAggPhysiNode
*
pAggNode
=
(
SAggPhysiNode
*
)
pPhyNode
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pAggNode
->
pAggFuncs
,
pAggNode
->
pGroupKeys
,
&
num
);
...
...
@@ -4507,7 +4508,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr
=
createAggregateOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pScalarExprInfo
,
numOfScalarExpr
,
pTaskInfo
);
}
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
==
type
||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
==
type
||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
SIntervalPhysiNode
*
pIntervalPhyNode
=
(
SIntervalPhysiNode
*
)
pPhyNode
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
num
);
...
...
@@ -5187,8 +5188,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return
TSDB_CODE_SUCCESS
;
}
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
size_t
size
)
{
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
size_t
size
)
{
pSup
->
keySize
=
sizeof
(
int64_t
)
+
sizeof
(
TSKEY
);
pSup
->
pKeyBuf
=
taosMemoryCalloc
(
1
,
pSup
->
keySize
);
pSup
->
pResultRows
=
taosArrayInit
(
1024
,
size
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
55d41c8f
此差异已折叠。
点击以展开。
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
55d41c8f
...
...
@@ -316,7 +316,8 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
CLONE_NODE_LIST_FIELD
(
pScanCols
);
CLONE_NODE_LIST_FIELD
(
pScanPseudoCols
);
CLONE_OBJECT_FIELD
(
pMeta
,
tableMetaClone
);
COPY_SCALAR_FIELD
(
tableType
);
COPY_SCALAR_FIELD
(
tableId
);
CLONE_OBJECT_FIELD
(
pVgroupList
,
vgroupsInfoClone
);
COPY_SCALAR_FIELD
(
scanType
);
COPY_OBJECT_FIELD
(
scanSeq
[
0
],
sizeof
(
uint8_t
)
*
2
);
...
...
@@ -365,9 +366,15 @@ static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode*
return
(
SNode
*
)
pDst
;
}
static
SNode
*
logicVnodeModifCopy
(
const
SVnodeModif
LogicNode
*
pSrc
,
SVnodeModif
LogicNode
*
pDst
)
{
static
SNode
*
logicVnodeModifCopy
(
const
SVnodeModif
yLogicNode
*
pSrc
,
SVnodeModify
LogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
COPY_SCALAR_FIELD
(
modifyType
);
COPY_SCALAR_FIELD
(
msgType
);
CLONE_NODE_FIELD
(
pModifyRows
);
COPY_SCALAR_FIELD
(
tableId
);
COPY_SCALAR_FIELD
(
tableType
);
COPY_CHAR_ARRAY_FIELD
(
tableFName
);
COPY_OBJECT_FIELD
(
deleteTimeRange
,
sizeof
(
STimeWindow
));
return
(
SNode
*
)
pDst
;
}
...
...
@@ -400,7 +407,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
COPY_SCALAR_FIELD
(
triggerType
);
COPY_SCALAR_FIELD
(
watermark
);
COPY_SCALAR_FIELD
(
filesFactor
);
COPY_SCALAR_FIELD
(
stmInter
Algo
);
COPY_SCALAR_FIELD
(
interval
Algo
);
return
(
SNode
*
)
pDst
;
}
...
...
@@ -542,8 +549,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return
logicAggCopy
((
const
SAggLogicNode
*
)
pNode
,
(
SAggLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
logicProjectCopy
((
const
SProjectLogicNode
*
)
pNode
,
(
SProjectLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
:
return
logicVnodeModifCopy
((
const
SVnodeModif
LogicNode
*
)
pNode
,
(
SVnodeModif
LogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
Y
:
return
logicVnodeModifCopy
((
const
SVnodeModif
yLogicNode
*
)
pNode
,
(
SVnodeModify
LogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
logicExchangeCopy
((
const
SExchangeLogicNode
*
)
pNode
,
(
SExchangeLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
55d41c8f
...
...
@@ -188,8 +188,8 @@ const char* nodesNodeName(ENodeType type) {
return
"LogicAgg"
;
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
"LogicProject"
;
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
:
return
"LogicVnodeModif"
;
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
Y
:
return
"LogicVnodeModif
y
"
;
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
"LogicExchange"
;
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
...
...
@@ -228,8 +228,8 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiMerge"
;
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
"PhysiSort"
;
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
"PhysiInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
:
return
"Physi
Hash
Interval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
"PhysiStreamInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
...
...
@@ -252,6 +252,8 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiDispatch"
;
case
QUERY_NODE_PHYSICAL_PLAN_INSERT
:
return
"PhysiInsert"
;
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
return
"PhysiDelete"
;
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
"PhysiSubplan"
;
case
QUERY_NODE_PHYSICAL_PLAN
:
...
...
@@ -504,8 +506,8 @@ static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) {
static
const
char
*
jkScanLogicPlanScanCols
=
"ScanCols"
;
static
const
char
*
jkScanLogicPlanScanPseudoCols
=
"ScanPseudoCols"
;
static
const
char
*
jkScanLogicPlanTable
MetaSize
=
"TableMetaSize
"
;
static
const
char
*
jkScanLogicPlanTable
Meta
=
"TableMeta
"
;
static
const
char
*
jkScanLogicPlanTable
Id
=
"TableId
"
;
static
const
char
*
jkScanLogicPlanTable
Type
=
"TableType
"
;
static
const
char
*
jkScanLogicPlanTagCond
=
"TagCond"
;
static
int32_t
logicScanNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
...
...
@@ -519,10 +521,10 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
code
=
nodeListToJson
(
pJson
,
jkScanLogicPlanScanPseudoCols
,
pNode
->
pScanPseudoCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanLogicPlanTable
MetaSize
,
TABLE_META_SIZE
(
pNode
->
pMeta
)
);
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanLogicPlanTable
Id
,
pNode
->
tableId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAdd
Object
(
pJson
,
jkScanLogicPlanTableMeta
,
tableMetaToJson
,
pNode
->
pMeta
);
code
=
tjsonAdd
IntegerToObject
(
pJson
,
jkScanLogicPlanTableType
,
pNode
->
tableType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkScanLogicPlanTagCond
,
nodeToJson
,
pNode
->
pTagCond
);
...
...
@@ -543,10 +545,10 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
code
=
jsonToNodeList
(
pJson
,
jkScanLogicPlanScanPseudoCols
,
&
pNode
->
pScanPseudoCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGet
IntValue
(
pJson
,
jkScanLogicPlanTableMetaSize
,
&
objSize
);
code
=
tjsonGet
UBigIntValue
(
pJson
,
jkScanLogicPlanTableId
,
&
pNode
->
tableId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjson
MakeObject
(
pJson
,
jkScanLogicPlanTableMeta
,
jsonToTableMeta
,
(
void
**
)
&
pNode
->
pMeta
,
objSiz
e
);
code
=
tjson
GetTinyIntValue
(
pJson
,
jkScanLogicPlanTableType
,
&
pNode
->
tableTyp
e
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkScanLogicPlanTagCond
,
&
pNode
->
pTagCond
);
...
...
@@ -578,7 +580,7 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
code
=
tjsonAddIntegerToObject
(
pJson
,
jkProjectLogicPlanSlimit
,
pNode
->
slimit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jk
ScanLogicPlanTableMetaSize
,
pNode
->
soffset
);
code
=
tjsonAddIntegerToObject
(
pJson
,
jk
ProjectLogicPlanSoffset
,
pNode
->
soffset
);
}
return
code
;
...
...
@@ -601,7 +603,45 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
code
=
tjsonGetBigIntValue
(
pJson
,
jkProjectLogicPlanSlimit
,
&
pNode
->
slimit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkScanLogicPlanTableMetaSize
,
&
pNode
->
soffset
);
code
=
tjsonGetBigIntValue
(
pJson
,
jkProjectLogicPlanSoffset
,
&
pNode
->
soffset
);
}
return
code
;
}
static
const
char
*
jkVnodeModifyLogicPlanModifyType
=
"ModifyType"
;
static
const
char
*
jkVnodeModifyLogicPlanMsgType
=
"MsgType"
;
static
const
char
*
jkVnodeModifyLogicPlanModifyRows
=
"ModifyRows"
;
static
int32_t
logicVnodeModifyNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SVnodeModifyLogicNode
*
pNode
=
(
const
SVnodeModifyLogicNode
*
)
pObj
;
int32_t
code
=
logicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkVnodeModifyLogicPlanModifyType
,
pNode
->
modifyType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkVnodeModifyLogicPlanMsgType
,
pNode
->
msgType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkVnodeModifyLogicPlanModifyRows
,
nodeToJson
,
pNode
->
pModifyRows
);
}
return
code
;
}
static
int32_t
jsonToLogicVnodeModifyNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SVnodeModifyLogicNode
*
pNode
=
(
SVnodeModifyLogicNode
*
)
pObj
;
int32_t
code
=
jsonToLogicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkVnodeModifyLogicPlanModifyType
,
pNode
->
modifyType
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkVnodeModifyLogicPlanMsgType
,
&
pNode
->
msgType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkVnodeModifyLogicPlanModifyRows
,
&
pNode
->
pModifyRows
);
}
return
code
;
...
...
@@ -1955,6 +1995,58 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return
static
int32_t
jsonToPhysiDispatchNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
return
jsonToPhysicDataSinkNode
(
pJson
,
pObj
);
}
static
const
char
*
jkDeletePhysiPlanTableId
=
"TableId"
;
static
const
char
*
jkDeletePhysiPlanTableType
=
"TableType"
;
static
const
char
*
jkDeletePhysiPlanTableFName
=
"TableFName"
;
static
const
char
*
jkDeletePhysiPlanDeleteTimeRangeStartKey
=
"DeleteTimeRangeStartKey"
;
static
const
char
*
jkDeletePhysiPlanDeleteTimeRangeEndKey
=
"DeleteTimeRangeEndKey"
;
static
int32_t
physiDeleteNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SDataDeleterNode
*
pNode
=
(
const
SDataDeleterNode
*
)
pObj
;
int32_t
code
=
physicDataSinkNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeletePhysiPlanTableId
,
pNode
->
tableId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeletePhysiPlanTableType
,
pNode
->
tableType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddStringToObject
(
pJson
,
jkDeletePhysiPlanTableFName
,
pNode
->
tableFName
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeletePhysiPlanDeleteTimeRangeStartKey
,
pNode
->
deleteTimeRange
.
skey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeletePhysiPlanDeleteTimeRangeEndKey
,
pNode
->
deleteTimeRange
.
ekey
);
}
return
code
;
}
static
int32_t
jsonToPhysiDeleteNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SDataDeleterNode
*
pNode
=
(
SDataDeleterNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicDataSinkNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUBigIntValue
(
pJson
,
jkDeletePhysiPlanTableId
,
&
pNode
->
tableId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkDeletePhysiPlanTableType
,
&
pNode
->
tableType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetStringValue
(
pJson
,
jkDeletePhysiPlanTableFName
,
pNode
->
tableFName
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkDeletePhysiPlanDeleteTimeRangeStartKey
,
&
pNode
->
deleteTimeRange
.
skey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkDeletePhysiPlanDeleteTimeRangeEndKey
,
&
pNode
->
deleteTimeRange
.
ekey
);
}
return
code
;
}
static
const
char
*
jkQueryNodeAddrId
=
"Id"
;
static
const
char
*
jkQueryNodeAddrInUse
=
"InUse"
;
static
const
char
*
jkQueryNodeAddrNumOfEps
=
"NumOfEps"
;
...
...
@@ -2751,7 +2843,7 @@ static const char* jkStateWindowExpr = "StateWindowExpr";
static
int32_t
stateWindowNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SStateWindowNode
*
pNode
=
(
const
SStateWindowNode
*
)
pObj
;
int32_t
code
=
tjsonAddObject
(
pJson
,
jkStateWindowCol
,
nodeToJson
,
pNode
->
pCol
);
int32_t
code
=
tjsonAddObject
(
pJson
,
jkStateWindowCol
,
nodeToJson
,
pNode
->
pCol
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkStateWindowExpr
,
nodeToJson
,
pNode
->
pExpr
);
}
...
...
@@ -3522,6 +3614,73 @@ static int32_t jsonToCreateTopicStmt(const SJson* pJson, void* pObj) {
return
code
;
}
static
const
char
*
jkDeleteStmtFromTable
=
"FromTable"
;
static
const
char
*
jkDeleteStmtWhere
=
"Where"
;
static
const
char
*
jkDeleteStmtCountFunc
=
"CountFunc"
;
static
const
char
*
jkDeleteStmtTagIndexCond
=
"TagIndexCond"
;
static
const
char
*
jkDeleteStmtTimeRangeStartKey
=
"TimeRangeStartKey"
;
static
const
char
*
jkDeleteStmtTimeRangeEndKey
=
"TimeRangeEndKey"
;
static
const
char
*
jkDeleteStmtPrecision
=
"Precision"
;
static
const
char
*
jkDeleteStmtDeleteZeroRows
=
"DeleteZeroRows"
;
static
int32_t
deleteStmtToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SDeleteStmt
*
pNode
=
(
const
SDeleteStmt
*
)
pObj
;
int32_t
code
=
tjsonAddObject
(
pJson
,
jkDeleteStmtFromTable
,
nodeToJson
,
pNode
->
pFromTable
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkDeleteStmtWhere
,
nodeToJson
,
pNode
->
pWhere
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkDeleteStmtCountFunc
,
nodeToJson
,
pNode
->
pCountFunc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkDeleteStmtTagIndexCond
,
nodeToJson
,
pNode
->
pTagIndexCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeleteStmtTimeRangeStartKey
,
pNode
->
timeRange
.
skey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeleteStmtTimeRangeEndKey
,
pNode
->
timeRange
.
ekey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeleteStmtPrecision
,
pNode
->
precision
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddBoolToObject
(
pJson
,
jkDeleteStmtDeleteZeroRows
,
pNode
->
deleteZeroRows
);
}
return
code
;
}
static
int32_t
jsonToDeleteStmt
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SDeleteStmt
*
pNode
=
(
SDeleteStmt
*
)
pObj
;
int32_t
code
=
jsonToNodeObject
(
pJson
,
jkDeleteStmtFromTable
,
&
pNode
->
pFromTable
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkDeleteStmtWhere
,
&
pNode
->
pWhere
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkDeleteStmtCountFunc
,
&
pNode
->
pCountFunc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkDeleteStmtTagIndexCond
,
&
pNode
->
pTagIndexCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkDeleteStmtTimeRangeStartKey
,
&
pNode
->
timeRange
.
skey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkDeleteStmtTimeRangeEndKey
,
&
pNode
->
timeRange
.
ekey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUTinyIntValue
(
pJson
,
jkDeleteStmtPrecision
,
&
pNode
->
precision
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBoolValue
(
pJson
,
jkDeleteStmtDeleteZeroRows
,
&
pNode
->
deleteZeroRows
);
}
return
code
;
}
static
int32_t
specificNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
switch
(
nodeType
(
pObj
))
{
case
QUERY_NODE_COLUMN
:
...
...
@@ -3594,6 +3753,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
break
;
case
QUERY_NODE_CREATE_TOPIC_STMT
:
return
createTopicStmtToJson
(
pObj
,
pJson
);
case
QUERY_NODE_DELETE_STMT
:
return
deleteStmtToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
logicScanNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
...
...
@@ -3602,8 +3763,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
logicAggNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
logicProjectNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
:
break
;
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
Y
:
return
logicVnodeModifyNodeToJson
(
pObj
,
pJson
)
;
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
logicExchangeNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
...
...
@@ -3640,7 +3801,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
physiMergeNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
physiSortNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
...
...
@@ -3659,6 +3820,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
physiDispatchNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_INSERT
:
break
;
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
return
physiDeleteNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
subplanToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN
:
...
...
@@ -3722,10 +3885,14 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToAlterDnodeStmt
(
pJson
,
pObj
);
case
QUERY_NODE_CREATE_TOPIC_STMT
:
return
jsonToCreateTopicStmt
(
pJson
,
pObj
);
case
QUERY_NODE_DELETE_STMT
:
return
jsonToDeleteStmt
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
jsonToLogicScanNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
jsonToLogicProjectNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY
:
return
jsonToLogicVnodeModifyNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
jsonToLogicExchangeNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
...
...
@@ -3762,7 +3929,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiMergeNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
jsonToPhysiSortNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
...
...
@@ -3779,6 +3946,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiPartitionNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
return
jsonToPhysiDispatchNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
return
jsonToPhysiDeleteNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
jsonToSubplan
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN
:
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
55d41c8f
...
...
@@ -512,7 +512,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
res
=
walkWindowPhysi
((
SWinodwPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
break
;
...
...
@@ -520,7 +520,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
:
res
=
walkWindowPhysi
((
SWinodwPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
{
SStateWinodwPhysiNode
*
pState
=
(
SStateWinodwPhysiNode
*
)
pNode
;
res
=
walkWindowPhysi
((
SWinodwPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
55d41c8f
...
...
@@ -218,8 +218,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SAggLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
makeNode
(
type
,
sizeof
(
SProjectLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
:
return
makeNode
(
type
,
sizeof
(
SVnodeModifLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
Y
:
return
makeNode
(
type
,
sizeof
(
SVnodeModif
y
LogicNode
));
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
makeNode
(
type
,
sizeof
(
SExchangeLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
...
...
@@ -258,8 +258,10 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SMergePhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
makeNode
(
type
,
sizeof
(
SSortPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SSortMergeIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SStreamIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
...
...
@@ -282,6 +284,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SDataDispatcherNode
));
case
QUERY_NODE_PHYSICAL_PLAN_INSERT
:
return
makeNode
(
type
,
sizeof
(
SDataInserterNode
));
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
return
makeNode
(
type
,
sizeof
(
SDataDeleterNode
));
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
makeNode
(
type
,
sizeof
(
SSubplan
));
case
QUERY_NODE_PHYSICAL_PLAN
:
...
...
@@ -561,7 +565,6 @@ void nodesDestroyNode(SNodeptr pNode) {
SScanLogicNode
*
pLogicNode
=
(
SScanLogicNode
*
)
pNode
;
destroyLogicNode
((
SLogicNode
*
)
pLogicNode
);
nodesDestroyList
(
pLogicNode
->
pScanCols
);
taosMemoryFreeClear
(
pLogicNode
->
pMeta
);
taosMemoryFreeClear
(
pLogicNode
->
pVgroupList
);
break
;
}
...
...
@@ -584,8 +587,8 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyList
(
pLogicNode
->
pProjections
);
break
;
}
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
:
{
SVnodeModif
LogicNode
*
pLogicNode
=
(
SVnodeModif
LogicNode
*
)
pNode
;
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
Y
:
{
SVnodeModif
yLogicNode
*
pLogicNode
=
(
SVnodeModify
LogicNode
*
)
pNode
;
destroyLogicNode
((
SLogicNode
*
)
pLogicNode
);
destroyVgDataBlockArray
(
pLogicNode
->
pDataBlocks
);
// pVgDataBlocks is weak reference
...
...
@@ -673,7 +676,7 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyNode
(
pPhyNode
->
pSortKeys
);
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_
HASH_
INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
destroyWinodwPhysiNode
((
SWinodwPhysiNode
*
)
pNode
);
break
;
...
...
@@ -1066,7 +1069,7 @@ char* nodesGetStrValueFromNode(SValueNode* pNode) {
bool
nodesIsExprNode
(
const
SNode
*
pNode
)
{
ENodeType
type
=
nodeType
(
pNode
);
return
(
QUERY_NODE_COLUMN
==
type
||
QUERY_NODE_VALUE
==
type
||
QUERY_NODE_OPERATOR
==
type
||
QUERY_NODE_FUNCTION
==
type
);
QUERY_NODE_FUNCTION
==
type
||
QUERY_NODE_LOGIC_CONDITION
==
type
);
}
bool
nodesIsUnaryOp
(
const
SOperatorNode
*
pOp
)
{
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
55d41c8f
...
...
@@ -1489,11 +1489,27 @@ SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDb
return
(
SNode
*
)
pStmt
;
}
SNode
*
createCountFuncForDelete
(
SAstCreateContext
*
pCxt
)
{
SFunctionNode
*
pFunc
=
nodesMakeNode
(
QUERY_NODE_FUNCTION
);
CHECK_OUT_OF_MEM
(
pFunc
);
strcpy
(
pFunc
->
functionName
,
"count"
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListMakeStrictAppend
(
&
pFunc
->
pParameterList
,
createPrimaryKeyCol
(
pCxt
)))
{
nodesDestroyNode
(
pFunc
);
CHECK_OUT_OF_MEM
(
NULL
);
}
return
(
SNode
*
)
pFunc
;
}
SNode
*
createDeleteStmt
(
SAstCreateContext
*
pCxt
,
SNode
*
pTable
,
SNode
*
pWhere
)
{
CHECK_PARSER_STATUS
(
pCxt
);
SDeleteStmt
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_DELETE_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
pStmt
->
pFromTable
=
pTable
;
pStmt
->
pWhere
=
pWhere
;
pStmt
->
pCountFunc
=
createCountFuncForDelete
(
pCxt
);
if
(
NULL
==
pStmt
->
pCountFunc
)
{
nodesDestroyNode
(
pStmt
);
CHECK_OUT_OF_MEM
(
NULL
);
}
return
(
SNode
*
)
pStmt
;
}
source/libs/parser/src/parTranslater.c
浏览文件 @
55d41c8f
...
...
@@ -938,7 +938,7 @@ static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if
(
hasInvalidFuncNesting
(
pFunc
->
pParameterList
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_AGG_FUNC_NESTING
);
}
if
(
pCxt
->
pCurrSelectStmt
->
hasIndefiniteRowsFunc
)
{
if
(
NULL
!=
pCxt
->
pCurrSelectStmt
&&
pCxt
->
pCurrSelectStmt
->
hasIndefiniteRowsFunc
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_NOT_ALLOWED_FUNC
);
}
...
...
@@ -2124,6 +2124,7 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
code
=
translateDeleteWhere
(
pCxt
,
pDelete
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pCxt
->
currClause
=
SQL_CLAUSE_SELECT
;
code
=
translateExpr
(
pCxt
,
&
pDelete
->
pCountFunc
);
}
return
code
;
...
...
source/libs/parser/test/parInitialDTest.cpp
浏览文件 @
55d41c8f
...
...
@@ -28,6 +28,10 @@ TEST_F(ParserInitialDTest, delete) {
run
(
"DELETE FROM t1"
);
run
(
"DELETE FROM t1 WHERE ts > now - 2d and ts < now - 1d"
);
run
(
"DELETE FROM st1"
);
run
(
"DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10"
);
}
TEST_F
(
ParserInitialDTest
,
deleteSemanticCheck
)
{
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
55d41c8f
...
...
@@ -98,7 +98,15 @@ static int32_t rewriteExprForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESq
return
cxt
.
errCode
;
}
static
int32_t
rewriteExpr
(
SNodeList
*
pExprs
,
SNode
**
pTarget
)
{
nodesWalkExprs
(
pExprs
,
doNameExpr
,
NULL
);
SRewriteExprCxt
cxt
=
{.
errCode
=
TSDB_CODE_SUCCESS
,
.
pExprs
=
pExprs
};
nodesRewriteExpr
(
pTarget
,
doRewriteExpr
,
&
cxt
);
return
cxt
.
errCode
;
}
static
int32_t
rewriteExprs
(
SNodeList
*
pExprs
,
SNodeList
*
pTarget
)
{
nodesWalkExprs
(
pExprs
,
doNameExpr
,
NULL
);
SRewriteExprCxt
cxt
=
{.
errCode
=
TSDB_CODE_SUCCESS
,
.
pExprs
=
pExprs
};
nodesRewriteExprs
(
pTarget
,
doRewriteExpr
,
&
cxt
);
return
cxt
.
errCode
;
...
...
@@ -141,7 +149,7 @@ static int32_t createSelectRootLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
}
static
EScanType
getScanType
(
SLogicPlanContext
*
pCxt
,
SNodeList
*
pScanPseudoCols
,
SNodeList
*
pScanCols
,
STableMeta
*
pMeta
)
{
int8_t
tableType
)
{
if
(
pCxt
->
pPlanCxt
->
topicQuery
||
pCxt
->
pPlanCxt
->
streamQuery
)
{
return
SCAN_TYPE_STREAM
;
}
...
...
@@ -151,7 +159,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
return
NULL
==
pScanPseudoCols
?
SCAN_TYPE_TABLE
:
SCAN_TYPE_TAG
;
}
if
(
TSDB_SYSTEM_TABLE
==
pMeta
->
tableType
)
{
if
(
TSDB_SYSTEM_TABLE
==
tableType
)
{
return
SCAN_TYPE_SYSTEM_TABLE
;
}
...
...
@@ -204,16 +212,18 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, SNodeList** pCols) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createScanLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SRealTableNode
*
pRealTable
,
SLogicNode
**
pLogicNode
)
{
static
int32_t
makeScanLogicNode
(
SLogicPlanContext
*
pCxt
,
SRealTableNode
*
pRealTable
,
bool
hasRepeatScanFuncs
,
SLogicNode
**
pLogicNode
)
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_SCAN
);
if
(
NULL
==
pScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
TSWAP
(
pScan
->
pMeta
,
pRealTable
->
pMeta
);
//
TSWAP(pScan->pMeta, pRealTable->pMeta);
TSWAP
(
pScan
->
pVgroupList
,
pRealTable
->
pVgroupList
);
pScan
->
scanSeq
[
0
]
=
pSelect
->
hasRepeatScanFuncs
?
2
:
1
;
pScan
->
tableId
=
pRealTable
->
pMeta
->
uid
;
pScan
->
tableType
=
pRealTable
->
pMeta
->
tableType
;
pScan
->
scanSeq
[
0
]
=
hasRepeatScanFuncs
?
2
:
1
;
pScan
->
scanSeq
[
1
]
=
0
;
pScan
->
scanRange
=
TSWINDOW_INITIALIZER
;
pScan
->
tableName
.
type
=
TSDB_TABLE_NAME_T
;
...
...
@@ -224,9 +234,21 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan
->
ratio
=
pRealTable
->
ratio
;
pScan
->
dataRequired
=
FUNC_DATA_REQUIRED_DATA_LOAD
;
*
pLogicNode
=
(
SLogicNode
*
)
pScan
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createScanLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SRealTableNode
*
pRealTable
,
SLogicNode
**
pLogicNode
)
{
SScanLogicNode
*
pScan
=
NULL
;
int32_t
code
=
makeScanLogicNode
(
pCxt
,
pRealTable
,
pSelect
->
hasRepeatScanFuncs
,
(
SLogicNode
**
)
&
pScan
);
// set columns to scan
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_FROM
,
pRealTable
->
table
.
tableAlias
,
COLLECT_COL_TYPE_COL
,
&
pScan
->
pScanCols
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_FROM
,
pRealTable
->
table
.
tableAlias
,
COLLECT_COL_TYPE_COL
,
&
pScan
->
pScanCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_FROM
,
pRealTable
->
table
.
tableAlias
,
COLLECT_COL_TYPE_TAG
,
...
...
@@ -242,10 +264,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
code
=
rewriteExprForSelect
(
pScan
->
pScanPseudoCols
,
pSelect
,
SQL_CLAUSE_FROM
);
}
pScan
->
scanType
=
getScanType
(
pCxt
,
pScan
->
pScanPseudoCols
,
pScan
->
pScanCols
,
pScan
->
pMeta
);
pScan
->
scanType
=
getScanType
(
pCxt
,
pScan
->
pScanPseudoCols
,
pScan
->
pScanCols
,
pScan
->
tableType
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addPrimaryKeyCol
(
pScan
->
pMeta
->
ui
d
,
&
pScan
->
pScanCols
);
code
=
addPrimaryKeyCol
(
pScan
->
tableI
d
,
&
pScan
->
pScanCols
);
}
// set output
...
...
@@ -518,7 +540,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow
->
sliding
=
(
NULL
!=
pInterval
->
pSliding
?
((
SValueNode
*
)
pInterval
->
pSliding
)
->
datum
.
i
:
pWindow
->
interval
);
pWindow
->
slidingUnit
=
(
NULL
!=
pInterval
->
pSliding
?
((
SValueNode
*
)
pInterval
->
pSliding
)
->
unit
:
pWindow
->
intervalUnit
);
pWindow
->
stmInterAlgo
=
STREAM_INTERVAL_ALGO_SINGLE
;
pWindow
->
intervalAlgo
=
pCxt
->
pPlanCxt
->
streamQuery
?
INTERVAL_ALGO_STREAM_SINGLE
:
INTERVAL_ALGO_HASH
;
pWindow
->
pTspk
=
nodesCloneNode
(
pInterval
->
pCol
);
if
(
NULL
==
pWindow
->
pTspk
)
{
...
...
@@ -946,10 +968,11 @@ static int32_t getMsgType(ENodeType sqlType) {
}
static
int32_t
createVnodeModifLogicNode
(
SLogicPlanContext
*
pCxt
,
SVnodeModifOpStmt
*
pStmt
,
SLogicNode
**
pLogicNode
)
{
SVnodeModif
LogicNode
*
pModif
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
);
SVnodeModif
yLogicNode
*
pModif
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY
);
if
(
NULL
==
pModif
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pModif
->
modifyType
=
MODIFY_TABLE_TYPE_INSERT
;
TSWAP
(
pModif
->
pDataBlocks
,
pStmt
->
pDataBlocks
);
pModif
->
msgType
=
getMsgType
(
pStmt
->
sqlNodeType
);
*
pLogicNode
=
(
SLogicNode
*
)
pModif
;
...
...
@@ -962,26 +985,96 @@ static int32_t createDeleteRootLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p
}
static
int32_t
createDeleteScanLogicNode
(
SLogicPlanContext
*
pCxt
,
SDeleteStmt
*
pDelete
,
SLogicNode
**
pLogicNode
)
{
return
TSDB_CODE_FAILED
;
SScanLogicNode
*
pScan
=
NULL
;
int32_t
code
=
makeScanLogicNode
(
pCxt
,
(
SRealTableNode
*
)
pDelete
->
pFromTable
,
false
,
(
SLogicNode
**
)
&
pScan
);
// set columns to scan
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScan
->
scanType
=
SCAN_TYPE_TABLE
;
pScan
->
pScanCols
=
nodesCloneList
(((
SFunctionNode
*
)
pDelete
->
pCountFunc
)
->
pParameterList
);
if
(
NULL
==
pScan
->
pScanCols
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pDelete
->
pTagIndexCond
)
{
pScan
->
pTagCond
=
nodesCloneNode
(
pDelete
->
pTagIndexCond
);
if
(
NULL
==
pScan
->
pTagCond
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
// set output
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
pScan
->
pScanCols
,
&
pScan
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicNode
=
(
SLogicNode
*
)
pScan
;
}
else
{
nodesDestroyNode
(
pScan
);
}
return
code
;
}
static
int32_t
createDeleteAggLogicNode
(
SLogicPlanContext
*
pCxt
,
SDeleteStmt
*
pDelete
,
SLogicNode
**
pLogicNode
)
{
return
TSDB_CODE_FAILED
;
SAggLogicNode
*
pAgg
=
(
SAggLogicNode
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_AGG
);
if
(
NULL
==
pAgg
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
nodesListMakeStrictAppend
(
&
pAgg
->
pAggFuncs
,
nodesCloneNode
(
pDelete
->
pCountFunc
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteExpr
(
pAgg
->
pAggFuncs
,
&
pDelete
->
pCountFunc
);
}
// set the output
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
pAgg
->
pAggFuncs
,
&
pAgg
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicNode
=
(
SLogicNode
*
)
pAgg
;
}
else
{
nodesDestroyNode
(
pAgg
);
}
return
code
;
}
static
int32_t
createDeleteModifyTableLogicNode
(
SLogicPlanContext
*
pCxt
,
SDeleteStmt
*
pDelete
,
SLogicNode
**
pLogicNode
)
{
return
TSDB_CODE_FAILED
;
static
int32_t
createVnodeModifLogicNodeByDelete
(
SLogicPlanContext
*
pCxt
,
SDeleteStmt
*
pDelete
,
SLogicNode
**
pLogicNode
)
{
SVnodeModifyLogicNode
*
pModify
=
(
SVnodeModifyLogicNode
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY
);
if
(
NULL
==
pModify
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SRealTableNode
*
pRealTable
=
(
SRealTableNode
*
)
pDelete
->
pFromTable
;
pModify
->
modifyType
=
MODIFY_TABLE_TYPE_DELETE
;
pModify
->
tableId
=
pRealTable
->
pMeta
->
uid
;
pModify
->
tableType
=
pRealTable
->
pMeta
->
tableType
;
snprintf
(
pModify
->
tableFName
,
sizeof
(
pModify
->
tableFName
),
"%d.%s.%s"
,
pCxt
->
pPlanCxt
->
acctId
,
pRealTable
->
table
.
dbName
,
pRealTable
->
table
.
tableName
);
pModify
->
deleteTimeRange
=
pDelete
->
timeRange
;
pModify
->
pModifyRows
=
nodesCloneNode
(
pDelete
->
pCountFunc
);
if
(
NULL
==
pModify
->
pModifyRows
)
{
nodesDestroyNode
(
pModify
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pLogicNode
=
(
SLogicNode
*
)
pModify
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createDeleteLogicNode
(
SLogicPlanContext
*
pCxt
,
SDeleteStmt
*
pDelete
,
SLogicNode
**
pLogicNode
)
{
SLogicNode
*
pRoot
=
NULL
;
int32_t
code
=
createDelete
RootLogicNode
(
pCxt
,
pDelete
,
createDeleteScanLogicNod
e
,
&
pRoot
);
int32_t
code
=
createDelete
ScanLogicNode
(
pCxt
,
pDelet
e
,
&
pRoot
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createDeleteRootLogicNode
(
pCxt
,
pDelete
,
createDeleteAggLogicNode
,
&
pRoot
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createDeleteRootLogicNode
(
pCxt
,
pDelete
,
create
DeleteModifyTableLogicNod
e
,
&
pRoot
);
code
=
createDeleteRootLogicNode
(
pCxt
,
pDelete
,
create
VnodeModifLogicNodeByDelet
e
,
&
pRoot
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
55d41c8f
...
...
@@ -99,7 +99,7 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return
false
;
}
// todo: release after function splitting
if
(
TSDB_SUPER_TABLE
==
((
SScanLogicNode
*
)
pNode
)
->
pMeta
->
tableType
&&
if
(
TSDB_SUPER_TABLE
==
((
SScanLogicNode
*
)
pNode
)
->
tableType
&&
SCAN_TYPE_STREAM
!=
((
SScanLogicNode
*
)
pNode
)
->
scanType
)
{
return
false
;
}
...
...
@@ -328,7 +328,7 @@ static int32_t cpdApplyTagIndex(SScanLogicNode* pScan, SNode** pTagCond, SNode**
static
int32_t
cpdOptimizeScanCondition
(
SOptimizeContext
*
pCxt
,
SScanLogicNode
*
pScan
)
{
if
(
NULL
==
pScan
->
node
.
pConditions
||
OPTIMIZE_FLAG_TEST_MASK
(
pScan
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_CPD
)
||
TSDB_SYSTEM_TABLE
==
pScan
->
pMeta
->
tableType
)
{
TSDB_SYSTEM_TABLE
==
pScan
->
tableType
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -662,7 +662,7 @@ static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeL
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
if
(
TSDB_SUPER_TABLE
!=
((
SScanLogicNode
*
)
pNode
)
->
pMeta
->
tableType
)
{
if
(
TSDB_SUPER_TABLE
!=
((
SScanLogicNode
*
)
pNode
)
->
tableType
)
{
return
nodesListMakeAppend
(
pScanNodes
,
pNode
);
}
break
;
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
55d41c8f
...
...
@@ -342,18 +342,7 @@ static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i
return
TSDB_CODE_SUCCESS
;
}
static
uint8_t
getPrecision
(
SNodeList
*
pChildren
)
{
if
(
1
==
LIST_LENGTH
(
pChildren
))
{
return
(((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
pOutputDataBlockDesc
)
->
precision
;
}
else
if
(
2
==
LIST_LENGTH
(
pChildren
))
{
uint8_t
lp
=
(((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
pOutputDataBlockDesc
)
->
precision
;
uint8_t
rp
=
(((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
1
))
->
pOutputDataBlockDesc
)
->
precision
;
return
(
lp
>
rp
?
rp
:
lp
);
}
return
0
;
}
static
SPhysiNode
*
makePhysiNode
(
SPhysiPlanContext
*
pCxt
,
uint8_t
precision
,
SLogicNode
*
pLogicNode
,
ENodeType
type
)
{
static
SPhysiNode
*
makePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
ENodeType
type
)
{
SPhysiNode
*
pPhysiNode
=
(
SPhysiNode
*
)
nodesMakeNode
(
type
);
if
(
NULL
==
pPhysiNode
)
{
return
NULL
;
...
...
@@ -364,7 +353,7 @@ static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, uint8_t precision, SLo
nodesDestroyNode
(
pPhysiNode
);
return
NULL
;
}
pPhysiNode
->
pOutputDataBlockDesc
->
precision
=
precision
;
pPhysiNode
->
pOutputDataBlockDesc
->
precision
=
p
LogicNode
->
p
recision
;
return
pPhysiNode
;
}
...
...
@@ -435,8 +424,8 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScanPhysiNode
->
uid
=
pScanLogicNode
->
pMeta
->
ui
d
;
pScanPhysiNode
->
tableType
=
pScanLogicNode
->
pMeta
->
tableType
;
pScanPhysiNode
->
uid
=
pScanLogicNode
->
tableI
d
;
pScanPhysiNode
->
tableType
=
pScanLogicNode
->
tableType
;
memcpy
(
&
pScanPhysiNode
->
tableName
,
&
pScanLogicNode
->
tableName
,
sizeof
(
SName
));
if
(
NULL
!=
pScanLogicNode
->
pTagCond
)
{
pSubplan
->
pTagCond
=
nodesCloneNode
(
pScanLogicNode
->
pTagCond
);
...
...
@@ -462,8 +451,8 @@ static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAdd
static
int32_t
createTagScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SScanLogicNode
*
pScanLogicNode
,
SPhysiNode
**
pPhyNode
)
{
STagScanPhysiNode
*
pTagScan
=
(
STagScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
pScanLogicNode
->
pMeta
->
tableInfo
.
precision
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
);
STagScanPhysiNode
*
pTagScan
=
(
STagScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
);
if
(
NULL
==
pTagScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -476,8 +465,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
static
int32_t
createTableScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SScanLogicNode
*
pScanLogicNode
,
SPhysiNode
**
pPhyNode
)
{
STableScanPhysiNode
*
pTableScan
=
(
STableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
pScanLogicNode
->
pMeta
->
tableInfo
.
precision
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
(
STableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
if
(
NULL
==
pTableScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -515,9 +503,8 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
static
int32_t
createSystemTableScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SScanLogicNode
*
pScanLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SSystemTableScanPhysiNode
*
pScan
=
(
SSystemTableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
pScanLogicNode
->
pMeta
->
tableInfo
.
precision
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
);
SSystemTableScanPhysiNode
*
pScan
=
(
SSystemTableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
);
if
(
NULL
==
pScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -567,8 +554,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
static
int32_t
createJoinPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SJoinLogicNode
*
pJoinLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SJoinPhysiNode
*
pJoin
=
(
SJoinPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pJoinLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_JOIN
);
SJoinPhysiNode
*
pJoin
=
(
SJoinPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pJoinLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_JOIN
);
if
(
NULL
==
pJoin
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -750,8 +737,7 @@ static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeLi
static
int32_t
createAggPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SAggLogicNode
*
pAggLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SAggPhysiNode
*
pAgg
=
(
SAggPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pAggLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_AGG
);
SAggPhysiNode
*
pAgg
=
(
SAggPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pAggLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_AGG
);
if
(
NULL
==
pAgg
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -806,8 +792,8 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
static
int32_t
createProjectPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SProjectLogicNode
*
pProjectLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SProjectPhysiNode
*
pProject
=
(
SProjectPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
)
,
(
SLogicNode
*
)
pProjectLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
);
SProjectPhysiNode
*
pProject
=
(
SProjectPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pProjectLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
);
if
(
NULL
==
pProject
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -838,8 +824,8 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
static
int32_t
doCreateExchangePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SExchangeLogicNode
*
pExchangeLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
makePhysiNode
(
pCxt
,
pExchangeLogicNode
->
node
.
precision
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
);
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -852,8 +838,8 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
static
int32_t
createStreamScanPhysiNodeByExchange
(
SPhysiPlanContext
*
pCxt
,
SExchangeLogicNode
*
pExchangeLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SScanPhysiNode
*
pScan
=
(
SScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
pExchangeLogicNode
->
node
.
precision
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
SScanPhysiNode
*
pScan
=
(
SScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
if
(
NULL
==
pScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -933,22 +919,28 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
return
code
;
}
static
ENodeType
getIntervalOperatorType
(
bool
streamQuery
,
EStreamIntervalAlgorithm
stmAlgo
)
{
if
(
streamQuery
)
{
return
STREAM_INTERVAL_ALGO_FINAL
==
stmAlgo
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
(
STREAM_INTERVAL_ALGO_SEMI
==
stmAlgo
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
);
}
else
{
return
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
;
static
ENodeType
getIntervalOperatorType
(
EIntervalAlgorithm
intervalAlgo
)
{
switch
(
intervalAlgo
)
{
case
INTERVAL_ALGO_HASH
:
return
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
;
case
INTERVAL_ALGO_SORT_MERGE
:
return
QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL
;
case
INTERVAL_ALGO_STREAM_FINAL
:
return
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
;
case
INTERVAL_ALGO_STREAM_SEMI
:
return
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
;
case
INTERVAL_ALGO_STREAM_SINGLE
:
return
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
;
default:
break
;
}
return
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
;
}
static
int32_t
createIntervalPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SIntervalPhysiNode
*
pInterval
=
(
SIntervalPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pWindowLogicNode
,
getIntervalOperatorType
(
pCxt
->
pPlanCxt
->
streamQuery
,
pWindowLogicNode
->
stmInterAlgo
));
pCxt
,
(
SLogicNode
*
)
pWindowLogicNode
,
getIntervalOperatorType
(
pWindowLogicNode
->
intervalAlgo
));
if
(
NULL
==
pInterval
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -965,7 +957,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
static
int32_t
createSessionWindowPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SSessionWinodwPhysiNode
*
pSession
=
(
SSessionWinodwPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pWindowLogicNode
,
pCxt
,
(
SLogicNode
*
)
pWindowLogicNode
,
(
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
:
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
));
if
(
NULL
==
pSession
)
{
...
...
@@ -979,10 +971,10 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
static
int32_t
createStateWindowPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SStateWinodwPhysiNode
*
pState
=
(
SStateWinodwPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
)
,
(
SLogicNode
*
)
pWindowLogicNode
,
(
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
));
SStateWinodwPhysiNode
*
pState
=
(
SStateWinodwPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pWindowLogicNode
,
(
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
));
if
(
NULL
==
pState
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -1032,8 +1024,8 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
static
int32_t
createSortPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SSortLogicNode
*
pSortLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SSortPhysiNode
*
pSort
=
(
SSortPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pSortLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_SORT
);
SSortPhysiNode
*
pSort
=
(
SSortPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pSortLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_SORT
);
if
(
NULL
==
pSort
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -1073,8 +1065,8 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
static
int32_t
createPartitionPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SPartitionLogicNode
*
pPartLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SPartitionPhysiNode
*
pPart
=
(
SPartitionPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
)
,
(
SLogicNode
*
)
pPartLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_PARTITION
);
SPartitionPhysiNode
*
pPart
=
(
SPartitionPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pPartLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_PARTITION
);
if
(
NULL
==
pPart
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -1114,8 +1106,7 @@ static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
static
int32_t
createFillPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SFillLogicNode
*
pFillNode
,
SPhysiNode
**
pPhyNode
)
{
SFillPhysiNode
*
pFill
=
(
SFillPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pFillNode
,
QUERY_NODE_PHYSICAL_PLAN_FILL
);
SFillPhysiNode
*
pFill
=
(
SFillPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pFillNode
,
QUERY_NODE_PHYSICAL_PLAN_FILL
);
if
(
NULL
==
pFill
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -1168,8 +1159,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
}
static
int32_t
createMergePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SMergeLogicNode
*
pMergeLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SMergePhysiNode
*
pMerge
=
(
SMergePhysiNode
*
)
makePhysiNode
(
pCxt
,
pMergeLogicNode
->
node
.
precision
,
(
SLogicNode
*
)
pMergeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
);
SMergePhysiNode
*
pMerge
=
(
SMergePhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pMergeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
);
if
(
NULL
==
pMerge
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -1308,6 +1299,62 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
return
pSubplan
;
}
static
int32_t
buildInsertSubplan
(
SPhysiPlanContext
*
pCxt
,
SVnodeModifyLogicNode
*
pModify
,
SSubplan
*
pSubplan
)
{
pSubplan
->
msgType
=
pModify
->
msgType
;
pSubplan
->
execNode
.
epSet
=
pModify
->
pVgDataBlocks
->
vg
.
epSet
;
SQueryNodeLoad
node
=
{.
addr
=
pSubplan
->
execNode
,
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
node
);
return
createDataInserter
(
pCxt
,
pModify
->
pVgDataBlocks
,
&
pSubplan
->
pDataSink
);
}
static
int32_t
createDataDeleter
(
SPhysiPlanContext
*
pCxt
,
SVnodeModifyLogicNode
*
pModify
,
const
SPhysiNode
*
pRoot
,
SDataSinkNode
**
pSink
)
{
SDataDeleterNode
*
pDeleter
=
nodesMakeNode
(
QUERY_NODE_PHYSICAL_PLAN_DELETE
);
if
(
NULL
==
pDeleter
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pDeleter
->
tableId
=
pModify
->
tableId
;
pDeleter
->
tableType
=
pModify
->
tableType
;
strcpy
(
pDeleter
->
tableFName
,
pModify
->
tableFName
);
pDeleter
->
deleteTimeRange
=
pModify
->
deleteTimeRange
;
pDeleter
->
sink
.
pInputDataBlockDesc
=
nodesCloneNode
(
pRoot
->
pOutputDataBlockDesc
);
if
(
NULL
==
pDeleter
->
sink
.
pInputDataBlockDesc
)
{
nodesDestroyNode
(
pDeleter
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pSink
=
(
SDataSinkNode
*
)
pDeleter
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildDeleteSubplan
(
SPhysiPlanContext
*
pCxt
,
SVnodeModifyLogicNode
*
pModify
,
SSubplan
*
pSubplan
)
{
int32_t
code
=
createPhysiNode
(
pCxt
,
(
SLogicNode
*
)
nodesListGetNode
(
pModify
->
node
.
pChildren
,
0
),
pSubplan
,
&
pSubplan
->
pNode
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createDataDeleter
(
pCxt
,
pModify
,
pSubplan
->
pNode
,
&
pSubplan
->
pDataSink
);
}
return
code
;
}
static
int32_t
buildVnodeModifySubplan
(
SPhysiPlanContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SSubplan
*
pSubplan
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SVnodeModifyLogicNode
*
pModify
=
(
SVnodeModifyLogicNode
*
)
pLogicSubplan
->
pNode
;
switch
(
pModify
->
modifyType
)
{
case
MODIFY_TABLE_TYPE_INSERT
:
code
=
buildInsertSubplan
(
pCxt
,
pModify
,
pSubplan
);
break
;
case
MODIFY_TABLE_TYPE_DELETE
:
code
=
buildDeleteSubplan
(
pCxt
,
pModify
,
pSubplan
);
break
;
default:
code
=
TSDB_CODE_FAILED
;
break
;
}
return
code
;
}
static
int32_t
createPhysiSubplan
(
SPhysiPlanContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SSubplan
**
pPhysiSubplan
)
{
SSubplan
*
pSubplan
=
makeSubplan
(
pCxt
,
pLogicSubplan
);
if
(
NULL
==
pSubplan
)
{
...
...
@@ -1317,12 +1364,7 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
SUBPLAN_TYPE_MODIFY
==
pLogicSubplan
->
subplanType
)
{
SVnodeModifLogicNode
*
pModif
=
(
SVnodeModifLogicNode
*
)
pLogicSubplan
->
pNode
;
pSubplan
->
msgType
=
pModif
->
msgType
;
pSubplan
->
execNode
.
epSet
=
pModif
->
pVgDataBlocks
->
vg
.
epSet
;
SQueryNodeLoad
node
=
{.
addr
=
pSubplan
->
execNode
,
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
node
);
code
=
createDataInserter
(
pCxt
,
pModif
->
pVgDataBlocks
,
&
pSubplan
->
pDataSink
);
code
=
buildVnodeModifySubplan
(
pCxt
,
pLogicSubplan
,
pSubplan
);
}
else
{
pSubplan
->
msgType
=
TDMT_VND_QUERY
;
code
=
createPhysiNode
(
pCxt
,
pLogicSubplan
->
pNode
,
pSubplan
,
&
pSubplan
->
pNode
);
...
...
source/libs/planner/src/planScaleOut.c
浏览文件 @
55d41c8f
...
...
@@ -38,26 +38,6 @@ static SLogicSubplan* singleCloneSubLogicPlan(SScaleOutContext* pCxt, SLogicSubp
return
pDst
;
}
static
int32_t
scaleOutForModify
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
SVnodeModifLogicNode
*
pNode
=
(
SVnodeModifLogicNode
*
)
pSubplan
->
pNode
;
size_t
numOfVgroups
=
taosArrayGetSize
(
pNode
->
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
numOfVgroups
;
++
i
)
{
SLogicSubplan
*
pNewSubplan
=
singleCloneSubLogicPlan
(
pCxt
,
pSubplan
,
level
);
if
(
NULL
==
pNewSubplan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
((
SVnodeModifLogicNode
*
)
pNewSubplan
->
pNode
)
->
pVgDataBlocks
=
(
SVgDataBlocks
*
)
taosArrayGetP
(
pNode
->
pDataBlocks
,
i
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pGroup
,
pNewSubplan
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
scaleOutForMerge
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
return
nodesListStrictAppend
(
pGroup
,
singleCloneSubLogicPlan
(
pCxt
,
pSubplan
,
level
));
}
static
int32_t
doSetScanVgroup
(
SLogicNode
*
pNode
,
const
SVgroupInfo
*
pVgroup
,
bool
*
pFound
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pNode
;
...
...
@@ -84,23 +64,52 @@ static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
return
doSetScanVgroup
(
pNode
,
pVgroup
,
&
found
);
}
static
int32_t
scaleOutForScan
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
if
(
pSubplan
->
pVgroupList
&&
!
pCxt
->
pPlanCxt
->
streamQuery
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int32_t
i
=
0
;
i
<
pSubplan
->
pVgroupList
->
numOfVgroups
;
++
i
)
{
static
int32_t
scaleOutByVgroups
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int32_t
i
=
0
;
i
<
pSubplan
->
pVgroupList
->
numOfVgroups
;
++
i
)
{
SLogicSubplan
*
pNewSubplan
=
singleCloneSubLogicPlan
(
pCxt
,
pSubplan
,
level
);
if
(
NULL
==
pNewSubplan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
setScanVgroup
(
pNewSubplan
->
pNode
,
pSubplan
->
pVgroupList
->
vgroups
+
i
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListStrictAppend
(
pGroup
,
pNewSubplan
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
return
code
;
}
static
int32_t
scaleOutForModify
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
SVnodeModifyLogicNode
*
pNode
=
(
SVnodeModifyLogicNode
*
)
pSubplan
->
pNode
;
if
(
MODIFY_TABLE_TYPE_DELETE
==
pNode
->
modifyType
)
{
return
scaleOutByVgroups
(
pCxt
,
pSubplan
,
level
,
pGroup
);
}
else
{
size_t
numOfVgroups
=
taosArrayGetSize
(
pNode
->
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
numOfVgroups
;
++
i
)
{
SLogicSubplan
*
pNewSubplan
=
singleCloneSubLogicPlan
(
pCxt
,
pSubplan
,
level
);
if
(
NULL
==
pNewSubplan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
setScanVgroup
(
pNewSubplan
->
pNode
,
pSubplan
->
pVgroupList
->
vgroups
+
i
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListStrictAppend
(
pGroup
,
pNewSubplan
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
((
SVnodeModifyLogicNode
*
)
pNewSubplan
->
pNode
)
->
pVgDataBlocks
=
(
SVgDataBlocks
*
)
taosArrayGetP
(
pNode
->
pDataBlocks
,
i
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pGroup
,
pNewSubplan
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
return
code
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
scaleOutForMerge
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
return
nodesListStrictAppend
(
pGroup
,
singleCloneSubLogicPlan
(
pCxt
,
pSubplan
,
level
));
}
static
int32_t
scaleOutForScan
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
if
(
pSubplan
->
pVgroupList
&&
!
pCxt
->
pPlanCxt
->
streamQuery
)
{
return
scaleOutByVgroups
(
pCxt
,
pSubplan
,
level
,
pGroup
);
}
else
{
return
scaleOutForMerge
(
pCxt
,
pSubplan
,
level
,
pGroup
);
}
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
55d41c8f
...
...
@@ -138,7 +138,7 @@ static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
static
bool
stbSplIsMultiTbScan
(
bool
streamQuery
,
SScanLogicNode
*
pScan
)
{
return
(
NULL
!=
pScan
->
pVgroupList
&&
pScan
->
pVgroupList
->
numOfVgroups
>
1
)
||
(
streamQuery
&&
TSDB_SUPER_TABLE
==
pScan
->
pMeta
->
tableType
);
(
streamQuery
&&
TSDB_SUPER_TABLE
==
pScan
->
tableType
);
}
static
bool
stbSplHasMultiTbScan
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
...
...
@@ -315,18 +315,22 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S
return
nodesListMakeAppend
(
&
pParent
->
pChildren
,
pMerge
);
}
static
int32_t
stbSplCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicNode
*
pParent
,
SLogicNode
*
pPartChild
)
{
SExchangeLogicNode
*
pExchange
=
NULL
;
int32_t
code
=
splCreateExchangeNode
(
pCxt
,
pPartChild
,
&
pExchange
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeAppend
(
&
pParent
->
pChildren
,
pExchange
);
}
return
code
;
}
static
int32_t
stbSplSplitWindowNodeForBatch
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pPartWindow
=
NULL
;
int32_t
code
=
stbSplCreatePartWindowNode
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
,
&
pPartWindow
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
SNodeList
*
pMergeKeys
=
NULL
;
code
=
nodesListMakeStrictAppend
(
&
pMergeKeys
,
nodesCloneNode
(((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
pTspk
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
pInfo
->
pSplitNode
,
pMergeKeys
,
pPartWindow
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyList
(
pMergeKeys
);
}
((
SWindowLogicNode
*
)
pPartWindow
)
->
intervalAlgo
=
INTERVAL_ALGO_HASH
;
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
intervalAlgo
=
INTERVAL_ALGO_SORT_MERGE
;
code
=
stbSplCreateExchangeNode
(
pCxt
,
pInfo
->
pSplitNode
,
pPartWindow
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pInfo
->
pSubplan
->
pChildren
,
...
...
@@ -336,21 +340,12 @@ static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitIn
return
code
;
}
static
int32_t
stbSplCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicNode
*
pParent
,
SLogicNode
*
pPartChild
)
{
SExchangeLogicNode
*
pExchange
=
NULL
;
int32_t
code
=
splCreateExchangeNode
(
pCxt
,
pPartChild
,
&
pExchange
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeAppend
(
&
pParent
->
pChildren
,
pExchange
);
}
return
code
;
}
static
int32_t
stbSplSplitWindowNodeForStream
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pPartWindow
=
NULL
;
int32_t
code
=
stbSplCreatePartWindowNode
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
,
&
pPartWindow
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
((
SWindowLogicNode
*
)
pPartWindow
)
->
stmInterAlgo
=
STREAM_INTERVAL_ALGO
_SEMI
;
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
stmInterAlgo
=
STREAM_INTERVAL_ALGO
_FINAL
;
((
SWindowLogicNode
*
)
pPartWindow
)
->
intervalAlgo
=
INTERVAL_ALGO_STREAM
_SEMI
;
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
intervalAlgo
=
INTERVAL_ALGO_STREAM
_FINAL
;
code
=
stbSplCreateExchangeNode
(
pCxt
,
pInfo
->
pSplitNode
,
pPartWindow
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -834,6 +829,16 @@ static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
static
void
setLogicNodeParent
(
SLogicNode
*
pNode
)
{
doSetLogicNodeParent
(
pNode
,
NULL
);
}
static
void
setVgroupsInfo
(
SLogicNode
*
pNode
,
SLogicSubplan
*
pSubplan
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
TSWAP
(((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
,
pSubplan
->
pVgroupList
);
return
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
setVgroupsInfo
((
SLogicNode
*
)
pChild
,
pSubplan
);
}
}
int32_t
splitLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SLogicSubplan
**
pLogicSubplan
)
{
SLogicSubplan
*
pSubplan
=
(
SLogicSubplan
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
if
(
NULL
==
pSubplan
)
{
...
...
@@ -845,17 +850,21 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan
nodesDestroyNode
(
pSubplan
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
==
nodeType
(
pLogicNode
))
{
pSubplan
->
id
.
queryId
=
pCxt
->
queryId
;
pSubplan
->
id
.
groupId
=
1
;
setLogicNodeParent
(
pSubplan
->
pNode
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY
==
nodeType
(
pLogicNode
))
{
pSubplan
->
subplanType
=
SUBPLAN_TYPE_MODIFY
;
TSWAP
(((
SVnodeModifLogicNode
*
)
pLogicNode
)
->
pDataBlocks
,
((
SVnodeModifLogicNode
*
)
pSubplan
->
pNode
)
->
pDataBlocks
);
TSWAP
(((
SVnodeModifyLogicNode
*
)
pLogicNode
)
->
pDataBlocks
,
((
SVnodeModifyLogicNode
*
)
pSubplan
->
pNode
)
->
pDataBlocks
);
setVgroupsInfo
(
pSubplan
->
pNode
,
pSubplan
);
}
else
{
pSubplan
->
subplanType
=
SUBPLAN_TYPE_SCAN
;
code
=
applySplitRule
(
pCxt
,
pSubplan
);
}
pSubplan
->
id
.
queryId
=
pCxt
->
queryId
;
pSubplan
->
id
.
groupId
=
1
;
setLogicNodeParent
(
pSubplan
->
pNode
);
int32_t
code
=
applySplitRule
(
pCxt
,
pSubplan
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicSubplan
=
pSubplan
;
}
else
{
...
...
source/libs/planner/test/planOtherTest.cpp
浏览文件 @
55d41c8f
...
...
@@ -60,3 +60,15 @@ TEST_F(PlanOtherTest, show) {
run
(
"SHOW DATABASES"
);
}
TEST_F
(
PlanOtherTest
,
delete
)
{
useDb
(
"root"
,
"test"
);
run
(
"DELETE FROM t1"
);
run
(
"DELETE FROM t1 WHERE ts > now - 2d and ts < now - 1d"
);
run
(
"DELETE FROM st1"
);
run
(
"DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10"
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录