提交 708c361e 编写于 作者: X Xiaoyu Wang

TD-13495 physical plan refactoring

上级 740abc11
...@@ -61,22 +61,27 @@ typedef enum ENodeType { ...@@ -61,22 +61,27 @@ typedef enum ENodeType {
QUERY_NODE_INTERVAL_WINDOW, QUERY_NODE_INTERVAL_WINDOW,
QUERY_NODE_NODE_LIST, QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL, QUERY_NODE_FILL,
QUERY_NODE_RAW_EXPR, // Only be used in parser module.
QUERY_NODE_COLUMN_REF, QUERY_NODE_COLUMN_REF,
QUERY_NODE_TARGET, QUERY_NODE_TARGET,
QUERY_NODE_TUPLE_DESC,
// Only be used in parser module. QUERY_NODE_SLOT_DESC,
QUERY_NODE_RAW_EXPR,
// Statement nodes are used in parser and planner module. // Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR, QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT, QUERY_NODE_SELECT_STMT,
QUERY_NODE_SHOW_STMT, QUERY_NODE_SHOW_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN, QUERY_NODE_LOGIC_PLAN_SCAN,
QUERY_NODE_LOGIC_PLAN_JOIN, QUERY_NODE_LOGIC_PLAN_JOIN,
QUERY_NODE_LOGIC_PLAN_FILTER,
QUERY_NODE_LOGIC_PLAN_AGG, QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT QUERY_NODE_LOGIC_PLAN_PROJECT,
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
} ENodeType; } ENodeType;
/** /**
......
...@@ -21,6 +21,7 @@ extern "C" { ...@@ -21,6 +21,7 @@ extern "C" {
#endif #endif
#include "querynodes.h" #include "querynodes.h"
#include "tmsg.h"
typedef struct SLogicNode { typedef struct SLogicNode {
ENodeType type; ENodeType type;
...@@ -31,10 +32,20 @@ typedef struct SLogicNode { ...@@ -31,10 +32,20 @@ typedef struct SLogicNode {
struct SLogicNode* pParent; struct SLogicNode* pParent;
} SLogicNode; } SLogicNode;
typedef enum EScanType {
SCAN_TYPE_TAG,
SCAN_TYPE_TABLE,
SCAN_TYPE_STABLE,
SCAN_TYPE_STREAM
} EScanType;
typedef struct SScanLogicNode { typedef struct SScanLogicNode {
SLogicNode node; SLogicNode node;
SNodeList* pScanCols; SNodeList* pScanCols;
struct STableMeta* pMeta; struct STableMeta* pMeta;
EScanType scanType;
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow scanRange;
} SScanLogicNode; } SScanLogicNode;
typedef struct SJoinLogicNode { typedef struct SJoinLogicNode {
...@@ -43,10 +54,6 @@ typedef struct SJoinLogicNode { ...@@ -43,10 +54,6 @@ typedef struct SJoinLogicNode {
SNode* pOnConditions; SNode* pOnConditions;
} SJoinLogicNode; } SJoinLogicNode;
typedef struct SFilterLogicNode {
SLogicNode node;
} SFilterLogicNode;
typedef struct SAggLogicNode { typedef struct SAggLogicNode {
SLogicNode node; SLogicNode node;
SNodeList* pGroupKeys; SNodeList* pGroupKeys;
...@@ -58,6 +65,56 @@ typedef struct SProjectLogicNode { ...@@ -58,6 +65,56 @@ typedef struct SProjectLogicNode {
SNodeList* pProjections; SNodeList* pProjections;
} SProjectLogicNode; } SProjectLogicNode;
typedef struct SSlotDescNode {
ENodeType type;
int16_t slotId;
SDataType dataType;
int16_t srcTupleId;
int16_t srcSlotId;
bool reserve;
bool output;
} SSlotDescNode;
typedef struct STupleDescNode {
ENodeType type;
int16_t tupleId;
SNodeList* pSlots;
} STupleDescNode;
typedef struct SPhysiNode {
ENodeType type;
STupleDescNode outputTuple;
SNode* pConditions;
SNodeList* pChildren;
struct SPhysiNode* pParent;
} SPhysiNode;
typedef struct SScanPhysiNode {
SPhysiNode node;
SNodeList* pScanCols;
uint64_t uid; // unique id of the table
int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count
int32_t reverse; // reverse scan count
} SScanPhysiNode;
typedef SScanPhysiNode SSystemTableScanPhysiNode;
typedef SScanPhysiNode STagScanPhysiNode;
typedef struct STableScanPhysiNode {
SScanPhysiNode scan;
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow scanRange;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;
typedef struct SProjectPhysiNode {
SPhysiNode node;
SNodeList* pProjections;
} SProjectPhysiNode;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -68,6 +68,13 @@ typedef struct SColumnRefNode { ...@@ -68,6 +68,13 @@ typedef struct SColumnRefNode {
int16_t columnId; int16_t columnId;
} SColumnRefNode; } SColumnRefNode;
typedef struct STargetNode {
ENodeType type;
int16_t tupleId;
int16_t slotId;
SNode* pExpr;
} STargetNode;
typedef struct SValueNode { typedef struct SValueNode {
SExprNode node; // QUERY_NODE_VALUE SExprNode node; // QUERY_NODE_VALUE
char* literal; char* literal;
...@@ -141,6 +148,7 @@ typedef struct SLogicConditionNode { ...@@ -141,6 +148,7 @@ typedef struct SLogicConditionNode {
typedef struct SNodeListNode { typedef struct SNodeListNode {
ENodeType type; // QUERY_NODE_NODE_LIST ENodeType type; // QUERY_NODE_NODE_LIST
SDataType dataType;
SNodeList* pNodeList; SNodeList* pNodeList;
} SNodeListNode; } SNodeListNode;
...@@ -306,7 +314,8 @@ bool nodesIsJsonOp(const SOperatorNode* pOp); ...@@ -306,7 +314,8 @@ bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsTimeorderQuery(const SNode* pQuery); bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery); bool nodesIsTimelineQuery(const SNode* pQuery);
void *nodesGetValueFromNode(SValueNode *pNode);
void* nodesGetValueFromNode(SValueNode *pNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -142,6 +142,21 @@ static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) { ...@@ -142,6 +142,21 @@ static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
return (SNode*)pDst; return (SNode*)pDst;
} }
static SNode* columnRefNodeCopy(const SColumnRefNode* pSrc, SColumnRefNode* pDst) {
dataTypeCopy(&pSrc->dataType, &pDst->dataType);
COPY_SCALAR_FIELD(tupleId);
COPY_SCALAR_FIELD(slotId);
COPY_SCALAR_FIELD(columnId);
return (SNode*)pDst;
}
static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) {
COPY_SCALAR_FIELD(tupleId);
COPY_SCALAR_FIELD(slotId);
COPY_NODE_FIELD(pExpr);
return (SNode*)pDst;
}
static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode* pDst) { static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode* pDst) {
COPY_SCALAR_FIELD(groupingSetType); COPY_SCALAR_FIELD(groupingSetType);
COPY_NODE_LIST_FIELD(pParameterList); COPY_NODE_LIST_FIELD(pParameterList);
...@@ -168,6 +183,10 @@ SNode* nodesCloneNode(const SNode* pNode) { ...@@ -168,6 +183,10 @@ SNode* nodesCloneNode(const SNode* pNode) {
return logicConditionNodeCopy((const SLogicConditionNode*)pNode, (SLogicConditionNode*)pDst); return logicConditionNodeCopy((const SLogicConditionNode*)pNode, (SLogicConditionNode*)pDst);
case QUERY_NODE_FUNCTION: case QUERY_NODE_FUNCTION:
return functionNodeCopy((const SFunctionNode*)pNode, (SFunctionNode*)pDst); return functionNodeCopy((const SFunctionNode*)pNode, (SFunctionNode*)pDst);
case QUERY_NODE_COLUMN_REF:
return columnRefNodeCopy((const SColumnRefNode*)pNode, (SColumnRefNode*)pDst);
case QUERY_NODE_TARGET:
return targetNodeCopy((const STargetNode*)pNode, (STargetNode*)pDst);
case QUERY_NODE_REAL_TABLE: case QUERY_NODE_REAL_TABLE:
case QUERY_NODE_TEMP_TABLE: case QUERY_NODE_TEMP_TABLE:
case QUERY_NODE_JOIN_TABLE: case QUERY_NODE_JOIN_TABLE:
......
...@@ -61,6 +61,10 @@ static char* nodeName(ENodeType type) { ...@@ -61,6 +61,10 @@ static char* nodeName(ENodeType type) {
return "Target"; return "Target";
case QUERY_NODE_RAW_EXPR: case QUERY_NODE_RAW_EXPR:
return "RawExpr"; return "RawExpr";
case QUERY_NODE_TUPLE_DESC:
return "TupleDesc";
case QUERY_NODE_SLOT_DESC:
return "SlotDesc";
case QUERY_NODE_SET_OPERATOR: case QUERY_NODE_SET_OPERATOR:
return "SetOperator"; return "SetOperator";
case QUERY_NODE_SELECT_STMT: case QUERY_NODE_SELECT_STMT:
...@@ -71,16 +75,22 @@ static char* nodeName(ENodeType type) { ...@@ -71,16 +75,22 @@ static char* nodeName(ENodeType type) {
return "LogicScan"; return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return "LogicJoin"; return "LogicJoin";
case QUERY_NODE_LOGIC_PLAN_FILTER:
return "LogicFilter";
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return "LogicAgg"; return "LogicAgg";
case QUERY_NODE_LOGIC_PLAN_PROJECT: case QUERY_NODE_LOGIC_PLAN_PROJECT:
return "LogicProject"; return "LogicProject";
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return "PhysiTagScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
return "PhysiTableScan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
default: default:
break; break;
} }
return "Unknown"; static char tmp[20];
snprintf(tmp, sizeof(tmp), "Unknown %d", type);
return tmp;
} }
static int32_t addNodeList(SJson* pJson, const char* pName, FToJson func, const SNodeList* pList) { static int32_t addNodeList(SJson* pJson, const char* pName, FToJson func, const SNodeList* pList) {
...@@ -183,8 +193,93 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { ...@@ -183,8 +193,93 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
return code; return code;
} }
static int32_t logicFilterNodeToJson(const void* pObj, SJson* pJson) { static const char* jkPhysiPlanOutputTuple = "OutputTuple";
return logicPlanNodeToJson(pObj, pJson); static const char* jkPhysiPlanConditions = "Conditions";
static const char* jkPhysiPlanChildren = "Children";
static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) {
const SPhysiNode* pNode = (const SPhysiNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputTuple, nodeToJson, &pNode->outputTuple);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkPhysiPlanConditions, nodeToJson, pNode->pConditions);
}
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkPhysiPlanChildren, nodeToJson, pNode->pChildren);
}
return code;
}
static const char* jkScanPhysiPlanScanCols = "ScanCols";
static const char* jkScanPhysiPlanTableId = "TableId";
static const char* jkScanPhysiPlanTableType = "TableType";
static const char* jkScanPhysiPlanScanOrder = "ScanOrder";
static const char* jkScanPhysiPlanScanCount = "ScanCount";
static const char* jkScanPhysiPlanReverseScanCount = "ReverseScanCount";
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkScanPhysiPlanScanCols, nodeToJson, pNode->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanOrder, pNode->order);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanCount, pNode->count);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanReverseScanCount, pNode->reverse);
}
return code;
}
static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) {
return physiScanNodeToJson(pObj, pJson);
}
static const char* jkTableScanPhysiPlanScanFlag = "ScanFlag";
static const char* jkTableScanPhysiPlanStartKey = "StartKey";
static const char* jkTableScanPhysiPlanEndKey = "EndKey";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
int32_t code = physiScanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanFlag, pNode->scanFlag);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanStartKey, pNode->scanRange.skey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanEndKey, pNode->scanRange.ekey);
}
return code;
}
static const char* jkProjectPhysiPlanProjections = "Projections";
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkProjectPhysiPlanProjections, nodeToJson, pNode->pProjections);
}
return code;
} }
static const char* jkAggLogicPlanGroupKeys = "GroupKeys"; static const char* jkAggLogicPlanGroupKeys = "GroupKeys";
...@@ -277,19 +372,6 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) { ...@@ -277,19 +372,6 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) {
return code; return code;
} }
// typedef struct SValueNode {
// SExprNode node; // QUERY_NODE_VALUE
// char* ;
// bool ;
// union {
// bool b;
// int64_t i;
// uint64_t u;
// double d;
// char* p;
// } datum;
// } SValueNode;
static const char* jkValueLiteral = "Literal"; static const char* jkValueLiteral = "Literal";
static const char* jkValueDuration = "Duration"; static const char* jkValueDuration = "Duration";
static const char* jkValueDatum = "Datum"; static const char* jkValueDatum = "Datum";
...@@ -421,6 +503,74 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) { ...@@ -421,6 +503,74 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) {
return code; return code;
} }
static const char* jkColumnRefDataType = "DataType";
static const char* jkColumnRefTupleId = "TupleId";
static const char* jkColumnRefSlotId = "SlotId";
static const char* jkColumnRefColumnId = "ColumnId";
static int32_t columnRefNodeToJson(const void* pObj, SJson* pJson) {
const SColumnRefNode* pNode = (const SColumnRefNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkColumnRefDataType, dataTypeToJson, &pNode->dataType);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkColumnRefTupleId, pNode->tupleId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkColumnRefSlotId, pNode->slotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkColumnRefColumnId, pNode->columnId);
}
return code;
}
static const char* jkTargetTupleId = "TupleId";
static const char* jkTargetSlotId = "SlotId";
static const char* jkTargetExpr = "Expr";
static int32_t targetNodeToJson(const void* pObj, SJson* pJson) {
const STargetNode* pNode = (const STargetNode*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkTargetTupleId, pNode->tupleId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTargetSlotId, pNode->slotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkTargetExpr, nodeToJson, pNode->pExpr);
}
return code;
}
static const char* jkSlotDescSlotId = "SlotId";
static const char* jkSlotDescDataType = "DataType";
static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) {
const SSlotDescNode* pNode = (const SSlotDescNode*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkSlotDescSlotId, pNode->slotId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSlotDescDataType, dataTypeToJson, &pNode->dataType);
}
return code;
}
static const char* jkTupleDescTupleId = "TupleId";
static const char* jkTupleDescSlots = "Slots";
static int32_t tupleDescNodeToJson(const void* pObj, SJson* pJson) {
const STupleDescNode* pNode = (const STupleDescNode*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkTupleDescTupleId, pNode->tupleId);
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkTupleDescSlots, nodeToJson, pNode->pSlots);
}
return code;
}
static const char* jkSelectStmtDistinct = "Distinct"; static const char* jkSelectStmtDistinct = "Distinct";
static const char* jkSelectStmtProjections = "Projections"; static const char* jkSelectStmtProjections = "Projections";
static const char* jkSelectStmtFrom = "From"; static const char* jkSelectStmtFrom = "From";
...@@ -497,8 +647,15 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -497,8 +647,15 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_NODE_LIST: case QUERY_NODE_NODE_LIST:
case QUERY_NODE_FILL: case QUERY_NODE_FILL:
case QUERY_NODE_COLUMN_REF: case QUERY_NODE_COLUMN_REF:
return columnRefNodeToJson(pObj, pJson);
case QUERY_NODE_TARGET: case QUERY_NODE_TARGET:
return targetNodeToJson(pObj, pJson);
case QUERY_NODE_RAW_EXPR: case QUERY_NODE_RAW_EXPR:
break;
case QUERY_NODE_TUPLE_DESC:
return tupleDescNodeToJson(pObj, pJson);
case QUERY_NODE_SLOT_DESC:
return slotDescNodeToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR: case QUERY_NODE_SET_OPERATOR:
break; break;
case QUERY_NODE_SELECT_STMT: case QUERY_NODE_SELECT_STMT:
...@@ -509,12 +666,16 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -509,12 +666,16 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return logicScanNodeToJson(pObj, pJson); return logicScanNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return logicJoinNodeToJson(pObj, pJson); return logicJoinNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_FILTER:
return logicFilterNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return logicAggNodeToJson(pObj, pJson); return logicAggNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_PROJECT: case QUERY_NODE_LOGIC_PLAN_PROJECT:
return logicProjectNodeToJson(pObj, pJson); return logicProjectNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return physiTagScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
return physiTableScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return physiProjectNodeToJson(pObj, pJson);
default: default:
break; break;
} }
......
...@@ -75,12 +75,24 @@ SNode* nodesMakeNode(ENodeType type) { ...@@ -75,12 +75,24 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SScanLogicNode)); return makeNode(type, sizeof(SScanLogicNode));
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return makeNode(type, sizeof(SJoinLogicNode)); return makeNode(type, sizeof(SJoinLogicNode));
case QUERY_NODE_LOGIC_PLAN_FILTER:
return makeNode(type, sizeof(SFilterLogicNode));
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return makeNode(type, sizeof(SAggLogicNode)); return makeNode(type, sizeof(SAggLogicNode));
case QUERY_NODE_LOGIC_PLAN_PROJECT: case QUERY_NODE_LOGIC_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectLogicNode)); return makeNode(type, sizeof(SProjectLogicNode));
case QUERY_NODE_COLUMN_REF:
return makeNode(type, sizeof(SColumnRefNode));
case QUERY_NODE_TARGET:
return makeNode(type, sizeof(STargetNode));
case QUERY_NODE_TUPLE_DESC:
return makeNode(type, sizeof(STupleDescNode));
case QUERY_NODE_SLOT_DESC:
return makeNode(type, sizeof(SSlotDescNode));
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return makeNode(type, sizeof(STagScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
return makeNode(type, sizeof(STableScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode));
default: default:
break; break;
} }
...@@ -184,29 +196,29 @@ void nodesDestroyList(SNodeList* pList) { ...@@ -184,29 +196,29 @@ void nodesDestroyList(SNodeList* pList) {
tfree(pList); tfree(pList);
} }
void *nodesGetValueFromNode(SValueNode *pNode) { void* nodesGetValueFromNode(SValueNode *pNode) {
switch (pNode->node.resType.type) { switch (pNode->node.resType.type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
return (void *)&pNode->datum.b; return (void*)&pNode->datum.b;
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
return (void *)&pNode->datum.i; return (void*)&pNode->datum.i;
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
return (void *)&pNode->datum.u; return (void*)&pNode->datum.u;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
return (void *)&pNode->datum.d; return (void*)&pNode->datum.d;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
return (void *)pNode->datum.p; return (void*)pNode->datum.p;
default: default:
break; break;
} }
......
...@@ -500,22 +500,24 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) { ...@@ -500,22 +500,24 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
} }
static int32_t trimStringCopy(const char* src, int32_t len, char* dst) { static int32_t trimStringCopy(const char* src, int32_t len, char* dst) {
varDataSetLen(dst, len);
char* dstVal = varDataVal(dst);
// delete escape character: \\, \', \" // delete escape character: \\, \', \"
char delim = src[0]; char delim = src[0];
int32_t cnt = 0; int32_t cnt = 0;
int32_t j = 0; int32_t j = 0;
for (uint32_t k = 1; k < len - 1; ++k) { for (uint32_t k = 1; k < len - 1; ++k) {
if (src[k] == '\\' || (src[k] == delim && src[k + 1] == delim)) { if (src[k] == '\\' || (src[k] == delim && src[k + 1] == delim)) {
dst[j] = src[k + 1]; dstVal[j] = src[k + 1];
cnt++; cnt++;
j++; j++;
k++; k++;
continue; continue;
} }
dst[j] = src[k]; dstVal[j] = src[k];
j++; j++;
} }
dst[j] = '\0'; dstVal[j] = '\0';
return j; return j;
} }
...@@ -560,7 +562,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { ...@@ -560,7 +562,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: { case TSDB_DATA_TYPE_VARBINARY: {
int32_t n = strlen(pVal->literal); int32_t n = strlen(pVal->literal);
pVal->datum.p = calloc(1, n); pVal->datum.p = calloc(1, n + VARSTR_HEADER_SIZE);
if (NULL == pVal->datum.p) { if (NULL == pVal->datum.p) {
generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
......
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
#include "planner.h" #include "planner.h"
int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode); int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode);
int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#define CHECK_ALLOC(p, res) \ #define CHECK_ALLOC(p, res) \
do { \ do { \
if (NULL == (p)) { \ if (NULL == (p)) { \
printf("%s : %d\n", __FUNCTION__, __LINE__); \
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \ pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
return (res); \ return (res); \
} \ } \
...@@ -29,7 +28,6 @@ ...@@ -29,7 +28,6 @@
do { \ do { \
int32_t code = (exec); \ int32_t code = (exec); \
if (TSDB_CODE_SUCCESS != code) { \ if (TSDB_CODE_SUCCESS != code) { \
printf("%s : %d\n", __FUNCTION__, __LINE__); \
pCxt->errCode = code; \ pCxt->errCode = code; \
return (res); \ return (res); \
} \ } \
...@@ -38,7 +36,6 @@ ...@@ -38,7 +36,6 @@
typedef struct SPlanContext { typedef struct SPlanContext {
int32_t errCode; int32_t errCode;
int32_t planNodeId; int32_t planNodeId;
SNodeList* pResource;
} SPlanContext; } SPlanContext;
static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt); static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt);
...@@ -60,10 +57,7 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { ...@@ -60,10 +57,7 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
FOREACH(pExpr, pCxt->pExprs) { FOREACH(pExpr, pCxt->pExprs) {
if (nodesEqualNode(pExpr, *pNode)) { if (nodesEqualNode(pExpr, *pNode)) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { CHECK_ALLOC(pCol, DEAL_RES_ERROR);
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode); SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode);
pCol->node.resType = pToBeRewrittenExpr->resType; pCol->node.resType = pToBeRewrittenExpr->resType;
strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName); strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName);
...@@ -222,26 +216,6 @@ static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSele ...@@ -222,26 +216,6 @@ static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSele
return NULL; return NULL;
} }
static SLogicNode* createWhereFilterLogicNode(SPlanContext* pCxt, SLogicNode* pChild, SSelectStmt* pSelect) {
if (NULL == pSelect->pWhere) {
return NULL;
}
SFilterLogicNode* pFilter = (SFilterLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FILTER);
CHECK_ALLOC(pFilter, NULL);
pFilter->node.id = pCxt->planNodeId++;
// set filter conditions
pFilter->node.pConditions = nodesCloneNode(pSelect->pWhere);
CHECK_ALLOC(pFilter->node.pConditions, (SLogicNode*)pFilter);
// set the output
pFilter->node.pTargets = nodesCloneList(pChild->pTargets);
CHECK_ALLOC(pFilter->node.pTargets, (SLogicNode*)pFilter);
return (SLogicNode*)pFilter;
}
typedef struct SCreateColumnCxt { typedef struct SCreateColumnCxt {
int32_t errCode; int32_t errCode;
SNodeList* pList; SNodeList* pList;
...@@ -252,10 +226,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) { ...@@ -252,10 +226,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN: { case QUERY_NODE_COLUMN: {
SNode* pCol = nodesCloneNode(pNode); SNode* pCol = nodesCloneNode(pNode);
if (NULL == pCol || TSDB_CODE_SUCCESS != nodesListAppend(pCxt->pList, pCol)) { CHECK_ALLOC(pCol, DEAL_RES_ERROR);
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; CHECK_CODE(nodesListAppend(pCxt->pList, pCol), DEAL_RES_ERROR);
return DEAL_RES_ERROR;
}
return DEAL_RES_IGNORE_CHILD; return DEAL_RES_IGNORE_CHILD;
} }
case QUERY_NODE_OPERATOR: case QUERY_NODE_OPERATOR:
...@@ -263,16 +235,10 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) { ...@@ -263,16 +235,10 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
case QUERY_NODE_FUNCTION: { case QUERY_NODE_FUNCTION: {
SExprNode* pExpr = (SExprNode*)pNode; SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { CHECK_ALLOC(pCol, DEAL_RES_ERROR);
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
pCol->node.resType = pExpr->resType; pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName); strcpy(pCol->colName, pExpr->aliasName);
if (TSDB_CODE_SUCCESS != nodesListAppend(pCxt->pList, (SNode*)pCol)) { CHECK_CODE(nodesListAppend(pCxt->pList, (SNode*)pCol), DEAL_RES_ERROR);
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
return DEAL_RES_IGNORE_CHILD; return DEAL_RES_IGNORE_CHILD;
} }
default: default:
...@@ -284,9 +250,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) { ...@@ -284,9 +250,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
static SNodeList* createColumnByRewriteExps(SPlanContext* pCxt, SNodeList* pExprs) { static SNodeList* createColumnByRewriteExps(SPlanContext* pCxt, SNodeList* pExprs) {
SCreateColumnCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pList = nodesMakeList() }; SCreateColumnCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pList = nodesMakeList() };
if (NULL == cxt.pList) { CHECK_ALLOC(cxt.pList, NULL);
return NULL;
}
nodesWalkList(pExprs, doCreateColumn, &cxt); nodesWalkList(pExprs, doCreateColumn, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) { if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(cxt.pList); nodesDestroyList(cxt.pList);
...@@ -379,8 +344,9 @@ static SLogicNode* createProjectLogicNode(SPlanContext* pCxt, SSelectStmt* pSele ...@@ -379,8 +344,9 @@ static SLogicNode* createProjectLogicNode(SPlanContext* pCxt, SSelectStmt* pSele
static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) { static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable); SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable);
if (TSDB_CODE_SUCCESS == pCxt->errCode) { if (TSDB_CODE_SUCCESS == pCxt->errCode && NULL != pSelect->pWhere) {
pRoot = pushLogicNode(pCxt, pRoot, createWhereFilterLogicNode(pCxt, pRoot, pSelect)); pRoot->pConditions = nodesCloneNode(pSelect->pWhere);
CHECK_ALLOC(pRoot->pConditions, pRoot);
} }
if (TSDB_CODE_SUCCESS == pCxt->errCode) { if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect)); pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect));
...@@ -410,3 +376,300 @@ int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode) { ...@@ -410,3 +376,300 @@ int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode) {
*pLogicNode = pRoot; *pLogicNode = pRoot;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t optimize(SLogicNode* pLogicNode) {
// todo
return TSDB_CODE_SUCCESS;
}
typedef struct SSubLogicPlan {
SNode* pRoot; // SLogicNode
bool haveSuperTable;
bool haveSystemTable;
} SSubLogicPlan;
int32_t splitLogicPlan(SSubLogicPlan* pLogicPlan) {
// todo
return TSDB_CODE_SUCCESS;
}
typedef struct SSlotIndex {
int16_t tupleId;
int16_t slotId;
} SSlotIndex;
typedef struct SPhysiPlanContext {
int32_t errCode;
int16_t nextTupleId;
SArray* pTupleHelper;
} SPhysiPlanContext;
static int32_t getSlotKey(SNode* pNode, char* pKey) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
return sprintf(pKey, "%s.%s", ((SColumnNode*)pNode)->tableAlias, ((SColumnNode*)pNode)->colName);
} else {
return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}
}
static SNode* createColumnRef(SNode* pNode, int16_t tupleId, int16_t slotId) {
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pCol) {
return NULL;
}
pCol->dataType = ((SExprNode*)pNode)->resType;
pCol->tupleId = tupleId;
pCol->slotId = slotId;
pCol->columnId = (QUERY_NODE_COLUMN == nodeType(pNode) ? ((SColumnNode*)pNode)->colId : -1);
return (SNode*)pCol;
}
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId) {
SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
CHECK_ALLOC(pSlot, NULL);
pSlot->slotId = slotId;
pSlot->dataType = ((SExprNode*)pNode)->resType;
pSlot->srcTupleId = -1;
pSlot->srcSlotId = -1;
pSlot->reserve = false;
pSlot->output = true;
return (SNode*)pSlot;
}
static SNode* createTarget(SNode* pNode, int16_t tupleId, int16_t slotId) {
STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
if (NULL == pTarget) {
return NULL;
}
pTarget->tupleId = tupleId;
pTarget->slotId = slotId;
pTarget->pExpr = nodesCloneNode(pNode);
if (NULL == pTarget->pExpr) {
nodesDestroyNode((SNode*)pTarget);
return NULL;
}
return (SNode*)pTarget;
}
static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDescNode* pTuple, SNodeList** pOutput) {
pTuple->tupleId = pCxt->nextTupleId++;
SHashObj* pHash = NULL;
if (NULL == pTuple->pSlots) {
pTuple->pSlots = nodesMakeList();
CHECK_ALLOC(pTuple->pSlots, TSDB_CODE_OUT_OF_MEMORY);
pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY);
if (NULL == taosArrayInsert(pCxt->pTupleHelper, pTuple->tupleId, &pHash)) {
taosHashCleanup(pHash);
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId);
}
*pOutput = nodesMakeList();
CHECK_ALLOC(*pOutput, TSDB_CODE_OUT_OF_MEMORY);
SNode* pNode = NULL;
int16_t slotId = 0;
FOREACH(pNode, pList) {
SNode* pSlot = createSlotDesc(pCxt, pNode, slotId);
CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY);
if (TSDB_CODE_SUCCESS != nodesListAppend(pTuple->pSlots, (SNode*)pSlot)) {
nodesDestroyNode(pSlot);
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pTarget = createTarget(pNode, pTuple->tupleId, slotId);
CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
if (TSDB_CODE_SUCCESS != nodesListAppend(*pOutput, pTarget)) {
nodesDestroyNode(pTarget);
return TSDB_CODE_OUT_OF_MEMORY;
}
SSlotIndex index = { .tupleId = pTuple->tupleId, .slotId = slotId };
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
int32_t len = getSlotKey(pNode, name);
CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY);
++slotId;
}
return TSDB_CODE_SUCCESS;
}
typedef struct STransformCxt {
int32_t errCode;
SHashObj* pHash;
} STransformCxt;
static EDealRes doTransform(SNode** pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
STransformCxt* pCxt = (STransformCxt*)pContext;
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
int32_t len = getSlotKey(*pNode, name);
SSlotIndex* pIndex = taosHashGet(pCxt->pHash, name, len);
if (NULL != pIndex) {
*pNode = createColumnRef(*pNode, pIndex->tupleId, pIndex->slotId);
CHECK_ALLOC(*pNode, DEAL_RES_ERROR);
return DEAL_RES_IGNORE_CHILD;
}
}
return DEAL_RES_CONTINUE;
}
static SNode* transformForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SNode* pNode) {
SNode* pRes = nodesCloneNode(pNode);
CHECK_ALLOC(pRes, NULL);
STransformCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pHash = taosArrayGetP(pCxt->pTupleHelper, tupleId) };
nodesRewriteNode(&pRes, doTransform, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyNode(pRes);
return NULL;
}
return pRes;
}
static SNodeList* transformListForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SNodeList* pList) {
SNodeList* pRes = nodesCloneList(pList);
CHECK_ALLOC(pRes, NULL);
STransformCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pHash = taosArrayGetP(pCxt->pTupleHelper, tupleId) };
nodesRewriteList(pRes, doTransform, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(pRes);
return NULL;
}
return pRes;
}
static SPhysiNode* makePhysiNode(ENodeType type) {
SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
if (NULL == pPhysiNode) {
return NULL;
}
pPhysiNode->outputTuple.type = QUERY_NODE_TUPLE_DESC;
return pPhysiNode;
}
static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode) {
CHECK_CODE(addTupleDesc(pCxt, pScanLogicNode->pScanCols, &pScanPhysiNode->node.outputTuple, &pScanPhysiNode->pScanCols), TSDB_CODE_OUT_OF_MEMORY);
if (NULL != pScanLogicNode->node.pConditions) {
pScanPhysiNode->node.pConditions = transformForPhysiPlan(pCxt, pScanPhysiNode->node.outputTuple.tupleId, pScanLogicNode->node.pConditions);
CHECK_ALLOC(pScanPhysiNode->node.pConditions, TSDB_CODE_OUT_OF_MEMORY);
}
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
pScanPhysiNode->order = TSDB_ORDER_ASC;
pScanPhysiNode->count = 1;
pScanPhysiNode->reverse = 0;
return TSDB_CODE_SUCCESS;
}
static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
CHECK_ALLOC(pTagScan, NULL);
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTagScan), (SPhysiNode*)pTagScan);
return (SPhysiNode*)pTagScan;
}
static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
CHECK_ALLOC(pTableScan, NULL);
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan);
pTableScan->scanFlag = pScanLogicNode->scanFlag;
pTableScan->scanRange = pScanLogicNode->scanRange;
return (SPhysiNode*)pTableScan;
}
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
switch (pScanLogicNode->scanType) {
case SCAN_TYPE_TAG:
return createTagScanPhysiNode(pCxt, pScanLogicNode);
case SCAN_TYPE_TABLE:
return createTableScanPhysiNode(pCxt, pScanLogicNode);
case SCAN_TYPE_STABLE:
case SCAN_TYPE_STREAM:
break;
default:
break;
}
}
static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SProjectLogicNode* pProjectLogicNode) {
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_PROJECT);
CHECK_ALLOC(pProject, NULL);
SNodeList* pProjections = transformListForPhysiPlan(pCxt, pProject->node.outputTuple.tupleId, pProjectLogicNode->pProjections);
CHECK_ALLOC(pProjections, (SPhysiNode*)pProject);
CHECK_CODE(addTupleDesc(pCxt, pProjections, &pProject->node.outputTuple, &pProject->pProjections), (SPhysiNode*)pProject);
nodesDestroyList(pProjections);
if (NULL != pProjectLogicNode->node.pConditions) {
pProject->node.pConditions = transformForPhysiPlan(pCxt, pProject->node.outputTuple.tupleId, pProjectLogicNode->node.pConditions);
CHECK_ALLOC(pProject->node.pConditions, (SPhysiNode*)pProject);
}
return (SPhysiNode*)pProject;
}
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPlan) {
SNodeList* pChildern = nodesMakeList();
CHECK_ALLOC(pChildern, NULL);
SNode* pLogicChild;
FOREACH(pLogicChild, pLogicPlan->pChildren) {
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, (SLogicNode*)pLogicChild);
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildern, pChildPhyNode)) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
nodesDestroyList(pChildern);
return NULL;
}
}
SPhysiNode* pPhyNode = NULL;
switch (nodeType(pLogicPlan)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
pPhyNode = createScanPhysiNode(pCxt, (SScanLogicNode*)pLogicPlan);
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
break;
case QUERY_NODE_LOGIC_PLAN_AGG:
break;
case QUERY_NODE_LOGIC_PLAN_PROJECT:
pPhyNode = createProjectPhysiNode(pCxt, (SProjectLogicNode*)pLogicPlan);
break;
default:
break;
}
if (NULL != pPhyNode) {
pPhyNode->pChildren = pChildern;
SNode* pChild;
FOREACH(pChild, pPhyNode->pChildren) {
((SPhysiNode*)pChild)->pParent = pPhyNode;
}
}
return pPhyNode;
}
int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextTupleId = 0, .pTupleHelper = taosArrayInit(32, POINTER_BYTES) };
if (NULL == cxt.pTupleHelper) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*pPhyNode = createPhysiNode(&cxt, pLogicNode);
return cxt.errCode;
}
int32_t buildPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
// split
// scale out
// maping
// create
return TSDB_CODE_SUCCESS;
}
...@@ -25,6 +25,11 @@ using namespace testing; ...@@ -25,6 +25,11 @@ using namespace testing;
class NewPlannerTest : public Test { class NewPlannerTest : public Test {
protected: protected:
enum TestTarget {
TEST_LOGIC_PLAN,
TEST_PHYSICAL_PLAN
};
void setDatabase(const string& acctId, const string& db) { void setDatabase(const string& acctId, const string& db) {
acctId_ = acctId; acctId_ = acctId;
db_ = db; db_ = db;
...@@ -40,7 +45,7 @@ protected: ...@@ -40,7 +45,7 @@ protected:
cxt_.pSql = sqlBuf_.c_str(); cxt_.pSql = sqlBuf_.c_str();
} }
bool run() { bool run(TestTarget target = TEST_PHYSICAL_PLAN) {
int32_t code = parser(&cxt_, &query_); int32_t code = parser(&cxt_, &query_);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -53,17 +58,27 @@ protected: ...@@ -53,17 +58,27 @@ protected:
SLogicNode* pLogicPlan = nullptr; SLogicNode* pLogicPlan = nullptr;
code = createLogicPlan(query_.pRoot, &pLogicPlan); code = createLogicPlan(query_.pRoot, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] plan code:" << code << ", strerror:" << tstrerror(code) << endl; cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false; return false;
} }
cout << "sql : [" << cxt_.pSql << "]" << endl; cout << "sql : [" << cxt_.pSql << "]" << endl;
cout << "syntax test : " << endl; cout << "syntax test : " << endl;
cout << syntaxTreeStr << endl; cout << syntaxTreeStr << endl;
// cout << "logic plan : " << endl;
// cout << toString((const SNode*)pLogicPlan) << endl;
cout << "unformatted logic plan : " << endl; cout << "unformatted logic plan : " << endl;
cout << toString((const SNode*)pLogicPlan, false) << endl; cout << toString((const SNode*)pLogicPlan, false) << endl;
if (TEST_PHYSICAL_PLAN == target) {
SPhysiNode* pPhyPlan = nullptr;
code = createPhysiPlan(pLogicPlan, &pPhyPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] physical plan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false;
}
cout << "unformatted physical plan : " << endl;
cout << toString((const SNode*)pPhyPlan, false) << endl;
}
return true; return true;
} }
...@@ -120,3 +135,10 @@ TEST_F(NewPlannerTest, groupBy) { ...@@ -120,3 +135,10 @@ TEST_F(NewPlannerTest, groupBy) {
bind("SELECT c1 + c3, count(*) FROM t1 where concat(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3"); bind("SELECT c1 + c3, count(*) FROM t1 where concat(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3");
ASSERT_TRUE(run()); ASSERT_TRUE(run());
} }
TEST_F(NewPlannerTest, subquery) {
setDatabase("root", "test");
bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b");
ASSERT_TRUE(run());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册