From 77530e19565ef9f2bb2b4bd44c86e4139aa7b121 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Jul 2023 10:58:11 +0800 Subject: [PATCH] refactor: record the downstream input Queue blocking time. --- source/libs/stream/src/stream.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0a02c39bdf..691d31e64c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -289,6 +289,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // the input queue of the (down stream) task that receive the output data is full, // so the TASK_INPUT_STATUS_BLOCKED is rsp + // todo blocking the output status if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time @@ -301,6 +302,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i destroyStreamDataBlock(pTask->msgInfo.pData); pTask->msgInfo.pData = NULL; + if (pTask->msgInfo.blockingTs != 0) { + int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; + qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el); + pTask->msgInfo.blockingTs = 0; + } + + // now ready for next data output atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline -- GitLab