提交 a71878d0 编写于 作者: wmmhello's avatar wmmhello

feat: support create topic as stable with conditions

上级 2713f4f6
...@@ -2772,8 +2772,8 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) ...@@ -2772,8 +2772,8 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
buf = taosDecodeString(buf, &pReq->qmsg); buf = taosDecodeString(buf, &pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
buf = taosDecodeString(buf, &pReq->qmsg);
buf = taosDecodeFixedI64(buf, &pReq->suid); buf = taosDecodeFixedI64(buf, &pReq->suid);
buf = taosDecodeString(buf, &pReq->qmsg);
} }
return (void*)buf; return (void*)buf;
} }
......
...@@ -82,7 +82,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v ...@@ -82,7 +82,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
uint64_t id); uint64_t id);
int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* pTagCond, void* pTagIndexCond, SArray **tableList); int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* node, SArray **tableList);
/** /**
* set the task Id, usually used by message queue process * set the task Id, usually used by message queue process
......
...@@ -538,7 +538,23 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -538,7 +538,23 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL){
SNode *pAst = NULL;
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
return -1;
}
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
mError("failed to create topic:%s since %s", pTopic->name, terrstr());
nodesDestroyNode(pAst);
return -1;
}
nodesDestroyNode(pAst);
}
if(pPlan){
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) { if (levelNum != 1) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
...@@ -579,7 +595,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -579,7 +595,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId); mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) { if (pSubplan) {
int32_t msgLen; int32_t msgLen;
pSubplan->execNode.epSet = pVgEp->epSet; pSubplan->execNode.epSet = pVgEp->epSet;
...@@ -591,8 +607,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -591,8 +607,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
} else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
pVgEp->qmsg = taosStrdup(pTopic->ast);
} else { } else {
pVgEp->qmsg = taosStrdup(""); pVgEp->qmsg = taosStrdup("");
} }
......
...@@ -1205,7 +1205,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, ...@@ -1205,7 +1205,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s", mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql); pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { if (pTopic->ast == NULL) {
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
continue; continue;
} }
...@@ -2247,7 +2247,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, ...@@ -2247,7 +2247,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
} }
} }
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { if (pTopic->ast == NULL) {
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
continue; continue;
} }
......
...@@ -420,6 +420,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -420,6 +420,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
mError("failed to create topic:%s since %s", pCreate->name, terrstr()); mError("failed to create topic:%s since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql); taosMemoryFree(topicObj.sql);
nodesDestroyNode(pAst);
return -1; return -1;
} }
...@@ -427,6 +428,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -427,6 +428,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (topicObj.ntbColIds == NULL) { if (topicObj.ntbColIds == NULL) {
taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql); taosMemoryFree(topicObj.sql);
nodesDestroyNode(pAst);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -442,6 +444,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -442,6 +444,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql); taosMemoryFree(topicObj.sql);
nodesDestroyNode(pAst);
return -1; return -1;
} }
...@@ -462,9 +465,12 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -462,9 +465,12 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.stbUid = pStb->uid; topicObj.stbUid = pStb->uid;
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
if(pCreate->ast != NULL){
qDebugL("topic:%s ast %s", topicObj.name, pCreate->ast);
topicObj.ast = taosStrdup(pCreate->ast); topicObj.ast = taosStrdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.astLen = strlen(pCreate->ast) + 1;
} }
}
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/ /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
/*topicObj.ast = NULL;*/ /*topicObj.ast = NULL;*/
/*topicObj.astLen = 0;*/ /*topicObj.astLen = 0;*/
......
...@@ -518,19 +518,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -518,19 +518,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->execHandle.execTb.qmsg = req.qmsg; pHandle->execHandle.execTb.qmsg = req.qmsg;
req.qmsg = NULL; req.qmsg = NULL;
if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
tqError("nodesStringToNode error in sub stable, since %s", terrstr()); tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(),
pVnode->config.vgId, req.subKey, pHandle->consumerId);
return -1; return -1;
} }
}
SArray* tbUidList = NULL; SArray* tbUidList = NULL;
ret = qGetTableList(req.suid, pVnode->pMeta, pVnode, pHandle->execHandle.execTb.node, NULL, &tbUidList); ret = qGetTableList(req.suid, pVnode->pMeta, pVnode, pHandle->execHandle.execTb.node, &tbUidList);
if(ret != TDB_CODE_SUCCESS) { if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, req.subKey, pHandle->consumerId); tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
goto end; goto end;
} }
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, req.suid); tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
......
...@@ -338,13 +338,15 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -338,13 +338,15 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
if(strcmp(handle.execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) { if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) {
tqError("nodesStringToNode error in sub stable, since %s", terrstr()); tqError("nodesStringToNode error in sub stable, since %s", terrstr());
return -1; return -1;
} }
}
SArray* tbUidList = NULL; SArray* tbUidList = NULL;
int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, handle.execHandle.execTb.node, NULL, &tbUidList); int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList);
if(ret != TDB_CODE_SUCCESS) { if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
......
...@@ -1041,7 +1041,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -1041,7 +1041,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (isAdd) { if (isAdd) {
SArray* list = NULL; SArray* list = NULL;
int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, pTqHandle->execHandle.execTb.node, NULL, &list); int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list);
if(ret != TDB_CODE_SUCCESS) { if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
taosArrayDestroy(list); taosArrayDestroy(list);
......
...@@ -1122,13 +1122,14 @@ _end: ...@@ -1122,13 +1122,14 @@ _end:
return code; return code;
} }
int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* pTagCond, void* pTagIndexCond, SArray **tableList){ int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* node, SArray **tableList){
SScanPhysiNode node = {0}; SSubplan *pSubplan = (SSubplan *)node;
node.suid = suid; SScanPhysiNode pNode = {0};
node.uid = suid; pNode.suid = suid;
node.tableType = TSDB_SUPER_TABLE; pNode.uid = suid;
pNode.tableType = TSDB_SUPER_TABLE;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
int code = getTableList(metaHandle, pVnode, &node, pTagCond, pTagIndexCond, pTableListInfo, "qGetTableList"); int code = getTableList(metaHandle, pVnode, &pNode, pSubplan->pTagCond, pSubplan->pTagIndexCond, pTableListInfo, "qGetTableList");
*tableList = pTableListInfo->pTableList; *tableList = pTableListInfo->pTableList;
pTableListInfo->pTableList = NULL; pTableListInfo->pTableList = NULL;
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册