From aef9dc5908234d59d4429e1c1da27f5b15a4f65f Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 30 Dec 2022 10:49:05 +0800 Subject: [PATCH] feat: support writing streams to existing tables --- source/libs/parser/src/parAstParser.c | 7 ++++++- source/libs/parser/src/parTranslater.c | 2 +- source/libs/parser/test/parInitialCTest.cpp | 14 +++++++++----- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 954e6c26fd..fae62626fa 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -355,7 +355,12 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm } static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) { - return collectMetaKeyFromQuery(pCxt, pStmt->pQuery); + int32_t code = + reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->targetDbName, pStmt->targetTabName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery); + } + return code; } static int32_t collectMetaKeyFromShowDnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index e0fa741b87..3ad2e97bb3 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5846,10 +5846,10 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt if (NULL != pStmt->pCols) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName); } + pReq->createStb = 1; return TSDB_CODE_SUCCESS; } if (TSDB_CODE_SUCCESS == code) { - // todo pReq->create = true; code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta); } return code; diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index 8bdea07150..d2e74c4a88 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -816,16 +816,16 @@ TEST_F(ParserInitialCTest, createStream) { tFreeSCMCreateStreamReq(&req); }); - setCreateStreamReq("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1"); - run("CREATE STREAM s1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"); + setCreateStreamReq("s1", "test", "create stream s1 into st3 as select count(*) from t1 interval(10s)", "st3"); + run("CREATE STREAM s1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"); clearCreateStreamReq(); setCreateStreamReq( "s1", "test", - "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st1 " + "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st3 " "as select count(*) from t1 interval(10s)", - "st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1); - run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS " + "st3", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1); + run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS " "SELECT COUNT(*) " "FROM t1 INTERVAL(10S)"); clearCreateStreamReq(); @@ -839,6 +839,10 @@ TEST_F(ParserInitialCTest, createStream) { run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) " "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)"); clearCreateStreamReq(); + + setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1"); + run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)"); + clearCreateStreamReq(); } TEST_F(ParserInitialCTest, createStreamSemanticCheck) { -- GitLab