From a71878d07fb94e184991259a3fd18a5745e4afd9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 May 2023 11:46:59 +0800 Subject: [PATCH] feat: support create topic as stable with conditions --- include/common/tmsg.h | 2 +- include/libs/executor/executor.h | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 20 +++++++++++++++++--- source/dnode/mnode/impl/src/mndStb.c | 4 ++-- source/dnode/mnode/impl/src/mndTopic.c | 10 ++++++++-- source/dnode/vnode/src/tq/tq.c | 15 +++++++++------ source/dnode/vnode/src/tq/tqMeta.c | 10 ++++++---- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/libs/executor/src/executil.c | 13 +++++++------ 9 files changed, 52 insertions(+), 26 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a7de03fbce..db46858cec 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2772,8 +2772,8 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { buf = taosDecodeString(buf, &pReq->qmsg); } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - buf = taosDecodeString(buf, &pReq->qmsg); buf = taosDecodeFixedI64(buf, &pReq->suid); + buf = taosDecodeString(buf, &pReq->qmsg); } return (void*)buf; } diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e3a75ecabc..d64b3dd193 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -82,7 +82,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id); -int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* pTagCond, void* pTagIndexCond, SArray **tableList); +int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* node, SArray **tableList); /** * set the task Id, usually used by message queue process diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 0248a195db..39c771d111 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -538,7 +538,23 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } + }else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL){ + SNode *pAst = NULL; + if (nodesStringToNode(pTopic->ast, &pAst) != 0) { + mError("topic:%s, failed to create since %s", pTopic->name, terrstr()); + return -1; + } + + SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; + if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { + mError("failed to create topic:%s since %s", pTopic->name, terrstr()); + nodesDestroyNode(pAst); + return -1; + } + nodesDestroyNode(pAst); + } + if(pPlan){ int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); if (levelNum != 1) { qDestroyQueryPlan(pPlan); @@ -579,7 +595,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId); - if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) { + if (pSubplan) { int32_t msgLen; pSubplan->execNode.epSet = pVgEp->epSet; @@ -591,8 +607,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - } else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){ - pVgEp->qmsg = taosStrdup(pTopic->ast); } else { pVgEp->qmsg = taosStrdup(""); } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 8b708c3e0f..2b54eb6389 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1205,7 +1205,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s", pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql); - if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { + if (pTopic->ast == NULL) { sdbRelease(pSdb, pTopic); continue; } @@ -2247,7 +2247,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, } } - if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { + if (pTopic->ast == NULL) { sdbRelease(pSdb, pTopic); continue; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 7d71aae3f4..863055922a 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -420,6 +420,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * mError("failed to create topic:%s since %s", pCreate->name, terrstr()); taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.sql); + nodesDestroyNode(pAst); return -1; } @@ -427,6 +428,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (topicObj.ntbColIds == NULL) { taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.sql); + nodesDestroyNode(pAst); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -442,6 +444,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.sql); + nodesDestroyNode(pAst); return -1; } @@ -462,8 +465,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.stbUid = pStb->uid; mndReleaseStb(pMnode, pStb); - topicObj.ast = taosStrdup(pCreate->ast); - topicObj.astLen = strlen(pCreate->ast) + 1; + if(pCreate->ast != NULL){ + qDebugL("topic:%s ast %s", topicObj.name, pCreate->ast); + topicObj.ast = taosStrdup(pCreate->ast); + topicObj.astLen = strlen(pCreate->ast) + 1; + } } /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/ /*topicObj.ast = NULL;*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index be3d6bc614..30c52d9fc2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -518,19 +518,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->execHandle.execTb.qmsg = req.qmsg; req.qmsg = NULL; - if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { - tqError("nodesStringToNode error in sub stable, since %s", terrstr()); - return -1; + if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { + if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { + tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(), + pVnode->config.vgId, req.subKey, pHandle->consumerId); + return -1; + } } SArray* tbUidList = NULL; - ret = qGetTableList(req.suid, pVnode->pMeta, pVnode, pHandle->execHandle.execTb.node, NULL, &tbUidList); + ret = qGetTableList(req.suid, pVnode->pMeta, pVnode, pHandle->execHandle.execTb.node, &tbUidList); if(ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, req.subKey, pHandle->consumerId); + tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId); taosArrayDestroy(tbUidList); goto end; } - tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, req.suid); + tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index ee7af5b2bf..e93efcd3cc 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -338,13 +338,15 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) { - tqError("nodesStringToNode error in sub stable, since %s", terrstr()); - return -1; + if(strcmp(handle.execHandle.execTb.qmsg, "") != 0) { + if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) { + tqError("nodesStringToNode error in sub stable, since %s", terrstr()); + return -1; + } } SArray* tbUidList = NULL; - int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, handle.execHandle.execTb.node, NULL, &tbUidList); + int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList); if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); taosArrayDestroy(tbUidList); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 38f5307384..820a8b0eba 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1041,7 +1041,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (isAdd) { SArray* list = NULL; - int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, pTqHandle->execHandle.execTb.node, NULL, &list); + int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list); if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); taosArrayDestroy(list); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 1fb35ae271..fb889692ef 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1122,13 +1122,14 @@ _end: return code; } -int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* pTagCond, void* pTagIndexCond, SArray **tableList){ - SScanPhysiNode node = {0}; - node.suid = suid; - node.uid = suid; - node.tableType = TSDB_SUPER_TABLE; +int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* node, SArray **tableList){ + SSubplan *pSubplan = (SSubplan *)node; + SScanPhysiNode pNode = {0}; + pNode.suid = suid; + pNode.uid = suid; + pNode.tableType = TSDB_SUPER_TABLE; STableListInfo* pTableListInfo = tableListCreate(); - int code = getTableList(metaHandle, pVnode, &node, pTagCond, pTagIndexCond, pTableListInfo, "qGetTableList"); + int code = getTableList(metaHandle, pVnode, &pNode, pSubplan->pTagCond, pSubplan->pTagIndexCond, pTableListInfo, "qGetTableList"); *tableList = pTableListInfo->pTableList; pTableListInfo->pTableList = NULL; tableListDestroy(pTableListInfo); -- GitLab