From edd0295a00dba5b145576230199814cf3c12e8f7 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 2 Jun 2022 10:45:27 +0800 Subject: [PATCH] feat: interval distributed split --- include/libs/function/functionMgt.h | 3 + include/libs/nodes/nodes.h | 2 + include/libs/nodes/plannodes.h | 15 +- source/libs/function/inc/builtins.h | 26 +- source/libs/function/src/builtins.c | 33 +-- source/libs/function/src/functionMgt.c | 78 ++++++ source/libs/nodes/src/nodesCloneFuncs.c | 11 +- source/libs/nodes/src/nodesCodeFuncs.c | 196 ++++++++++++++- source/libs/nodes/src/nodesUtilFuncs.c | 4 + source/libs/planner/inc/planInt.h | 1 + source/libs/planner/src/planLogicCreater.c | 114 ++++----- source/libs/planner/src/planPhysiCreater.c | 68 +++++- source/libs/planner/src/planSpliter.c | 271 ++++++++++++++++++--- source/libs/planner/src/planUtil.c | 51 ++++ source/libs/planner/src/planner.c | 21 +- 15 files changed, 753 insertions(+), 141 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 922136b590..f3e28936af 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -156,6 +156,9 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId); bool fmIsMultiResFunc(int32_t funcId); bool fmIsRepeatScanFunc(int32_t funcId); bool fmIsUserDefinedFunc(int32_t funcId); +bool fmIsDistExecFunc(int32_t funcId); + +int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); typedef enum EFuncDataRequired { FUNC_DATA_REQUIRED_DATA_LOAD = 1, diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3860266725..9a974f1b0d 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -190,6 +190,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_PROJECT, QUERY_NODE_LOGIC_PLAN_VNODE_MODIF, QUERY_NODE_LOGIC_PLAN_EXCHANGE, + QUERY_NODE_LOGIC_PLAN_MERGE, QUERY_NODE_LOGIC_PLAN_WINDOW, QUERY_NODE_LOGIC_PLAN_FILL, QUERY_NODE_LOGIC_PLAN_SORT, @@ -207,6 +208,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_JOIN, QUERY_NODE_PHYSICAL_PLAN_AGG, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, + QUERY_NODE_PHYSICAL_PLAN_MERGE, QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 2648a468dd..26d0f93ca1 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -94,9 +94,15 @@ typedef struct SVnodeModifLogicNode { typedef struct SExchangeLogicNode { SLogicNode node; int32_t srcGroupId; - uint8_t precision; } SExchangeLogicNode; +typedef struct SMergeLogicNode { + SLogicNode node; + SNodeList* pMergeKeys; + int32_t numOfChannels; + int32_t srcGroupId; +} SMergeLogicNode; + typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType; typedef struct SWindowLogicNode { @@ -265,6 +271,13 @@ typedef struct SExchangePhysiNode { SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhysiNode; +typedef struct SMergePhysiNode { + SPhysiNode node; + SNodeList* pMergeKeys; + int32_t numOfChannels; + int32_t srcGroupId; +} SMergePhysiNode; + typedef struct SWinodwPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of parameter expression of function diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index 3bd0f35bf5..bc91875006 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -26,22 +26,24 @@ typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t l typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow); typedef struct SBuiltinFuncDefinition { - char name[FUNCTION_NAME_MAX_LENGTH]; - EFunctionType type; - uint64_t classification; - FTranslateFunc translateFunc; - FFuncDataRequired dataRequiredFunc; - FExecGetEnv getEnvFunc; - FExecInit initFunc; - FExecProcess processFunc; + const char* name; + EFunctionType type; + uint64_t classification; + FTranslateFunc translateFunc; + FFuncDataRequired dataRequiredFunc; + FExecGetEnv getEnvFunc; + FExecInit initFunc; + FExecProcess processFunc; FScalarExecProcess sprocessFunc; - FExecFinalize finalizeFunc; - FExecProcess invertFunc; - FExecCombine combineFunc; + FExecFinalize finalizeFunc; + FExecProcess invertFunc; + FExecCombine combineFunc; + const char* pPartialFunc; + const char* pMergeFunc; } SBuiltinFuncDefinition; extern const SBuiltinFuncDefinition funcMgtBuiltins[]; -extern const int funcMgtBuiltinsNum; +extern const int funcMgtBuiltinsNum; #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4725f11715..dc812b3a83 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -156,14 +156,14 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - //param0 + // param0 SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The first parameter of PERCENTILE function can only be column"); } - //param1 + // param1 SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); if (pValue->datum.i < 0 || pValue->datum.i > 100) { @@ -178,7 +178,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - //set result type + // set result type pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; return TSDB_CODE_SUCCESS; } @@ -197,14 +197,14 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - //param0 + // param0 SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The first parameter of APERCENTILE function can only be column"); } - //param1 + // param1 SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -223,7 +223,7 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - //param2 + // param2 if (3 == numOfParams) { uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type; if (!IS_VAR_DATA_TYPE(para3Type)) { @@ -263,14 +263,14 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - //param0 + // param0 SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The first parameter of TOP/BOTTOM function can only be column"); } - //param1 + // param1 SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -287,7 +287,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { pValue->notReserved = true; - //set result type + // set result type SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; return TSDB_CODE_SUCCESS; @@ -659,8 +659,8 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (pValue->datum.i < ((i > 1) ? 0 : 1) || pValue->datum.i > 100) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "TAIL function second parameter should be in range [1, 100], " - "third parameter should be in range [0, 100]"); + "TAIL function second parameter should be in range [1, 100], " + "third parameter should be in range [0, 100]"); } pValue->notReserved = true; @@ -721,7 +721,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - //param0 + // param0 SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, @@ -729,12 +729,11 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && - TSDB_DATA_TYPE_BOOL != colType) { + if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - //param1 + // param1 if (numOfParams == 2) { uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; if (!IS_INTEGER_TYPE(paraType)) { @@ -852,7 +851,7 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len) if (3 == numOfParams) { SExprNode* p2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2); - uint8_t para2Type = p2->resType.type; + uint8_t para2Type = p2->resType.type; if (!IS_INTEGER_TYPE(para2Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } @@ -993,6 +992,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = functionFinalize, .invertFunc = countInvertFunction, .combineFunc = combineFunction, + // .pPartialFunc = "count", + // .pMergeFunc = "sum" }, { .name = "sum", diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index c2b325bc92..611ae8d81f 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -199,3 +199,81 @@ bool fmIsInvertible(int32_t funcId) { } return res; } + +static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) { + SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pFunc) { + return NULL; + } + strcpy(pFunc->functionName, pName); + pFunc->pParameterList = pParameterList; + char msg[64] = {0}; + if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc, msg, sizeof(msg))) { + nodesDestroyNode(pFunc); + return NULL; + } + return pFunc; +} + +static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { + SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return NULL; + } + strcpy(pCol->colName, pFunc->node.aliasName); + pCol->node.resType = pFunc->node.resType; + return pCol; +} + +bool fmIsDistExecFunc(int32_t funcId) { + if (!fmIsVectorFunc(funcId)) { + return true; + } + return (NULL != funcMgtBuiltins[funcId].pPartialFunc && NULL != funcMgtBuiltins[funcId].pMergeFunc); +} + +static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNode** pPartialFunc) { + SNodeList* pParameterList = nodesCloneList(pSrcFunc->pParameterList); + if (NULL == pParameterList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + *pPartialFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pParameterList); + if (NULL == *pPartialFunc) { + nodesDestroyList(pParameterList); + return TSDB_CODE_OUT_OF_MEMORY; + } + snprintf((*pPartialFunc)->node.aliasName, sizeof((*pPartialFunc)->node.aliasName), "%s.%p", + (*pPartialFunc)->functionName, pSrcFunc); + return TSDB_CODE_SUCCESS; +} + +static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc, + SFunctionNode** pMergeFunc) { + SNodeList* pParameterList = NULL; + nodesListMakeStrictAppend(&pParameterList, createColumnByFunc(pPartialFunc)); + *pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList); + if (NULL == *pMergeFunc) { + nodesDestroyList(pParameterList); + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName); + return TSDB_CODE_SUCCESS; +} + +int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) { + if (!fmIsDistExecFunc(pFunc->funcId)) { + return TSDB_CODE_FAILED; + } + + int32_t code = createPartialFunction(pFunc, pPartialFunc); + if (TSDB_CODE_SUCCESS == code) { + code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc); + } + + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(*pPartialFunc); + nodesDestroyNode(*pMergeFunc); + } + + return code; +} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 68d3741b48..0973435e89 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -366,7 +366,14 @@ static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifL static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(srcGroupId); - COPY_SCALAR_FIELD(precision); + return (SNode*)pDst; +} + +static SNode* logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + CLONE_NODE_LIST_FIELD(pMergeKeys); + COPY_SCALAR_FIELD(numOfChannels); + COPY_SCALAR_FIELD(srcGroupId); return (SNode*)pDst; } @@ -529,6 +536,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst); case QUERY_NODE_LOGIC_PLAN_EXCHANGE: return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst); + case QUERY_NODE_LOGIC_PLAN_MERGE: + return logicMergeCopy((const SMergeLogicNode*)pNode, (SMergeLogicNode*)pDst); case QUERY_NODE_LOGIC_PLAN_WINDOW: return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst); case QUERY_NODE_LOGIC_PLAN_FILL: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 78710569cb..72e2a54a12 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -190,6 +190,8 @@ const char* nodesNodeName(ENodeType type) { return "LogicVnodeModif"; case QUERY_NODE_LOGIC_PLAN_EXCHANGE: return "LogicExchange"; + case QUERY_NODE_LOGIC_PLAN_MERGE: + return "LogicMerge"; case QUERY_NODE_LOGIC_PLAN_WINDOW: return "LogicWindow"; case QUERY_NODE_LOGIC_PLAN_FILL: @@ -220,6 +222,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiAgg"; case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: return "PhysiExchange"; + case QUERY_NODE_PHYSICAL_PLAN_MERGE: + return "PhysiMerge"; case QUERY_NODE_PHYSICAL_PLAN_SORT: return "PhysiSort"; case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: @@ -596,7 +600,6 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) { } static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId"; -static const char* jkExchangeLogicPlanSrcPrecision = "Precision"; static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) { const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj; @@ -605,9 +608,6 @@ static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcPrecision, pNode->precision); - } return code; } @@ -619,8 +619,144 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId); } + + return code; +} + +static const char* jkMergeLogicPlanMergeKeys = "MergeKeys"; +static const char* jkMergeLogicPlanNumOfChannels = "NumOfChannels"; +static const char* jkMergeLogicPlanSrcGroupId = "SrcGroupId"; + +static int32_t logicMergeNodeToJson(const void* pObj, SJson* pJson) { + const SMergeLogicNode* pNode = (const SMergeLogicNode*)pObj; + + int32_t code = logicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkMergeLogicPlanMergeKeys, pNode->pMergeKeys); + } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetUTinyIntValue(pJson, jkExchangeLogicPlanSrcPrecision, &pNode->precision); + code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanNumOfChannels, pNode->numOfChannels); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanSrcGroupId, pNode->srcGroupId); + } + + return code; +} + +static int32_t jsonToLogicMergeNode(const SJson* pJson, void* pObj) { + SMergeLogicNode* pNode = (SMergeLogicNode*)pObj; + + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkMergeLogicPlanMergeKeys, &pNode->pMergeKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkMergeLogicPlanNumOfChannels, &pNode->numOfChannels); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkMergeLogicPlanSrcGroupId, &pNode->srcGroupId); + } + + return code; +} + +static const char* jkWindowLogicPlanWinType = "WinType"; +static const char* jkWindowLogicPlanFuncs = "Funcs"; +static const char* jkWindowLogicPlanInterval = "Interval"; +static const char* jkWindowLogicPlanOffset = "Offset"; +static const char* jkWindowLogicPlanSliding = "Sliding"; +static const char* jkWindowLogicPlanIntervalUnit = "IntervalUnit"; +static const char* jkWindowLogicPlanSlidingUnit = "SlidingUnit"; +static const char* jkWindowLogicPlanSessionGap = "SessionGap"; +static const char* jkWindowLogicPlanTspk = "Tspk"; +static const char* jkWindowLogicPlanStateExpr = "StateExpr"; +static const char* jkWindowLogicPlanTriggerType = "TriggerType"; +static const char* jkWindowLogicPlanWatermark = "Watermark"; + +static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) { + const SWindowLogicNode* pNode = (const SWindowLogicNode*)pObj; + + int32_t code = logicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWinType, pNode->winType); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkWindowLogicPlanFuncs, pNode->pFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanInterval, pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanOffset, pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSliding, pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanIntervalUnit, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSlidingUnit, pNode->slidingUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSessionGap, pNode->sessionGap); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkWindowLogicPlanTspk, nodeToJson, pNode->pTspk); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkWindowLogicPlanStateExpr, nodeToJson, pNode->pStateExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanTriggerType, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWatermark, pNode->watermark); + } + + return code; +} + +static int32_t jsonToLogicWindowNode(const SJson* pJson, void* pObj) { + SWindowLogicNode* pNode = (SWindowLogicNode*)pObj; + + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkWindowLogicPlanWinType, pNode->winType, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkWindowLogicPlanFuncs, &pNode->pFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanInterval, &pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanOffset, &pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanSliding, &pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanIntervalUnit, &pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanSlidingUnit, &pNode->slidingUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanSessionGap, &pNode->sessionGap); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkWindowLogicPlanTspk, &pNode->pTspk); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkWindowLogicPlanStateExpr, &pNode->pStateExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanTriggerType, &pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanWatermark, &pNode->watermark); } return code; @@ -1453,6 +1589,44 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkMergePhysiPlanMergeKeys = "MergeKeys"; +static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels"; +static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId"; + +static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { + const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkMergePhysiPlanMergeKeys, pNode->pMergeKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanNumOfChannels, pNode->numOfChannels); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId); + } + + return code; +} + +static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { + SMergePhysiNode* pNode = (SMergePhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkMergePhysiPlanMergeKeys, &pNode->pMergeKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkMergePhysiPlanNumOfChannels, &pNode->numOfChannels); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId); + } + + return code; +} + static const char* jkSortPhysiPlanExprs = "Exprs"; static const char* jkSortPhysiPlanSortKeys = "SortKeys"; static const char* jkSortPhysiPlanTargets = "Targets"; @@ -3388,6 +3562,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { break; case QUERY_NODE_LOGIC_PLAN_EXCHANGE: return logicExchangeNodeToJson(pObj, pJson); + case QUERY_NODE_LOGIC_PLAN_MERGE: + return logicMergeNodeToJson(pObj, pJson); + case QUERY_NODE_LOGIC_PLAN_WINDOW: + return logicWindowNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_FILL: return logicFillNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_SORT: @@ -3414,6 +3592,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiAggNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: return physiExchangeNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_MERGE: + return physiMergeNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_SORT: return physiSortNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: @@ -3499,6 +3679,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicProjectNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_EXCHANGE: return jsonToLogicExchangeNode(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_MERGE: + return jsonToLogicMergeNode(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_WINDOW: + return jsonToLogicWindowNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_FILL: return jsonToLogicFillNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_SORT: @@ -3525,6 +3709,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiAggNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: return jsonToPhysiExchangeNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_MERGE: + return jsonToPhysiMergeNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_SORT: return jsonToPhysiSortNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index e28844f2e1..fa9cb40d17 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -222,6 +222,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SVnodeModifLogicNode)); case QUERY_NODE_LOGIC_PLAN_EXCHANGE: return makeNode(type, sizeof(SExchangeLogicNode)); + case QUERY_NODE_LOGIC_PLAN_MERGE: + return makeNode(type, sizeof(SMergeLogicNode)); case QUERY_NODE_LOGIC_PLAN_WINDOW: return makeNode(type, sizeof(SWindowLogicNode)); case QUERY_NODE_LOGIC_PLAN_FILL: @@ -252,6 +254,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SAggPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: return makeNode(type, sizeof(SExchangePhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_MERGE: + return makeNode(type, sizeof(SMergePhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_SORT: return makeNode(type, sizeof(SSortPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 6a18a267e2..1a8c7657df 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -36,6 +36,7 @@ extern "C" { #define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__) int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...); +int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList); int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode); int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 467b26b7c4..52393c5707 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -132,56 +132,56 @@ static int32_t createChildLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelec return code; } -typedef struct SCreateColumnCxt { - int32_t errCode; - SNodeList* pList; -} SCreateColumnCxt; - -static EDealRes doCreateColumn(SNode* pNode, void* pContext) { - SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext; - switch (nodeType(pNode)) { - case QUERY_NODE_COLUMN: { - SNode* pCol = nodesCloneNode(pNode); - if (NULL == pCol) { - return DEAL_RES_ERROR; - } - return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); - } - case QUERY_NODE_OPERATOR: - case QUERY_NODE_LOGIC_CONDITION: - case QUERY_NODE_FUNCTION: { - SExprNode* pExpr = (SExprNode*)pNode; - SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); - if (NULL == pCol) { - return DEAL_RES_ERROR; - } - pCol->node.resType = pExpr->resType; - strcpy(pCol->colName, pExpr->aliasName); - return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); - } - default: - break; - } - - return DEAL_RES_CONTINUE; -} - -static int32_t createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pList) { - SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)}; - if (NULL == cxt.pList) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - nodesWalkExprs(pExprs, doCreateColumn, &cxt); - if (TSDB_CODE_SUCCESS != cxt.errCode) { - nodesDestroyList(cxt.pList); - return cxt.errCode; - } - if (NULL == *pList) { - *pList = cxt.pList; - } - return cxt.errCode; -} +// typedef struct SCreateColumnCxt { +// int32_t errCode; +// SNodeList* pList; +// } SCreateColumnCxt; + +// static EDealRes doCreateColumn(SNode* pNode, void* pContext) { +// SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext; +// switch (nodeType(pNode)) { +// case QUERY_NODE_COLUMN: { +// SNode* pCol = nodesCloneNode(pNode); +// if (NULL == pCol) { +// return DEAL_RES_ERROR; +// } +// return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); +// } +// case QUERY_NODE_OPERATOR: +// case QUERY_NODE_LOGIC_CONDITION: +// case QUERY_NODE_FUNCTION: { +// SExprNode* pExpr = (SExprNode*)pNode; +// SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); +// if (NULL == pCol) { +// return DEAL_RES_ERROR; +// } +// pCol->node.resType = pExpr->resType; +// strcpy(pCol->colName, pExpr->aliasName); +// return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); +// } +// default: +// break; +// } + +// return DEAL_RES_CONTINUE; +// } + +// static int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) { +// SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)}; +// if (NULL == cxt.pList) { +// return TSDB_CODE_OUT_OF_MEMORY; +// } + +// nodesWalkExprs(pExprs, doCreateColumn, &cxt); +// if (TSDB_CODE_SUCCESS != cxt.errCode) { +// nodesDestroyList(cxt.pList); +// return cxt.errCode; +// } +// if (NULL == *pList) { +// *pList = cxt.pList; +// } +// return cxt.errCode; +// } static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols, STableMeta* pMeta) { @@ -293,10 +293,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // set output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pCxt, pScan->pScanCols, &pScan->node.pTargets); + code = createColumnByRewriteExps(pScan->pScanCols, &pScan->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pCxt, pScan->pScanPseudoCols, &pScan->node.pTargets); + code = createColumnByRewriteExps(pScan->pScanPseudoCols, &pScan->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -461,10 +461,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, // set the output if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) { - code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets); + code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) { - code = createColumnByRewriteExps(pCxt, pAgg->pAggFuncs, &pAgg->node.pTargets); + code = createColumnByRewriteExps(pAgg->pAggFuncs, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -490,7 +490,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm } if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pCxt, pWindow->pFuncs, &pWindow->node.pTargets); + code = createColumnByRewriteExps(pWindow->pFuncs, &pWindow->node.pTargets); } pSelect->hasAggFuncs = false; @@ -760,7 +760,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe // set the output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets); + code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -907,7 +907,7 @@ static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pS // set the output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets); + code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a45eabefb9..52f289601c 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -835,7 +835,7 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode( - pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); + pCxt, pExchangeLogicNode->node.precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -845,10 +845,11 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic return TSDB_CODE_SUCCESS; } + static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) { SScanPhysiNode* pScan = (SScanPhysiNode*)makePhysiNode( - pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + pCxt, pExchangeLogicNode->node.precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -949,7 +950,8 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode( pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, - (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW : QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW)); + (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW + : QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW)); if (NULL == pSession) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -1132,6 +1134,54 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren return code; } +static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) { + SExchangePhysiNode* pExchange = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); + if (NULL == pExchange) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pExchange->srcGroupId = pMerge->srcGroupId; + pExchange->node.pParent = (SPhysiNode*)pMerge; + pExchange->node.pOutputDataBlockDesc = nodesCloneNode(pMerge->node.pOutputDataBlockDesc); + if (NULL == pExchange->node.pOutputDataBlockDesc) { + nodesDestroyNode(pExchange); + return TSDB_CODE_OUT_OF_MEMORY; + } + return nodesListMakeStrictAppend(&pMerge->node.pChildren, pExchange); +} + +static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) { + SMergePhysiNode* pMerge = (SMergePhysiNode*)makePhysiNode( + pCxt, pMergeLogicNode->node.precision, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE); + if (NULL == pMerge) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pMerge->numOfChannels = pMergeLogicNode->numOfChannels; + pMerge->srcGroupId = pMergeLogicNode->srcGroupId; + + int32_t code = TSDB_CODE_SUCCESS; + + for (int32_t i = 0; i < pMerge->numOfChannels; ++i) { + code = createExchangePhysiNodeByMerge(pMerge); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys, + &pMerge->pMergeKeys); + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pMerge; + } else { + nodesDestroyNode(pMerge); + } + + return code; +} + static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan, SNodeList* pChildren, SPhysiNode** pPhyNode) { switch (nodeType(pLogicNode)) { @@ -1153,6 +1203,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_FILL: return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode); + case QUERY_NODE_LOGIC_PLAN_MERGE: + return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode); default: break; } @@ -1183,9 +1235,13 @@ static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, } if (TSDB_CODE_SUCCESS == code) { - (*pPhyNode)->pChildren = pChildren; - SNode* pChild; - FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); } + if (LIST_LENGTH(pChildren) > 0) { + (*pPhyNode)->pChildren = pChildren; + SNode* pChild; + FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); } + } else { + nodesDestroyList(pChildren); + } } else { nodesDestroyList(pChildren); } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index cfa265b722..2ae749bef5 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "functionMgt.h" #include "planInt.h" #define SPLIT_FLAG_MASK(n) (1 << n) @@ -37,7 +38,17 @@ typedef struct SSplitRule { typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo); -static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) { +static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList); + } else { + if (1 == LIST_LENGTH(pNode->pChildren)) { + splSetSubplanVgroups(pSubplan, (SLogicNode*)nodesListGetNode(pNode->pChildren, 0)); + } + } +} + +static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) { SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); if (NULL == pSubplan) { return NULL; @@ -45,10 +56,9 @@ static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, i pSubplan->id.queryId = pCxt->queryId; pSubplan->id.groupId = pCxt->groupId; pSubplan->subplanType = SUBPLAN_TYPE_SCAN; - pSubplan->pNode = (SLogicNode*)nodesCloneNode(pNode); - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { - TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList); - } + pSubplan->pNode = pNode; + pSubplan->pNode->pParent = NULL; + splSetSubplanVgroups(pSubplan, pNode); SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag); return pSubplan; } @@ -60,7 +70,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla return TSDB_CODE_OUT_OF_MEMORY; } pExchange->srcGroupId = pCxt->groupId; - pExchange->precision = pSplitNode->precision; + pExchange->node.precision = pSplitNode->precision; pExchange->node.pTargets = nodesCloneList(pSplitNode->pTargets); if (NULL == pExchange->node.pTargets) { return TSDB_CODE_OUT_OF_MEMORY; @@ -77,7 +87,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla FOREACH(pNode, pSplitNode->pParent->pChildren) { if (nodesEqualNode(pNode, pSplitNode)) { REPLACE_NODE(pExchange); - nodesDestroyNode(pNode); + pExchange->node.pParent = pSplitNode->pParent; return TSDB_CODE_SUCCESS; } } @@ -101,13 +111,50 @@ static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, } typedef struct SStableSplitInfo { - SScanLogicNode* pScan; - SLogicSubplan* pSubplan; + SLogicNode* pSplitNode; + SLogicSubplan* pSubplan; } SStableSplitInfo; +static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) { + SNode* pFunc = NULL; + FOREACH(pFunc, pFuncs) { + if (!fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) { + return true; + } + } + return false; +} + +static bool stbSplIsMultiTbScan(SScanLogicNode* pScan) { + return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1); +} + +static bool stbSplHasMultiTbScan(SLogicNode* pNode) { + if (1 != LIST_LENGTH(pNode->pChildren)) { + return false; + } + SNode* pChild = nodesListGetNode(pNode->pChildren, 0); + return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan((SScanLogicNode*)pChild)); +} + +static bool stbSplNeedSplit(SLogicNode* pNode) { + switch (nodeType(pNode)) { + // case QUERY_NODE_LOGIC_PLAN_AGG: + // return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode); + case QUERY_NODE_LOGIC_PLAN_WINDOW: + return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode); + // case QUERY_NODE_LOGIC_PLAN_SORT: + // return stbSplHasMultiTbScan(pNode); + case QUERY_NODE_LOGIC_PLAN_SCAN: + return stbSplIsMultiTbScan((SScanLogicNode*)pNode); + default: + break; + } + return false; +} + static SLogicNode* stbSplMatchByNode(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != ((SScanLogicNode*)pNode)->pVgroupList && - ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) { + if (stbSplNeedSplit(pNode)) { return pNode; } SNode* pChild; @@ -123,22 +170,178 @@ static SLogicNode* stbSplMatchByNode(SLogicNode* pNode) { static bool stbSplFindSplitNode(SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) { SLogicNode* pSplitNode = stbSplMatchByNode(pSubplan->pNode); if (NULL != pSplitNode) { - pInfo->pScan = (SScanLogicNode*)pSplitNode; + pInfo->pSplitNode = pSplitNode; pInfo->pSubplan = pSubplan; } return NULL != pSplitNode; } +static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) { + SNode* pNode = NULL; + FOREACH(pNode, pFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + SFunctionNode* pPartFunc = NULL; + SFunctionNode* pMergeFunc = NULL; + int32_t code = TSDB_CODE_SUCCESS; + if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) { + pPartFunc = nodesCloneNode(pFunc); + pMergeFunc = nodesCloneNode(pFunc); + if (NULL == pPartFunc || NULL == pMergeFunc) { + nodesDestroyNode(pPartFunc); + nodesDestroyNode(pMergeFunc); + code = TSDB_CODE_OUT_OF_MEMORY; + } + } else { + code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(pPartialFuncs, pPartFunc); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(pMergeFuncs, pMergeFunc); + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(*pPartialFuncs); + nodesDestroyList(*pMergeFuncs); + return code; + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) { + int32_t index = 0; + SNode* pFunc = NULL; + FOREACH(pFunc, pFuncs) { + if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) { + *pIndex = index; + return TSDB_CODE_SUCCESS; + } + ++index; + } + + SFunctionNode* pWStart = nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pWStart) { + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy(pWStart->functionName, "_wstartts"); + snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%p", pWStart->functionName, pWStart); + int32_t code = fmGetFuncInfo(pWStart, NULL, 0); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListStrictAppend(pFuncs, pWStart); + } + *pIndex = index; + return code; +} + +static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) { + SNodeList* pFunc = pMergeWindow->pFuncs; + pMergeWindow->pFuncs = NULL; + SNodeList* pTargets = pMergeWindow->node.pTargets; + pMergeWindow->node.pTargets = NULL; + SNodeList* pChildren = pMergeWindow->node.pChildren; + pMergeWindow->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SWindowLogicNode* pPartWin = nodesCloneNode(pMergeWindow); + if (NULL == pPartWin) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + if (TSDB_CODE_SUCCESS == code) { + pMergeWindow->node.pTargets = pTargets; + pPartWin->node.pChildren = pChildren; + code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs); + } + int32_t index = 0; + if (TSDB_CODE_SUCCESS == code) { + code = stbSplAppendWStart(pPartWin->pFuncs, &index); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExps(pPartWin->pFuncs, &pPartWin->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + nodesDestroyNode(pMergeWindow->pTspk); + pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index)); + if (NULL == pMergeWindow->pTspk) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + nodesDestroyList(pFunc); + if (TSDB_CODE_SUCCESS == code) { + *pPartWindow = (SLogicNode*)pPartWin; + } else { + nodesDestroyNode(pPartWin); + } + + return code; +} + +static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { + SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); + if (NULL == pMerge) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pMerge->numOfChannels = ((SScanLogicNode*)nodesListGetNode(pPartChild->pChildren, 0))->pVgroupList->numOfVgroups; + pMerge->srcGroupId = pCxt->groupId; + pMerge->node.pParent = pParent; + pMerge->node.precision = pPartChild->precision; + int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk)); + if (TSDB_CODE_SUCCESS == code) { + pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); + if (NULL == pMerge->node.pTargets) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeAppend(&pParent->pChildren, pMerge); + } + + return code; +} + +static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartWindow = NULL; + int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + +static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + int32_t code = splCreateExchangeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); + } + return code; +} + static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { SStableSplitInfo info = {0}; if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info)) { return TSDB_CODE_SUCCESS; } - int32_t code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, - splCreateSubplan(pCxt, (SLogicNode*)info.pScan, SPLIT_FLAG_STABLE_SPLIT)); - if (TSDB_CODE_SUCCESS == code) { - code = splCreateExchangeNode(pCxt, info.pSubplan, (SLogicNode*)info.pScan, SUBPLAN_TYPE_MERGE); + + int32_t code = TSDB_CODE_SUCCESS; + switch (nodeType(info.pSplitNode)) { + case QUERY_NODE_LOGIC_PLAN_WINDOW: + code = stbSplSplitWindowNode(pCxt, &info); + break; + case QUERY_NODE_LOGIC_PLAN_SCAN: + code = stbSplSplitScanNode(pCxt, &info); + break; + default: + break; } + ++(pCxt->groupId); pCxt->split = true; return code; @@ -187,9 +390,9 @@ static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) { return TSDB_CODE_SUCCESS; } - int32_t code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, splCreateSubplan(pCxt, info.pSplitNode, 0)); + int32_t code = splCreateExchangeNode(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType); if (TSDB_CODE_SUCCESS == code) { - code = splCreateExchangeNode(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType); + code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pSplitNode, 0)); } ++(pCxt->groupId); pCxt->split = true; @@ -272,13 +475,13 @@ typedef struct SUnionAllSplitInfo { SLogicSubplan* pSubplan; } SUnionAllSplitInfo; -static SLogicNode* unionAllMatchByNode(SLogicNode* pNode) { +static SLogicNode* unAllSplMatchByNode(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { return pNode; } SNode* pChild; FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = unionAllMatchByNode((SLogicNode*)pChild); + SLogicNode* pSplitNode = unAllSplMatchByNode((SLogicNode*)pChild); if (NULL != pSplitNode) { return pSplitNode; } @@ -286,8 +489,8 @@ static SLogicNode* unionAllMatchByNode(SLogicNode* pNode) { return NULL; } -static bool unionAllFindSplitNode(SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) { - SLogicNode* pSplitNode = unionAllMatchByNode(pSubplan->pNode); +static bool unAllSplFindSplitNode(SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) { + SLogicNode* pSplitNode = unAllSplMatchByNode(pSubplan->pNode); if (NULL != pSplitNode) { pInfo->pProject = (SProjectLogicNode*)pSplitNode; pInfo->pSubplan = pSubplan; @@ -295,13 +498,13 @@ static bool unionAllFindSplitNode(SLogicSubplan* pSubplan, SUnionAllSplitInfo* p return NULL != pSplitNode; } -static int32_t unionAllCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) { +static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) { SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } pExchange->srcGroupId = pCxt->groupId; - pExchange->precision = pProject->node.precision; + pExchange->node.precision = pProject->node.precision; pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets); if (NULL == pExchange->node.pTargets) { return TSDB_CODE_OUT_OF_MEMORY; @@ -329,13 +532,13 @@ static int32_t unionAllCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pS static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { SUnionAllSplitInfo info = {0}; - if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unionAllFindSplitNode, &info)) { + if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info)) { return TSDB_CODE_SUCCESS; } int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject); if (TSDB_CODE_SUCCESS == code) { - code = unionAllCreateExchangeNode(pCxt, info.pSubplan, info.pProject); + code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject); } ++(pCxt->groupId); pCxt->split = true; @@ -347,13 +550,13 @@ typedef struct SUnionDistinctSplitInfo { SLogicSubplan* pSubplan; } SUnionDistinctSplitInfo; -static SLogicNode* unionDistinctMatchByNode(SLogicNode* pNode) { +static SLogicNode* unDistSplMatchByNode(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { return pNode; } SNode* pChild; FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = unionDistinctMatchByNode((SLogicNode*)pChild); + SLogicNode* pSplitNode = unDistSplMatchByNode((SLogicNode*)pChild); if (NULL != pSplitNode) { return pSplitNode; } @@ -361,13 +564,13 @@ static SLogicNode* unionDistinctMatchByNode(SLogicNode* pNode) { return NULL; } -static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) { +static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) { SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } pExchange->srcGroupId = pCxt->groupId; - // pExchange->precision = pScan->pMeta->tableInfo.precision; + pExchange->node.precision = pAgg->node.precision; pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys); if (NULL == pExchange->node.pTargets) { return TSDB_CODE_OUT_OF_MEMORY; @@ -378,8 +581,8 @@ static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan return nodesListMakeAppend(&pAgg->node.pChildren, pExchange); } -static bool unionDistinctFindSplitNode(SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) { - SLogicNode* pSplitNode = unionDistinctMatchByNode(pSubplan->pNode); +static bool unDistSplFindSplitNode(SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) { + SLogicNode* pSplitNode = unDistSplMatchByNode(pSubplan->pNode); if (NULL != pSplitNode) { pInfo->pAgg = (SAggLogicNode*)pSplitNode; pInfo->pSubplan = pSubplan; @@ -389,13 +592,13 @@ static bool unionDistinctFindSplitNode(SLogicSubplan* pSubplan, SUnionDistinctSp static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { SUnionDistinctSplitInfo info = {0}; - if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unionDistinctFindSplitNode, &info)) { + if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info)) { return TSDB_CODE_SUCCESS; } int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg); if (TSDB_CODE_SUCCESS == code) { - code = unCreateExchangeNode(pCxt, info.pSubplan, info.pAgg); + code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg); } ++(pCxt->groupId); pCxt->split = true; diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 3c83d9f53a..63d31912f0 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -34,3 +34,54 @@ int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...) { va_end(vArgList); return errCode; } + +typedef struct SCreateColumnCxt { + int32_t errCode; + SNodeList* pList; +} SCreateColumnCxt; + +static EDealRes doCreateColumn(SNode* pNode, void* pContext) { + SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext; + switch (nodeType(pNode)) { + case QUERY_NODE_COLUMN: { + SNode* pCol = nodesCloneNode(pNode); + if (NULL == pCol) { + return DEAL_RES_ERROR; + } + return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); + } + case QUERY_NODE_OPERATOR: + case QUERY_NODE_LOGIC_CONDITION: + case QUERY_NODE_FUNCTION: { + SExprNode* pExpr = (SExprNode*)pNode; + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return DEAL_RES_ERROR; + } + pCol->node.resType = pExpr->resType; + strcpy(pCol->colName, pExpr->aliasName); + return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); + } + default: + break; + } + + return DEAL_RES_CONTINUE; +} + +int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) { + SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)}; + if (NULL == cxt.pList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + nodesWalkExprs(pExprs, doCreateColumn, &cxt); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyList(cxt.pList); + return cxt.errCode; + } + if (NULL == *pList) { + *pList = cxt.pList; + } + return cxt.errCode; +} diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index af62c52a89..f8d240c7b2 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -58,16 +58,19 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode; if (pExchange->srcGroupId == groupId) { - if (NULL == pExchange->pSrcEndPoints) { - pExchange->pSrcEndPoints = nodesMakeList(); - if (NULL == pExchange->pSrcEndPoints) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } - if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pExchange->pSrcEndPoints, nodesCloneNode(pSource))) { - return TSDB_CODE_OUT_OF_MEMORY; + return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode(pSource)); + } + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) { + SMergePhysiNode* pMerge = (SMergePhysiNode*)pNode; + if (pMerge->srcGroupId == groupId) { + SExchangePhysiNode* pExchange = + (SExchangePhysiNode*)nodesListGetNode(pMerge->node.pChildren, pMerge->numOfChannels - 1); + if (1 == pMerge->numOfChannels) { + pMerge->numOfChannels = LIST_LENGTH(pMerge->node.pChildren); + } else { + --(pMerge->numOfChannels); } - return TSDB_CODE_SUCCESS; + return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode(pSource)); } } -- GitLab