From 8183be4afff416a66d9ec07879ac5e620d1e43d9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 20 May 2023 16:13:52 +0800 Subject: [PATCH] fix:move sleep logic from rpc thread to tmq thread & fix some error --- source/client/src/clientTmq.c | 9 ++++++--- source/dnode/vnode/src/tq/tq.c | 22 +++++++++++----------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c08fbd0adf..b57ecfa845 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1275,7 +1275,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { // in case of consumer mismatch, wait for 500ms and retry if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { - taosMsleep(500); +// taosMsleep(500); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER", tmq->consumerId); @@ -1289,8 +1289,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; taosWriteQitem(tmq->mqueue, pRspWrapper); - } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert - taosMsleep(500); +// } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert +// taosMsleep(5); } else{ tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId, vgId, epoch, tstrerror(code), requestId); @@ -1731,6 +1731,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p // broadcast the poll request to all related vnodes static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { + if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){ + return 0; + } int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ce52277d71..91524f7a95 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -360,22 +360,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // 2. check re-balance status if (pHandle->consumerId != consumerId) { - tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); - terrno = TSDB_CODE_TMQ_INVALID_MSG; + terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; taosWUnLockLatch(&pTq->lock); return -1; } // 3. update the epoch value - int32_t savedEpoch = pHandle->epoch; - if (savedEpoch <= reqEpoch) { - tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, - reqEpoch); - pHandle->epoch = reqEpoch; - }else { - tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, savedEpoch:%d > reqEpoch:%d ", - consumerId, TD_VID(pTq->pVnode), req.subKey, savedEpoch, reqEpoch); + if (pHandle->epoch > reqEpoch) { + tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, savedEpoch:%d > reqEpoch:%d ", + consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->epoch, reqEpoch); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; taosWUnLockLatch(&pTq->lock); return -1; @@ -395,6 +390,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(10); } + if (pHandle->epoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch); + pHandle->epoch = reqEpoch; + } + char buf[80]; tFormatOffset(buf, 80, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, @@ -577,7 +577,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); } - atomic_add_fetch_32(&pHandle->epoch, 1); +// atomic_add_fetch_32(&pHandle->epoch, 1); // kill executing task if(tqIsHandleExec(pHandle)) { -- GitLab