From 7b735c67d0a84f001c41c8f1145655cd3ddb40b4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 20 May 2023 16:36:25 +0800 Subject: [PATCH] fix:move sleep logic from rpc thread to tmq thread & fix some error --- source/dnode/vnode/src/tq/tq.c | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 91524f7a95..db03e97556 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -367,15 +367,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } - // 3. update the epoch value - if (pHandle->epoch > reqEpoch) { - tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, savedEpoch:%d > reqEpoch:%d ", - consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->epoch, reqEpoch); - terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; - taosWUnLockLatch(&pTq->lock); - return -1; - } - bool exec = tqIsHandleExec(pHandle); if(!exec) { tqSetHandleExec(pHandle); @@ -390,6 +381,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(10); } + // 3. update the epoch value if (pHandle->epoch < reqEpoch) { tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch); pHandle->epoch = reqEpoch; -- GitLab