From 41ed0138b437630f0b6cd0d08701c1ad0e85463c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 May 2023 17:25:11 +0800 Subject: [PATCH] fix:compile error --- include/libs/executor/executor.h | 2 +- source/dnode/vnode/src/tq/tq.c | 10 +++++----- source/dnode/vnode/src/tq/tqMeta.c | 9 ++++----- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/libs/executor/src/executil.c | 4 ++-- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 66418e6841..e46757f6f8 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -82,7 +82,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id); -int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList); +int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo); /** * set the task Id, usually used by message queue process diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 01a60a99e4..f8116f49ef 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -712,8 +712,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } } + buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, + (SSnapContext**)(&handle.sContext)); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); + SArray* tbUidList = NULL; - ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList); + ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task); if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId); taosArrayDestroy(tbUidList); @@ -723,10 +727,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); - - buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, - (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index d4371f1264..0edffd7f05 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -345,9 +345,12 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { return -1; } } + buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, + handle.fetchMeta, (SSnapContext**)(&reader.sContext)); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); SArray* tbUidList = NULL; - int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList); + int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task); if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); taosArrayDestroy(tbUidList); @@ -357,10 +360,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); - - buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, - handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); taosWLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a5ec0d4def..6ef232995a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1084,7 +1084,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (isAdd) { SArray* list = NULL; - int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list); + int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task); if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); taosArrayDestroy(list); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 19b64d9833..a168050645 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1152,14 +1152,14 @@ _end: return code; } -int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList){ +int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo){ SSubplan *pSubplan = (SSubplan *)node; SScanPhysiNode pNode = {0}; pNode.suid = suid; pNode.uid = suid; pNode.tableType = TSDB_SUPER_TABLE; STableListInfo* pTableListInfo = tableListCreate(); - int code = getTableList(pVnode, &pNode, pSubplan->pTagCond, pSubplan->pTagIndexCond, pTableListInfo, "qGetTableList"); + int code = getTableList(pVnode, &pNode, pSubplan->pTagCond, pSubplan->pTagIndexCond, pTableListInfo, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI); *tableList = pTableListInfo->pTableList; pTableListInfo->pTableList = NULL; tableListDestroy(pTableListInfo); -- GitLab