提交 2624a81a 编写于 作者: X Xiaoyu Wang

TD-13495 planner refactoring

上级 fb3bd53e
......@@ -524,6 +524,7 @@ typedef struct {
int8_t update;
int8_t cacheLastRow;
int8_t ignoreExist;
int8_t streamMode;
} SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
......
......@@ -126,86 +126,87 @@
#define TK_PRECISION 108
#define TK_UPDATE 109
#define TK_CACHELAST 110
#define TK_UNSIGNED 111
#define TK_TAGS 112
#define TK_USING 113
#define TK_NULL 114
#define TK_NOW 115
#define TK_SELECT 116
#define TK_UNION 117
#define TK_ALL 118
#define TK_DISTINCT 119
#define TK_FROM 120
#define TK_VARIABLE 121
#define TK_INTERVAL 122
#define TK_EVERY 123
#define TK_SESSION 124
#define TK_STATE_WINDOW 125
#define TK_FILL 126
#define TK_SLIDING 127
#define TK_ORDER 128
#define TK_BY 129
#define TK_ASC 130
#define TK_GROUP 131
#define TK_HAVING 132
#define TK_LIMIT 133
#define TK_OFFSET 134
#define TK_SLIMIT 135
#define TK_SOFFSET 136
#define TK_WHERE 137
#define TK_RESET 138
#define TK_QUERY 139
#define TK_SYNCDB 140
#define TK_ADD 141
#define TK_COLUMN 142
#define TK_MODIFY 143
#define TK_TAG 144
#define TK_CHANGE 145
#define TK_SET 146
#define TK_KILL 147
#define TK_CONNECTION 148
#define TK_STREAM 149
#define TK_COLON 150
#define TK_ABORT 151
#define TK_AFTER 152
#define TK_ATTACH 153
#define TK_BEFORE 154
#define TK_BEGIN 155
#define TK_CASCADE 156
#define TK_CLUSTER 157
#define TK_CONFLICT 158
#define TK_COPY 159
#define TK_DEFERRED 160
#define TK_DELIMITERS 161
#define TK_DETACH 162
#define TK_EACH 163
#define TK_END 164
#define TK_EXPLAIN 165
#define TK_FAIL 166
#define TK_FOR 167
#define TK_IGNORE 168
#define TK_IMMEDIATE 169
#define TK_INITIALLY 170
#define TK_INSTEAD 171
#define TK_KEY 172
#define TK_OF 173
#define TK_RAISE 174
#define TK_REPLACE 175
#define TK_RESTRICT 176
#define TK_ROW 177
#define TK_STATEMENT 178
#define TK_TRIGGER 179
#define TK_VIEW 180
#define TK_SEMI 181
#define TK_NONE 182
#define TK_PREV 183
#define TK_LINEAR 184
#define TK_IMPORT 185
#define TK_TBNAME 186
#define TK_JOIN 187
#define TK_INSERT 188
#define TK_INTO 189
#define TK_VALUES 190
#define TK_STREAM 111
#define TK_MODE 112
#define TK_UNSIGNED 113
#define TK_TAGS 114
#define TK_USING 115
#define TK_NULL 116
#define TK_NOW 117
#define TK_SELECT 118
#define TK_UNION 119
#define TK_ALL 120
#define TK_DISTINCT 121
#define TK_FROM 122
#define TK_VARIABLE 123
#define TK_INTERVAL 124
#define TK_EVERY 125
#define TK_SESSION 126
#define TK_STATE_WINDOW 127
#define TK_FILL 128
#define TK_SLIDING 129
#define TK_ORDER 130
#define TK_BY 131
#define TK_ASC 132
#define TK_GROUP 133
#define TK_HAVING 134
#define TK_LIMIT 135
#define TK_OFFSET 136
#define TK_SLIMIT 137
#define TK_SOFFSET 138
#define TK_WHERE 139
#define TK_RESET 140
#define TK_QUERY 141
#define TK_SYNCDB 142
#define TK_ADD 143
#define TK_COLUMN 144
#define TK_MODIFY 145
#define TK_TAG 146
#define TK_CHANGE 147
#define TK_SET 148
#define TK_KILL 149
#define TK_CONNECTION 150
#define TK_COLON 151
#define TK_ABORT 152
#define TK_AFTER 153
#define TK_ATTACH 154
#define TK_BEFORE 155
#define TK_BEGIN 156
#define TK_CASCADE 157
#define TK_CLUSTER 158
#define TK_CONFLICT 159
#define TK_COPY 160
#define TK_DEFERRED 161
#define TK_DELIMITERS 162
#define TK_DETACH 163
#define TK_EACH 164
#define TK_END 165
#define TK_EXPLAIN 166
#define TK_FAIL 167
#define TK_FOR 168
#define TK_IGNORE 169
#define TK_IMMEDIATE 170
#define TK_INITIALLY 171
#define TK_INSTEAD 172
#define TK_KEY 173
#define TK_OF 174
#define TK_RAISE 175
#define TK_REPLACE 176
#define TK_RESTRICT 177
#define TK_ROW 178
#define TK_STATEMENT 179
#define TK_TRIGGER 180
#define TK_VIEW 181
#define TK_SEMI 182
#define TK_NONE 183
#define TK_PREV 184
#define TK_LINEAR 185
#define TK_IMPORT 186
#define TK_TBNAME 187
#define TK_JOIN 188
#define TK_INSERT 189
#define TK_INTO 190
#define TK_VALUES 191
#define NEW_TK_OR 1
#define NEW_TK_AND 2
......
......@@ -72,8 +72,10 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_STMT,
QUERY_NODE_LOGIC_PLAN_SCAN,
QUERY_NODE_LOGIC_PLAN_JOIN,
QUERY_NODE_LOGIC_PLAN_FILTER,
QUERY_NODE_LOGIC_PLAN_AGG
QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT
} ENodeType;
/**
......
......@@ -1275,6 +1275,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI8(&encoder, pReq->update) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -1307,6 +1308,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI8(&decoder, &pReq->update) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
......
......@@ -170,6 +170,7 @@ typedef struct SCreateDbInfo {
int8_t update;
int8_t cachelast;
SArray *keep;
int8_t streamMode;
} SCreateDbInfo;
typedef struct SCreateFuncInfo {
......
......@@ -282,6 +282,7 @@ update(Y) ::= UPDATE INTEGER(X). { Y = X; }
cachelast(Y) ::= CACHELAST INTEGER(X). { Y = X; }
vgroups(Y) ::= VGROUPS INTEGER(X). { Y = X; }
//partitions(Y) ::= PARTITIONS INTEGER(X). { Y = X; }
stream_mode(Y) ::= STREAM MODE INTEGER(X). { Y = X; }
%type db_optr {SCreateDbInfo}
db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);}
......@@ -302,6 +303,7 @@ db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) vgroups(X). { Y = Z; Y.numOfVgroups = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) stream_mode(X). { Y = Z; Y.streamMode = strtol(X.z, NULL, 10); }
//%type topic_optr {SCreateDbInfo}
//
......
......@@ -242,6 +242,7 @@ static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->update = pCreateDb->update;
pMsg->cacheLastRow = pCreateDb->cachelast;
pMsg->numOfVgroups = pCreateDb->numOfVgroups;
pMsg->streamMode = pCreateDb->streamMode;
}
int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
......
......@@ -349,14 +349,15 @@ static SNodeList* getProjectList(SNode* pNode) {
return NULL;
}
static void setColumnInfoBySchema(const STableNode* pTable, const SSchema* pColSchema, SColumnNode* pCol) {
strcpy(pCol->dbName, pTable->dbName);
strcpy(pCol->tableAlias, pTable->tableAlias);
strcpy(pCol->tableName, pTable->tableName);
static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* pColSchema, SColumnNode* pCol) {
strcpy(pCol->dbName, pTable->table.dbName);
strcpy(pCol->tableAlias, pTable->table.tableAlias);
strcpy(pCol->tableName, pTable->table.tableName);
strcpy(pCol->colName, pColSchema->name);
if ('\0' == pCol->node.aliasName[0]) {
strcpy(pCol->node.aliasName, pColSchema->name);
}
pCol->tableId = pTable->pMeta->uid;
pCol->colId = pColSchema->colId;
// pCol->colType = pColSchema->type;
pCol->node.resType.type = pColSchema->type;
......@@ -382,7 +383,7 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode
if (NULL == pCol) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
setColumnInfoBySchema(pTable, pMeta->schema + i, pCol);
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, pCol);
nodesListAppend(pList, (SNode*)pCol);
}
} else {
......@@ -407,7 +408,7 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
for (int32_t i = 0; i < nums; ++i) {
if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) {
setColumnInfoBySchema(pTable, pMeta->schema + i, pCol);
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, pCol);
found = true;
break;
}
......
此差异已折叠。
......@@ -230,6 +230,7 @@ static SKeyword keywordTable[] = {
{"PORT", TK_PORT},
{"INNER", NEW_TK_INNER},
{"ON", NEW_TK_ON},
{"MODE", TK_MODE},
};
static const char isIdChar[] = {
......
......@@ -38,6 +38,12 @@ typedef struct SScanLogicNode {
struct STableMeta* pMeta;
} SScanLogicNode;
typedef struct SJoinLogicNode {
SLogicNode node;
EJoinType joinType;
SNode* pOnConditions;
} SJoinLogicNode;
typedef struct SFilterLogicNode {
SLogicNode node;
} SFilterLogicNode;
......@@ -48,6 +54,10 @@ typedef struct SAggLogicNode {
SNodeList* pAggFuncs;
} SAggLogicNode;
typedef struct SProjectLogicNode {
SLogicNode node;
} SProjectLogicNode;
#ifdef __cplusplus
}
#endif
......
......@@ -39,6 +39,9 @@ typedef struct SPlanContext {
SNodeList* pResource;
} SPlanContext;
static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt);
static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable);
typedef struct SRewriteExprCxt {
int32_t errCode;
int32_t planNodeId;
......@@ -46,6 +49,15 @@ typedef struct SRewriteExprCxt {
} SRewriteExprCxt;
static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
switch (nodeType(*pNode)) {
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
break;
}
default:
break;
}
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext;
SNode* pTarget;
int32_t index = 0;
......@@ -142,12 +154,41 @@ static SLogicNode* createScanLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect,
return (SLogicNode*)pScan;
}
static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt);
static SLogicNode* createSubqueryLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable) {
return createQueryLogicNode(pCxt, pTable->pSubquery);
}
static SLogicNode* createJoinLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SJoinTableNode* pJoinTable) {
SJoinLogicNode* pJoin = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_JOIN);
CHECK_ALLOC(pJoin, NULL);
pJoin->node.id = pCxt->planNodeId++;
pJoin->joinType = pJoinTable->joinType;
// set left and right node
pJoin->node.pChildren = nodesMakeList();
CHECK_ALLOC(pJoin->node.pChildren, (SLogicNode*)pJoin);
SLogicNode* pLeft = createLogicNodeByTable(pCxt, pSelect, pJoinTable->pLeft);
CHECK_ALLOC(pLeft, (SLogicNode*)pJoin);
CHECK_CODE(nodesListAppend(pJoin->node.pChildren, (SNode*)pLeft), (SLogicNode*)pJoin);
SLogicNode* pRight = createLogicNodeByTable(pCxt, pSelect, pJoinTable->pRight);
CHECK_ALLOC(pRight, (SLogicNode*)pJoin);
CHECK_CODE(nodesListAppend(pJoin->node.pChildren, (SNode*)pRight), (SLogicNode*)pJoin);
// set on conditions
pJoin->pOnConditions = nodesCloneNode(pJoinTable->pOnCond);
CHECK_ALLOC(pJoin->pOnConditions, (SLogicNode*)pJoin);
// set the output and rewrite the expression in subsequent clauses with the output
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, 0, false, &pCols), (SLogicNode*)pJoin);
pJoin->node.pTargets = nodesCloneList(pCols);
CHECK_ALLOC(pJoin->node.pTargets, (SLogicNode*)pJoin);
CHECK_CODE(rewriteExpr(pJoin->node.id, pJoin->node.pTargets, pSelect, SQL_CLAUSE_FROM), (SLogicNode*)pJoin);
return (SLogicNode*)pJoin;
}
static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable) {
switch (nodeType(pTable)) {
case QUERY_NODE_REAL_TABLE:
......@@ -155,14 +196,15 @@ static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSele
case QUERY_NODE_TEMP_TABLE:
return createSubqueryLogicNode(pCxt, pSelect, (STempTableNode*)pTable);
case QUERY_NODE_JOIN_TABLE:
return createJoinLogicNode(pCxt, pSelect, (SJoinTableNode*)pTable);
default:
break;
}
return NULL;
}
static SLogicNode* createFilterLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pWhere) {
if (NULL == pWhere) {
static SLogicNode* createWhereFilterLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWhere) {
return NULL;
}
......@@ -171,7 +213,7 @@ static SLogicNode* createFilterLogicNode(SPlanContext* pCxt, SSelectStmt* pSelec
pFilter->node.id = pCxt->planNodeId++;
// set filter conditions
pFilter->node.pConditions = nodesCloneNode(pWhere);
pFilter->node.pConditions = nodesCloneNode(pSelect->pWhere);
CHECK_ALLOC(pFilter->node.pConditions, (SLogicNode*)pFilter);
// set the output and rewrite the expression in subsequent clauses with the output
......@@ -184,10 +226,10 @@ static SLogicNode* createFilterLogicNode(SPlanContext* pCxt, SSelectStmt* pSelec
return (SLogicNode*)pFilter;
}
static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pGroupByList, SNode* pHaving) {
static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SNodeList* pAggFuncs = NULL;
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs), NULL);
if (NULL == pAggFuncs && NULL == pGroupByList) {
if (NULL == pAggFuncs && NULL == pSelect->pGroupByList) {
return NULL;
}
......@@ -196,11 +238,11 @@ static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect,
pAgg->node.id = pCxt->planNodeId++;
// set grouyp keys, agg funcs and having conditions
pAgg->pGroupKeys = nodesCloneList(pGroupByList);
pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList);
CHECK_ALLOC(pAgg->pGroupKeys, (SLogicNode*)pAgg);
pAgg->pAggFuncs = nodesCloneList(pAggFuncs);
CHECK_ALLOC(pAgg->pAggFuncs, (SLogicNode*)pAgg);
pAgg->node.pConditions = nodesCloneNode(pHaving);
pAgg->node.pConditions = nodesCloneNode(pSelect->pHaving);
CHECK_ALLOC(pAgg->node.pConditions, (SLogicNode*)pAgg);
// set the output and rewrite the expression in subsequent clauses with the output
......@@ -213,15 +255,28 @@ static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect,
return (SLogicNode*)pAgg;
}
static SLogicNode* createProjectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SProjectLogicNode* pProject = (SProjectLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PROJECT);
CHECK_ALLOC(pProject, NULL);
pProject->node.id = pCxt->planNodeId++;
pProject->node.pTargets = nodesCloneList(pSelect->pProjectionList);
CHECK_ALLOC(pProject->node.pTargets, (SLogicNode*)pProject);
return (SLogicNode*)pProject;
}
static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createFilterLogicNode(pCxt, pSelect, pSelect->pWhere));
pRoot = pushLogicNode(pCxt, pRoot, createWhereFilterLogicNode(pCxt, pSelect));
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect));
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect, pSelect->pGroupByList, pSelect->pHaving));
pRoot = pushLogicNode(pCxt, pRoot, createProjectLogicNode(pCxt, pSelect));
}
// pRoot = pushLogicNode(pCxt, pRoot, createProjectLogicNode(pSelect, pSelect->pProjectionList));
return pRoot;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册