diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 835f786d973d248dfa7e56ee9dfbabb243f5c0a4..b51289de5e46e05d58d5af8a8b130db292439103 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -149,8 +149,6 @@ typedef struct SWalReader { int64_t capacity; // int8_t curInvalid; // int8_t curStopped; - int64_t bodyCnt; - int64_t bodyTotalSize; TdThreadMutex mutex; SWalFilterCond cond; // TODO remove it diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f05a314e4433cc19988e9dbdaa2c56cc2cf814a3..9e60f8b04db1bdcd286745f1bae74fe679d93e48 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1702,7 +1702,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1710,7 +1710,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, + tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; #if 0 @@ -1805,12 +1805,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { " total:%" PRId64 " reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); + pVg->emptyBlockReceiveTs = taosGetTimestampMs(); taosFreeQitem(pollRspWrapper); } else { // build rsp int64_t numOfRows = 0; SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; - + pVg->emptyBlockReceiveTs = 0; tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 09b6db6afee5bfa9bd2dc31cee810ee88aecf18f..6154e30938a1c5177017bdec00d525efa060be9d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -262,8 +262,8 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { SWalCont *pReadHead = &pReader->pHead->head; int64_t ver = pReadHead->version; - wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total cnt:%"PRId64 ", total size:%"PRId64, pReader->pWal->cfg.vgId, ver, - pReadHead->bodyLen, pReader->bodyCnt, pReader->bodyTotalSize); + wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver, + pReadHead->bodyLen); if (pReader->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); @@ -300,8 +300,6 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver); pReader->curVersion = ver + 1; - pReader->bodyCnt++; - pReader->bodyTotalSize += pReadHead->bodyLen; return 0; }