From 8641cfe22b6a30569e6f76b0e061d309ad0d7e30 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 9 Apr 2022 10:29:07 +0800 Subject: [PATCH] change test --- source/client/src/tmq.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 6 +++--- tests/test/c/tmqSim.c | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 5893a14bd5..955e25fd71 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1044,7 +1044,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); if (epStatus == 1) { int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); - tscDebug("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt); + tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt); if (epSkipCnt < 5000) return 0; } atomic_store_32(&tmq->epSkipCnt, 0); @@ -1235,7 +1235,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus != TMQ_VG_STATUS__IDLE) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscDebug("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); + tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; /*if (vgSkipCnt < 10000) continue;*/ #if 0 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f168695383..beefbc3d84 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } - vDebug("poll topic %s from consumer %ld (epoch %d) %s", pTopic->topicName, consumerId, pReq->epoch, pTopic->topicName); + vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, pTq->pVnode->vgId); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; @@ -325,8 +325,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ // TODO consumerEpoch = atomic_load_32(&pConsumer->epoch); - if (consumerEpoch > pReq->epoch) { - // TODO: return + if (consumerEpoch > reqEpoch) { + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); break; } SWalReadHead* pHead; diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 7372745cb8..236d1e2eed 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) { int32_t totalRows = 0; int32_t skipLogNum = 0; while (running) { - tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 4000); + tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 8000); if (tmqMsg) { totalMsgs++; -- GitLab