diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0a02c39bdf6ee26668aebe08623986b1eefba3b9..691d31e64cd8061f5a4a04507864501078ebc4f5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -289,6 +289,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // the input queue of the (down stream) task that receive the output data is full, // so the TASK_INPUT_STATUS_BLOCKED is rsp + // todo blocking the output status if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time @@ -301,6 +302,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i destroyStreamDataBlock(pTask->msgInfo.pData); pTask->msgInfo.pData = NULL; + if (pTask->msgInfo.blockingTs != 0) { + int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; + qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el); + pTask->msgInfo.blockingTs = 0; + } + + // now ready for next data output atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline