From 1c63408b3e85510e1a372286d7be8c92fc30fb89 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 24 Apr 2023 20:18:20 +0800 Subject: [PATCH] opti:change push mgr to consume msg for subscribe --- include/libs/wal/wal.h | 2 -- source/client/src/clientTmq.c | 7 ++++--- source/libs/wal/src/walRead.c | 6 ++---- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 835f786d97..b51289de5e 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -149,8 +149,6 @@ typedef struct SWalReader { int64_t capacity; // int8_t curInvalid; // int8_t curStopped; - int64_t bodyCnt; - int64_t bodyTotalSize; TdThreadMutex mutex; SWalFilterCond cond; // TODO remove it diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f05a314e44..9e60f8b04d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1702,7 +1702,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1710,7 +1710,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, + tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; #if 0 @@ -1805,12 +1805,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { " total:%" PRId64 " reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); + pVg->emptyBlockReceiveTs = taosGetTimestampMs(); taosFreeQitem(pollRspWrapper); } else { // build rsp int64_t numOfRows = 0; SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; - + pVg->emptyBlockReceiveTs = 0; tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 09b6db6afe..6154e30938 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -262,8 +262,8 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { SWalCont *pReadHead = &pReader->pHead->head; int64_t ver = pReadHead->version; - wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total cnt:%"PRId64 ", total size:%"PRId64, pReader->pWal->cfg.vgId, ver, - pReadHead->bodyLen, pReader->bodyCnt, pReader->bodyTotalSize); + wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver, + pReadHead->bodyLen); if (pReader->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); @@ -300,8 +300,6 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver); pReader->curVersion = ver + 1; - pReader->bodyCnt++; - pReader->bodyTotalSize += pReadHead->bodyLen; return 0; } -- GitLab