提交 74fb0f4b 编写于 作者: X Xiaoyu Wang

enh: order by primary key optimize

上级 809a78b0
...@@ -37,6 +37,8 @@ typedef struct SPlanContext { ...@@ -37,6 +37,8 @@ typedef struct SPlanContext {
bool isStmtQuery; bool isStmtQuery;
void* pTransporter; void* pTransporter;
struct SCatalog* pCatalog; struct SCatalog* pCatalog;
char* pMsg;
int32_t msgLen;
} SPlanContext; } SPlanContext;
// Create the physical plan for the query, according to the AST. // Create the physical plan for the query, according to the AST.
......
...@@ -623,6 +623,7 @@ int32_t* taosGetErrno(); ...@@ -623,6 +623,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636) #define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636)
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637) #define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638) #define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639)
//planner //planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
...@@ -232,7 +232,9 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra ...@@ -232,7 +232,9 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot, .pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite, .showRewrite = pQuery->showRewrite,
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter .pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
}; };
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog); int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
...@@ -2614,37 +2614,62 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen ...@@ -2614,37 +2614,62 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq); (FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
} }
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
SCMCreateTopicReq createReq = {0}; SName name;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
// tNameGetFullDbName(&name, pReq->name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);
pReq->igExists = pStmt->ignoreExists;
pReq->withTbName = pStmt->pOptions->withTable;
pReq->withSchema = pStmt->pOptions->withSchema;
pReq->withTag = pStmt->pOptions->withTag;
pReq->sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pReq->sql) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pStmt->pQuery) { if (NULL != pStmt->pQuery) {
strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName);
pCxt->pParseCxt->topicQuery = true; pCxt->pParseCxt->topicQuery = true;
int32_t code = translateQuery(pCxt, pStmt->pQuery); code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL); code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
if (TSDB_CODE_SUCCESS != code) {
return code;
} }
} else { } else {
strcpy(createReq.subscribeDbName, pStmt->subscribeDbName); strcpy(pReq->subscribeDbName, pStmt->subscribeDbName);
} }
createReq.sql = strdup(pCxt->pParseCxt->pSql); return code;
if (NULL == createReq.sql) { }
return TSDB_CODE_OUT_OF_MEMORY;
static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
if (NULL == pStmt->pQuery) {
return TSDB_CODE_SUCCESS;
} }
SName name; if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db)); SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
// tNameGetFullDbName(&name, createReq.name); if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList &&
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), createReq.name); NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) {
createReq.igExists = pStmt->ignoreExists; return TSDB_CODE_SUCCESS;
createReq.withTbName = pStmt->pOptions->withTable; }
createReq.withSchema = pStmt->pOptions->withSchema; }
createReq.withTag = pStmt->pOptions->withTag;
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY);
}
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
SCMCreateTopicReq createReq = {0};
int32_t code = checkCreateTopic(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateTopicReq(pCxt, pStmt, &createReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq);
}
tFreeSCMCreateTopicReq(&createReq); tFreeSCMCreateTopicReq(&createReq);
return code; return code;
} }
......
...@@ -128,6 +128,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -128,6 +128,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "soffset/offset can not be less than 0"; return "soffset/offset can not be less than 0";
case TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY: case TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY:
return "slimit/soffset only available for PARTITION BY query"; return "slimit/soffset only available for PARTITION BY query";
case TSDB_CODE_PAR_INVALID_TOPIC_QUERY:
return "Invalid topic query";
case TSDB_CODE_OUT_OF_MEMORY: case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory"; return "Out of memory";
default: default:
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) #define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef struct SOptimizeContext { typedef struct SOptimizeContext {
SPlanContext* pPlanCxt;
bool optimized; bool optimized;
} SOptimizeContext; } SOptimizeContext;
...@@ -515,11 +516,76 @@ static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SN ...@@ -515,11 +516,76 @@ static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SN
return TSDB_CODE_PLAN_INTERNAL_ERROR; return TSDB_CODE_PLAN_INTERNAL_ERROR;
} }
static bool cpdIsPrimaryKey(SNode* pNode, SNodeList* pTableCols) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
SColumnNode* pCol = (SColumnNode*)pNode;
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
return false;
}
return cpdBelongThisTable(pNode, pTableCols);
}
static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
SOperatorNode* pOper = (SOperatorNode*)pJoin->pOnConditions;
if (cpdIsPrimaryKey(pOper->pLeft, pLeftCols)) {
return cpdIsPrimaryKey(pOper->pRight, pRightCols);
} else if (cpdIsPrimaryKey(pOper->pLeft, pRightCols)) {
return cpdIsPrimaryKey(pOper->pRight, pLeftCols);
}
return false;
}
static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
SNode* pCond = NULL;
FOREACH(pCond, pOnCond->pParameterList) {
if (!cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pOnConditions) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "not support cross join");
return TSDB_CODE_FAILED;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
} else {
return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
}
}
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (NULL == pJoin->node.pConditions) {
return cpdCheckJoinOnCond(pCxt, pJoin);
}
SNode* pOnCond = NULL; SNode* pOnCond = NULL;
SNode* pLeftChildCond = NULL; SNode* pLeftChildCond = NULL;
SNode* pRightChildCond = NULL; SNode* pRightChildCond = NULL;
...@@ -537,6 +603,7 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi ...@@ -537,6 +603,7 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD); OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
pCxt->optimized = true; pCxt->optimized = true;
code = cpdCheckJoinOnCond(pCxt, pJoin);
} else { } else {
nodesDestroyNode(pOnCond); nodesDestroyNode(pOnCond);
nodesDestroyNode(pLeftChildCond); nodesDestroyNode(pLeftChildCond);
...@@ -703,8 +770,8 @@ static const SOptimizeRule optimizeRuleSet[] = { ...@@ -703,8 +770,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule)); static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule));
static int32_t applyOptimizeRule(SLogicNode* pLogicNode) { static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) {
SOptimizeContext cxt = { .optimized = false }; SOptimizeContext cxt = { .pPlanCxt = pCxt, .optimized = false };
do { do {
cxt.optimized = false; cxt.optimized = false;
for (int32_t i = 0; i < optimizeRuleNum; ++i) { for (int32_t i = 0; i < optimizeRuleNum; ++i) {
...@@ -718,5 +785,5 @@ static int32_t applyOptimizeRule(SLogicNode* pLogicNode) { ...@@ -718,5 +785,5 @@ static int32_t applyOptimizeRule(SLogicNode* pLogicNode) {
} }
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) {
return applyOptimizeRule(pLogicNode); return applyOptimizeRule(pCxt, pLogicNode);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册