diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3ad2e97bb30cf9188fb75885472cc8ce5637d0a9..eb1083801cb2b41a5f223fd9d8888cdf6e70fd98 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -352,7 +352,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa code = catalogGetTableMeta(pParCxt->pCatalog, &conn, pName, pMeta); } } - if (TSDB_CODE_SUCCESS != code && TSDB_CODE_TSC_INVALID_TABLE_NAME != code) { + if (TSDB_CODE_SUCCESS != code && TSDB_CODE_PAR_TABLE_NOT_EXIST != code) { parserError("0x%" PRIx64 " catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", pCxt->pParseCxt->requestId, tstrerror(code), pName->dbname, pName->tname); } @@ -5842,11 +5842,11 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt SCMCreateStreamReq* pReq) { STableMeta* pMeta = NULL; int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, &pMeta); - if (TSDB_CODE_TSC_INVALID_TABLE_NAME == code) { + if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { if (NULL != pStmt->pCols) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName); } - pReq->createStb = 1; + pReq->createStb = STREAM_CREATE_STABLE_TRUE; return TSDB_CODE_SUCCESS; } if (TSDB_CODE_SUCCESS == code) { @@ -5908,8 +5908,6 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pReq->numOfTags = LIST_LENGTH(pStmt->pTags); } - pReq->createStb = STREAM_CREATE_STABLE_TRUE; - return code; } diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 9cc55a7cd55a0060635e9a9044ac076ce5a77119..ac801149f58f2aa838fae9748e92483e09e36f33 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -427,7 +427,7 @@ class MockCatalogServiceImpl { int32_t copyTableSchemaMeta(const string& db, const string& tbname, std::unique_ptr* dst) const { STableMeta* src = getTableSchemaMeta(db, tbname); if (nullptr == src) { - return TSDB_CODE_TSC_INVALID_TABLE_NAME; + return TSDB_CODE_PAR_TABLE_NOT_EXIST; } int32_t len = sizeof(STableMeta) + sizeof(SSchema) * (src->tableInfo.numOfTags + src->tableInfo.numOfColumns); dst->reset((STableMeta*)taosMemoryCalloc(1, len)); diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index d2e74c4a88191dde82e26bac0ac6d1c9a635e24d..8286ae72e19b767fdddf6a882e370261ebfc4ccb 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -755,14 +755,23 @@ TEST_F(ParserInitialCTest, createStream) { }; auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb, - int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, - int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED, - int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) { + int8_t createStb = STREAM_CREATE_STABLE_TRUE, int8_t igExists = 0) { snprintf(expect.name, sizeof(expect.name), "0.%s", pStream); snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb); snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb); expect.igExists = igExists; expect.sql = strdup(pSql); + expect.createStb = createStb; + expect.triggerType = STREAM_TRIGGER_AT_ONCE; + expect.maxDelay = 0; + expect.watermark = 0; + expect.fillHistory = STREAM_DEFAULT_FILL_HISTORY; + expect.igExpired = STREAM_DEFAULT_IGNORE_EXPIRED; + }; + + auto setStreamOptions = [&](int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, int64_t watermark = 0, + int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED, + int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) { expect.triggerType = triggerType; expect.maxDelay = maxDelay; expect.watermark = watermark; @@ -813,6 +822,8 @@ TEST_F(ParserInitialCTest, createStream) { ASSERT_EQ(pField->flags, pExpectField->flags); } } + ASSERT_EQ(req.checkpointFreq, expect.checkpointFreq); + ASSERT_EQ(req.createStb, expect.createStb); tFreeSCMCreateStreamReq(&req); }); @@ -824,7 +835,8 @@ TEST_F(ParserInitialCTest, createStream) { "s1", "test", "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st3 " "as select count(*) from t1 interval(10s)", - "st3", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1); + "st3", 1, 1); + setStreamOptions(STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1); run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS " "SELECT COUNT(*) " "FROM t1 INTERVAL(10S)"); @@ -840,7 +852,8 @@ TEST_F(ParserInitialCTest, createStream) { "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)"); clearCreateStreamReq(); - setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1"); + setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1", + STREAM_CREATE_STABLE_FALSE); run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)"); clearCreateStreamReq(); } diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index 4741d241b5ac8b5541b6f92d08dbe1dd26151099..3a12d62340f1744db5d351d7494e2e181b512d76 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -30,7 +30,7 @@ TEST_F(PlanOtherTest, createTopic) { TEST_F(PlanOtherTest, createStream) { useDb("root", "test"); - run("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 " + run("create stream if not exists s1 trigger window_close watermark 10s into st3 as select count(*) from t1 " "interval(10s)"); run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) " @@ -43,9 +43,9 @@ TEST_F(PlanOtherTest, createStream) { TEST_F(PlanOtherTest, createStreamUseSTable) { useDb("root", "test"); - run("CREATE STREAM IF NOT EXISTS s1 into st1 as SELECT COUNT(*) FROM st1 INTERVAL(10s)"); + run("CREATE STREAM IF NOT EXISTS s1 into st3 as SELECT COUNT(*) FROM st1 INTERVAL(10s)"); - run("CREATE STREAM IF NOT EXISTS s1 into st1 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)"); + run("CREATE STREAM IF NOT EXISTS s1 into st3 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)"); } TEST_F(PlanOtherTest, createSmaIndex) {