提交 78fd7020 编写于 作者: wmmhello's avatar wmmhello

fix:offset error in tmq & add test cases

上级 873262ec
...@@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p ...@@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return 0; return 0;
} }
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){
if (!pVg->seekUpdated) { if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.beginOffset = *reqOffset; if(hasData) pVg->offsetInfo.beginOffset = *reqOffset;
pVg->offsetInfo.endOffset = *rspOffset; pVg->offsetInfo.endOffset = *rspOffset;
} else { } else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
...@@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId, pDataRsp->blockNum != 0);
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
...@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId); updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
// build rsp // build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
...@@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId); updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId, pollRspWrapper->taosxRsp.blockNum != 0);
if (pollRspWrapper->taosxRsp.blockNum == 0) { if (pollRspWrapper->taosxRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
......
...@@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream ...@@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever); int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { ...@@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
} }
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req); tqInitDataRsp(&dataRsp, req.reqOffset);
dataRsp.blockNum = 0; dataRsp.blockNum = 0;
dataRsp.rspOffset = dataRsp.reqOffset;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId);
...@@ -714,7 +713,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -714,7 +713,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req); tqInitDataRsp(&dataRsp, req.reqOffset);
if (req.useSnapshot == true) { if (req.useSnapshot == true) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
......
...@@ -20,8 +20,9 @@ ...@@ -20,8 +20,9 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId); const SMqMetaRsp* pRsp, int32_t vgId);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pReq->reqOffset; pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
pRsp->blockData = taosArrayInit(0, sizeof(void*)); pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
...@@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { ...@@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
return 0; return 0;
} }
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pReq->reqOffset; pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
pRsp->withTbName = 1; pRsp->withTbName = 1;
pRsp->withSchema = 1; pRsp->withSchema = 1;
...@@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { ...@@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) { static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) {
uint64_t consumerId = pRequest->consumerId; uint64_t consumerId = pRequest->consumerId;
STqOffsetVal reqOffset = pRequest->reqOffset;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey); STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
...@@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0; return 0;
} else { } else {
// no poll occurs in this vnode for this topic, let's seek to the right offset value. // no poll occurs in this vnode for this topic, let's seek to the right offset value.
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
if (pRequest->useSnapshot) { if (pRequest->useSnapshot) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot", tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
consumerId, pHandle->subKey, vgId); consumerId, pHandle->subKey, vgId);
...@@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest); tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1); tqInitDataRsp(&dataRsp, *pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version); pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
...@@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
*pBlockReturned = true; *pBlockReturned = true;
return code; return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed", " in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, vgId, pRequest->subKey); pHandle->subKey, consumerId, vgId, pRequest->subKey);
...@@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0; return 0;
} }
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){ //static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
if(offset->type == TMQ_OFFSET__LOG){ // if(offset->type == TMQ_OFFSET__LOG){
offset->version = ver; // offset->version = ver;
} // }
} //}
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) { SRpcMsg* pMsg, STqOffsetVal* pOffset) {
...@@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno = 0; terrno = 0;
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest); tqInitDataRsp(&dataRsp, *pOffset);
dataRsp.reqOffset.type = pOffset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset // dataRsp.reqOffset.type = pOffset->type; // store origin type for getting offset in tmq_get_vgroup_offset
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
...@@ -160,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -160,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
} }
setRequestVersion(&dataRsp.reqOffset, pOffset->version); // setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : { end : {
...@@ -181,8 +182,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -181,8 +182,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SWalCkHead* pCkHead = NULL; SWalCkHead* pCkHead = NULL;
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0}; STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest); tqInitTaosxRsp(&taosxRsp, *offset);
taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset // taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
if (offset->type != TMQ_OFFSET__LOG) { if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
...@@ -235,7 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -235,7 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version); // setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
...@@ -248,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -248,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) { if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) { if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version); // setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
...@@ -278,7 +279,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -278,7 +279,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
setRequestVersion(&taosxRsp.reqOffset, offset->version); // setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} else { } else {
...@@ -295,15 +296,13 @@ end: ...@@ -295,15 +296,13 @@ end:
} }
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1;
STqOffsetVal offset = {0};
STqOffsetVal reqOffset = pRequest->reqOffset; STqOffsetVal reqOffset = pRequest->reqOffset;
// 1. reset the offset if needed // 1. reset the offset if needed
if (IS_OFFSET_RESET_TYPE(reqOffset.type)) { if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
// handle the reset offset cases, according to the consumer's choice. // handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false; bool blockReturned = false;
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) { if (code != 0) {
return code; return code;
} }
...@@ -312,20 +311,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ ...@@ -312,20 +311,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
if (blockReturned) { if (blockReturned) {
return 0; return 0;
} }
} else if(reqOffset.type != 0){ // use the consumer specified offset } else if(reqOffset.type == 0){ // use the consumer specified offset
// the offset value can not be monotonious increase??
offset = reqOffset;
} else {
uError("req offset type is 0"); uError("req offset type is 0");
return TSDB_CODE_TMQ_INVALID_MSG; return TSDB_CODE_TMQ_INVALID_MSG;
} }
// this is a normal subscribe requirement // this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
} else { // todo handle the case where re-balance occurs. } else { // todo handle the case where re-balance occurs.
// for taosx // for taosx
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
} }
} }
......
...@@ -75,7 +75,10 @@ int32_t walNextValidMsg(SWalReader *pReader) { ...@@ -75,7 +75,10 @@ int32_t walNextValidMsg(SWalReader *pReader) {
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64, ", applied index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
if (fetchVer > appliedVer){
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
while (fetchVer <= appliedVer) { while (fetchVer <= appliedVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) { if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1; return -1;
...@@ -97,7 +100,6 @@ int32_t walNextValidMsg(SWalReader *pReader) { ...@@ -97,7 +100,6 @@ int32_t walNextValidMsg(SWalReader *pReader) {
} }
} }
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
......
...@@ -23,16 +23,16 @@ class TDTestCase: ...@@ -23,16 +23,16 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cmdStr1 = '%s/build/bin/taosBenchmark -i 10 -B 1 -t 100000 -n 100000 -y &'%(buildPath) cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath)
tdLog.info(cmdStr1) tdLog.info(cmdStr1)
os.system(cmdStr1) os.system(cmdStr1)
time.sleep(20) time.sleep(10)
cmdStr2 = '%s/build/bin/tmq_offset_test &'%(buildPath) cmdStr2 = '%s/build/bin/tmq_offset_test &'%(buildPath)
tdLog.info(cmdStr2) tdLog.info(cmdStr2)
os.system(cmdStr2) os.system(cmdStr2)
time.sleep(30) time.sleep(20)
os.system("kill -9 `pgrep taosBenchmark`") os.system("kill -9 `pgrep taosBenchmark`")
result = os.system("kill -9 `pgrep tmq_offset_test`") result = os.system("kill -9 `pgrep tmq_offset_test`")
......
...@@ -190,6 +190,16 @@ void test_offset(TAOS* pConn){ ...@@ -190,6 +190,16 @@ void test_offset(TAOS* pConn){
ASSERT(0); ASSERT(0);
} }
for(int i = 0; i < numOfAssign; i++){
int64_t position = tmq_position(tmq, "tp", pAssign[i].vgId);
if(position == 0) continue;
printf("position = %lld\n", position);
tmq_commit_offset_sync(tmq, "tp", pAssign[i].vgId, position);
int64_t committed = tmq_committed(tmq, "tp", pAssign[i].vgId);
ASSERT(position == committed);
}
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册