diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e772a47e3dc79aec6f474f1f9ad93b4398c67f6e..01923d2b30af0b4f4de5eddac607503261f32fc1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2767,6 +2767,7 @@ typedef struct { typedef struct { SMsgHead head; int64_t leftForVer; + int64_t streamId; int32_t taskId; } SVDropStreamTaskReq; @@ -2958,6 +2959,7 @@ int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset); typedef struct { SMsgHead head; + int64_t streamId; int32_t taskId; } SVPauseStreamTaskReq; @@ -2976,6 +2978,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq typedef struct { SMsgHead head; int32_t taskId; + int64_t streamId; int8_t igUntreated; } SVResumeStreamTaskReq; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b241ae9b4125d4fd0fbaf40ef7477d819229572f..b9b24917f3dcf3a2a31780e1f9980b93da74efd4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -644,9 +644,9 @@ void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); -int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it -SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); +SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaBegin(SStreamMeta* pMeta); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 2aac05b22d2c126780b479a3cbc2fa255d2f3e5f..36771147a9b5c312cb225ccae27bb426db39b3f5 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -232,7 +232,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory) { - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); + int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid; + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -335,8 +336,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; - mDebug("s-task:0x%x related history task:0x%x, level:%d", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId, - (*pHTask)->info.taskLevel); + mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId, + (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel); } } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4001202254d7717752712a97e2c38471be773182..a0d53ec780e26b969d1d543761582261b6251474 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -649,6 +649,8 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + STransAction action = {0}; memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; @@ -1361,6 +1363,8 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { } pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + STransAction action = {0}; memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; @@ -1501,7 +1505,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig } pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; pReq->igUntreated = igUntreated; + STransAction action = {0}; memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 558180a3c23f8293d33587d87ba8e7b47fe4db40..4000e728359b7f88f339519474b2033b14502bf6 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -35,9 +35,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); - int32_t taskId = req.taskId; - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, @@ -181,21 +179,21 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); - SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); return 0; } - streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId); + streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTaskRunReq *pReq = pMsg->pCont; - int32_t taskId = pReq->taskId; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); if (pTask) { streamProcessRunReq(pTask); streamMetaReleaseTask(pSnode->pMeta, pTask); @@ -213,9 +211,8 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - int32_t taskId = req.taskId; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; streamProcessDispatchMsg(pTask, &req, &rsp, exec); @@ -235,8 +232,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); - int32_t taskId = req.dstTaskId; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.dstTaskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0}; @@ -251,8 +247,11 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = ntohl(pRsp->upstreamTaskId); - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + + int32_t taskId = htonl(pRsp->upstreamTaskId); + int64_t streamId = htobe64(pRsp->streamId); + + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, streamId, taskId); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pSnode->pMeta, pTask); @@ -260,7 +259,6 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { } else { return -1; } - return 0; } int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) { @@ -297,7 +295,7 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) tDecoderClear(&decoder); // find task - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { return -1; } @@ -340,7 +338,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { .upstreamTaskId = req.upstreamTaskId, }; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId); if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); @@ -400,7 +398,7 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) { qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, pSnode->pMeta->vgId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index af336adc6aaff9ee6cf0331e362956b33fc47019..ad1af080fdef9b6f8cd844dfb1061dd69efb2d2e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1062,7 +1062,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { .upstreamTaskId = req.upstreamTaskId, }; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1072,8 +1072,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; - tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", - taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp status %d", + req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); @@ -1099,7 +1100,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, pTq->pStreamMeta->vgId); @@ -1160,7 +1161,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // not added into meta store if (added) { tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); - SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); + SStreamTask* p = streamMetaAcquireTask(pStreamMeta, pTask->id.streamId, taskId); if (p != NULL) { // reset the downstreamReady flag. streamTaskCheckDownstreamTasks(p); } @@ -1178,7 +1179,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t code = TSDB_CODE_SUCCESS; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", pMeta->vgId, pReq->taskId); @@ -1234,7 +1235,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { bool done = false; // 1. get the related stream task - pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); + pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo delete this task, if the related stream task is dropped qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", @@ -1242,7 +1243,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaUnregisterTask(pMeta, pTask->id.taskId); + streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return -1; } @@ -1350,7 +1351,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); return -1; @@ -1386,7 +1387,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pTq->pStreamMeta->vgId, req.downstreamTaskId); @@ -1412,7 +1413,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { tDecodeCompleteHistoryDataMsg(&decoder, &req); tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pTq->pStreamMeta->vgId, req.upstreamTaskId); @@ -1503,7 +1504,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t st = pTask->status.taskStatus; @@ -1538,7 +1539,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp, exec); @@ -1552,10 +1553,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = ntohl(pRsp->upstreamTaskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - int32_t vgId = pTq->pStreamMeta->vgId; + int32_t vgId = pTq->pStreamMeta->vgId; + int32_t taskId = htonl(pRsp->upstreamTaskId); + int64_t streamId = htobe64(pRsp->streamId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, streamId, taskId); + if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1569,13 +1572,13 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId); return 0; } - streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId); + streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -1584,7 +1587,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, pReq->taskId); @@ -1597,7 +1600,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { - pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId, pTask->historyTaskId.taskId); @@ -1656,13 +1659,13 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); if (code != 0) { return code; } - SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId); + SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask) { code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); } @@ -1681,8 +1684,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); - int32_t taskId = req.dstTaskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; @@ -1720,7 +1722,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { tDecoderClear(&decoder); int32_t taskId = req.taskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); if (pTask != NULL) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp, false); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c3e7d03e4397f5d21c1866e3d2117646cfe5cf2e..3d9a91899cb2f91e811fb66c15610a9d0717ddca 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -72,8 +72,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { taosWUnLockLatch(&pMeta->lock); for (int32_t i = 0; i < numOfTasks; ++i) { - int32_t* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId); + SStreamId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; } @@ -242,8 +242,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - int32_t* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); + SStreamId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1fd2f7edf47c63b14360aa023b1c128c27a170c5..c7da80fdaf216896a6838d372cda4650990b7e90 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -290,7 +290,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; - SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo: destroy the fill-history task here qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, @@ -350,10 +350,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); - int32_t taskId = pTask->id.taskId; // 5. free it and remove fill-history task from disk meta-store - streamMetaUnregisterTask(pMeta, taskId); + streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); // 6. save to disk taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 80b690e20d0504c0b36b0754a596b39306129001..fe455c0190f3abe013fdfd1609b7cb473afda85b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -66,14 +66,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); + _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); if (pMeta->pTasks == NULL) { goto _err; } // task list - pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t)); + pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamId)); if (pMeta->pTaskList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -161,43 +161,6 @@ void streamMetaClose(SStreamMeta* pMeta) { taosMemoryFree(pMeta); } -#if 0 -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) { - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeStreamTask(&decoder, pTask) < 0) { - tDecoderClear(&decoder); - goto FAIL; - } - tDecoderClear(&decoder); - - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - ASSERT(0); - goto FAIL; - } - - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { - goto FAIL; - } - - if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) { - taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t)); - ASSERT(0); - goto FAIL; - } - - return 0; - -FAIL: - if (pTask) tFreeStreamTask(pTask); - return -1; -} -#endif - int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; int32_t len; @@ -241,14 +204,15 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { *pAdded = false; - void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); return -1; } - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + taosArrayPush(pMeta->pTaskList, &pTask->id); if (streamMetaSaveTask(pMeta, pTask) < 0) { tFreeStreamTask(pTask); @@ -263,7 +227,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); + taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES); *pAdded = true; return 0; } @@ -274,10 +238,11 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { return (int32_t)size; } -SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { +SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + int64_t keys[2] = {streamId, taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); @@ -304,22 +269,24 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { } } -static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) { +static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) { for (int32_t i = 0; i < num; ++i) { - int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); - if (*pTaskId == taskId) { + SStreamId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { taosArrayRemove(pMeta->pTaskList, i); break; } } } -int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; // pre-delete operation taosWLockLatch(&pMeta->lock); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + + int64_t keys[2] = {streamId, taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { pTask = *ppTask; atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); @@ -335,7 +302,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { while (1) { taosRLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { @@ -354,15 +321,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { // let's do delete of stream task taosWLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { - taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); + taosHashRemove(pMeta->pTasks, keys, sizeof(keys)); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); ASSERT(pTask->status.timerActive == 0); - - int32_t num = taosArrayGetSize(pMeta->pTaskList); - doRemoveIdFromList(pMeta, num, pTask->id.taskId); + doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); // remove the ref by timer if (pTask->triggerParam != 0) { @@ -473,7 +438,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } // do duplicate task check. - void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { tdbFree(pKey); @@ -484,7 +450,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + taosArrayPush(pMeta->pTaskList, &pTask->id); } else { tdbFree(pKey); tdbFree(pVal); @@ -493,7 +459,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { continue; } - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ad486c3f201d3dab112b417d323c77ab7ccb5c01..e59b3f682d35ea498e6ae6fe9ecf8d0df367cbcc 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -19,7 +19,8 @@ typedef struct SStreamTaskRetryInfo { SStreamMeta* pMeta; - int32_t taskId; + int32_t taskId; + int64_t streamId; } SStreamTaskRetryInfo; static int32_t streamSetParamForScanHistory(SStreamTask* pTask); @@ -540,7 +541,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId); taosWLockLatch(&pMeta->lock); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); + int64_t keys[2] = {pInfo->streamId, pInfo->taskId}; + + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { ASSERT((*ppTask)->status.timerActive == 1); @@ -556,12 +559,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } taosWUnLockLatch(&pMeta->lock); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->streamId, pInfo->taskId); if (pTask != NULL) { ASSERT(pTask->status.timerActive == 1); // abort the timer if intend to stop task - SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); qWarn( @@ -595,14 +598,16 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int32_t hTaskId = pTask->historyTaskId.taskId; + int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId}; // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (pHTask == NULL) { qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, pMeta->vgId, hTaskId); SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); pInfo->taskId = pTask->id.taskId; + pInfo->streamId = pTask->id.streamId; pInfo->pMeta = pTask->pMeta; if (pTask->launchTaskTimer == NULL) { @@ -797,7 +802,8 @@ void launchFillHistoryTask(SStreamTask* pTask) { } ASSERT(pTask->status.downstreamReady == 1); - qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId); + qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, + pTask->historyTaskId.streamId, tId); // launch associated fill history task streamLaunchFillHistoryTask(pTask);