From 16d7707b9095e17488c477f13e15573b585f97e0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 19:33:43 +0800 Subject: [PATCH] fix(stream): align the scan real time data for stream task. --- include/libs/stream/tstream.h | 22 +++- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/snode/src/snode.c | 8 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/tq/tq.c | 80 +++++++++---- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/libs/stream/inc/streamInt.h | 3 + source/libs/stream/src/stream.c | 14 ++- source/libs/stream/src/streamDispatch.c | 107 ++++++++++++++++- source/libs/stream/src/streamRecover.c | 121 +++++++++++++------- 11 files changed, 283 insertions(+), 82 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 329a6bbc29..b7a516190b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -336,6 +336,7 @@ struct SStreamTask { void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend + SArray* pRspMsgList; // the followings attributes don't be serialized int32_t notReadyTasks; @@ -457,7 +458,9 @@ typedef struct { typedef struct { int64_t streamId; - int32_t taskId; + int32_t upstreamTaskId; + int32_t downstreamTaskId; + int32_t upstreamNodeId; int32_t childId; } SStreamScanHistoryFinishReq, SStreamTransferReq; @@ -518,6 +521,17 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); +typedef struct { + int64_t streamId; + int32_t upstreamTaskId; + int32_t upstreamNodeId; + int32_t downstreamId; + int32_t downstreamNode; +} SStreamCompleteHistoryMsg; + +int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); +int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); + typedef struct { int64_t streamId; int32_t downstreamTaskId; @@ -567,6 +581,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); +SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); @@ -607,8 +622,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); // agg level -int32_t streamAggScanHistoryPrepare(SStreamTask* pTask); -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId); +int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo); +int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta void streamMetaInit(); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index b2fb7243ff..8206b4e425 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -79,6 +79,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d48bd3f847..462b5b9080 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -740,6 +740,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3b19c7ae4a..6288a048f7 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -274,7 +274,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return 0; } -int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { +int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -287,12 +287,12 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); // find task - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId); if (pTask == NULL) { return -1; } // do process request - if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) { + if (streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info) < 0) { streamMetaReleaseTask(pSnode->pMeta, pTask); return -1; } @@ -415,7 +415,7 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { case TDMT_STREAM_RETRIEVE_RSP: return sndProcessTaskRetrieveRsp(pSnode, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH: - return sndProcessTaskRecoverFinishReq(pSnode, pMsg); + return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); case TDMT_STREAM_TASK_CHECK: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f5a610efc7..77e4a48249 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -250,8 +250,8 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); // sma diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 98e0696c19..aeb9b55d12 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1084,14 +1084,18 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); int64_t st = taosGetTimestampMs(); - int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, - TASK_SCHED_STATUS__WAITING); - if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { - tqDebug("s-task:%s failed to launch scan history data in current time window, unexpected sched status:%d", id, - schedStatus); - streamMetaReleaseTask(pMeta, pTask); - return 0; + // we have to continue retrying to successfully execute the scan history task. + while (1) { + int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, + TASK_SCHED_STATUS__WAITING); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { + break; + } + + tqError("s-task:%s failed to start scan history in current time window, unexpected sched-status:%d, retry in 100ms", + id, schedStatus); + taosMsleep(100); } if (!streamTaskRecoverScanStep1Finished(pTask)) { @@ -1195,12 +1199,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scan history in current time window completed, no related fill history task, reset the time " + "s-task:%s scan history in stream time window completed, no related fill history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); } else { tqDebug( - "s-task:%s scan history in current time window completed, now start to handle data from WAL, start " + "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } @@ -1209,11 +1213,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { code = streamTaskScanHistoryDataComplete(pTask); streamMetaReleaseTask(pMeta, pTask); - // let's start the stream task by extracting data from wal - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - tqStartStreamTasks(pTq); - } - + // when all source task complete to scan history data in stream time window, they are allowed to handle stream data + // at the same time. return code; } @@ -1232,17 +1233,17 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); - tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.taskId); + tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); if (pTask == NULL) { - tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.taskId); + tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); return -1; } int32_t remain = streamAlignTransferState(pTask); if (remain > 0) { - tqDebug("s-task:%s receive transfer state msg, remain:%d", pTask->id.idStr, remain); + tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain); return 0; } @@ -1257,7 +1258,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1269,20 +1270,49 @@ int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); - // find task - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); if (pTask == NULL) { - tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pTq->pStreamMeta->vgId); + tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", + pTq->pStreamMeta->vgId, req.downstreamTaskId); return -1; } - int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId); + tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId); + + int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return code; } -int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { - // +int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + // deserialize + SStreamCompleteHistoryMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecodeCompleteHistoryDataMsg(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", + pTq->pStreamMeta->vgId, req.upstreamTaskId); + return -1; + } + + tqDebug("s-task:%s scan-history finish rsp received from task:0x%x", pTask->id.idStr, req.downstreamId); + + int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); + if (remain > 0) { + tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain); + } else { + streamProcessScanHistoryFinishRsp(pTask); + } + + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e4a7ed224c..33af7631de 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_STREAM_TRANSFER_STATE: return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH: - return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg); + return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: - return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); + return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 2164b63caf..ff3f35bfed 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -54,6 +54,9 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); +int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); +int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); + extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 4de9f6a7ed..090cef48de 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -419,4 +419,16 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { } } -void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } + +SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { + int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); + for(int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i); + if (pInfo->taskId == taskId) { + return pInfo; + } + } + + return NULL; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 566e7da9e4..a22a9ec534 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -25,6 +25,12 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; +static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { + pMsg->msgType = msgType; + pMsg->pCont = pCont; + pMsg->contLen = contLen; +} + static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -311,13 +317,12 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc msg.contLen = tlen + sizeof(SMsgHead); msg.pCont = buf; msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH; - msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, - pReq->taskId, vgId); + qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, + pReq->downstreamTaskId, vgId); return 0; } @@ -620,3 +625,99 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { // this block can not be deleted until it has been sent to downstream task successfully. return TSDB_CODE_SUCCESS; } + +int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +typedef struct { + SEpSet epset; + int32_t taskId; + SRpcMsg msg; +} SStreamContinueExecInfo; + +int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { + int32_t len = 0; + int32_t code = 0; + SEncoder encoder; + + SStreamCompleteHistoryMsg msg = { + .streamId = pReq->streamId, + .upstreamTaskId = pReq->upstreamTaskId, + .upstreamNodeId = pReq->upstreamNodeId, + .downstreamId = pReq->downstreamTaskId, + .downstreamNode = pTask->pMeta->vgId, + }; + + tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code); + if (code < 0) { + return code; + } + + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId); + + void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + + tEncoderInit(&encoder, (uint8_t*)abuf, len); + tEncodeCompleteHistoryDataMsg(&encoder, &msg); + tEncoderClear(&encoder); + + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + + SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet}; + initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); + info.msg.info = *pRpcInfo; + + // todo: fix race condition here + if (pTask->pRspMsgList == NULL) { + pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); + } + + taosArrayPush(pTask->pRspMsgList, &info); + + int32_t num = taosArrayGetSize(pTask->pRspMsgList); + qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId, + num); + return TSDB_CODE_SUCCESS; +} + +int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { + ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); + + int32_t num = taosArrayGetSize(pTask->pRspMsgList); + for (int32_t i = 0; i < num; ++i) { + SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); + tmsgSendRsp(&pInfo->msg); + + qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel, + pInfo->taskId); + } + + taosArrayClear(pTask->pRspMsgList); + qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, + num); + return 0; +} diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 693d067506..958a94d741 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -72,9 +72,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - int32_t code = doLaunchScanHistoryTask(pTask); - streamTaskEnablePause(pTask); - return code; + return doLaunchScanHistoryTask(pTask); } else { ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr, @@ -83,12 +81,11 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { streamSetParamForScanHistory(pTask); - streamAggScanHistoryPrepare(pTask); + streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); + streamTaskScanHistoryPrepare(pTask); } - - streamTaskEnablePause(pTask); return 0; } @@ -143,6 +140,12 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { streamTaskSetForReady(pTask, 0); streamTaskSetRangeStreamCalc(pTask); streamTaskLaunchScanHistory(pTask); + + // enable pause when init completed. + if (pTask->historyTaskId.taskId == 0) { + streamTaskEnablePause(pTask); + } + launchFillHistoryTask(pTask); } @@ -195,14 +198,14 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { streamTaskSetRangeStreamCalc(pTask); if (status == TASK_STATUS__SCAN_HISTORY) { - qDebug("s-task:%s enter into scan-history-data stage, status:%s", id, str); + qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); streamTaskLaunchScanHistory(pTask); } else { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); } // enable pause when init completed. - if (pTask->historyTaskId.taskId == 0 && pTask->info.fillHistory == 0) { + if (pTask->historyTaskId.taskId == 0) { streamTaskEnablePause(pTask); } @@ -296,7 +299,7 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) { } int32_t streamRestoreParam(SStreamTask* pTask) { - qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr); + qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr); return qRestoreStreamOperatorOption(pTask->exec.pExecutor); } @@ -334,23 +337,33 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) { } int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { - SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; + SStreamScanHistoryFinishReq req = { + .streamId = pTask->id.streamId, + .childId = pTask->info.selfChildId, + .upstreamTaskId = pTask->id.taskId, + .upstreamNodeId = pTask->pMeta->vgId, + }; // serialize if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.taskId = pTask->fixedEpDispatcher.taskId; + req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; + pTask->notReadyTasks = 1; streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); + pTask->notReadyTasks = numOfVgs; qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr, numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus)); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.taskId = pVgInfo->taskId; + req.downstreamTaskId = pVgInfo->taskId; streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } + } else { + qDebug("s-task:%s no downstream tasks, invoke history finish rsp directly", pTask->id.idStr); + streamProcessScanHistoryFinishRsp(pTask); } return 0; @@ -394,7 +407,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe tmsgSendReq(pEpSet, &msg); qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, - pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId); + pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId); return 0; } @@ -404,7 +417,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // serialize if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.taskId = pTask->fixedEpDispatcher.taskId; + req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -412,7 +425,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { int32_t numOfVgs = taosArrayGetSize(vgInfo); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.taskId = pVgInfo->taskId; + req.downstreamTaskId = pVgInfo->taskId; doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } @@ -421,10 +434,11 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { } // agg -int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) { +int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); - qDebug("s-task:%s agg task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr, - pTask->numOfWaitingUpstream, streamGetTaskStatusStr(pTask->status.taskStatus)); + qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", + pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, + streamGetTaskStatusStr(pTask->status.taskStatus)); return 0; } @@ -440,27 +454,56 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { return 0; } -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) { - if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); - ASSERT(left >= 0); +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, + SRpcHandleInfo* pRpcInfo) { + int32_t taskLevel = pTask->info.taskLevel; + ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); + + // sink node do not send end of scan history msg to its upstream, which is agg task. + streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq); - if (left == 0) { - int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); - qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data", - pTask->id.idStr, numOfTasks); + int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); + ASSERT(left >= 0); + if (left == 0) { + int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); + qDebug( + "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send " + "rsp to all upstream tasks", + pTask->id.idStr, numOfTasks); + + if (pTask->info.taskLevel == TASK_LEVEL__AGG) { streamAggUpstreamScanHistoryFinish(pTask); - } else { - qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", - pTask->id.idStr, taskId, childId, left); } + streamNotifyUpstreamContinue(pTask); + } else { + qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", + pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left); } return 0; } +int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { + ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY); + SStreamMeta* pMeta = pTask->pMeta; + + // execute in the scan history complete call back msg, ready to process data from inputQ + streamSetStatusNormal(pTask); + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + + taosWLockLatch(&pMeta->lock); + streamMetaSaveTask(pMeta, pTask); + taosWUnLockLatch(&pMeta->lock); + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamSchedExec(pTask); + } + + return TSDB_CODE_SUCCESS; +} + static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.minVer = 0; pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; @@ -579,7 +622,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { return 0; } @@ -596,16 +638,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { return -1; } - ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY); - - // ready to process data from inputQ - streamSetStatusNormal(pTask); - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - - taosWLockLatch(&pMeta->lock); - streamMetaSaveTask(pMeta, pTask); - taosWUnLockLatch(&pMeta->lock); - return 0; } @@ -702,15 +734,20 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } + int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; tEndDecode(pDecoder); return 0; -- GitLab