未验证 提交 b1e01c70 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19275 from taosdata/enh/3.0_planner_optimize

feat: support writing streams to existing tables
......@@ -401,6 +401,7 @@ typedef struct SCreateStreamStmt {
SNode* pQuery;
SNodeList* pTags;
SNode* pSubtable;
SNodeList* pCols;
} SCreateStreamStmt;
typedef struct SDropStreamStmt {
......
......@@ -216,7 +216,7 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery);
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pStreamName);
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
......
......@@ -541,9 +541,15 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) tags_def_opt(F) subtable_opt(G) AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D); }
full_table_name(C) col_list_opt(H) tags_def_opt(F) subtable_opt(G)
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
%type col_list_opt { SNodeList* }
%destructor col_list_opt { nodesDestroyList($$); }
col_list_opt(A) ::= . { A = NULL; }
col_list_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
......
......@@ -1745,21 +1745,20 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
}
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery) {
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
CHECK_PARSER_STATUS(pCxt);
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);
CHECK_OUT_OF_MEM(pStmt);
COPY_STRING_FORM_ID_TOKEN(pStmt->streamName, pStreamName);
if (NULL != pRealTable) {
strcpy(pStmt->targetDbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->targetTabName, ((SRealTableNode*)pRealTable)->table.tableName);
nodesDestroyNode(pRealTable);
}
pStmt->ignoreExists = ignoreExists;
pStmt->pOptions = (SStreamOptions*)pOptions;
pStmt->pQuery = pQuery;
pStmt->pTags = pTags;
pStmt->pSubtable = pSubtable;
pStmt->pCols = pCols;
return (SNode*)pStmt;
}
......
......@@ -355,7 +355,12 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm
}
static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) {
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
int32_t code =
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->targetDbName, pStmt->targetTabName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
return code;
}
static int32_t collectMetaKeyFromShowDnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
......
......@@ -352,7 +352,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
code = catalogGetTableMeta(pParCxt->pCatalog, &conn, pName, pMeta);
}
}
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_TSC_INVALID_TABLE_NAME != code) {
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_PAR_TABLE_NOT_EXIST != code) {
parserError("0x%" PRIx64 " catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", pCxt->pParseCxt->requestId,
tstrerror(code), pName->dbname, pName->tname);
}
......@@ -4979,10 +4979,10 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
return TSDB_CODE_SUCCESS;
}
static SSchema* getColSchema(STableMeta* pTableMeta, const char* pColName) {
static const SSchema* getColSchema(const STableMeta* pTableMeta, const char* pColName) {
int32_t numOfFields = getNumOfTags(pTableMeta) + getNumOfColumns(pTableMeta);
for (int32_t i = 0; i < numOfFields; ++i) {
SSchema* pSchema = pTableMeta->schema + i;
const SSchema* pSchema = pTableMeta->schema + i;
if (0 == strcmp(pColName, pSchema->name)) {
return pSchema;
}
......@@ -5002,7 +5002,8 @@ static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
return NULL;
}
static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta) {
static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTableStmt* pStmt,
const STableMeta* pTableMeta) {
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
if (getNumOfTags(pTableMeta) == 1 && pTagsSchema->type == TSDB_DATA_TYPE_JSON &&
(pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG || pStmt->alterType == TSDB_ALTER_TABLE_DROP_TAG ||
......@@ -5021,7 +5022,7 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Table is not super table");
}
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
if (NULL == pSchema) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
} else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->type != pStmt->dataType.type ||
......@@ -5721,6 +5722,139 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
return TSDB_CODE_SUCCESS;
}
static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pProjections) {
if (getNumOfColumns(pMeta) != LIST_LENGTH(pProjections)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
}
SSchema* pSchemas = getTableColumnSchema(pMeta);
int32_t index = 0;
SNode* pProj = NULL;
FOREACH(pProj, pProjections) {
SSchema* pSchema = pSchemas + index;
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) {
SNode* pFunc = NULL;
int32_t code = createCastFunc(pCxt, pProj, dt, &pFunc);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
REPLACE_NODE(pFunc);
}
}
return TSDB_CODE_SUCCESS;
}
typedef struct SProjColPos {
int32_t colId;
SNode* pProj;
} SProjColPos;
static int32_t projColPosCompar(const void* l, const void* r) {
return ((SProjColPos*)l)->colId < ((SProjColPos*)r)->colId;
}
static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); }
static int32_t addProjToProjColPos(STranslateContext* pCxt, const SSchema* pSchema, SNode* pProj, SArray* pProjColPos) {
SNode* pNewProj = nodesCloneNode(pProj);
if (NULL == pNewProj) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
if (!dataTypeEqual(&dt, &((SExprNode*)pNewProj)->resType)) {
SNode* pFunc = NULL;
code = createCastFunc(pCxt, pNewProj, dt, &pFunc);
pNewProj = pFunc;
}
if (TSDB_CODE_SUCCESS == code) {
SProjColPos pos = {.colId = pSchema->colId, .pProj = pNewProj};
code = (NULL == taosArrayPush(pProjColPos, &pos) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pNewProj);
}
return code;
}
static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols, const STableMeta* pMeta,
SNodeList** pProjections) {
if (LIST_LENGTH(pCols) != LIST_LENGTH(*pProjections)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
}
SArray* pProjColPos = taosArrayInit(LIST_LENGTH(pCols), sizeof(SProjColPos));
if (NULL == pProjColPos) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pCol = NULL;
SNode* pProj = NULL;
FORBOTH(pCol, pCols, pProj, *pProjections) {
const SSchema* pSchema = getColSchema(pMeta, ((SColumnNode*)pCol)->colName);
code = addProjToProjColPos(pCxt, pSchema, pProj, pProjColPos);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
SNodeList* pNewProjections = NULL;
if (TSDB_CODE_SUCCESS == code) {
taosArraySort(pProjColPos, projColPosCompar);
int32_t num = taosArrayGetSize(pProjColPos);
pNewProjections = nodesMakeList();
if (NULL == pNewProjections) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) {
SProjColPos* pPos = taosArrayGet(pProjColPos, i);
code = nodesListStrictAppend(pNewProjections, pPos->pProj);
pPos->pProj = NULL;
}
}
if (TSDB_CODE_SUCCESS == code) {
taosArrayDestroy(pProjColPos);
nodesDestroyList(*pProjections);
*pProjections = pNewProjections;
} else {
taosArrayDestroyEx(pProjColPos, projColPosDelete);
nodesDestroyList(pNewProjections);
}
return code;
}
static int32_t adjustStreamQueryForExistTableImpl(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
const STableMeta* pMeta) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (NULL == pStmt->pCols) {
return adjustDataTypeOfProjections(pCxt, pMeta, pSelect->pProjectionList);
}
return adjustOrderOfProjection(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList);
}
static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
SCMCreateStreamReq* pReq) {
STableMeta* pMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, &pMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
if (NULL != pStmt->pCols) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
}
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS == code) {
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
}
return code;
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true;
int32_t code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery);
......@@ -5733,6 +5867,9 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamQuery(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
code = adjustStreamQueryForExistTable(pCxt, pStmt, pReq);
}
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
......@@ -5771,8 +5908,6 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
}
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
return code;
}
......@@ -7358,12 +7493,12 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
return TSDB_CODE_SUCCESS;
}
static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, const STableMeta* pTableMeta,
SVAlterTbReq* pReq) {
if (2 == getNumOfColumns(pTableMeta)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DROP_COL);
}
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
if (NULL == pSchema) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
} else if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
......@@ -7379,11 +7514,11 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
return TSDB_CODE_SUCCESS;
}
static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, const STableMeta* pTableMeta,
SVAlterTbReq* pReq) {
pReq->colModBytes = calcTypeBytes(pStmt->dataType);
pReq->colModType = pStmt->dataType.type;
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
if (NULL == pSchema) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
} else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->type != pStmt->dataType.type ||
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -427,7 +427,7 @@ class MockCatalogServiceImpl {
int32_t copyTableSchemaMeta(const string& db, const string& tbname, std::unique_ptr<STableMeta>* dst) const {
STableMeta* src = getTableSchemaMeta(db, tbname);
if (nullptr == src) {
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
int32_t len = sizeof(STableMeta) + sizeof(SSchema) * (src->tableInfo.numOfTags + src->tableInfo.numOfColumns);
dst->reset((STableMeta*)taosMemoryCalloc(1, len));
......
......@@ -755,14 +755,23 @@ TEST_F(ParserInitialCTest, createStream) {
};
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
int8_t createStb = STREAM_CREATE_STABLE_TRUE, int8_t igExists = 0) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
expect.igExists = igExists;
expect.sql = strdup(pSql);
expect.createStb = createStb;
expect.triggerType = STREAM_TRIGGER_AT_ONCE;
expect.maxDelay = 0;
expect.watermark = 0;
expect.fillHistory = STREAM_DEFAULT_FILL_HISTORY;
expect.igExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
};
auto setStreamOptions = [&](int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, int64_t watermark = 0,
int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
expect.triggerType = triggerType;
expect.maxDelay = maxDelay;
expect.watermark = watermark;
......@@ -813,19 +822,22 @@ TEST_F(ParserInitialCTest, createStream) {
ASSERT_EQ(pField->flags, pExpectField->flags);
}
}
ASSERT_EQ(req.checkpointFreq, expect.checkpointFreq);
ASSERT_EQ(req.createStb, expect.createStb);
tFreeSCMCreateStreamReq(&req);
});
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1");
run("CREATE STREAM s1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
setCreateStreamReq("s1", "test", "create stream s1 into st3 as select count(*) from t1 interval(10s)", "st3");
run("CREATE STREAM s1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReq(
"s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st1 "
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st3 "
"as select count(*) from t1 interval(10s)",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS "
"st3", 1, 1);
setStreamOptions(STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS "
"SELECT COUNT(*) "
"FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
......@@ -839,6 +851,11 @@ TEST_F(ParserInitialCTest, createStream) {
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1",
STREAM_CREATE_STABLE_FALSE);
run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
}
TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
......
......@@ -30,7 +30,7 @@ TEST_F(PlanOtherTest, createTopic) {
TEST_F(PlanOtherTest, createStream) {
useDb("root", "test");
run("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 "
run("create stream if not exists s1 trigger window_close watermark 10s into st3 as select count(*) from t1 "
"interval(10s)");
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
......@@ -43,9 +43,9 @@ TEST_F(PlanOtherTest, createStream) {
TEST_F(PlanOtherTest, createStreamUseSTable) {
useDb("root", "test");
run("CREATE STREAM IF NOT EXISTS s1 into st1 as SELECT COUNT(*) FROM st1 INTERVAL(10s)");
run("CREATE STREAM IF NOT EXISTS s1 into st3 as SELECT COUNT(*) FROM st1 INTERVAL(10s)");
run("CREATE STREAM IF NOT EXISTS s1 into st1 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)");
run("CREATE STREAM IF NOT EXISTS s1 into st3 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)");
}
TEST_F(PlanOtherTest, createSmaIndex) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册