diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 66418e684107ec6c549ce4b828d3ecb5594008c5..e46757f6f8d5aa141e7d85dd1594fc03109ee674 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -82,7 +82,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id); -int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList); +int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo); /** * set the task Id, usually used by message queue process diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 01a60a99e4254e62a9202e1c7f67e5f2812c8610..f8116f49efc00283d3058c101cf5d08143444ff0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -712,8 +712,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } } + buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, + (SSnapContext**)(&handle.sContext)); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); + SArray* tbUidList = NULL; - ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList); + ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task); if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId); taosArrayDestroy(tbUidList); @@ -723,10 +727,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); - - buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, - (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index d4371f126435512b214dd301dacc3843c8a7c9cd..0edffd7f056077ddab5e92ef23c6359db20da070 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -345,9 +345,12 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { return -1; } } + buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, + handle.fetchMeta, (SSnapContext**)(&reader.sContext)); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); SArray* tbUidList = NULL; - int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList); + int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task); if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); taosArrayDestroy(tbUidList); @@ -357,10 +360,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); - - buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, - handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); taosWLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a5ec0d4def65ba8f1c7ba0b7e21e77dcadc58a35..6ef232995ac075021fcdc89569185d952d585037 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1084,7 +1084,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (isAdd) { SArray* list = NULL; - int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list); + int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task); if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); taosArrayDestroy(list); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 19b64d9833462138b64aabd85b513a45e6f679db..a168050645496d0c94e6cc1e0c05dc35d18b9346 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1152,14 +1152,14 @@ _end: return code; } -int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList){ +int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo){ SSubplan *pSubplan = (SSubplan *)node; SScanPhysiNode pNode = {0}; pNode.suid = suid; pNode.uid = suid; pNode.tableType = TSDB_SUPER_TABLE; STableListInfo* pTableListInfo = tableListCreate(); - int code = getTableList(pVnode, &pNode, pSubplan->pTagCond, pSubplan->pTagIndexCond, pTableListInfo, "qGetTableList"); + int code = getTableList(pVnode, &pNode, pSubplan->pTagCond, pSubplan->pTagIndexCond, pTableListInfo, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI); *tableList = pTableListInfo->pTableList; pTableListInfo->pTableList = NULL; tableListDestroy(pTableListInfo);