From cb26dd9fa2ee8241fb3dcead768114d4588a8978 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 8 Jun 2023 10:17:21 +0800 Subject: [PATCH] refactor(stream): do some internal refactor. --- include/libs/stream/tstream.h | 12 ++- source/libs/stream/inc/streamInc.h | 2 + source/libs/stream/src/stream.c | 43 +++++++-- source/libs/stream/src/streamDispatch.c | 123 +++++++++++++++--------- source/libs/stream/src/streamExec.c | 20 +--- 5 files changed, 128 insertions(+), 72 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e67423ca6e..1d80601178 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -292,8 +292,16 @@ typedef struct SSTaskBasicInfo { typedef struct SDispatchMsgInfo { void* pData; // current dispatch data int16_t msgType; // dispatch msg type + int32_t retryCount; // retry send data count + int64_t blockingTs; // output blocking timestamp } SDispatchMsgInfo; +typedef struct { + int8_t outputType; + int8_t outputStatus; + SStreamQueue* outputQueue; +} SSTaskOutputInfo; + struct SStreamTask { SStreamId id; SSTaskBasicInfo info; @@ -536,7 +544,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); -void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); +int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, + int64_t dstTaskId); +void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupTrigger(SStreamTask* pTask); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 3ed7bc8f7e..e51a618452 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -33,6 +33,7 @@ typedef struct { extern SStreamGlobalEnv streamEnv; +void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); @@ -44,6 +45,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); +int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index bdf7358f90..3befd5d55d 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -235,9 +235,26 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S return 0; } +// todo record the idle time for dispatch data int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { + if (code != TSDB_CODE_SUCCESS) { + // dispatch message failed: network error, or node not available. + // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set + // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure + // happened too fast. todo handle the shuffle dispatch failure + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, + pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); + int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); + if (ret != TSDB_CODE_SUCCESS) { + + } + + return TSDB_CODE_SUCCESS; + } + qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); + // there are other dispatch message not response yet if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); @@ -246,23 +263,31 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } } + pTask->msgInfo.retryCount = 0; int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); + qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus); + // the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp - // todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms. - if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - // TODO: init recover timer - qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId); + if (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED) { + pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + + int32_t waitDuration = 300; // 300 ms + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data", + pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration); + streamRetryDispatchStreamBlock(pTask, waitDuration); + } else { // pipeline send data in output queue + // this message has been sent successfully, let's try next one. + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - qError("s-task:%s ignore error, and reset task output status:%d", pTask->id.idStr, pTask->outputStatus); - return 0; + // otherwise, continue dispatch the first block to down stream task in pipeline + streamDispatchStreamBlock(pTask); } - // otherwise, continue dispatch the first block to down stream task in pipeline - streamDispatchStreamBlock(pTask); return 0; } @@ -271,7 +296,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { return -1; } - /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ + /*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ /*streamDispatchStreamBlock(pTask);*/ /*}*/ return 0; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 25ce470b33..3d01d4411d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -13,10 +13,9 @@ * along with this program. If not, see . */ +#include #include "streamInc.h" -static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); - static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -98,6 +97,27 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { return 0; } +int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, + int64_t dstTaskId) { + pReq->streamId = pTask->id.streamId; + pReq->dataSrcVgId = vgId; + pReq->upstreamTaskId = pTask->id.taskId; + pReq->upstreamChildId = pTask->info.selfChildId; + pReq->upstreamNodeId = pTask->info.nodeId; + pReq->blockNum = numOfBlocks; + pReq->taskId = dstTaskId; + + pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES); + pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); + if (pReq->data == NULL || pReq->dataLen == NULL) { + taosArrayDestroyP(pReq->data, taosMemoryFree); + taosArrayDestroy(pReq->dataLen); + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) { taosArrayDestroyP(pReq->data, taosMemoryFree); taosArrayDestroy(pReq->dataLen); @@ -248,7 +268,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR } int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet) { + SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -377,25 +397,17 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; + int32_t numOfBlocks = taosArrayGetSize(pData->blocks); ASSERT(numOfBlocks != 0); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - SStreamDispatchReq req = { - .streamId = pTask->id.streamId, - .dataSrcVgId = pData->srcVgId, - .upstreamTaskId = pTask->id.taskId, - .upstreamChildId = pTask->info.selfChildId, - .upstreamNodeId = pTask->info.nodeId, - .blockNum = numOfBlocks, - }; - - req.data = taosArrayInit(numOfBlocks, POINTER_BYTES); - req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); - if (req.data == NULL || req.dataLen == NULL) { - taosArrayDestroyP(req.data, taosMemoryFree); - taosArrayDestroy(req.dataLen); - return TSDB_CODE_OUT_OF_MEMORY; + SStreamDispatchReq req = {0}; + + int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; + code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId); + if (code != TSDB_CODE_SUCCESS) { + return code; } for (int32_t i = 0; i < numOfBlocks; i++) { @@ -411,9 +423,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t vgId = pTask->fixedEpDispatcher.nodeId; SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; - int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; - - req.taskId = downstreamTaskId; qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr, pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId); @@ -426,8 +435,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t vgSz = taosArrayGetSize(vgInfo); + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t vgSz = taosArrayGetSize(vgInfo); + SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq)); if (pReqs == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -435,20 +445,11 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } for (int32_t i = 0; i < vgSz; i++) { - pReqs[i].streamId = pTask->id.streamId; - pReqs[i].dataSrcVgId = pData->srcVgId; - pReqs[i].upstreamTaskId = pTask->id.taskId; - pReqs[i].upstreamChildId = pTask->info.selfChildId; - pReqs[i].upstreamNodeId = pTask->info.nodeId; - pReqs[i].blockNum = 0; - pReqs[i].data = taosArrayInit(0, sizeof(void*)); - pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t)); - if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId); + if (code != TSDB_CODE_SUCCESS) { goto FAIL_SHUFFLE_DISPATCH; } - - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - pReqs[i].taskId = pVgInfo->taskId; } for (int32_t i = 0; i < numOfBlocks; i++) { @@ -456,15 +457,18 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat // TODO: do not use broadcast if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + for (int32_t j = 0; j < vgSz; j++) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; } + if (pReqs[j].blockNum == 0) { atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); } pReqs[j].blockNum++; } + continue; } @@ -495,13 +499,31 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat taosArrayDestroyP(pReqs[i].data, taosMemoryFree); taosArrayDestroy(pReqs[i].dataLen); } + taosMemoryFree(pReqs); } + return code; } +static void doRetryDispatchData(void* param, void* tmrId) { + SStreamTask* pTask = param; + ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); + + int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); + if (code != TSDB_CODE_SUCCESS) { + streamRetryDispatchStreamBlock(pTask, 300); + } +} + +void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { + qError("s-task:%s dispatch data in %"PRId64"ms", pTask->id.idStr, waitDuration); + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->timer); +} + int32_t streamDispatchStreamBlock(SStreamTask* pTask) { - ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); + ASSERT((pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH)); + int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); if (numOfElems > 0) { qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, @@ -516,6 +538,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } + ASSERT(pTask->msgInfo.pData == NULL); qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus); SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); @@ -525,16 +548,28 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } + pTask->msgInfo.pData = pBlock; ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - int32_t code = streamDispatchAllBlocks(pTask, pBlock); - if (code != TSDB_CODE_SUCCESS) { - streamQueueProcessFail(pTask->outputQueue); - atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus); + int32_t retryCount = 0; + + while (1) { + int32_t code = streamDispatchAllBlocks(pTask, pBlock); + if (code == TSDB_CODE_SUCCESS) { + break; + } + + qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr, + tstrerror(code), pTask->outputStatus, retryCount); + + if (++retryCount > 5) { // add to timer to retry + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, retry in %dms", pTask->id.idStr, + retryCount, tstrerror(code), 300); + streamRetryDispatchStreamBlock(pTask, 300); + break; + } } - // this block can be freed only when it has been pushed to down stream. - destroyStreamDataBlock(pBlock); - return code; + // this block can not be deleted until it has been sent to downstream task successfully. + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 096ec25af3..a1e72015ff 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -237,11 +237,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { taosFreeQitem(qRes); return code; } -// -// if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { -// qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); -// streamDispatchStreamBlock(pTask); -// } if (finished) { break; @@ -334,7 +329,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("s-task:%s start to extract data block from inputQ", id); while (1) { - if (streamTaskShouldPause(&pTask->status)) { + // downstream task's input queue is blocked, stop immediately + if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED)) { if (batchSize > 1) { break; } else { @@ -399,18 +395,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { continue; } - // wait for the task to be ready to go - while (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, - atomic_load_8(&pTask->status.taskStatus)); - taosMsleep(100); - } else { - break; - } - } - int64_t st = taosGetTimestampMs(); qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize); -- GitLab