提交 3c466384 编写于 作者: X Xiaoyu Wang

feat: support compute only use qnode

上级 11564a3d
......@@ -177,7 +177,8 @@ typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL,
SUBPLAN_TYPE_SCAN,
SUBPLAN_TYPE_MODIFY
SUBPLAN_TYPE_MODIFY,
SUBPLAN_TYPE_COMPUTE
} ESubplanType;
typedef struct SSubplanId {
......@@ -196,6 +197,7 @@ typedef struct SLogicSubplan {
SVgroupsInfo* pVgroupList;
int32_t level;
int32_t splitFlag;
int32_t numOfComputeNodes;
} SLogicSubplan;
typedef struct SQueryLogicPlan {
......
......@@ -1117,6 +1117,7 @@ static const char* jkLogicSubplanVgroupsSize = "VgroupsSize";
static const char* jkLogicSubplanVgroups = "Vgroups";
static const char* jkLogicSubplanLevel = "Level";
static const char* jkLogicSubplanSplitFlag = "SplitFlag";
static const char* jkLogicSubplanNumOfComputeNodes = "NumOfComputeNodes";
static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) {
const SLogicSubplan* pNode = (const SLogicSubplan*)pObj;
......@@ -1143,6 +1144,9 @@ static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicSubplanSplitFlag, pNode->splitFlag);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicSubplanNumOfComputeNodes, pNode->numOfComputeNodes);
}
return code;
}
......@@ -1159,7 +1163,6 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) {
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code);
;
}
int32_t objSize = 0;
if (TSDB_CODE_SUCCESS == code) {
......@@ -1174,6 +1177,9 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkLogicSubplanSplitFlag, &pNode->splitFlag);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkLogicSubplanNumOfComputeNodes, &pNode->numOfComputeNodes);
}
return code;
}
......
......@@ -115,7 +115,45 @@ static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan,
}
}
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
static int32_t scaleOutForCompute(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pSubplan->numOfComputeNodes; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
if (NULL == pNewSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t pushHierarchicalPlanForCompute(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
SNode* pChild = NULL;
SNode* pParent = NULL;
int32_t code = TSDB_CODE_SUCCESS;
FORBOTH(pChild, pCurrentGroup, pParent, pParentsGroup) {
code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static bool isComputeGroup(SNodeList* pGroup) {
if (0 == LIST_LENGTH(pGroup)) {
return false;
}
return SUBPLAN_TYPE_COMPUTE == ((SLogicSubplan*)nodesListGetNode(pGroup, 0))->subplanType;
}
static int32_t pushHierarchicalPlanForNormal(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
int32_t code = TSDB_CODE_SUCCESS;
bool topLevel = (0 == LIST_LENGTH(pParentsGroup));
SNode* pChild = NULL;
......@@ -138,6 +176,13 @@ static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurren
return code;
}
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
if (isComputeGroup(pParentsGroup)) {
return pushHierarchicalPlanForCompute(pParentsGroup, pCurrentGroup);
}
return pushHierarchicalPlanForNormal(pParentsGroup, pCurrentGroup);
}
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
SNodeList* pCurrentGroup = nodesMakeList();
if (NULL == pCurrentGroup) {
......@@ -155,6 +200,9 @@ static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32
case SUBPLAN_TYPE_MODIFY:
code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup);
break;
case SUBPLAN_TYPE_COMPUTE:
code = scaleOutForCompute(pCxt, pSubplan, level, pCurrentGroup);
break;
default:
break;
}
......
......@@ -994,8 +994,20 @@ static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
}
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0));
SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
if (NULL != pScanSubplan) {
if (NULL != info.pSubplan->pVgroupList) {
info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups;
TSWAP(pScanSubplan->pVgroupList, info.pSubplan->pVgroupList);
} else {
info.pSubplan->numOfComputeNodes = 1;
}
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
++(pCxt->groupId);
pCxt->split = true;
return code;
......@@ -1007,8 +1019,7 @@ static const SSplitRule splitRuleSet[] = {
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
{.pName = "QnodeSplit", .splitFunc = qnodeSplit}
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}
};
// clang-format on
......@@ -1039,7 +1050,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
}
}
} while (split);
return TSDB_CODE_SUCCESS;
return qnodeSplit(&cxt, pSubplan);
}
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册