From 4393375e47423530e5f50f5765a687ad1fd98979 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Aug 2023 02:16:04 +0800 Subject: [PATCH] fix(stream): set correct task id. --- source/dnode/mnode/impl/src/mndScheduler.c | 7 ++++--- source/dnode/snode/src/snode.c | 11 +++++++---- source/dnode/vnode/src/tq/tq.c | 5 +++-- source/libs/stream/src/streamRecover.c | 3 ++- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 2aac05b22d..36771147a9 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -232,7 +232,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory) { - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); + int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid; + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -335,8 +336,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; - mDebug("s-task:0x%x related history task:0x%x, level:%d", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId, - (*pHTask)->info.taskLevel); + mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId, + (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel); } } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 91346e1d83..4000e72835 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -192,7 +192,8 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTaskRunReq *pReq = pMsg->pCont; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); + + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); if (pTask) { streamProcessRunReq(pTask); streamMetaReleaseTask(pSnode->pMeta, pTask); @@ -246,8 +247,11 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = ntohl(pRsp->upstreamTaskId); - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, taskId); + + int32_t taskId = htonl(pRsp->upstreamTaskId); + int64_t streamId = htobe64(pRsp->streamId); + + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, streamId, taskId); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pSnode->pMeta, pTask); @@ -255,7 +259,6 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { } else { return -1; } - return 0; } int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8454110e18..ad1af080fd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1072,8 +1072,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; - tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", - taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp status %d", + req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 79f856ee0b..e59b3f682d 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -802,7 +802,8 @@ void launchFillHistoryTask(SStreamTask* pTask) { } ASSERT(pTask->status.downstreamReady == 1); - qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId); + qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, + pTask->historyTaskId.streamId, tId); // launch associated fill history task streamLaunchFillHistoryTask(pTask); -- GitLab