提交 b91899e7 编写于 作者: X Xiaoyu Wang

TD-13704 super table plan split

上级 cde4c9a7
......@@ -66,6 +66,7 @@ typedef enum ENodeType {
QUERY_NODE_DATABLOCK_DESC,
QUERY_NODE_SLOT_DESC,
QUERY_NODE_COLUMN_DEF,
QUERY_NODE_DOWNSTREAM_SOURCE,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR,
......@@ -98,6 +99,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
......
......@@ -67,12 +67,17 @@ typedef struct SProjectLogicNode {
} SProjectLogicNode;
typedef struct SVnodeModifLogicNode {
ENodeType type;;
SLogicNode node;;
int32_t msgType;
SArray* pDataBlocks;
SVgDataBlocks* pVgDataBlocks;
} SVnodeModifLogicNode;
typedef struct SExchangeLogicNode {
SLogicNode node;
int32_t srcGroupId;
} SExchangeLogicNode;
typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL,
......@@ -80,13 +85,22 @@ typedef enum ESubplanType {
SUBPLAN_TYPE_MODIFY
} ESubplanType;
typedef struct SSubplanId {
uint64_t queryId;
int32_t groupId;
int32_t subplanId;
} SSubplanId;
typedef struct SSubLogicPlan {
ENodeType type;
SSubplanId id;
SNodeList* pChildren;
SNodeList* pParents;
SLogicNode* pNode;
ESubplanType subplanType;
SVgroupsInfo* pVgroupList;
int32_t level;
int32_t splitFlag;
} SSubLogicPlan;
typedef struct SQueryLogicPlan {
......@@ -161,20 +175,21 @@ typedef struct SAggPhysiNode {
SNodeList* pAggFuncs;
} SAggPhysiNode;
typedef struct SDownstreamSource {
typedef struct SDownstreamSourceNode {
ENodeType type;
SQueryNodeAddr addr;
uint64_t taskId;
uint64_t schedId;
} SDownstreamSource;
} SDownstreamSourceNode;
typedef struct SExchangePhysiNode {
SPhysiNode node;
uint64_t srcTemplateId; // template id of datasource suplans
SArray* pSrcEndPoints; // SArray<SDownstreamSource>, scheduler fill by calling qSetSuplanExecutionNode
int32_t srcGroupId; // group id of datasource suplans
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
typedef struct SDataSinkNode {
ENodeType type;;
ENodeType type;
SDataBlockDescNode* pInputDataBlockDesc;
} SDataSinkNode;
......@@ -189,12 +204,6 @@ typedef struct SDataInserterNode {
char *pData;
} SDataInserterNode;
typedef struct SSubplanId {
uint64_t queryId;
int32_t templateId;
int32_t subplanId;
} SSubplanId;
typedef struct SSubplan {
ENodeType type;
SSubplanId id; // unique id of the subplan
......
......@@ -32,9 +32,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @groupId id of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
void qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource);
// Convert to subplan to string for the scheduler to send to the executor
int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len);
......
......@@ -4984,7 +4984,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf
return pTaskInfo->code;
}
SDownstreamSource *pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SDownstreamSourceNode *pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo *pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
......@@ -5082,7 +5082,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
}
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, i);
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
SSDataBlock* pRes = pExchangeInfo->pResult;
......@@ -5179,7 +5179,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
tsem_wait(&pExchangeInfo->ready);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
if (pRsp->numOfRows == 0) {
......
......@@ -51,7 +51,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
}
};
const int funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition));
const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition));
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
return TSDB_CODE_SUCCESS;
......
......@@ -60,6 +60,9 @@
#define CLONE_OBJECT_FIELD(fldname, cloneFunc) \
do { \
if (NULL == (pSrc)->fldname) { \
break; \
} \
(pDst)->fldname = cloneFunc((pSrc)->fldname); \
if (NULL == (pDst)->fldname) { \
nodesDestroyNode((SNode*)(pDst)); \
......@@ -234,10 +237,17 @@ static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode*
}
static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(msgType);
return (SNode*)pDst;
}
static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(srcGroupId);
return (SNode*)pDst;
}
static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
CLONE_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(subplanType);
......@@ -304,6 +314,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicProjectCopy((const SProjectLogicNode*)pNode, (SProjectLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF:
return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst);
default:
......
......@@ -496,6 +496,37 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkExchangePhysiPlanSrcGroupId = "SrcGroupId";
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcGroupId, pNode->srcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
}
return code;
}
static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
SExchangePhysiNode* pNode = (SExchangePhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcGroupId, &pNode->srcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
}
return code;
}
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
......@@ -517,7 +548,7 @@ static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) {
}
static const char* jkSubplanIdQueryId = "QueryId";
static const char* jkSubplanIdTemplateId = "TemplateId";
static const char* jkSubplanIdGroupId = "GroupId";
static const char* jkSubplanIdSubplanId = "SubplanId";
static int32_t subplanIdToJson(const void* pObj, SJson* pJson) {
......@@ -525,7 +556,7 @@ static int32_t subplanIdToJson(const void* pObj, SJson* pJson) {
int32_t code = tjsonAddIntegerToObject(pJson, jkSubplanIdQueryId, pNode->queryId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanIdTemplateId, pNode->templateId);
code = tjsonAddIntegerToObject(pJson, jkSubplanIdGroupId, pNode->groupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanIdSubplanId, pNode->subplanId);
......@@ -539,7 +570,7 @@ static int32_t jsonToSubplanId(const SJson* pJson, void* pObj) {
int32_t code = tjsonGetUBigIntValue(pJson, jkSubplanIdQueryId, &pNode->queryId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanIdTemplateId, &pNode->templateId);
code = tjsonGetIntValue(pJson, jkSubplanIdGroupId, &pNode->groupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanIdSubplanId, &pNode->subplanId);
......@@ -1387,7 +1418,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return physiAggNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return physiExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
......@@ -1459,6 +1492,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiJoinNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return jsonToPhysiExchangeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN:
......
......@@ -129,6 +129,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SProjectLogicNode));
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF:
return makeNode(type, sizeof(SVnodeModifLogicNode));
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangeLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SSubLogicPlan));
case QUERY_NODE_LOGIC_PLAN:
......
......@@ -50,6 +50,7 @@ extern "C" {
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode);
int32_t applySplitRule(SSubLogicPlan* pSubplan);
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan, SArray* pExecNodeList);
#ifdef __cplusplus
......
......@@ -28,6 +28,8 @@ typedef struct SPhysiPlanContext {
int16_t nextDataBlockId;
SArray* pLocationHelper;
SArray* pExecNodeList;
int32_t groupId;
int32_t subplanId;
} SPhysiPlanContext;
static int32_t getSlotKey(SNode* pNode, char* pKey) {
......@@ -81,12 +83,7 @@ static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SData
SNode* pNode = NULL;
int16_t slotId = taosHashGetSize(pHash);
FOREACH(pNode, pList) {
SNode* pSlot = createSlotDesc(pCxt, pNode, slotId);
CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY);
if (TSDB_CODE_SUCCESS != nodesListAppend(pDataBlockDesc->pSlots, (SNode*)pSlot)) {
nodesDestroyNode(pSlot);
return TSDB_CODE_OUT_OF_MEMORY;
}
CHECK_CODE_EXT(nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId)));
SSlotIndex index = { .dataBlockId = pDataBlockDesc->dataBlockId, .slotId = slotId };
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
......@@ -97,7 +94,7 @@ static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SData
CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
REPLACE_NODE(pTarget);
pDataBlockDesc->resultRowSize += ((SSlotDescNode*)pSlot)->dataType.bytes;
pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes;
++slotId;
}
return TSDB_CODE_SUCCESS;
......@@ -467,6 +464,14 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
return (SPhysiNode*)pProject;
}
static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
CHECK_ALLOC(pExchange, NULL);
CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange);
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
return (SPhysiNode*)pExchange;
}
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) {
SNodeList* pChildren = nodesMakeList();
CHECK_ALLOC(pChildren, NULL);
......@@ -495,6 +500,9 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
case QUERY_NODE_LOGIC_PLAN_PROJECT:
pPhyNode = createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicPlan);
break;
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
pPhyNode = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicPlan);
break;
default:
break;
}
......@@ -525,8 +533,15 @@ static SDataSinkNode* createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysi
return (SDataSinkNode*)pDispatcher;
}
static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) {
SSubplan* pSubplan = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
CHECK_ALLOC(pSubplan, NULL);
pSubplan->id = pLogicSubplan->id;
return pSubplan;
}
static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) {
SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
SSubplan* pSubplan = makeSubplan(pCxt, pLogicSubplan);
CHECK_ALLOC(pSubplan, NULL);
if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)pLogicSubplan->pNode;
......@@ -540,9 +555,22 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog
pSubplan->msgType = TDMT_VND_QUERY;
}
pSubplan->subplanType = pLogicSubplan->subplanType;
pSubplan->level = pLogicSubplan->level;
return pSubplan;
}
static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
pNode->pParent = pParent;
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
doSetLogicNodeParent((SLogicNode*)pChild, pNode);
}
}
static void setLogicNodeParent(SLogicNode* pNode) {
doSetLogicNodeParent(pNode, NULL);
}
static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubLogicPlan** pSubLogicPlan) {
*pSubLogicPlan = (SSubLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
CHECK_ALLOC(*pSubLogicPlan, TSDB_CODE_OUT_OF_MEMORY);
......@@ -553,8 +581,9 @@ static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, S
} else {
(*pSubLogicPlan)->subplanType = SUBPLAN_TYPE_MERGE;
}
// todo split
return TSDB_CODE_SUCCESS;
(*pSubLogicPlan)->id.queryId = pCxt->pPlanCxt->queryId;
setLogicNodeParent((*pSubLogicPlan)->pNode);
return applySplitRule(*pSubLogicPlan);
}
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t level, SNodeList* pSubplans) {
......@@ -571,6 +600,7 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l
CHECK_ALLOC(pGroup->pNodeList, TSDB_CODE_OUT_OF_MEMORY);
}
CHECK_CODE(nodesListStrictAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_SUCCESS;
}
SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) {
......@@ -583,11 +613,13 @@ SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* p
}
pDst->subplanType = pSrc->subplanType;
pDst->level = level;
pDst->id.queryId = pCxt->pPlanCxt->queryId;
pDst->id.groupId = pCxt->groupId;
pDst->id.subplanId = pCxt->subplanId++;
return pDst;
}
static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) {
static int32_t scaleOutForModify(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) {
SVnodeModifLogicNode* pNode = (SVnodeModifLogicNode*)pSubplan->pNode;
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
for (int32_t i = 0; i < numOfVgroups; ++i) {
......@@ -595,17 +627,91 @@ static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int3
CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = blocks;
CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
// CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
CHECK_CODE_EXT(nodesListAppend(pGroup, pNewSubplan));
}
} else {
return TSDB_CODE_SUCCESS;
}
static int32_t scaleOutForMerge(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) {
// SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
// CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
// CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
// return TSDB_CODE_SUCCESS;
return nodesListStrictAppend(pGroup, singleCloneSubLogicPlan(pCxt, pSubplan, level));
}
static int32_t doSetScanVgroup(SPhysiPlanContext* pCxt, SLogicNode* pNode, const SVgroupInfo* pVgroup, bool* pFound) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
pScan->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
CHECK_ALLOC(pScan->pVgroupList, TSDB_CODE_OUT_OF_MEMORY);
memcpy(pScan->pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
*pFound = true;
return TSDB_CODE_SUCCESS;
}
SNode* pChild = NULL;
FOREACH(pChild, pNode->pChildren) {
int32_t code = doSetScanVgroup(pCxt, (SLogicNode*)pChild, pVgroup, pFound);
if (TSDB_CODE_SUCCESS != code || *pFound) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t setScanVgroup(SPhysiPlanContext* pCxt, SLogicNode* pNode, const SVgroupInfo* pVgroup) {
bool found = false;
return doSetScanVgroup(pCxt, pNode, pVgroup, &found);
}
static int32_t scaleOutForScan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) {
if (pSubplan->pVgroupList) {
for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
CHECK_CODE_EXT(setScanVgroup(pCxt, pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i));
// CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
CHECK_CODE_EXT(nodesListAppend(pGroup, pNewSubplan));
}
return TSDB_CODE_SUCCESS;
} else {
return scaleOutForMerge(pCxt, pSubplan, level, pLogicPlan, pGroup);
}
}
// static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup, int32_t level, SQueryLogicPlan* pLogicPlan) {
// FOREACH() {
// }
// }
static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SHashObj* pHash, SNodeList* pParentsGroup) {
SNodeList* pCurrentGroup = nodesMakeList();
CHECK_ALLOC(pCurrentGroup, TSDB_CODE_OUT_OF_MEMORY);
int32_t code = TSDB_CODE_SUCCESS;
switch (pSubplan->subplanType) {
case SUBPLAN_TYPE_MERGE:
code = scaleOutForMerge(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup);
break;
case SUBPLAN_TYPE_SCAN:
code = scaleOutForScan(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup);
break;
case SUBPLAN_TYPE_MODIFY:
code = scaleOutForModify(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup);
break;
default:
break;
}
if (TSDB_CODE_SUCCESS != code) {
return code;
}
// pushHierarchicalPlan(pParentsGroup, pCurrentGroup, level, pLogicPlan);
CHECK_CODE(taosHashPut(pHash, &pCxt->groupId, sizeof(pCxt->groupId), &pSubplan->id.groupId, sizeof(pSubplan->id.groupId)), TSDB_CODE_OUT_OF_MEMORY);
++(pCxt->groupId);
SNode* pChild;
FOREACH(pChild, pSubplan->pChildren) {
CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, level + 1, pLogicPlan));
CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, level + 1, pLogicPlan, pHash, pCurrentGroup));
}
return TSDB_CODE_SUCCESS;
......@@ -622,10 +728,49 @@ static SQueryLogicPlan* makeQueryLogicPlan(SPhysiPlanContext* pCxt) {
return pLogicPlan;
}
static int32_t doMappingLogicPlan(SLogicNode* pNode, SHashObj* pHash) {
if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pNode)) {
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)pNode;
int32_t* pGroupId = taosHashGet(pHash, &pExchange->srcGroupId, sizeof(pExchange->srcGroupId));
if (NULL == pGroupId) {
return TSDB_CODE_FAILED;
}
pExchange->srcGroupId = *pGroupId;
return TSDB_CODE_SUCCESS;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
doMappingLogicPlan((SLogicNode*)pChild, pHash);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mappingLogicPlan(SQueryLogicPlan* pLogicPlan, SHashObj* pHash) {
SNode* pNode = NULL;
FOREACH(pNode, pLogicPlan->pSubplans) {
SNode* pSubplan = NULL;
FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) {
int32_t code = doMappingLogicPlan(((SSubLogicPlan*)pSubplan)->pNode, pHash);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t scaleOutLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pRootSubLogicPlan, SQueryLogicPlan** pLogicPlan) {
*pLogicPlan = makeQueryLogicPlan(pCxt);
CHECK_ALLOC(*pLogicPlan, TSDB_CODE_OUT_OF_MEMORY);
return doScaleOut(pCxt, pRootSubLogicPlan, 0, *pLogicPlan);
SHashObj* pHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY);
int32_t code = doScaleOut(pCxt, pRootSubLogicPlan, 0, *pLogicPlan, pHash, NULL);
if (TSDB_CODE_SUCCESS == code) {
code = mappingLogicPlan(*pLogicPlan, pHash);
}
taosHashCleanup(pHash);
return code;
}
typedef struct SBuildPhysiSubplanCxt {
......@@ -681,13 +826,12 @@ int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan**
if (NULL == cxt.pLocationHelper) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SQueryLogicPlan* pLogicPlan;
SSubLogicPlan* pSubLogicPlan;
SQueryLogicPlan* pLogicPlan = NULL;
SSubLogicPlan* pSubLogicPlan = NULL;
int32_t code = splitLogicPlan(&cxt, pLogicNode, &pSubLogicPlan);
if (TSDB_CODE_SUCCESS == code) {
code = scaleOutLogicPlan(&cxt, pSubLogicPlan, &pLogicPlan);
}
// todo maping
if (TSDB_CODE_SUCCESS == code) {
code = buildPhysiPlan(&cxt, pLogicPlan, pPlan);
}
......
......@@ -34,7 +34,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
return code;
}
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
void qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
}
......@@ -62,5 +62,5 @@ SQueryPlan* qStringToQueryPlan(const char* pStr) {
}
void qDestroyQueryPlan(SQueryPlan* pPlan) {
nodesDestroyNode(pPlan);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "plannerInt.h"
#define SPLIT_FLAG_MASK(n) (1 << n)
#define SPLIT_FLAG_STS SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef struct SSplitContext {
int32_t errCode;
int32_t groupId;
bool match;
void* pInfo;
} SSplitContext;
typedef int32_t (*FMatch)(SSplitContext* pCxt, SSubLogicPlan* pSubplan);
typedef int32_t (*FSplit)(SSplitContext* pCxt);
typedef struct SSplitRule {
char* pName;
FMatch matchFunc;
FSplit splitFunc;
} SSplitRule;
typedef struct SStsInfo {
SScanLogicNode* pScan;
SSubLogicPlan* pSubplan;
} SStsInfo;
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static int32_t stsMatch(SSplitContext* pCxt, SSubLogicPlan* pSubplan) {
if (SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS)) {
return TSDB_CODE_SUCCESS;
}
SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
SStsInfo* pInfo = calloc(1, sizeof(SStsInfo));
CHECK_ALLOC(pInfo, TSDB_CODE_OUT_OF_MEMORY);
pInfo->pScan = (SScanLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
pCxt->pInfo = pInfo;
pCxt->match = true;
return TSDB_CODE_SUCCESS;
}
SNode* pChild;
FOREACH(pChild, pSubplan->pChildren) {
int32_t code = stsMatch(pCxt, (SSubLogicPlan*)pChild);
if (TSDB_CODE_SUCCESS != code || pCxt->match) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static SSubLogicPlan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan) {
SSubLogicPlan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return NULL;
}
pSubplan->id.groupId = pCxt->groupId;
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan);
TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo);
SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS);
return pSubplan;
}
static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SSubLogicPlan* pSubplan, SScanLogicNode* pScan) {
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
pExchange->node.pTargets = nodesCloneList(pScan->node.pTargets);
if (NULL == pExchange->node.pTargets) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == pScan->node.pParent) {
pSubplan->pNode = (SLogicNode*)pExchange;
return TSDB_CODE_SUCCESS;
}
SNode* pNode;
FOREACH(pNode, pScan->node.pParent->pChildren) {
if (nodesEqualNode(pNode, pScan)) {
REPLACE_NODE(pExchange);
nodesDestroyNode(pNode);
return TSDB_CODE_SUCCESS;
}
}
nodesDestroyNode(pExchange);
return TSDB_CODE_FAILED;
}
static int32_t stsSplit(SSplitContext* pCxt) {
SStsInfo* pInfo = pCxt->pInfo;
if (NULL == pInfo->pSubplan->pChildren) {
pInfo->pSubplan->pChildren = nodesMakeList();
if (NULL == pInfo->pSubplan->pChildren) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
int32_t code = nodesListStrictAppend(pInfo->pSubplan->pChildren, stsCreateScanSubplan(pCxt, pInfo->pScan));
if (TSDB_CODE_SUCCESS == code) {
code = stsCreateExchangeNode(pCxt, pInfo->pSubplan, pInfo->pScan);
}
++(pCxt->groupId);
return code;
}
static const SSplitRule splitRuleSet[] = {
{ .pName = "SuperTableScan", .matchFunc = stsMatch, .splitFunc = stsSplit }
};
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
int32_t applySplitRule(SSubLogicPlan* pSubplan) {
SSplitContext cxt = { .errCode = TSDB_CODE_SUCCESS, .groupId = pSubplan->id.groupId + 1, .match = false, .pInfo = NULL };
bool split = false;
do {
split = false;
for (int32_t i = 0; i < splitRuleNum; ++i) {
cxt.match = false;
int32_t code = splitRuleSet[i].matchFunc(&cxt, pSubplan);
if (TSDB_CODE_SUCCESS == code && cxt.match) {
code = splitRuleSet[i].splitFunc(&cxt);
split = true;
}
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
} while (split);
return TSDB_CODE_SUCCESS;
}
......@@ -137,6 +137,13 @@ TEST_F(PlannerTest, simple) {
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, stSimple) {
setDatabase("root", "test");
bind("SELECT * FROM st1");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, groupBy) {
setDatabase("root", "test");
......
......@@ -797,8 +797,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1);
SCH_LOCK(SCH_WRITE, &par->lock);
SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr};
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source);
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, .taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr};
qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &par->lock);
if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
......
......@@ -99,7 +99,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));
scanPlan->id.queryId = qId;
scanPlan->id.templateId = 0x0000000000000002;
scanPlan->id.groupId = 0x0000000000000002;
scanPlan->id.subplanId = 0x0000000000000003;
scanPlan->subplanType = SUBPLAN_TYPE_SCAN;
......@@ -114,7 +114,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->msgType = TDMT_VND_QUERY;
mergePlan->id.queryId = qId;
mergePlan->id.templateId = schtMergeTemplateId;
mergePlan->id.groupId = schtMergeTemplateId;
mergePlan->id.subplanId = 0x5555;
mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
mergePlan->level = 0;
......@@ -158,7 +158,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
for (int32_t i = 0; i < scanPlanNum; ++i) {
scanPlan[i].id.queryId = qId;
scanPlan[i].id.templateId = 0x0000000000000002;
scanPlan[i].id.groupId = 0x0000000000000002;
scanPlan[i].id.subplanId = 0x0000000000000003 + i;
scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN;
......@@ -183,7 +183,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
}
mergePlan->id.queryId = qId;
mergePlan->id.templateId = schtMergeTemplateId;
mergePlan->id.groupId = schtMergeTemplateId;
mergePlan->id.subplanId = 0x5555;
mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
mergePlan->level = 0;
......@@ -216,7 +216,7 @@ void schtBuildInsertDag(SQueryPlan *dag) {
SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan));
insertPlan[0].id.queryId = qId;
insertPlan[0].id.templateId = 0x0000000000000003;
insertPlan[0].id.groupId = 0x0000000000000003;
insertPlan[0].id.subplanId = 0x0000000000000004;
insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY;
insertPlan[0].level = 0;
......@@ -232,7 +232,7 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan[0].msgType = TDMT_VND_SUBMIT;
insertPlan[1].id.queryId = qId;
insertPlan[1].id.templateId = 0x0000000000000003;
insertPlan[1].id.groupId = 0x0000000000000003;
insertPlan[1].id.subplanId = 0x0000000000000005;
insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY;
insertPlan[1].level = 0;
......@@ -263,7 +263,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
return 0;
}
void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) {
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册