提交 547ab702 编写于 作者: X Xiaoyu Wang

enh: last function optimize

上级 915e4e40
......@@ -205,6 +205,7 @@ typedef struct SWindowLogicNode {
int8_t igExpired;
EWindowAlgorithm windowAlgo;
EOrder inputTsOrder;
EOrder outputTsOrder;
} SWindowLogicNode;
typedef struct SFillLogicNode {
......@@ -213,6 +214,7 @@ typedef struct SFillLogicNode {
SNode* pWStartTs;
SNode* pValues; // SNodeListNode
STimeWindow timeRange;
EOrder inputTsOrder;
} SFillLogicNode;
typedef struct SSortLogicNode {
......@@ -411,6 +413,8 @@ typedef struct SWinodwPhysiNode {
int8_t triggerType;
int64_t watermark;
int8_t igExpired;
EOrder inputTsOrder;
EOrder outputTsOrder;
} SWinodwPhysiNode;
typedef struct SIntervalPhysiNode {
......@@ -435,6 +439,7 @@ typedef struct SFillPhysiNode {
SNode* pValues; // SNodeListNode
SNodeList* pTargets;
STimeWindow timeRange;
EOrder inputTsOrder;
} SFillPhysiNode;
typedef struct SMultiTableIntervalPhysiNode {
......
......@@ -443,6 +443,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(inputTsOrder);
COPY_SCALAR_FIELD(outputTsOrder);
return TSDB_CODE_SUCCESS;
}
......@@ -452,6 +453,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
CLONE_NODE_FIELD(pWStartTs);
CLONE_NODE_FIELD(pValues);
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
COPY_SCALAR_FIELD(inputTsOrder);
return TSDB_CODE_SUCCESS;
}
......
......@@ -1936,6 +1936,8 @@ static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
static const char* jkWindowPhysiPlanWatermark = "Watermark";
static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired";
static const char* jkWindowPhysiPlanInputTsOrder = "inputTsOrder";
static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
......@@ -1962,6 +1964,12 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanIgnoreExpired, pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanInputTsOrder, pNode->inputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanOutputTsOrder, pNode->outputTsOrder);
}
return code;
}
......@@ -1991,6 +1999,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanIgnoreExpired, &pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanInputTsOrder, pNode->inputTsOrder, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanOutputTsOrder, pNode->outputTsOrder, code);
}
return code;
}
......@@ -2053,6 +2067,7 @@ static const char* jkFillPhysiPlanValues = "Values";
static const char* jkFillPhysiPlanTargets = "Targets";
static const char* jkFillPhysiPlanStartTime = "StartTime";
static const char* jkFillPhysiPlanEndTime = "EndTime";
static const char* jkFillPhysiPlanInputTsOrder = "inputTsOrder";
static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj;
......@@ -2076,6 +2091,9 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanEndTime, pNode->timeRange.ekey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanInputTsOrder, pNode->inputTsOrder);
}
return code;
}
......@@ -2103,6 +2121,9 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkFillPhysiPlanEndTime, &pNode->timeRange.ekey);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkFillPhysiPlanInputTsOrder, pNode->inputTsOrder, code);
}
return code;
}
......
......@@ -632,6 +632,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
pWindow->igExpired = pCxt->pPlanCxt->igExpired;
}
pWindow->inputTsOrder = ORDER_ASC;
pWindow->outputTsOrder = ORDER_ASC;
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
......@@ -764,6 +765,7 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pFill->node.groupAction = GROUP_ACTION_KEEP;
pFill->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pFill->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pFill->inputTsOrder = ORDER_ASC;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_WINDOW, NULL, COLLECT_COL_TYPE_ALL, &pFill->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pFill->node.pTargets) {
......
......@@ -100,6 +100,27 @@ static EDealRes optRebuildTbanme(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
if (NULL == pNode) {
return;
}
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_WINDOW:
((SWindowLogicNode*)pNode)->inputTsOrder = order;
// window has a sorting function, and the operator behind it uses its output order
return;
case QUERY_NODE_LOGIC_PLAN_JOIN:
((SJoinLogicNode*)pNode)->inputTsOrder = order;
break;
case QUERY_NODE_LOGIC_PLAN_FILL:
((SFillLogicNode*)pNode)->inputTsOrder = order;
break;
default:
break;
}
optSetParentOrder(pNode->pParent, order);
}
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
// *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
......@@ -305,10 +326,12 @@ static void scanPathOptSetScanOrder(EScanOrder scanOrder, SScanLogicNode* pScan)
case SCAN_ORDER_ASC:
pScan->scanSeq[0] = 1;
pScan->scanSeq[1] = 0;
optSetParentOrder(pScan->node.pParent, ORDER_ASC);
break;
case SCAN_ORDER_DESC:
pScan->scanSeq[0] = 0;
pScan->scanSeq[1] = 1;
optSetParentOrder(pScan->node.pParent, ORDER_DESC);
break;
case SCAN_ORDER_BOTH:
pScan->scanSeq[0] = 1;
......@@ -1038,9 +1061,6 @@ static int32_t pushDownCondOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogi
}
static bool sortPriKeyOptIsPriKeyOrderBy(SNodeList* pSortKeys) {
if (1 != LIST_LENGTH(pSortKeys)) {
return false;
}
SNode* pNode = ((SOrderByExprNode*)nodesListGetNode(pSortKeys, 0))->pExpr;
return (QUERY_NODE_COLUMN == nodeType(pNode) ? (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) : false);
}
......@@ -1051,12 +1071,13 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
}
SSortLogicNode* pSort = (SSortLogicNode*)pNode;
if (pSort->groupSort || !sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
return TSDB_CODE_SUCCESS;
return false;
}
return true;
}
static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) {
static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool* pNotOptimize,
SNodeList** pSequencingNodes) {
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
......@@ -1064,17 +1085,19 @@ static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimi
*pNotOptimize = true;
return TSDB_CODE_SUCCESS;
}
return nodesListMakeAppend(pScanNodes, (SNode*)pNode);
return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode);
}
case QUERY_NODE_LOGIC_PLAN_JOIN: {
int32_t code =
sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
int32_t code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0),
pNotOptimize, pSequencingNodes);
if (TSDB_CODE_SUCCESS == code) {
code =
sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes);
code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize,
pSequencingNodes);
}
return code;
}
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_AGG:
case QUERY_NODE_LOGIC_PLAN_PARTITION:
*pNotOptimize = true;
......@@ -1088,14 +1111,15 @@ static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimi
return TSDB_CODE_SUCCESS;
}
return sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
return sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize,
pSequencingNodes);
}
static int32_t sortPriKeyOptGetScanNodes(SLogicNode* pNode, SNodeList** pScanNodes) {
static int32_t sortPriKeyOptGetSequencingNodes(SLogicNode* pNode, SNodeList** pSequencingNodes) {
bool notOptimize = false;
int32_t code = sortPriKeyOptGetScanNodesImpl(pNode, &notOptimize, pScanNodes);
int32_t code = sortPriKeyOptGetSequencingNodesImpl(pNode, &notOptimize, pSequencingNodes);
if (TSDB_CODE_SUCCESS != code || notOptimize) {
nodesClearList(*pScanNodes);
nodesClearList(*pSequencingNodes);
}
return code;
}
......@@ -1104,24 +1128,13 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) {
return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order;
}
static void sortPriKeyOptSetParentOrder(SLogicNode* pNode, EOrder order) {
if (NULL == pNode) {
return;
}
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) {
((SWindowLogicNode*)pNode)->inputTsOrder = order;
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) {
((SJoinLogicNode*)pNode)->inputTsOrder = order;
}
sortPriKeyOptSetParentOrder(pNode->pParent, order);
}
static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort,
SNodeList* pScanNodes) {
SNodeList* pSequencingNodes) {
EOrder order = sortPriKeyOptGetPriKeyOrder(pSort);
SNode* pScanNode = NULL;
FOREACH(pScanNode, pScanNodes) {
SScanLogicNode* pScan = (SScanLogicNode*)pScanNode;
SNode* pSequencingNode = NULL;
FOREACH(pSequencingNode, pSequencingNodes) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pSequencingNode)) {
SScanLogicNode* pScan = (SScanLogicNode*)pSequencingNode;
if ((ORDER_DESC == order && pScan->scanSeq[0] > 0) || (ORDER_ASC == order && pScan->scanSeq[1] > 0)) {
TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]);
}
......@@ -1130,8 +1143,11 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
}
sortPriKeyOptSetParentOrder(pScan->node.pParent, order);
pScan->sortPrimaryKey = true;
} else if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pSequencingNode)) {
((SWindowLogicNode*)pSequencingNode)->outputTsOrder = order;
}
optSetParentOrder(((SLogicNode*)pSequencingNode)->pParent, order);
}
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0);
......@@ -1148,12 +1164,13 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
}
static int32_t sortPrimaryKeyOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort) {
SNodeList* pScanNodes = NULL;
int32_t code = sortPriKeyOptGetScanNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), &pScanNodes);
if (TSDB_CODE_SUCCESS == code && NULL != pScanNodes) {
code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pScanNodes);
SNodeList* pSequencingNodes = NULL;
int32_t code =
sortPriKeyOptGetSequencingNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), &pSequencingNodes);
if (TSDB_CODE_SUCCESS == code && NULL != pSequencingNodes) {
code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pSequencingNodes);
}
nodesClearList(pScanNodes);
nodesClearList(pSequencingNodes);
return code;
}
......
......@@ -1089,6 +1089,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark;
pWindow->igExpired = pWindowLogicNode->igExpired;
pWindow->inputTsOrder = pWindowLogicNode->inputTsOrder;
pWindow->outputTsOrder = pWindowLogicNode->outputTsOrder;
SNodeList* pPrecalcExprs = NULL;
SNodeList* pFuncs = NULL;
......@@ -1363,6 +1365,7 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
pFill->mode = pFillNode->mode;
pFill->timeRange = pFillNode->timeRange;
pFill->inputTsOrder = pFillNode->inputTsOrder;
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->node.pTargets, &pFill->pTargets);
......
......@@ -32,6 +32,9 @@ TEST_F(PlanOptimizeTest, scanPath) {
run("SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1");
run("SELECT LAST(c1) FROM t1");
run("SELECT LAST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) "
"FILL(LINEAR)");
}
TEST_F(PlanOptimizeTest, pushDownCondition) {
......@@ -59,7 +62,15 @@ TEST_F(PlanOptimizeTest, sortPrimaryKey) {
run("SELECT c1 FROM t1 ORDER BY ts DESC");
run("SELECT c1 FROM st1 ORDER BY ts DESC");
run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTART DESC");
run("SELECT FIRST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) "
"FILL(LINEAR) ORDER BY _WSTART DESC");
run("SELECT LAST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) "
"FILL(LINEAR) ORDER BY _WSTART");
}
TEST_F(PlanOptimizeTest, PartitionTags) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册