提交 709dda39 编写于 作者: X Xiaoyu Wang

TD-13037 distinct, state window and partition by plan implement

上级 17a3b4cc
...@@ -163,6 +163,8 @@ typedef enum ENodeType { ...@@ -163,6 +163,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_SUBPLAN, QUERY_NODE_PHYSICAL_SUBPLAN,
......
...@@ -104,6 +104,7 @@ typedef struct SWindowLogicNode { ...@@ -104,6 +104,7 @@ typedef struct SWindowLogicNode {
SFillNode* pFill; SFillNode* pFill;
int64_t sessionGap; int64_t sessionGap;
SNode* pTspk; SNode* pTspk;
SNode* pStateExpr;
} SWindowLogicNode; } SWindowLogicNode;
typedef struct SSortLogicNode { typedef struct SSortLogicNode {
...@@ -194,11 +195,20 @@ typedef struct SSystemTableScanPhysiNode { ...@@ -194,11 +195,20 @@ typedef struct SSystemTableScanPhysiNode {
int32_t accountId; int32_t accountId;
} SSystemTableScanPhysiNode; } SSystemTableScanPhysiNode;
typedef enum EScanRequired {
SCAN_REQUIRED_DATA_NO_NEEDED = 1,
SCAN_REQUIRED_DATA_STATIS_NEEDED,
SCAN_REQUIRED_DATA_ALL_NEEDED,
SCAN_REQUIRED_DATA_DISCARD,
} EScanRequired;
typedef struct STableScanPhysiNode { typedef struct STableScanPhysiNode {
SScanPhysiNode scan; SScanPhysiNode scan;
uint8_t scanFlag; // denotes reversed scan of data or not uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow scanRange; STimeWindow scanRange;
double ratio; double ratio;
EScanRequired scanRequired;
SNodeList* pScanReferFuncs;
} STableScanPhysiNode; } STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode;
...@@ -257,17 +267,33 @@ typedef struct SIntervalPhysiNode { ...@@ -257,17 +267,33 @@ typedef struct SIntervalPhysiNode {
SFillNode* pFill; SFillNode* pFill;
} SIntervalPhysiNode; } SIntervalPhysiNode;
typedef struct SMultiTableIntervalPhysiNode {
SIntervalPhysiNode interval;
SNodeList* pPartitionKeys;
} SMultiTableIntervalPhysiNode;
typedef struct SSessionWinodwPhysiNode { typedef struct SSessionWinodwPhysiNode {
SWinodwPhysiNode window; SWinodwPhysiNode window;
int64_t gap; int64_t gap;
} SSessionWinodwPhysiNode; } SSessionWinodwPhysiNode;
typedef struct SStateWinodwPhysiNode {
SWinodwPhysiNode window;
SNode* pStateKey;
} SStateWinodwPhysiNode;
typedef struct SSortPhysiNode { typedef struct SSortPhysiNode {
SPhysiNode node; SPhysiNode node;
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
} SSortPhysiNode; } SSortPhysiNode;
typedef struct SPartitionPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of partition_by_clause
SNodeList* pPartitionKeys;
} SPartitionPhysiNode;
typedef struct SDataSinkNode { typedef struct SDataSinkNode {
ENodeType type; ENodeType type;
SDataBlockDescNode* pInputDataBlockDesc; SDataBlockDescNode* pInputDataBlockDesc;
......
...@@ -188,7 +188,7 @@ typedef struct SLimitNode { ...@@ -188,7 +188,7 @@ typedef struct SLimitNode {
typedef struct SStateWindowNode { typedef struct SStateWindowNode {
ENodeType type; // QUERY_NODE_STATE_WINDOW ENodeType type; // QUERY_NODE_STATE_WINDOW
SNode* pCol; SNode* pExpr;
} SStateWindowNode; } SStateWindowNode;
typedef struct SSessionWindowNode { typedef struct SSessionWindowNode {
......
...@@ -483,6 +483,9 @@ int32_t* taosGetErrno(); ...@@ -483,6 +483,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME TAOS_DEF_ERROR_CODE(0, 0x2617) #define TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME TAOS_DEF_ERROR_CODE(0, 0x2617)
#define TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR TAOS_DEF_ERROR_CODE(0, 0x2618) #define TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR TAOS_DEF_ERROR_CODE(0, 0x2618)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -283,6 +283,12 @@ static SNode* logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) { ...@@ -283,6 +283,12 @@ static SNode* logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
return (SNode*)pDst; return (SNode*)pDst;
} }
static SNode* logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pPartitionKeys);
return (SNode*)pDst;
}
static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) { static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) {
CLONE_NODE_FIELD(pNode); CLONE_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(subplanType); COPY_SCALAR_FIELD(subplanType);
...@@ -367,6 +373,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { ...@@ -367,6 +373,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst); return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_SORT:
return logicSortCopy((const SSortLogicNode*)pNode, (SSortLogicNode*)pDst); return logicSortCopy((const SSortLogicNode*)pNode, (SSortLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_PARTITION:
return logicPartitionCopy((const SPartitionLogicNode*)pNode, (SPartitionLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN: case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst); return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst);
default: default:
......
...@@ -198,6 +198,10 @@ const char* nodesNodeName(ENodeType type) { ...@@ -198,6 +198,10 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiInterval"; return "PhysiInterval";
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return "PhysiSessionWindow"; return "PhysiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return "PhysiStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return "PhysiPartition";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return "PhysiDispatch"; return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:
...@@ -1147,6 +1151,61 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) { ...@@ -1147,6 +1151,61 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) {
return code; return code;
} }
static const char* jkStateWindowPhysiPlanStateKey = "StateKey";
static int32_t physiStateWindowNodeToJson(const void* pObj, SJson* pJson) {
const SStateWinodwPhysiNode* pNode = (const SStateWinodwPhysiNode*)pObj;
int32_t code = physiWindowNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkStateWindowPhysiPlanStateKey, nodeToJson, pNode->pStateKey);
}
return code;
}
static int32_t jsonToPhysiStateWindowNode(const SJson* pJson, void* pObj) {
SStateWinodwPhysiNode* pNode = (SStateWinodwPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkStateWindowPhysiPlanStateKey, &pNode->pStateKey);
}
return code;
}
static const char* jkPartitionPhysiPlanExprs = "Exprs";
static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys";
static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) {
const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkPartitionPhysiPlanExprs, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkPartitionPhysiPlanPartitionKeys, pNode->pPartitionKeys);
}
return code;
}
static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) {
SPartitionPhysiNode* pNode = (SPartitionPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkPartitionPhysiPlanExprs, &pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkPartitionPhysiPlanPartitionKeys, &pNode->pPartitionKeys);
}
return code;
}
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc"; static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) { static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
...@@ -2420,6 +2479,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -2420,6 +2479,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiIntervalNodeToJson(pObj, pJson); return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return physiSessionWindowNodeToJson(pObj, pJson); return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson); return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:
...@@ -2512,6 +2575,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { ...@@ -2512,6 +2575,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiIntervalNode(pJson, pObj); return jsonToPhysiIntervalNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return jsonToPhysiSessionWindowNode(pJson, pObj); return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj); return jsonToPhysiDispatchNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN: case QUERY_NODE_PHYSICAL_SUBPLAN:
......
...@@ -78,7 +78,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker ...@@ -78,7 +78,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
res = walkNode(((SOrderByExprNode*)pNode)->pExpr, order, walker, pContext); res = walkNode(((SOrderByExprNode*)pNode)->pExpr, order, walker, pContext);
break; break;
case QUERY_NODE_STATE_WINDOW: case QUERY_NODE_STATE_WINDOW:
res = walkNode(((SStateWindowNode*)pNode)->pCol, order, walker, pContext); res = walkNode(((SStateWindowNode*)pNode)->pExpr, order, walker, pContext);
break; break;
case QUERY_NODE_SESSION_WINDOW: { case QUERY_NODE_SESSION_WINDOW: {
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
...@@ -212,7 +212,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit ...@@ -212,7 +212,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
res = rewriteNode(&(((SOrderByExprNode*)pNode)->pExpr), order, rewriter, pContext); res = rewriteNode(&(((SOrderByExprNode*)pNode)->pExpr), order, rewriter, pContext);
break; break;
case QUERY_NODE_STATE_WINDOW: case QUERY_NODE_STATE_WINDOW:
res = rewriteNode(&(((SStateWindowNode*)pNode)->pCol), order, rewriter, pContext); res = rewriteNode(&(((SStateWindowNode*)pNode)->pExpr), order, rewriter, pContext);
break; break;
case QUERY_NODE_SESSION_WINDOW: case QUERY_NODE_SESSION_WINDOW:
res = rewriteNode(&(((SSessionWindowNode*)pNode)->pCol), order, rewriter, pContext); res = rewriteNode(&(((SSessionWindowNode*)pNode)->pCol), order, rewriter, pContext);
...@@ -301,10 +301,9 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa ...@@ -301,10 +301,9 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa
case SQL_CLAUSE_GROUP_BY: case SQL_CLAUSE_GROUP_BY:
nodesWalkExpr(pSelect->pHaving, walker, pContext); nodesWalkExpr(pSelect->pHaving, walker, pContext);
case SQL_CLAUSE_HAVING: case SQL_CLAUSE_HAVING:
nodesWalkExprs(pSelect->pOrderByList, walker, pContext);
case SQL_CLAUSE_ORDER_BY:
nodesWalkExprs(pSelect->pProjectionList, walker, pContext); nodesWalkExprs(pSelect->pProjectionList, walker, pContext);
case SQL_CLAUSE_SELECT: case SQL_CLAUSE_SELECT:
nodesWalkExprs(pSelect->pOrderByList, walker, pContext);
default: default:
break; break;
} }
......
...@@ -169,6 +169,8 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -169,6 +169,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SWindowLogicNode)); return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_SORT:
return makeNode(type, sizeof(SSortLogicNode)); return makeNode(type, sizeof(SSortLogicNode));
case QUERY_NODE_LOGIC_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN: case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SLogicSubplan)); return makeNode(type, sizeof(SLogicSubplan));
case QUERY_NODE_LOGIC_PLAN: case QUERY_NODE_LOGIC_PLAN:
...@@ -197,6 +199,10 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -197,6 +199,10 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SIntervalPhysiNode)); return makeNode(type, sizeof(SIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return makeNode(type, sizeof(SSessionWinodwPhysiNode)); return makeNode(type, sizeof(SSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return makeNode(type, sizeof(SDataDispatcherNode)); return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:
...@@ -302,7 +308,7 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -302,7 +308,7 @@ void nodesDestroyNode(SNodeptr pNode) {
case QUERY_NODE_LIMIT: // no pointer field case QUERY_NODE_LIMIT: // no pointer field
break; break;
case QUERY_NODE_STATE_WINDOW: case QUERY_NODE_STATE_WINDOW:
nodesDestroyNode(((SStateWindowNode*)pNode)->pCol); nodesDestroyNode(((SStateWindowNode*)pNode)->pExpr);
break; break;
case QUERY_NODE_SESSION_WINDOW: { case QUERY_NODE_SESSION_WINDOW: {
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
......
...@@ -103,7 +103,7 @@ SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft ...@@ -103,7 +103,7 @@ SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft
SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset); SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset);
SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder); SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder);
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap); SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap);
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pCol); SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr);
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill); SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill);
SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValues); SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValues);
SNode* createGroupingSetNode(SAstCreateContext* pCxt, SNode* pNode); SNode* createGroupingSetNode(SAstCreateContext* pCxt, SNode* pNode);
......
...@@ -716,7 +716,7 @@ partition_by_clause_opt(A) ::= PARTITION BY expression_list(B). ...@@ -716,7 +716,7 @@ partition_by_clause_opt(A) ::= PARTITION BY expression_list(B).
twindow_clause_opt(A) ::= . { A = NULL; } twindow_clause_opt(A) ::= . { A = NULL; }
twindow_clause_opt(A) ::= twindow_clause_opt(A) ::=
SESSION NK_LP column_reference(B) NK_COMMA duration_literal(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); } SESSION NK_LP column_reference(B) NK_COMMA duration_literal(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
twindow_clause_opt(A) ::= STATE_WINDOW NK_LP column_reference(B) NK_RP. { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B)); } twindow_clause_opt(A) ::= STATE_WINDOW NK_LP expression(B) NK_RP. { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B)); }
twindow_clause_opt(A) ::= twindow_clause_opt(A) ::=
INTERVAL NK_LP duration_literal(B) NK_RP sliding_opt(C) fill_opt(D). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), NULL, C, D); } INTERVAL NK_LP duration_literal(B) NK_RP sliding_opt(C) fill_opt(D). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), NULL, C, D); }
twindow_clause_opt(A) ::= twindow_clause_opt(A) ::=
......
...@@ -730,10 +730,10 @@ SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap ...@@ -730,10 +730,10 @@ SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap
return (SNode*)session; return (SNode*)session;
} }
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pCol) { SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) {
SStateWindowNode* state = (SStateWindowNode*)nodesMakeNode(QUERY_NODE_STATE_WINDOW); SStateWindowNode* state = (SStateWindowNode*)nodesMakeNode(QUERY_NODE_STATE_WINDOW);
CHECK_OUT_OF_MEM(state); CHECK_OUT_OF_MEM(state);
state->pCol = pCol; state->pExpr = pExpr;
return (SNode*)state; return (SNode*)state;
} }
......
此差异已折叠。
...@@ -477,6 +477,18 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm ...@@ -477,6 +477,18 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
return code; return code;
} }
static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindowNode* pState, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
if (NULL == pWindow) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pWindow->winType = WINDOW_TYPE_STATE;
pWindow->pStateExpr = nodesCloneNode(pState->pExpr);
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionWindowNode* pSession, SSelectStmt* pSelect, SLogicNode** pLogicNode) { static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionWindowNode* pSession, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW); SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
if (NULL == pWindow) { if (NULL == pWindow) {
...@@ -525,6 +537,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele ...@@ -525,6 +537,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
} }
switch (nodeType(pSelect->pWindow)) { switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW:
return createWindowLogicNodeByState(pCxt, (SStateWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_SESSION_WINDOW: case QUERY_NODE_SESSION_WINDOW:
return createWindowLogicNodeBySession(pCxt, (SSessionWindowNode*)pSelect->pWindow, pSelect, pLogicNode); return createWindowLogicNodeBySession(pCxt, (SSessionWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_INTERVAL_WINDOW: case QUERY_NODE_INTERVAL_WINDOW:
...@@ -642,6 +656,29 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS ...@@ -642,6 +656,29 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
SNodeList* pCols = NULL;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, &pCols);
if (TSDB_CODE_SUCCESS == code && NULL != pCols) {
pPartition->node.pTargets = nodesCloneList(pCols);
if (NULL == pPartition->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
pPartition->pPartitionKeys = nodesCloneList(pSelect->pPartitionByList);
if (NULL == pPartition->pPartitionKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pPartition;
} else {
nodesDestroyNode(pPartition);
}
return code;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -650,7 +687,35 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe ...@@ -650,7 +687,35 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return TSDB_CODE_SUCCESS; SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
// set grouyp keys, agg funcs and having conditions
pAgg->pGroupKeys = nodesCloneList(pSelect->pProjectionList);
if (NULL == pAgg->pGroupKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_SELECT);
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pAgg;
} else {
nodesDestroyNode(pAgg);
}
return code;
} }
static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
......
...@@ -176,6 +176,10 @@ static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) { ...@@ -176,6 +176,10 @@ static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
} }
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, const char* pStmtName, bool output) { static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, const char* pStmtName, bool output) {
if (NULL == pList) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
int16_t nextSlotId = taosHashGetSize(pHash), slotId = 0; int16_t nextSlotId = taosHashGetSize(pHash), slotId = 0;
...@@ -219,6 +223,23 @@ static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDat ...@@ -219,6 +223,23 @@ static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDat
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false); return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false);
} }
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
if (NULL == pNode || NULL == *pNode) {
return TSDB_CODE_SUCCESS;
}
SNodeList* pList = NULL;
int32_t code = nodesListMakeAppend(&pList, *pNode);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pList, pDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
*pNode = nodesListGetNode(pList, 0);
}
nodesClearList(pList);
return code;
}
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true); return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true);
} }
...@@ -244,6 +265,7 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { ...@@ -244,6 +265,7 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
} }
// pIndex is definitely not NULL, otherwise it is a bug // pIndex is definitely not NULL, otherwise it is a bug
if (NULL == pIndex) { if (NULL == pIndex) {
pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId; ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
...@@ -614,6 +636,25 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN ...@@ -614,6 +636,25 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN
return cxt.errCode; return cxt.errCode;
} }
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs, SNode** pRewritten) {
if (NULL == pNode) {
return TSDB_CODE_SUCCESS;
}
SNodeList* pList = NULL;
int32_t code = nodesListMakeAppend(&pList, pNode);
SNodeList* pRewrittenList = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
}
if (TSDB_CODE_SUCCESS == code) {
*pRewritten = nodesListGetNode(pRewrittenList, 0);
}
nodesClearList(pList);
nodesClearList(pRewrittenList);
return code;
}
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, SPhysiNode** pPhyNode) { static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, SPhysiNode** pPhyNode) {
SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG); SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG);
if (NULL == pAgg) { if (NULL == pAgg) {
...@@ -818,6 +859,40 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* ...@@ -818,6 +859,40 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
return createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode, pPhyNode); return createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode, pPhyNode);
} }
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW);
if (NULL == pState) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNodeList* pPrecalcExprs = NULL;
SNode* pStateKey = NULL;
int32_t code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
}
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pState);
return code;
}
return createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode, pPhyNode);
}
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
switch (pWindowLogicNode->winType) { switch (pWindowLogicNode->winType) {
case WINDOW_TYPE_INTERVAL: case WINDOW_TYPE_INTERVAL:
...@@ -825,7 +900,7 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr ...@@ -825,7 +900,7 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
case WINDOW_TYPE_SESSION: case WINDOW_TYPE_SESSION:
return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode); return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_STATE: case WINDOW_TYPE_STATE:
break; return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
default: default:
break; break;
} }
...@@ -867,6 +942,41 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren ...@@ -867,6 +942,41 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
return code; return code;
} }
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
if (NULL == pPart) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNodeList* pPrecalcExprs = NULL;
SNodeList* pPartitionKeys = NULL;
int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pPart->pPartitionKeys, pPart->node.pOutputDataBlockDesc);
}
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pPart;
} else {
nodesDestroyNode(pPart);
}
return code;
}
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan, SNodeList* pChildren, SPhysiNode** pPhyNode) { static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan, SNodeList* pChildren, SPhysiNode** pPhyNode) {
switch (nodeType(pLogicNode)) { switch (nodeType(pLogicNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
...@@ -883,6 +993,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode ...@@ -883,6 +993,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode); return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_SORT:
return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode); return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_PARTITION:
return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
default: default:
break; break;
} }
......
...@@ -217,6 +217,32 @@ TEST_F(PlannerTest, sessionWindow) { ...@@ -217,6 +217,32 @@ TEST_F(PlannerTest, sessionWindow) {
ASSERT_TRUE(run()); ASSERT_TRUE(run());
} }
TEST_F(PlannerTest, stateWindow) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1 state_window(c1)");
ASSERT_TRUE(run());
bind("SELECT count(*) FROM t1 state_window(c1 + 10)");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, partitionBy) {
setDatabase("root", "test");
bind("SELECT * FROM t1 partition by c1");
ASSERT_TRUE(run());
bind("SELECT count(*) FROM t1 partition by c1");
ASSERT_TRUE(run());
bind("SELECT count(*) FROM t1 partition by c1 group by c2");
ASSERT_TRUE(run());
bind("SELECT count(*) FROM st1 partition by tag1, tag2 interval(10s)");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, orderBy) { TEST_F(PlannerTest, orderBy) {
setDatabase("root", "test"); setDatabase("root", "test");
...@@ -230,6 +256,19 @@ TEST_F(PlannerTest, orderBy) { ...@@ -230,6 +256,19 @@ TEST_F(PlannerTest, orderBy) {
ASSERT_TRUE(run()); ASSERT_TRUE(run());
} }
TEST_F(PlannerTest, distinct) {
setDatabase("root", "test");
bind("SELECT distinct c1 FROM t1");
ASSERT_TRUE(run());
bind("SELECT distinct c1, c2 + 10 FROM t1");
ASSERT_TRUE(run());
bind("SELECT distinct c1 + 10 a FROM t1 order by a");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, limit) { TEST_F(PlannerTest, limit) {
setDatabase("root", "test"); setDatabase("root", "test");
......
...@@ -432,6 +432,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status erro ...@@ -432,6 +432,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status erro
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order") TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order")
//planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "planner internal error")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
#endif #endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册