提交 9305b0ef 编写于 作者: X Xiaoyu Wang

fix: create stream syntax check

上级 cf9c2a11
......@@ -41,14 +41,15 @@ extern "C" {
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define SHOW_ALIVE_RESULT_COLS 1
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_SUBSCRIBE PRIVILEGE_TYPE_MASK(3)
#define BIT_FLAG_MASK(n) (1 << n)
#define BIT_FLAG_SET_MASK(val, mask) ((val) |= (mask))
#define BIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define PRIVILEGE_TYPE_ALL BIT_FLAG_MASK(0)
#define PRIVILEGE_TYPE_READ BIT_FLAG_MASK(1)
#define PRIVILEGE_TYPE_WRITE BIT_FLAG_MASK(2)
#define PRIVILEGE_TYPE_SUBSCRIBE BIT_FLAG_MASK(3)
typedef struct SDatabaseOptions {
ENodeType type;
......@@ -393,6 +394,15 @@ typedef struct SKillQueryStmt {
char queryId[TSDB_QUERY_ID_LEN];
} SKillQueryStmt;
typedef enum EStreamOptionsSetFlag {
SOPT_TRIGGER_TYPE_SET = BIT_FLAG_MASK(0),
SOPT_WATERMARK_SET = BIT_FLAG_MASK(1),
SOPT_DELETE_MARK_SET = BIT_FLAG_MASK(2),
SOPT_FILL_HISTORY_SET = BIT_FLAG_MASK(3),
SOPT_IGNORE_EXPIRED_SET = BIT_FLAG_MASK(4),
SOPT_IGNORE_UPDATE_SET = BIT_FLAG_MASK(5),
} EStreamOptionsSetFlag;
typedef struct SStreamOptions {
ENodeType type;
int8_t triggerType;
......@@ -402,6 +412,7 @@ typedef struct SStreamOptions {
int8_t fillHistory;
int8_t ignoreExpired;
int8_t ignoreUpdate;
int64_t setFlag;
} SStreamOptions;
typedef struct SCreateStreamStmt {
......
......@@ -215,6 +215,8 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
const SToken* pLibPath, SDataType dataType, int32_t bufSize);
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
......
......@@ -562,14 +562,14 @@ tag_def_or_ref_opt(A) ::= tags_def(B).
tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; }
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, D); }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_UPDATE_SET, &C, NULL); }
subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
......
......@@ -1817,6 +1817,59 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
return (SNode*)pOptions;
}
static int8_t getTriggerType(uint32_t tokenType) {
switch (tokenType) {
case TK_AT_ONCE:
return STREAM_TRIGGER_AT_ONCE;
case TK_WINDOW_CLOSE:
return STREAM_TRIGGER_WINDOW_CLOSE;
case TK_MAX_DELAY:
return STREAM_TRIGGER_MAX_DELAY;
default:
break;
}
return STREAM_TRIGGER_WINDOW_CLOSE;
}
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode) {
SStreamOptions* pStreamOptions = (SStreamOptions*)pOptions;
if (BIT_FLAG_TEST_MASK(setflag, pStreamOptions->setFlag)) {
pCxt->errCode =
generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "stream options each item is only set once");
return pOptions;
}
switch (setflag) {
case SOPT_TRIGGER_TYPE_SET:
pStreamOptions->triggerType = getTriggerType(pToken->type);
if (STREAM_TRIGGER_MAX_DELAY == pStreamOptions->triggerType) {
pStreamOptions->pDelay = pNode;
}
break;
case SOPT_WATERMARK_SET:
pStreamOptions->pWatermark = pNode;
break;
case SOPT_DELETE_MARK_SET:
pStreamOptions->pDeleteMark = pNode;
break;
case SOPT_FILL_HISTORY_SET:
pStreamOptions->fillHistory = taosStr2Int8(pToken->z, NULL, 10);
break;
case SOPT_IGNORE_EXPIRED_SET:
pStreamOptions->ignoreExpired = taosStr2Int8(pToken->z, NULL, 10);
break;
case SOPT_IGNORE_UPDATE_SET:
pStreamOptions->ignoreUpdate = taosStr2Int8(pToken->z, NULL, 10);
break;
default:
break;
}
BIT_FLAG_SET_MASK(pStreamOptions->setFlag, setflag);
return pOptions;
}
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
CHECK_PARSER_STATUS(pCxt);
......
......@@ -6380,15 +6380,15 @@ static int32_t translateDropFunction(STranslateContext* pCxt, SDropFunctionStmt*
static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
SAlterUserReq req = {0};
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
req.alterType = TSDB_ALTER_USER_ADD_ALL_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
req.alterType = TSDB_ALTER_USER_ADD_READ_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
req.alterType = TSDB_ALTER_USER_ADD_WRITE_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
req.alterType = TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC;
}
strcpy(req.user, pStmt->userName);
......@@ -6398,15 +6398,15 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
SAlterUserReq req = {0};
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
req.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
req.alterType = TSDB_ALTER_USER_REMOVE_READ_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
req.alterType = TSDB_ALTER_USER_REMOVE_WRITE_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
req.alterType = TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC;
}
strcpy(req.user, pStmt->userName);
......@@ -6478,11 +6478,11 @@ static int32_t translateShowCreateDatabase(STranslateContext* pCxt, SShowCreateD
if (NULL == pStmt->pCfg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, pStmt->dbFName);
return getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pCfg);
}
......
......@@ -4608,7 +4608,6 @@ static YYACTIONTYPE yy_reduce(
{ yymsp[1].minor.yy42 = createStreamOptions(pCxt); }
break;
case 279: /* sma_stream_opt ::= sma_stream_opt WATERMARK duration_literal */
case 316: /* stream_options ::= stream_options WATERMARK duration_literal */ yytestcase(yyruleno==316);
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->pWatermark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
......@@ -4617,7 +4616,6 @@ static YYACTIONTYPE yy_reduce(
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 281: /* sma_stream_opt ::= sma_stream_opt DELETE_MARK duration_literal */
case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */ yytestcase(yyruleno==319);
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->pDeleteMark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
......@@ -4677,27 +4675,32 @@ static YYACTIONTYPE yy_reduce(
{ pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy103, &yymsp[0].minor.yy225); }
break;
case 313: /* stream_options ::= stream_options TRIGGER AT_ONCE */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_AT_ONCE; yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; yylhsminor.yy42 = yymsp[-2].minor.yy42; }
case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */ yytestcase(yyruleno==314);
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 315: /* stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)yymsp[-3].minor.yy42)->pDelay = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[-1].minor.yy0, yymsp[0].minor.yy42); }
yymsp[-3].minor.yy42 = yylhsminor.yy42;
break;
case 316: /* stream_options ::= stream_options WATERMARK duration_literal */
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 317: /* stream_options ::= stream_options IGNORE EXPIRED NK_INTEGER */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreExpired = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_EXPIRED_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-3].minor.yy42 = yylhsminor.yy42;
break;
case 318: /* stream_options ::= stream_options FILL_HISTORY NK_INTEGER */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->fillHistory = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_FILL_HISTORY_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 320: /* stream_options ::= stream_options IGNORE UPDATE NK_INTEGER */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreUpdate = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_UPDATE_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-3].minor.yy42 = yylhsminor.yy42;
break;
case 322: /* subtable_opt ::= SUBTABLE NK_LP expression NK_RP */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册