diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 33905bad86c74f9f990512ef06815190a54d720f..2563417c5ad5e965460e35016c1701140b9cac9b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -637,6 +637,12 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* return code; } + SArray* pSinkTaskList = taosArrayGetP(pStream->tasks, 0); + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); + setEpToDownstreamTask(pAggTask, pSinkTask); + } + // source level return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); } else if (numOfPlanLevel == 1) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fead7253f842ff8a836edc0698e8074f1445cc33..9233b608b540fd98589ba39f3d9cc92dbcb2a54e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -158,8 +158,6 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s (vgId:%d) checkpoint to task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, - pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i); streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { // no need to dispatch msg to downstream task diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 939737ee2dc128805b7e3a847a8762ac9ce7d97f..32e9c965cb9e1c8e4ba0ed43f1c94c6c11fb0ce0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -444,8 +444,8 @@ int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointR tEncoderClear(&encoder); initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); - qDebug("s-task:%s (level:%d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, - pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); + qDebug("s-task:%s (level:%d, vgId:5d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, + pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); return 0;