提交 0acb00c5 编写于 作者: H Haojun Liao

fix(stream): add log.

上级 a3345477
...@@ -378,7 +378,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -378,7 +378,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
} }
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) { } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); qDebug("s-task:%s checkpoint/trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
} else if (type == STREAM_INPUT__GET_RES) { } else if (type == STREAM_INPUT__GET_RES) {
// use the default memory limit, refactor later. // use the default memory limit, refactor later.
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
......
...@@ -364,7 +364,8 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p ...@@ -364,7 +364,8 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p
msg.pCont = buf; msg.pCont = buf;
msg.msgType = pTask->msgInfo.msgType; msg.msgType = pTask->msgInfo.msgType;
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg, len:%d", pTask->id.idStr, pReq->taskId, vgId,
msg.contLen);
return tmsgSendReq(pEpSet, &msg); return tmsgSendReq(pEpSet, &msg);
FAIL: FAIL:
......
...@@ -421,17 +421,12 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI ...@@ -421,17 +421,12 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
continue;
}
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
qDebug("s-task:%s sink task handle result block one-by-one", id); qDebug("s-task:%s sink task handle block one-by-one, type:%d", id, qItem->type);
*numOfBlocks = 1; *numOfBlocks = 1;
*pInput = qItem; *pInput = qItem;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -467,8 +462,7 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI ...@@ -467,8 +462,7 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
// previous existed blocks needs to be handle, before handle the checkpoint msg block // previous existed blocks needs to be handle, before handle the checkpoint msg block
qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id, qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, *numOfBlocks);
*numOfBlocks);
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册