From e6e0ac7603d61a51c780c10729f866ecfe748d18 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 16 May 2023 14:01:38 +0800 Subject: [PATCH] fix:alwalys in exec if consumer Id mismatch & check result for wal seek ver --- source/dnode/vnode/src/tq/tq.c | 11 ++++++++--- source/dnode/vnode/src/tq/tqUtil.c | 4 ---- source/libs/wal/src/walRead.c | 13 ++++++++++--- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2bb6bc8939..74ccd33d4c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -358,16 +358,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey); taosMsleep(5); } - tqSetHandleExec(pHandle); - taosWUnLockLatch(&pTq->lock); // 2. check re-balance status if (pHandle->consumerId != consumerId) { tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + taosWUnLockLatch(&pTq->lock); return -1; } + tqSetHandleExec(pHandle); + taosWUnLockLatch(&pTq->lock); // 3. update the epoch value int32_t savedEpoch = pHandle->epoch; @@ -382,7 +383,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); - return tqExtractDataForMq(pTq, pHandle, &req, pMsg); + int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); + taosWLockLatch(&pTq->lock); + tqSetHandleIdle(pHandle); + taosWUnLockLatch(&pTq->lock); + return code; } int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f424f45a29..dd4a02957d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -185,7 +185,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // lock taosWLockLatch(&pTq->lock); code = tqRegisterPushHandle(pTq, pHandle, pMsg); - tqSetHandleIdle(pHandle); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); return code; @@ -206,9 +205,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); tDeleteSMqDataRsp(&dataRsp); } - taosWLockLatch(&pTq->lock); - tqSetHandleIdle(pHandle); - taosWUnLockLatch(&pTq->lock); return code; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 37d97b35a6..36d526d162 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -247,7 +247,9 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { - walReadSeekVerImpl(pRead, fetchVer); + if(walReadSeekVerImpl(pRead, fetchVer) < 0){ + return -1; + } seeked = true; continue; } else { @@ -354,7 +356,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { - walReadSeekVerImpl(pRead, ver); + if(walReadSeekVerImpl(pRead, ver) < 0){ + return -1; + } seeked = true; continue; } else { @@ -488,7 +492,10 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { - walReadSeekVerImpl(pReader, ver); + if(walReadSeekVerImpl(pReader, ver) < 0){ + taosThreadMutexUnlock(&pReader->mutex); + return -1; + } seeked = true; continue; } else { -- GitLab