未验证 提交 4f15fc19 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #16042 from taosdata/feature/3.0_wxy

enh: create stream options adjust
......@@ -1678,9 +1678,10 @@ typedef struct {
int32_t code;
} STaskDropRsp;
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3
#define STREAM_DEFAULT_IGNORE_EXPIRED 0
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
......
......@@ -359,7 +359,7 @@ typedef struct SStreamOptions {
int8_t triggerType;
SNode* pDelay;
SNode* pWatermark;
bool ignoreExpired;
int8_t ignoreExpired;
} SStreamOptions;
typedef struct SCreateStreamStmt {
......
......@@ -506,7 +506,7 @@ stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE.
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED. { ((SStreamOptions*)B)->ignoreExpired = true; A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
/************************************************ kill connection/query ***********************************************/
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
......
......@@ -1628,6 +1628,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
SStreamOptions* pOptions = (SStreamOptions*)nodesMakeNode(QUERY_NODE_STREAM_OPTIONS);
CHECK_OUT_OF_MEM(pOptions);
pOptions->triggerType = STREAM_TRIGGER_AT_ONCE;
pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
return (SNode*)pOptions;
}
......
此差异已折叠。
......@@ -571,7 +571,7 @@ TEST_F(ParserInitialCTest, createStream) {
auto setCreateStreamReqFunc = [&](const char* pStream, const char* pSrcDb, const char* pSql,
const char* pDstStb = nullptr, int8_t igExists = 0,
int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
int64_t watermark = 0, int8_t igExpired = 0) {
int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
if (NULL != pDstStb) {
......@@ -617,11 +617,11 @@ TEST_F(ParserInitialCTest, createStream) {
clearCreateStreamReq();
setCreateStreamReqFunc("s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired into st1 "
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 into st1 "
"as select count(*) from t1 interval(10s)",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND,
1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED INTO st1 AS SELECT COUNT(*) "
0);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 INTO st1 AS SELECT COUNT(*) "
"FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
}
......
......@@ -45,9 +45,9 @@ print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED into streamt1 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
sql create stream streams2 trigger at_once IGNORE EXPIRED into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 session(ts,10s);
sql create stream streams3 trigger at_once IGNORE EXPIRED into streamt3 as select _wstart, count(*) c1, sum(a) c3 from t1 state_window(a);
sql create stream streams1 trigger at_once IGNORE EXPIRED 1 into streamt1 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
sql create stream streams2 trigger at_once IGNORE EXPIRED 1 into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 session(ts,10s);
sql create stream streams3 trigger at_once IGNORE EXPIRED 1 into streamt3 as select _wstart, count(*) c1, sum(a) c3 from t1 state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.1);
sql insert into t1 values(1648791233002,2,2,3,2.1);
......@@ -111,8 +111,8 @@ sql use test1
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED into streamtST1 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql create stream stream_t2 trigger at_once IGNORE EXPIRED into streamtST2 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 1 into streamtST1 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql create stream stream_t2 trigger at_once IGNORE EXPIRED 1 into streamtST2 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册