提交 77530e19 编写于 作者: H Haojun Liao

refactor: record the downstream input Queue blocking time.

上级 418849ed
...@@ -289,6 +289,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -289,6 +289,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// the input queue of the (down stream) task that receive the output data is full, // the input queue of the (down stream) task that receive the output data is full,
// so the TASK_INPUT_STATUS_BLOCKED is rsp // so the TASK_INPUT_STATUS_BLOCKED is rsp
// todo blocking the output status
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
...@@ -301,6 +302,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -301,6 +302,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
destroyStreamDataBlock(pTask->msgInfo.pData); destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL; pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.blockingTs != 0) {
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el);
pTask->msgInfo.blockingTs = 0;
}
// now ready for next data output
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
// otherwise, continue dispatch the first block to down stream task in pipeline // otherwise, continue dispatch the first block to down stream task in pipeline
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册