未验证 提交 3119efc7 编写于 作者: X xiao-yu-wang 提交者: GitHub

Merge pull request #10533 from taosdata/feature/3.0_query_integrate_wxy

TD-13706 client modify vnode process integration
......@@ -51,6 +51,7 @@ typedef void **TAOS_ROW;
#define TSDB_DATA_TYPE_JSON 17 // json
#define TSDB_DATA_TYPE_DECIMAL 18 // decimal
#define TSDB_DATA_TYPE_BLOB 19 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 20
typedef enum {
TSDB_OPTION_LOCALE,
......
......@@ -57,63 +57,85 @@
#define TK_VGROUPS 39
#define TK_SINGLESTABLE 40
#define TK_STREAMMODE 41
#define TK_NK_FLOAT 42
#define TK_NK_BOOL 43
#define TK_TIMESTAMP 44
#define TK_NK_VARIABLE 45
#define TK_NK_COMMA 46
#define TK_NK_ID 47
#define TK_NK_LP 48
#define TK_NK_RP 49
#define TK_NK_DOT 50
#define TK_BETWEEN 51
#define TK_IS 52
#define TK_NULL 53
#define TK_NK_LT 54
#define TK_NK_GT 55
#define TK_NK_LE 56
#define TK_NK_GE 57
#define TK_NK_NE 58
#define TK_NK_EQ 59
#define TK_LIKE 60
#define TK_MATCH 61
#define TK_NMATCH 62
#define TK_IN 63
#define TK_FROM 64
#define TK_AS 65
#define TK_JOIN 66
#define TK_ON 67
#define TK_INNER 68
#define TK_SELECT 69
#define TK_DISTINCT 70
#define TK_WHERE 71
#define TK_PARTITION 72
#define TK_BY 73
#define TK_SESSION 74
#define TK_STATE_WINDOW 75
#define TK_INTERVAL 76
#define TK_SLIDING 77
#define TK_FILL 78
#define TK_VALUE 79
#define TK_NONE 80
#define TK_PREV 81
#define TK_LINEAR 82
#define TK_NEXT 83
#define TK_GROUP 84
#define TK_HAVING 85
#define TK_ORDER 86
#define TK_SLIMIT 87
#define TK_SOFFSET 88
#define TK_LIMIT 89
#define TK_OFFSET 90
#define TK_ASC 91
#define TK_DESC 92
#define TK_NULLS 93
#define TK_FIRST 94
#define TK_LAST 95
#define TK_USE 42
#define TK_TABLE 43
#define TK_NK_LP 44
#define TK_NK_RP 45
#define TK_NK_ID 46
#define TK_NK_DOT 47
#define TK_NK_COMMA 48
#define TK_COMMENT 49
#define TK_BOOL 50
#define TK_TINYINT 51
#define TK_SMALLINT 52
#define TK_INT 53
#define TK_INTEGER 54
#define TK_BIGINT 55
#define TK_FLOAT 56
#define TK_DOUBLE 57
#define TK_BINARY 58
#define TK_TIMESTAMP 59
#define TK_NCHAR 60
#define TK_UNSIGNED 61
#define TK_JSON 62
#define TK_VARCHAR 63
#define TK_MEDIUMBLOB 64
#define TK_BLOB 65
#define TK_VARBINARY 66
#define TK_DECIMAL 67
#define TK_SHOW 68
#define TK_DATABASES 69
#define TK_NK_FLOAT 70
#define TK_NK_BOOL 71
#define TK_NK_VARIABLE 72
#define TK_BETWEEN 73
#define TK_IS 74
#define TK_NULL 75
#define TK_NK_LT 76
#define TK_NK_GT 77
#define TK_NK_LE 78
#define TK_NK_GE 79
#define TK_NK_NE 80
#define TK_NK_EQ 81
#define TK_LIKE 82
#define TK_MATCH 83
#define TK_NMATCH 84
#define TK_IN 85
#define TK_FROM 86
#define TK_AS 87
#define TK_JOIN 88
#define TK_ON 89
#define TK_INNER 90
#define TK_SELECT 91
#define TK_DISTINCT 92
#define TK_WHERE 93
#define TK_PARTITION 94
#define TK_BY 95
#define TK_SESSION 96
#define TK_STATE_WINDOW 97
#define TK_INTERVAL 98
#define TK_SLIDING 99
#define TK_FILL 100
#define TK_VALUE 101
#define TK_NONE 102
#define TK_PREV 103
#define TK_LINEAR 104
#define TK_NEXT 105
#define TK_GROUP 106
#define TK_HAVING 107
#define TK_ORDER 108
#define TK_SLIMIT 109
#define TK_SOFFSET 110
#define TK_LIMIT 111
#define TK_OFFSET 112
#define TK_ASC 113
#define TK_DESC 114
#define TK_NULLS 115
#define TK_FIRST 116
#define TK_LAST 117
#define TK_SPACE 300
#define TK_COMMENT 301
#define TK_NK_COMMENT 301
#define TK_ILLEGAL 302
#define TK_HEX 303 // hex number 0x123
#define TK_OCT 304 // oct number
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PLANN_NODES_H_
#define _TD_PLANN_NODES_H_
#ifndef _TD_CMD_NODES_H_
#define _TD_CMD_NODES_H_
#ifdef __cplusplus
extern "C" {
......@@ -49,6 +49,11 @@ typedef struct SCreateDatabaseStmt {
SDatabaseOptions options;
} SCreateDatabaseStmt;
typedef struct SUseDatabaseStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
} SUseDatabaseStmt;
typedef struct STableOptions {
int32_t keep;
int32_t ttl;
......@@ -77,4 +82,4 @@ typedef struct SCreateTableStmt {
}
#endif
#endif /*_TD_PLANN_NODES_H_*/
#endif /*_TD_CMD_NODES_H_*/
......@@ -70,16 +70,18 @@ typedef enum ENodeType {
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT,
QUERY_NODE_SHOW_STMT,
QUERY_NODE_VNODE_MODIF_STMT,
QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_CREATE_TABLE_STMT,
QUERY_NODE_USE_DATABASE_STMT,
QUERY_NODE_SHOW_DATABASE_STMT, // temp
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,
QUERY_NODE_LOGIC_PLAN_JOIN,
QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
......@@ -94,6 +96,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
} ENodeType;
......@@ -153,6 +156,7 @@ bool nodesEqualNode(const SNodeptr a, const SNodeptr b);
SNodeptr nodesCloneNode(const SNodeptr pNode);
SNodeList* nodesCloneList(const SNodeList* pList);
const char* nodesNodeName(ENodeType type);
int32_t nodesNodeToString(const SNodeptr pNode, bool format, char** pStr, int32_t* pLen);
int32_t nodesStringToNode(const char* pStr, SNode** pNode);
......
......@@ -65,11 +65,28 @@ typedef struct SProjectLogicNode {
SNodeList* pProjections;
} SProjectLogicNode;
typedef struct SVnodeModifLogicNode {
ENodeType type;;
int32_t msgType;
SArray* pDataBlocks;
SVgDataBlocks* pVgDataBlocks;
} SVnodeModifLogicNode;
typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL,
SUBPLAN_TYPE_SCAN,
SUBPLAN_TYPE_MODIFY
} ESubplanType;
typedef struct SSubLogicPlan {
ENodeType type;
SNodeList* pChildren;
SNodeList* pParents;
SLogicNode* pNode;
SQueryNodeAddr execNode;
ESubplanType subplanType;
int32_t level;
} SSubLogicPlan;
typedef struct SQueryLogicPlan {
......@@ -178,13 +195,6 @@ typedef struct SSubplanId {
int32_t subplanId;
} SSubplanId;
typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL,
SUBPLAN_TYPE_SCAN,
SUBPLAN_TYPE_MODIFY
} ESubplanType;
typedef struct SSubplan {
ENodeType type;
SSubplanId id; // unique id of the subplan
......
......@@ -264,6 +264,7 @@ typedef struct SVgDataBlocks {
typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
......
......@@ -43,7 +43,9 @@ typedef struct SCmdMsgInfo {
} SCmdMsgInfo;
typedef struct SQuery {
bool isCmd;
bool directRpc;
bool haveResultSet;
ENodeType sqlNodeType;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
......
......@@ -161,7 +161,7 @@ int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
}
code = qParseQuerySql(&cxt, pQuery);
if (TSDB_CODE_SUCCESS == code && !((*pQuery)->isCmd)) {
if (TSDB_CODE_SUCCESS == code && ((*pQuery)->haveResultSet)) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
}
......@@ -184,19 +184,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
}
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pDag, SArray* pNodeList) {
// pRequest->type = pQuery->type;
pRequest->type = pQuery->sqlNodeType;
SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot };
int32_t code = qCreateQueryPlan(&cxt, pDag);
if (code != 0) {
return code;
}
// if (pQuery->type == TSDB_SQL_SELECT) {
// setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
// pRequest->type = TDMT_VND_QUERY;
// }
return code;
}
......@@ -254,7 +247,7 @@ TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
if (pQuery->isCmd) {
if (pQuery->directRpc) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
} else {
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "plannodes.h"
#include "querynodes.h"
#include "taos.h"
#include "taoserror.h"
......@@ -157,6 +158,13 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode
return (SNode*)pDst;
}
static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
COPY_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(execNode);
COPY_SCALAR_FIELD(subplanType);
return (SNode*)pDst;
}
SNodeptr nodesCloneNode(const SNodeptr pNode) {
if (NULL == pNode) {
return NULL;
......@@ -187,9 +195,13 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return groupingSetNodeCopy((const SGroupingSetNode*)pNode, (SGroupingSetNode*)pDst);
case QUERY_NODE_ORDER_BY_EXPR:
case QUERY_NODE_LIMIT:
break;
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst);
default:
break;
}
printf("nodesCloneNode unknown node = %s\n", nodesNodeName(nodeType(pNode)));
return pDst;
}
......
......@@ -24,7 +24,7 @@ static int32_t jsonToNode(const SJson* pJson, void* pObj);
static int32_t jsonToNodeObject(const SJson* pJson, const char* pName, SNode** pNode);
static int32_t makeNodeByJson(const SJson* pJson, SNode** pNode);
static char* nodeName(ENodeType type) {
const char* nodesNodeName(ENodeType type) {
switch (type) {
case QUERY_NODE_COLUMN:
return "Column";
......@@ -58,20 +58,30 @@ static char* nodeName(ENodeType type) {
return "NodeList";
case QUERY_NODE_FILL:
return "Fill";
case QUERY_NODE_TARGET:
return "Target";
case QUERY_NODE_RAW_EXPR:
return "RawExpr";
case QUERY_NODE_TARGET:
return "Target";
case QUERY_NODE_DATABLOCK_DESC:
return "TupleDesc";
case QUERY_NODE_SLOT_DESC:
return "SlotDesc";
case QUERY_NODE_COLUMN_DEF:
return "ColumnDef";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
return "SelectStmt";
case QUERY_NODE_SHOW_STMT:
return "ShowStmt";
case QUERY_NODE_VNODE_MODIF_STMT:
return "VnodeModifStmt";
case QUERY_NODE_CREATE_DATABASE_STMT:
return "CreateDatabaseStmt";
case QUERY_NODE_CREATE_TABLE_STMT:
return "CreateTableStmt";
case QUERY_NODE_USE_DATABASE_STMT:
return "UseDatabaseStmt";
case QUERY_NODE_SHOW_DATABASE_STMT:
return "ShowDatabaseStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -80,16 +90,34 @@ static char* nodeName(ENodeType type) {
return "LogicAgg";
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return "LogicProject";
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF:
return "LogicVnodeModif";
case QUERY_NODE_LOGIC_SUBPLAN:
return "LogicSubplan";
case QUERY_NODE_LOGIC_PLAN:
return "LogicPlan";
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return "PhysiTagScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
return "PhysiTableScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
return "PhysiTableSeqScan";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return "PhysiSreamScan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
return "PhysiJoin";
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return "PhysiAgg";
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return "PhysiExchange";
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return "PhysiInsert";
case QUERY_NODE_PHYSICAL_SUBPLAN:
return "PhysiSubplan";
case QUERY_NODE_PHYSICAL_PLAN:
......@@ -1249,23 +1277,28 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_STATE_WINDOW:
case QUERY_NODE_SESSION_WINDOW:
case QUERY_NODE_INTERVAL_WINDOW:
break;
case QUERY_NODE_NODE_LIST:
return nodeListNodeToJson(pObj, pJson);
case QUERY_NODE_FILL:
case QUERY_NODE_RAW_EXPR:
break;
case QUERY_NODE_TARGET:
return targetNodeToJson(pObj, pJson);
case QUERY_NODE_RAW_EXPR:
break;
case QUERY_NODE_DATABLOCK_DESC:
return dataBlockDescNodeToJson(pObj, pJson);
case QUERY_NODE_SLOT_DESC:
return slotDescNodeToJson(pObj, pJson);
case QUERY_NODE_COLUMN_DEF:
case QUERY_NODE_SET_OPERATOR:
break;
case QUERY_NODE_SELECT_STMT:
return selectStmtTojson(pObj, pJson);
case QUERY_NODE_SHOW_STMT:
case QUERY_NODE_VNODE_MODIF_STMT:
case QUERY_NODE_CREATE_DATABASE_STMT:
case QUERY_NODE_CREATE_TABLE_STMT:
case QUERY_NODE_USE_DATABASE_STMT:
case QUERY_NODE_SHOW_DATABASE_STMT:
break;
case QUERY_NODE_LOGIC_PLAN_SCAN:
return logicScanNodeToJson(pObj, pJson);
......@@ -1275,16 +1308,28 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return logicAggNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return logicProjectNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF:
case QUERY_NODE_LOGIC_SUBPLAN:
case QUERY_NODE_LOGIC_PLAN:
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return physiTagScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
return physiTableScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
break;
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return physiProjectNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
return physiJoinNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return physiAggNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
case QUERY_NODE_PHYSICAL_PLAN_SORT:
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
break;
case QUERY_NODE_PHYSICAL_SUBPLAN:
return subplanToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN:
......@@ -1292,7 +1337,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
default:
break;
}
printf("================================ specificNodeToJson unknown node = %s\n", nodeName(nodeType(pObj)));
printf("================================ specificNodeToJson unknown node = %s\n", nodesNodeName(nodeType(pObj)));
return TSDB_CODE_SUCCESS;
}
......@@ -1334,8 +1379,6 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
// break;
// case QUERY_NODE_SELECT_STMT:
// return jsonToSelectStmt(pJson, pObj);
// case QUERY_NODE_SHOW_STMT:
// break;
// case QUERY_NODE_LOGIC_PLAN_SCAN:
// return jsonToLogicScanNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -1372,10 +1415,10 @@ static int32_t nodeToJson(const void* pObj, SJson* pJson) {
int32_t code = tjsonAddIntegerToObject(pJson, jkNodeType, pNode->type);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkNodeName, nodeName(pNode->type));
code = tjsonAddStringToObject(pJson, jkNodeName, nodesNodeName(pNode->type));
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, nodeName(pNode->type), specificNodeToJson, pNode);
code = tjsonAddObject(pJson, nodesNodeName(pNode->type), specificNodeToJson, pNode);
}
return code;
......@@ -1388,7 +1431,7 @@ static int32_t jsonToNode(const SJson* pJson, void* pObj) {
int32_t code = tjsonGetIntValue(pJson, jkNodeType, &val);
pNode->type = val;
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, nodeName(pNode->type), jsonToSpecificNode, pNode);
code = tjsonToObject(pJson, nodesNodeName(pNode->type), jsonToSpecificNode, pNode);
}
return code;
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cmdnodes.h"
#include "querynodes.h"
#include "plannodes.h"
#include "taos.h"
......@@ -65,12 +66,28 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SFillNode));
case QUERY_NODE_RAW_EXPR:
return makeNode(type, sizeof(SRawExprNode));
case QUERY_NODE_TARGET:
return makeNode(type, sizeof(STargetNode));
case QUERY_NODE_DATABLOCK_DESC:
return makeNode(type, sizeof(SDataBlockDescNode));
case QUERY_NODE_SLOT_DESC:
return makeNode(type, sizeof(SSlotDescNode));
case QUERY_NODE_COLUMN_DEF:
return makeNode(type, sizeof(SColumnDefNode));
case QUERY_NODE_SET_OPERATOR:
return makeNode(type, sizeof(SSetOperator));
case QUERY_NODE_SELECT_STMT:
return makeNode(type, sizeof(SSelectStmt));
// case QUERY_NODE_SHOW_STMT:
// return makeNode(type, sizeof(SShowStmt));
case QUERY_NODE_VNODE_MODIF_STMT:
return makeNode(type, sizeof(SVnodeModifOpStmt));
case QUERY_NODE_CREATE_DATABASE_STMT:
return makeNode(type, sizeof(SCreateDatabaseStmt));
case QUERY_NODE_CREATE_TABLE_STMT:
return makeNode(type, sizeof(SCreateTableStmt));
case QUERY_NODE_USE_DATABASE_STMT:
return makeNode(type, sizeof(SUseDatabaseStmt));
case QUERY_NODE_SHOW_DATABASE_STMT:
return makeNode(type, sizeof(SNode));;
case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode));
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -79,26 +96,34 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SAggLogicNode));
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectLogicNode));
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF:
return makeNode(type, sizeof(SVnodeModifLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SSubLogicPlan));
case QUERY_NODE_LOGIC_PLAN:
return makeNode(type, sizeof(SQueryLogicPlan));
case QUERY_NODE_TARGET:
return makeNode(type, sizeof(STargetNode));
case QUERY_NODE_DATABLOCK_DESC:
return makeNode(type, sizeof(SDataBlockDescNode));
case QUERY_NODE_SLOT_DESC:
return makeNode(type, sizeof(SSlotDescNode));
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return makeNode(type, sizeof(STagScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
return makeNode(type, sizeof(STableScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
return makeNode(type, sizeof(STableSeqScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return makeNode(type, sizeof(SNode));
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
return makeNode(type, sizeof(SJoinPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return makeNode(type, sizeof(SAggPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SNode));
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return makeNode(type, sizeof(SDataInserterNode));
case QUERY_NODE_PHYSICAL_SUBPLAN:
return makeNode(type, sizeof(SSubplan));
case QUERY_NODE_PHYSICAL_PLAN:
......
......@@ -116,8 +116,14 @@ typedef enum ETableOptionType {
STableOptions* createDefaultTableOptions(SAstCreateContext* pCxt);
STableOptions* setTableOption(SAstCreateContext* pCxt, STableOptions* pOptions, ETableOptionType type, const SToken* pVal);
SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDataType dataType, const SToken* pComment);
SDataType createDataType(uint8_t type);
SDataType createVarLenDataType(uint8_t type, const SToken* pLen);
SNode* createCreateTableStmt(SAstCreateContext* pCxt, bool ignoreExists, const STokenPair* pFullTableName, SNodeList* pCols, STableOptions* pOptions);
SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName);
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type);
#ifdef __cplusplus
}
#endif
......
......@@ -92,22 +92,25 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C).
db_options(A) ::= db_options(B) SINGLESTABLE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_SINGLESTABLE, &C); }
db_options(A) ::= db_options(B) STREAMMODE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_STREAMMODE, &C); }
/************************************************ create database *****************************************************/
cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A);}
/************************************************ create table *******************************************************/
cmd ::= CREATE TABLE exists_opt(A) full_table_name(B)
NK_LP column_def_list(C) NK_RP table_options(D). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, &B, C, D);}
%type full_table_name { STokenPair }
%destructor full_table_name { }
full_table_name(A) ::= NK_ID(B). { A = { .first = B, .second = nil_token}; }
full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { A = { .first = B, .second = C}; }
full_table_name(A) ::= NK_ID(B). { STokenPair t = { .first = B, .second = nil_token}; A = t; }
full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { STokenPair t = { .first = B, .second = C}; A = t; }
%type column_def_list { SNodeList* }
%destructor column_def_list { nodesDestroyList($$); }
column_def_list(A) ::= column_def(B). { A = createNodeList(pCxt, B); }
column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); }
column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, B, C, NULL); }
column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, B, C, &D); }
column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, &B, C, NULL); }
column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); }
%type type_name { SDataType }
%destructor type_name { }
......@@ -115,6 +118,7 @@ type_name(A) ::= BOOL.
type_name(A) ::= TINYINT. { A = createDataType(TSDB_DATA_TYPE_TINYINT); }
type_name(A) ::= SMALLINT. { A = createDataType(TSDB_DATA_TYPE_SMALLINT); }
type_name(A) ::= INT. { A = createDataType(TSDB_DATA_TYPE_INT); }
type_name(A) ::= INTEGER. { A = createDataType(TSDB_DATA_TYPE_INT); }
type_name(A) ::= BIGINT. { A = createDataType(TSDB_DATA_TYPE_BIGINT); }
type_name(A) ::= FLOAT. { A = createDataType(TSDB_DATA_TYPE_FLOAT); }
type_name(A) ::= DOUBLE. { A = createDataType(TSDB_DATA_TYPE_DOUBLE); }
......@@ -134,14 +138,15 @@ type_name(A) ::= DECIMAL.
type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_COMMA NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
%type table_options { SDatabaseOptions* }
%type table_options { STableOptions* }
%destructor table_options { tfree($$); }
table_options(A) ::= . { A = createDefaultTableOptions(pCxt);}
table_options(A) ::= table_options(B) COMMENT NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_COMMENT, &C); }
table_options(A) ::= table_options(B) KEEP NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_KEEP, &C); }
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
//cmd ::= SHOW DATABASES. { PARSER_TRACE; createShowStmt(pCxt, SHOW_TYPE_DATABASE); }
/************************************************ show ***************************************************************/
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASE_STMT); }
/************************************************ select *************************************************************/
cmd ::= query_expression(A). { PARSER_TRACE; pCxt->pRootNode = A; }
......
......@@ -719,6 +719,16 @@ SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDat
return (SNode*)pCol;
}
SDataType createDataType(uint8_t type) {
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = 0 };
return dt;
}
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = 0 };
return dt;
}
SNode* createCreateTableStmt(SAstCreateContext* pCxt,
bool ignoreExists, const STokenPair* pFullTableName, SNodeList* pCols, STableOptions* pOptions) {
SCreateTableStmt* pStmt = (SCreateTableStmt*)nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
......@@ -732,3 +742,16 @@ SNode* createCreateTableStmt(SAstCreateContext* pCxt,
pStmt->options = *pOptions;
return (SNode*)pStmt;
}
SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
SUseDatabaseStmt* pStmt = (SUseDatabaseStmt*)nodesMakeNode(QUERY_NODE_USE_DATABASE_STMT);
CHECK_OUT_OF_MEM(pStmt);
strncpy(pStmt->dbName, pDbName->z, pDbName->n);
return (SNode*)pStmt;
}
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type) {
SNode* pStmt = nodesMakeNode(QUERY_NODE_SHOW_DATABASE_STMT);;
CHECK_OUT_OF_MEM(pStmt);
return pStmt;
}
......@@ -26,18 +26,19 @@ extern void NewParse(void*, int, SToken, void*);
extern void NewParseFree(void*, FFree);
extern void NewParseTrace(FILE*, char*);
static bool isCmd(const SNode* pRootNode) {
if (NULL == pRootNode) {
return true;
static void setQuery(SAstCreateContext* pCxt, SQuery* pQuery) {
pQuery->pRoot = pCxt->pRootNode;
ENodeType type = nodeType(pCxt->pRootNode);
if (QUERY_NODE_SELECT_STMT == type) {
pQuery->haveResultSet = true;
pQuery->directRpc = false;
} else if (QUERY_NODE_CREATE_TABLE_STMT == type) {
pQuery->haveResultSet = false;
pQuery->directRpc = false;
} else {
pQuery->haveResultSet = false;
pQuery->directRpc = true;
}
switch (nodeType(pRootNode)) {
case QUERY_NODE_SELECT_STMT:
case QUERY_NODE_CREATE_TABLE_STMT:
return false;
default:
break;
}
return true;
}
int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) {
......@@ -60,6 +61,10 @@ int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) {
case TK_COMMENT: {
break;
}
case TK_SEMI: {
NewParse(pParser, 0, t0, &cxt);
goto abort_parse;
}
case TK_QUESTION:
case TK_ILLEGAL: {
snprintf(cxt.pQueryCxt->pMsg, cxt.pQueryCxt->msgLen, "unrecognized token: \"%s\"", t0.z);
......@@ -89,8 +94,7 @@ abort_parse:
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->isCmd = isCmd(cxt.pRootNode);
(*pQuery)->pRoot = cxt.pRootNode;
setQuery(&cxt, *pQuery);
}
return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
......@@ -829,10 +829,52 @@ static int32_t translateCreateDatabase(STranslateContext* pCxt, SCreateDatabaseS
return TSDB_CODE_SUCCESS;
}
static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* pStmt) {
SName name = {0};
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
SUseDbReq usedbReq = {0};
tNameExtractFullName(&name, usedbReq.db);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_USE_DB;
pCxt->pCmdMsg->msgLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSUseDbReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &usedbReq);
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateShow(STranslateContext* pCxt) {
SShowReq showReq = { .type = TSDB_MGMT_TABLE_DB };
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_SHOW;
pCxt->pCmdMsg->msgLen = tSerializeSShowReq(NULL, 0, &showReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSShowReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &showReq);
return TSDB_CODE_SUCCESS;
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
......@@ -842,6 +884,11 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_CREATE_DATABASE_STMT:
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
break;
case QUERY_NODE_USE_DATABASE_STMT:
code = translateUseDatabase(pCxt, (SUseDatabaseStmt*)pNode);
break;
case QUERY_NODE_SHOW_DATABASE_STMT:
code = translateShow(pCxt);
case QUERY_NODE_CREATE_TABLE_STMT:
code = translateCreateTable(pCxt, (SCreateTableStmt*)pNode);
break;
......@@ -862,7 +909,7 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) {
return code;
}
int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) {
static int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) {
if (QUERY_NODE_SELECT_STMT == nodeType(pQuery->pRoot)) {
SSelectStmt* pSelect = (SSelectStmt*)pQuery->pRoot;
pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList);
......@@ -882,7 +929,7 @@ int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_SUCCESS;
}
void destroyTranslateContext(STranslateContext* pCxt) {
static void destroyTranslateContext(STranslateContext* pCxt) {
taosArrayDestroy(pCxt->pNsLevel);
if (NULL != pCxt->pCmdMsg) {
tfree(pCxt->pCmdMsg->pMsg);
......@@ -890,6 +937,116 @@ void destroyTranslateContext(STranslateContext* pCxt) {
}
}
typedef struct SVgroupTablesBatch {
SVCreateTbBatchReq req;
SVgroupInfo info;
} SVgroupTablesBatch;
static void toSchema(const SColumnNode* pCol, SSchema* pSchema) {
pSchema->type = pCol->node.resType.type;
pSchema->bytes = pCol->node.resType.bytes;
strcpy(pSchema->name, pCol->colName);
}
static int32_t doBuildSingleTableBatchReq(SName* pTableName, SNodeList* pColumns, SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE;
req.name = strdup(tNameGetTableName(pTableName));
req.ntbCfg.nCols = LIST_LENGTH(pColumns);
int32_t num = req.ntbCfg.nCols;
req.ntbCfg.pSchema = calloc(num, sizeof(SSchema));
SNode* pCol;
int32_t index = 0;
FOREACH(pCol, pColumns) {
toSchema((SColumnNode*)pCol, req.ntbCfg.pSchema + index++);
}
pBatch->info = *pVgroupInfo;
pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (pBatch->req.pArray == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
taosArrayPush(pBatch->req.pArray, &req);
return TSDB_CODE_SUCCESS;
}
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSerializeSVCreateTbBatchReq(&pBuf, &(pTbBatch->req));
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
}
static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
for(int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
tfree(pTableReq->name);
if (pTableReq->type == TSDB_NORMAL_TABLE) {
tfree(pTableReq->ntbCfg.pSchema);
} else if (pTableReq->type == TSDB_CHILD_TABLE) {
tfree(pTableReq->ctbCfg.pTag);
} else {
assert(0);
}
}
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
if (QUERY_NODE_CREATE_TABLE_STMT == nodeType(pQuery->pRoot)) {
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(tableName.dbname, pStmt->dbName);
strcpy(tableName.tname, pStmt->tableName);
SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &tableName, &info);
SVgroupTablesBatch tbatch = {0};
int32_t code = doBuildSingleTableBatchReq(&tableName, pStmt->pCols, &info, &tbatch);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SArray* pBufArray = taosArrayInit(1, POINTER_BYTES);
if (pBufArray == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
serializeVgroupTablesBatchImpl(&tbatch, pBufArray);
destroyCreateTbReqBatch(&tbatch);
SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
pNewStmt->sqlNodeType = nodeType(pQuery->pRoot);
pNewStmt->pDataBlocks = pBufArray;
pQuery->sqlNodeType = nodeType(pQuery->pRoot);
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pNewStmt;
}
return TSDB_CODE_SUCCESS;
}
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
STranslateContext cxt = {
.pParseCxt = pParseCxt,
......@@ -900,14 +1057,18 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
.currClause = 0
};
int32_t code = fmFuncMgtInit();
if (TSDB_CODE_SUCCESS == code) {
code = rewriteQuery(&cxt, pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(&cxt, pQuery->pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
if (pQuery->isCmd) {
if (pQuery->directRpc) {
pQuery->pCmdMsg = cxt.pCmdMsg;
cxt.pCmdMsg = NULL;
} else {
}
if (pQuery->haveResultSet) {
code = setReslutSchema(&cxt, pQuery);
}
}
......
此差异已折叠。
......@@ -30,18 +30,18 @@ typedef struct SKeyword {
// keywords in sql string
static SKeyword keywordTable[] = {
// {"ID", TK_ID},
// {"BOOL", TK_BOOL},
// {"TINYINT", TK_TINYINT},
// {"SMALLINT", TK_SMALLINT},
// {"INTEGER", TK_INTEGER},
// {"INT", TK_INTEGER},
// {"BIGINT", TK_BIGINT},
// {"FLOAT", TK_FLOAT},
// {"DOUBLE", TK_DOUBLE},
{"BOOL", TK_BOOL},
{"TINYINT", TK_TINYINT},
{"SMALLINT", TK_SMALLINT},
{"INTEGER", TK_INTEGER},
{"INT", TK_INTEGER},
{"BIGINT", TK_BIGINT},
{"FLOAT", TK_FLOAT},
{"DOUBLE", TK_DOUBLE},
// {"STRING", TK_STRING},
{"TIMESTAMP", TK_TIMESTAMP},
// {"BINARY", TK_BINARY},
// {"NCHAR", TK_NCHAR},
{"BINARY", TK_BINARY},
{"NCHAR", TK_NCHAR},
{"OR", TK_OR},
{"AND", TK_AND},
{"NOT", TK_NOT},
......@@ -74,8 +74,8 @@ static SKeyword keywordTable[] = {
// {"UMINUS", TK_UMINUS},
// {"UPLUS", TK_UPLUS},
// {"BITNOT", TK_BITNOT},
// {"SHOW", TK_SHOW},
// {"DATABASES", TK_DATABASES},
{"SHOW", TK_SHOW},
{"DATABASES", TK_DATABASES},
// {"MNODES", TK_MNODES},
// {"DNODES", TK_DNODES},
// {"ACCOUNTS", TK_ACCOUNTS},
......@@ -92,12 +92,12 @@ static SKeyword keywordTable[] = {
// {"STABLES", TK_STABLES},
{"VGROUPS", TK_VGROUPS},
// {"DROP", TK_DROP},
// {"TABLE", TK_TABLE},
{"TABLE", TK_TABLE},
{"DATABASE", TK_DATABASE},
// {"DNODE", TK_DNODE},
// {"USER", TK_USER},
// {"ACCOUNT", TK_ACCOUNT},
// {"USE", TK_USE},
{"USE", TK_USE},
// {"DESCRIBE", TK_DESCRIBE},
// {"SYNCDB", TK_SYNCDB},
// {"ALTER", TK_ALTER},
......@@ -309,7 +309,7 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
if (z[1] == '-') {
for (i = 2; z[i] && z[i] != '\n'; i++) {
}
*tokenId = TK_COMMENT;
*tokenId = TK_NK_COMMENT;
return i;
}
*tokenId = TK_MINUS;
......@@ -343,7 +343,7 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
for (i = 3; z[i] && (z[i] != '/' || z[i - 1] != '*'); i++) {
}
if (z[i]) i++;
*tokenId = TK_COMMENT;
*tokenId = TK_NK_COMMENT;
return i;
}
case '%': {
......
......@@ -696,3 +696,23 @@ TEST_F(ParserTest, selectSemanticError) {
bind("SELECT DISTINCT c2 FROM t1 WHERE c1 > 0 ORDER BY count(c2)");
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION));
}
TEST_F(ParserTest, createDatabase) {
bind("create database wxy_db");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, showDatabase) {
bind("show databases");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, useDatabase) {
bind("use wxy_db");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, createTable) {
bind("create table t1(ts timestamp, c1 int)");
ASSERT_TRUE(run());
}
......@@ -39,6 +39,15 @@ extern "C" {
} \
} while (0)
#define CHECK_CODE_EXT(exec) \
do { \
int32_t code = (exec); \
if (TSDB_CODE_SUCCESS != code) { \
pCxt->errCode = code; \
return code; \
} \
} while (0)
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode);
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan);
......
......@@ -348,10 +348,20 @@ static SLogicNode* createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return pRoot;
}
static SLogicNode* createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpStmt* pStmt) {
SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VNODE_MODIF);
CHECK_ALLOC(pModif, NULL);
pModif->pDataBlocks = pStmt->pDataBlocks;
pModif->msgType = (QUERY_NODE_CREATE_TABLE_STMT == pStmt->sqlNodeType ? TDMT_VND_CREATE_TABLE : TDMT_VND_SUBMIT);
return (SLogicNode*)pModif;
}
static SLogicNode* createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SELECT_STMT:
return createSelectLogicNode(pCxt, (SSelectStmt*)pStmt);
return createSelectLogicNode(pCxt, (SSelectStmt*)pStmt);
case QUERY_NODE_VNODE_MODIF_STMT:
return createVnodeModifLogicNode(pCxt, (SVnodeModifOpStmt*)pStmt);
default:
break;
}
......
......@@ -466,10 +466,26 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
return pPhyNode;
}
static SDataSinkNode* createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks) {
SDataInserterNode* pInserter = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
pInserter->numOfTables = pBlocks->numOfTables;
pInserter->size = pBlocks->size;
TSWAP(pInserter->pData, pBlocks->pData, char*);
return (SDataSinkNode*)pInserter;
}
static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) {
SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
CHECK_ALLOC(pSubplan, NULL);
pSubplan->pNode = createPhysiNode(pCxt, pLogicSubplan->pNode);
if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)pLogicSubplan->pNode;
pSubplan->pDataSink = createDataInserter(pCxt, pModif->pVgDataBlocks);
pSubplan->msgType = pModif->msgType;
} else {
pSubplan->pNode = createPhysiNode(pCxt, pLogicSubplan->pNode);
// pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
}
pSubplan->subplanType = pLogicSubplan->subplanType;
return pSubplan;
}
......@@ -484,57 +500,130 @@ static int32_t strictListAppend(SNodeList* pList, SNodeptr pNode) {
return code;
}
static SQueryLogicPlan* createRawQueryLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode) {
static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubLogicPlan** pSubLogicPlan) {
*pSubLogicPlan = (SSubLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
CHECK_ALLOC(*pSubLogicPlan, TSDB_CODE_OUT_OF_MEMORY);
// todo pSubplan->pNode = nodesCloneNode(pLogicNode);
(*pSubLogicPlan)->pNode = pLogicNode;
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIF == nodeType(pLogicNode)) {
(*pSubLogicPlan)->subplanType = SUBPLAN_TYPE_MODIFY;
}
// todo split
return TSDB_CODE_SUCCESS;
}
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t level, SNodeList* pSubplans) {
SNodeListNode* pGroup;
if (level >= LIST_LENGTH(pSubplans)) {
pGroup = nodesMakeNode(QUERY_NODE_NODE_LIST);
CHECK_ALLOC(pGroup, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(strictListAppend(pSubplans, pGroup), TSDB_CODE_OUT_OF_MEMORY);
} else {
pGroup = nodesListGetNode(pSubplans, level);
}
if (NULL == pGroup->pNodeList) {
pGroup->pNodeList = nodesMakeList();
CHECK_ALLOC(pGroup->pNodeList, TSDB_CODE_OUT_OF_MEMORY);
}
CHECK_CODE(strictListAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY);
}
SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) {
SSubLogicPlan* pDst = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
CHECK_ALLOC(pDst, NULL);
// todo pDst->pNode = nodesCloneNode(pSrc->pNode);
pDst->pNode = pSrc->pNode;
if (NULL == pDst->pNode) {
nodesDestroyNode(pDst);
return NULL;
}
pDst->subplanType = pSrc->subplanType;
pDst->level = level;
return pDst;
}
static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) {
SVnodeModifLogicNode* pNode = (SVnodeModifLogicNode*)pSubplan->pNode;
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
for (int32_t i = 0; i < numOfVgroups; ++i) {
SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
pNewSubplan->execNode.epset = blocks->vg.epset;
((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = blocks;
CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
}
} else {
SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
}
SNode* pChild;
FOREACH(pChild, pSubplan->pChildren) {
CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, level + 1, pLogicPlan));
}
return TSDB_CODE_SUCCESS;
}
static SQueryLogicPlan* makeQueryLogicPlan(SPhysiPlanContext* pCxt) {
SQueryLogicPlan* pLogicPlan = (SQueryLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN);
CHECK_ALLOC(pLogicPlan, NULL);
pLogicPlan->pSubplans = nodesMakeList();
CHECK_ALLOC(pLogicPlan->pSubplans, pLogicPlan);
SNodeListNode* pTopSubplans = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
CHECK_ALLOC(pTopSubplans, pLogicPlan);
CHECK_CODE(strictListAppend(pLogicPlan->pSubplans, pTopSubplans), pLogicPlan);
pTopSubplans->pNodeList = nodesMakeList();
CHECK_ALLOC(pTopSubplans->pNodeList, pLogicPlan);
SSubLogicPlan* pSubplan = (SSubLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
CHECK_ALLOC(pSubplan, pLogicPlan);
CHECK_CODE(strictListAppend(pTopSubplans->pNodeList, pSubplan), pLogicPlan);
pSubplan->pNode = pLogicNode;
CHECK_ALLOC(pSubplan->pNode, pLogicPlan);
if (NULL == pLogicPlan->pSubplans) {
nodesDestroyNode(pLogicPlan);
return NULL;
}
return pLogicPlan;
}
static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SQueryLogicPlan** pLogicPlan) {
SQueryLogicPlan* pPlan = createRawQueryLogicPlan(pCxt, pLogicNode);
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
nodesDestroyNode((SNode*)pPlan);
return pCxt->errCode;
static int32_t scaleOutLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pRootSubLogicPlan, SQueryLogicPlan** pLogicPlan) {
*pLogicPlan = makeQueryLogicPlan(pCxt);
CHECK_ALLOC(*pLogicPlan, TSDB_CODE_OUT_OF_MEMORY);
return doScaleOut(pCxt, pRootSubLogicPlan, 0, *pLogicPlan);
}
typedef struct SBuildPhysiSubplanCxt {
int32_t errCode;
SQueryPlan* pQueryPlan;
SPhysiPlanContext* pPhyCxt;
} SBuildPhysiSubplanCxt;
static EDealRes doBuildPhysiSubplan(SNode* pNode, void* pContext) {
SBuildPhysiSubplanCxt* pCxt = (SBuildPhysiSubplanCxt*)pContext;
if (QUERY_NODE_LOGIC_SUBPLAN == nodeType(pNode)) {
SSubplan* pSubplan = createPhysiSubplan(pCxt->pPhyCxt, (SSubLogicPlan*)pNode);
CHECK_ALLOC(pSubplan, DEAL_RES_ERROR);
CHECK_CODE(pushSubplan(pCxt->pPhyCxt, pSubplan, ((SSubLogicPlan*)pNode)->level, pCxt->pQueryPlan->pSubplans), DEAL_RES_ERROR);
++(pCxt->pQueryPlan->numOfSubplans);
return DEAL_RES_IGNORE_CHILD;
}
// todo split
*pLogicPlan = pPlan;
return TSDB_CODE_SUCCESS;
return DEAL_RES_CONTINUE;
}
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) {
SQueryPlan* pQueryPlan = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
CHECK_ALLOC(pQueryPlan, TSDB_CODE_OUT_OF_MEMORY);
*pPlan = pQueryPlan;
pQueryPlan->queryId = pCxt->pPlanCxt->queryId;
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
SQueryPlan* pPlan = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
CHECK_ALLOC(pPlan, NULL);
pPlan->pSubplans = nodesMakeList();
if (NULL == pPlan->pSubplans) {
nodesDestroyNode(pPlan);
return NULL;
}
pPlan->queryId = pCxt->pPlanCxt->queryId;
return pPlan;
}
pQueryPlan->pSubplans = nodesMakeList();
CHECK_ALLOC(pQueryPlan->pSubplans, TSDB_CODE_OUT_OF_MEMORY);
SNode* pNode;
FOREACH(pNode, pLogicPlan->pSubplans) {
SNodeListNode* pLevelSubplans = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
CHECK_ALLOC(pLevelSubplans, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(strictListAppend(pQueryPlan->pSubplans, pLevelSubplans), TSDB_CODE_OUT_OF_MEMORY);
pLevelSubplans->pNodeList = nodesMakeList();
CHECK_ALLOC(pLevelSubplans->pNodeList, TSDB_CODE_OUT_OF_MEMORY);
SNode* pLogicSubplan;
FOREACH(pLogicSubplan, ((SNodeListNode*)pNode)->pNodeList) {
CHECK_CODE(strictListAppend(pLevelSubplans->pNodeList,
createPhysiSubplan(pCxt, (SSubLogicPlan*)pLogicSubplan)), TSDB_CODE_OUT_OF_MEMORY);
++(pQueryPlan->numOfSubplans);
}
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) {
SBuildPhysiSubplanCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pQueryPlan = makeQueryPhysiPlan(pCxt), .pPhyCxt = pCxt };
CHECK_ALLOC(cxt.pQueryPlan, TSDB_CODE_OUT_OF_MEMORY);
nodesWalkList(pLogicPlan->pSubplans, doBuildPhysiSubplan, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyNode(cxt.pQueryPlan);
return cxt.errCode;
}
*pPlan = cxt.pQueryPlan;
return TSDB_CODE_SUCCESS;
}
......@@ -549,8 +638,11 @@ int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan**
return TSDB_CODE_OUT_OF_MEMORY;
}
SQueryLogicPlan* pLogicPlan;
int32_t code = splitLogicPlan(&cxt, pLogicNode, &pLogicPlan);
// todo scale out
SSubLogicPlan* pSubLogicPlan;
int32_t code = splitLogicPlan(&cxt, pLogicNode, &pSubLogicPlan);
if (TSDB_CODE_SUCCESS == code) {
code = scaleOutLogicPlan(&cxt, pSubLogicPlan, &pLogicPlan);
}
// todo maping
if (TSDB_CODE_SUCCESS == code) {
code = buildPhysiPlan(&cxt, pLogicPlan, pPlan);
......
......@@ -38,6 +38,13 @@ void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstrea
}
int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
if (SUBPLAN_TYPE_MODIFY == subplan->subplanType) {
SDataInserterNode* insert = (SDataInserterNode*)subplan->pDataSink;
*len = insert->size;
*str = insert->pData;
insert->pData = NULL;
return TSDB_CODE_SUCCESS;
}
return nodesNodeToString((const SNode*)subplan, false, str, len);
}
......
......@@ -151,3 +151,8 @@ TEST_F(PlannerTest, subquery) {
bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, createTable) {
bind("create table t1(ts timestamp, c1 int)");
ASSERT_TRUE(run());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册