提交 2713f4f6 编写于 作者: D dapan1121

feat: support create topic as stable with conditions

上级 357e86b9
......@@ -347,6 +347,7 @@
#define TK_WAL 329
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602
......
......@@ -358,6 +358,7 @@ typedef struct SCreateTopicStmt {
bool ignoreExists;
bool withMeta;
SNode* pQuery;
SNode* pWhere;
} SCreateTopicStmt;
typedef struct SDropTopicStmt {
......
......@@ -51,6 +51,12 @@ typedef enum {
TARGET_TYPE_OTHER,
} ETargetType;
typedef enum {
TCOL_TYPE_COLUMN = 1,
TCOL_TYPE_TAG,
TCOL_TYPE_NONE,
} ETableColumnType;
#define QUERY_POLICY_VNODE 1
#define QUERY_POLICY_HYBRID 2
#define QUERY_POLICY_QNODE 3
......@@ -257,6 +263,7 @@ void destroyQueryExecRes(SExecResult* pRes);
int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len);
char* parseTagDatatoJson(void* p);
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType);
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst);
void freeVgInfo(SDBVgInfo* vgInfo);
......
......@@ -3836,8 +3836,12 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
}
if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
if (pReq->ast && strlen(pReq->ast) > 0) {
if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
} else {
if (tEncodeI32(&encoder, 0) < 0) return -1;
}
}
if (tEncodeI32(&encoder, strlen(pReq->sql)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
......
......@@ -206,7 +206,7 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists,
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName,
bool withMeta);
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
bool withMeta);
bool withMeta, SNode* pWhere);
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName);
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId, SToken* pTopicName);
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
......
......@@ -115,6 +115,7 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable);
#ifdef __cplusplus
}
......
......@@ -822,16 +822,9 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable) {
CHECK_PARSER_STATUS(pCxt);
SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
SNode* select = createSelectStmtImpl(isDistinct, pProjectionList, pTable);
CHECK_OUT_OF_MEM(select);
select->isDistinct = isDistinct;
select->pProjectionList = pProjectionList;
select->pFromTable = pTable;
sprintf(select->stmtName, "%p", select);
select->isTimeLineResult = true;
select->onlyHasKeepOrderFunc = true;
select->timeRange = TSWINDOW_INITIALIZER;
return (SNode*)select;
return select;
}
static void setSubquery(SNode* pStmt) {
......@@ -1703,7 +1696,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
}
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
bool withMeta) {
bool withMeta, SNode* pWhere) {
CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName)) {
return NULL;
......@@ -1713,32 +1706,7 @@ SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists,
COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
pStmt->ignoreExists = ignoreExists;
pStmt->withMeta = withMeta;
// SSelectStmt* pSelect = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
// if (NULL == pSelect) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// sprintf(pSelect->stmtName, "%p", pSelect);
//
// SRealTableNode* pRealTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
// if (NULL == pRealTable) {
// nodesDestroyNode((SNode*)pSelect);
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// snprintf(pRealTable->table.dbName, sizeof(pRealTable->table.dbName), "%s", pDb);
// snprintf(pRealTable->table.tableName, sizeof(pRealTable->table.tableName), "%s", pTable);
// snprintf(pRealTable->table.tableAlias, sizeof(pRealTable->table.tableAlias), "%s", pTable);
// pSelect->pFromTable = (SNode*)pRealTable;
//
// if (numOfProjs >= 0) {
// pSelect->pProjectionList = createProjectCols(numOfProjs, pProjCol);
// if (NULL == pSelect->pProjectionList) {
// nodesDestroyNode((SNode*)pSelect);
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// }
//
// pStmt->pQuery = pSelect;
pStmt->pWhere = pWhere;
strcpy(pStmt->subDbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->subSTbName, ((SRealTableNode*)pRealTable)->table.tableName);
......
......@@ -355,6 +355,11 @@ static int32_t collectMetaKeyFromCreateTopic(SCollectMetaKeyCxt* pCxt, SCreateTo
if (NULL != pStmt->pQuery) {
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
if (NULL != pStmt->pWhere) {
int32_t code = collectMetaKeyFromRealTableImpl(pCxt, pStmt->subDbName, pStmt->subSTbName,
AUTH_TYPE_READ);
return code;
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -55,6 +55,13 @@ typedef struct STranslateContext {
bool showRewrite;
} STranslateContext;
typedef struct SBuildTopicContext {
bool colExists;
bool colNotFound;
STableMeta* pMeta;
SNodeList* pTags;
} SBuildTopicContext;
typedef struct SFullDatabaseName {
char fullDbName[TSDB_DB_FNAME_LEN];
} SFullDatabaseName;
......@@ -5789,12 +5796,107 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
return code;
}
static int32_t addTagList(SNodeList** ppList, SNode* pNode) {
if (NULL == *ppList) {
*ppList = nodesMakeList();
}
nodesListStrictAppend(*ppList, pNode);
return TSDB_CODE_SUCCESS;
}
static EDealRes checkColumnTagsInCond(SNode* pNode, void* pContext) {
SBuildTopicContext* pCxt = (SBuildTopicContext*)pContext;
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
ETableColumnType type;
getColumnTypeFromMeta(pCxt->pMeta, ((SColumnNode*)pNode)->colName, &type);
if (type == TCOL_TYPE_COLUMN) {
pCxt->colExists = true;
return DEAL_RES_ERROR;
} else if (type == TCOL_TYPE_TAG) {
addTagList(&pCxt->pTags, nodesCloneNode(pNode));
} else {
pCxt->colNotFound = true;
return DEAL_RES_ERROR;
}
} else if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (0 == strcasecmp(pFunc->functionName, "tbname")) {
addTagList(&pCxt->pTags, nodesCloneNode(pNode));
}
}
return DEAL_RES_CONTINUE;
}
static int32_t checkCollectTopicTags(STranslateContext* pCxt, SCreateTopicStmt* pStmt, STableMeta* pMeta, SNodeList** ppProjection) {
SBuildTopicContext colCxt = {.colExists = false, .colNotFound = false, .pMeta = pMeta, .pTags = NULL};
nodesWalkExprPostOrder(pStmt->pWhere, checkColumnTagsInCond, &colCxt);
if (colCxt.colNotFound) {
nodesDestroyList(colCxt.pTags);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Invalid column name");
} else if (colCxt.colExists) {
nodesDestroyList(colCxt.pTags);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Columns are forbidden in where clause");
}
if (NULL == colCxt.pTags) {
for (int32_t i = 0; i < pMeta->tableInfo.numOfTags; ++i) {
SSchema* tag = &pMeta->schema[pMeta->tableInfo.numOfColumns + i];
SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
strcpy(col->colName, tag->name);
strcpy(col->node.aliasName, col->colName);
strcpy(col->node.userAlias, col->colName);
addTagList(&colCxt.pTags, (SNode*)col);
}
}
*ppProjection = colCxt.pTags;
return TSDB_CODE_SUCCESS;
}
static int32_t buildQueryForTableTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SNode** pSelect) {
SParseContext* pParCxt = pCxt->pParseCxt;
SRequestConnInfo connInfo = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
SName name;
STableMeta* pMeta = NULL;
int32_t code = getTableMetaImpl(pCxt, toName(pParCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name), &pMeta);
if (code) {
taosMemoryFree(pMeta);
return code;
}
if (TSDB_SUPER_TABLE != pMeta->tableType) {
taosMemoryFree(pMeta);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Only supertable table can be used");
}
SNodeList* pProjection = NULL;
code = checkCollectTopicTags(pCxt, pStmt, pMeta, &pProjection);
if (TSDB_CODE_SUCCESS == code) {
SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
strcpy(realTable->table.dbName, pStmt->subDbName);
strcpy(realTable->table.tableName, pStmt->subSTbName);
strcpy(realTable->table.tableAlias, pStmt->subSTbName);
*pSelect = createSelectStmtImpl(true, pProjection, (SNode*)realTable);
((SSelectStmt*)*pSelect)->pWhere = nodesCloneNode(pStmt->pWhere);
code = translateQuery(pCxt, *pSelect);
}
taosMemoryFree(pMeta);
return code;
}
static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
if (NULL == pStmt->pQuery) {
if (NULL == pStmt->pQuery && NULL == pStmt->pWhere) {
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
if (pStmt->pWhere) {
return buildQueryForTableTopic(pCxt, pStmt, &pStmt->pQuery);
} else if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (!pSelect->isDistinct &&
(NULL != pSelect->pFromTable && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) &&
......
......@@ -666,6 +666,22 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
return code;
}
SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable) {
SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
if (NULL == select) {
return NULL;
}
select->isDistinct = isDistinct;
select->pProjectionList = pProjectionList;
select->pFromTable = pTable;
sprintf(select->stmtName, "%p", select);
select->isTimeLineResult = true;
select->onlyHasKeepOrderFunc = true;
select->timeRange = TSWINDOW_INITIALIZER;
return (SNode*)select;
}
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
if (NULL == *pHash) {
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
......
此差异已折叠。
......@@ -1145,6 +1145,15 @@ TEST_F(ParserInitialCTest, createTopic) {
setCreateTopicReq("tp1", 1, "create topic if not exists tp1 with meta as stable st1", nullptr, "test", "st1", 1);
run("CREATE TOPIC IF NOT EXISTS tp1 WITH META AS STABLE st1");
clearCreateTopicReq();
setCreateTopicReq("tp1", 1, "create topic if not exists tp1 as stable st1 where tag1 > 0", nullptr, "test", "st1");
run("CREATE TOPIC IF NOT EXISTS tp1 AS STABLE st1 WHERE tag1 > 0");
clearCreateTopicReq();
setCreateTopicReq("tp1", 1, "create topic if not exists tp1 with meta as stable st1 where tag1 > 0", nullptr, "test", "st1", 1);
run("CREATE TOPIC IF NOT EXISTS tp1 WITH META AS STABLE st1 WHERE tag1 > 0");
clearCreateTopicReq();
}
/*
......
......@@ -449,6 +449,18 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS;
}
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
for (int32_t i = 0; i < nums; ++i) {
if (0 == strcmp(pName, pMeta->schema[i].name)) {
*pType = (i < pMeta->tableInfo.numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
return;
}
}
*pType = TCOL_TYPE_NONE;
}
void freeVgInfo(SDBVgInfo* vgInfo) {
if (NULL == vgInfo) {
return;
......
......@@ -108,4 +108,14 @@ if $rows != 6 then
return -1
endi
return -1
sql create topic topic_stable_1 as stable stb where t1 > 0
sql create topic topic_stable_1 as stable stb where t1 > 0 and t1 < 0
sql create topic topic_stable_1 as stable stb where 1 > 0
sql create topic topic_stable_1 as stable stb where last(t1) > 0
sql create topic topic_stable_1 as stable stb where tbname is not null
sql create topic topic_stable_1 as stable stb where tbname > 'a'
sql create topic topic_stable_1 as stable stb where tbname > 0 and xx < 0
sql create topic topic_stable_1 as stable stb where tbname > 0 and c1 < 0
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册