提交 aef9dc59 编写于 作者: X Xiaoyu Wang

feat: support writing streams to existing tables

上级 bc2fa582
...@@ -355,7 +355,12 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm ...@@ -355,7 +355,12 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm
} }
static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) { static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) {
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery); int32_t code =
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->targetDbName, pStmt->targetTabName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
return code;
} }
static int32_t collectMetaKeyFromShowDnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { static int32_t collectMetaKeyFromShowDnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
......
...@@ -5846,10 +5846,10 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt ...@@ -5846,10 +5846,10 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
if (NULL != pStmt->pCols) { if (NULL != pStmt->pCols) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
} }
pReq->createStb = 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
// todo pReq->create = true;
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta); code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
} }
return code; return code;
......
...@@ -816,16 +816,16 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -816,16 +816,16 @@ TEST_F(ParserInitialCTest, createStream) {
tFreeSCMCreateStreamReq(&req); tFreeSCMCreateStreamReq(&req);
}); });
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1"); setCreateStreamReq("s1", "test", "create stream s1 into st3 as select count(*) from t1 interval(10s)", "st3");
run("CREATE STREAM s1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"); run("CREATE STREAM s1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
setCreateStreamReq( setCreateStreamReq(
"s1", "test", "s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st1 " "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st3 "
"as select count(*) from t1 interval(10s)", "as select count(*) from t1 interval(10s)",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1); "st3", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS " run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS "
"SELECT COUNT(*) " "SELECT COUNT(*) "
"FROM t1 INTERVAL(10S)"); "FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
...@@ -839,6 +839,10 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -839,6 +839,10 @@ TEST_F(ParserInitialCTest, createStream) {
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) " run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)"); "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1");
run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
} }
TEST_F(ParserInitialCTest, createStreamSemanticCheck) { TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册