From 3b814f24784b237f4f161124fbeb37e9677088a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 1 May 2023 23:21:29 +0800 Subject: [PATCH] fix(stream): do some internal refactor. --- source/dnode/vnode/src/tq/tqRestore.c | 6 +++--- source/libs/stream/src/stream.c | 9 +++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 6c8fce5b14..58cb7b9e63 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -20,13 +20,13 @@ static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); // this function should be executed by stream threads. // extract submit block from WAL, and add them into the input queue for the sources tasks. int32_t tqStreamTasksScanWal(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); + int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - int64_t st = taosGetTimestampMs(); + int64_t st = taosGetTimestampMs(); while (1) { int32_t scan = pMeta->walScanCounter; - tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan); + tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan); // check all restore tasks bool shouldIdle = true; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 046dab380e..9ed297bd6b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -212,9 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr, - pReq->upstreamTaskId); + qDebug("s-task:%s receive dispatch req from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + pReq->upstreamNodeId); + // todo add the input queue buffer limitation streamTaskEnqueueBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); @@ -222,10 +223,6 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (streamTryExec(pTask) < 0) { return -1; } - - /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatch(pTask);*/ - /*}*/ } else { streamSchedExec(pTask); } -- GitLab