diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index e9b38dfff225c83d84a8f87399b0f3456e0be15a..af67490888d21f2aec15cced841e9e452a981332 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -378,7 +378,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) { taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); + qDebug("s-task:%s checkpoint/trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 12ff0649638090fa843817e5c52d6c6b728f1aa1..bb321734045e8cbc44906a7ecc57175047542b75 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -364,7 +364,8 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p msg.pCont = buf; msg.msgType = pTask->msgInfo.msgType; - qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); + qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg, len:%d", pTask->id.idStr, pReq->taskId, vgId, + msg.contLen); return tmsgSendReq(pEpSet, &msg); FAIL: diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b513d7c13e047ddf77cba35fc85252b367f68a31..6e6f23be01a2adad3a8f752a3a4607ccdd6620e6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -421,17 +421,12 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { - taosMsleep(10); - qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id); - continue; - } - qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); return TSDB_CODE_SUCCESS; } - qDebug("s-task:%s sink task handle result block one-by-one", id); + qDebug("s-task:%s sink task handle block one-by-one, type:%d", id, qItem->type); + *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS; @@ -467,8 +462,7 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI return TSDB_CODE_SUCCESS; } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block - qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id, - *numOfBlocks); + qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, *numOfBlocks); streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; }