提交 65b81a80 编写于 作者: H Haojun Liao

refactor: do some internal refactor and add some logs.

上级 fa5c024b
......@@ -237,7 +237,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp);
qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
if (leftRsp > 0) {
return 0;
}
......@@ -246,6 +246,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
qDebug("s-task:%s receive dispatch rsp, output status:%d", pTask->id.idStr, pTask->outputStatus);
// the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp
// todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms.
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
......
......@@ -510,14 +510,17 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) {
qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr);
qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
return 0;
}
qDebug("s-task:%s start to dispatch msg, output status:%d", pTask->id.idStr, pTask->outputStatus);
SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
if (pDispatchedBlock == NULL) {
qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr,
pTask->outputStatus);
return 0;
}
......@@ -527,6 +530,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) {
streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus);
}
// this block can be freed only when it has been pushed to down stream.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册