diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index c04e416896639e5221d0099707316053253e5041..a4ddf966300aca2974fa90f802bac7ed2aeacacb 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -151,33 +151,36 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* ASSERT(pDb); if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { - sdbRelease(pMnode->pSdb, pDb); - - SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(pVgs); - SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); - int32_t sinkLvSize = taosArrayGetSize(sinkLv); - for (int32_t i = 0; i < sz; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); - for (int32_t j = 0; j < sinkLvSize; j++) { - SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); - if (pLastLevelTask->nodeId == pVgInfo->vgId) { - pVgInfo->taskId = pLastLevelTask->taskId; - break; - } + ASSERT(0); + return -1; + } + sdbRelease(pMnode->pSdb, pDb); + + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(pVgs); + SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); + int32_t sinkLvSize = taosArrayGetSize(sinkLv); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + for (int32_t j = 0; j < sinkLvSize; j++) { + SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); + if (pLastLevelTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pLastLevelTask->taskId; + ASSERT(pVgInfo->taskId != 0); + break; } } - } else { - pTask->dispatchType = TASK_DISPATCH__FIXED; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - SArray* pArray = taosArrayGetP(pStream->tasks, 0); - // one sink only - ASSERT(taosArrayGetSize(pArray) == 1); - SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; - pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; - pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } return 0; } @@ -379,7 +382,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // dispatch - mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask); + if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) { + qDestroyQueryPlan(pPlan); + return -1; + } // exec pFinalTask->execType = TASK_EXEC__PIPE; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 6fb5d75d8620b3138f95eb03fbd767168d3d39d1..d52337463874ab656c697ff72ce5123f5ac4ed64 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -100,7 +100,6 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM .upstreamNodeId = pTask->nodeId, .blockNum = blockNum, }; - qInfo("dispatch from task %d (child id %d)", pTask->taskId, pTask->childId); req.data = taosArrayInit(blockNum, sizeof(void*)); req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); @@ -142,11 +141,14 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM break; } } - ASSERT(vgId != 0); } + ASSERT(vgId != 0); req.taskId = downstreamTaskId; + qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId, + downstreamTaskId, vgId); + // serialize int32_t tlen; tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);