Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ddd8ae91
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ddd8ae91
编写于
3月 02, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-13747 planner integrate
上级
34a8e6cb
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
82 addition
and
482 deletion
+82
-482
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+66
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+4
-0
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+4
-481
source/libs/planner/test/plannerTest.cpp
source/libs/planner/test/plannerTest.cpp
+8
-0
未找到文件。
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
ddd8ae91
...
...
@@ -624,6 +624,38 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
return
code
;
}
static
const
char
*
jkPlanQueryId
=
"QueryId"
;
static
const
char
*
jkPlanNumOfSubplans
=
"NumOfSubplans"
;
static
const
char
*
jkPlanSubplans
=
"Subplans"
;
static
int32_t
planToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SQueryPlan
*
pNode
=
(
const
SQueryPlan
*
)
pObj
;
int32_t
code
=
tjsonAddIntegerToObject
(
pJson
,
jkPlanQueryId
,
pNode
->
queryId
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkPlanNumOfSubplans
,
pNode
->
numOfSubplans
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkPlanSubplans
,
pNode
->
pSubplans
);
}
return
code
;
}
static
int32_t
jsonToPlan
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SQueryPlan
*
pNode
=
(
SQueryPlan
*
)
pObj
;
int32_t
code
=
tjsonGetUBigIntValue
(
pJson
,
jkPlanQueryId
,
&
pNode
->
queryId
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkPlanNumOfSubplans
,
&
pNode
->
numOfSubplans
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkPlanSubplans
,
&
pNode
->
pSubplans
);
}
return
code
;
}
static
const
char
*
jkAggLogicPlanGroupKeys
=
"GroupKeys"
;
static
const
char
*
jkAggLogicPlanAggFuncs
=
"AggFuncs"
;
...
...
@@ -1023,6 +1055,31 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) {
return
code
;
}
static
const
char
*
jkNodeListDataType
=
"DataType"
;
static
const
char
*
jkNodeListNodeList
=
"NodeList"
;
static
int32_t
nodeListNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SNodeListNode
*
pNode
=
(
const
SNodeListNode
*
)
pObj
;
int32_t
code
=
tjsonAddObject
(
pJson
,
jkNodeListDataType
,
dataTypeToJson
,
&
pNode
->
dataType
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkNodeListNodeList
,
pNode
->
pNodeList
);
}
return
code
;
}
static
int32_t
jsonToNodeListNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SNodeListNode
*
pNode
=
(
SNodeListNode
*
)
pObj
;
int32_t
code
=
tjsonToObject
(
pJson
,
jkNodeListDataType
,
jsonToDataType
,
&
pNode
->
dataType
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkNodeListNodeList
,
&
pNode
->
pNodeList
);
}
return
code
;
}
static
const
char
*
jkTargetDataBlockId
=
"DataBlockId"
;
static
const
char
*
jkTargetSlotId
=
"SlotId"
;
static
const
char
*
jkTargetExpr
=
"Expr"
;
...
...
@@ -1193,7 +1250,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_SESSION_WINDOW
:
case
QUERY_NODE_INTERVAL_WINDOW
:
case
QUERY_NODE_NODE_LIST
:
return
nodeListNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_FILL
:
break
;
case
QUERY_NODE_TARGET
:
return
targetNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_RAW_EXPR
:
...
...
@@ -1228,9 +1287,12 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
physiAggNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
subplanToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN
:
return
planToJson
(
pObj
,
pJson
);
default:
break
;
}
printf
(
"================================ specificNodeToJson unknown node = %s
\n
"
,
nodeName
(
nodeType
(
pObj
)));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1257,7 +1319,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
// case QUERY_NODE_STATE_WINDOW:
// case QUERY_NODE_SESSION_WINDOW:
// case QUERY_NODE_INTERVAL_WINDOW:
// case QUERY_NODE_NODE_LIST:
case
QUERY_NODE_NODE_LIST
:
return
jsonToNodeListNode
(
pJson
,
pObj
);
// case QUERY_NODE_FILL:
case
QUERY_NODE_TARGET
:
return
jsonToTargetNode
(
pJson
,
pObj
);
...
...
@@ -1293,6 +1356,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiAggNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
jsonToSubplan
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN
:
return
jsonToPlan
(
pJson
,
pObj
);
default:
break
;
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
ddd8ae91
...
...
@@ -79,6 +79,10 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SAggLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
makeNode
(
type
,
sizeof
(
SProjectLogicNode
));
case
QUERY_NODE_LOGIC_SUBPLAN
:
return
makeNode
(
type
,
sizeof
(
SSubLogicPlan
));
case
QUERY_NODE_LOGIC_PLAN
:
return
makeNode
(
type
,
sizeof
(
SQueryLogicPlan
));
case
QUERY_NODE_TARGET
:
return
makeNode
(
type
,
sizeof
(
STargetNode
));
case
QUERY_NODE_DATABLOCK_DESC
:
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
ddd8ae91
...
...
@@ -13,486 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "plannerInt.h"
#include "texception.h"
#include "parser.h"
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _
typedef struct SPlanContext {
struct SCatalog* pCatalog;
struct SQueryDag* pDag;
SSubplan* pCurrentSubplan;
SSubplanId nextId;
} SPlanContext;
static const char* gOpName[] = {
"Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
static void* validPointer(void* p) {
if (NULL == p) {
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
return p;
}
const char* opTypeToOpName(int32_t type) {
return gOpName[type];
}
int32_t opNameToOpType(const char* name) {
for (int32_t i = 1; i < sizeof(gOpName) / sizeof(gOpName[0]); ++i) {
if (0 == strcmp(name, gOpName[i])) {
return i;
}
}
return OP_Unknown;
}
const char* dsinkTypeToDsinkName(int32_t type) {
switch (type) {
case DSINK_Dispatch:
return "Dispatch";
case DSINK_Insert:
return "Insert";
default:
break;
}
return "Unknown";
}
int32_t dsinkNameToDsinkType(const char* name) {
if (0 == strcmp(name, "Dispatch")) {
return DSINK_Dispatch;
} else if (0 == strcmp(name, "Insert")) {
return DSINK_Insert;
}
return DSINK_Unknown;
}
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
dst->pSchema = malloc(sizeof(SSlotSchema) * src->numOfCols);
if (NULL == dst->pSchema) {
return false;
}
memcpy(dst->pSchema, src->pSchema, sizeof(SSlotSchema) * src->numOfCols);
dst->numOfCols = src->numOfCols;
dst->resultRowSize = src->resultRowSize;
dst->precision = src->precision;
return true;
}
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
if (NULL == dataBlockSchema->pSchema) {
return false;
}
dataBlockSchema->resultRowSize = 0;
for (int32_t i = 0; i < pPlanNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pPlanNode->pExpr, i);
memcpy(&dataBlockSchema->pSchema[i], &pExprInfo->base.resSchema, sizeof(SSlotSchema));
dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[i].bytes;
}
return true;
}
static bool cloneExprArray(SArray** dst, SArray* src) {
if (NULL == src) {
return true;
}
size_t size = taosArrayGetSize(src);
if (0 == size) {
return true;
}
*dst = taosArrayInit(size, POINTER_BYTES);
if (NULL == *dst) {
return false;
}
return (TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true) ? true : false);
}
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
sink->info.type = type;
sink->info.name = dsinkTypeToDsinkName(type);
if (NULL !=pRoot && !copySchema(&sink->schema, &pRoot->targetSchema)) {
tfree(sink);
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
return sink;
}
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
inserter->numOfTables = pBlocks->numOfTables;
inserter->size = pBlocks->size;
TSWAP(inserter->pData, pBlocks->pData, char*);
return (SDataSink*)inserter;
}
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
return (SDataSink*)dispatcher;
}
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
node->info.type = type;
node->info.name = opTypeToOpName(type);
if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
free(node);
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
return node;
}
static void cleanupPhyNode(SPhyNode* pPhyNode) {
if (pPhyNode == NULL) {
return;
}
dropOneLevelExprInfo(pPhyNode->pTargets);
tfree(pPhyNode->targetSchema.pSchema);
tfree(pPhyNode);
}
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
SScanPhyNode* node = (SScanPhyNode*) initPhyNode(pPlanNode, type, size);
STableMeta *pTableMeta = pTable->pMeta->pTableMeta;
node->uid = pTableMeta->uid;
node->count = 1;
node->order = TSDB_ORDER_ASC;
node->tableType = pTableMeta->tableType;
return (SPhyNode*)node;
}
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
return initScanNode(pPlanNode, pTable, op, sizeof(SScanPhyNode));
}
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
return createPseudoScanNode(pPlanNode, pTable, OP_TagScan);
}
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
// todo
return MAIN_SCAN;
}
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pQueryTableInfo, int32_t op) {
STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pQueryTableInfo, op, sizeof(STableScanPhyNode));
node->scanFlag = getScanFlag(pPlanNode, pQueryTableInfo);
node->window = pQueryTableInfo->window;
// todo tag cond
return (SPhyNode*)node;
}
static bool isSystemTable(SQueryTableInfo* pTable) {
// todo
return false;
}
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
// todo
return false;
}
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
if (isSystemTable(pTable)) {
return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
} else if (needSeqScan(pPlanNode)) {
return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
}
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTable, type);
}
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
subplan->id = pCxt->nextId;
++(pCxt->nextId.subplanId);
subplan->type = type;
subplan->level = 0;
if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildren) {
pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
}
taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan);
subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
}
SArray* currentLevel;
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(pCxt->pDag->pSubplans, ¤tLevel);
} else {
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
}
taosArrayPush(currentLevel, &subplan);
pCxt->pCurrentSubplan = subplan;
++(pCxt->pDag->numOfSubplans);
return subplan;
}
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
pNodeAddr->nodeId = vg->vgId;
pNodeAddr->epset = vg->epset;
}
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) {
SVgroupsInfo* pVgroupList = pTableInfo->pMeta->vgroupList;
for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
subplan->msgType = TDMT_VND_QUERY;
vgroupInfoToNodeAddr(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
subplan->pNode = createMultiTableScanNode(pPlanNode, pTableInfo);
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
RECOVERY_CURRENT_SUBPLAN(pCxt);
}
return pCxt->nextId.templateId++;
}
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
node->srcTemplateId = srcTemplateId;
node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource)));
return (SPhyNode*)node;
}
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
// todo system table, for instance, user_tables
return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
}
// TODO: the SVgroupInfo index
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) {
SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList;
vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode);
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTableInfo, type);
}
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
if (needMultiNodeScan(pTable)) {
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
}
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
}
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;
node->aggAlgo = AGG_ALGO_PLAIN;
node->aggSplit = AGG_SPLIT_FINAL;
if (NULL != pGroupBy) {
node->aggAlgo = AGG_ALGO_HASHED;
node->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo));
}
return (SPhyNode*)node;
}
static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
// if (needMultiNodeAgg(pPlanNode)) {
// }
return createSingleTableAgg(pCxt, pPlanNode);
}
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SPhyNode* node = NULL;
switch (pPlanNode->info.type) {
case QNODE_TAGSCAN:
node = createTagScanNode(pPlanNode);
break;
case QNODE_STREAMSCAN:
case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode);
break;
case QNODE_AGGREGATE:
case QNODE_GROUPBY:
node = createAggNode(pCxt, pPlanNode);
break;
case QNODE_MODIFY:
// Insert is not an operator in a physical plan.
break;
default:
assert(false);
}
if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
size_t size = taosArrayGetSize(pPlanNode->pChildren);
for(int32_t i = 0; i < size; ++i) {
SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
child->pParent = node;
taosArrayPush(node->pChildren, &child);
}
}
return node;
}
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;
size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
for (int32_t i = 0; i < numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
subplan->execNode.epset = blocks->vg.epset;
subplan->pDataSink = createDataInserter(pCxt, blocks, NULL);
subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType;
subplan->id.queryId = pCxt->pDag->queryId;
RECOVERY_CURRENT_SUBPLAN(pCxt);
}
}
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_MODIFY == pRoot->info.type) {
splitModificationOpSubPlan(pCxt, pRoot);
} else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
++(pCxt->nextId.templateId);
subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode);
}
// todo deal subquery
}
static void postCreateDag(SQueryPlanNode* pQueryNode, SQueryDag* pDag, SArray* pNodeList) {
// The exchange operator is not necessary, in case of the stream scan.
// Here we need to remove it from the DAG.
if (pQueryNode->info.type == QNODE_STREAMSCAN) {
SArray* pRootLevel = taosArrayGetP(pDag->pSubplans, 0);
SSubplan *pSubplan = taosArrayGetP(pRootLevel, 0);
if (pSubplan->pNode->info.type == OP_Exchange) {
ASSERT(taosArrayGetSize(pRootLevel) == 1);
taosArrayRemove(pDag->pSubplans, 0);
// And then update the number of the subplans.
pDag->numOfSubplans -= 1;
}
} else {
// Traverse the dag again to acquire the execution node.
if (pNodeList != NULL) {
SArray** pSubLevel = taosArrayGetLast(pDag->pSubplans);
size_t num = taosArrayGetSize(*pSubLevel);
for (int32_t j = 0; j < num; ++j) {
SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
taosArrayPush(pNodeList, &pPlan->execNode);
}
}
}
}
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = {
.pCatalog = pCatalog,
.pDag = validPointer(calloc(1, sizeof(SQueryDag))),
.pCurrentSubplan = NULL,
//The unsigned Id starting from 1 would be better
.nextId = {.queryId = requestId, .subplanId = 1, .templateId = 1},
};
*pDag = context.pDag;
context.pDag->queryId = requestId;
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode);
} CATCH(code) {
CLEANUP_EXECUTE();
terrno = code;
return TSDB_CODE_FAILED;
} END_TRY
postCreateDag(pQueryNode, *pDag, pNodeList);
return TSDB_CODE_SUCCESS;
}
void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) {
if (NULL == pNode) {
return;
}
if (OP_Exchange == pNode->info.type) {
SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
if (templateId == pExchange->srcTemplateId) {
taosArrayPush(pExchange->pSrcEndPoints, pSource);
}
}
if (pNode->pChildren != NULL) {
size_t size = taosArrayGetSize(pNode->pChildren);
for(int32_t i = 0; i < size; ++i) {
setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, i));
}
}
}
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
setExchangSourceNode(templateId, pSource, subplan->pNode);
}
static void destroyDataSinkNode(SDataSink* pSinkNode) {
if (pSinkNode == NULL) {
return;
}
if (queryNodeType(pSinkNode) == DSINK_Dispatch) {
SDataDispatcher* pDdSink = (SDataDispatcher*)pSinkNode;
tfree(pDdSink->sink.schema.pSchema);
}
tfree(pSinkNode);
}
void qDestroySubplan(SSubplan* pSubplan) {
if (pSubplan == NULL) {
return;
}
taosArrayDestroy(pSubplan->pChildren);
taosArrayDestroy(pSubplan->pParents);
destroyDataSinkNode(pSubplan->pDataSink);
cleanupPhyNode(pSubplan->pNode);
tfree(pSubplan);
}
#endif
#include "plannerInt.h"
#include "functionMgt.h"
...
...
@@ -948,6 +468,7 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
static
SSubplan
*
createPhysiSubplan
(
SPhysiPlanContext
*
pCxt
,
SSubLogicPlan
*
pLogicSubplan
)
{
SSubplan
*
pSubplan
=
(
SSubplan
*
)
nodesMakeNode
(
QUERY_NODE_PHYSICAL_SUBPLAN
);
CHECK_ALLOC
(
pSubplan
,
NULL
);
pSubplan
->
pNode
=
createPhysiNode
(
pCxt
,
pLogicSubplan
->
pNode
);
return
pSubplan
;
}
...
...
@@ -976,7 +497,7 @@ static SQueryLogicPlan* createRawQueryLogicPlan(SPhysiPlanContext* pCxt, SLogicN
SSubLogicPlan
*
pSubplan
=
(
SSubLogicPlan
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
CHECK_ALLOC
(
pSubplan
,
pLogicPlan
);
CHECK_CODE
(
strictListAppend
(
pTopSubplans
->
pNodeList
,
pSubplan
),
pLogicPlan
);
pSubplan
->
pNode
=
(
SLogicNode
*
)
nodesCloneNode
((
SNode
*
)
pLogicNode
)
;
pSubplan
->
pNode
=
pLogicNode
;
CHECK_ALLOC
(
pSubplan
->
pNode
,
pLogicPlan
);
return
pLogicPlan
;
}
...
...
@@ -1005,6 +526,8 @@ static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPl
SNodeListNode
*
pLevelSubplans
=
(
SNodeListNode
*
)
nodesMakeNode
(
QUERY_NODE_NODE_LIST
);
CHECK_ALLOC
(
pLevelSubplans
,
TSDB_CODE_OUT_OF_MEMORY
);
CHECK_CODE
(
strictListAppend
(
pQueryPlan
->
pSubplans
,
pLevelSubplans
),
TSDB_CODE_OUT_OF_MEMORY
);
pLevelSubplans
->
pNodeList
=
nodesMakeList
();
CHECK_ALLOC
(
pLevelSubplans
->
pNodeList
,
TSDB_CODE_OUT_OF_MEMORY
);
SNode
*
pLogicSubplan
;
FOREACH
(
pLogicSubplan
,
((
SNodeListNode
*
)
pNode
)
->
pNodeList
)
{
CHECK_CODE
(
strictListAppend
(
pLevelSubplans
->
pNodeList
,
...
...
source/libs/planner/test/plannerTest.cpp
浏览文件 @
ddd8ae91
...
...
@@ -78,6 +78,14 @@ protected:
}
cout
<<
"unformatted physical plan : "
<<
endl
;
cout
<<
toString
((
const
SNode
*
)
pPlan
,
false
)
<<
endl
;
SNode
*
pNode
;
FOREACH
(
pNode
,
pPlan
->
pSubplans
)
{
SNode
*
pSubplan
;
FOREACH
(
pSubplan
,
((
SNodeListNode
*
)
pNode
)
->
pNodeList
)
{
cout
<<
"unformatted physical subplan : "
<<
endl
;
cout
<<
toString
(
pSubplan
,
false
)
<<
endl
;
}
}
}
return
true
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录