diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index fe98007109c831e4ff64769fa56bbe67a760dfe9..192bebe95aa05b59c8ef27d444648be429ebb8a8 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -41,23 +41,22 @@ typedef struct {
 } SLocalFetch;
 
 typedef struct {
-  void* tqReader;
-  void* config;
-  void* vnode;
-  void* mnd;
-  SMsgCb* pMsgCb;
-  int64_t version;
-  bool initMetaReader;
-  bool initTableReader;
-  bool initTqReader;
-  int32_t numOfVgroups;
-  void* sContext;  // SSnapContext*
-
-  void* pStateBackend;
-  struct SStorageAPI api;
+  void*       tqReader;
+  void*       config;
+  void*       vnode;
+  void*       mnd;
+  SMsgCb*     pMsgCb;
+  int64_t     version;
+  bool        initMetaReader;
+  bool        initTableReader;
+  bool        initTqReader;
+  int32_t     numOfVgroups;
+  void*       sContext;  // SSnapContext*
+  void*       pStateBackend;
+  int8_t      fillHistory;
+  STimeWindow winRange;
 
-  int8_t fillHistory;
-  STimeWindow winRange;
+  struct SStorageAPI api;
 } SReadHandle;
 
 // in queue mode, data streams are seperated by msg
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 2e6169dca3397cfd4835621eb9680a15db23a77f..8a905566f3c074b25abb11f4a120a401316bc1d9 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -757,14 +757,21 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
   pTask->pMsgCb = &pTq->pVnode->msgCb;
   pTask->pMeta = pTq->pStreamMeta;
-  pTask->chkInfo.currentVer = ver;
-
-  pTask->dataRange.range.maxVer = ver;
-  pTask->dataRange.range.minVer = ver;
+  // checkpoint exists, restore from the last checkpoint
+  if (pTask->chkInfo.keptCheckpointId != 0) {
+    ASSERT(pTask->chkInfo.version > 0);
+    pTask->chkInfo.currentVer = pTask->chkInfo.version;
+    pTask->dataRange.range.maxVer = pTask->chkInfo.version;
+    pTask->dataRange.range.minVer = pTask->chkInfo.version;
+  } else {
+    pTask->chkInfo.currentVer = ver;
+    pTask->dataRange.range.maxVer = ver;
+    pTask->dataRange.range.minVer = ver;
+  }
 
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
     SStreamTask* pSateTask = pTask;
-    SStreamTask task = {0};
+    SStreamTask  task = {0};
     if (pTask->info.fillHistory) {
       task.id = pTask->streamTaskId;
       task.pMeta = pTask->pMeta;
       pSateTask = &task;
@@ -777,12 +784,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
     }
 
     SReadHandle handle = {
+        .version = pTask->chkInfo.currentVer,
         .vnode = pTq->pVnode,
         .initTqReader = 1,
         .pStateBackend = pTask->pState,
         .fillHistory = pTask->info.fillHistory,
        .winRange = pTask->dataRange.window,
     };
+    initStorageAPI(&handle.api);
 
     pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
 
@@ -793,12 +802,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
     qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
   } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
     SStreamTask* pSateTask = pTask;
-    SStreamTask task = {0};
+    SStreamTask  task = {0};
     if (pTask->info.fillHistory) {
       task.id = pTask->streamTaskId;
       task.pMeta = pTask->pMeta;
       pSateTask = &task;
     }
+
     pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
     if (pTask->pState == NULL) {
       return -1;
@@ -806,6 +816,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
     }
     int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
     SReadHandle handle = {
+        .version = pTask->chkInfo.currentVer,
         .vnode = NULL,
         .numOfVgroups = numOfVgroups,
         .pStateBackend = pTask->pState,
@@ -844,6 +855,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
     if (pTask->tbSink.pTSchema == NULL) {
       return -1;
     }
+
     pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
     tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
   }
@@ -861,6 +873,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
          vgId, pTask->id.idStr, pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer,
          pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
 
+  if (pTask->chkInfo.keptCheckpointId != 0) {
+    tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
+           pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer);
+  }
+
   return 0;
 }
@@ -1283,14 +1300,17 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
   SDecoder decoder;
   tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
   tDecodeStreamDispatchReq(&decoder, &req);
+  tDecoderClear(&decoder);
 
   SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
-  if (pTask) {
+  if (pTask != NULL) {
     SRpcMsg rsp = {.info = pMsg->info, .code = 0};
     streamProcessDispatchMsg(pTask, &req, &rsp, exec);
     streamMetaReleaseTask(pTq->pStreamMeta, pTask);
     return 0;
   } else {
+    tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
+            pTq->pStreamMeta->vgId, req.taskId);
     tDeleteStreamDispatchReq(&req);
     return -1;
   }
@@ -1565,27 +1585,25 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) {
   if (tDecodeStreamCheckpointReq(&decoder, &req) < 0) {
     code = TSDB_CODE_MSG_DECODE_ERROR;
     tDecoderClear(&decoder);
-    goto FAIL;
+    return code;
   }
   tDecoderClear(&decoder);
 
   SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId);
   if (pTask == NULL) {
     tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
-    goto FAIL;
+    return TSDB_CODE_SUCCESS;
   }
 
   code = streamAddCheckpointRspMsg(&req, &pMsg->info, pTask);
   if (code != TSDB_CODE_SUCCESS) {
-    goto FAIL;
+    streamMetaReleaseTask(pMeta, pTask);
+    return code;
   }
 
   streamProcessCheckpointReq(pTask, &req);
   streamMetaReleaseTask(pMeta, pTask);
   return code;
-
-FAIL:
-  return code;
 }
 
 // downstream task has complete the stream task checkpoint procedure
@@ -1605,14 +1623,14 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
   if (tDecodeStreamCheckpointRsp(&decoder, &req) < 0) {
     code = TSDB_CODE_MSG_DECODE_ERROR;
     tDecoderClear(&decoder);
-    goto FAIL;
+    return code;
   }
   tDecoderClear(&decoder);
 
   SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId);
   if (pTask == NULL) {
     tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
-    goto FAIL;
+    return code;
   }
 
   tqDebug("vgId:%d s-task:%s received the checkpoint rsp, handle it", vgId, pTask->id.idStr);
@@ -1620,7 +1638,4 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
   streamProcessCheckpointRsp(pMeta, pTask);
   streamMetaReleaseTask(pMeta, pTask);
   return code;
-
-  FAIL:
-  return code;
 }
diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h
index e0d40cafa0852bb1cbb035ab301fb7153b85a996..d2ae324dd2a9e346d822d60d5b569831550117b0 100644
--- a/source/libs/stream/inc/streamInt.h
+++ b/source/libs/stream/inc/streamInt.h
@@ -48,12 +48,12 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
 int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
 
-int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
+int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId);
 int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
 int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId,
                                     SEpSet* pEpSet);
 int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask);
 int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
-int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId);
+int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
 
 int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id);
 SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index 5838a5bf0feb3f784baffba2a3d401c7067db041..30c81d45863172f8e481ed32fd3a60c414f4bc92 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -174,7 +174,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
   pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
   tmsgSendRsp(pRsp);
 
-  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
+  return (status == TASK_INPUT_STATUS__NORMAL) ? 0 : -1;
 }
 
 int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
@@ -239,7 +239,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
   qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
          pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
 
-  // todo add the input queue buffer limitation
+  // if the current task has received the checkpoint req from upstream task t#1, all msgs from t#1 should be blocked
+
   streamTaskEnqueueBlocks(pTask, pReq, pRsp);
   tDeleteStreamDispatchReq(pReq);
 
@@ -254,69 +255,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
   return 0;
 }
 
-// todo record the idle time for dispatch data
-int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
-  if (code != TSDB_CODE_SUCCESS) {
-    // dispatch message failed: network error, or node not available.
-    // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
-    // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
-    // happened too fast. todo handle the shuffle dispatch failure
-    qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
-           pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
-    int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
-    if (ret != TSDB_CODE_SUCCESS) {
-    }
-
-    return TSDB_CODE_SUCCESS;
-  }
-
-  qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
-
-  // there are other dispatch message not response yet
-  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
-    qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
-    if (leftRsp > 0) {
-      return 0;
-    }
-  }
-
-  pTask->msgInfo.retryCount = 0;
-  ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
-
-  qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus);
-
-  // the input queue of the (down stream) task that receive the output data is full,
-  // so the TASK_INPUT_STATUS_BLOCKED is rsp
-  // todo blocking the output status
-  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
-    pTask->msgInfo.blockingTs = taosGetTimestampMs();  // record the blocking start time
-
-    int32_t waitDuration = 300;  // 300 ms
-    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
-           pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
-    streamRetryDispatchStreamBlock(pTask, waitDuration);
-  } else {  // pipeline send data in output queue
-    // this message has been sent successfully, let's try next one.
-    destroyStreamDataBlock(pTask->msgInfo.pData);
-    pTask->msgInfo.pData = NULL;
-
-    if (pTask->msgInfo.blockingTs != 0) {
-      int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
-      qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%" PRId64 "ms", pTask->id.idStr, el);
-      pTask->msgInfo.blockingTs = 0;
-    }
-
-    // now ready for next data output
-    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
-
-    // otherwise, continue dispatch the first block to down stream task in pipeline
-    streamDispatchStreamBlock(pTask);
-  }
-
-  return 0;
-}
-
 int32_t streamProcessRunReq(SStreamTask* pTask) {
   if (streamTryExec(pTask) < 0) {
     return -1;
diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c
index af1224b7166c2e15f97e22bc81635ac23c9a895a..5d520421273a0d5399d1a1d86e6a997d80e6151b 100644
--- a/source/libs/stream/src/streamCheckpoint.c
+++ b/source/libs/stream/src/streamCheckpoint.c
@@ -219,7 +219,7 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
   // anymore
   ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0);
 
-  // there are still some upstream tasks not send checkpoint request
+  // there are still some upstream tasks that have not sent the checkpoint request, do nothing and wait for them
   int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
   if (notReady > 0) {
     int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
@@ -230,12 +230,13 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
   qDebug("s-task:%s received checkpoint req, all upstream sent checkpoint msg, dispatch checkpoint msg to downstream",
          pTask->id.idStr);
 
-  pTask->checkpointNotReadyTasks = (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH)
-                                       ? 1
-                                       : taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
+
+  // record the number of downstream tasks to be checked: only when all downstream tasks complete their checkpoint
+  // can this node start its local checkpoint procedure
+  pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
 
   // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
-  // 2. dispatch check point msg to all downstream tasks
+  // dispatch checkpoint msg to all downstream tasks
   streamTaskDispatchCheckpointMsg(pTask, checkpointId);
 }
 
@@ -257,7 +258,8 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) {
     appendCheckpointIntoInputQ(pTask);
     streamSchedExec(pTask);
   } else {
-    qDebug("s-task:%s %d downstream tasks are not ready, wait", pTask->id.idStr, notReady);
+    int32_t total = streamTaskGetNumOfDownstream(pTask);
+    qDebug("s-task:%s %d/%d downstream tasks are not ready, wait", pTask->id.idStr, notReady, total);
   }
 
   return 0;
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index b2e2bfbda867274cd799a392018b995c67140028..4a49806035ce6b202340a8cd95c9ff9490346299 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -238,7 +238,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
   return 0;
 }
 
-int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
+static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
   int32_t code = 0;
 
   int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
@@ -807,3 +807,66 @@ int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pR
 
   return TSDB_CODE_SUCCESS;
 }
+
+// todo record the idle time for dispatch data
+int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
+  if (code != TSDB_CODE_SUCCESS) {
+    // dispatch message failed: network error, or node not available.
+    // note that if the input queue is full, the code is still TSDB_CODE_SUCCESS and the blocked flag is set in
+    // pRsp->inputStatus instead; in that case we need to retry dispatching this message to the downstream task
+    // immediately, to handle the case where the failure happened too fast. todo handle the shuffle dispatch failure
+    qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
+           pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
+    int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
+    if (ret != TSDB_CODE_SUCCESS) {
+    }
+
+    return TSDB_CODE_SUCCESS;
+  }
+
+  qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
+
+  // there are other dispatch messages that have not been responded to yet
+  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
+    qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
+    if (leftRsp > 0) {
+      return 0;
+    }
+  }
+
+  pTask->msgInfo.retryCount = 0;
+  ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
+
+  qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus);
+
+  // the input queue of the downstream task that receives the output data is full,
+  // so TASK_INPUT_STATUS_BLOCKED is returned in the rsp
+  // todo blocking the output status
+  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
+    pTask->msgInfo.blockingTs = taosGetTimestampMs();  // record the blocking start time
+
+    int32_t waitDuration = 300;  // 300 ms
+    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
+           pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
+    streamRetryDispatchStreamBlock(pTask, waitDuration);
+  } else {  // pipeline send data in output queue
+    // this message has been sent successfully, let's try the next one.
+    destroyStreamDataBlock(pTask->msgInfo.pData);
+    pTask->msgInfo.pData = NULL;
+
+    if (pTask->msgInfo.blockingTs != 0) {
+      int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
+      qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%" PRId64 "ms", pTask->id.idStr, el);
+      pTask->msgInfo.blockingTs = 0;
+    }
+
+    // now ready for the next data output
+    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
+
+    // otherwise, continue dispatching the first block to the downstream task in the pipeline
+    streamDispatchStreamBlock(pTask);
+  }
+
+  return 0;
+}
\ No newline at end of file
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 30d9b650a1022f54ef5820774dd1584c82dce0ca..aec6e4b44685a7dfd8a83a6f43f225e05873cb77 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -254,3 +254,18 @@ void tFreeStreamTask(SStreamTask* pTask) {
 
   taosMemoryFree(pTask);
 }
+
+int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
+  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
+    return 0;
+  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+    return 1;
+  } else {
+    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
+      return 1;
+    } else {
+      SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+      return taosArrayGetSize(vgInfo);
+    }
+  }
+}