提交 8f39b9d2 编写于 作者: D dapan1121

enh: optimize query plan

上级 ce263507
......@@ -232,6 +232,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PARTITION,
QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_LOGIC_PLAN_INTERP_FUNC,
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
......
......@@ -108,13 +108,15 @@ typedef struct SScanLogicNode {
} SScanLogicNode;
typedef struct SJoinLogicNode {
SLogicNode node;
EJoinType joinType;
SNode* pPrimKeyEqCond;
SNode* pColEqCond;
SNode* pTagEqCond;
SNode* pOtherOnCond;
bool isSingleTableJoin;
SLogicNode node;
EJoinType joinType;
EJoinAlgorithm joinAlgo;
SNode* pPrimKeyEqCond;
SNode* pColEqCond;
SNode* pTagEqCond;
SNode* pOtherOnCond;
bool isSingleTableJoin;
bool hasSubQuery;
} SJoinLogicNode;
typedef struct SAggLogicNode {
......@@ -153,6 +155,12 @@ typedef struct SInterpFuncLogicNode {
SNode* pTimeSeries; // SColumnNode
} SInterpFuncLogicNode;
typedef struct SGroupCacheLogicNode {
SLogicNode node;
SNode* pGroupCol;
} SGroupCacheLogicNode;
typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType;
typedef struct SVnodeModifyLogicNode {
......@@ -404,10 +412,10 @@ typedef struct SInterpFuncPhysiNode {
typedef struct SSortMergeJoinPhysiNode {
SPhysiNode node;
EJoinType joinType;
SNode* pMergeCondition;
SNode* pOnConditions;
SNode* pPrimKeyCond;
SNode* pColEqCond;
SNode* pOtherOnCond;
SNodeList* pTargets;
SNode* pColEqualOnConditions;
} SSortMergeJoinPhysiNode;
typedef struct SHashJoinPhysiNode {
......
......@@ -174,9 +174,16 @@ typedef enum EJoinType {
JOIN_TYPE_RIGHT,
} EJoinType;
typedef enum EJoinAlgorithm {
JOIN_ALGO_UNKNOWN = 0,
JOIN_ALGO_MERGE,
JOIN_ALGO_HASH,
} EJoinAlgorithm;
typedef struct SJoinTableNode {
STableNode table; // QUERY_NODE_JOIN_TABLE
EJoinType joinType;
bool hasSubQuery;
SNode* pLeft;
SNode* pRight;
SNode* pOnCond;
......
......@@ -575,11 +575,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT);
QRY_ERR_RET(
nodesNodeToSQL(pJoinNode->pMergeCondition, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
if (pJoinNode->pOnConditions) {
nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
if (pJoinNode->pOtherOnCond) {
EXPLAIN_ROW_APPEND(" AND ");
QRY_ERR_RET(
nodesNodeToSQL(pJoinNode->pOnConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
nodesNodeToSQL(pJoinNode->pOtherOnCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
......
......@@ -74,13 +74,13 @@ static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInf
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
SNode* pMergeCondition = pJoinNode->pMergeCondition;
if (nodeType(pMergeCondition) != QUERY_NODE_OPERATOR) {
SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond;
if (nodeType(pPrimKeyCond) != QUERY_NODE_OPERATOR) {
qError("not support this in join operator, %s", idStr);
return; // do not handle this
}
SOperatorNode* pNode = (SOperatorNode*)pMergeCondition;
SOperatorNode* pNode = (SOperatorNode*)pPrimKeyCond;
SColumnNode* col1 = (SColumnNode*)pNode->pLeft;
SColumnNode* col2 = (SColumnNode*)pNode->pRight;
SColumnNode* leftTsCol = NULL;
......@@ -222,7 +222,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode, GET_TASKID(pTaskInfo));
if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
if (pJoinNode->pOtherOnCond != NULL && pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (pInfo->pCondAfterMerge == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -236,11 +236,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOtherOnCond));
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
pLogicCond->condType = LOGIC_COND_TYPE_AND;
} else if (pJoinNode->pOnConditions != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions);
} else if (pJoinNode->pOtherOnCond != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOtherOnCond);
} else if (pJoinNode->pColEqCond != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pColEqCond);
} else if (pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
} else {
......@@ -259,7 +261,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->inputOrder = TSDB_ORDER_DESC;
}
pInfo->pColEqualOnConditions = pJoinNode->pColEqualOnConditions;
pInfo->pColEqualOnConditions = pJoinNode->pColEqCond;
if (pInfo->pColEqualOnConditions != NULL) {
pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
......
......@@ -1892,7 +1892,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
static const char* jkJoinPhysiPlanJoinType = "JoinType";
static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder";
static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition";
static const char* jkJoinPhysiPlanPrimKeyCondition = "PrimKeyCondition";
static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
static const char* jkJoinPhysiPlanTargets = "Targets";
static const char* jkJoinPhysiPlanColEqualOnConditions = "ColumnEqualOnConditions";
......@@ -1905,16 +1905,16 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition);
code = tjsonAddObject(pJson, jkJoinPhysiPlanPrimKeyCondition, nodeToJson, pNode->pPrimKeyCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOnConditions);
code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOtherOnCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanColEqualOnConditions, nodeToJson, pNode->pColEqualOnConditions);
code = tjsonAddObject(pJson, jkJoinPhysiPlanColEqualOnConditions, nodeToJson, pNode->pColEqCond);
}
return code;
}
......@@ -1927,16 +1927,16 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions);
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOtherOnCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanMergeCondition, &pNode->pMergeCondition);
code = jsonToNodeObject(pJson, jkJoinPhysiPlanPrimKeyCondition, &pNode->pPrimKeyCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanColEqualOnConditions, &pNode->pColEqualOnConditions);
code = jsonToNodeObject(pJson, jkJoinPhysiPlanColEqualOnConditions, &pNode->pColEqCond);
}
return code;
}
......
......@@ -2330,7 +2330,7 @@ static int32_t msgToPhysiProjectNode(STlvDecoder* pDecoder, void* pObj) {
enum {
PHY_SORT_MERGE_JOIN_CODE_BASE_NODE = 1,
PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE,
PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION,
PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION,
PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS,
PHY_SORT_MERGE_JOIN_CODE_TARGETS,
PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER,
......@@ -2345,16 +2345,16 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, pNode->joinType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, nodeToMsg, pNode->pMergeCondition);
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION, nodeToMsg, pNode->pPrimKeyCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pOnConditions);
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pOtherOnCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pColEqualOnConditions);
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pColEqCond);
}
return code;
}
......@@ -2372,17 +2372,17 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE:
code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType));
break;
case PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pMergeCondition);
case PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pPrimKeyCond);
break;
case PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pOnConditions);
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pOtherOnCond);
break;
case PHY_SORT_MERGE_JOIN_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break;
case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pColEqualOnConditions);
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pColEqCond);
break;
default:
break;
......
......@@ -490,6 +490,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SIndefRowsFuncLogicNode));
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return makeNode(type, sizeof(SInterpFuncLogicNode));
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return makeNode(type, sizeof(SGroupCacheLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SLogicSubplan));
case QUERY_NODE_LOGIC_PLAN:
......@@ -1222,10 +1224,10 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pMergeCondition);
nodesDestroyNode(pPhyNode->pOnConditions);
nodesDestroyNode(pPhyNode->pPrimKeyCond);
nodesDestroyNode(pPhyNode->pOtherOnCond);
nodesDestroyList(pPhyNode->pTargets);
nodesDestroyNode(pPhyNode->pColEqualOnConditions);
nodesDestroyNode(pPhyNode->pColEqCond);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
......
......@@ -2763,6 +2763,7 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
pJoinTable->table.precision = calcJoinTablePrecision(pJoinTable);
pJoinTable->table.singleTable = joinTableIsSingleTable(pJoinTable);
code = translateExpr(pCxt, &pJoinTable->pOnCond);
pJoinTable->hasSubQuery = (nodeType(pJoinTable->pLeft) != QUERY_NODE_REAL_TABLE) || (nodeType(pJoinTable->pRight) != QUERY_NODE_REAL_TABLE);
}
break;
}
......
......@@ -436,6 +436,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin->joinType = pJoinTable->joinType;
pJoin->isSingleTableJoin = pJoinTable->table.singleTable;
pJoin->hasSubQuery = pJoinTable->hasSubQuery;
pJoin->node.inputTsOrder = ORDER_ASC;
pJoin->node.groupAction = GROUP_ACTION_CLEAR;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
......@@ -475,12 +476,12 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// set the output
if (TSDB_CODE_SUCCESS == code) {
pJoin->node.pTargets = nodesCloneList(pLeft->pTargets);
if (NULL == pJoin->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
SNodeList* pColList = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, NULL, COLLECT_COL_TYPE_ALL, &pColList);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppendList(pJoin->node.pTargets, nodesCloneList(pRight->pTargets));
code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets);
}
}
......
......@@ -689,11 +689,14 @@ static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNo
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pOnConds = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
WHERE_EACH(pCond, pLogicCond->pParameterList) {
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) {
nodesDestroyNode(*ppPrimKeyEqCond);
*ppPrimKeyEqCond = nodesCloneNode(pCond);
ERASE_NODE(pLogicCond->pParameterList);
} else {
code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond));
WHERE_NEXT;
}
}
......@@ -721,9 +724,8 @@ static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppPr
}
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOtherOnCond)) {
*ppPrimKeyEqCond = nodesCloneNode(pJoin->pOtherOnCond);
*ppPrimKeyEqCond = pJoin->pOtherOnCond;
*ppOnCond = NULL;
nodesDestroyNode(pJoin->pOtherOnCond);
pJoin->pOtherOnCond = NULL;
return TSDB_CODE_SUCCESS;
} else {
......@@ -1814,15 +1816,9 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild,
nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
if (!cxt.canUse) return false;
}
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->pOtherOnCond) {
SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild;
CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false};
nodesWalkExpr(pJoinLogicNode->pPrimKeyEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
if (!cxt.canUse) return false;
nodesWalkExpr(pJoinLogicNode->pColEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
if (!cxt.canUse) return false;
nodesWalkExpr(pJoinLogicNode->pTagEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
if (!cxt.canUse) return false;
nodesWalkExpr(pJoinLogicNode->pOtherOnCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
if (!cxt.canUse) return false;
}
......@@ -2969,6 +2965,246 @@ static int32_t sortNonPriKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
return TSDB_CODE_SUCCESS;
}
static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
return false;
}
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 || pJoin->hasSubQuery || pJoin->joinAlgo != UNKNOWN_JOIN_ALGO) {
return false;
}
return true;
}
int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
SJoinLogicNode* pJoinNode = (SJoinLogicNode*)pJoin;
pScan->scanType = SCAN_TYPE_TAG;
NODES_DESTORY_LIST(pScan->pScanCols);
NODES_DESTORY_NODE(pScan->node.pConditions);
pScan->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
SNodeList* pTags = nodesMakeList();
int32_t code = nodesCollectColumnsFromNode(pJoinNode->pTagEqCond, NULL, COLLECT_COL_TYPE_TAG, &pTags);
if (TSDB_CODE_SUCCESS == code) {
SNode* pTarget = NULL;
SNode* pTag = NULL;
bool found = false;
WHERE_EACH(pTarget, pScan->node.pTargets) {
found = false;
FOREACH(pTag, pTags) {
if (nodesEqualNode(pTarget, pTag)) {
found = true;
break;
}
}
if (!found) {
ERASE_NODE(pScan->node.pTargets);
} else {
WHERE_NEXT;
}
}
}
if (TSDB_CODE_SUCCESS == code) {
SFunctionNode* pUidFunc = createFunction("_tbuid", NULL);
snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p",
pUidFunc->functionName, pUidFunc);
nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pUidFunc);
code = createColumnByRewriteExpr(pUidFunc, &pScan->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
SFunctionNode* pVgidFunc = createFunction("_vgid", NULL);
snprintf(pVgidFunc->node.aliasName, sizeof(pVgidFunc->node.aliasName), "%s.%p",
pVgidFunc->functionName, pVgidFunc);
nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pVgidFunc);
code = createColumnByRewriteExpr(pVgidFunc, &pScan->node.pTargets);
}
if (code) {
nodesDestroyList(pTags);
}
return code;
}
static int32_t stbJoinOptCreateTagScanNode(SLogicNode* pJoin, SNodeList** ppList) {
SNodeList* pList = nodesCloneList(pJoin->pChildren);
if (NULL == pList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
code = stbJoinOptRewriteToTagScan(pJoin, pNode);
if (code) {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
*ppList = pList;
} else {
nodesDestroyList(pList);
}
return code;
}
static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pChildren, SLogicNode** ppLogic) {
SJoinLogicNode* pOrigJoin = (SJoinLogicNode*)pOrig;
SJoinLogicNode* pJoin = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_JOIN);
if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pJoin->joinType = pOrigJoin->joinType;
pJoin->joinAlgo = JOIN_ALGO_HASH;
pJoin->isSingleTableJoin = pOrigJoin->isSingleTableJoin;
pJoin->hasSubQuery = pOrigJoin->hasSubQuery;
pJoin->node.inputTsOrder = pOrigJoin->node.inputTsOrder;
pJoin->node.groupAction = pOrigJoin->node.groupAction;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
pJoin->pTagEqCond = nodesCloneNode(pOrigJoin->pTagEqCond);
int32_t code = TSDB_CODE_SUCCESS;
pJoin->node.pChildren = pChildren;
SNode* pNode = NULL;
FOREACH(pNode, pChildren) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
SNode* pCol = NULL;
FOREACH(pCol, pScan->pScanPseudoCols) {
if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) {
code = createColumnByRewriteExpr(pCol, &pJoin->node.pTargets);
if (code) {
break;
}
}
}
if (code) {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
*ppLogic = (SLogicNode*)pJoin;
} else {
nodesDestroyNode((SNode*)pJoin);
}
return code;
}
static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppList) {
SNodeList* pList = nodesCloneList(pJoin->pChildren);
if (NULL == pList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pNode = NULL;
FOREACH(pNode, pList) {
code = stbJoinOptAddUidToScan(pJoin, pNode);
if (code) {
break;
}
}
*ppList = pList;
return TSDB_CODE_SUCCESS;
}
static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pOrig, SNodeList* pChildren, SLogicNode** ppLogic) {
SJoinLogicNode* pOrigJoin = (SJoinLogicNode*)pOrig;
SGroupCacheLogicNode* pGrpCache = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE);
if (NULL == pGrpCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
pGrpCache->node.pTargets = nodesMakeList();
if (NULL == pGrpCache->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pChildren, 0);
code = nodesListStrictAppendList(pGrpCache->node.pTargets, nodesCloneList(pScan->node.pTargets));
}
if (TSDB_CODE_SUCCESS == code) {
*ppLogic = (SLogicNode*)pGrpCache;
} else {
nodesDestroyNode((SNode*)pGrpCache);
}
return code;
}
static int32_t stbJoinOptCreateDynTaskCtrlNode(SLogicNode* pJoin, SLogicNode* pHJoinNode, SLogicNode* pMJoinNode, SLogicNode** ppDynNode) {
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) {
pDynNode->pChildren = nodesMakeList();
if (NULL == pDynNode->pChildren) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pDynNode->pChildren, (SNode*)pHJoinNode);
}
}
static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* pJoin, SLogicSubplan* pLogicSubplan) {
SNodeList* pTagScanNodes = NULL;
SNodeList* pTbScanNodes = NULL;
SLogicNode* pGrpCacheNode = NULL;
SLogicNode* pHJoinNode = NULL;
SLogicNode* pMJoinNode = NULL;
SLogicNode* pDynNode = NULL;
int32_t code = stbJoinOptCreateTagScanNode(pJoin, &pTagScanNodes);
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateTagHashJoinNode(pJoin, pTagScanNodes, &pHJoinNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateTableScanNodes(pJoin, pTbScanNodes);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateGroupCacheNode(pJoin, pTbScanNodes, &pGrpCacheNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateMergeJoinNode(pJoin, pGrpCacheNode, &pMJoinNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateDynTaskCtrlNode(pJoin, pHJoinNode, pMJoinNode, &pDynNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pJoin, (SLogicNode*)pDynNode);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pJoin);
pCxt->optimized = true;
}
return code;
}
static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, stbJoinOptShouldBeOptimized);
if (NULL == pNode) {
return TSDB_CODE_SUCCESS;
}
return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan);
}
// clang-format off
static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
......@@ -2977,6 +3213,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize},
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "StableJoin", .optimizeFunc = stableJoinOptimize},
{.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
......@@ -2998,9 +3235,9 @@ static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) {
qDebugL("before optimize: %s", pStr);
qDebugL("before optimize, JsonPlan: %s", pStr);
} else {
qDebugL("apply optimize %s rule: %s", pRuleName, pStr);
qDebugL("apply optimize %s rule, JsonPlan: %s", pRuleName, pStr);
}
taosMemoryFree(pStr);
}
......
......@@ -665,6 +665,49 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
return TSDB_CODE_FAILED;
}
static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) {
if (NULL == *ppSrc) {
return TSDB_CODE_SUCCESS;
}
if (NULL == *ppDst) {
*ppDst = *ppSrc;
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc)) {
TSWAP(*ppDst, *ppSrc);
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppDst)) {
SLogicConditionNode* pLogic = (SLogicConditionNode*)*ppDst;
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc)) {
nodesListStrictAppendList(pLogic->pParameterList, ((SLogicConditionNode*)(*ppSrc))->pParameterList);
((SLogicConditionNode*)(*ppSrc))->pParameterList = NULL;
} else {
nodesListStrictAppend(pLogic->pParameterList, *ppSrc);
*ppSrc = NULL;
}
nodesDestroyNode(*ppSrc);
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (NULL == pLogicCond) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
pLogicCond->condType = LOGIC_COND_TYPE_AND;
pLogicCond->pParameterList = nodesMakeList();
nodesListStrictAppend(pLogicCond->pParameterList, *ppSrc);
nodesListStrictAppend(pLogicCond->pParameterList, *ppDst);
*ppDst = (SNode*)pLogicCond;
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) {
SSortMergeJoinPhysiNode* pJoin =
......@@ -680,40 +723,58 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
pJoin->joinType = pJoinLogicNode->joinType;
pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
&pJoin->pMergeCondition);
&pJoin->pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
&pJoin->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) {
SNodeList* pCondCols = nodesMakeList();
SNodeList* pTargets = NULL;
SNodeList* pFinTargets = NULL;
if (NULL == pCondCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
code = nodesCollectColumnsFromNode(pJoinLogicNode->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pCondCols, &pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pTargets, &pFinTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppendList(pJoin->pTargets, pFinTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pCondCols, pJoin->node.pOutputDataBlockDesc);
}
nodesDestroyList(pTargets);
nodesDestroyList(pCondCols);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
pJoinLogicNode->pOtherOnCond, &pJoin->pOnConditions);
pJoinLogicNode->pOtherOnCond, &pJoin->pOtherOnCond);
}
if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) {
code = mergeEqCond(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqualOnConditions);
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pJoin;
} else {
......
......@@ -1621,9 +1621,9 @@ static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) {
qDebugL("before split: %s", pStr);
qDebugL("before split, JsonPlan: %s", pStr);
} else {
qDebugL("apply split %s rule: %s", pRuleName, pStr);
qDebugL("apply split %s rule, JsonPlan: %s", pRuleName, pStr);
}
taosMemoryFree(pStr);
}
......
......@@ -33,7 +33,7 @@ static void dumpQueryPlan(SQueryPlan* pPlan) {
}
char* pStr = NULL;
nodesNodeToString((SNode*)pPlan, false, &pStr, NULL);
planDebugL("QID:0x%" PRIx64 " Query Plan: %s", pPlan->queryId, pStr);
planDebugL("QID:0x%" PRIx64 " Query Plan, JsonPlan: %s", pPlan->queryId, pStr);
taosMemoryFree(pStr);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册