提交 0a7e4c75 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/fix/liaohj' into fix/liaohj

...@@ -105,7 +105,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -105,7 +105,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest); tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version); pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
...@@ -161,6 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -161,6 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
} }
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : { end : {
...@@ -235,6 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -235,6 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
...@@ -246,7 +248,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -246,7 +248,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
// process meta // process meta
if (pHead->msgType != TDMT_VND_SUBMIT) { if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) { if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
...@@ -275,7 +278,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -275,7 +278,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} }
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册