提交 8a883ea0 编写于 作者: X Xiaoyu Wang

TD-13990 interval plan implement

上级 e5286898
......@@ -80,9 +80,20 @@ typedef struct SExchangeLogicNode {
int32_t srcGroupId;
} SExchangeLogicNode;
typedef enum EWindowType {
WINDOW_TYPE_INTERVAL = 1,
WINDOW_TYPE_SESSION,
WINDOW_TYPE_STATE
} EWindowType;
typedef struct SWindowLogicNode {
SLogicNode node;
SNode* pWindow;
EWindowType winType;
SNodeList* pFuncs;
int64_t interval;
int64_t offset;
int64_t sliding;
SFillNode* pFill;
} SWindowLogicNode;
typedef enum ESubplanType {
......@@ -198,9 +209,11 @@ typedef struct SExchangePhysiNode {
typedef struct SIntervalPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs;
int64_t interval;
int64_t sliding;
int64_t offset;
int64_t sliding;
SFillNode* pFill;
} SIntervalPhysiNode;
......
......@@ -195,11 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
pRequest->type = pQuery->msgType;
SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId };
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
if (code != 0) {
return code;
}
return code;
return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
......
......@@ -184,6 +184,12 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode
return (SNode*)pDst;
}
static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) {
COPY_SCALAR_FIELD(mode);
CLONE_NODE_FIELD(pValues);
return (SNode*)pDst;
}
static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
COPY_SCALAR_FIELD(id);
CLONE_NODE_LIST_FIELD(pTargets);
......@@ -249,6 +255,17 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo
return (SNode*)pDst;
}
static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(winType);
CLONE_NODE_LIST_FIELD(pFuncs);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
CLONE_NODE_FIELD(pFill);
return (SNode*)pDst;
}
static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
CLONE_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(subplanType);
......@@ -310,6 +327,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
case QUERY_NODE_ORDER_BY_EXPR:
case QUERY_NODE_LIMIT:
break;
case QUERY_NODE_FILL:
return fillNodeCopy((const SFillNode*)pNode, (SFillNode*)pDst);
case QUERY_NODE_DATABLOCK_DESC:
return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst);
case QUERY_NODE_SLOT_DESC:
......@@ -326,6 +345,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst);
default:
......
......@@ -117,6 +117,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiExchange";
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return "PhysiInterval";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
......@@ -573,6 +575,65 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkIntervalPhysiPlanExprs = "Exprs";
static const char* jkIntervalPhysiPlanFuncs = "Funcs";
static const char* jkIntervalPhysiPlanInterval = "Interval";
static const char* jkIntervalPhysiPlanOffset = "Offset";
static const char* jkIntervalPhysiPlanSliding = "Sliding";
static const char* jkIntervalPhysiPlanFill = "Fill";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkIntervalPhysiPlanExprs, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkIntervalPhysiPlanFuncs, pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanInterval, pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanOffset, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSliding, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill);
}
return code;
}
static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkIntervalPhysiPlanExprs, &pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkIntervalPhysiPlanFuncs, &pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanInterval, &pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanOffset, &pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanSliding, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill);
}
return code;
}
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
......@@ -1502,6 +1563,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
break;
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
......
......@@ -134,6 +134,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SVnodeModifLogicNode));
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangeLogicNode));
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SSubLogicPlan));
case QUERY_NODE_LOGIC_PLAN:
......@@ -156,6 +158,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SExchangePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return makeNode(type, sizeof(SIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
......
......@@ -198,7 +198,7 @@ col_name(A) ::= column_name(B).
cmd ::= SHOW VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, NULL); }
cmd ::= SHOW db_name(B) NK_DOT VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, &B); }
/************************************************ show vgroups ********************************************************/
/************************************************ show mnodes *********************************************************/
cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT, NULL); }
/************************************************ select **************************************************************/
......
......@@ -708,9 +708,17 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SNodeList* pGroupByList
return translateExprList(pCxt, pGroupByList);
}
static int32_t doTranslateWindow(STranslateContext* pCxt, SNode* pWindow) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateWindow(STranslateContext* pCxt, SNode* pWindow) {
pCxt->currClause = SQL_CLAUSE_WINDOW;
return translateExpr(pCxt, pWindow);
int32_t code = translateExpr(pCxt, pWindow);
if (TSDB_CODE_SUCCESS == code) {
code = doTranslateWindow(pCxt, pWindow);
}
return code;
}
static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) {
......
......@@ -183,6 +183,13 @@ TEST_F(ParserTest, selectClause) {
ASSERT_TRUE(run());
}
TEST_F(ParserTest, selectWindow) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, selectSyntaxError) {
setDatabase("root", "test");
......
......@@ -257,18 +257,6 @@ static SNodeList* createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList*
return cxt.pList;
}
static SLogicNode* createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) {
return NULL;
}
SWindowLogicNode* pAgg = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
CHECK_ALLOC(pAgg, NULL);
pAgg->node.id = pCxt->planNodeId++;
}
static SLogicNode* createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
SNodeList* pAggFuncs = NULL;
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs), NULL);
......@@ -316,6 +304,50 @@ static SLogicNode* createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
return (SLogicNode*)pAgg;
}
static SLogicNode* createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SIntervalWindowNode* pInterval, SSelectStmt* pSelect) {
SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
CHECK_ALLOC(pWindow, NULL);
pWindow->node.id = pCxt->planNodeId++;
pWindow->winType = WINDOW_TYPE_INTERVAL;
pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i;
pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0);
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : 0);
if (NULL != pInterval->pFill) {
pWindow->pFill = nodesCloneNode(pInterval->pFill);
CHECK_ALLOC(pWindow->pFill, (SLogicNode*)pWindow);
}
SNodeList* pFuncs = NULL;
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pFuncs), NULL);
if (NULL != pFuncs) {
pWindow->pFuncs = nodesCloneList(pFuncs);
CHECK_ALLOC(pWindow->pFuncs, (SLogicNode*)pWindow);
}
CHECK_CODE(rewriteExpr(pWindow->node.id, 1, pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW), (SLogicNode*)pWindow);
pWindow->node.pTargets = createColumnByRewriteExps(pCxt, pWindow->pFuncs);
CHECK_ALLOC(pWindow->node.pTargets, (SLogicNode*)pWindow);
return (SLogicNode*)pWindow;
}
static SLogicNode* createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) {
return NULL;
}
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_INTERVAL_WINDOW:
return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect);
default:
break;
}
return NULL;
}
static SNodeList* createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs) {
SNodeList* pList = nodesMakeList();
CHECK_ALLOC(pList, NULL);
......
......@@ -473,14 +473,58 @@ static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLog
return (SPhysiNode*)pExchange;
}
static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
CHECK_ALLOC(pInterval, NULL);
pInterval->interval = pWindowLogicNode->interval;
pInterval->offset = pWindowLogicNode->offset;
pInterval->sliding = pWindowLogicNode->sliding;
pInterval->pFill = nodesCloneNode(pWindowLogicNode->pFill);
SNodeList* pPrecalcExprs = NULL;
SNodeList* pFuncs = NULL;
CHECK_CODE(rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs), (SPhysiNode*)pInterval);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (NULL != pPrecalcExprs) {
pInterval->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs);
CHECK_ALLOC(pInterval->pExprs, (SPhysiNode*)pInterval);
CHECK_CODE(addDataBlockDesc(pCxt, pInterval->pExprs, pChildTupe), (SPhysiNode*)pInterval);
}
if (NULL != pFuncs) {
pInterval->pFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs);
CHECK_ALLOC(pInterval->pFuncs, (SPhysiNode*)pInterval);
CHECK_CODE(addDataBlockDesc(pCxt, pInterval->pFuncs, pInterval->node.pOutputDataBlockDesc), (SPhysiNode*)pInterval);
}
CHECK_CODE(setSlotOutput(pCxt, pWindowLogicNode->node.pTargets, pInterval->node.pOutputDataBlockDesc), (SPhysiNode*)pInterval);
return (SPhysiNode*)pInterval;
}
static SPhysiNode* createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
switch (pWindowLogicNode->winType) {
case WINDOW_TYPE_INTERVAL:
return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode);
case WINDOW_TYPE_SESSION:
case WINDOW_TYPE_STATE:
break;
default:
break;
}
return NULL;
}
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) {
SNodeList* pChildren = nodesMakeList();
CHECK_ALLOC(pChildren, NULL);
SNode* pLogicChild;
FOREACH(pLogicChild, pLogicPlan->pChildren) {
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild);
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildren, pChildPhyNode)) {
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pChildren, createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild))) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
nodesDestroyList(pChildren);
return NULL;
......@@ -504,6 +548,9 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
pPhyNode = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicPlan);
break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
pPhyNode = createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicPlan);
break;
default:
break;
}
......
......@@ -166,3 +166,10 @@ TEST_F(PlannerTest, subquery) {
bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, interval) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册