提交 e6e0ac76 编写于 作者: wmmhello's avatar wmmhello

fix:alwalys in exec if consumer Id mismatch & check result for wal seek ver

上级 62520af2
...@@ -358,16 +358,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -358,16 +358,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey);
taosMsleep(5); taosMsleep(5);
} }
tqSetHandleExec(pHandle);
taosWUnLockLatch(&pTq->lock);
// 2. check re-balance status // 2. check re-balance status
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosWUnLockLatch(&pTq->lock);
return -1; return -1;
} }
tqSetHandleExec(pHandle);
taosWUnLockLatch(&pTq->lock);
// 3. update the epoch value // 3. update the epoch value
int32_t savedEpoch = pHandle->epoch; int32_t savedEpoch = pHandle->epoch;
...@@ -382,7 +383,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -382,7 +383,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
return tqExtractDataForMq(pTq, pHandle, &req, pMsg); int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
taosWLockLatch(&pTq->lock);
tqSetHandleIdle(pHandle);
taosWUnLockLatch(&pTq->lock);
return code;
} }
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
......
...@@ -185,7 +185,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -185,7 +185,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// lock // lock
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
code = tqRegisterPushHandle(pTq, pHandle, pMsg); code = tqRegisterPushHandle(pTq, pHandle, pMsg);
tqSetHandleIdle(pHandle);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp); tDeleteSMqDataRsp(&dataRsp);
return code; return code;
...@@ -206,9 +205,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -206,9 +205,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteSMqDataRsp(&dataRsp); tDeleteSMqDataRsp(&dataRsp);
} }
taosWLockLatch(&pTq->lock);
tqSetHandleIdle(pHandle);
taosWUnLockLatch(&pTq->lock);
return code; return code;
} }
......
...@@ -247,7 +247,9 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -247,7 +247,9 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
if (contLen == sizeof(SWalCkHead)) { if (contLen == sizeof(SWalCkHead)) {
break; break;
} else if (contLen == 0 && !seeked) { } else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, fetchVer); if(walReadSeekVerImpl(pRead, fetchVer) < 0){
return -1;
}
seeked = true; seeked = true;
continue; continue;
} else { } else {
...@@ -354,7 +356,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -354,7 +356,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
if (contLen == sizeof(SWalCkHead)) { if (contLen == sizeof(SWalCkHead)) {
break; break;
} else if (contLen == 0 && !seeked) { } else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, ver); if(walReadSeekVerImpl(pRead, ver) < 0){
return -1;
}
seeked = true; seeked = true;
continue; continue;
} else { } else {
...@@ -488,7 +492,10 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -488,7 +492,10 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
if (contLen == sizeof(SWalCkHead)) { if (contLen == sizeof(SWalCkHead)) {
break; break;
} else if (contLen == 0 && !seeked) { } else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pReader, ver); if(walReadSeekVerImpl(pReader, ver) < 0){
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
seeked = true; seeked = true;
continue; continue;
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册