未验证 提交 2cd36572 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #13867 from taosdata/feature/3.0_debug_wxy

feat: support compute only use qnode
...@@ -177,7 +177,8 @@ typedef enum ESubplanType { ...@@ -177,7 +177,8 @@ typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1, SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL, SUBPLAN_TYPE_PARTIAL,
SUBPLAN_TYPE_SCAN, SUBPLAN_TYPE_SCAN,
SUBPLAN_TYPE_MODIFY SUBPLAN_TYPE_MODIFY,
SUBPLAN_TYPE_COMPUTE
} ESubplanType; } ESubplanType;
typedef struct SSubplanId { typedef struct SSubplanId {
...@@ -196,6 +197,7 @@ typedef struct SLogicSubplan { ...@@ -196,6 +197,7 @@ typedef struct SLogicSubplan {
SVgroupsInfo* pVgroupList; SVgroupsInfo* pVgroupList;
int32_t level; int32_t level;
int32_t splitFlag; int32_t splitFlag;
int32_t numOfComputeNodes;
} SLogicSubplan; } SLogicSubplan;
typedef struct SQueryLogicPlan { typedef struct SQueryLogicPlan {
......
...@@ -1117,6 +1117,7 @@ static const char* jkLogicSubplanVgroupsSize = "VgroupsSize"; ...@@ -1117,6 +1117,7 @@ static const char* jkLogicSubplanVgroupsSize = "VgroupsSize";
static const char* jkLogicSubplanVgroups = "Vgroups"; static const char* jkLogicSubplanVgroups = "Vgroups";
static const char* jkLogicSubplanLevel = "Level"; static const char* jkLogicSubplanLevel = "Level";
static const char* jkLogicSubplanSplitFlag = "SplitFlag"; static const char* jkLogicSubplanSplitFlag = "SplitFlag";
static const char* jkLogicSubplanNumOfComputeNodes = "NumOfComputeNodes";
static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) { static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) {
const SLogicSubplan* pNode = (const SLogicSubplan*)pObj; const SLogicSubplan* pNode = (const SLogicSubplan*)pObj;
...@@ -1143,6 +1144,9 @@ static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) { ...@@ -1143,6 +1144,9 @@ static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicSubplanSplitFlag, pNode->splitFlag); code = tjsonAddIntegerToObject(pJson, jkLogicSubplanSplitFlag, pNode->splitFlag);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicSubplanNumOfComputeNodes, pNode->numOfComputeNodes);
}
return code; return code;
} }
...@@ -1159,7 +1163,6 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { ...@@ -1159,7 +1163,6 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) {
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code); tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code);
;
} }
int32_t objSize = 0; int32_t objSize = 0;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -1174,6 +1177,9 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { ...@@ -1174,6 +1177,9 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkLogicSubplanSplitFlag, &pNode->splitFlag); code = tjsonGetIntValue(pJson, jkLogicSubplanSplitFlag, &pNode->splitFlag);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkLogicSubplanNumOfComputeNodes, &pNode->numOfComputeNodes);
}
return code; return code;
} }
......
...@@ -115,7 +115,45 @@ static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, ...@@ -115,7 +115,45 @@ static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan,
} }
} }
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) { static int32_t scaleOutForCompute(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pSubplan->numOfComputeNodes; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
if (NULL == pNewSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t pushHierarchicalPlanForCompute(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
SNode* pChild = NULL;
SNode* pParent = NULL;
int32_t code = TSDB_CODE_SUCCESS;
FORBOTH(pChild, pCurrentGroup, pParent, pParentsGroup) {
code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static bool isComputeGroup(SNodeList* pGroup) {
if (0 == LIST_LENGTH(pGroup)) {
return false;
}
return SUBPLAN_TYPE_COMPUTE == ((SLogicSubplan*)nodesListGetNode(pGroup, 0))->subplanType;
}
static int32_t pushHierarchicalPlanForNormal(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool topLevel = (0 == LIST_LENGTH(pParentsGroup)); bool topLevel = (0 == LIST_LENGTH(pParentsGroup));
SNode* pChild = NULL; SNode* pChild = NULL;
...@@ -138,6 +176,13 @@ static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurren ...@@ -138,6 +176,13 @@ static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurren
return code; return code;
} }
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
if (isComputeGroup(pParentsGroup)) {
return pushHierarchicalPlanForCompute(pParentsGroup, pCurrentGroup);
}
return pushHierarchicalPlanForNormal(pParentsGroup, pCurrentGroup);
}
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) { static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
SNodeList* pCurrentGroup = nodesMakeList(); SNodeList* pCurrentGroup = nodesMakeList();
if (NULL == pCurrentGroup) { if (NULL == pCurrentGroup) {
...@@ -155,6 +200,9 @@ static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32 ...@@ -155,6 +200,9 @@ static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32
case SUBPLAN_TYPE_MODIFY: case SUBPLAN_TYPE_MODIFY:
code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup); code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup);
break; break;
case SUBPLAN_TYPE_COMPUTE:
code = scaleOutForCompute(pCxt, pSubplan, level, pCurrentGroup);
break;
default: default:
break; break;
} }
......
...@@ -994,8 +994,20 @@ static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -994,8 +994,20 @@ static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
} }
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType); int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0)); SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
if (NULL != pScanSubplan) {
if (NULL != info.pSubplan->pVgroupList) {
info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups;
TSWAP(pScanSubplan->pVgroupList, info.pSubplan->pVgroupList);
} else {
info.pSubplan->numOfComputeNodes = 1;
}
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
++(pCxt->groupId); ++(pCxt->groupId);
pCxt->split = true; pCxt->split = true;
return code; return code;
...@@ -1007,8 +1019,7 @@ static const SSplitRule splitRuleSet[] = { ...@@ -1007,8 +1019,7 @@ static const SSplitRule splitRuleSet[] = {
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}
{.pName = "QnodeSplit", .splitFunc = qnodeSplit}
}; };
// clang-format on // clang-format on
...@@ -1039,7 +1050,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -1039,7 +1050,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
} }
} }
} while (split); } while (split);
return TSDB_CODE_SUCCESS; return qnodeSplit(&cxt, pSubplan);
} }
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册