diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 6c8fce5b143bee04e8e4715f8ad8a540d065ce6a..58cb7b9e636c66aaf7faa8a319a2576563399974 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -20,13 +20,13 @@ static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); // this function should be executed by stream threads. // extract submit block from WAL, and add them into the input queue for the sources tasks. int32_t tqStreamTasksScanWal(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); + int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - int64_t st = taosGetTimestampMs(); + int64_t st = taosGetTimestampMs(); while (1) { int32_t scan = pMeta->walScanCounter; - tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan); + tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan); // check all restore tasks bool shouldIdle = true; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 046dab380e311d9ffaec5a840258e81134918c8a..9ed297bd6b2419a7704d21a36d042d65305d2877 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -212,9 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr, - pReq->upstreamTaskId); + qDebug("s-task:%s receive dispatch req from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + pReq->upstreamNodeId); + // todo add the input queue buffer limitation streamTaskEnqueueBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); @@ -222,10 +223,6 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (streamTryExec(pTask) < 0) { return -1; } - - /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatch(pTask);*/ - /*}*/ } else { streamSchedExec(pTask); }