提交 a5237195 编写于 作者: X Xiaoyu Wang

feat: add ignore update option for create stream

上级 e89362fb
...@@ -1752,6 +1752,7 @@ typedef struct { ...@@ -1752,6 +1752,7 @@ typedef struct {
#define STREAM_FILL_HISTORY_ON 1 #define STREAM_FILL_HISTORY_ON 1
#define STREAM_FILL_HISTORY_OFF 0 #define STREAM_FILL_HISTORY_OFF 0
#define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF #define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF
#define STREAM_DEFAULT_IGNORE_UPDATE 0
typedef struct { typedef struct {
char name[TSDB_STREAM_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
...@@ -1769,6 +1770,7 @@ typedef struct { ...@@ -1769,6 +1770,7 @@ typedef struct {
SArray* pTags; // array of SField SArray* pTags; // array of SField
// 3.0.20 // 3.0.20
int64_t checkpointFreq; // ms int64_t checkpointFreq; // ms
int8_t igUpdate;
} SCMCreateStreamReq; } SCMCreateStreamReq;
typedef struct { typedef struct {
......
...@@ -209,130 +209,130 @@ ...@@ -209,130 +209,130 @@
#define TK_IGNORE 191 #define TK_IGNORE 191
#define TK_EXPIRED 192 #define TK_EXPIRED 192
#define TK_FILL_HISTORY 193 #define TK_FILL_HISTORY 193
#define TK_SUBTABLE 194 #define TK_UPDATE 194
#define TK_KILL 195 #define TK_SUBTABLE 195
#define TK_CONNECTION 196 #define TK_KILL 196
#define TK_TRANSACTION 197 #define TK_CONNECTION 197
#define TK_BALANCE 198 #define TK_TRANSACTION 198
#define TK_VGROUP 199 #define TK_BALANCE 199
#define TK_MERGE 200 #define TK_VGROUP 200
#define TK_REDISTRIBUTE 201 #define TK_MERGE 201
#define TK_SPLIT 202 #define TK_REDISTRIBUTE 202
#define TK_DELETE 203 #define TK_SPLIT 203
#define TK_INSERT 204 #define TK_DELETE 204
#define TK_NULL 205 #define TK_INSERT 205
#define TK_NK_QUESTION 206 #define TK_NULL 206
#define TK_NK_ARROW 207 #define TK_NK_QUESTION 207
#define TK_ROWTS 208 #define TK_NK_ARROW 208
#define TK_QSTART 209 #define TK_ROWTS 209
#define TK_QEND 210 #define TK_QSTART 210
#define TK_QDURATION 211 #define TK_QEND 211
#define TK_WSTART 212 #define TK_QDURATION 212
#define TK_WEND 213 #define TK_WSTART 213
#define TK_WDURATION 214 #define TK_WEND 214
#define TK_IROWTS 215 #define TK_WDURATION 215
#define TK_CAST 216 #define TK_IROWTS 216
#define TK_NOW 217 #define TK_CAST 217
#define TK_TODAY 218 #define TK_NOW 218
#define TK_TIMEZONE 219 #define TK_TODAY 219
#define TK_CLIENT_VERSION 220 #define TK_TIMEZONE 220
#define TK_SERVER_VERSION 221 #define TK_CLIENT_VERSION 221
#define TK_SERVER_STATUS 222 #define TK_SERVER_VERSION 222
#define TK_CURRENT_USER 223 #define TK_SERVER_STATUS 223
#define TK_COUNT 224 #define TK_CURRENT_USER 224
#define TK_LAST_ROW 225 #define TK_COUNT 225
#define TK_CASE 226 #define TK_LAST_ROW 226
#define TK_END 227 #define TK_CASE 227
#define TK_WHEN 228 #define TK_END 228
#define TK_THEN 229 #define TK_WHEN 229
#define TK_ELSE 230 #define TK_THEN 230
#define TK_BETWEEN 231 #define TK_ELSE 231
#define TK_IS 232 #define TK_BETWEEN 232
#define TK_NK_LT 233 #define TK_IS 233
#define TK_NK_GT 234 #define TK_NK_LT 234
#define TK_NK_LE 235 #define TK_NK_GT 235
#define TK_NK_GE 236 #define TK_NK_LE 236
#define TK_NK_NE 237 #define TK_NK_GE 237
#define TK_MATCH 238 #define TK_NK_NE 238
#define TK_NMATCH 239 #define TK_MATCH 239
#define TK_CONTAINS 240 #define TK_NMATCH 240
#define TK_IN 241 #define TK_CONTAINS 241
#define TK_JOIN 242 #define TK_IN 242
#define TK_INNER 243 #define TK_JOIN 243
#define TK_SELECT 244 #define TK_INNER 244
#define TK_DISTINCT 245 #define TK_SELECT 245
#define TK_WHERE 246 #define TK_DISTINCT 246
#define TK_PARTITION 247 #define TK_WHERE 247
#define TK_BY 248 #define TK_PARTITION 248
#define TK_SESSION 249 #define TK_BY 249
#define TK_STATE_WINDOW 250 #define TK_SESSION 250
#define TK_SLIDING 251 #define TK_STATE_WINDOW 251
#define TK_FILL 252 #define TK_SLIDING 252
#define TK_VALUE 253 #define TK_FILL 253
#define TK_NONE 254 #define TK_VALUE 254
#define TK_PREV 255 #define TK_NONE 255
#define TK_LINEAR 256 #define TK_PREV 256
#define TK_NEXT 257 #define TK_LINEAR 257
#define TK_HAVING 258 #define TK_NEXT 258
#define TK_RANGE 259 #define TK_HAVING 259
#define TK_EVERY 260 #define TK_RANGE 260
#define TK_ORDER 261 #define TK_EVERY 261
#define TK_SLIMIT 262 #define TK_ORDER 262
#define TK_SOFFSET 263 #define TK_SLIMIT 263
#define TK_LIMIT 264 #define TK_SOFFSET 264
#define TK_OFFSET 265 #define TK_LIMIT 265
#define TK_ASC 266 #define TK_OFFSET 266
#define TK_NULLS 267 #define TK_ASC 267
#define TK_ABORT 268 #define TK_NULLS 268
#define TK_AFTER 269 #define TK_ABORT 269
#define TK_ATTACH 270 #define TK_AFTER 270
#define TK_BEFORE 271 #define TK_ATTACH 271
#define TK_BEGIN 272 #define TK_BEFORE 272
#define TK_BITAND 273 #define TK_BEGIN 273
#define TK_BITNOT 274 #define TK_BITAND 274
#define TK_BITOR 275 #define TK_BITNOT 275
#define TK_BLOCKS 276 #define TK_BITOR 276
#define TK_CHANGE 277 #define TK_BLOCKS 277
#define TK_COMMA 278 #define TK_CHANGE 278
#define TK_COMPACT 279 #define TK_COMMA 279
#define TK_CONCAT 280 #define TK_COMPACT 280
#define TK_CONFLICT 281 #define TK_CONCAT 281
#define TK_COPY 282 #define TK_CONFLICT 282
#define TK_DEFERRED 283 #define TK_COPY 283
#define TK_DELIMITERS 284 #define TK_DEFERRED 284
#define TK_DETACH 285 #define TK_DELIMITERS 285
#define TK_DIVIDE 286 #define TK_DETACH 286
#define TK_DOT 287 #define TK_DIVIDE 287
#define TK_EACH 288 #define TK_DOT 288
#define TK_FAIL 289 #define TK_EACH 289
#define TK_FILE 290 #define TK_FAIL 290
#define TK_FOR 291 #define TK_FILE 291
#define TK_GLOB 292 #define TK_FOR 292
#define TK_ID 293 #define TK_GLOB 293
#define TK_IMMEDIATE 294 #define TK_ID 294
#define TK_IMPORT 295 #define TK_IMMEDIATE 295
#define TK_INITIALLY 296 #define TK_IMPORT 296
#define TK_INSTEAD 297 #define TK_INITIALLY 297
#define TK_ISNULL 298 #define TK_INSTEAD 298
#define TK_KEY 299 #define TK_ISNULL 299
#define TK_MODULES 300 #define TK_KEY 300
#define TK_NK_BITNOT 301 #define TK_MODULES 301
#define TK_NK_SEMI 302 #define TK_NK_BITNOT 302
#define TK_NOTNULL 303 #define TK_NK_SEMI 303
#define TK_OF 304 #define TK_NOTNULL 304
#define TK_PLUS 305 #define TK_OF 305
#define TK_PRIVILEGE 306 #define TK_PLUS 306
#define TK_RAISE 307 #define TK_PRIVILEGE 307
#define TK_REPLACE 308 #define TK_RAISE 308
#define TK_RESTRICT 309 #define TK_REPLACE 309
#define TK_ROW 310 #define TK_RESTRICT 310
#define TK_SEMI 311 #define TK_ROW 311
#define TK_STAR 312 #define TK_SEMI 312
#define TK_STATEMENT 313 #define TK_STAR 313
#define TK_STRICT 314 #define TK_STATEMENT 314
#define TK_STRING 315 #define TK_STRICT 315
#define TK_TIMES 316 #define TK_STRING 316
#define TK_UPDATE 317 #define TK_TIMES 317
#define TK_VALUES 318 #define TK_VALUES 318
#define TK_VARIABLE 319 #define TK_VARIABLE 319
#define TK_VIEW 320 #define TK_VIEW 320
......
...@@ -389,6 +389,7 @@ typedef struct SStreamOptions { ...@@ -389,6 +389,7 @@ typedef struct SStreamOptions {
SNode* pDeleteMark; SNode* pDeleteMark;
int8_t fillHistory; int8_t fillHistory;
int8_t ignoreExpired; int8_t ignoreExpired;
int8_t ignoreUpdate;
} SStreamOptions; } SStreamOptions;
typedef struct SCreateStreamStmt { typedef struct SCreateStreamStmt {
......
...@@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -5486,6 +5487,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -5486,6 +5487,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
} }
} }
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
......
...@@ -544,6 +544,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). ...@@ -544,6 +544,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C).
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; } stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; }
subtable_opt(A) ::= . { A = NULL; } subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); } subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
...@@ -1075,4 +1076,4 @@ null_ordering_opt(A) ::= NULLS LAST. ...@@ -1075,4 +1076,4 @@ null_ordering_opt(A) ::= NULLS LAST.
%fallback ABORT AFTER ATTACH BEFORE BEGIN BITAND BITNOT BITOR BLOCKS CHANGE COMMA COMPACT CONCAT CONFLICT COPY DEFERRED DELIMITERS DETACH DIVIDE DOT EACH END FAIL %fallback ABORT AFTER ATTACH BEFORE BEGIN BITAND BITNOT BITOR BLOCKS CHANGE COMMA COMPACT CONCAT CONFLICT COPY DEFERRED DELIMITERS DETACH DIVIDE DOT EACH END FAIL
FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY MODULES NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE REPLACE RESTRICT ROW SEMI STAR STATEMENT FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY MODULES NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE REPLACE RESTRICT ROW SEMI STAR STATEMENT
STRICT STRING TIMES UPDATE VALUES VARIABLE VIEW WAL. STRICT STRING TIMES VALUES VARIABLE VIEW WAL.
...@@ -1763,6 +1763,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) { ...@@ -1763,6 +1763,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
pOptions->triggerType = STREAM_TRIGGER_AT_ONCE; pOptions->triggerType = STREAM_TRIGGER_AT_ONCE;
pOptions->fillHistory = STREAM_DEFAULT_FILL_HISTORY; pOptions->fillHistory = STREAM_DEFAULT_FILL_HISTORY;
pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED; pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
pOptions->ignoreUpdate = STREAM_DEFAULT_IGNORE_UPDATE;
return (SNode*)pOptions; return (SNode*)pOptions;
} }
......
...@@ -232,6 +232,7 @@ static SKeyword keywordTable[] = { ...@@ -232,6 +232,7 @@ static SKeyword keywordTable[] = {
{"TTL", TK_TTL}, {"TTL", TK_TTL},
{"UNION", TK_UNION}, {"UNION", TK_UNION},
{"UNSIGNED", TK_UNSIGNED}, {"UNSIGNED", TK_UNSIGNED},
{"UPDATE", TK_UPDATE},
{"USE", TK_USE}, {"USE", TK_USE},
{"USER", TK_USER}, {"USER", TK_USER},
{"USERS", TK_USERS}, {"USERS", TK_USERS},
......
...@@ -1576,7 +1576,8 @@ static int32_t translateBlockDistFunc(STranslateContext* pCtx, SFunctionNode* pF ...@@ -1576,7 +1576,8 @@ static int32_t translateBlockDistFunc(STranslateContext* pCtx, SFunctionNode* pF
TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType && TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) { TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) {
return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s is only supported on super table, child table or normal table", pFunc->functionName); "%s is only supported on super table, child table or normal table",
pFunc->functionName);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5738,6 +5739,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* ...@@ -5738,6 +5739,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->fillHistory = pStmt->pOptions->fillHistory; pReq->fillHistory = pStmt->pOptions->fillHistory;
pReq->igExpired = pStmt->pOptions->ignoreExpired; pReq->igExpired = pStmt->pOptions->ignoreExpired;
pReq->igUpdate = pStmt->pOptions->ignoreUpdate;
columnDefNodeToField(pStmt->pTags, &pReq->pTags); columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags); pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
} }
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -643,7 +643,8 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -643,7 +643,8 @@ TEST_F(ParserInitialCTest, createStream) {
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb, auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED, int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) { int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY,
int8_t igUpdate = STREAM_DEFAULT_IGNORE_UPDATE) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream); snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb); snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb); snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
...@@ -654,6 +655,7 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -654,6 +655,7 @@ TEST_F(ParserInitialCTest, createStream) {
expect.watermark = watermark; expect.watermark = watermark;
expect.fillHistory = fillHistory; expect.fillHistory = fillHistory;
expect.igExpired = igExpired; expect.igExpired = igExpired;
expect.igUpdate = igUpdate;
}; };
auto addTag = [&](const char* pFieldName, uint8_t type, int32_t bytes = 0) { auto addTag = [&](const char* pFieldName, uint8_t type, int32_t bytes = 0) {
...@@ -699,6 +701,7 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -699,6 +701,7 @@ TEST_F(ParserInitialCTest, createStream) {
ASSERT_EQ(pField->flags, pExpectField->flags); ASSERT_EQ(pField->flags, pExpectField->flags);
} }
} }
ASSERT_EQ(req.igUpdate, expect.igUpdate);
tFreeSCMCreateStreamReq(&req); tFreeSCMCreateStreamReq(&req);
}); });
...@@ -708,12 +711,11 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -708,12 +711,11 @@ TEST_F(ParserInitialCTest, createStream) {
setCreateStreamReq( setCreateStreamReq(
"s1", "test", "s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st1 " "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 ignore "
"as select count(*) from t1 interval(10s)", "update 1 into st1 as select count(*) from t1 interval(10s)",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1); "st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS " run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 IGNORE "
"SELECT COUNT(*) " "UPDATE 1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
"FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
setCreateStreamReq("s1", "test", setCreateStreamReq("s1", "test",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册