From 4cb1c51d4e43d0964a3a318c584300c08d8138e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Mar 2023 13:27:58 +0800 Subject: [PATCH] fix(tmq): update some logs. --- source/dnode/vnode/src/tq/tq.c | 22 +++++++++++++--------- source/dnode/vnode/src/tq/tqExec.c | 4 ++-- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c5cd17cbf4..034240bf0c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -425,6 +425,7 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { STqOffset offset = {0}; + int32_t vgId = TD_VID(pTq->pVnode); SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); @@ -436,10 +437,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) { tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, - offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts); + offset.subKey, vgId, offset.val.uid, offset.val.ts); } else if (offset.val.type == TMQ_OFFSET__LOG) { tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, - TD_VID(pTq->pVnode), offset.val.version); + vgId, offset.val.version); if (offset.val.version + 1 == sversion) { offset.val.version += 1; } @@ -542,6 +543,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand uint64_t consumerId = pRequest->consumerId; STqOffsetVal reqOffset = pRequest->reqOffset; STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey); + int32_t vgId = TD_VID(pTq->pVnode); + *pBlockReturned = false; // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value. @@ -551,12 +554,15 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand char formatBuf[80]; tFormatOffset(formatBuf, 80, pOffsetVal); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.", - consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf); + consumerId, pHandle->subKey, vgId, formatBuf); return 0; } else { // no poll occurs in this vnode for this topic, let's seek to the right offset value. if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (pRequest->useSnapshot) { + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot", + consumerId, pHandle->subKey, vgId); + if (pHandle->fetchMeta) { tqOffsetResetToMeta(pOffsetVal, 0); } else { @@ -577,8 +583,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId, - pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, + pHandle->subKey, vgId, dataRsp.rspOffset.version); int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); tDeleteSMqDataRsp(&dataRsp); @@ -589,16 +595,14 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand tqInitTaosxRsp(&taosxRsp, pRequest); tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); -// int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); tDeleteSTaosxRsp(&taosxRsp); *pBlockReturned = true; return code; } } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { - tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64 - " in vg %d, subkey %s, reset none failed", - pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pRequest->subKey); + tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", + pHandle->subKey, consumerId, vgId, pRequest->subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; return -1; } diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 73d3ac069f..a62101eb47 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -92,7 +92,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs return -1; } - tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock); + tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock); // current scan should be stopped asap, since the rebalance occurs. if (pDataBlock == NULL) { @@ -120,7 +120,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs return -1; } - if(pRsp->withTbName || pRsp->withSchema){ + if (pRsp->withTbName || pRsp->withSchema) { tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema); return -1; } -- GitLab