未验证 提交 2a4c97cf 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #19606 from taosdata/enh/3.0_planner_optimize

feat: support writing streams to existing tables
......@@ -1764,6 +1764,12 @@ typedef struct {
#define STREAM_CREATE_STABLE_TRUE 1
#define STREAM_CREATE_STABLE_FALSE 0
typedef struct SColLocation {
int16_t slotId;
col_id_t colId;
int8_t type;
} SColLocation;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
char sourceDB[TSDB_DB_FNAME_LEN];
......
......@@ -207,12 +207,6 @@ typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
typedef struct SColLocation {
int16_t slotId;
col_id_t colId;
int8_t type;
} SColLocation;
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
......
......@@ -547,7 +547,7 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) col_list_opt(H) tags_def_opt(F) subtable_opt(G)
full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G)
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
......@@ -556,6 +556,12 @@ cmd ::= DROP STREAM exists_opt(A) stream_name(B).
col_list_opt(A) ::= . { A = NULL; }
col_list_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
%type tag_def_or_ref_opt { SNodeList* }
%destructor tag_def_or_ref_opt { nodesDestroyList($$); }
tag_def_or_ref_opt(A) ::= . { A = NULL; }
tag_def_or_ref_opt(A) ::= tags_def(B). { A = B; }
tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; }
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
......
......@@ -4993,7 +4993,7 @@ static const SSchema* getColSchema(const STableMeta* pTableMeta, const char* pCo
return NULL;
}
static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
static SSchema* getTagSchema(const STableMeta* pTableMeta, const char* pTagName) {
int32_t numOfTags = getNumOfTags(pTableMeta);
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
for (int32_t i = 0; i < numOfTags; ++i) {
......@@ -5628,6 +5628,13 @@ static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pS
return code;
}
static const char* getTagNameForCreateStreamTag(SNode* pTag) {
if (QUERY_NODE_COLUMN_DEF == nodeType(pTag)) {
return ((SColumnDefNode*)pTag)->colName;
}
return ((SColumnNode*)pTag)->colName;
}
static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SSelectStmt* pSelect) {
if (NULL == pStmt->pTags) {
return TSDB_CODE_SUCCESS;
......@@ -5638,7 +5645,7 @@ static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStream
bool found = false;
SNode* pPart = NULL;
FOREACH(pPart, pSelect->pPartitionByList) {
if (0 == strcmp(((SColumnDefNode*)pTag)->colName, ((SExprNode*)pPart)->userAlias)) {
if (0 == strcmp(getTagNameForCreateStreamTag(pTag), ((SExprNode*)pPart)->userAlias)) {
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSelect->pTags, nodesCloneNode(pPart))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -5647,7 +5654,7 @@ static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStream
}
}
if (!found) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnDefNode*)pTag)->colName);
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnDefNode*)pTag)->colName);
}
}
return TSDB_CODE_SUCCESS;
......@@ -5791,8 +5798,30 @@ static int32_t addProjToProjColPos(STranslateContext* pCxt, const SSchema* pSche
return code;
}
static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols, const STableMeta* pMeta,
SNodeList** pProjections) {
static int32_t setFillNullCols(SArray* pProjColPos, const STableMeta* pMeta, SCMCreateStreamReq* pReq) {
int32_t numOfBoundCols = taosArrayGetSize(pProjColPos);
pReq->fillNullCols = taosArrayInit(pMeta->tableInfo.numOfColumns - numOfBoundCols, sizeof(SColLocation));
if (NULL == pReq->fillNullCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t indexOfSchema = 0;
const SSchema* pSchemas = getTableColumnSchema(pMeta);
for (int32_t i = 0; i < numOfBoundCols; ++i) {
SProjColPos* pPos = taosArrayGet(pProjColPos, i);
while (indexOfSchema < pMeta->tableInfo.numOfColumns) {
const SSchema* pSchema = pSchemas + indexOfSchema++;
if (pSchema->colId == pPos->colId) {
break;
}
SColLocation colLoc = {.colId = pSchema->colId, .slotId = indexOfSchema - 1, .type = pSchema->type};
taosArrayPush(pReq->fillNullCols, &colLoc);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCols, const STableMeta* pMeta,
SNodeList** pProjections, SCMCreateStreamReq* pReq) {
if (LIST_LENGTH(pCols) != LIST_LENGTH(*pProjections)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
}
......@@ -5807,7 +5836,12 @@ static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols
SNode* pProj = NULL;
FORBOTH(pCol, pCols, pProj, *pProjections) {
const SSchema* pSchema = getColSchema(pMeta, ((SColumnNode*)pCol)->colName);
code = addProjToProjColPos(pCxt, pSchema, pProj, pProjColPos);
if (NULL == pSchema) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)pCol)->colName);
}
if (TSDB_CODE_SUCCESS == code) {
code = addProjToProjColPos(pCxt, pSchema, pProj, pProjColPos);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
......@@ -5828,6 +5862,10 @@ static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols
}
}
if (TSDB_CODE_SUCCESS == code && pMeta->tableInfo.numOfColumns > LIST_LENGTH(pCols)) {
code = setFillNullCols(pProjColPos, pMeta, pReq);
}
if (TSDB_CODE_SUCCESS == code) {
taosArrayDestroy(pProjColPos);
nodesDestroyList(*pProjections);
......@@ -5840,19 +5878,127 @@ static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols
return code;
}
static int32_t adjustStreamQueryForExistTableImpl(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
const STableMeta* pMeta) {
static int32_t adjustProjectionsForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
const STableMeta* pMeta, SCMCreateStreamReq* pReq) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (NULL == pStmt->pCols) {
return adjustDataTypeOfProjections(pCxt, pMeta, pSelect->pProjectionList);
}
return adjustOrderOfProjection(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList);
return adjustOrderOfProjections(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList, pReq);
}
static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
SCMCreateStreamReq* pReq) {
STableMeta* pMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, &pMeta);
static int32_t adjustDataTypeOfTags(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pTags) {
if (getNumOfTags(pMeta) != LIST_LENGTH(pTags)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of tags");
}
SSchema* pSchemas = getTableTagSchema(pMeta);
int32_t index = 0;
SNode* pTag = NULL;
FOREACH(pTag, pTags) {
SSchema* pSchema = pSchemas + index++;
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
if (!dataTypeEqual(&dt, &((SExprNode*)pTag)->resType)) {
SNode* pFunc = NULL;
int32_t code = createCastFunc(pCxt, pTag, dt, &pFunc);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
REPLACE_NODE(pFunc);
}
}
return TSDB_CODE_SUCCESS;
}
static SNode* createNullValue() {
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == pValue) {
return NULL;
}
pValue->isNull = true;
pValue->node.resType.type = TSDB_DATA_TYPE_NULL;
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
return (SNode*)pValue;
}
static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, const STableMeta* pMeta,
SNodeList** pTagExprs, SCMCreateStreamReq* pReq) {
if (LIST_LENGTH(pTags) != LIST_LENGTH(*pTagExprs)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of tags");
}
SArray* pTagPos = taosArrayInit(LIST_LENGTH(pTags), sizeof(SProjColPos));
if (NULL == pTagPos) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pTag = NULL;
SNode* pTagExpr = NULL;
FORBOTH(pTag, pTags, pTagExpr, *pTagExprs) {
const SSchema* pSchema = getTagSchema(pMeta, ((SColumnNode*)pTag)->colName);
if (NULL == pSchema) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, ((SColumnNode*)pTag)->colName);
}
if (TSDB_CODE_SUCCESS == code) {
code = addProjToProjColPos(pCxt, pSchema, pTagExpr, pTagPos);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
SNodeList* pNewTagExprs = NULL;
if (TSDB_CODE_SUCCESS == code) {
taosArraySort(pTagPos, projColPosCompar);
int32_t indexOfBoundTags = 0;
int32_t numOfBoundTags = taosArrayGetSize(pTagPos);
int32_t numOfTags = getNumOfTags(pMeta);
const SSchema* pTagsSchema = getTableTagSchema(pMeta);
pNewTagExprs = nodesMakeList();
if (NULL == pNewTagExprs) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
const SSchema* pTagSchema = pTagsSchema + i;
if (indexOfBoundTags < numOfBoundTags) {
SProjColPos* pPos = taosArrayGet(pTagPos, indexOfBoundTags);
if (pPos->colId == pTagSchema->colId) {
++indexOfBoundTags;
code = nodesListStrictAppend(pNewTagExprs, pPos->pProj);
pPos->pProj = NULL;
continue;
}
}
code = nodesListStrictAppend(pNewTagExprs, createNullValue());
}
}
if (TSDB_CODE_SUCCESS == code) {
taosArrayDestroy(pTagPos);
nodesDestroyList(*pTagExprs);
*pTagExprs = pNewTagExprs;
} else {
taosArrayDestroyEx(pTagPos, projColPosDelete);
nodesDestroyList(pNewTagExprs);
}
return code;
}
static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta,
SCMCreateStreamReq* pReq) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (NULL == pStmt->pTags) {
return adjustDataTypeOfTags(pCxt, pMeta, pSelect->pTags);
}
return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq);
}
static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq,
STableMeta** pMeta) {
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
if (NULL != pStmt->pCols) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
......@@ -5862,18 +6008,18 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
return TSDB_CODE_SUCCESS;
} else {
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
pReq->targetStbUid = pMeta->suid;
pReq->targetStbUid = (*pMeta)->suid;
}
if (TSDB_CODE_SUCCESS == code) {
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
}
taosMemoryFree(pMeta);
return code;
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true;
int32_t code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
STableMeta* pMeta = NULL;
int32_t code = translateStreamTargetTable(pCxt, pStmt, pReq, &pMeta);
if (TSDB_CODE_SUCCESS == code) {
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pStmt->pQuery);
}
......@@ -5883,13 +6029,17 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamQuery(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
code = adjustStreamQueryForExistTable(pCxt, pStmt, pReq);
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq);
}
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
code = adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq);
}
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
taosMemoryFree(pMeta);
return code;
}
......@@ -5920,8 +6070,10 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->fillHistory = pStmt->pOptions->fillHistory;
pReq->igExpired = pStmt->pOptions->ignoreExpired;
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
if (pReq->createStb) {
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
}
}
return code;
......
此差异已折叠。
......@@ -852,9 +852,12 @@ TEST_F(ParserInitialCTest, createStream) {
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1",
STREAM_CREATE_STABLE_FALSE);
run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)");
// st1 already exists
setCreateStreamReq(
"s1", "test",
"create stream s1 into st1 tags(tag2) as select max(c1), c2 from t1 partition by tbname tag2 interval(10s)",
"st1", STREAM_CREATE_STABLE_FALSE);
run("CREATE STREAM s1 INTO st1 TAGS(tag2) AS SELECT MAX(c1), c2 FROM t1 PARTITION BY TBNAME tag2 INTERVAL(10S)");
clearCreateStreamReq();
}
......
......@@ -86,6 +86,7 @@ static void parseArg(int argc, char* argv[]) {
{"dump", no_argument, NULL, 'd'},
{"async", required_argument, NULL, 'a'},
{"skipSql", required_argument, NULL, 's'},
{"limitSql", required_argument, NULL, 'i'},
{"log", required_argument, NULL, 'l'},
{0, 0, 0, 0}
};
......@@ -101,6 +102,9 @@ static void parseArg(int argc, char* argv[]) {
case 's':
setSkipSqlNum(optarg);
break;
case 'i':
setLimitSqlNum(optarg);
break;
case 'l':
setLogLevel(optarg);
break;
......
......@@ -49,9 +49,11 @@ bool g_dump = false;
bool g_testAsyncApis = true;
int32_t g_logLevel = 131;
int32_t g_skipSql = 0;
int32_t g_limitSql = 0;
void setAsyncFlag(const char* pFlag) { g_testAsyncApis = stoi(pFlag) > 0 ? true : false; }
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); }
void setAsyncFlag(const char* pArg) { g_testAsyncApis = stoi(pArg) > 0 ? true : false; }
void setSkipSqlNum(const char* pArg) { g_skipSql = stoi(pArg); }
void setLimitSqlNum(const char* pArg) { g_limitSql = stoi(pArg); }
struct TerminateFlag : public exception {
const char* what() const throw() { return "success and terminate"; }
......@@ -63,22 +65,27 @@ int32_t getLogLevel() { return g_logLevel; }
class ParserTestBaseImpl {
public:
ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase), sqlNo_(0) {}
ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase), sqlNo_(0), sqlNum_(0) {}
void login(const std::string& user) { caseEnv_.user_ = user; }
void useDb(const string& acctId, const string& db) {
caseEnv_.acctId_ = acctId;
caseEnv_.db_ = db;
caseEnv_.nsql_ = g_skipSql;
caseEnv_.numOfSkipSql_ = g_skipSql;
caseEnv_.numOfLimitSql_ = g_limitSql;
}
void run(const string& sql, int32_t expect, ParserStage checkStage) {
++sqlNo_;
if (caseEnv_.nsql_ > 0) {
--(caseEnv_.nsql_);
if (caseEnv_.numOfSkipSql_ > 0) {
--(caseEnv_.numOfSkipSql_);
return;
}
if (caseEnv_.numOfLimitSql_ > 0 && caseEnv_.numOfLimitSql_ == sqlNum_) {
return;
}
++sqlNum_;
runInternalFuncs(sql, expect, checkStage);
runApis(sql, expect, checkStage);
......@@ -94,9 +101,10 @@ class ParserTestBaseImpl {
string acctId_;
string user_;
string db_;
int32_t nsql_;
int32_t numOfSkipSql_;
int32_t numOfLimitSql_;
caseEnv() : user_("wangxiaoyu"), nsql_(0) {}
caseEnv() : user_("wangxiaoyu"), numOfSkipSql_(0) {}
};
struct stmtEnv {
......@@ -532,6 +540,7 @@ class ParserTestBaseImpl {
stmtRes res_;
ParserTestBase* pBase_;
int32_t sqlNo_;
int32_t sqlNum_;
};
ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl(this)) {}
......
......@@ -65,10 +65,11 @@ class ParserDdlTest : public ParserTestBase {
extern bool g_dump;
extern void setAsyncFlag(const char* pFlag);
extern void setLogLevel(const char* pLogLevel);
extern void setAsyncFlag(const char* pArg);
extern void setLogLevel(const char* pArg);
extern int32_t getLogLevel();
extern void setSkipSqlNum(const char* pNum);
extern void setSkipSqlNum(const char* pArg);
extern void setLimitSqlNum(const char* pArg);
} // namespace ParserTest
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册