提交 8615c97f 编写于 作者: X Xiaoyu Wang

TD-13704 super table plan split

上级 b91899e7
......@@ -105,7 +105,8 @@ typedef struct SSubLogicPlan {
// Logical query plan: the container that holds the scaled-out logical subplans
// before they are converted into physical subplans.
typedef struct SQueryLogicPlan {
  // Node type tag (instances are created with QUERY_NODE_LOGIC_PLAN).
  ENodeType  type;
  // Subplan list. NOTE(review): looks like the per-level list used by the older
  // mapping code; confirm whether it is still populated.
  SNodeList* pSubplans;
  // Level counter; its address is handed to doScaleOut(), which increments it
  // after each subplan it scales out.
  int32_t    totalLevel;
  // Root subplans of the hierarchy; physical plan building starts from each
  // element and follows pChildren downwards.
  SNodeList* pTopSubplans;
} SQueryLogicPlan;
typedef struct SSlotDescNode {
......@@ -221,8 +222,8 @@ typedef struct SSubplan {
typedef struct SQueryPlan {
ENodeType type;;
uint64_t queryId;
int32_t numOfSubplans;
SNodeList* pSubplans; // SNodeListNode. The execution level of subplan, starting from 0.
int32_t numOfSubplans;
SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0.
} SQueryPlan;
#ifdef __cplusplus
......
......@@ -31,14 +31,14 @@ typedef struct SPlanContext {
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
// @groupId id of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans
void qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource);
// @pSubplan subplan to be scheduled
// @groupId id of a group of datasource subplans of this @pSubplan
// @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
// Convert a subplan to a string for the scheduler to send to the executor
int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len);
int32_t qStringToSubplan(const char* str, SSubplan** subplan);
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan);
char* qQueryPlanToString(const SQueryPlan* pPlan);
SQueryPlan* qStringToQueryPlan(const char* pStr);
......
......@@ -28,7 +28,6 @@ typedef struct SPhysiPlanContext {
int16_t nextDataBlockId;
SArray* pLocationHelper;
SArray* pExecNodeList;
int32_t groupId;
int32_t subplanId;
} SPhysiPlanContext;
......@@ -537,6 +536,8 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubpl
SSubplan* pSubplan = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
CHECK_ALLOC(pSubplan, NULL);
pSubplan->id = pLogicSubplan->id;
pSubplan->subplanType = pLogicSubplan->subplanType;
pSubplan->level = pLogicSubplan->level;
return pSubplan;
}
......@@ -554,8 +555,6 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog
pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
pSubplan->msgType = TDMT_VND_QUERY;
}
pSubplan->subplanType = pLogicSubplan->subplanType;
pSubplan->level = pLogicSubplan->level;
return pSubplan;
}
......@@ -603,7 +602,7 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l
return TSDB_CODE_SUCCESS;
}
SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) {
static SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) {
SSubLogicPlan* pDst = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
CHECK_ALLOC(pDst, NULL);
pDst->pNode = nodesCloneNode(pSrc->pNode);
......@@ -613,13 +612,13 @@ SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* p
}
pDst->subplanType = pSrc->subplanType;
pDst->level = level;
pDst->id.queryId = pCxt->pPlanCxt->queryId;
pDst->id.groupId = pCxt->groupId;
pDst->id.queryId = pSrc->id.queryId;
pDst->id.groupId = pSrc->id.groupId;
pDst->id.subplanId = pCxt->subplanId++;
return pDst;
}
static int32_t scaleOutForModify(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) {
static int32_t scaleOutForModify(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SNodeList* pGroup) {
SVnodeModifLogicNode* pNode = (SVnodeModifLogicNode*)pSubplan->pNode;
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
for (int32_t i = 0; i < numOfVgroups; ++i) {
......@@ -627,17 +626,12 @@ static int32_t scaleOutForModify(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubpla
CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = blocks;
// CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
CHECK_CODE_EXT(nodesListAppend(pGroup, pNewSubplan));
}
return TSDB_CODE_SUCCESS;
}
static int32_t scaleOutForMerge(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) {
// SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
// CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
// CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
// return TSDB_CODE_SUCCESS;
static int32_t scaleOutForMerge(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SNodeList* pGroup) {
return nodesListStrictAppend(pGroup, singleCloneSubLogicPlan(pCxt, pSubplan, level));
}
......@@ -665,40 +659,60 @@ static int32_t setScanVgroup(SPhysiPlanContext* pCxt, SLogicNode* pNode, const S
return doSetScanVgroup(pCxt, pNode, pVgroup, &found);
}
static int32_t scaleOutForScan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) {
static int32_t scaleOutForScan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SNodeList* pGroup) {
if (pSubplan->pVgroupList) {
for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE_EXT(setScanVgroup(pCxt, pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i));
// CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
CHECK_CODE_EXT(nodesListAppend(pGroup, pNewSubplan));
}
return TSDB_CODE_SUCCESS;
} else {
return scaleOutForMerge(pCxt, pSubplan, level, pLogicPlan, pGroup);
return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
}
}
// static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup, int32_t level, SQueryLogicPlan* pLogicPlan) {
// FOREACH() {
// Appends pNode to *pList, creating the list on demand when *pList is NULL.
// Returns TSDB_CODE_OUT_OF_MEMORY when the list cannot be created; otherwise
// returns whatever nodesListAppend() reports.
static int32_t appendWithMakeList(SNodeList** pList, SNodeptr pNode) {
  SNodeList* pTarget = *pList;
  if (NULL == pTarget) {
    pTarget = nodesMakeList();
    if (NULL == pTarget) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    // Publish the freshly created list to the caller before appending.
    *pList = pTarget;
  }
  return nodesListAppend(pTarget, pNode);
}
// }
// }
// Attaches every subplan in pCurrentGroup to the plan hierarchy.
//
// If pParentsGroup is empty on entry, each subplan in pCurrentGroup is appended
// to pParentsGroup itself. Otherwise each subplan in pCurrentGroup is linked with
// every node in pParentsGroup in both directions: it is added to the parent's
// pChildren list and the parent is added to its pParents list (both lists are
// created on demand by appendWithMakeList).
//
// pCxt is not referenced directly in this body.
// NOTE(review): the CHECK_CODE_EXT macro is defined elsewhere and may reference
// pCxt; it presumably returns early with the failing code - confirm.
//
// Returns TSDB_CODE_SUCCESS when the end of the function is reached.
static int32_t pushHierarchicalPlan(SPhysiPlanContext* pCxt, SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
// Evaluated once, before the loop: appending to pParentsGroup below must not
// flip the branch taken for the remaining children.
bool topLevel = (0 == LIST_LENGTH(pParentsGroup));
SNode* pChild = NULL;
FOREACH(pChild, pCurrentGroup) {
if (topLevel) {
// No parents yet: the current subplan becomes a member of pParentsGroup.
CHECK_CODE_EXT(nodesListAppend(pParentsGroup, pChild));
} else {
SNode* pParent = NULL;
FOREACH(pParent, pParentsGroup) {
// Link in both directions: parent -> child and child -> parent.
CHECK_CODE_EXT(appendWithMakeList(&(((SSubLogicPlan*)pParent)->pChildren), pChild));
CHECK_CODE_EXT(appendWithMakeList(&(((SSubLogicPlan*)pChild)->pParents), pParent));
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SHashObj* pHash, SNodeList* pParentsGroup) {
static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t* pLevel, SNodeList* pParentsGroup) {
SNodeList* pCurrentGroup = nodesMakeList();
CHECK_ALLOC(pCurrentGroup, TSDB_CODE_OUT_OF_MEMORY);
int32_t code = TSDB_CODE_SUCCESS;
switch (pSubplan->subplanType) {
case SUBPLAN_TYPE_MERGE:
code = scaleOutForMerge(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup);
code = scaleOutForMerge(pCxt, pSubplan, *pLevel, pCurrentGroup);
break;
case SUBPLAN_TYPE_SCAN:
code = scaleOutForScan(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup);
code = scaleOutForScan(pCxt, pSubplan, *pLevel, pCurrentGroup);
break;
case SUBPLAN_TYPE_MODIFY:
code = scaleOutForModify(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup);
code = scaleOutForModify(pCxt, pSubplan, *pLevel, pCurrentGroup);
break;
default:
break;
......@@ -706,12 +720,12 @@ static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int3
if (TSDB_CODE_SUCCESS != code) {
return code;
}
// pushHierarchicalPlan(pParentsGroup, pCurrentGroup, level, pLogicPlan);
CHECK_CODE(taosHashPut(pHash, &pCxt->groupId, sizeof(pCxt->groupId), &pSubplan->id.groupId, sizeof(pSubplan->id.groupId)), TSDB_CODE_OUT_OF_MEMORY);
++(pCxt->groupId);
CHECK_CODE_EXT(pushHierarchicalPlan(pCxt, pParentsGroup, pCurrentGroup));
++(*pLevel);
SNode* pChild;
FOREACH(pChild, pSubplan->pChildren) {
CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, level + 1, pLogicPlan, pHash, pCurrentGroup));
CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, pLevel, pCurrentGroup));
}
return TSDB_CODE_SUCCESS;
......@@ -720,75 +734,18 @@ static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int3
static SQueryLogicPlan* makeQueryLogicPlan(SPhysiPlanContext* pCxt) {
SQueryLogicPlan* pLogicPlan = (SQueryLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN);
CHECK_ALLOC(pLogicPlan, NULL);
pLogicPlan->pSubplans = nodesMakeList();
if (NULL == pLogicPlan->pSubplans) {
pLogicPlan->pTopSubplans = nodesMakeList();
if (NULL == pLogicPlan->pTopSubplans) {
nodesDestroyNode(pLogicPlan);
return NULL;
}
return pLogicPlan;
}
// Rewrites the srcGroupId of exchange nodes in the logic-node tree rooted at
// pNode, using pHash as the lookup table (key: current srcGroupId, value: the
// group id to store instead).
//
// An exchange node is remapped in place and its children are not visited. For
// any other node the children are visited recursively.
//
// Returns TSDB_CODE_FAILED when an exchange node's srcGroupId is not present in
// pHash, at any depth; TSDB_CODE_SUCCESS otherwise.
static int32_t doMappingLogicPlan(SLogicNode* pNode, SHashObj* pHash) {
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pNode)) {
    SExchangeLogicNode* pExchange = (SExchangeLogicNode*)pNode;
    int32_t* pGroupId = taosHashGet(pHash, &pExchange->srcGroupId, sizeof(pExchange->srcGroupId));
    if (NULL == pGroupId) {
      return TSDB_CODE_FAILED;
    }
    pExchange->srcGroupId = *pGroupId;
    return TSDB_CODE_SUCCESS;
  }

  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) {
    // Propagate a failed lookup from a nested exchange node instead of silently
    // dropping it and reporting success to the caller.
    int32_t code = doMappingLogicPlan((SLogicNode*)pChild, pHash);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}
// Runs doMappingLogicPlan() on the root logic node of every subplan stored in
// pLogicPlan->pSubplans. Each element of pSubplans is an SNodeListNode whose
// pNodeList holds the subplans. Stops at, and returns, the first non-success
// code; returns TSDB_CODE_SUCCESS when every subplan was processed.
static int32_t mappingLogicPlan(SQueryLogicPlan* pLogicPlan, SHashObj* pHash) {
  SNode* pGroup = NULL;
  FOREACH(pGroup, pLogicPlan->pSubplans) {
    SNode* pItem = NULL;
    FOREACH(pItem, ((SNodeListNode*)pGroup)->pNodeList) {
      SSubLogicPlan* pSubLogicPlan = (SSubLogicPlan*)pItem;
      int32_t        status = doMappingLogicPlan(pSubLogicPlan->pNode, pHash);
      if (TSDB_CODE_SUCCESS != status) {
        return status;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}
static int32_t scaleOutLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pRootSubLogicPlan, SQueryLogicPlan** pLogicPlan) {
*pLogicPlan = makeQueryLogicPlan(pCxt);
CHECK_ALLOC(*pLogicPlan, TSDB_CODE_OUT_OF_MEMORY);
SHashObj* pHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY);
int32_t code = doScaleOut(pCxt, pRootSubLogicPlan, 0, *pLogicPlan, pHash, NULL);
if (TSDB_CODE_SUCCESS == code) {
code = mappingLogicPlan(*pLogicPlan, pHash);
}
taosHashCleanup(pHash);
return code;
}
// Context handed to the doBuildPhysiSubplan() node-walker callback.
typedef struct SBuildPhysiSubplanCxt {
// Status slot: initialised to TSDB_CODE_SUCCESS by buildPhysiPlan() and checked
// by it after the walk.
int32_t errCode;
// Physical query plan under construction; created subplans are pushed into its
// pSubplans list and counted in numOfSubplans.
SQueryPlan* pQueryPlan;
// Physical-plan creation context forwarded to createPhysiSubplan()/pushSubplan().
SPhysiPlanContext* pPhyCxt;
} SBuildPhysiSubplanCxt;
static EDealRes doBuildPhysiSubplan(SNode* pNode, void* pContext) {
SBuildPhysiSubplanCxt* pCxt = (SBuildPhysiSubplanCxt*)pContext;
if (QUERY_NODE_LOGIC_SUBPLAN == nodeType(pNode)) {
SSubplan* pSubplan = createPhysiSubplan(pCxt->pPhyCxt, (SSubLogicPlan*)pNode);
CHECK_ALLOC(pSubplan, DEAL_RES_ERROR);
CHECK_CODE(pushSubplan(pCxt->pPhyCxt, pSubplan, ((SSubLogicPlan*)pNode)->level, pCxt->pQueryPlan->pSubplans), DEAL_RES_ERROR);
++(pCxt->pQueryPlan->numOfSubplans);
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
return doScaleOut(pCxt, pRootSubLogicPlan, &((*pLogicPlan)->totalLevel), (*pLogicPlan)->pTopSubplans);
}
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
......@@ -803,15 +760,31 @@ static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
return pPlan;
}
// Builds the physical subplan for pLogicSubplan and, recursively, for all of its
// children, registering each one in pQueryPlan.
//
// @pCxt          physical plan creation context
// @pLogicSubplan logical subplan to convert
// @pParent       physical subplan created for the logical parent, or NULL for a
//                top-level subplan
// @pQueryPlan    receives the new subplan at level pLogicSubplan->level;
//                numOfSubplans is incremented for every subplan pushed
//
// When pParent is not NULL the new subplan and pParent are linked in both
// directions (pChildren / pParents).
//
// Returns TSDB_CODE_SUCCESS when the end of the function is reached.
// NOTE(review): CHECK_ALLOC / CHECK_CODE_EXT are macros defined elsewhere;
// presumably they return early on failure - confirm.
static int32_t doBuildPhysiPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) {
  SSubplan* pSubplan = createPhysiSubplan(pCxt, pLogicSubplan);
  // This function returns an int32_t status code, so an allocation failure must
  // be reported as TSDB_CODE_OUT_OF_MEMORY, not as the EDealRes value
  // DEAL_RES_ERROR that only node-walker callbacks return.
  CHECK_ALLOC(pSubplan, TSDB_CODE_OUT_OF_MEMORY);
  CHECK_CODE_EXT(pushSubplan(pCxt, pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans));
  ++(pQueryPlan->numOfSubplans);
  if (NULL != pParent) {
    CHECK_CODE_EXT(appendWithMakeList(&pParent->pChildren, pSubplan));
    CHECK_CODE_EXT(appendWithMakeList(&pSubplan->pParents, pParent));
  }

  SNode* pChild = NULL;
  FOREACH(pChild, pLogicSubplan->pChildren) {
    CHECK_CODE_EXT(doBuildPhysiPlan(pCxt, (SSubLogicPlan*)pChild, pSubplan, pQueryPlan));
  }

  return TSDB_CODE_SUCCESS;
}
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) {
SBuildPhysiSubplanCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pQueryPlan = makeQueryPhysiPlan(pCxt), .pPhyCxt = pCxt };
CHECK_ALLOC(cxt.pQueryPlan, TSDB_CODE_OUT_OF_MEMORY);
nodesWalkList(pLogicPlan->pSubplans, doBuildPhysiSubplan, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyNode(cxt.pQueryPlan);
return cxt.errCode;
*pPlan = makeQueryPhysiPlan(pCxt);
CHECK_ALLOC(*pPlan, TSDB_CODE_OUT_OF_MEMORY);
SNode* pSubplan = NULL;
FOREACH(pSubplan, pLogicPlan->pTopSubplans) {
CHECK_CODE_EXT(doBuildPhysiPlan(pCxt, (SSubLogicPlan*)pSubplan, NULL, *pPlan));
}
*pPlan = cxt.pQueryPlan;
return TSDB_CODE_SUCCESS;
}
......
......@@ -34,31 +34,66 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
return code;
}
void qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
// Walks the physical node tree rooted at pNode and records pSource as a source
// endpoint of exchange nodes whose srcGroupId equals groupId.
//
// On a matching exchange node a clone of pSource is appended to its
// pSrcEndPoints list (created on demand) and that node's children are not
// visited. Every other node, including an exchange node with a different
// group id, has its children visited recursively.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the list cannot be
// created, the append fails, or any recursive call fails.
static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) {
  const bool isTargetExchange = (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) &&
                                (((SExchangePhysiNode*)pNode)->srcGroupId == groupId);
  if (isTargetExchange) {
    SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
    if (NULL == pExchange->pSrcEndPoints) {
      pExchange->pSrcEndPoints = nodesMakeList();
      if (NULL == pExchange->pSrcEndPoints) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    int32_t appendCode = nodesListStrictAppend(pExchange->pSrcEndPoints, nodesCloneNode(pSource));
    return (TSDB_CODE_SUCCESS == appendCode) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pSubNode = NULL;
  FOREACH(pSubNode, pNode->pChildren) {
    int32_t childCode = setSubplanExecutionNode((SPhysiNode*)pSubNode, groupId, pSource);
    if (TSDB_CODE_SUCCESS != childCode) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  return TSDB_CODE_SUCCESS;
}
// Sets a datasource of a subplan; may be called several times for one subplan.
//
// @pSubplan subplan whose physical node tree (pSubplan->pNode) is updated
// @groupId  id of the group of datasource subplans; only exchange nodes whose
//           srcGroupId equals this value are updated
// @pSource  execution location to record; it is cloned, not stored directly
//
// Returns the status of setSubplanExecutionNode(): TSDB_CODE_SUCCESS or
// TSDB_CODE_OUT_OF_MEMORY.
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource) {
  return setSubplanExecutionNode(pSubplan->pNode, groupId, pSource);
}
int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
if (SUBPLAN_TYPE_MODIFY == subplan->subplanType) {
SDataInserterNode* insert = (SDataInserterNode*)subplan->pDataSink;
*len = insert->size;
*str = insert->pData;
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) {
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
*pLen = insert->size;
*pStr = insert->pData;
insert->pData = NULL;
return TSDB_CODE_SUCCESS;
}
return nodesNodeToString((const SNode*)subplan, false, str, len);
return nodesNodeToString((const SNode*)pSubplan, false, pStr, pLen);
}
int32_t qStringToSubplan(const char* str, SSubplan** subplan) {
return nodesStringToNode(str, (SNode**)subplan);
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) {
return nodesStringToNode(pStr, (SNode**)pSubplan);
}
// Serialises pPlan with nodesNodeToString() (format flag false).
// Returns the produced string, or NULL when serialisation does not report
// TSDB_CODE_SUCCESS. The produced length is not exposed to the caller.
char* qQueryPlanToString(const SQueryPlan* pPlan) {
  char*   pText = NULL;
  int32_t textLen = 0;
  int32_t status = nodesNodeToString(pPlan, false, &pText, &textLen);
  return (TSDB_CODE_SUCCESS == status) ? pText : NULL;
}
// Parses pStr with nodesStringToNode() into a query plan.
// Returns the resulting plan, or NULL when parsing does not report
// TSDB_CODE_SUCCESS.
SQueryPlan* qStringToQueryPlan(const char* pStr) {
  SQueryPlan* pParsed = NULL;
  int32_t     status = nodesStringToNode(pStr, (SNode**)&pParsed);
  return (TSDB_CODE_SUCCESS == status) ? pParsed : NULL;
}
void qDestroyQueryPlan(SQueryPlan* pPlan) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册