提交 f8f7aabf 编写于 作者: X Xiaoyu Wang

TD-13597 create/drop topic, alter database, drop index, drop qnode statement implement

上级 82431b84
......@@ -1157,8 +1157,8 @@ typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
int8_t igExists;
char* sql;
char* physicalPlan;
char* logicalPlan;
char* ast;
char subscribeDbName[TSDB_DB_NAME_LEN];
} SCMCreateTopicReq;
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
......
......@@ -108,53 +108,54 @@
#define TK_FULLTEXT 90
#define TK_FUNCTION 91
#define TK_INTERVAL 92
#define TK_MNODES 93
#define TK_NK_FLOAT 94
#define TK_NK_BOOL 95
#define TK_NK_VARIABLE 96
#define TK_BETWEEN 97
#define TK_IS 98
#define TK_NULL 99
#define TK_NK_LT 100
#define TK_NK_GT 101
#define TK_NK_LE 102
#define TK_NK_GE 103
#define TK_NK_NE 104
#define TK_NK_EQ 105
#define TK_LIKE 106
#define TK_MATCH 107
#define TK_NMATCH 108
#define TK_IN 109
#define TK_FROM 110
#define TK_AS 111
#define TK_JOIN 112
#define TK_INNER 113
#define TK_SELECT 114
#define TK_DISTINCT 115
#define TK_WHERE 116
#define TK_PARTITION 117
#define TK_BY 118
#define TK_SESSION 119
#define TK_STATE_WINDOW 120
#define TK_SLIDING 121
#define TK_FILL 122
#define TK_VALUE 123
#define TK_NONE 124
#define TK_PREV 125
#define TK_LINEAR 126
#define TK_NEXT 127
#define TK_GROUP 128
#define TK_HAVING 129
#define TK_ORDER 130
#define TK_SLIMIT 131
#define TK_SOFFSET 132
#define TK_LIMIT 133
#define TK_OFFSET 134
#define TK_ASC 135
#define TK_DESC 136
#define TK_NULLS 137
#define TK_FIRST 138
#define TK_LAST 139
#define TK_TOPIC 93
#define TK_AS 94
#define TK_MNODES 95
#define TK_NK_FLOAT 96
#define TK_NK_BOOL 97
#define TK_NK_VARIABLE 98
#define TK_BETWEEN 99
#define TK_IS 100
#define TK_NULL 101
#define TK_NK_LT 102
#define TK_NK_GT 103
#define TK_NK_LE 104
#define TK_NK_GE 105
#define TK_NK_NE 106
#define TK_NK_EQ 107
#define TK_LIKE 108
#define TK_MATCH 109
#define TK_NMATCH 110
#define TK_IN 111
#define TK_FROM 112
#define TK_JOIN 113
#define TK_INNER 114
#define TK_SELECT 115
#define TK_DISTINCT 116
#define TK_WHERE 117
#define TK_PARTITION 118
#define TK_BY 119
#define TK_SESSION 120
#define TK_STATE_WINDOW 121
#define TK_SLIDING 122
#define TK_FILL 123
#define TK_VALUE 124
#define TK_NONE 125
#define TK_PREV 126
#define TK_LINEAR 127
#define TK_NEXT 128
#define TK_GROUP 129
#define TK_HAVING 130
#define TK_ORDER 131
#define TK_SLIMIT 132
#define TK_SOFFSET 133
#define TK_LIMIT 134
#define TK_OFFSET 135
#define TK_ASC 136
#define TK_DESC 137
#define TK_NULLS 138
#define TK_FIRST 139
#define TK_LAST 140
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -61,6 +61,12 @@ typedef struct SDropDatabaseStmt {
bool ignoreNotExists;
} SDropDatabaseStmt;
typedef struct SAlterDatabaseStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
SDatabaseOptions* pOptions;
} SAlterDatabaseStmt;
typedef struct STableOptions {
ENodeType type;
int32_t keep;
......@@ -179,11 +185,36 @@ typedef struct SCreateIndexStmt {
SIndexOptions* pOptions;
} SCreateIndexStmt;
typedef struct SDropIndexStmt {
ENodeType type;
char indexName[TSDB_INDEX_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
} SDropIndexStmt;
typedef struct SCreateQnodeStmt {
ENodeType type;
int32_t dnodeId;
} SCreateQnodeStmt;
typedef struct SDropQnodeStmt {
ENodeType type;
int32_t dnodeId;
} SDropQnodeStmt;
typedef struct SCreateTopicStmt {
ENodeType type;
char topicName[TSDB_TABLE_NAME_LEN];
char subscribeDbName[TSDB_DB_NAME_LEN];
bool ignoreExists;
SNode* pQuery;
} SCreateTopicStmt;
typedef struct SDropTopicStmt {
ENodeType type;
char topicName[TSDB_TABLE_NAME_LEN];
bool ignoreNotExists;
} SDropTopicStmt;
#ifdef __cplusplus
}
#endif
......
......@@ -77,6 +77,7 @@ typedef enum ENodeType {
QUERY_NODE_VNODE_MODIF_STMT,
QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_DROP_DATABASE_STMT,
QUERY_NODE_ALTER_DATABASE_STMT,
QUERY_NODE_SHOW_DATABASES_STMT, // temp
QUERY_NODE_CREATE_TABLE_STMT,
QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
......@@ -98,7 +99,11 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_MNODES_STMT,
QUERY_NODE_SHOW_QNODES_STMT,
QUERY_NODE_CREATE_INDEX_STMT,
QUERY_NODE_DROP_INDEX_STMT,
QUERY_NODE_CREATE_QNODE_STMT,
QUERY_NODE_DROP_QNODE_STMT,
QUERY_NODE_CREATE_TOPIC_STMT,
QUERY_NODE_DROP_TOPIC_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,
......
......@@ -482,7 +482,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
}
tscDebug("start to create topic, %s", topicName);
#if 0
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
......@@ -536,7 +536,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
#endif
_return:
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
......
......@@ -1990,11 +1990,9 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
int32_t sqlLen = 0;
int32_t physicalPlanLen = 0;
int32_t logicalPlanLen = 0;
int32_t astLen = 0;
if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql);
if (pReq->physicalPlan != NULL) physicalPlanLen = (int32_t)strlen(pReq->physicalPlan);
if (pReq->logicalPlan != NULL) logicalPlanLen = (int32_t)strlen(pReq->logicalPlan);
if (pReq->ast != NULL) astLen = (int32_t)strlen(pReq->ast);
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2003,11 +2001,9 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, physicalPlanLen) < 0) return -1;
if (tEncodeI32(&encoder, logicalPlanLen) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
tEndEncode(&encoder);
......@@ -2018,8 +2014,7 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicReq *pReq) {
int32_t sqlLen = 0;
int32_t physicalPlanLen = 0;
int32_t logicalPlanLen = 0;
int32_t astLen = 0;
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2028,17 +2023,20 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeI32(&decoder, &physicalPlanLen) < 0) return -1;
if (tDecodeI32(&decoder, &logicalPlanLen) < 0) return -1;
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (sqlLen > 0) {
pReq->sql = calloc(1, sqlLen + 1);
if (pReq->sql == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
}
pReq->sql = calloc(1, sqlLen + 1);
pReq->physicalPlan = calloc(1, physicalPlanLen + 1);
pReq->logicalPlan = calloc(1, logicalPlanLen + 1);
if (pReq->sql == NULL || pReq->physicalPlan == NULL || pReq->logicalPlan == NULL) return -1;
if (astLen > 0) {
pReq->ast = calloc(1, astLen + 1);
if (pReq->ast == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
}
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->physicalPlan) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->logicalPlan) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
......@@ -2047,8 +2045,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
tfree(pReq->sql);
tfree(pReq->physicalPlan);
tfree(pReq->logicalPlan);
tfree(pReq->ast);
}
int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) {
......
......@@ -236,6 +236,25 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
return 0;
}
static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) {
SNode* pAst = NULL;
int32_t code = nodesStringToNode(pCreate->ast, &pAst);
SQueryPlan* pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = { .pAstRoot = pAst, .streamQuery = true };
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pPlan, false, pStr, NULL);
}
nodesDestroyNode(pAst);
nodesDestroyNode(pPlan);
terrno = code;
return code;
}
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
mDebug("topic:%s to create", pCreate->name);
SMqTopicObj topicObj = {0};
......@@ -247,10 +266,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq
topicObj.dbUid = pDb->uid;
topicObj.version = 1;
topicObj.sql = pCreate->sql;
topicObj.physicalPlan = pCreate->physicalPlan;
topicObj.logicalPlan = pCreate->logicalPlan;
topicObj.logicalPlan = NULL;
topicObj.sqlLen = strlen(pCreate->sql);
if (TSDB_CODE_SUCCESS != mndGetPlanString(pCreate, &topicObj.physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
......
......@@ -65,8 +65,7 @@ void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql,
strcpy(createReq.name, topicName);
createReq.igExists = 0;
createReq.sql = (char*)sql;
createReq.physicalPlan = (char*)"physicalPlan";
createReq.logicalPlan = (char*)"logicalPlan";
createReq.ast = (char*)"ast";
int32_t contLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
......
......@@ -1730,7 +1730,7 @@ static int32_t jsonToNodeObject(const SJson* pJson, const char* pName, SNode** p
}
int32_t nodesNodeToString(const SNodeptr pNode, bool format, char** pStr, int32_t* pLen) {
if (NULL == pNode || NULL == pStr || NULL == pLen) {
if (NULL == pNode || NULL == pStr) {
terrno = TSDB_CODE_FAILED;
return TSDB_CODE_FAILED;
}
......@@ -1750,7 +1750,10 @@ int32_t nodesNodeToString(const SNodeptr pNode, bool format, char** pStr, int32_
*pStr = format ? tjsonToString(pJson) : tjsonToUnformattedString(pJson);
tjsonDelete(pJson);
*pLen = strlen(*pStr) + 1;
if (NULL != pLen) {
*pLen = strlen(*pStr) + 1;
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -92,6 +92,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SCreateDatabaseStmt));
case QUERY_NODE_DROP_DATABASE_STMT:
return makeNode(type, sizeof(SDropDatabaseStmt));
case QUERY_NODE_ALTER_DATABASE_STMT:
return makeNode(type, sizeof(SAlterDatabaseStmt));
case QUERY_NODE_SHOW_DATABASES_STMT:
return makeNode(type, sizeof(SShowStmt));
case QUERY_NODE_CREATE_TABLE_STMT:
......@@ -131,8 +133,16 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SShowStmt));
case QUERY_NODE_CREATE_INDEX_STMT:
return makeNode(type, sizeof(SCreateIndexStmt));
case QUERY_NODE_DROP_INDEX_STMT:
return makeNode(type, sizeof(SDropIndexStmt));
case QUERY_NODE_CREATE_QNODE_STMT:
return makeNode(type, sizeof(SCreateQnodeStmt));
case QUERY_NODE_DROP_QNODE_STMT:
return makeNode(type, sizeof(SDropQnodeStmt));
case QUERY_NODE_CREATE_TOPIC_STMT:
return makeNode(type, sizeof(SCreateTopicStmt));
case QUERY_NODE_DROP_TOPIC_STMT:
return makeNode(type, sizeof(SDropTopicStmt));
case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode));
case QUERY_NODE_LOGIC_PLAN_JOIN:
......
......@@ -113,6 +113,7 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt);
SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOptionType type, const SToken* pVal);
SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pDbName, SNode* pOptions);
SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pDbName);
SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName, SNode* pOptions);
SNode* createDefaultTableOptions(SAstCreateContext* pCxt);
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, const SToken* pVal);
SNode* setTableSmaOption(SAstCreateContext* pCxt, SNode* pOptions, SNodeList* pSma);
......@@ -134,7 +135,11 @@ SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, const SToken* pIndexName, const SToken* pTableName, SNodeList* pCols, SNode* pOptions);
SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInterval, SNode* pOffset, SNode* pSliding);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, const SToken* pIndexName, const SToken* pTableName);
SNode* createCreateQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId);
SNode* createDropQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId);
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery, const SToken* pSubscribeDbName);
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName);
#ifdef __cplusplus
}
......
......@@ -42,17 +42,17 @@
//%right NK_BITNOT.
/************************************************ create/alter/drop/show user *****************************************/
cmd ::= CREATE USER user_name(A) PASS NK_STRING(B). { pCxt->pRootNode = createCreateUserStmt(pCxt, &A, &B);}
cmd ::= ALTER USER user_name(A) PASS NK_STRING(B). { pCxt->pRootNode = createAlterUserStmt(pCxt, &A, TSDB_ALTER_USER_PASSWD, &B);}
cmd ::= ALTER USER user_name(A) PRIVILEGE NK_STRING(B). { pCxt->pRootNode = createAlterUserStmt(pCxt, &A, TSDB_ALTER_USER_PRIVILEGES, &B);}
cmd ::= CREATE USER user_name(A) PASS NK_STRING(B). { pCxt->pRootNode = createCreateUserStmt(pCxt, &A, &B); }
cmd ::= ALTER USER user_name(A) PASS NK_STRING(B). { pCxt->pRootNode = createAlterUserStmt(pCxt, &A, TSDB_ALTER_USER_PASSWD, &B); }
cmd ::= ALTER USER user_name(A) PRIVILEGE NK_STRING(B). { pCxt->pRootNode = createAlterUserStmt(pCxt, &A, TSDB_ALTER_USER_PRIVILEGES, &B); }
cmd ::= DROP USER user_name(A). { pCxt->pRootNode = createDropUserStmt(pCxt, &A); }
cmd ::= SHOW USERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_USERS_STMT, NULL); }
/************************************************ create/drop/show dnode **********************************************/
cmd ::= CREATE DNODE dnode_endpoint(A). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, NULL);}
cmd ::= CREATE DNODE dnode_host_name(A) PORT NK_INTEGER(B). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, &B);}
cmd ::= DROP DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A);}
cmd ::= DROP DNODE dnode_endpoint(A). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A);}
cmd ::= CREATE DNODE dnode_endpoint(A). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, NULL); }
cmd ::= CREATE DNODE dnode_host_name(A) PORT NK_INTEGER(B). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, &B); }
cmd ::= DROP DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A); }
cmd ::= DROP DNODE dnode_endpoint(A). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A); }
cmd ::= SHOW DNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DNODES_STMT, NULL); }
%type dnode_endpoint { SToken }
......@@ -64,15 +64,17 @@ dnode_endpoint(A) ::= NK_STRING(B).
dnode_host_name(A) ::= NK_ID(B). { A = B; }
dnode_host_name(A) ::= NK_IPTOKEN(B). { A = B; }
/************************************************ create qnode ********************************************************/
/************************************************ create/drop qnode ***************************************************/
cmd ::= CREATE QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateQnodeStmt(pCxt, &A); }
cmd ::= DROP QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropQnodeStmt(pCxt, &A); }
cmd ::= SHOW QNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QNODES_STMT, NULL); }
/************************************************ create/drop/show/use database ***************************************/
cmd ::= CREATE DATABASE not_exists_opt(A) db_name(B) db_options(C). { pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C);}
cmd ::= CREATE DATABASE not_exists_opt(A) db_name(B) db_options(C). { pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C); }
cmd ::= DROP DATABASE exists_opt(A) db_name(B). { pCxt->pRootNode = createDropDatabaseStmt(pCxt, A, &B); }
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASES_STMT, NULL); }
cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A);}
cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A); }
cmd ::= ALTER DATABASE db_name(A) db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); }
%type not_exists_opt { bool }
%destructor not_exists_opt { }
......@@ -198,6 +200,7 @@ col_name(A) ::= column_name(B).
cmd ::= CREATE SMA INDEX index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, &A, &B, NULL, C); }
cmd ::= CREATE FULLTEXT INDEX
index_name(A) ON table_name(B) NK_LP col_name_list(C) NK_RP. { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_FULLTEXT, &A, &B, C, NULL); }
cmd ::= DROP INDEX index_name(A) ON table_name(B). { pCxt->pRootNode = createDropIndexStmt(pCxt, &A, &B); }
index_options(A) ::= . { A = NULL; }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
......@@ -212,6 +215,11 @@ func_list(A) ::= func_list(B) NK_COMMA func(C).
func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); }
/************************************************ create/drop topic ***************************************************/
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_expression(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, C, NULL); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS db_name(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, NULL, &C); }
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
/************************************************ show vgroups ********************************************************/
cmd ::= SHOW VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, NULL); }
cmd ::= SHOW db_name(B) NK_DOT VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, &B); }
......@@ -270,6 +278,10 @@ user_name(A) ::= NK_ID(B).
%destructor index_name { }
index_name(A) ::= NK_ID(B). { A = B; }
%type topic_name { SToken }
%destructor topic_name { }
topic_name(A) ::= NK_ID(B). { A = B; }
/************************************************ expression **********************************************************/
expression(A) ::= literal(B). { A = B; }
//expression(A) ::= NK_QUESTION(B). { A = B; }
......
......@@ -799,6 +799,17 @@ SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, con
return (SNode*)pStmt;
}
SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName, SNode* pOptions) {
if (!checkDbName(pCxt, pDbName)) {
return NULL;
}
SAlterDatabaseStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_DATABASE_STMT);
CHECK_OUT_OF_MEM(pStmt);
strncpy(pStmt->dbName, pDbName->z, pDbName->n);
pStmt->pOptions = (SDatabaseOptions*)pOptions;
return (SNode*)pStmt;
}
SNode* createDefaultTableOptions(SAstCreateContext* pCxt) {
STableOptions* pOptions = nodesMakeNode(QUERY_NODE_TABLE_OPTIONS);
CHECK_OUT_OF_MEM(pOptions);
......@@ -1022,9 +1033,47 @@ SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInt
return (SNode*)pOptions;
}
SNode* createDropIndexStmt(SAstCreateContext* pCxt, const SToken* pIndexName, const SToken* pTableName) {
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName)) {
return NULL;
}
SDropIndexStmt* pStmt = nodesMakeNode(QUERY_NODE_DROP_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
strncpy(pStmt->indexName, pIndexName->z, pIndexName->n);
strncpy(pStmt->tableName, pTableName->z, pTableName->n);
return (SNode*)pStmt;
}
SNode* createCreateQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId) {
SCreateQnodeStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_QNODE_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->dnodeId = strtol(pDnodeId->z, NULL, 10);;
return (SNode*)pStmt;
}
SNode* createDropQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId) {
SDropQnodeStmt* pStmt = nodesMakeNode(QUERY_NODE_DROP_QNODE_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->dnodeId = strtol(pDnodeId->z, NULL, 10);;
return (SNode*)pStmt;
}
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery, const SToken* pSubscribeDbName) {
SCreateTopicStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT);
CHECK_OUT_OF_MEM(pStmt);
strncpy(pStmt->topicName, pTopicName->z, pTopicName->n);
pStmt->ignoreExists = ignoreExists;
pStmt->pQuery = pQuery;
if (NULL != pSubscribeDbName) {
strncpy(pStmt->subscribeDbName, pSubscribeDbName->z, pSubscribeDbName->n);
}
return (SNode*)pStmt;
}
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName) {
SDropTopicStmt* pStmt = nodesMakeNode(QUERY_NODE_DROP_TOPIC_STMT);
CHECK_OUT_OF_MEM(pStmt);
strncpy(pStmt->topicName, pTopicName->z, pTopicName->n);
pStmt->ignoreNotExists = ignoreNotExists;
return (SNode*)pStmt;
}
......@@ -121,6 +121,7 @@ static SKeyword keywordTable[] = {
{"TAGS", TK_TAGS},
{"TIMESTAMP", TK_TIMESTAMP},
{"TINYINT", TK_TINYINT},
{"TOPIC", TK_TOPIC},
{"TTL", TK_TTL},
{"UNION", TK_UNION},
{"UNSIGNED", TK_UNSIGNED},
......@@ -230,7 +231,6 @@ static SKeyword keywordTable[] = {
// {"TBNAME", TK_TBNAME},
// {"VNODES", TK_VNODES},
// {"PARTITIONS", TK_PARTITIONS},
// {"TOPIC", TK_TOPIC},
// {"TOPICS", TK_TOPICS},
// {"COMPACT", TK_COMPACT},
// {"MODIFY", TK_MODIFY},
......
......@@ -21,14 +21,6 @@
#include "parUtil.h"
#include "ttime.h"
static bool afterGroupBy(ESqlClause clause) {
return clause > SQL_CLAUSE_GROUP_BY;
}
static bool beforeHaving(ESqlClause clause) {
return clause < SQL_CLAUSE_HAVING;
}
typedef struct STranslateContext {
SParseContext* pParseCxt;
int32_t errCode;
......@@ -41,6 +33,15 @@ typedef struct STranslateContext {
} STranslateContext;
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode);
static bool afterGroupBy(ESqlClause clause) {
return clause > SQL_CLAUSE_GROUP_BY;
}
static bool beforeHaving(ESqlClause clause) {
return clause < SQL_CLAUSE_HAVING;
}
static EDealRes generateDealNodeErrMsg(STranslateContext* pCxt, int32_t errCode, ...) {
va_list vArgList;
......@@ -833,6 +834,41 @@ static int32_t translateDropDatabase(STranslateContext* pCxt, SDropDatabaseStmt*
return TSDB_CODE_SUCCESS;
}
static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt, SAlterDbReq* pReq) {
SName name = {0};
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, pReq->db);
pReq->totalBlocks = pStmt->pOptions->numOfBlocks;
pReq->daysToKeep0 = pStmt->pOptions->keep;
pReq->daysToKeep1 = -1;
pReq->daysToKeep2 = -1;
pReq->fsyncPeriod = pStmt->pOptions->fsyncPeriod;
pReq->walLevel = pStmt->pOptions->walLevel;
pReq->quorum = pStmt->pOptions->quorum;
pReq->cacheLastRow = pStmt->pOptions->cachelast;
return;
}
static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt) {
SAlterDbReq alterReq = {0};
buildAlterDbReq(pCxt, pStmt, &alterReq);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_ALTER_DB;
pCxt->pCmdMsg->msgLen = tSerializeSAlterDbReq(NULL, 0, &alterReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSAlterDbReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &alterReq);
return TSDB_CODE_SUCCESS;
}
static int32_t columnNodeToField(SNodeList* pList, SArray** pArray) {
*pArray = taosArrayInit(LIST_LENGTH(pList), sizeof(SField));
SNode* pNode;
......@@ -1201,6 +1237,27 @@ static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* p
}
}
static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) {
SVDropTSmaReq dropSmaReq = {0};
strcpy(dropSmaReq.indexName, pStmt->indexName);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_VND_DROP_SMA;
pCxt->pCmdMsg->msgLen = tSerializeSVDropTSmaReq(NULL, &dropSmaReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
void* pBuf = pCxt->pCmdMsg->pMsg;
tSerializeSVDropTSmaReq(&pBuf, &dropSmaReq);
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateQnode(STranslateContext* pCxt, SCreateQnodeStmt* pStmt) {
SMCreateQnodeReq createReq = { .dnodeId = pStmt->dnodeId };
......@@ -1220,6 +1277,93 @@ static int32_t translateCreateQnode(STranslateContext* pCxt, SCreateQnodeStmt* p
return TSDB_CODE_SUCCESS;
}
static int32_t translateDropQnode(STranslateContext* pCxt, SDropQnodeStmt* pStmt) {
SDDropQnodeReq dropReq = { .dnodeId = pStmt->dnodeId };
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_DND_DROP_QNODE;
pCxt->pCmdMsg->msgLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &dropReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSMCreateDropQSBNodeReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &dropReq);
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
SCMCreateTopicReq createReq = {0};
if (NULL != pStmt->pQuery) {
int32_t code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
}
if (TSDB_CODE_SUCCESS != code ) {
return code;
}
} else {
strcpy(createReq.subscribeDbName, pStmt->subscribeDbName);
}
createReq.sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == createReq.sql) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(name.dbname, pCxt->pParseCxt->db);
strcpy(name.tname, pStmt->topicName);
tNameExtractFullName(&name, createReq.name);
createReq.igExists = pStmt->ignoreExists;
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_CREATE_TOPIC;
pCxt->pCmdMsg->msgLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSCMCreateTopicReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &createReq);
tFreeSCMCreateTopicReq(&createReq);
return TSDB_CODE_SUCCESS;
}
static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt) {
SMDropTopicReq dropReq = {0};
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(name.dbname, pCxt->pParseCxt->db);
strcpy(name.tname, pStmt->topicName);
tNameExtractFullName(&name, dropReq.name);
dropReq.igNotExists = pStmt->ignoreNotExists;
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_DROP_TOPIC;
pCxt->pCmdMsg->msgLen = tSerializeSMDropTopicReq(NULL, 0, &dropReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSMDropTopicReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &dropReq);
return TSDB_CODE_SUCCESS;
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
......@@ -1232,6 +1376,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_DROP_DATABASE_STMT:
code = translateDropDatabase(pCxt, (SDropDatabaseStmt*)pNode);
break;
case QUERY_NODE_ALTER_DATABASE_STMT:
code = translateAlterDatabase(pCxt, (SAlterDatabaseStmt*)pNode);
break;
case QUERY_NODE_CREATE_TABLE_STMT:
code = translateCreateSuperTable(pCxt, (SCreateTableStmt*)pNode);
break;
......@@ -1274,9 +1421,21 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_CREATE_INDEX_STMT:
code = translateCreateIndex(pCxt, (SCreateIndexStmt*)pNode);
break;
case QUERY_NODE_DROP_INDEX_STMT:
code = translateDropIndex(pCxt, (SDropIndexStmt*)pNode);
break;
case QUERY_NODE_CREATE_QNODE_STMT:
code = translateCreateQnode(pCxt, (SCreateQnodeStmt*)pNode);
break;
case QUERY_NODE_DROP_QNODE_STMT:
code = translateDropQnode(pCxt, (SDropQnodeStmt*)pNode);
break;
case QUERY_NODE_CREATE_TOPIC_STMT:
code = translateCreateTopic(pCxt, (SCreateTopicStmt*)pNode);
break;
case QUERY_NODE_DROP_TOPIC_STMT:
code = translateDropTopic(pCxt, (SDropTopicStmt*)pNode);
break;
default:
break;
}
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -340,6 +340,23 @@ TEST_F(ParserTest, createDatabase) {
ASSERT_TRUE(run());
}
TEST_F(ParserTest, alterDatabase) {
setDatabase("root", "test");
bind("alter database wxy_db BLOCKS 200");
ASSERT_TRUE(run());
bind("alter database wxy_db "
"BLOCKS 200 "
"CACHELAST 1 "
"FSYNC 200 "
"KEEP 200 "
"QUORUM 2 "
"WAL 1 "
);
ASSERT_TRUE(run());
}
TEST_F(ParserTest, showDatabase) {
setDatabase("root", "test");
......@@ -406,9 +423,49 @@ TEST_F(ParserTest, createSmaIndex) {
ASSERT_TRUE(run());
}
TEST_F(ParserTest, dropIndex) {
setDatabase("root", "test");
bind("drop index index1 on t1");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, createQnode) {
setDatabase("root", "test");
bind("create qnode on dnode 1");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, dropQnode) {
setDatabase("root", "test");
bind("drop qnode on dnode 1");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, createTopic) {
setDatabase("root", "test");
bind("create topic tp1 as select * from t1");
ASSERT_TRUE(run());
bind("create topic if not exists tp1 as select * from t1");
ASSERT_TRUE(run());
bind("create topic tp1 as test");
ASSERT_TRUE(run());
bind("create topic if not exists tp1 as test");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, dropTopic) {
setDatabase("root", "test");
bind("drop topic tp1");
ASSERT_TRUE(run());
bind("drop topic if exists tp1");
ASSERT_TRUE(run());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册