diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3ef832a69b86d8f84bd1d64ff5b7231581f8036c..f09ffa8926da51396e1f32161e0b17c7bf8aaed9 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -61,22 +61,27 @@ typedef enum ENodeType { QUERY_NODE_INTERVAL_WINDOW, QUERY_NODE_NODE_LIST, QUERY_NODE_FILL, + QUERY_NODE_RAW_EXPR, // Only be used in parser module. QUERY_NODE_COLUMN_REF, QUERY_NODE_TARGET, - - // Only be used in parser module. - QUERY_NODE_RAW_EXPR, + QUERY_NODE_TUPLE_DESC, + QUERY_NODE_SLOT_DESC, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR, QUERY_NODE_SELECT_STMT, QUERY_NODE_SHOW_STMT, + // logic plan node QUERY_NODE_LOGIC_PLAN_SCAN, QUERY_NODE_LOGIC_PLAN_JOIN, - QUERY_NODE_LOGIC_PLAN_FILTER, QUERY_NODE_LOGIC_PLAN_AGG, - QUERY_NODE_LOGIC_PLAN_PROJECT + QUERY_NODE_LOGIC_PLAN_PROJECT, + + // physical plan node + QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, + QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, + QUERY_NODE_PHYSICAL_PLAN_PROJECT } ENodeType; /** diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3668f256eb068c224c2f375a0b2da6a081637549..40684f2b2622afff6d8cbf781222e15c92d84d65 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "querynodes.h" +#include "tmsg.h" typedef struct SLogicNode { ENodeType type; @@ -31,10 +32,20 @@ typedef struct SLogicNode { struct SLogicNode* pParent; } SLogicNode; +typedef enum EScanType { + SCAN_TYPE_TAG, + SCAN_TYPE_TABLE, + SCAN_TYPE_STABLE, + SCAN_TYPE_STREAM +} EScanType; + typedef struct SScanLogicNode { SLogicNode node; SNodeList* pScanCols; struct STableMeta* pMeta; + EScanType scanType; + uint8_t scanFlag; // denotes reversed scan of data or not + STimeWindow scanRange; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -43,10 +54,6 @@ typedef struct SJoinLogicNode { SNode* pOnConditions; } SJoinLogicNode; -typedef struct SFilterLogicNode { - SLogicNode node; -} SFilterLogicNode; - typedef struct SAggLogicNode { SLogicNode node; SNodeList* pGroupKeys; @@ -58,6 +65,56 @@ typedef struct SProjectLogicNode { SNodeList* pProjections; } SProjectLogicNode; +typedef struct SSlotDescNode { + ENodeType type; + int16_t slotId; + SDataType dataType; + int16_t srcTupleId; + int16_t srcSlotId; + bool reserve; + bool output; +} SSlotDescNode; + +typedef struct STupleDescNode { + ENodeType type; + int16_t tupleId; + SNodeList* pSlots; +} STupleDescNode; + +typedef struct SPhysiNode { + ENodeType type; + STupleDescNode outputTuple; + SNode* pConditions; + SNodeList* pChildren; + struct SPhysiNode* pParent; +} SPhysiNode; + +typedef struct SScanPhysiNode { + SPhysiNode node; + SNodeList* pScanCols; + uint64_t uid; // unique id of the table + int8_t tableType; + int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC + int32_t count; // repeat count + int32_t reverse; // reverse scan count +} SScanPhysiNode; + +typedef SScanPhysiNode SSystemTableScanPhysiNode; +typedef SScanPhysiNode STagScanPhysiNode; + +typedef struct STableScanPhysiNode { + SScanPhysiNode scan; + uint8_t scanFlag; // denotes reversed scan of data or not + STimeWindow scanRange; +} STableScanPhysiNode; + +typedef STableScanPhysiNode STableSeqScanPhysiNode; + +typedef struct SProjectPhysiNode { + SPhysiNode node; + SNodeList* pProjections; +} SProjectPhysiNode; + #ifdef __cplusplus } #endif diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 7c755627593724d615a3040a53cad93aa92670ed..4de4095752a8dcd4ccdb3bc309ca4ef28f6a49cd 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -68,6 +68,13 @@ typedef struct SColumnRefNode { int16_t columnId; } SColumnRefNode; +typedef struct STargetNode { + ENodeType type; + int16_t tupleId; + int16_t slotId; + SNode* pExpr; +} STargetNode; + typedef struct SValueNode { SExprNode node; // QUERY_NODE_VALUE char* literal; @@ -141,6 +148,7 @@ typedef struct SLogicConditionNode { typedef struct SNodeListNode { ENodeType type; // QUERY_NODE_NODE_LIST + SDataType dataType; SNodeList* pNodeList; } SNodeListNode; @@ -306,7 +314,8 @@ bool nodesIsJsonOp(const SOperatorNode* pOp); bool nodesIsTimeorderQuery(const SNode* pQuery); bool nodesIsTimelineQuery(const SNode* pQuery); -void *nodesGetValueFromNode(SValueNode *pNode); + +void* nodesGetValueFromNode(SValueNode *pNode); #ifdef __cplusplus } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 78606cd7a2c5b5bcfb36d392400c3dc6d8fa3c5f..18a316320e839e2e0a74e5e7dba15a449677c82a 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -142,6 +142,21 @@ static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) { return (SNode*)pDst; } +static SNode* columnRefNodeCopy(const SColumnRefNode* pSrc, SColumnRefNode* pDst) { + dataTypeCopy(&pSrc->dataType, &pDst->dataType); + COPY_SCALAR_FIELD(tupleId); + COPY_SCALAR_FIELD(slotId); + COPY_SCALAR_FIELD(columnId); + return (SNode*)pDst; +} + +static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) { + COPY_SCALAR_FIELD(tupleId); + COPY_SCALAR_FIELD(slotId); + COPY_NODE_FIELD(pExpr); + return (SNode*)pDst; +} + static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode* pDst) { COPY_SCALAR_FIELD(groupingSetType); COPY_NODE_LIST_FIELD(pParameterList); @@ -168,6 +183,10 @@ SNode* nodesCloneNode(const SNode* pNode) { return logicConditionNodeCopy((const SLogicConditionNode*)pNode, (SLogicConditionNode*)pDst); case QUERY_NODE_FUNCTION: return functionNodeCopy((const SFunctionNode*)pNode, (SFunctionNode*)pDst); + case QUERY_NODE_COLUMN_REF: + return columnRefNodeCopy((const SColumnRefNode*)pNode, (SColumnRefNode*)pDst); + case QUERY_NODE_TARGET: + return targetNodeCopy((const STargetNode*)pNode, (STargetNode*)pDst); case QUERY_NODE_REAL_TABLE: case QUERY_NODE_TEMP_TABLE: case QUERY_NODE_JOIN_TABLE: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 8968f0579dcc580ec805b6cd7977747fd524f963..356deb2f513cad1962bbecb8158e8bec8f838257 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -61,6 +61,10 @@ static char* nodeName(ENodeType type) { return "Target"; case QUERY_NODE_RAW_EXPR: return "RawExpr"; + case QUERY_NODE_TUPLE_DESC: + return "TupleDesc"; + case QUERY_NODE_SLOT_DESC: + return "SlotDesc"; case QUERY_NODE_SET_OPERATOR: return "SetOperator"; case QUERY_NODE_SELECT_STMT: @@ -71,16 +75,22 @@ static char* nodeName(ENodeType type) { return "LogicScan"; case QUERY_NODE_LOGIC_PLAN_JOIN: return "LogicJoin"; - case QUERY_NODE_LOGIC_PLAN_FILTER: - return "LogicFilter"; case QUERY_NODE_LOGIC_PLAN_AGG: return "LogicAgg"; case QUERY_NODE_LOGIC_PLAN_PROJECT: return "LogicProject"; + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + return "PhysiTagScan"; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + return "PhysiTableScan"; + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + return "PhysiProject"; default: break; } - return "Unknown"; + static char tmp[20]; + snprintf(tmp, sizeof(tmp), "Unknown %d", type); + return tmp; } static int32_t addNodeList(SJson* pJson, const char* pName, FToJson func, const SNodeList* pList) { @@ -183,8 +193,93 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { return code; } -static int32_t logicFilterNodeToJson(const void* pObj, SJson* pJson) { - return logicPlanNodeToJson(pObj, pJson); +static const char* jkPhysiPlanOutputTuple = "OutputTuple"; +static const char* jkPhysiPlanConditions = "Conditions"; +static const char* jkPhysiPlanChildren = "Children"; + +static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) { + const SPhysiNode* pNode = (const SPhysiNode*)pObj; + + int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputTuple, nodeToJson, &pNode->outputTuple); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkPhysiPlanConditions, nodeToJson, pNode->pConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = addNodeList(pJson, jkPhysiPlanChildren, nodeToJson, pNode->pChildren); + } + + return code; +} + +static const char* jkScanPhysiPlanScanCols = "ScanCols"; +static const char* jkScanPhysiPlanTableId = "TableId"; +static const char* jkScanPhysiPlanTableType = "TableType"; +static const char* jkScanPhysiPlanScanOrder = "ScanOrder"; +static const char* jkScanPhysiPlanScanCount = "ScanCount"; +static const char* jkScanPhysiPlanReverseScanCount = "ReverseScanCount"; + +static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { + const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = addNodeList(pJson, jkScanPhysiPlanScanCols, nodeToJson, pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanOrder, pNode->order); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanCount, pNode->count); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanReverseScanCount, pNode->reverse); + } + + return code; +} + +static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) { + return physiScanNodeToJson(pObj, pJson); +} + +static const char* jkTableScanPhysiPlanScanFlag = "ScanFlag"; +static const char* jkTableScanPhysiPlanStartKey = "StartKey"; +static const char* jkTableScanPhysiPlanEndKey = "EndKey"; + +static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { + const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; + + int32_t code = physiScanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanFlag, pNode->scanFlag); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanStartKey, pNode->scanRange.skey); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanEndKey, pNode->scanRange.ekey); + } + + return code; +} + +static const char* jkProjectPhysiPlanProjections = "Projections"; + +static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { + const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = addNodeList(pJson, jkProjectPhysiPlanProjections, nodeToJson, pNode->pProjections); + } + + return code; } static const char* jkAggLogicPlanGroupKeys = "GroupKeys"; @@ -277,19 +372,6 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) { return code; } -// typedef struct SValueNode { -// SExprNode node; // QUERY_NODE_VALUE -// char* ; -// bool ; -// union { -// bool b; -// int64_t i; -// uint64_t u; -// double d; -// char* p; -// } datum; -// } SValueNode; - static const char* jkValueLiteral = "Literal"; static const char* jkValueDuration = "Duration"; static const char* jkValueDatum = "Datum"; @@ -421,6 +503,74 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) { return code; } +static const char* jkColumnRefDataType = "DataType"; +static const char* jkColumnRefTupleId = "TupleId"; +static const char* jkColumnRefSlotId = "SlotId"; +static const char* jkColumnRefColumnId = "ColumnId"; + +static int32_t columnRefNodeToJson(const void* pObj, SJson* pJson) { + const SColumnRefNode* pNode = (const SColumnRefNode*)pObj; + + int32_t code = tjsonAddObject(pJson, jkColumnRefDataType, dataTypeToJson, &pNode->dataType); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkColumnRefTupleId, pNode->tupleId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkColumnRefSlotId, pNode->slotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkColumnRefColumnId, pNode->columnId); + } + + return code; +} + +static const char* jkTargetTupleId = "TupleId"; +static const char* jkTargetSlotId = "SlotId"; +static const char* jkTargetExpr = "Expr"; + +static int32_t targetNodeToJson(const void* pObj, SJson* pJson) { + const STargetNode* pNode = (const STargetNode*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkTargetTupleId, pNode->tupleId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTargetSlotId, pNode->slotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkTargetExpr, nodeToJson, pNode->pExpr); + } + + return code; +} + +static const char* jkSlotDescSlotId = "SlotId"; +static const char* jkSlotDescDataType = "DataType"; + +static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) { + const SSlotDescNode* pNode = (const SSlotDescNode*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkSlotDescSlotId, pNode->slotId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSlotDescDataType, dataTypeToJson, &pNode->dataType); + } + + return code; +} + +static const char* jkTupleDescTupleId = "TupleId"; +static const char* jkTupleDescSlots = "Slots"; + +static int32_t tupleDescNodeToJson(const void* pObj, SJson* pJson) { + const STupleDescNode* pNode = (const STupleDescNode*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkTupleDescTupleId, pNode->tupleId); + if (TSDB_CODE_SUCCESS == code) { + code = addNodeList(pJson, jkTupleDescSlots, nodeToJson, pNode->pSlots); + } + + return code; +} + static const char* jkSelectStmtDistinct = "Distinct"; static const char* jkSelectStmtProjections = "Projections"; static const char* jkSelectStmtFrom = "From"; @@ -497,8 +647,15 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_NODE_LIST: case QUERY_NODE_FILL: case QUERY_NODE_COLUMN_REF: + return columnRefNodeToJson(pObj, pJson); case QUERY_NODE_TARGET: + return targetNodeToJson(pObj, pJson); case QUERY_NODE_RAW_EXPR: + break; + case QUERY_NODE_TUPLE_DESC: + return tupleDescNodeToJson(pObj, pJson); + case QUERY_NODE_SLOT_DESC: + return slotDescNodeToJson(pObj, pJson); case QUERY_NODE_SET_OPERATOR: break; case QUERY_NODE_SELECT_STMT: @@ -509,12 +666,16 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return logicScanNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_JOIN: return logicJoinNodeToJson(pObj, pJson); - case QUERY_NODE_LOGIC_PLAN_FILTER: - return logicFilterNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_AGG: return logicAggNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_PROJECT: return logicProjectNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + return physiTagScanNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + return physiTableScanNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + return physiProjectNodeToJson(pObj, pJson); default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index f2205f511ed17d40a73ca2237820a5e5f0f81d65..35981322170da249df643df36c984d3da1e52946 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -75,12 +75,24 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SScanLogicNode)); case QUERY_NODE_LOGIC_PLAN_JOIN: return makeNode(type, sizeof(SJoinLogicNode)); - case QUERY_NODE_LOGIC_PLAN_FILTER: - return makeNode(type, sizeof(SFilterLogicNode)); case QUERY_NODE_LOGIC_PLAN_AGG: return makeNode(type, sizeof(SAggLogicNode)); case QUERY_NODE_LOGIC_PLAN_PROJECT: return makeNode(type, sizeof(SProjectLogicNode)); + case QUERY_NODE_COLUMN_REF: + return makeNode(type, sizeof(SColumnRefNode)); + case QUERY_NODE_TARGET: + return makeNode(type, sizeof(STargetNode)); + case QUERY_NODE_TUPLE_DESC: + return makeNode(type, sizeof(STupleDescNode)); + case QUERY_NODE_SLOT_DESC: + return makeNode(type, sizeof(SSlotDescNode)); + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + return makeNode(type, sizeof(STagScanPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + return makeNode(type, sizeof(STableScanPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + return makeNode(type, sizeof(SProjectPhysiNode)); default: break; } @@ -184,29 +196,29 @@ void nodesDestroyList(SNodeList* pList) { tfree(pList); } -void *nodesGetValueFromNode(SValueNode *pNode) { +void* nodesGetValueFromNode(SValueNode *pNode) { switch (pNode->node.resType.type) { case TSDB_DATA_TYPE_BOOL: - return (void *)&pNode->datum.b; + return (void*)&pNode->datum.b; case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_TIMESTAMP: - return (void *)&pNode->datum.i; + return (void*)&pNode->datum.i; case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UBIGINT: - return (void *)&pNode->datum.u; + return (void*)&pNode->datum.u; case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_DOUBLE: - return (void *)&pNode->datum.d; + return (void*)&pNode->datum.d; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: - return (void *)pNode->datum.p; + return (void*)pNode->datum.p; default: break; } diff --git a/source/libs/parser/src/parserImpl.c b/source/libs/parser/src/parserImpl.c index cf8d05975b5a06e84b257da9302350a75ab33ec0..ef040fdff47ff925acfbdc08071147d2eb56bf74 100644 --- a/source/libs/parser/src/parserImpl.c +++ b/source/libs/parser/src/parserImpl.c @@ -500,22 +500,24 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) { } static int32_t trimStringCopy(const char* src, int32_t len, char* dst) { + varDataSetLen(dst, len); + char* dstVal = varDataVal(dst); // delete escape character: \\, \', \" char delim = src[0]; int32_t cnt = 0; int32_t j = 0; for (uint32_t k = 1; k < len - 1; ++k) { if (src[k] == '\\' || (src[k] == delim && src[k + 1] == delim)) { - dst[j] = src[k + 1]; + dstVal[j] = src[k + 1]; cnt++; j++; k++; continue; } - dst[j] = src[k]; + dstVal[j] = src[k]; j++; } - dst[j] = '\0'; + dstVal[j] = '\0'; return j; } @@ -560,7 +562,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: { int32_t n = strlen(pVal->literal); - pVal->datum.p = calloc(1, n); + pVal->datum.p = calloc(1, n + VARSTR_HEADER_SIZE); if (NULL == pVal->datum.p) { generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); return DEAL_RES_ERROR; diff --git a/source/libs/planner/inc/plannerImpl.h b/source/libs/planner/inc/plannerImpl.h index d89cc26700e445254e7fbc29ebb5f38e6e0dc46c..559d614829291a00de665d45e67f81f4736f459d 100644 --- a/source/libs/planner/inc/plannerImpl.h +++ b/source/libs/planner/inc/plannerImpl.h @@ -24,6 +24,7 @@ extern "C" { #include "planner.h" int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode); +int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode); #ifdef __cplusplus } diff --git a/source/libs/planner/src/plannerImpl.c b/source/libs/planner/src/plannerImpl.c index 54a7330667ff7b272c026a889a8c7b95eb2d7471..be570f0b96106f168c067db82165c571d8a9d18e 100644 --- a/source/libs/planner/src/plannerImpl.c +++ b/source/libs/planner/src/plannerImpl.c @@ -19,7 +19,6 @@ #define CHECK_ALLOC(p, res) \ do { \ if (NULL == (p)) { \ - printf("%s : %d\n", __FUNCTION__, __LINE__); \ pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \ return (res); \ } \ @@ -29,7 +28,6 @@ do { \ int32_t code = (exec); \ if (TSDB_CODE_SUCCESS != code) { \ - printf("%s : %d\n", __FUNCTION__, __LINE__); \ pCxt->errCode = code; \ return (res); \ } \ @@ -38,7 +36,6 @@ typedef struct SPlanContext { int32_t errCode; int32_t planNodeId; - SNodeList* pResource; } SPlanContext; static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt); @@ -60,10 +57,7 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { FOREACH(pExpr, pCxt->pExprs) { if (nodesEqualNode(pExpr, *pNode)) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); - if (NULL == pCol) { - pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; - return DEAL_RES_ERROR; - } + CHECK_ALLOC(pCol, DEAL_RES_ERROR); SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode); pCol->node.resType = pToBeRewrittenExpr->resType; strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName); @@ -222,26 +216,6 @@ static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSele return NULL; } -static SLogicNode* createWhereFilterLogicNode(SPlanContext* pCxt, SLogicNode* pChild, SSelectStmt* pSelect) { - if (NULL == pSelect->pWhere) { - return NULL; - } - - SFilterLogicNode* pFilter = (SFilterLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FILTER); - CHECK_ALLOC(pFilter, NULL); - pFilter->node.id = pCxt->planNodeId++; - - // set filter conditions - pFilter->node.pConditions = nodesCloneNode(pSelect->pWhere); - CHECK_ALLOC(pFilter->node.pConditions, (SLogicNode*)pFilter); - - // set the output - pFilter->node.pTargets = nodesCloneList(pChild->pTargets); - CHECK_ALLOC(pFilter->node.pTargets, (SLogicNode*)pFilter); - - return (SLogicNode*)pFilter; -} - typedef struct SCreateColumnCxt { int32_t errCode; SNodeList* pList; @@ -252,10 +226,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) { switch (nodeType(pNode)) { case QUERY_NODE_COLUMN: { SNode* pCol = nodesCloneNode(pNode); - if (NULL == pCol || TSDB_CODE_SUCCESS != nodesListAppend(pCxt->pList, pCol)) { - pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; - return DEAL_RES_ERROR; - } + CHECK_ALLOC(pCol, DEAL_RES_ERROR); + CHECK_CODE(nodesListAppend(pCxt->pList, pCol), DEAL_RES_ERROR); return DEAL_RES_IGNORE_CHILD; } case QUERY_NODE_OPERATOR: @@ -263,16 +235,10 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) { case QUERY_NODE_FUNCTION: { SExprNode* pExpr = (SExprNode*)pNode; SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); - if (NULL == pCol) { - pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; - return DEAL_RES_ERROR; - } + CHECK_ALLOC(pCol, DEAL_RES_ERROR); pCol->node.resType = pExpr->resType; strcpy(pCol->colName, pExpr->aliasName); - if (TSDB_CODE_SUCCESS != nodesListAppend(pCxt->pList, (SNode*)pCol)) { - pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; - return DEAL_RES_ERROR; - } + CHECK_CODE(nodesListAppend(pCxt->pList, (SNode*)pCol), DEAL_RES_ERROR); return DEAL_RES_IGNORE_CHILD; } default: @@ -284,9 +250,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) { static SNodeList* createColumnByRewriteExps(SPlanContext* pCxt, SNodeList* pExprs) { SCreateColumnCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pList = nodesMakeList() }; - if (NULL == cxt.pList) { - return NULL; - } + CHECK_ALLOC(cxt.pList, NULL); + nodesWalkList(pExprs, doCreateColumn, &cxt); if (TSDB_CODE_SUCCESS != cxt.errCode) { nodesDestroyList(cxt.pList); @@ -379,8 +344,9 @@ static SLogicNode* createProjectLogicNode(SPlanContext* pCxt, SSelectStmt* pSele static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) { SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable); - if (TSDB_CODE_SUCCESS == pCxt->errCode) { - pRoot = pushLogicNode(pCxt, pRoot, createWhereFilterLogicNode(pCxt, pRoot, pSelect)); + if (TSDB_CODE_SUCCESS == pCxt->errCode && NULL != pSelect->pWhere) { + pRoot->pConditions = nodesCloneNode(pSelect->pWhere); + CHECK_ALLOC(pRoot->pConditions, pRoot); } if (TSDB_CODE_SUCCESS == pCxt->errCode) { pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect)); @@ -410,3 +376,300 @@ int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode) { *pLogicNode = pRoot; return TSDB_CODE_SUCCESS; } + +int32_t optimize(SLogicNode* pLogicNode) { + // todo + return TSDB_CODE_SUCCESS; +} + +typedef struct SSubLogicPlan { + SNode* pRoot; // SLogicNode + bool haveSuperTable; + bool haveSystemTable; +} SSubLogicPlan; + +int32_t splitLogicPlan(SSubLogicPlan* pLogicPlan) { + // todo + return TSDB_CODE_SUCCESS; +} + +typedef struct SSlotIndex { + int16_t tupleId; + int16_t slotId; +} SSlotIndex; + +typedef struct SPhysiPlanContext { + int32_t errCode; + int16_t nextTupleId; + SArray* pTupleHelper; +} SPhysiPlanContext; + +static int32_t getSlotKey(SNode* pNode, char* pKey) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + return sprintf(pKey, "%s.%s", ((SColumnNode*)pNode)->tableAlias, ((SColumnNode*)pNode)->colName); + } else { + return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName); + } +} + +static SNode* createColumnRef(SNode* pNode, int16_t tupleId, int16_t slotId) { + SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF); + if (NULL == pCol) { + return NULL; + } + pCol->dataType = ((SExprNode*)pNode)->resType; + pCol->tupleId = tupleId; + pCol->slotId = slotId; + pCol->columnId = (QUERY_NODE_COLUMN == nodeType(pNode) ? ((SColumnNode*)pNode)->colId : -1); + return (SNode*)pCol; +} + +static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId) { + SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC); + CHECK_ALLOC(pSlot, NULL); + pSlot->slotId = slotId; + pSlot->dataType = ((SExprNode*)pNode)->resType; + pSlot->srcTupleId = -1; + pSlot->srcSlotId = -1; + pSlot->reserve = false; + pSlot->output = true; + return (SNode*)pSlot; +} + +static SNode* createTarget(SNode* pNode, int16_t tupleId, int16_t slotId) { + STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET); + if (NULL == pTarget) { + return NULL; + } + pTarget->tupleId = tupleId; + pTarget->slotId = slotId; + pTarget->pExpr = nodesCloneNode(pNode); + if (NULL == pTarget->pExpr) { + nodesDestroyNode((SNode*)pTarget); + return NULL; + } + return (SNode*)pTarget; +} + +static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDescNode* pTuple, SNodeList** pOutput) { + pTuple->tupleId = pCxt->nextTupleId++; + + SHashObj* pHash = NULL; + if (NULL == pTuple->pSlots) { + pTuple->pSlots = nodesMakeList(); + CHECK_ALLOC(pTuple->pSlots, TSDB_CODE_OUT_OF_MEMORY); + + pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == taosArrayInsert(pCxt->pTupleHelper, pTuple->tupleId, &pHash)) { + taosHashCleanup(pHash); + return TSDB_CODE_OUT_OF_MEMORY; + } + } else { + pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId); + } + + *pOutput = nodesMakeList(); + CHECK_ALLOC(*pOutput, TSDB_CODE_OUT_OF_MEMORY); + + SNode* pNode = NULL; + int16_t slotId = 0; + FOREACH(pNode, pList) { + SNode* pSlot = createSlotDesc(pCxt, pNode, slotId); + CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY); + if (TSDB_CODE_SUCCESS != nodesListAppend(pTuple->pSlots, (SNode*)pSlot)) { + nodesDestroyNode(pSlot); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNode* pTarget = createTarget(pNode, pTuple->tupleId, slotId); + CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY); + if (TSDB_CODE_SUCCESS != nodesListAppend(*pOutput, pTarget)) { + nodesDestroyNode(pTarget); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSlotIndex index = { .tupleId = pTuple->tupleId, .slotId = slotId }; + char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; + int32_t len = getSlotKey(pNode, name); + CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY); + + ++slotId; + } + return TSDB_CODE_SUCCESS; +} + +typedef struct STransformCxt { + int32_t errCode; + SHashObj* pHash; +} STransformCxt; + +static EDealRes doTransform(SNode** pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(*pNode)) { + STransformCxt* pCxt = (STransformCxt*)pContext; + char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; + int32_t len = getSlotKey(*pNode, name); + SSlotIndex* pIndex = taosHashGet(pCxt->pHash, name, len); + if (NULL != pIndex) { + *pNode = createColumnRef(*pNode, pIndex->tupleId, pIndex->slotId); + CHECK_ALLOC(*pNode, DEAL_RES_ERROR); + return DEAL_RES_IGNORE_CHILD; + } + } + return DEAL_RES_CONTINUE; +} + +static SNode* transformForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SNode* pNode) { + SNode* pRes = nodesCloneNode(pNode); + CHECK_ALLOC(pRes, NULL); + STransformCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pHash = taosArrayGetP(pCxt->pTupleHelper, tupleId) }; + nodesRewriteNode(&pRes, doTransform, &cxt); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyNode(pRes); + return NULL; + } + return pRes; +} + +static SNodeList* transformListForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SNodeList* pList) { + SNodeList* pRes = nodesCloneList(pList); + CHECK_ALLOC(pRes, NULL); + STransformCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pHash = taosArrayGetP(pCxt->pTupleHelper, tupleId) }; + nodesRewriteList(pRes, doTransform, &cxt); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyList(pRes); + return NULL; + } + return pRes; +} + +static SPhysiNode* makePhysiNode(ENodeType type) { + SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type); + if (NULL == pPhysiNode) { + return NULL; + } + pPhysiNode->outputTuple.type = QUERY_NODE_TUPLE_DESC; + return pPhysiNode; +} + +static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode) { + CHECK_CODE(addTupleDesc(pCxt, pScanLogicNode->pScanCols, &pScanPhysiNode->node.outputTuple, &pScanPhysiNode->pScanCols), TSDB_CODE_OUT_OF_MEMORY); + + if (NULL != pScanLogicNode->node.pConditions) { + pScanPhysiNode->node.pConditions = transformForPhysiPlan(pCxt, pScanPhysiNode->node.outputTuple.tupleId, pScanLogicNode->node.pConditions); + CHECK_ALLOC(pScanPhysiNode->node.pConditions, TSDB_CODE_OUT_OF_MEMORY); + } + + pScanPhysiNode->uid = pScanLogicNode->pMeta->uid; + pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType; + pScanPhysiNode->order = TSDB_ORDER_ASC; + pScanPhysiNode->count = 1; + pScanPhysiNode->reverse = 0; + + return TSDB_CODE_SUCCESS; +} + +static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) { + STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN); + CHECK_ALLOC(pTagScan, NULL); + CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTagScan), (SPhysiNode*)pTagScan); + return (SPhysiNode*)pTagScan; +} + +static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) { + STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + CHECK_ALLOC(pTableScan, NULL); + CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan); + pTableScan->scanFlag = pScanLogicNode->scanFlag; + pTableScan->scanRange = pScanLogicNode->scanRange; + return (SPhysiNode*)pTableScan; +} + +static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) { + switch (pScanLogicNode->scanType) { + case SCAN_TYPE_TAG: + return createTagScanPhysiNode(pCxt, pScanLogicNode); + case SCAN_TYPE_TABLE: + return createTableScanPhysiNode(pCxt, pScanLogicNode); + case SCAN_TYPE_STABLE: + case SCAN_TYPE_STREAM: + break; + default: + break; + } +} + +static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SProjectLogicNode* pProjectLogicNode) { + SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_PROJECT); + CHECK_ALLOC(pProject, NULL); + + SNodeList* pProjections = transformListForPhysiPlan(pCxt, pProject->node.outputTuple.tupleId, pProjectLogicNode->pProjections); + CHECK_ALLOC(pProjections, (SPhysiNode*)pProject); + CHECK_CODE(addTupleDesc(pCxt, pProjections, &pProject->node.outputTuple, &pProject->pProjections), (SPhysiNode*)pProject); + nodesDestroyList(pProjections); + + if (NULL != pProjectLogicNode->node.pConditions) { + pProject->node.pConditions = transformForPhysiPlan(pCxt, pProject->node.outputTuple.tupleId, pProjectLogicNode->node.pConditions); + CHECK_ALLOC(pProject->node.pConditions, (SPhysiNode*)pProject); + } + + return (SPhysiNode*)pProject; +} + +static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPlan) { + SNodeList* pChildern = nodesMakeList(); + CHECK_ALLOC(pChildern, NULL); + + SNode* pLogicChild; + FOREACH(pLogicChild, pLogicPlan->pChildren) { + SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, (SLogicNode*)pLogicChild); + if (TSDB_CODE_SUCCESS != nodesListAppend(pChildern, pChildPhyNode)) { + pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; + nodesDestroyList(pChildern); + return NULL; + } + } + + SPhysiNode* pPhyNode = NULL; + switch (nodeType(pLogicPlan)) { + case QUERY_NODE_LOGIC_PLAN_SCAN: + pPhyNode = createScanPhysiNode(pCxt, (SScanLogicNode*)pLogicPlan); + break; + case QUERY_NODE_LOGIC_PLAN_JOIN: + break; + case QUERY_NODE_LOGIC_PLAN_AGG: + break; + case QUERY_NODE_LOGIC_PLAN_PROJECT: + pPhyNode = createProjectPhysiNode(pCxt, (SProjectLogicNode*)pLogicPlan); + break; + default: + break; + } + + if (NULL != pPhyNode) { + pPhyNode->pChildren = pChildern; + SNode* pChild; + FOREACH(pChild, pPhyNode->pChildren) { + ((SPhysiNode*)pChild)->pParent = pPhyNode; + } + } + + return pPhyNode; +} + +int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) { + SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextTupleId = 0, .pTupleHelper = taosArrayInit(32, POINTER_BYTES) }; + if (NULL == cxt.pTupleHelper) { + return TSDB_CODE_OUT_OF_MEMORY; + } + *pPhyNode = createPhysiNode(&cxt, pLogicNode); + return cxt.errCode; +} + +int32_t buildPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) { + // split + // scale out + // maping + // create + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/planner/test/newPlannerTest.cpp b/source/libs/planner/test/newPlannerTest.cpp index c3ee4fb9a8d0f205487057c37399b72a4b078ede..e99f0c150c37a33abacac56435e116bc5ae765f9 100644 --- a/source/libs/planner/test/newPlannerTest.cpp +++ b/source/libs/planner/test/newPlannerTest.cpp @@ -25,6 +25,11 @@ using namespace testing; class NewPlannerTest : public Test { protected: + enum TestTarget { + TEST_LOGIC_PLAN, + TEST_PHYSICAL_PLAN + }; + void setDatabase(const string& acctId, const string& db) { acctId_ = acctId; db_ = db; @@ -40,7 +45,7 @@ protected: cxt_.pSql = sqlBuf_.c_str(); } - bool run() { + bool run(TestTarget target = TEST_PHYSICAL_PLAN) { int32_t code = parser(&cxt_, &query_); if (code != TSDB_CODE_SUCCESS) { @@ -53,17 +58,27 @@ protected: SLogicNode* pLogicPlan = nullptr; code = createLogicPlan(query_.pRoot, &pLogicPlan); if (code != TSDB_CODE_SUCCESS) { - cout << "sql:[" << cxt_.pSql << "] plan code:" << code << ", strerror:" << tstrerror(code) << endl; + cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl; return false; } cout << "sql : [" << cxt_.pSql << "]" << endl; cout << "syntax test : " << endl; cout << syntaxTreeStr << endl; - // cout << "logic plan : " << endl; - // cout << toString((const SNode*)pLogicPlan) << endl; cout << "unformatted logic plan : " << endl; cout << toString((const SNode*)pLogicPlan, false) << endl; + + if (TEST_PHYSICAL_PLAN == target) { + SPhysiNode* pPhyPlan = nullptr; + code = createPhysiPlan(pLogicPlan, &pPhyPlan); + if (code != TSDB_CODE_SUCCESS) { + cout << "sql:[" << cxt_.pSql << "] physical plan code:" << code << ", strerror:" << tstrerror(code) << endl; + return false; + } + cout << "unformatted physical plan : " << endl; + cout << toString((const SNode*)pPhyPlan, false) << endl; + } + return true; } @@ -120,3 +135,10 @@ TEST_F(NewPlannerTest, groupBy) { bind("SELECT c1 + c3, count(*) FROM t1 where concat(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3"); ASSERT_TRUE(run()); } + +TEST_F(NewPlannerTest, subquery) { + setDatabase("root", "test"); + + bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b"); + ASSERT_TRUE(run()); +}