From 74fb0f4b55c9d71a9592f37c541d5c0a365b55f0 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sun, 24 Apr 2022 20:11:06 +0800 Subject: [PATCH] enh: order by primary key optimize --- include/libs/planner/planner.h | 2 + include/util/taoserror.h | 1 + source/client/src/clientImpl.c | 4 +- source/libs/parser/src/parTranslater.c | 65 ++++++++++++++------- source/libs/parser/src/parUtil.c | 2 + source/libs/planner/src/planOptimizer.c | 75 +++++++++++++++++++++++-- 6 files changed, 124 insertions(+), 25 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 2f6b9e1866..78d0911502 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -37,6 +37,8 @@ typedef struct SPlanContext { bool isStmtQuery; void* pTransporter; struct SCatalog* pCatalog; + char* pMsg; + int32_t msgLen; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 03fb43a46b..3ac440346c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -623,6 +623,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636) #define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637) #define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638) +#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index fc4192d818..367baef53f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -232,7 +232,9 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), .pAstRoot = pQuery->pRoot, .showRewrite = pQuery->showRewrite, - .pTransporter = pRequest->pTscObj->pAppInfo->pTransporter + .pTransporter = pRequest->pTscObj->pAppInfo->pTransporter, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE }; int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 01a386427d..0041db6864 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2614,37 +2614,62 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen (FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq); } -static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { - SCMCreateTopicReq createReq = {0}; +static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) { + SName name; + // tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); + // tNameGetFullDbName(&name, pReq->name); + tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name); + pReq->igExists = pStmt->ignoreExists; + pReq->withTbName = pStmt->pOptions->withTable; + pReq->withSchema = pStmt->pOptions->withSchema; + pReq->withTag = pStmt->pOptions->withTag; + + pReq->sql = strdup(pCxt->pParseCxt->pSql); + if (NULL == pReq->sql) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; if (NULL != pStmt->pQuery) { + strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName); pCxt->pParseCxt->topicQuery = true; - int32_t code = translateQuery(pCxt, pStmt->pQuery); + code = translateQuery(pCxt, pStmt->pQuery); if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL); - } - if (TSDB_CODE_SUCCESS != code) { - return code; + code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); } } else { - strcpy(createReq.subscribeDbName, pStmt->subscribeDbName); + strcpy(pReq->subscribeDbName, pStmt->subscribeDbName); } - createReq.sql = strdup(pCxt->pParseCxt->pSql); - if (NULL == createReq.sql) { - return TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { + if (NULL == pStmt->pQuery) { + return TSDB_CODE_SUCCESS; } - SName name; - // tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db)); - // tNameGetFullDbName(&name, createReq.name); - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), createReq.name); - createReq.igExists = pStmt->ignoreExists; - createReq.withTbName = pStmt->pOptions->withTable; - createReq.withSchema = pStmt->pOptions->withSchema; - createReq.withTag = pStmt->pOptions->withTag; + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList && + NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) { + return TSDB_CODE_SUCCESS; + } + } - int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq); + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY); +} + +static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { + SCMCreateTopicReq createReq = {0}; + int32_t code = checkCreateTopic(pCxt, pStmt); + if (TSDB_CODE_SUCCESS == code) { + code = buildCreateTopicReq(pCxt, pStmt, &createReq); + } + if (TSDB_CODE_SUCCESS == code) { + code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq); + } tFreeSCMCreateTopicReq(&createReq); return code; } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 7e41bbe3fd..fd700f8d46 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -128,6 +128,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "soffset/offset can not be less than 0"; case TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY: return "slimit/soffset only available for PARTITION BY query"; + case TSDB_CODE_PAR_INVALID_TOPIC_QUERY: + return "Invalid topic query"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 0eb4c8435a..21e55fc34a 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -27,6 +27,7 @@ #define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) typedef struct SOptimizeContext { + SPlanContext* pPlanCxt; bool optimized; } SOptimizeContext; @@ -515,11 +516,76 @@ static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SN return TSDB_CODE_PLAN_INTERNAL_ERROR; } +static bool cpdIsPrimaryKey(SNode* pNode, SNodeList* pTableCols) { + if (QUERY_NODE_COLUMN != nodeType(pNode)) { + return false; + } + SColumnNode* pCol = (SColumnNode*)pNode; + if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) { + return false; + } + return cpdBelongThisTable(pNode, pTableCols); +} + +static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { + if (QUERY_NODE_OPERATOR != nodeType(pCond)) { + return false; + } + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + SOperatorNode* pOper = (SOperatorNode*)pJoin->pOnConditions; + if (cpdIsPrimaryKey(pOper->pLeft, pLeftCols)) { + return cpdIsPrimaryKey(pOper->pRight, pRightCols); + } else if (cpdIsPrimaryKey(pOper->pLeft, pRightCols)) { + return cpdIsPrimaryKey(pOper->pRight, pLeftCols); + } + return false; +} + +static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) { + if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) { + snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression"); + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) { + if (LOGIC_COND_TYPE_AND != pOnCond->condType) { + snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression"); + return TSDB_CODE_FAILED; + } + SNode* pCond = NULL; + FOREACH(pCond, pOnCond->pParameterList) { + if (!cpdIsPrimaryKeyEqualCond(pJoin, pCond)) { + snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression"); + return TSDB_CODE_FAILED; + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pOnConditions) { + snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "not support cross join"); + return TSDB_CODE_FAILED; + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) { + return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions); + } else { + return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions); + } +} + static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL == pJoin->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) { + if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) { return TSDB_CODE_SUCCESS; } + if (NULL == pJoin->node.pConditions) { + return cpdCheckJoinOnCond(pCxt, pJoin); + } + SNode* pOnCond = NULL; SNode* pLeftChildCond = NULL; SNode* pRightChildCond = NULL; @@ -537,6 +603,7 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD); pCxt->optimized = true; + code = cpdCheckJoinOnCond(pCxt, pJoin); } else { nodesDestroyNode(pOnCond); nodesDestroyNode(pLeftChildCond); @@ -703,8 +770,8 @@ static const SOptimizeRule optimizeRuleSet[] = { static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule)); -static int32_t applyOptimizeRule(SLogicNode* pLogicNode) { - SOptimizeContext cxt = { .optimized = false }; +static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) { + SOptimizeContext cxt = { .pPlanCxt = pCxt, .optimized = false }; do { cxt.optimized = false; for (int32_t i = 0; i < optimizeRuleNum; ++i) { @@ -718,5 +785,5 @@ static int32_t applyOptimizeRule(SLogicNode* pLogicNode) { } int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { - return applyOptimizeRule(pLogicNode); + return applyOptimizeRule(pCxt, pLogicNode); } -- GitLab