Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4e5df8e3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4e5df8e3
编写于
3月 25, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
bugfix
上级
cc0dfeb1
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
196 addition
and
116 deletion
+196
-116
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-1
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+195
-115
未找到文件。
source/libs/executor/src/executorimpl.c
浏览文件 @
4e5df8e3
...
...
@@ -5569,7 +5569,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
blockDataCleanup
(
pInfo
->
pRes
);
int32_t
tableNameSlotId
=
1
;
int32_t
tableNameSlotId
=
0
;
SColumnInfoData
*
pTableNameCol
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
tableNameSlotId
);
char
*
name
=
NULL
;
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
4e5df8e3
...
...
@@ -41,61 +41,145 @@ static int32_t getSlotKey(SNode* pNode, char* pKey) {
return
sprintf
(
pKey
,
"%s"
,
((
SExprNode
*
)
pNode
)
->
aliasName
);
}
static
SNode
*
createSlotDesc
(
SPhysiPlanContext
*
pCxt
,
const
SNode
*
pNode
,
int16_t
slotId
)
{
static
SNode
*
createSlotDesc
(
SPhysiPlanContext
*
pCxt
,
const
SNode
*
pNode
,
int16_t
slotId
,
bool
output
)
{
SSlotDescNode
*
pSlot
=
(
SSlotDescNode
*
)
nodesMakeNode
(
QUERY_NODE_SLOT_DESC
);
CHECK_ALLOC
(
pSlot
,
NULL
);
if
(
NULL
==
pSlot
)
{
return
NULL
;
}
pSlot
->
slotId
=
slotId
;
pSlot
->
dataType
=
((
SExprNode
*
)
pNode
)
->
resType
;
pSlot
->
reserve
=
false
;
pSlot
->
output
=
true
;
pSlot
->
output
=
output
;
return
(
SNode
*
)
pSlot
;
}
static
SNode
*
createTarget
(
SNode
*
pNode
,
int16_t
dataBlockId
,
int16_t
slotId
)
{
static
int32_t
createTarget
(
SNode
*
pNode
,
int16_t
dataBlockId
,
int16_t
slotId
,
SNode
**
pOutput
)
{
STargetNode
*
pTarget
=
(
STargetNode
*
)
nodesMakeNode
(
QUERY_NODE_TARGET
);
if
(
NULL
==
pTarget
)
{
return
NULL
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pTarget
->
dataBlockId
=
dataBlockId
;
pTarget
->
slotId
=
slotId
;
pTarget
->
pExpr
=
pNode
;
return
(
SNode
*
)
pTarget
;
*
pOutput
=
(
SNode
*
)
pTarget
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
addDataBlockDesc
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SDataBlockDescNode
*
pDataBlockDesc
)
{
SHashObj
*
pHash
=
NULL
;
static
int32_t
putSlotToHashImpl
(
int16_t
dataBlockId
,
int16_t
slotId
,
const
char
*
pName
,
int32_t
len
,
SHashObj
*
pHash
)
{
SSlotIndex
index
=
{
.
dataBlockId
=
dataBlockId
,
.
slotId
=
slotId
};
return
taosHashPut
(
pHash
,
pName
,
len
,
&
index
,
sizeof
(
SSlotIndex
));
}
static
int32_t
putSlotToHash
(
int16_t
dataBlockId
,
int16_t
slotId
,
SNode
*
pNode
,
SHashObj
*
pHash
)
{
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
];
int32_t
len
=
getSlotKey
(
pNode
,
name
);
return
putSlotToHashImpl
(
dataBlockId
,
slotId
,
name
,
len
,
pHash
);
}
static
int32_t
createDataBlockDescHash
(
SPhysiPlanContext
*
pCxt
,
int32_t
capacity
,
int16_t
dataBlockId
,
SHashObj
**
pDescHash
)
{
SHashObj
*
pHash
=
taosHashInit
(
capacity
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
if
(
NULL
==
pHash
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
NULL
==
taosArrayInsert
(
pCxt
->
pLocationHelper
,
dataBlockId
,
&
pHash
))
{
taosHashCleanup
(
pHash
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pDescHash
=
pHash
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildDataBlockSlots
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SDataBlockDescNode
*
pDataBlockDesc
,
SHashObj
*
pHash
)
{
pDataBlockDesc
->
pSlots
=
nodesMakeList
();
if
(
NULL
==
pDataBlockDesc
->
pSlots
)
{
pDataBlockDesc
->
pSlots
=
nodesMakeList
()
;
CHECK_ALLOC
(
pDataBlockDesc
->
pSlots
,
TSDB_CODE_OUT_OF_MEMORY
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pHash
=
taosHashInit
(
LIST_LENGTH
(
pList
),
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
CHECK_ALLOC
(
pHash
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
NULL
==
taosArrayInsert
(
pCxt
->
pLocationHelper
,
pDataBlockDesc
->
dataBlockId
,
&
pHash
))
{
taosHashCleanup
(
pHash
);
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int16_t
slotId
=
0
;
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pList
)
{
code
=
nodesListStrictAppend
(
pDataBlockDesc
->
pSlots
,
createSlotDesc
(
pCxt
,
pNode
,
slotId
,
true
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
putSlotToHash
(
pDataBlockDesc
->
dataBlockId
,
slotId
,
pNode
,
pHash
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pDataBlockDesc
->
resultRowSize
+=
((
SExprNode
*
)
pNode
)
->
resType
.
bytes
;
++
slotId
;
}
else
{
break
;
}
}
return
code
;
}
static
int32_t
createDataBlockDesc
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SDataBlockDescNode
**
pDataBlockDesc
)
{
SDataBlockDescNode
*
pDesc
=
nodesMakeNode
(
QUERY_NODE_DATABLOCK_DESC
);
if
(
NULL
==
pDesc
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pDesc
->
dataBlockId
=
pCxt
->
nextDataBlockId
++
;
SHashObj
*
pHash
=
NULL
;
int32_t
code
=
createDataBlockDescHash
(
pCxt
,
LIST_LENGTH
(
pList
),
pDesc
->
dataBlockId
,
&
pHash
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildDataBlockSlots
(
pCxt
,
pList
,
pDesc
,
pHash
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pDataBlockDesc
=
pDesc
;
}
else
{
pHash
=
taosArrayGetP
(
pCxt
->
pLocationHelper
,
pDataBlockDesc
->
dataBlockId
);
nodesDestroyNode
(
pDesc
);
}
return
code
;
}
static
int32_t
addDataBlockSlotsImpl
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SDataBlockDescNode
*
pDataBlockDesc
,
bool
output
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SHashObj
*
pHash
=
taosArrayGetP
(
pCxt
->
pLocationHelper
,
pDataBlockDesc
->
dataBlockId
);
int16_t
nextSlotId
=
taosHashGetSize
(
pHash
),
slotId
=
0
;
SNode
*
pNode
=
NULL
;
int16_t
slotId
=
taosHashGetSize
(
pHash
);
FOREACH
(
pNode
,
pList
)
{
CHECK_CODE_EXT
(
nodesListStrictAppend
(
pDataBlockDesc
->
pSlots
,
createSlotDesc
(
pCxt
,
pNode
,
slotId
)));
SSlotIndex
index
=
{
.
dataBlockId
=
pDataBlockDesc
->
dataBlockId
,
.
slotId
=
slotId
};
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
];
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
]
=
{
0
};
int32_t
len
=
getSlotKey
(
pNode
,
name
);
CHECK_CODE
(
taosHashPut
(
pHash
,
name
,
len
,
&
index
,
sizeof
(
SSlotIndex
)),
TSDB_CODE_OUT_OF_MEMORY
);
SNode
*
pTarget
=
createTarget
(
pNode
,
pDataBlockDesc
->
dataBlockId
,
slotId
);
CHECK_ALLOC
(
pTarget
,
TSDB_CODE_OUT_OF_MEMORY
);
REPLACE_NODE
(
pTarget
);
SSlotIndex
*
pIndex
=
taosHashGet
(
pHash
,
name
,
len
);
if
(
NULL
==
pIndex
)
{
code
=
nodesListStrictAppend
(
pDataBlockDesc
->
pSlots
,
createSlotDesc
(
pCxt
,
pNode
,
nextSlotId
,
output
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
putSlotToHashImpl
(
pDataBlockDesc
->
dataBlockId
,
nextSlotId
,
name
,
len
,
pHash
);
}
pDataBlockDesc
->
resultRowSize
+=
((
SExprNode
*
)
pNode
)
->
resType
.
bytes
;
slotId
=
nextSlotId
;
++
nextSlotId
;
}
else
{
slotId
=
pIndex
->
slotId
;
}
pDataBlockDesc
->
resultRowSize
+=
((
SExprNode
*
)
pNode
)
->
resType
.
bytes
;
++
slotId
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
SNode
*
pTarget
=
NULL
;
code
=
createTarget
(
pNode
,
pDataBlockDesc
->
dataBlockId
,
slotId
,
&
pTarget
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
REPLACE_NODE
(
pTarget
);
}
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
return
TSDB_CODE_SUCCESS
;
return
code
;
}
static
int32_t
addDataBlockSlots
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SDataBlockDescNode
*
pDataBlockDesc
)
{
return
addDataBlockSlotsImpl
(
pCxt
,
pList
,
pDataBlockDesc
,
false
);
}
static
int32_t
pushdownDataBlockSlots
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SDataBlockDescNode
*
pDataBlockDesc
)
{
return
addDataBlockSlotsImpl
(
pCxt
,
pList
,
pDataBlockDesc
,
false
);
}
typedef
struct
SSetSlotIdCxt
{
...
...
@@ -114,10 +198,11 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
pIndex
=
taosHashGet
(
pCxt
->
pRightHash
,
name
,
len
);
}
// pIndex is definitely not NULL, otherwise it is a bug
CHECK_ALLOC
(
pIndex
,
DEAL_RES_ERROR
);
if
(
NULL
==
pIndex
)
{
return
DEAL_RES_ERROR
;
}
((
SColumnNode
*
)
pNode
)
->
dataBlockId
=
pIndex
->
dataBlockId
;
((
SColumnNode
*
)
pNode
)
->
slotId
=
pIndex
->
slotId
;
CHECK_ALLOC
(
pNode
,
DEAL_RES_ERROR
);
return
DEAL_RES_IGNORE_CHILD
;
}
return
DEAL_RES_CONTINUE
;
...
...
@@ -144,7 +229,7 @@ static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
setListSlotId
(
SPhysiPlanContext
*
pCxt
,
int16_t
leftDataBlockId
,
int16_t
rightDataBlockId
,
SNodeList
*
pList
,
SNodeList
**
pOutput
)
{
static
int32_t
setListSlotId
(
SPhysiPlanContext
*
pCxt
,
int16_t
leftDataBlockId
,
int16_t
rightDataBlockId
,
const
SNodeList
*
pList
,
SNodeList
**
pOutput
)
{
SNodeList
*
pRes
=
nodesCloneList
(
pList
);
if
(
NULL
==
pRes
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -164,18 +249,17 @@ static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i
return
TSDB_CODE_SUCCESS
;
}
static
SPhysiNode
*
makePhysiNode
(
SPhysiPlanContext
*
pCxt
,
ENodeType
type
)
{
static
SPhysiNode
*
makePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
ENodeType
type
)
{
SPhysiNode
*
pPhysiNode
=
(
SPhysiNode
*
)
nodesMakeNode
(
type
);
if
(
NULL
==
pPhysiNode
)
{
return
NULL
;
}
pPhysiNode
->
pOutputDataBlockDesc
=
nodesMakeNode
(
QUERY_NODE_DATABLOCK_DESC
);
if
(
NULL
==
pPhysiNode
->
pOutputDataBlockDesc
)
{
int32_t
code
=
createDataBlockDesc
(
pCxt
,
pLogicNode
->
pTargets
,
&
pPhysiNode
->
pOutputDataBlockDesc
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyNode
(
pPhysiNode
);
return
NULL
;
}
pPhysiNode
->
pOutputDataBlockDesc
->
dataBlockId
=
pCxt
->
nextDataBlockId
++
;
pPhysiNode
->
pOutputDataBlockDesc
->
type
=
QUERY_NODE_DATABLOCK_DESC
;
return
pPhysiNode
;
}
...
...
@@ -186,24 +270,11 @@ static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pL
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
setSlotOutput
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pTargets
,
SDataBlockDescNode
*
pDataBlockDesc
)
{
SHashObj
*
pHash
=
taosArrayGetP
(
pCxt
->
pLocationHelper
,
pDataBlockDesc
->
dataBlockId
);
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
];
SNode
*
pNode
;
FOREACH
(
pNode
,
pTargets
)
{
int32_t
len
=
getSlotKey
(
pNode
,
name
);
SSlotIndex
*
pIndex
=
taosHashGet
(
pHash
,
name
,
len
);
// pIndex is definitely not NULL, otherwise it is a bug
CHECK_ALLOC
(
pIndex
,
TSDB_CODE_FAILED
);
((
SSlotDescNode
*
)
nodesListGetNode
(
pDataBlockDesc
->
pSlots
,
pIndex
->
slotId
))
->
output
=
true
;
}
return
TSDB_CODE_SUCCESS
;
}
static
SNodeptr
createPrimaryKeyCol
(
SPhysiPlanContext
*
pCxt
,
uint64_t
tableId
)
{
SColumnNode
*
pCol
=
nodesMakeNode
(
QUERY_NODE_COLUMN
);
CHECK_ALLOC
(
pCol
,
NULL
);
if
(
NULL
==
pCol
)
{
return
NULL
;
}
pCol
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
pCol
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
;
pCol
->
tableId
=
tableId
;
...
...
@@ -244,8 +315,12 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
==
nodeType
(
pScanPhysiNode
)
||
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
==
nodeType
(
pScanPhysiNode
))
{
pScanPhysiNode
->
pScanCols
=
nodesMakeList
();
CHECK_ALLOC
(
pScanPhysiNode
->
pScanCols
,
TSDB_CODE_OUT_OF_MEMORY
);
CHECK_CODE_EXT
(
nodesListStrictAppend
(
pScanPhysiNode
->
pScanCols
,
createPrimaryKeyCol
(
pCxt
,
pScanPhysiNode
->
uid
)));
if
(
NULL
==
pScanPhysiNode
->
pScanCols
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pScanPhysiNode
->
pScanCols
,
createPrimaryKeyCol
(
pCxt
,
pScanPhysiNode
->
uid
)))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SNode
*
pNode
;
FOREACH
(
pNode
,
pScanCols
)
{
...
...
@@ -255,29 +330,29 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys
strcpy
(
pCol
->
colName
,
((
SColumnNode
*
)
pNode
)
->
colName
);
continue
;
}
CHECK_CODE_EXT
(
nodesListStrictAppend
(
pScanPhysiNode
->
pScanCols
,
nodesCloneNode
(
pNode
)));
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pScanPhysiNode
->
pScanCols
,
nodesCloneNode
(
pNode
)))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
}
else
{
pScanPhysiNode
->
pScanCols
=
nodesCloneList
(
pScanCols
);
CHECK_ALLOC
(
pScanPhysiNode
->
pScanCols
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
NULL
==
pScanPhysiNode
->
pScanCols
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
// return sortScanCols(pScanPhysiNode->pScanCols);
return
TSDB_CODE_SUCCESS
;
return
sortScanCols
(
pScanPhysiNode
->
pScanCols
);
}
static
int32_t
createScanPhysiNodeFinalize
(
SPhysiPlanContext
*
pCxt
,
SScanLogicNode
*
pScanLogicNode
,
SScanPhysiNode
*
pScanPhysiNode
,
SPhysiNode
**
pPhyNode
)
{
int32_t
code
=
createScanCols
(
pCxt
,
pScanPhysiNode
,
pScanLogicNode
->
pScanCols
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
code
=
addDataBlock
Desc
(
pCxt
,
pScanPhysiNode
->
pScanCols
,
pScanPhysiNode
->
node
.
pOutputDataBlockDesc
);
code
=
addDataBlock
Slots
(
pCxt
,
pScanPhysiNode
->
pScanCols
,
pScanPhysiNode
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pScanLogicNode
,
(
SPhysiNode
*
)
pScanPhysiNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setSlotOutput
(
pCxt
,
pScanLogicNode
->
node
.
pTargets
,
pScanPhysiNode
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScanPhysiNode
->
uid
=
pScanLogicNode
->
pMeta
->
uid
;
pScanPhysiNode
->
tableType
=
pScanLogicNode
->
pMeta
->
tableType
;
...
...
@@ -302,7 +377,7 @@ static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAdd
}
static
int32_t
createTagScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SScanLogicNode
*
pScanLogicNode
,
SPhysiNode
**
pPhyNode
)
{
STagScanPhysiNode
*
pTagScan
=
(
STagScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
);
STagScanPhysiNode
*
pTagScan
=
(
STagScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
);
if
(
NULL
==
pTagScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -310,7 +385,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* p
}
static
int32_t
createTableScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SScanLogicNode
*
pScanLogicNode
,
SPhysiNode
**
pPhyNode
)
{
STableScanPhysiNode
*
pTableScan
=
(
STableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
STableScanPhysiNode
*
pTableScan
=
(
STableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
if
(
NULL
==
pTableScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -326,7 +401,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
}
static
int32_t
createSystemTableScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SScanLogicNode
*
pScanLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SSystemTableScanPhysiNode
*
pScan
=
(
SSystemTableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
);
SSystemTableScanPhysiNode
*
pScan
=
(
SSystemTableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
);
if
(
NULL
==
pScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -348,7 +423,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
}
static
int32_t
createStreamScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SScanLogicNode
*
pScanLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SStreamScanPhysiNode
*
pScan
=
(
SStreamScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
SStreamScanPhysiNode
*
pScan
=
(
SStreamScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pScanLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
if
(
NULL
==
pScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -412,7 +487,7 @@ static int32_t createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode*
}
static
int32_t
createJoinPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SJoinLogicNode
*
pJoinLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SJoinPhysiNode
*
pJoin
=
(
SJoinPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_JOIN
);
SJoinPhysiNode
*
pJoin
=
(
SJoinPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pJoinLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_JOIN
);
if
(
NULL
==
pJoin
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -425,14 +500,11 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
code
=
createJoinOutputCols
(
pCxt
,
pLeftDesc
,
pRightDesc
,
&
pJoin
->
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlock
Desc
(
pCxt
,
pJoin
->
pTargets
,
pJoin
->
node
.
pOutputDataBlockDesc
);
code
=
addDataBlock
Slots
(
pCxt
,
pJoin
->
pTargets
,
pJoin
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pJoinLogicNode
,
(
SPhysiNode
*
)
pJoin
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setSlotOutput
(
pCxt
,
pJoinLogicNode
->
node
.
pTargets
,
pJoin
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pJoin
;
...
...
@@ -452,7 +524,9 @@ typedef struct SRewritePrecalcExprsCxt {
static
EDealRes
collectAndRewrite
(
SRewritePrecalcExprsCxt
*
pCxt
,
SNode
**
pNode
)
{
SNode
*
pExpr
=
nodesCloneNode
(
*
pNode
);
CHECK_ALLOC
(
pExpr
,
DEAL_RES_ERROR
);
if
(
NULL
==
pExpr
)
{
return
DEAL_RES_ERROR
;
}
if
(
nodesListAppend
(
pCxt
->
pPrecalcExprs
,
pExpr
))
{
nodesDestroyNode
(
pExpr
);
return
DEAL_RES_ERROR
;
...
...
@@ -500,11 +574,15 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN
if
(
NULL
==
*
pPrecalcExprs
)
{
*
pPrecalcExprs
=
nodesMakeList
();
CHECK_ALLOC
(
*
pPrecalcExprs
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
NULL
==
*
pPrecalcExprs
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
NULL
==
*
pRewrittenList
)
{
*
pRewrittenList
=
nodesMakeList
();
CHECK_ALLOC
(
*
pRewrittenList
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
NULL
==
*
pRewrittenList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pList
)
{
...
...
@@ -514,8 +592,12 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN
}
else
{
pNew
=
nodesCloneNode
(
pNode
);
}
CHECK_ALLOC
(
pNew
,
TSDB_CODE_OUT_OF_MEMORY
);
CHECK_CODE
(
nodesListAppend
(
*
pRewrittenList
,
pNew
),
TSDB_CODE_OUT_OF_MEMORY
);
if
(
NULL
==
pNew
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
*
pRewrittenList
,
pNew
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
SRewritePrecalcExprsCxt
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
pPrecalcExprs
=
*
pPrecalcExprs
};
nodesRewriteList
(
*
pRewrittenList
,
doRewritePrecalcExprs
,
&
cxt
);
...
...
@@ -527,7 +609,7 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN
}
static
int32_t
createAggPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SAggLogicNode
*
pAggLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SAggPhysiNode
*
pAgg
=
(
SAggPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_AGG
);
SAggPhysiNode
*
pAgg
=
(
SAggPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pAggLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_AGG
);
if
(
NULL
==
pAgg
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -545,30 +627,27 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pPrecalcExprs
)
{
code
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pPrecalcExprs
,
&
pAgg
->
pExprs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlockDesc
(
pCxt
,
pAgg
->
pExprs
,
pChildTupe
);
code
=
pushdownDataBlockSlots
(
pCxt
,
pAgg
->
pExprs
,
pChildTupe
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pGroupKeys
)
{
code
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pGroupKeys
,
&
pAgg
->
pGroupKeys
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlock
Desc
(
pCxt
,
pAgg
->
pGroupKeys
,
pAgg
->
node
.
pOutputDataBlockDesc
);
code
=
addDataBlock
Slots
(
pCxt
,
pAgg
->
pGroupKeys
,
pAgg
->
node
.
pOutputDataBlockDesc
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pAggFuncs
)
{
code
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pAggFuncs
,
&
pAgg
->
pAggFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlock
Desc
(
pCxt
,
pAgg
->
pAggFuncs
,
pAgg
->
node
.
pOutputDataBlockDesc
);
code
=
addDataBlock
Slots
(
pCxt
,
pAgg
->
pAggFuncs
,
pAgg
->
node
.
pOutputDataBlockDesc
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pAggLogicNode
,
(
SPhysiNode
*
)
pAgg
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setSlotOutput
(
pCxt
,
pAggLogicNode
->
node
.
pTargets
,
pAgg
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pAgg
;
...
...
@@ -576,18 +655,22 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
nodesDestroyNode
(
pAgg
);
}
nodesDestroyList
(
pPrecalcExprs
);
nodesDestroyList
(
pGroupKeys
);
nodesDestroyList
(
pAggFuncs
);
return
code
;
}
static
int32_t
createProjectPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SProjectLogicNode
*
pProjectLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SProjectPhysiNode
*
pProject
=
(
SProjectPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
);
SProjectPhysiNode
*
pProject
=
(
SProjectPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pProjectLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
);
if
(
NULL
==
pProject
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
setListSlotId
(
pCxt
,
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pProjectLogicNode
->
pProjections
,
&
pProject
->
pProjections
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlock
Desc
(
pCxt
,
pProject
->
pProjections
,
pProject
->
node
.
pOutputDataBlockDesc
);
code
=
addDataBlock
Slots
(
pCxt
,
pProject
->
pProjections
,
pProject
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pProjectLogicNode
,
(
SPhysiNode
*
)
pProject
);
...
...
@@ -603,34 +686,30 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
}
static
int32_t
doCreateExchangePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SExchangeLogicNode
*
pExchangeLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
);
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pExchange
->
srcGroupId
=
pExchangeLogicNode
->
srcGroupId
;
int32_t
code
=
addDataBlockDesc
(
pCxt
,
pExchangeLogicNode
->
node
.
pTargets
,
pExchange
->
node
.
pOutputDataBlockDesc
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pExchange
;
}
else
{
nodesDestroyNode
(
pExchange
);
}
*
pPhyNode
=
(
SPhysiNode
*
)
pExchange
;
return
code
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createStreamScanPhysiNodeByExchange
(
SPhysiPlanContext
*
pCxt
,
SExchangeLogicNode
*
pExchangeLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SStreamScanPhysiNode
*
pScan
=
(
SStreamScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
SStreamScanPhysiNode
*
pScan
=
(
SStreamScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
if
(
NULL
==
pScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
addDataBlockDesc
(
pCxt
,
pExchangeLogicNode
->
node
.
pTargets
,
pScan
->
node
.
pOutputDataBlockDesc
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
pScan
->
pScanCols
=
nodesCloneList
(
pExchangeLogicNode
->
node
.
pTargets
);
if
(
NULL
==
pScan
->
pScanCols
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScan
->
pScanCols
=
nodesCloneList
(
pExchangeLogicNode
->
node
.
pTargets
);
if
(
NULL
==
pScan
->
pScanCols
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
addDataBlockSlots
(
pCxt
,
pScan
->
pScanCols
,
pScan
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -660,21 +739,17 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pPrecalcExprs
)
{
code
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pPrecalcExprs
,
&
pWindow
->
pExprs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlock
Desc
(
pCxt
,
pWindow
->
pExprs
,
pChildTupe
);
code
=
addDataBlock
Slots
(
pCxt
,
pWindow
->
pExprs
,
pChildTupe
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pFuncs
)
{
code
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pFuncs
,
&
pWindow
->
pFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlock
Desc
(
pCxt
,
pWindow
->
pFuncs
,
pWindow
->
node
.
pOutputDataBlockDesc
);
code
=
addDataBlock
Slots
(
pCxt
,
pWindow
->
pFuncs
,
pWindow
->
node
.
pOutputDataBlockDesc
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setSlotOutput
(
pCxt
,
pWindowLogicNode
->
node
.
pTargets
,
pWindow
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pWindow
;
}
else
{
...
...
@@ -685,7 +760,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
}
static
int32_t
createIntervalPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SIntervalPhysiNode
*
pInterval
=
(
SIntervalPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
);
SIntervalPhysiNode
*
pInterval
=
(
SIntervalPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pWindowLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
);
if
(
NULL
==
pInterval
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -706,7 +781,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
}
static
int32_t
createSessionWindowPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SSessionWinodwPhysiNode
*
pSession
=
(
SSessionWinodwPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
);
SSessionWinodwPhysiNode
*
pSession
=
(
SSessionWinodwPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pWindowLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
);
if
(
NULL
==
pSession
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -875,17 +950,22 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l
SNodeListNode
*
pGroup
;
if
(
level
>=
LIST_LENGTH
(
pSubplans
))
{
pGroup
=
nodesMakeNode
(
QUERY_NODE_NODE_LIST
);
CHECK_ALLOC
(
pGroup
,
TSDB_CODE_OUT_OF_MEMORY
);
CHECK_CODE
(
nodesListStrictAppend
(
pSubplans
,
pGroup
),
TSDB_CODE_OUT_OF_MEMORY
);
if
(
NULL
==
pGroup
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pSubplans
,
pGroup
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
else
{
pGroup
=
nodesListGetNode
(
pSubplans
,
level
);
}
if
(
NULL
==
pGroup
->
pNodeList
)
{
pGroup
->
pNodeList
=
nodesMakeList
();
CHECK_ALLOC
(
pGroup
->
pNodeList
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
NULL
==
pGroup
->
pNodeList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
CHECK_CODE
(
nodesListStrictAppend
(
pGroup
->
pNodeList
,
pSubplan
),
TSDB_CODE_OUT_OF_MEMORY
);
return
TSDB_CODE_SUCCESS
;
return
nodesListStrictAppend
(
pGroup
->
pNodeList
,
pSubplan
);
}
static
int32_t
buildPhysiPlan
(
SPhysiPlanContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SSubplan
*
pParent
,
SQueryPlan
*
pQueryPlan
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录