提交 dde0a264 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/wal

......@@ -114,7 +114,6 @@ TDengine 客户端驱动的安装请参考 [安装指南](/reference/connector#
<summary>订阅和消费</summary>
```c
{{#include examples/c/subscribe.c}}
```
</details>
......
......@@ -114,7 +114,6 @@ This section shows sample code for standard access methods to TDengine clusters
<summary>Subscribe and consume</summary>
```c
{{#include examples/c/subscribe.c}}
```
</details>
......
......@@ -156,6 +156,9 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
bool fmIsMultiResFunc(int32_t funcId);
bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
typedef enum EFuncDataRequired {
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
......
......@@ -189,6 +189,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_MERGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_PLAN_FILL,
QUERY_NODE_LOGIC_PLAN_SORT,
......@@ -206,6 +207,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
......
......@@ -95,9 +95,15 @@ typedef struct SVnodeModifLogicNode {
typedef struct SExchangeLogicNode {
SLogicNode node;
int32_t srcGroupId;
uint8_t precision;
} SExchangeLogicNode;
typedef struct SMergeLogicNode {
SLogicNode node;
SNodeList* pMergeKeys;
int32_t numOfChannels;
int32_t srcGroupId;
} SMergeLogicNode;
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef struct SWindowLogicNode {
......@@ -268,6 +274,13 @@ typedef struct SExchangePhysiNode {
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
typedef struct SMergePhysiNode {
SPhysiNode node;
SNodeList* pMergeKeys;
int32_t numOfChannels;
int32_t srcGroupId;
} SMergePhysiNode;
typedef struct SWinodwPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
......
......@@ -26,22 +26,24 @@ typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t l
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
typedef struct SBuiltinFuncDefinition {
char name[FUNCTION_NAME_MAX_LENGTH];
EFunctionType type;
uint64_t classification;
FTranslateFunc translateFunc;
FFuncDataRequired dataRequiredFunc;
FExecGetEnv getEnvFunc;
FExecInit initFunc;
FExecProcess processFunc;
const char* name;
EFunctionType type;
uint64_t classification;
FTranslateFunc translateFunc;
FFuncDataRequired dataRequiredFunc;
FExecGetEnv getEnvFunc;
FExecInit initFunc;
FExecProcess processFunc;
FScalarExecProcess sprocessFunc;
FExecFinalize finalizeFunc;
FExecProcess invertFunc;
FExecCombine combineFunc;
FExecFinalize finalizeFunc;
FExecProcess invertFunc;
FExecCombine combineFunc;
const char* pPartialFunc;
const char* pMergeFunc;
} SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
extern const int funcMgtBuiltinsNum;
extern const int funcMgtBuiltinsNum;
#ifdef __cplusplus
}
......
......@@ -178,14 +178,14 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
//param0
// param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of PERCENTILE function can only be column");
}
//param1
// param1
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
......@@ -200,7 +200,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
//set result type
// set result type
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
return TSDB_CODE_SUCCESS;
}
......@@ -219,14 +219,14 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
//param0
// param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of APERCENTILE function can only be column");
}
//param1
// param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -245,7 +245,7 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
//param2
// param2
if (3 == numOfParams) {
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type;
if (!IS_VAR_DATA_TYPE(para3Type)) {
......@@ -285,14 +285,14 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
//param0
// param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of TOP/BOTTOM function can only be column");
}
//param1
// param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -309,7 +309,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pValue->notReserved = true;
//set result type
// set result type
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
......@@ -709,8 +709,8 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (pValue->datum.i < ((i > 1) ? 0 : 1) || pValue->datum.i > 100) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TAIL function second parameter should be in range [1, 100], "
"third parameter should be in range [0, 100]");
"TAIL function second parameter should be in range [1, 100], "
"third parameter should be in range [0, 100]");
}
pValue->notReserved = true;
......@@ -771,7 +771,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
//param0
// param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
......@@ -779,12 +779,11 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) &&
TSDB_DATA_TYPE_BOOL != colType) {
if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
//param1
// param1
if (numOfParams == 2) {
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) {
......@@ -911,7 +910,7 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
if (3 == numOfParams) {
SExprNode* p2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2);
uint8_t para2Type = p2->resType.type;
uint8_t para2Type = p2->resType.type;
if (!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
......@@ -1144,6 +1143,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = functionFinalize,
.invertFunc = countInvertFunction,
.combineFunc = combineFunction,
// .pPartialFunc = "count",
// .pMergeFunc = "sum"
},
{
.name = "sum",
......
......@@ -199,3 +199,81 @@ bool fmIsInvertible(int32_t funcId) {
}
return res;
}
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
return NULL;
}
strcpy(pFunc->functionName, pName);
pFunc->pParameterList = pParameterList;
char msg[64] = {0};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc, msg, sizeof(msg))) {
nodesDestroyNode(pFunc);
return NULL;
}
return pFunc;
}
static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
strcpy(pCol->colName, pFunc->node.aliasName);
pCol->node.resType = pFunc->node.resType;
return pCol;
}
bool fmIsDistExecFunc(int32_t funcId) {
if (!fmIsVectorFunc(funcId)) {
return true;
}
return (NULL != funcMgtBuiltins[funcId].pPartialFunc && NULL != funcMgtBuiltins[funcId].pMergeFunc);
}
static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNode** pPartialFunc) {
SNodeList* pParameterList = nodesCloneList(pSrcFunc->pParameterList);
if (NULL == pParameterList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*pPartialFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pParameterList);
if (NULL == *pPartialFunc) {
nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
snprintf((*pPartialFunc)->node.aliasName, sizeof((*pPartialFunc)->node.aliasName), "%s.%p",
(*pPartialFunc)->functionName, pSrcFunc);
return TSDB_CODE_SUCCESS;
}
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
SFunctionNode** pMergeFunc) {
SNodeList* pParameterList = NULL;
nodesListMakeStrictAppend(&pParameterList, createColumnByFunc(pPartialFunc));
*pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
if (NULL == *pMergeFunc) {
nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
return TSDB_CODE_SUCCESS;
}
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) {
if (!fmIsDistExecFunc(pFunc->funcId)) {
return TSDB_CODE_FAILED;
}
int32_t code = createPartialFunction(pFunc, pPartialFunc);
if (TSDB_CODE_SUCCESS == code) {
code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(*pPartialFunc);
nodesDestroyNode(*pMergeFunc);
}
return code;
}
......@@ -373,7 +373,14 @@ static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifL
static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(precision);
return (SNode*)pDst;
}
static SNode* logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pMergeKeys);
COPY_SCALAR_FIELD(numOfChannels);
COPY_SCALAR_FIELD(srcGroupId);
return (SNode*)pDst;
}
......@@ -537,6 +544,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return logicMergeCopy((const SMergeLogicNode*)pNode, (SMergeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_FILL:
......
......@@ -190,6 +190,8 @@ const char* nodesNodeName(ENodeType type) {
return "LogicVnodeModif";
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return "LogicExchange";
case QUERY_NODE_LOGIC_PLAN_MERGE:
return "LogicMerge";
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return "LogicWindow";
case QUERY_NODE_LOGIC_PLAN_FILL:
......@@ -220,6 +222,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiAgg";
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return "PhysiExchange";
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return "PhysiMerge";
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
......@@ -596,7 +600,6 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
}
static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId";
static const char* jkExchangeLogicPlanSrcPrecision = "Precision";
static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj;
......@@ -605,9 +608,6 @@ static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcPrecision, pNode->precision);
}
return code;
}
......@@ -619,8 +619,144 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId);
}
return code;
}
static const char* jkMergeLogicPlanMergeKeys = "MergeKeys";
static const char* jkMergeLogicPlanNumOfChannels = "NumOfChannels";
static const char* jkMergeLogicPlanSrcGroupId = "SrcGroupId";
static int32_t logicMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergeLogicNode* pNode = (const SMergeLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergeLogicPlanMergeKeys, pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkExchangeLogicPlanSrcPrecision, &pNode->precision);
code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanNumOfChannels, pNode->numOfChannels);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanSrcGroupId, pNode->srcGroupId);
}
return code;
}
static int32_t jsonToLogicMergeNode(const SJson* pJson, void* pObj) {
SMergeLogicNode* pNode = (SMergeLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergeLogicPlanMergeKeys, &pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergeLogicPlanNumOfChannels, &pNode->numOfChannels);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergeLogicPlanSrcGroupId, &pNode->srcGroupId);
}
return code;
}
static const char* jkWindowLogicPlanWinType = "WinType";
static const char* jkWindowLogicPlanFuncs = "Funcs";
static const char* jkWindowLogicPlanInterval = "Interval";
static const char* jkWindowLogicPlanOffset = "Offset";
static const char* jkWindowLogicPlanSliding = "Sliding";
static const char* jkWindowLogicPlanIntervalUnit = "IntervalUnit";
static const char* jkWindowLogicPlanSlidingUnit = "SlidingUnit";
static const char* jkWindowLogicPlanSessionGap = "SessionGap";
static const char* jkWindowLogicPlanTspk = "Tspk";
static const char* jkWindowLogicPlanStateExpr = "StateExpr";
static const char* jkWindowLogicPlanTriggerType = "TriggerType";
static const char* jkWindowLogicPlanWatermark = "Watermark";
static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWindowLogicNode* pNode = (const SWindowLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWinType, pNode->winType);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkWindowLogicPlanFuncs, pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanInterval, pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanOffset, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSliding, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanIntervalUnit, pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSlidingUnit, pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSessionGap, pNode->sessionGap);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowLogicPlanTspk, nodeToJson, pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowLogicPlanStateExpr, nodeToJson, pNode->pStateExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanTriggerType, pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWatermark, pNode->watermark);
}
return code;
}
static int32_t jsonToLogicWindowNode(const SJson* pJson, void* pObj) {
SWindowLogicNode* pNode = (SWindowLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowLogicPlanWinType, pNode->winType, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkWindowLogicPlanFuncs, &pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanInterval, &pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanOffset, &pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanSliding, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanIntervalUnit, &pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanSlidingUnit, &pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanSessionGap, &pNode->sessionGap);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowLogicPlanTspk, &pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowLogicPlanStateExpr, &pNode->pStateExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanTriggerType, &pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanWatermark, &pNode->watermark);
}
return code;
......@@ -1459,6 +1595,44 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkMergePhysiPlanMergeKeys = "MergeKeys";
static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergePhysiPlanMergeKeys, pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanNumOfChannels, pNode->numOfChannels);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId);
}
return code;
}
static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
SMergePhysiNode* pNode = (SMergePhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergePhysiPlanMergeKeys, &pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergePhysiPlanNumOfChannels, &pNode->numOfChannels);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId);
}
return code;
}
static const char* jkSortPhysiPlanExprs = "Exprs";
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
static const char* jkSortPhysiPlanTargets = "Targets";
......@@ -3401,6 +3575,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
break;
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return logicMergeNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_FILL:
return logicFillNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_SORT:
......@@ -3427,6 +3605,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiAggNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return physiExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return physiMergeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return physiSortNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
......@@ -3512,6 +3692,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return jsonToLogicExchangeNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return jsonToLogicMergeNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return jsonToLogicWindowNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_FILL:
return jsonToLogicFillNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SORT:
......@@ -3538,6 +3722,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return jsonToPhysiExchangeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return jsonToPhysiMergeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return jsonToPhysiSortNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
......
......@@ -220,6 +220,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SVnodeModifLogicNode));
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangeLogicNode));
case QUERY_NODE_LOGIC_PLAN_MERGE:
return makeNode(type, sizeof(SMergeLogicNode));
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_PLAN_FILL:
......@@ -250,6 +252,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SAggPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return makeNode(type, sizeof(SMergePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SSortPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
......
......@@ -188,8 +188,8 @@ int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* ve
}
int32_t __catalogGetDBVgInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName,
SArray** vgroupList) {
return 0;
SArray** pVgList) {
return g_mockCatalogService->catalogGetDBVgInfo(dbFName, pVgList);
}
int32_t __catalogGetDBCfg(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
......
......@@ -18,6 +18,7 @@
#include <iomanip>
#include <iostream>
#include <map>
#include <set>
#include "tdatablock.h"
#include "tname.h"
......@@ -120,6 +121,25 @@ class MockCatalogServiceImpl {
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
}
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const {
std::string dbFName(pDbFName);
DbMetaCache::const_iterator it = meta_.find(dbFName.substr(std::string(pDbFName).find_last_of('.') + 1));
if (meta_.end() == it) {
return TSDB_CODE_FAILED;
}
std::set<int32_t> vgSet;
*pVgList = taosArrayInit(it->second.size(), sizeof(SVgroupInfo));
for (const auto& vgs : it->second) {
for (const auto& vg : vgs.second->vgs) {
if (0 == vgSet.count(vg.vgId)) {
taosArrayPush(*pVgList, &vg);
vgSet.insert(vg.vgId);
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
auto it = udf_.find(funcName);
if (udf_.end() == it) {
......@@ -187,8 +207,9 @@ class MockCatalogServiceImpl {
// number of backward fills
#define NOB(n) ((n) % 2 ? (n) / 2 + 1 : (n) / 2)
// center aligned
#define CA(n, s) std::setw(NOF((n) - int((s).length()))) << "" << (s) \
<< std::setw(NOB((n) - int((s).length()))) << "" << "|"
#define CA(n, s) \
std::setw(NOF((n) - int((s).length()))) << "" << (s) << std::setw(NOB((n) - int((s).length()))) << "" \
<< "|"
// string field length
#define SFL 20
// string field header
......@@ -490,6 +511,10 @@ int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, S
return impl_->catalogGetTableDistVgInfo(pTableName, pVgList);
}
int32_t MockCatalogService::catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const {
return impl_->catalogGetDBVgInfo(pDbFName, pVgList);
}
int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
return impl_->catalogGetUdfInfo(funcName, pInfo);
}
......
......@@ -61,6 +61,7 @@ class MockCatalogService {
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
......
......@@ -36,6 +36,7 @@ extern "C" {
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList);
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode);
......
......@@ -133,56 +133,56 @@ static int32_t createChildLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelec
return code;
}
typedef struct SCreateColumnCxt {
int32_t errCode;
SNodeList* pList;
} SCreateColumnCxt;
static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN: {
SNode* pCol = nodesCloneNode(pNode);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName);
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
default:
break;
}
return DEAL_RES_CONTINUE;
}
static int32_t createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pList) {
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
if (NULL == cxt.pList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
nodesWalkExprs(pExprs, doCreateColumn, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(cxt.pList);
return cxt.errCode;
}
if (NULL == *pList) {
*pList = cxt.pList;
}
return cxt.errCode;
}
// typedef struct SCreateColumnCxt {
// int32_t errCode;
// SNodeList* pList;
// } SCreateColumnCxt;
// static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
// SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
// switch (nodeType(pNode)) {
// case QUERY_NODE_COLUMN: {
// SNode* pCol = nodesCloneNode(pNode);
// if (NULL == pCol) {
// return DEAL_RES_ERROR;
// }
// return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
// }
// case QUERY_NODE_OPERATOR:
// case QUERY_NODE_LOGIC_CONDITION:
// case QUERY_NODE_FUNCTION: {
// SExprNode* pExpr = (SExprNode*)pNode;
// SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
// if (NULL == pCol) {
// return DEAL_RES_ERROR;
// }
// pCol->node.resType = pExpr->resType;
// strcpy(pCol->colName, pExpr->aliasName);
// return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
// }
// default:
// break;
// }
// return DEAL_RES_CONTINUE;
// }
// static int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) {
// SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
// if (NULL == cxt.pList) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// nodesWalkExprs(pExprs, doCreateColumn, &cxt);
// if (TSDB_CODE_SUCCESS != cxt.errCode) {
// nodesDestroyList(cxt.pList);
// return cxt.errCode;
// }
// if (NULL == *pList) {
// *pList = cxt.pList;
// }
// return cxt.errCode;
// }
static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
STableMeta* pMeta) {
......@@ -294,10 +294,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// set output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pScan->pScanCols, &pScan->node.pTargets);
code = createColumnByRewriteExps(pScan->pScanCols, &pScan->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pScan->pScanPseudoCols, &pScan->node.pTargets);
code = createColumnByRewriteExps(pScan->pScanPseudoCols, &pScan->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -463,10 +463,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
// set the output
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
code = createColumnByRewriteExps(pCxt, pAgg->pAggFuncs, &pAgg->node.pTargets);
code = createColumnByRewriteExps(pAgg->pAggFuncs, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -496,7 +496,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pWindow->pFuncs, &pWindow->node.pTargets);
code = createColumnByRewriteExps(pWindow->pFuncs, &pWindow->node.pTargets);
}
pSelect->hasAggFuncs = false;
......@@ -766,7 +766,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -913,7 +913,7 @@ static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pS
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -839,7 +839,7 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
SPhysiNode** pPhyNode) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(
pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
pCxt, pExchangeLogicNode->node.precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -849,10 +849,11 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
return TSDB_CODE_SUCCESS;
}
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
SPhysiNode** pPhyNode) {
SScanPhysiNode* pScan = (SScanPhysiNode*)makePhysiNode(
pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
pCxt, pExchangeLogicNode->node.precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -954,7 +955,8 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW : QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW));
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
: QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW));
if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -1137,6 +1139,54 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
return code;
}
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
SExchangePhysiNode* pExchange = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pMerge->srcGroupId;
pExchange->node.pParent = (SPhysiNode*)pMerge;
pExchange->node.pOutputDataBlockDesc = nodesCloneNode(pMerge->node.pOutputDataBlockDesc);
if (NULL == pExchange->node.pOutputDataBlockDesc) {
nodesDestroyNode(pExchange);
return TSDB_CODE_OUT_OF_MEMORY;
}
return nodesListMakeStrictAppend(&pMerge->node.pChildren, pExchange);
}
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
SMergePhysiNode* pMerge = (SMergePhysiNode*)makePhysiNode(
pCxt, pMergeLogicNode->node.precision, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
code = createExchangePhysiNodeByMerge(pMerge);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
&pMerge->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pMerge;
} else {
nodesDestroyNode(pMerge);
}
return code;
}
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
SNodeList* pChildren, SPhysiNode** pPhyNode) {
switch (nodeType(pLogicNode)) {
......@@ -1158,6 +1208,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_FILL:
return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode);
default:
break;
}
......@@ -1188,9 +1240,13 @@ static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode,
}
if (TSDB_CODE_SUCCESS == code) {
(*pPhyNode)->pChildren = pChildren;
SNode* pChild;
FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
if (LIST_LENGTH(pChildren) > 0) {
(*pPhyNode)->pChildren = pChildren;
SNode* pChild;
FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
} else {
nodesDestroyList(pChildren);
}
} else {
nodesDestroyList(pChildren);
}
......
......@@ -34,3 +34,54 @@ int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...) {
va_end(vArgList);
return errCode;
}
typedef struct SCreateColumnCxt {
int32_t errCode;
SNodeList* pList;
} SCreateColumnCxt;
static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN: {
SNode* pCol = nodesCloneNode(pNode);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName);
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
default:
break;
}
return DEAL_RES_CONTINUE;
}
int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) {
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
if (NULL == cxt.pList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
nodesWalkExprs(pExprs, doCreateColumn, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(cxt.pList);
return cxt.errCode;
}
if (NULL == *pList) {
*pList = cxt.pList;
}
return cxt.errCode;
}
......@@ -58,16 +58,19 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
if (pExchange->srcGroupId == groupId) {
if (NULL == pExchange->pSrcEndPoints) {
pExchange->pSrcEndPoints = nodesMakeList();
if (NULL == pExchange->pSrcEndPoints) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pExchange->pSrcEndPoints, nodesCloneNode(pSource))) {
return TSDB_CODE_OUT_OF_MEMORY;
return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode(pSource));
}
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
SMergePhysiNode* pMerge = (SMergePhysiNode*)pNode;
if (pMerge->srcGroupId == groupId) {
SExchangePhysiNode* pExchange =
(SExchangePhysiNode*)nodesListGetNode(pMerge->node.pChildren, pMerge->numOfChannels - 1);
if (1 == pMerge->numOfChannels) {
pMerge->numOfChannels = LIST_LENGTH(pMerge->node.pChildren);
} else {
--(pMerge->numOfChannels);
}
return TSDB_CODE_SUCCESS;
return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode(pSource));
}
}
......
......@@ -50,4 +50,10 @@ TEST_F(PlanIntervalTest, selectFunc) {
run("SELECT MAX(c1), MIN(c1) FROM t1 INTERVAL(10s)");
// select function along with the columns of select row, and with INTERVAL clause
run("SELECT MAX(c1), c2 FROM t1 INTERVAL(10s)");
}
\ No newline at end of file
}
TEST_F(PlanIntervalTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1 INTERVAL(10s)");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册