未验证 提交 17d15e10 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #15779 from taosdata/feature/3.0_wxy

fix: plan problem of create stream statement
...@@ -31,7 +31,8 @@ extern "C" { ...@@ -31,7 +31,8 @@ extern "C" {
#define parserDebug(param, ...) qDebug("PARSER: " param, ##__VA_ARGS__) #define parserDebug(param, ...) qDebug("PARSER: " param, ##__VA_ARGS__)
#define parserTrace(param, ...) qTrace("PARSER: " param, ##__VA_ARGS__) #define parserTrace(param, ...) qTrace("PARSER: " param, ##__VA_ARGS__)
#define PK_TS_COL_INTERNAL_NAME "_rowts" #define ROWTS_PSEUDO_COLUMN_NAME "_rowts"
#define C0_PSEUDO_COLUMN_NAME "_c0"
typedef struct SMsgBuf { typedef struct SMsgBuf {
int32_t len; int32_t len;
......
...@@ -443,19 +443,23 @@ SNode* createNotBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft, ...@@ -443,19 +443,23 @@ SNode* createNotBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft,
createOperatorNode(pCxt, OP_TYPE_GREATER_THAN, nodesCloneNode(pExpr), pRight)); createOperatorNode(pCxt, OP_TYPE_GREATER_THAN, nodesCloneNode(pExpr), pRight));
} }
static SNode* createPrimaryKeyCol(SAstCreateContext* pCxt) { static SNode* createPrimaryKeyCol(SAstCreateContext* pCxt, const SToken* pFuncName) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_OUT_OF_MEM(pCol); CHECK_OUT_OF_MEM(pCol);
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); if (NULL == pFuncName) {
strcpy(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME);
} else {
strncpy(pCol->colName, pFuncName->z, pFuncName->n);
}
return (SNode*)pCol; return (SNode*)pCol;
} }
SNode* createFunctionNode(SAstCreateContext* pCxt, const SToken* pFuncName, SNodeList* pParameterList) { SNode* createFunctionNode(SAstCreateContext* pCxt, const SToken* pFuncName, SNodeList* pParameterList) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (0 == strncasecmp("_rowts", pFuncName->z, pFuncName->n) || 0 == strncasecmp("_c0", pFuncName->z, pFuncName->n)) { if (0 == strncasecmp("_rowts", pFuncName->z, pFuncName->n) || 0 == strncasecmp("_c0", pFuncName->z, pFuncName->n)) {
return createPrimaryKeyCol(pCxt); return createPrimaryKeyCol(pCxt, pFuncName);
} }
SFunctionNode* func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_OUT_OF_MEM(func); CHECK_OUT_OF_MEM(func);
...@@ -586,7 +590,7 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) { ...@@ -586,7 +590,7 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
SStateWindowNode* state = (SStateWindowNode*)nodesMakeNode(QUERY_NODE_STATE_WINDOW); SStateWindowNode* state = (SStateWindowNode*)nodesMakeNode(QUERY_NODE_STATE_WINDOW);
CHECK_OUT_OF_MEM(state); CHECK_OUT_OF_MEM(state);
state->pCol = createPrimaryKeyCol(pCxt); state->pCol = createPrimaryKeyCol(pCxt, NULL);
if (NULL == state->pCol) { if (NULL == state->pCol) {
nodesDestroyNode((SNode*)state); nodesDestroyNode((SNode*)state);
CHECK_OUT_OF_MEM(state->pCol); CHECK_OUT_OF_MEM(state->pCol);
...@@ -600,7 +604,7 @@ SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode ...@@ -600,7 +604,7 @@ SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
SIntervalWindowNode* interval = (SIntervalWindowNode*)nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW); SIntervalWindowNode* interval = (SIntervalWindowNode*)nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
CHECK_OUT_OF_MEM(interval); CHECK_OUT_OF_MEM(interval);
interval->pCol = createPrimaryKeyCol(pCxt); interval->pCol = createPrimaryKeyCol(pCxt, NULL);
if (NULL == interval->pCol) { if (NULL == interval->pCol) {
nodesDestroyNode((SNode*)interval); nodesDestroyNode((SNode*)interval);
CHECK_OUT_OF_MEM(interval->pCol); CHECK_OUT_OF_MEM(interval->pCol);
...@@ -639,7 +643,7 @@ SNode* createGroupingSetNode(SAstCreateContext* pCxt, SNode* pNode) { ...@@ -639,7 +643,7 @@ SNode* createGroupingSetNode(SAstCreateContext* pCxt, SNode* pNode) {
SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd) { SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt), pStart, pEnd); return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt, NULL), pStart, pEnd);
} }
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) { SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) {
...@@ -752,7 +756,7 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) { ...@@ -752,7 +756,7 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) { if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) {
SFillNode* pFillClause = (SFillNode*)pFill; SFillNode* pFillClause = (SFillNode*)pFill;
nodesDestroyNode(pFillClause->pWStartTs); nodesDestroyNode(pFillClause->pWStartTs);
pFillClause->pWStartTs = createPrimaryKeyCol(pCxt); pFillClause->pWStartTs = createPrimaryKeyCol(pCxt, NULL);
((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause; ((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause;
} }
return pStmt; return pStmt;
...@@ -1731,7 +1735,7 @@ SNode* createCountFuncForDelete(SAstCreateContext* pCxt) { ...@@ -1731,7 +1735,7 @@ SNode* createCountFuncForDelete(SAstCreateContext* pCxt) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_OUT_OF_MEM(pFunc); CHECK_OUT_OF_MEM(pFunc);
strcpy(pFunc->functionName, "count"); strcpy(pFunc->functionName, "count");
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt))) { if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt, NULL))) {
nodesDestroyNode((SNode*)pFunc); nodesDestroyNode((SNode*)pFunc);
CHECK_OUT_OF_MEM(NULL); CHECK_OUT_OF_MEM(NULL);
} }
......
...@@ -612,7 +612,8 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p ...@@ -612,7 +612,8 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p
} }
static bool isInternalPrimaryKey(const SColumnNode* pCol) { static bool isInternalPrimaryKey(const SColumnNode* pCol) {
return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME); return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId &&
(0 == strcmp(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME) || 0 == strcmp(pCol->colName, C0_PSEUDO_COLUMN_NAME));
} }
static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef, const STableNode* pTable, static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef, const STableNode* pTable,
...@@ -2566,7 +2567,7 @@ static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) { ...@@ -2566,7 +2567,7 @@ static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); strcpy(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME);
pFill->pWStartTs = (SNode*)pCol; pFill->pWStartTs = (SNode*)pCol;
*pOutput = (SNode*)pFill; *pOutput = (SNode*)pFill;
...@@ -2652,7 +2653,7 @@ static int32_t createPrimaryKeyColByTable(STranslateContext* pCxt, STableNode* p ...@@ -2652,7 +2653,7 @@ static int32_t createPrimaryKeyColByTable(STranslateContext* pCxt, STableNode* p
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); strcpy(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME);
bool found = false; bool found = false;
int32_t code = findAndSetColumn(pCxt, &pCol, pTable, &found); int32_t code = findAndSetColumn(pCxt, &pCol, pTable, &found);
if (TSDB_CODE_SUCCESS != code || !found) { if (TSDB_CODE_SUCCESS != code || !found) {
...@@ -3878,7 +3879,7 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch ...@@ -3878,7 +3879,7 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID; ((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(((SColumnNode*)pInterval->pCol)->colName, PK_TS_COL_INTERNAL_NAME); strcpy(((SColumnNode*)pInterval->pCol)->colName, ROWTS_PSEUDO_COLUMN_NAME);
pCxt->createStream = true; pCxt->createStream = true;
int32_t code = translateQuery(pCxt, (SNode*)pSelect); int32_t code = translateQuery(pCxt, (SNode*)pSelect);
......
...@@ -436,8 +436,8 @@ static int32_t pushDownCondOptDealScan(SOptimizeContext* pCxt, SScanLogicNode* p ...@@ -436,8 +436,8 @@ static int32_t pushDownCondOptDealScan(SOptimizeContext* pCxt, SScanLogicNode* p
SNode* pPrimaryKeyCond = NULL; SNode* pPrimaryKeyCond = NULL;
SNode* pOtherCond = NULL; SNode* pOtherCond = NULL;
int32_t code = filterPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond, &pScan->pTagCond, int32_t code = filterPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond,
&pOtherCond); &pScan->pTagCond, &pOtherCond);
if (TSDB_CODE_SUCCESS == code && NULL != pScan->pTagCond) { if (TSDB_CODE_SUCCESS == code && NULL != pScan->pTagCond) {
code = pushDownCondOptRebuildTbanme(&pScan->pTagCond); code = pushDownCondOptRebuildTbanme(&pScan->pTagCond);
} }
...@@ -1768,7 +1768,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* ...@@ -1768,7 +1768,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pProjectNode->node.pChildren); NODES_CLEAR_LIST(pProjectNode->node.pChildren);
nodesDestroyNode((SNode*)pProjectNode); nodesDestroyNode((SNode*)pProjectNode);
//if pChild is a project logic node, remove its projection which is not reference by its target. // if pChild is a project logic node, remove its projection which is not reference by its target.
alignProjectionWithTarget(pChild); alignProjectionWithTarget(pChild);
} }
pCxt->optimized = true; pCxt->optimized = true;
...@@ -2404,6 +2404,9 @@ static const SOptimizeRule optimizeRuleSet[] = { ...@@ -2404,6 +2404,9 @@ static const SOptimizeRule optimizeRuleSet[] = {
static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule)); static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule));
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
if (0 == (qDebugFlag & DEBUG_DEBUG)) {
return;
}
char* pStr = NULL; char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) { if (NULL == pRuleName) {
......
...@@ -264,7 +264,7 @@ static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) { ...@@ -264,7 +264,7 @@ static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); return streamQuery ? false : stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode); return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_PARTITION: case QUERY_NODE_LOGIC_PLAN_PARTITION:
...@@ -1423,6 +1423,9 @@ static const SSplitRule splitRuleSet[] = { ...@@ -1423,6 +1423,9 @@ static const SSplitRule splitRuleSet[] = {
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
if (0 == (qDebugFlag & DEBUG_DEBUG)) {
return;
}
char* pStr = NULL; char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) { if (NULL == pRuleName) {
......
...@@ -19,6 +19,9 @@ ...@@ -19,6 +19,9 @@
#include "scalar.h" #include "scalar.h"
static void dumpQueryPlan(SQueryPlan* pPlan) { static void dumpQueryPlan(SQueryPlan* pPlan) {
if (0 == (qDebugFlag & DEBUG_DEBUG)) {
return;
}
char* pStr = NULL; char* pStr = NULL;
nodesNodeToString((SNode*)pPlan, false, &pStr, NULL); nodesNodeToString((SNode*)pPlan, false, &pStr, NULL);
planDebugL("QID:0x%" PRIx64 " Query Plan: %s", pPlan->queryId, pStr); planDebugL("QID:0x%" PRIx64 " Query Plan: %s", pPlan->queryId, pStr);
...@@ -42,6 +45,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo ...@@ -42,6 +45,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList); code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
} }
if (TSDB_CODE_SUCCESS == code) {
dumpQueryPlan(*pPlan);
}
nodesDestroyNode((SNode*)pLogicSubplan); nodesDestroyNode((SNode*)pLogicSubplan);
nodesDestroyNode((SNode*)pLogicPlan); nodesDestroyNode((SNode*)pLogicPlan);
...@@ -79,6 +85,7 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown ...@@ -79,6 +85,7 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
} }
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.groupId, groupId);
return setSubplanExecutionNode(subplan->pNode, groupId, pSource); return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册