diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0eaeafd0b3309c8c6efec157c59110356bc0ef7e..b9c9e4056260298b913e348eccbebe96b9ddd76a 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -306,71 +306,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S // return 0; //} -// todo record the idle time for dispatch data -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { - if (code != TSDB_CODE_SUCCESS) { - // dispatch message failed: network error, or node not available. - // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set - // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure - // happened too fast. todo handle the shuffle dispatch failure - if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { - qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no-retry", pTask->id.idStr, - pRsp->downstreamTaskId, tstrerror(code)); - return code; - } else { - qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, - pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); - return doDispatchAllBlocks(pTask, pTask->msgInfo.pData); - } - } - - qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); - - // there are other dispatch message not response yet - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); - if (leftRsp > 0) { - return 0; - } - } - - pTask->msgInfo.retryCount = 0; - ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - - qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status); - - // the input queue of the (down stream) task that receive the output data is full, - // so the TASK_INPUT_STATUS_BLOCKED is rsp - // todo blocking the output status - if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - - int32_t waitDuration = 300; // 300 ms - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data", - pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration); - streamRetryDispatchStreamBlock(pTask, waitDuration); - } else { // pipeline send data in output queue - // this message has been sent successfully, let's try next one. - destroyStreamDataBlock(pTask->msgInfo.pData); - pTask->msgInfo.pData = NULL; - - if (pTask->msgInfo.blockingTs != 0) { - int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; - qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el); - pTask->msgInfo.blockingTs = 0; - } - - // now ready for next data output - atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); - - // otherwise, continue dispatch the first block to down stream task in pipeline - streamDispatchStreamBlock(pTask); - } - - return 0; -} - int32_t streamProcessRunReq(SStreamTask* pTask) { if (streamTryExec(pTask) < 0) { return -1; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 41ed784d1644b9b171541af7ed86a9bff9805153..bcd45875fb5c0000c76115fca7f8cf49738d0527 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -718,3 +718,69 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { num); return 0; } + +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { + if (code != TSDB_CODE_SUCCESS) { + // dispatch message failed: network error, or node not available. + // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set + // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure + // happened too fast. + // todo handle the shuffle dispatch failure + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, + pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); + int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); + if (ret != TSDB_CODE_SUCCESS) { + } + + return TSDB_CODE_SUCCESS; + } + + qDebug("s-task:%s recv dispatch rsp, downstream task input status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, + code); + + // there are other dispatch message not response yet + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); + if (leftRsp > 0) { + return 0; + } + } + + pTask->msgInfo.retryCount = 0; + ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); + + qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status); + + // the input queue of the (down stream) task that receive the output data is full, + // so the TASK_INPUT_STATUS_BLOCKED is rsp + if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream + pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data", + pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + } else { // pipeline send data in output queue + // this message has been sent successfully, let's try next one. + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; + + if (pTask->msgInfo.blockingTs != 0) { + int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; + qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", + pTask->id.idStr, pRsp->downstreamTaskId, el); + pTask->msgInfo.blockingTs = 0; + + // put data into inputQ of current task is also allowed + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + } + + // now ready for next data output + atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); + + // otherwise, continue dispatch the first block to down stream task in pipeline + streamDispatchStreamBlock(pTask); + } + + return 0; +}