From 777ed1761ffc730211e8ff80ee757b337c8aba0e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Apr 2023 18:24:46 +0800 Subject: [PATCH] refactor: update logs. --- source/dnode/vnode/src/tq/tq.c | 14 +++++++------- source/dnode/vnode/src/tq/tqRestore.c | 4 ++-- source/libs/stream/src/stream.c | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a42a2119d0..fd01e3ffae 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -821,12 +821,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } // do recovery step 1 - tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr); + tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr); int64_t st = taosGetTimestampMs(); streamSourceRecoverScanStep1(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { - double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -834,7 +833,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el); + tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); // build msg to launch next step SStreamRecoverStep2Req req; @@ -861,7 +860,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { memcpy(serializedReq, &req, len); // dispatch msg - tqDebug("s-task:%s start recover block stage(step 2)", pTask->id.idStr); + tqDebug("s-task:%s step 1 finished, send msg to start blocking recover stage(step 2)", pTask->id.idStr); SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; @@ -902,6 +901,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t } // set status normal + tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr); code = streamSetStatusNormal(pTask); if (code < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -909,7 +909,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t } double el = (taosGetTimestampMs() - st)/ 1000.0; - tqDebug("s-task:%s step2 recover finished, el:%.2f s", pTask->id.idStr, el); + tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); // dispatch recover finish req to all related downstream task code = streamDispatchRecoverFinishReq(pTask); @@ -1367,12 +1367,12 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr()); + tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } - tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = WAL_READ_TASKS_ID; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index b5fae3184a..c8b598763d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -100,7 +100,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr); + tqDebug("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -113,7 +113,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } if (tInputQueueIsFull(pTask)) { - tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr); + tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index a0bc95ac01..22c8a61a7b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -216,8 +216,8 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr, - pReq->upstreamTaskId); + qDebug("s-task:%s receive dispatch msg from taskId:%d (vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + pReq->upstreamNodeId); streamTaskEnqueueBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); -- GitLab